Coverage for src / bluetooth_sig / gatt / characteristics / plx_spot_check_measurement.py: 91%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""PLX Spot-Check Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from datetime import datetime 

7from enum import IntFlag 

8from typing import Any, ClassVar 

9 

10import msgspec 

11 

12from ...types.gatt_enums import CharacteristicName 

13from ..context import CharacteristicContext 

14from .base import BaseCharacteristic 

15from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic 

16from .utils import DataParser, IEEE11073Parser 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21class PLXSpotCheckFlags(IntFlag): 

22 """PLX Spot-Check measurement flags (Table 3.3 PLXS v1.0.1).""" 

23 

24 TIMESTAMP_PRESENT = 0x01 # Bit 0: Timestamp field is present 

25 MEASUREMENT_STATUS_PRESENT = 0x02 # Bit 1: Measurement Status field is present 

26 DEVICE_AND_SENSOR_STATUS_PRESENT = 0x04 # Bit 2: Device and Sensor Status field is present 

27 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08 # Bit 3: Pulse Amplitude Index field is present 

28 DEVICE_CLOCK_NOT_SET = 0x10 # Bit 4: Device Clock is Not Set 

29 

30 

31class PLXMeasurementStatus(IntFlag): 

32 """PLX Measurement Status flags (16-bit, Table 3.4 PLXS v1.0.1). 

33 

34 Bits 0-4 are RFU. Status bits start at bit 5. 

35 """ 

36 

37 MEASUREMENT_ONGOING = 0x0020 # Bit 5 

38 EARLY_ESTIMATED_DATA = 0x0040 # Bit 6 

39 VALIDATED_DATA = 0x0080 # Bit 7 

40 FULLY_QUALIFIED_DATA = 0x0100 # Bit 8 

41 DATA_FROM_MEASUREMENT_STORAGE = 0x0200 # Bit 9 

42 DATA_FOR_DEMONSTRATION = 0x0400 # Bit 10 

43 DATA_FOR_TESTING = 0x0800 # Bit 11 

44 CALIBRATION_ONGOING = 0x1000 # Bit 12 

45 MEASUREMENT_UNAVAILABLE = 0x2000 # Bit 13 

46 QUESTIONABLE_MEASUREMENT_DETECTED = 0x4000 # Bit 14 

47 INVALID_MEASUREMENT_DETECTED = 0x8000 # Bit 15 

48 

49 

50class PLXDeviceAndSensorStatus(IntFlag): 

51 """PLX Device and Sensor Status flags (24-bit, Table 3.5 PLXS v1.0.1).""" 

52 

53 EXTENDED_DISPLAY_UPDATE_ONGOING = 0x000001 # Bit 0 

54 EQUIPMENT_MALFUNCTION_DETECTED = 0x000002 # Bit 1 

55 SIGNAL_PROCESSING_IRREGULARITY = 0x000004 # Bit 2 

56 INADEQUATE_SIGNAL_DETECTED = 0x000008 # Bit 3 

57 POOR_SIGNAL_DETECTED = 0x000010 # Bit 4 

58 LOW_PERFUSION_DETECTED = 0x000020 # Bit 5 

59 ERRATIC_SIGNAL_DETECTED = 0x000040 # Bit 6 

60 NON_PULSATILE_SIGNAL_DETECTED = 0x000080 # Bit 7 

61 QUESTIONABLE_PULSE_DETECTED = 0x000100 # Bit 8 

62 SIGNAL_ANALYSIS_ONGOING = 0x000200 # Bit 9 

63 SENSOR_INTERFERENCE_DETECTED = 0x000400 # Bit 10 

64 SENSOR_UNCONNECTED_TO_USER = 0x000800 # Bit 11 

65 UNKNOWN_SENSOR_CONNECTED = 0x001000 # Bit 12 

66 SENSOR_DISPLACED = 0x002000 # Bit 13 

67 SENSOR_MALFUNCTIONING = 0x004000 # Bit 14 

68 SENSOR_DISCONNECTED = 0x008000 # Bit 15 

69 

70 

71class PLXSpotCheckData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

72 """Parsed PLX spot-check measurement data (Table 3.2 PLXS v1.0.1).""" 

73 

74 spot_check_flags: PLXSpotCheckFlags # PLX spot-check measurement flags 

75 spo2: float # Blood oxygen saturation percentage (SpO2) — SFLOAT 

76 pulse_rate: float # Pulse rate in beats per minute — SFLOAT 

77 timestamp: datetime | None = None # Optional DateTime (7 octets) per Table 3.3 bit 0 

78 measurement_status: PLXMeasurementStatus | None = None # Optional measurement status flags (16-bit) 

79 device_and_sensor_status: PLXDeviceAndSensorStatus | None = None # Optional device/sensor status (24-bit) 

80 pulse_amplitude_index: float | None = None # Optional pulse amplitude index (SFLOAT, %) 

