Coverage for src / bluetooth_sig / gatt / characteristics / plx_spot_check_measurement.py: 91%
103 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""PLX Spot-Check Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from datetime import datetime
7from enum import IntFlag
8from typing import Any, ClassVar
10import msgspec
12from ...types.gatt_enums import CharacteristicName
13from ..context import CharacteristicContext
14from .base import BaseCharacteristic
15from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic
16from .utils import DataParser, IEEE11073Parser
18logger = logging.getLogger(__name__)
class PLXSpotCheckFlags(IntFlag):
    """Flags octet of the PLX Spot-Check Measurement (Table 3.3 PLXS v1.0.1).

    Bits 0-3 announce which optional fields are present in the value;
    bit 4 signals that the device clock is not set.
    """

    # Optional-field presence bits (bit index == shift amount)
    TIMESTAMP_PRESENT = 1 << 0  # Timestamp field is present
    MEASUREMENT_STATUS_PRESENT = 1 << 1  # Measurement Status field is present
    DEVICE_AND_SENSOR_STATUS_PRESENT = 1 << 2  # Device and Sensor Status field is present
    PULSE_AMPLITUDE_INDEX_PRESENT = 1 << 3  # Pulse Amplitude Index field is present

    # Device state bit
    DEVICE_CLOCK_NOT_SET = 1 << 4  # Device Clock is Not Set
class PLXMeasurementStatus(IntFlag):
    """Bit field carried in the 16-bit Measurement Status (Table 3.4 PLXS v1.0.1).

    The five low-order bits (0-4) are reserved for future use, so the first
    defined member sits at bit 5. Each member's shift amount is its bit index.
    """

    MEASUREMENT_ONGOING = 1 << 5
    EARLY_ESTIMATED_DATA = 1 << 6
    VALIDATED_DATA = 1 << 7
    FULLY_QUALIFIED_DATA = 1 << 8
    DATA_FROM_MEASUREMENT_STORAGE = 1 << 9
    DATA_FOR_DEMONSTRATION = 1 << 10
    DATA_FOR_TESTING = 1 << 11
    CALIBRATION_ONGOING = 1 << 12
    MEASUREMENT_UNAVAILABLE = 1 << 13
    QUESTIONABLE_MEASUREMENT_DETECTED = 1 << 14
    INVALID_MEASUREMENT_DETECTED = 1 << 15
class PLXDeviceAndSensorStatus(IntFlag):
    """Bit field carried in the 24-bit Device and Sensor Status (Table 3.5 PLXS v1.0.1).

    Members are defined for bits 0-15; each member's shift amount is its bit index.
    """

    EXTENDED_DISPLAY_UPDATE_ONGOING = 1 << 0
    EQUIPMENT_MALFUNCTION_DETECTED = 1 << 1
    SIGNAL_PROCESSING_IRREGULARITY = 1 << 2
    INADEQUATE_SIGNAL_DETECTED = 1 << 3
    POOR_SIGNAL_DETECTED = 1 << 4
    LOW_PERFUSION_DETECTED = 1 << 5
    ERRATIC_SIGNAL_DETECTED = 1 << 6
    NON_PULSATILE_SIGNAL_DETECTED = 1 << 7
    QUESTIONABLE_PULSE_DETECTED = 1 << 8
    SIGNAL_ANALYSIS_ONGOING = 1 << 9
    SENSOR_INTERFERENCE_DETECTED = 1 << 10
    SENSOR_UNCONNECTED_TO_USER = 1 << 11
    UNKNOWN_SENSOR_CONNECTED = 1 << 12
    SENSOR_DISPLACED = 1 << 13
    SENSOR_MALFUNCTIONING = 1 << 14
    SENSOR_DISCONNECTED = 1 << 15
