Coverage for src / bluetooth_sig / gatt / characteristics / plx_continuous_measurement.py: 78%
148 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""PLX Continuous Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from enum import IntFlag
7from typing import Any, ClassVar
9import msgspec
11from ...types.gatt_enums import CharacteristicName
12from ..context import CharacteristicContext
13from .base import BaseCharacteristic
14from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic
15from .utils import DataParser, IEEE11073Parser
logger = logging.getLogger(__name__)

# Shortest payload accepted by the decoder:
# Flags (1 byte) + SpO2 (2 bytes) + Pulse Rate (2 bytes).
_PLX_CONTINUOUS_MIN_BYTES = 1 + 2 + 2
class PLXContinuousFlags(IntFlag):
    """Presence bits carried in the PLX Continuous Measurement flags byte.

    Each member marks one optional field as present in the payload
    (Table 3.7 PLXS v1.0.1). Values are written as ``1 << bit`` so the bit
    position is visible directly.
    """

    SPO2PR_FAST_PRESENT = 1 << 0  # SpO2PR-Fast field is present
    SPO2PR_SLOW_PRESENT = 1 << 1  # SpO2PR-Slow field is present
    MEASUREMENT_STATUS_PRESENT = 1 << 2  # Measurement Status field is present
    DEVICE_AND_SENSOR_STATUS_PRESENT = 1 << 3  # Device and Sensor Status field is present
    PULSE_AMPLITUDE_INDEX_PRESENT = 1 << 4  # Pulse Amplitude Index field is present
class PLXMeasurementStatus(IntFlag):
    """Bit flags of the 16-bit PLX Measurement Status field (Table 3.4 PLXS v1.0.1).

    Bits 0-4 are RFU, so the first defined member sits at bit 5.
    """

    MEASUREMENT_ONGOING = 1 << 5
    EARLY_ESTIMATED_DATA = 1 << 6
    VALIDATED_DATA = 1 << 7
    FULLY_QUALIFIED_DATA = 1 << 8
    DATA_FROM_MEASUREMENT_STORAGE = 1 << 9
    DATA_FOR_DEMONSTRATION = 1 << 10
    DATA_FOR_TESTING = 1 << 11
    CALIBRATION_ONGOING = 1 << 12
    MEASUREMENT_UNAVAILABLE = 1 << 13
    QUESTIONABLE_MEASUREMENT_DETECTED = 1 << 14
    INVALID_MEASUREMENT_DETECTED = 1 << 15
class PLXDeviceAndSensorStatus(IntFlag):
    """Bit flags of the 24-bit PLX Device and Sensor Status field (Table 3.5 PLXS v1.0.1).

    Members are defined for bits 0-15 only; no member is declared for the
    upper eight bits of the 24-bit field.
    """

    EXTENDED_DISPLAY_UPDATE_ONGOING = 1 << 0
    EQUIPMENT_MALFUNCTION_DETECTED = 1 << 1
    SIGNAL_PROCESSING_IRREGULARITY = 1 << 2
    INADEQUATE_SIGNAL_DETECTED = 1 << 3
    POOR_SIGNAL_DETECTED = 1 << 4
    LOW_PERFUSION_DETECTED = 1 << 5
    ERRATIC_SIGNAL_DETECTED = 1 << 6
    NON_PULSATILE_SIGNAL_DETECTED = 1 << 7
    QUESTIONABLE_PULSE_DETECTED = 1 << 8
    SIGNAL_ANALYSIS_ONGOING = 1 << 9
    SENSOR_INTERFERENCE_DETECTED = 1 << 10
    SENSOR_UNCONNECTED_TO_USER = 1 << 11
    UNKNOWN_SENSOR_CONNECTED = 1 << 12
    SENSOR_DISPLACED = 1 << 13
    SENSOR_MALFUNCTIONING = 1 << 14
    SENSOR_DISCONNECTED = 1 << 15