81 supported_features: PLXFeatureFlags | None = None # Optional PLX features from context 

82 

83 

84class PLXSpotCheckMeasurementCharacteristic(BaseCharacteristic[PLXSpotCheckData]): 

85 """PLX Spot-Check Measurement characteristic (0x2A5E). 

86 

87 Used to transmit single SpO2 (blood oxygen saturation) and pulse rate 

88 measurements from spot-check readings. 

89 """ 

90 

91 _characteristic_name: str = "PLX Spot-Check Measurement" 

92 

93 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic] 

94 

95 # Declarative validation (automatic) 

96 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum 

97 max_length: int | None = ( 

98 19 # + Timestamp(7) + MeasurementStatus(2) + DeviceAndSensorStatus(3) + PulseAmplitudeIndex(2) 

99 ) 

100 allow_variable_length: bool = True # Variable optional fields 

101 

102 def _decode_value( # pylint: disable=too-many-locals,too-many-branches # Complexity needed for spec parsing 

103 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True 

104 ) -> PLXSpotCheckData: 

105 """Parse PLX spot-check measurement data per PLXS v1.0.1. 

106 

107 Format (Table 3.2): Flags(1) + SpO2(2) + PR(2) + [Timestamp(7)] + 

108 [Measurement Status(2)] + [Device and Sensor Status(3)] + [Pulse Amplitude Index(2)] 

109 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT. 

110 

111 Args: 

112 data: Raw bytearray from BLE characteristic. 

113 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

114 validate: Whether to validate ranges (default True) 

115 

116 Returns: 

117 PLXSpotCheckData containing parsed PLX spot-check data. 

118 

119 """ 

120 flags = PLXSpotCheckFlags(data[0]) 

121 

122 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format 

123 spo2 = IEEE11073Parser.parse_sfloat(data, 1) 

124 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3) 

125 

126 # Parse optional fields in order per Table 3.2 

127 timestamp: datetime | None = None 

128 measurement_status: PLXMeasurementStatus | None = None 

129 device_and_sensor_status: PLXDeviceAndSensorStatus | None = None 

130 pulse_amplitude_index: float | None = None 

131 offset = 5 

132 

133 # Timestamp (7 octets DateTime) — Table 3.3 bit 0 

134 if PLXSpotCheckFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7: 

135 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

136 offset += 7 

137 

138 if PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2: 

139 measurement_status = PLXMeasurementStatus(DataParser.parse_int16(data, offset, signed=False)) 

140 offset += 2 

141 

142 if PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT in flags and len(data) >= offset + 3: 

143 device_and_sensor_status = PLXDeviceAndSensorStatus( 

144 DataParser.parse_int32(data[offset : offset + 3] + b"\x00", 0, signed=False) 

145 ) # Pad to 4 bytes 

146 offset += 3 

147 

148 if PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2: 

149 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset) 

150 

151 # Context enhancement: PLX Features 

152 supported_features: PLXFeatureFlags | None = None 

153 if ctx: 

154 plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES) 

155 if plx_features_value is not None: 

156 supported_features = plx_features_value 

157 

158 return PLXSpotCheckData( 

159 spot_check_flags=flags, 

160 spo2=spo2, 

161 pulse_rate=pulse_rate, 

162 timestamp=timestamp, 

163 measurement_status=measurement_status, 

164 device_and_sensor_status=device_and_sensor_status, 

165 pulse_amplitude_index=pulse_amplitude_index, 

166 supported_features=supported_features, 

167 ) 

168 

169 def _encode_value(self, data: PLXSpotCheckData) -> bytearray: 

170 """Encode PLX spot-check measurement value back to bytes. 

171 

172 Args: 

173 data: PLXSpotCheckData instance to encode 

174 

175 Returns: 

176 Encoded bytes representing the measurement 

177 

178 """ 

179 # Build flags 

180 flags = data.spot_check_flags 

181 

182 # Build result 

183 result = bytearray([int(flags)]) 

184 result.extend(IEEE11073Parser.encode_sfloat(data.spo2)) 

185 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate)) 

186 

187 # Encode optional timestamp (7 bytes DateTime) 

188 if data.timestamp is not None: 

189 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp)) 

190 

191 # Encode optional measurement status 

192 if data.measurement_status is not None: 

193 result.extend(DataParser.encode_int16(int(data.measurement_status), signed=False)) 

194 

195 # Encode optional device and sensor status 

196 if data.device_and_sensor_status is not None: 

197 # Device and sensor status is 3 bytes (24-bit value) 

198 device_status_bytes = DataParser.encode_int32(int(data.device_and_sensor_status), signed=False)[:3] 

199 result.extend(device_status_bytes) 

200 

201 # Encode optional pulse amplitude index 

202 if data.pulse_amplitude_index is not None: 

203 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index)) 

204 

205 return result