Coverage for src / bluetooth_sig / gatt / characteristics / plx_continuous_measurement.py: 78%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""PLX Continuous Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from enum import IntFlag 

7from typing import Any, ClassVar 

8 

9import msgspec 

10 

11from ...types.gatt_enums import CharacteristicName 

12from ..context import CharacteristicContext 

13from .base import BaseCharacteristic 

14from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

17logger = logging.getLogger(__name__) 

18 

19_PLX_CONTINUOUS_MIN_BYTES = 5 # Flags(1) + SpO2(2) + PulseRate(2) 

20 

21 

22class PLXContinuousFlags(IntFlag): 

23 """PLX Continuous measurement flags (Table 3.7 PLXS v1.0.1).""" 

24 

25 SPO2PR_FAST_PRESENT = 0x01 # Bit 0: SpO2PR-Fast field is present 

26 SPO2PR_SLOW_PRESENT = 0x02 # Bit 1: SpO2PR-Slow field is present 

27 MEASUREMENT_STATUS_PRESENT = 0x04 # Bit 2: Measurement Status field is present 

28 DEVICE_AND_SENSOR_STATUS_PRESENT = 0x08 # Bit 3: Device and Sensor Status field is present 

29 PULSE_AMPLITUDE_INDEX_PRESENT = 0x10 # Bit 4: Pulse Amplitude Index field is present 

30 

31 

32class PLXMeasurementStatus(IntFlag): 

33 """PLX Measurement Status flags (16-bit, Table 3.4 PLXS v1.0.1). 

34 

35 Bits 0-4 are RFU. Status bits start at bit 5. 

36 """ 

37 

38 MEASUREMENT_ONGOING = 0x0020 # Bit 5 

39 EARLY_ESTIMATED_DATA = 0x0040 # Bit 6 

40 VALIDATED_DATA = 0x0080 # Bit 7 

41 FULLY_QUALIFIED_DATA = 0x0100 # Bit 8 

42 DATA_FROM_MEASUREMENT_STORAGE = 0x0200 # Bit 9 

43 DATA_FOR_DEMONSTRATION = 0x0400 # Bit 10 

44 DATA_FOR_TESTING = 0x0800 # Bit 11 

45 CALIBRATION_ONGOING = 0x1000 # Bit 12 

46 MEASUREMENT_UNAVAILABLE = 0x2000 # Bit 13 

47 QUESTIONABLE_MEASUREMENT_DETECTED = 0x4000 # Bit 14 

48 INVALID_MEASUREMENT_DETECTED = 0x8000 # Bit 15 

49 

50 

51class PLXDeviceAndSensorStatus(IntFlag): 

52 """PLX Device and Sensor Status flags (24-bit, Table 3.5 PLXS v1.0.1).""" 

53 

54 EXTENDED_DISPLAY_UPDATE_ONGOING = 0x000001 # Bit 0 

55 EQUIPMENT_MALFUNCTION_DETECTED = 0x000002 # Bit 1 

56 SIGNAL_PROCESSING_IRREGULARITY = 0x000004 # Bit 2 

57 INADEQUATE_SIGNAL_DETECTED = 0x000008 # Bit 3 

58 POOR_SIGNAL_DETECTED = 0x000010 # Bit 4 

59 LOW_PERFUSION_DETECTED = 0x000020 # Bit 5 

60 ERRATIC_SIGNAL_DETECTED = 0x000040 # Bit 6 

61 NON_PULSATILE_SIGNAL_DETECTED = 0x000080 # Bit 7 

62 QUESTIONABLE_PULSE_DETECTED = 0x000100 # Bit 8 

63 SIGNAL_ANALYSIS_ONGOING = 0x000200 # Bit 9 

64 SENSOR_INTERFERENCE_DETECTED = 0x000400 # Bit 10 

65 SENSOR_UNCONNECTED_TO_USER = 0x000800 # Bit 11 

66 UNKNOWN_SENSOR_CONNECTED = 0x001000 # Bit 12 

67 SENSOR_DISPLACED = 0x002000 # Bit 13 

68 SENSOR_MALFUNCTIONING = 0x004000 # Bit 14 

69 SENSOR_DISCONNECTED = 0x008000 # Bit 15 

70 

71 

72class PLXContinuousData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

73 """Parsed PLX continuous measurement data (Table 3.6 PLXS v1.0.1).""" 

74 

75 continuous_flags: PLXContinuousFlags 

76 spo2: float # SpO2PR-Normal SpO2 (mandatory) 

77 pulse_rate: float # SpO2PR-Normal PR (mandatory) 

78 spo2_fast: float | None = None # SpO2PR-Fast SpO2 (optional, bit 0) 