class PLXContinuousData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable, keyword-only record of one PLX continuous measurement (Table 3.6 PLXS v1.0.1).

    The flags and the SpO2PR-Normal pair are always required. Every other
    field defaults to ``None``.
    """

    # Mandatory part: flags byte and the SpO2PR-Normal pair.
    continuous_flags: PLXContinuousFlags
    spo2: float
    pulse_rate: float

    # SpO2PR-Fast pair (optional, flag bit 0).
    spo2_fast: float | None = None
    pulse_rate_fast: float | None = None

    # SpO2PR-Slow pair (optional, flag bit 1).
    spo2_slow: float | None = None
    pulse_rate_slow: float | None = None

    # Remaining optional fields.
    measurement_status: PLXMeasurementStatus | None = None
    device_and_sensor_status: PLXDeviceAndSensorStatus | None = None
    pulse_amplitude_index: float | None = None
    supported_features: PLXFeatureFlags | None = None
class PLXContinuousMeasurementCharacteristic(BaseCharacteristic[PLXContinuousData]):
    """PLX Continuous Measurement characteristic (0x2A5F).

    Carries continuous SpO2 (blood oxygen saturation) and pulse rate readings
    from pulse oximetry devices.

    Wire layout (Table 3.6), optional fields in brackets:
    Flags(1) + SpO2PR-Normal(4) + [SpO2PR-Fast(4)] + [SpO2PR-Slow(4)] +
    [Measurement Status(2)] + [Device and Sensor Status(3)] +
    [Pulse Amplitude Index(2)]
    """

    _characteristic_name: str = "PLX Continuous Measurement"

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic]

    # Declarative validation
    # Shortest payload: Flags(1) + SpO2(2) + PulseRate(2).
    min_length: int | None = _PLX_CONTINUOUS_MIN_BYTES
    # Longest payload: minimum + Fast(4) + Slow(4) + MeasStatus(2) + DevSensStatus(3) + PAI(2).
    max_length: int | None = 20
    allow_variable_length: bool = True

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> PLXContinuousData:
        """Decode a PLX continuous measurement payload per PLXS v1.0.1.

        The flags byte and the SpO2PR-Normal pair are read unconditionally.
        Each optional field is read, in wire order, only when its flag bit is
        set. All SpO2/PR values and the Pulse Amplitude Index are parsed as
        IEEE-11073 16-bit SFLOAT.

        Args:
            data: Raw characteristic payload.
            ctx: Optional context; when truthy it is queried for a PLX
                Features value, which is stored in ``supported_features``.
            validate: When true, length checks are enforced before each field.

        Returns:
            The parsed measurement.

        Raises:
            ValueError: If ``validate`` is true and the payload is shorter
                than the mandatory part or than a flagged optional field needs.
        """
        size = len(data)
        if validate and size < _PLX_CONTINUOUS_MIN_BYTES:
            raise ValueError(
                f"Insufficient data for PLX continuous measurement: {size} < {_PLX_CONTINUOUS_MIN_BYTES}"
            )

        # Mandatory prefix sits at fixed offsets: flags at 0, SpO2 at 1, PR at 3.
        flags = PLXContinuousFlags(data[0])
        normal_spo2 = IEEE11073Parser.parse_sfloat(data, 1)
        normal_pr = IEEE11073Parser.parse_sfloat(data, 3)
        pos = 5  # read cursor for the optional fields that follow

        # SpO2PR-Fast (4 bytes), flag bit 0
        fast_spo2: float | None = None
        fast_pr: float | None = None
        if PLXContinuousFlags.SPO2PR_FAST_PRESENT in flags:
            if validate and size - pos < 4:
                raise ValueError(f"Not enough data for SpO2PR-Fast: {size} < {pos + 4}")
            fast_spo2 = IEEE11073Parser.parse_sfloat(data, pos)
            fast_pr = IEEE11073Parser.parse_sfloat(data, pos + 2)
            pos += 4

        # SpO2PR-Slow (4 bytes), flag bit 1
        slow_spo2: float | None = None
        slow_pr: float | None = None
        if PLXContinuousFlags.SPO2PR_SLOW_PRESENT in flags:
            if validate and size - pos < 4:
                raise ValueError(f"Not enough data for SpO2PR-Slow: {size} < {pos + 4}")
            slow_spo2 = IEEE11073Parser.parse_sfloat(data, pos)
            slow_pr = IEEE11073Parser.parse_sfloat(data, pos + 2)
            pos += 4

        # Measurement Status (2 bytes, unsigned), flag bit 2
        meas_status = None
        if PLXContinuousFlags.MEASUREMENT_STATUS_PRESENT in flags:
            if validate and size - pos < 2:
                raise ValueError(f"Not enough data for measurement status: {size} < {pos + 2}")
            raw_status = DataParser.parse_int16(data, pos, signed=False)
            meas_status = PLXMeasurementStatus(raw_status)
            pos += 2

        # Device and Sensor Status (3 bytes), flag bit 3
        dev_status = None
        if PLXContinuousFlags.DEVICE_AND_SENSOR_STATUS_PRESENT in flags:
            if validate and size - pos < 3:
                raise ValueError(f"Not enough data for device/sensor status: {size} < {pos + 3}")
            # Pad the 24-bit field with one zero byte so the 32-bit parser can read it.
            padded = data[pos : pos + 3] + b"\x00"
            dev_status = PLXDeviceAndSensorStatus(DataParser.parse_int32(padded, 0, signed=False))
            pos += 3

        # Pulse Amplitude Index (2 bytes, SFLOAT), flag bit 4
        pai = None
        if PLXContinuousFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags:
            if validate and size - pos < 2:
                raise ValueError(f"Not enough data for pulse amplitude index: {size} < {pos + 2}")
            pai = IEEE11073Parser.parse_sfloat(data, pos)
            pos += 2

        # Context enhancement: attach the PLX Features value when a context is supplied.
        features: PLXFeatureFlags | None = (
            self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES) if ctx else None
        )

        return PLXContinuousData(
            continuous_flags=flags,
            spo2=normal_spo2,
            pulse_rate=normal_pr,
            spo2_fast=fast_spo2,
            pulse_rate_fast=fast_pr,
            spo2_slow=slow_spo2,
            pulse_rate_slow=slow_pr,
            measurement_status=meas_status,
            device_and_sensor_status=dev_status,
            pulse_amplitude_index=pai,
            supported_features=features,
        )

    def _encode_value(self, data: PLXContinuousData) -> bytearray:
        """Serialise a PLX continuous measurement into its wire format.

        Fields are emitted in the same order the decoder reads them. An
        optional field is written only when its bit is set in
        ``data.continuous_flags``.

        Args:
            data: The measurement to encode.

        Returns:
            The encoded payload.

        Raises:
            ValueError: If a flag bit is set but the corresponding value(s)
                are ``None``.
        """
        flags = data.continuous_flags

        # Flags byte first, then the mandatory SpO2PR-Normal pair.
        payload = bytearray((int(flags),))
        payload.extend(IEEE11073Parser.encode_sfloat(data.spo2))
        payload.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))

        # SpO2PR-Fast
        if flags & PLXContinuousFlags.SPO2PR_FAST_PRESENT:
            if data.spo2_fast is None or data.pulse_rate_fast is None:
                raise ValueError("SpO2PR-Fast flag set but values are None")
            payload.extend(IEEE11073Parser.encode_sfloat(data.spo2_fast))
            payload.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate_fast))

        # SpO2PR-Slow
        if flags & PLXContinuousFlags.SPO2PR_SLOW_PRESENT:
            if data.spo2_slow is None or data.pulse_rate_slow is None:
                raise ValueError("SpO2PR-Slow flag set but values are None")
            payload.extend(IEEE11073Parser.encode_sfloat(data.spo2_slow))
            payload.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate_slow))

        # Measurement Status: 16-bit little-endian, unsigned
        if flags & PLXContinuousFlags.MEASUREMENT_STATUS_PRESENT:
            if data.measurement_status is None:
                raise ValueError("Measurement Status flag set but value is None")
            payload.extend(int(data.measurement_status).to_bytes(2, byteorder="little", signed=False))

        # Device and Sensor Status: low 24 bits, little-endian
        if flags & PLXContinuousFlags.DEVICE_AND_SENSOR_STATUS_PRESENT:
            if data.device_and_sensor_status is None:
                raise ValueError("Device/Sensor Status flag set but value is None")
            # Mask to 24 bits so anything above bit 23 is dropped rather than rejected.
            low24 = int(data.device_and_sensor_status) & 0xFFFFFF
            payload.extend(low24.to_bytes(3, byteorder="little", signed=False))

        # Pulse Amplitude Index
        if flags & PLXContinuousFlags.PULSE_AMPLITUDE_INDEX_PRESENT:
            if data.pulse_amplitude_index is None:
                raise ValueError("Pulse Amplitude Index flag set but value is None")
            payload.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))

        return payload