Coverage for src / bluetooth_sig / gatt / characteristics / pulse_oximetry_measurement.py: 85%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Pulse Oximetry Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from datetime import datetime
7from enum import IntFlag
8from typing import Any, ClassVar
10import msgspec
12from ...types.gatt_enums import CharacteristicName
13from ..context import CharacteristicContext
14from .base import BaseCharacteristic
15from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic
16from .utils import DataParser, IEEE11073Parser
18logger = logging.getLogger(__name__)
class PulseOximetryFlags(IntFlag):
    """Presence bits carried in the first byte of a pulse oximetry measurement.

    Each member marks whether the corresponding optional field follows the
    mandatory SpO2 / pulse-rate pair in the payload.
    """

    TIMESTAMP_PRESENT = 1 << 0  # 0x01
    MEASUREMENT_STATUS_PRESENT = 1 << 1  # 0x02
    DEVICE_STATUS_PRESENT = 1 << 2  # 0x04
    PULSE_AMPLITUDE_INDEX_PRESENT = 1 << 3  # 0x08
class PulseOximetryData(  # pylint: disable=too-few-public-methods
    msgspec.Struct,
    frozen=True,
    kw_only=True,
):
    """Immutable, keyword-only record of one decoded pulse oximetry measurement.

    Attributes:
        spo2: Blood oxygen saturation percentage (SpO2).
        pulse_rate: Pulse rate in beats per minute.
        timestamp: Time of the measurement, or None when not supplied.
        measurement_status: Measurement status bit field, or None when not supplied.
        device_status: Device status bit field, or None when not supplied.
        pulse_amplitude_index: Pulse amplitude index, or None when not supplied.
        supported_features: PLX Features (0x2A60) taken from the decoding
            context, or None when no context value was available.
    """

    # Mandatory fields.
    spo2: float
    pulse_rate: float

    # Optional fields; each defaults to None when absent.
    timestamp: datetime | None = None
    measurement_status: int | None = None
    device_status: int | None = None
    pulse_amplitude_index: float | None = None

    # Context-derived value rather than part of the measurement payload.
    supported_features: PLXFeatureFlags | None = None
class PulseOximetryMeasurementCharacteristic(BaseCharacteristic[PulseOximetryData]):
    """PLX Continuous Measurement characteristic (0x2A5F).

    Used to transmit SpO2 (blood oxygen saturation) and pulse rate
    measurements.

    Payload layout handled by this class (sizes in bytes)::

        Flags(1) + SpO2(2) + Pulse Rate(2)
            + [Timestamp(7)] + [Measurement Status(2)]
            + [Device Status(3)] + [Pulse Amplitude Index(2)]

    NOTE(review): the optional Timestamp field and this flag assignment look
    like the PLX Spot-Check Measurement (0x2A5E) layout rather than the
    Continuous Measurement one - confirm against the PLX service specification.
    """

    _characteristic_name: str = "PLX Continuous Measurement"

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic]

    # Declarative validation (automatic)
    min_length: int | None = 5  # Flags(1) + SpO2(2) + PulseRate(2) minimum
    # 5 + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) + PulseAmplitudeIndex(2);
    # this is the size _encode_value produces when every optional field is set.
    max_length: int | None = 19
    allow_variable_length: bool = True  # Variable optional fields

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches  # Complexity needed for spec parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> PulseOximetryData:
        """Parse pulse oximetry measurement data.

        Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] +
        [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)]
        SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT.

        Optional fields are positional: each one starts where the previous
        present field ended. If a field is flagged as present but the payload
        is too short to hold it, that field and every later optional field are
        left as None, because their offsets can no longer be trusted.

        Context Enhancement:
            If ctx is provided, the PLX Features (0x2A60) value found in the
            context, if any, is attached as ``supported_features``.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Accepted for interface compatibility; not consulted by this method.

        Returns:
            PulseOximetryData containing the parsed measurement and, when
            available, the context-provided PLX features.

        """
        flags = PulseOximetryFlags(data[0])

        # Mandatory fields: two IEEE-11073 SFLOAT values right after the flags byte.
        spo2 = IEEE11073Parser.parse_sfloat(data, 1)
        pulse_rate = IEEE11073Parser.parse_sfloat(data, 3)

        timestamp: datetime | None = None
        measurement_status: int | None = None
        device_status: int | None = None
        pulse_amplitude_index: float | None = None
        offset = 5
        # Latched once a flagged field does not fit; later fields are then skipped
        # instead of being decoded from a stale offset.
        truncated = False

        if PulseOximetryFlags.TIMESTAMP_PRESENT in flags:
            if len(data) >= offset + 7:
                timestamp = IEEE11073Parser.parse_timestamp(data, offset)
                offset += 7
            else:
                truncated = True

        if not truncated and PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags:
            if len(data) >= offset + 2:
                measurement_status = DataParser.parse_int16(data, offset, signed=False)
                offset += 2
            else:
                truncated = True

        if not truncated and PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags:
            if len(data) >= offset + 3:
                # 24-bit field: pad to 4 bytes so the 32-bit parser can read it.
                device_status = DataParser.parse_int32(data[offset : offset + 3] + b"\x00", 0, signed=False)
                offset += 3
            else:
                truncated = True

        if (
            not truncated
            and PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags
            and len(data) >= offset + 2
        ):
            pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset)

        # Context enhancement: PLX Features
        supported_features: PLXFeatureFlags | None = None
        if ctx is not None:
            plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES)
            if plx_features_value is not None:
                supported_features = plx_features_value

        return PulseOximetryData(
            spo2=spo2,
            pulse_rate=pulse_rate,
            timestamp=timestamp,
            measurement_status=measurement_status,
            device_status=device_status,
            pulse_amplitude_index=pulse_amplitude_index,
            supported_features=supported_features,
        )

    def _encode_value(self, data: PulseOximetryData) -> bytearray:
        """Encode pulse oximetry measurement value back to bytes.

        The flags byte is derived from which optional attributes are not None,
        and the fields are emitted in the same order ``_decode_value`` reads
        them. ``supported_features`` is not part of the payload and is ignored.

        Args:
            data: PulseOximetryData instance to encode

        Returns:
            Encoded bytes representing the measurement

        Raises:
            ValueError: If ``device_status`` does not fit in the 24-bit field.

        """
        flags = PulseOximetryFlags(0)
        if data.timestamp is not None:
            flags |= PulseOximetryFlags.TIMESTAMP_PRESENT
        if data.measurement_status is not None:
            flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT
        if data.device_status is not None:
            # Reject rather than silently drop the high byte when slicing to 3 bytes below.
            if not 0 <= data.device_status <= 0xFFFFFF:
                raise ValueError(f"device_status {data.device_status} does not fit in 24 bits")
            flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT
        if data.pulse_amplitude_index is not None:
            flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT

        result = bytearray([int(flags)])
        result.extend(IEEE11073Parser.encode_sfloat(data.spo2))
        result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))

        if data.timestamp is not None:
            result.extend(IEEE11073Parser.encode_timestamp(data.timestamp))

        if data.measurement_status is not None:
            result.extend(DataParser.encode_int16(data.measurement_status, signed=False))

        if data.device_status is not None:
            # Device status is 3 bytes (24-bit value): keep the first three of the four encoded bytes.
            result.extend(DataParser.encode_int32(data.device_status, signed=False)[:3])

        if data.pulse_amplitude_index is not None:
            result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))

        return result