79 pulse_rate_fast: float | None = None # SpO2PR-Fast PR (optional, bit 0) 

80 spo2_slow: float | None = None # SpO2PR-Slow SpO2 (optional, bit 1) 

81 pulse_rate_slow: float | None = None # SpO2PR-Slow PR (optional, bit 1) 

82 measurement_status: PLXMeasurementStatus | None = None 

83 device_and_sensor_status: PLXDeviceAndSensorStatus | None = None 

84 pulse_amplitude_index: float | None = None 

85 supported_features: PLXFeatureFlags | None = None 

86 

87 

88class PLXContinuousMeasurementCharacteristic(BaseCharacteristic[PLXContinuousData]): 

89 """PLX Continuous Measurement characteristic (0x2A5F). 

90 

91 Used to transmit continuous SpO2 (blood oxygen saturation) and pulse rate 

92 measurements from pulse oximetry devices. 

93 

94 Format (Table 3.6): Flags(1) + SpO2PR-Normal(4) + [SpO2PR-Fast(4)] + 

95 [SpO2PR-Slow(4)] + [Measurement Status(2)] + [Device and Sensor Status(3)] + 

96 [Pulse Amplitude Index(2)] 

97 """ 

98 

99 _characteristic_name: str = "PLX Continuous Measurement" 

100 

101 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic] 

102 

103 # Declarative validation 

104 min_length: int | None = _PLX_CONTINUOUS_MIN_BYTES # Flags(1) + SpO2(2) + PulseRate(2) minimum 

105 max_length: int | None = 20 # + Fast(4) + Slow(4) + MeasStatus(2) + DevSensStatus(3) + PAI(2) 

106 allow_variable_length: bool = True 

107 

108 def _decode_value( # pylint: disable=too-many-locals,too-many-branches 

109 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True 

110 ) -> PLXContinuousData: 

111 """Parse PLX continuous measurement data per PLXS v1.0.1. 

112 

113 Format (Table 3.6): Flags(1) + SpO2PR-Normal(4) + [SpO2PR-Fast(4)] + 

114 [SpO2PR-Slow(4)] + [Measurement Status(2)] + [Device and Sensor Status(3)] + 

115 [Pulse Amplitude Index(2)] 

116 All SpO2/PR fields are IEEE-11073 16-bit SFLOAT. 

117 """ 

118 if validate and len(data) < _PLX_CONTINUOUS_MIN_BYTES: 

119 raise ValueError( 

120 f"Insufficient data for PLX continuous measurement: {len(data)} < {_PLX_CONTINUOUS_MIN_BYTES}" 

121 ) 

122 

123 offset = 0 

124 

125 # Parse flags 

126 continuous_flags = PLXContinuousFlags(data[offset]) 

127 offset += 1 

128 

129 # Parse SpO2PR-Normal (mandatory): SpO2 + PR — IEEE-11073 SFLOAT 

130 spo2 = IEEE11073Parser.parse_sfloat(data, offset) 

131 offset += 2 

132 

133 pulse_rate = IEEE11073Parser.parse_sfloat(data, offset) 

134 offset += 2 

135 

136 # Parse optional SpO2PR-Fast (4 bytes) — bit 0 

137 spo2_fast: float | None = None 

138 pulse_rate_fast: float | None = None 

139 if continuous_flags & PLXContinuousFlags.SPO2PR_FAST_PRESENT: 

140 if validate and offset + 4 > len(data): 

141 raise ValueError(f"Not enough data for SpO2PR-Fast: {len(data)} < {offset + 4}") 

142 spo2_fast = IEEE11073Parser.parse_sfloat(data, offset) 

143 offset += 2 

144 pulse_rate_fast = IEEE11073Parser.parse_sfloat(data, offset) 

145 offset += 2 

146 

147 # Parse optional SpO2PR-Slow (4 bytes) — bit 1 

148 spo2_slow: float | None = None 

149 pulse_rate_slow: float | None = None 

150 if continuous_flags & PLXContinuousFlags.SPO2PR_SLOW_PRESENT: 

151 if validate and offset + 4 > len(data): 

152 raise ValueError(f"Not enough data for SpO2PR-Slow: {len(data)} < {offset + 4}") 

153 spo2_slow = IEEE11073Parser.parse_sfloat(data, offset) 

154 offset += 2 

155 pulse_rate_slow = IEEE11073Parser.parse_sfloat(data, offset) 

156 offset += 2 

157 

158 # Parse optional Measurement Status (2 bytes) — bit 2 

159 measurement_status = None 

160 if continuous_flags & PLXContinuousFlags.MEASUREMENT_STATUS_PRESENT: 

161 if validate and offset + 2 > len(data): 