class PLXSpotCheckData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Decoded PLX Spot-Check Measurement value (Table 3.2 PLXS v1.0.1).

    Instances are immutable and must be built with keyword arguments.

    Attributes:
        spot_check_flags: PLX spot-check measurement flags.
        spo2: Blood oxygen saturation percentage (SpO2), carried as SFLOAT.
        pulse_rate: Pulse rate in beats per minute, carried as SFLOAT.
        timestamp: Optional DateTime (7 octets), announced by Table 3.3 bit 0.
        measurement_status: Optional measurement status flags (16-bit).
        device_and_sensor_status: Optional device/sensor status flags (24-bit).
        pulse_amplitude_index: Optional pulse amplitude index (SFLOAT, %).
        supported_features: Optional PLX features obtained from context.
    """

    # Mandatory fields
    spot_check_flags: PLXSpotCheckFlags
    spo2: float
    pulse_rate: float

    # Optional fields; None means the field is absent
    timestamp: datetime | None = None
    measurement_status: PLXMeasurementStatus | None = None
    device_and_sensor_status: PLXDeviceAndSensorStatus | None = None
    pulse_amplitude_index: float | None = None

    # Context enrichment, not part of the characteristic value itself
    supported_features: PLXFeatureFlags | None = None
class PLXSpotCheckMeasurementCharacteristic(BaseCharacteristic[PLXSpotCheckData]):
    """PLX Spot-Check Measurement characteristic (0x2A5E).

    Used to transmit single SpO2 (blood oxygen saturation) and pulse rate
    measurements from spot-check readings.

    Wire layout (Table 3.2 PLXS v1.0.1): Flags(1) + SpO2(2) + PR(2), followed by
    the optional fields announced in the flags octet, always in this order:
    [Timestamp(7)] [Measurement Status(2)] [Device and Sensor Status(3)]
    [Pulse Amplitude Index(2)].
    """

    _characteristic_name: str = "PLX Spot-Check Measurement"

    # PLX Features is looked up in the context (when one is supplied) during decoding.
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic]

    # Declarative validation (automatic)
    min_length: int | None = 5  # Flags(1) + SpO2(2) + PulseRate(2) minimum
    max_length: int | None = (
        19  # + Timestamp(7) + MeasurementStatus(2) + DeviceAndSensorStatus(3) + PulseAmplitudeIndex(2)
    )
    allow_variable_length: bool = True  # Variable optional fields

    @staticmethod
    def _locate_optional_fields(flags: PLXSpotCheckFlags, length: int) -> dict[PLXSpotCheckFlags, int]:
        """Map each announced optional field that fits in the payload to its start offset.

        Optional fields are packed back to back in a fixed order, so once an
        announced field does not fit, the position of every later field is
        unknown. Locating stops there (with a warning) instead of reading the
        later fields out of bytes that belong to the truncated one.

        Args:
            flags: Decoded flags octet.
            length: Total payload length in octets.

        Returns:
            Start offset per presence flag, only for fields that are fully present.

        """
        # (presence flag, size in octets) in wire order per Table 3.2
        layout = (
            (PLXSpotCheckFlags.TIMESTAMP_PRESENT, 7),
            (PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT, 2),
            (PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT, 3),
            (PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT, 2),
        )
        offsets: dict[PLXSpotCheckFlags, int] = {}
        offset = 5  # Flags(1) + SpO2(2) + PulseRate(2)
        for flag, size in layout:
            if flag not in flags:
                continue
            if length < offset + size:
                logger.warning(
                    "PLX Spot-Check Measurement truncated: %s needs %d octet(s) at offset %d "
                    "but payload has only %d; ignoring remaining optional fields",
                    flag.name,
                    size,
                    offset,
                    length,
                )
                break
            offsets[flag] = offset
            offset += size
        return offsets

    @staticmethod
    def _presence_flag_bits(data: PLXSpotCheckData) -> int:
        """Return the flags octet with presence bits matching the fields that will be emitted.

        Bits other than the four presence bits (e.g. DEVICE_CLOCK_NOT_SET) are
        taken unchanged from ``data.spot_check_flags``.

        Args:
            data: Measurement about to be encoded.

        Returns:
            Integer value for the flags octet.

        """
        # Plain-int arithmetic keeps the result independent of IntFlag inversion semantics.
        bits = int(data.spot_check_flags)
        presence = (
            (PLXSpotCheckFlags.TIMESTAMP_PRESENT, data.timestamp),
            (PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT, data.measurement_status),
            (PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT, data.device_and_sensor_status),
            (PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT, data.pulse_amplitude_index),
        )
        for flag, value in presence:
            if value is not None:
                bits |= int(flag)
            else:
                bits &= ~int(flag)
        return bits

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches  # Complexity needed for spec parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> PLXSpotCheckData:
        """Parse PLX spot-check measurement data per PLXS v1.0.1.

        Format (Table 3.2): Flags(1) + SpO2(2) + PR(2) + [Timestamp(7)] +
        [Measurement Status(2)] + [Device and Sensor Status(3)] + [Pulse Amplitude Index(2)]
        SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT.

        An optional field that is announced in the flags but does not fit in the
        payload is left as ``None``, and so is every optional field after it,
        because their offsets can no longer be determined. The returned
        ``spot_check_flags`` always reflects the flags octet as received.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to validate ranges (default True). Not consulted by this method.

        Returns:
            PLXSpotCheckData containing parsed PLX spot-check data.

        """
        flags = PLXSpotCheckFlags(data[0])

        # Mandatory fields: SpO2 and pulse rate in IEEE-11073 SFLOAT format
        spo2 = IEEE11073Parser.parse_sfloat(data, 1)
        pulse_rate = IEEE11073Parser.parse_sfloat(data, 3)

        # Optional fields, in order per Table 3.2
        located = self._locate_optional_fields(flags, len(data))

        timestamp: datetime | None = None
        start = located.get(PLXSpotCheckFlags.TIMESTAMP_PRESENT)
        if start is not None:
            timestamp = IEEE11073Parser.parse_timestamp(data, start)

        measurement_status: PLXMeasurementStatus | None = None
        start = located.get(PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT)
        if start is not None:
            measurement_status = PLXMeasurementStatus(DataParser.parse_int16(data, start, signed=False))

        device_and_sensor_status: PLXDeviceAndSensorStatus | None = None
        start = located.get(PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT)
        if start is not None:
            # The field is 3 octets; append a zero octet so the 32-bit parser can read it
            # (presumably little-endian, making the pad the most significant octet).
            device_and_sensor_status = PLXDeviceAndSensorStatus(
                DataParser.parse_int32(data[start : start + 3] + b"\x00", 0, signed=False)
            )

        pulse_amplitude_index: float | None = None
        start = located.get(PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT)
        if start is not None:
            pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, start)

        # Context enhancement: PLX Features
        supported_features: PLXFeatureFlags | None = None
        if ctx:
            plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES)
            if plx_features_value is not None:
                supported_features = plx_features_value

        return PLXSpotCheckData(
            spot_check_flags=flags,
            spo2=spo2,
            pulse_rate=pulse_rate,
            timestamp=timestamp,
            measurement_status=measurement_status,
            device_and_sensor_status=device_and_sensor_status,
            pulse_amplitude_index=pulse_amplitude_index,
            supported_features=supported_features,
        )

    def _encode_value(self, data: PLXSpotCheckData) -> bytearray:
        """Encode PLX spot-check measurement value back to bytes.

        Optional fields are emitted when their value is not ``None``. The four
        presence bits of the flags octet are derived from those same values so
        the octet always describes the payload that follows; all other bits of
        ``data.spot_check_flags`` are written unchanged.

        Args:
            data: PLXSpotCheckData instance to encode

        Returns:
            Encoded bytes representing the measurement

        """
        # Flags octet followed by the two mandatory SFLOAT fields
        result = bytearray([self._presence_flag_bits(data)])
        result.extend(IEEE11073Parser.encode_sfloat(data.spo2))
        result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))

        # Optional timestamp (7 bytes DateTime)
        if data.timestamp is not None:
            result.extend(IEEE11073Parser.encode_timestamp(data.timestamp))

        # Optional measurement status (16-bit)
        if data.measurement_status is not None:
            result.extend(DataParser.encode_int16(int(data.measurement_status), signed=False))

        # Optional device and sensor status: 24-bit value, so keep only the
        # first 3 of the 4 octets produced by the 32-bit encoder
        if data.device_and_sensor_status is not None:
            device_status_bytes = DataParser.encode_int32(int(data.device_and_sensor_status), signed=False)[:3]
            result.extend(device_status_bytes)

        # Optional pulse amplitude index (SFLOAT)
        if data.pulse_amplitude_index is not None:
            result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))

        return result