162 raise ValueError(f"Not enough data for measurement status: {len(data)} < {offset + 2}") 

163 measurement_status = PLXMeasurementStatus(DataParser.parse_int16(data, offset, signed=False)) 

164 offset += 2 

165 

166 # Parse optional Device and Sensor Status (3 bytes) — bit 3 

167 device_and_sensor_status = None 

168 if continuous_flags & PLXContinuousFlags.DEVICE_AND_SENSOR_STATUS_PRESENT: 

169 if validate and offset + 3 > len(data): 

170 raise ValueError(f"Not enough data for device/sensor status: {len(data)} < {offset + 3}") 

171 device_and_sensor_status = PLXDeviceAndSensorStatus( 

172 DataParser.parse_int32(data[offset : offset + 3] + b"\x00", 0, signed=False) 

173 ) 

174 offset += 3 

175 

176 # Parse optional Pulse Amplitude Index (2 bytes) — bit 4 

177 pulse_amplitude_index = None 

178 if continuous_flags & PLXContinuousFlags.PULSE_AMPLITUDE_INDEX_PRESENT: 

179 if validate and offset + 2 > len(data): 

180 raise ValueError(f"Not enough data for pulse amplitude index: {len(data)} < {offset + 2}") 

181 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset) 

182 offset += 2 

183 

184 # Context enhancement: PLX Features 

185 supported_features: PLXFeatureFlags | None = None 

186 if ctx: 

187 plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES) 

188 if plx_features_value is not None: 

189 supported_features = plx_features_value 

190 

191 return PLXContinuousData( 

192 continuous_flags=continuous_flags, 

193 spo2=spo2, 

194 pulse_rate=pulse_rate, 

195 spo2_fast=spo2_fast, 

196 pulse_rate_fast=pulse_rate_fast, 

197 spo2_slow=spo2_slow, 

198 pulse_rate_slow=pulse_rate_slow, 

199 measurement_status=measurement_status, 

200 device_and_sensor_status=device_and_sensor_status, 

201 pulse_amplitude_index=pulse_amplitude_index, 

202 supported_features=supported_features, 

203 ) 

204 

205 def _encode_value(self, data: PLXContinuousData) -> bytearray: 

206 """Encode PLX continuous measurement data.""" 

207 result = bytearray() 

208 

209 # Encode flags 

210 result.append(int(data.continuous_flags)) 

211 

212 # Encode SpO2PR-Normal (mandatory): SpO2 + PR 

213 result.extend(IEEE11073Parser.encode_sfloat(data.spo2)) 

214 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate)) 

215 

216 # Encode optional SpO2PR-Fast 

217 if data.continuous_flags & PLXContinuousFlags.SPO2PR_FAST_PRESENT: 

218 if data.spo2_fast is None or data.pulse_rate_fast is None: 

219 raise ValueError("SpO2PR-Fast flag set but values are None") 

220 result.extend(IEEE11073Parser.encode_sfloat(data.spo2_fast)) 

221 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate_fast)) 

222 

223 # Encode optional SpO2PR-Slow 

224 if data.continuous_flags & PLXContinuousFlags.SPO2PR_SLOW_PRESENT: 

225 if data.spo2_slow is None or data.pulse_rate_slow is None: 

226 raise ValueError("SpO2PR-Slow flag set but values are None") 

227 result.extend(IEEE11073Parser.encode_sfloat(data.spo2_slow)) 

228 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate_slow)) 

229 

230 # Encode optional Measurement Status 

231 if data.continuous_flags & PLXContinuousFlags.MEASUREMENT_STATUS_PRESENT: 

232 if data.measurement_status is None: 

233 raise ValueError("Measurement Status flag set but value is None") 

234 result.extend(int(data.measurement_status).to_bytes(2, byteorder="little", signed=False)) 

235 

236 # Encode optional Device and Sensor Status 

237 if data.continuous_flags & PLXContinuousFlags.DEVICE_AND_SENSOR_STATUS_PRESENT: 

238 if data.device_and_sensor_status is None: 

239 raise ValueError("Device/Sensor Status flag set but value is None") 

240 status_val = int(data.device_and_sensor_status) 

241 result.append(status_val & 0xFF) 

242 result.append((status_val >> 8) & 0xFF) 

243 result.append((status_val >> 16) & 0xFF) 

244 

245 # Encode optional Pulse Amplitude Index 

246 if data.continuous_flags & PLXContinuousFlags.PULSE_AMPLITUDE_INDEX_PRESENT: 

247 if data.pulse_amplitude_index is None: 

248 raise ValueError("Pulse Amplitude Index flag set but value is None") 

249 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index)) 

250 

251 return result