Coverage for src / bluetooth_sig / gatt / characteristics / plx_spot_check_measurement.py: 63%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""PLX Spot-Check Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from enum import IntFlag 

7from typing import Any, ClassVar 

8 

9import msgspec 

10 

11from ...types.gatt_enums import CharacteristicName 

12from ..context import CharacteristicContext 

13from .base import BaseCharacteristic 

14from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

# Module-level logger, named after this module so log output can be filtered per file.
logger = logging.getLogger(__name__)

18 

19 

class PLXSpotCheckFlags(IntFlag):
    """Bit flags carried in the first byte of a PLX Spot-Check measurement.

    Bits 1-3 each announce that the matching optional field follows the
    mandatory SpO2 / pulse-rate pair in the payload.
    """

    # NOTE(review): the Bluetooth Pulse Oximeter Service specification appears
    # to define bit 0 of the *spot-check* flags as "Timestamp field present"
    # (and bit 4 as "Device Clock is Not Set"); "SpO2PR-Fast" looks like it
    # belongs to the continuous measurement characteristic — confirm against
    # the specification before relying on this member.
    SPO2PR_FAST = 0x01
    MEASUREMENT_STATUS_PRESENT = 0x02
    DEVICE_AND_SENSOR_STATUS_PRESENT = 0x04
    PULSE_AMPLITUDE_INDEX_PRESENT = 0x08

27 

28 

class PLXMeasurementStatus(IntFlag):
    """PLX Measurement Status flags (decoded from a 16-bit field).

    The members below occupy bits 0-7 of the field.
    """

    # NOTE(review): the Pulse Oximeter Service specification appears to place
    # these status bits at positions 5-15 (bits 0-4 reserved) — confirm the
    # bit positions against the specification.
    MEASUREMENT_ONGOING = 0x0001
    EARLY_ESTIMATED_DATA = 0x0002
    VALIDATED_DATA = 0x0004
    FULLY_QUALIFIED_DATA = 0x0008
    DATA_FROM_MEASUREMENT_STORAGE = 0x0010
    DATA_FOR_DEMONSTRATION = 0x0020
    DATA_FROM_TESTING_SIMULATION = 0x0040
    DATA_FROM_CALIBRATION_TEST = 0x0080

40 

41 

class PLXDeviceAndSensorStatus(IntFlag):
    """PLX Device and Sensor Status flags (decoded from a 24-bit field)."""

    # NOTE(review): member names and bit positions should be checked against
    # the Pulse Oximeter Service specification's Device and Sensor Status
    # table; the grouping comments below describe the values as defined here.

    # Device status: values occupy bits 0-7 and mirror the bit layout of
    # PLXMeasurementStatus.
    DEVICE_MEASUREMENT_ONGOING = 0x000001
    DEVICE_EARLY_ESTIMATED_DATA = 0x000002
    DEVICE_VALIDATED_DATA = 0x000004
    DEVICE_FULLY_QUALIFIED_DATA = 0x000008
    DEVICE_DATA_FROM_MEASUREMENT_STORAGE = 0x000010
    DEVICE_DATA_FOR_DEMONSTRATION = 0x000020
    DEVICE_DATA_FROM_TESTING_SIMULATION = 0x000040
    DEVICE_DATA_FROM_CALIBRATION_TEST = 0x000080

    # Sensor status: values occupy bits 8-13.
    SENSOR_OPERATIONAL = 0x000100
    SENSOR_DEFECTIVE = 0x000200
    SENSOR_DISCONNECTED = 0x000400
    SENSOR_MALFUNCTIONING = 0x000800
    SENSOR_UNCALIBRATED = 0x001000
    SENSOR_NOT_OPERATIONAL = 0x002000

62 

63 

class PLXSpotCheckData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Parsed PLX spot-check measurement data.

    Immutable, keyword-only record. The three mandatory fields are always
    populated; every optional field defaults to ``None``.
    """

    # Flags byte taken from the start of the payload.
    spot_check_flags: PLXSpotCheckFlags
    # Blood oxygen saturation percentage (SpO2).
    spo2: float
    # Pulse rate in beats per minute.
    pulse_rate: float
    # Optional measurement status flags.
    measurement_status: PLXMeasurementStatus | None = None
    # Optional device and sensor status flags.
    device_and_sensor_status: PLXDeviceAndSensorStatus | None = None
    # Optional pulse amplitude index value.
    pulse_amplitude_index: float | None = None
    # Optional PLX features obtained from the characteristic context.
    supported_features: PLXFeatureFlags | None = None

74 

75 

class PLXSpotCheckMeasurementCharacteristic(BaseCharacteristic[PLXSpotCheckData]):
    """PLX Spot-Check Measurement characteristic (0x2A5E).

    Used to transmit single SpO2 (blood oxygen saturation) and pulse rate
    measurements from spot-check readings.
    """

    _characteristic_name: str = "PLX Spot-Check Measurement"

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic]

    # Declarative validation (automatic)
    min_length: int | None = 5  # Flags(1) + SpO2(2) + PulseRate(2) minimum
    # NOTE(review): the Pulse Oximeter Service specification appears to allow an
    # optional 7-byte Timestamp after the pulse rate (flag bit 0), which this
    # layout and limit do not account for — confirm against the specification.
    max_length: int | None = 12  # + MeasurementStatus(2) + DeviceAndSensorStatus(3) + PulseAmplitudeIndex(2) maximum
    allow_variable_length: bool = True  # Variable optional fields

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches # Complexity needed for spec parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> PLXSpotCheckData:
        """Parse PLX spot-check measurement data.

        Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Measurement Status(2)] +
        [Device and Sensor Status(3)] + [Pulse Amplitude Index(2)]
        SpO2, Pulse Rate and Pulse Amplitude Index are read with the
        IEEE-11073 16-bit SFLOAT parser.

        Optional fields are parsed best-effort: a field whose flag is set but
        whose bytes are missing is left as ``None``. Parsing of optional
        fields stops at the first truncated field, because the position of
        every later field is no longer known.

        Context Enhancement:
            If ctx is provided, the PLX Features value is looked up in the
            context and, when found, returned as ``supported_features``.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to validate ranges (default True). Not consulted
                by this method itself.

        Returns:
            PLXSpotCheckData containing parsed PLX spot-check data with optional
            context-enhanced information.

        """
        flags = PLXSpotCheckFlags(data[0])

        # Mandatory SpO2 and pulse rate, both IEEE-11073 SFLOAT
        spo2 = IEEE11073Parser.parse_sfloat(data, 1)
        pulse_rate = IEEE11073Parser.parse_sfloat(data, 3)

        # Parse optional fields
        measurement_status: PLXMeasurementStatus | None = None
        device_and_sensor_status: PLXDeviceAndSensorStatus | None = None
        pulse_amplitude_index: float | None = None
        offset = 5
        # Set once a flagged field does not fit; later fields are then skipped
        # instead of being read from the truncated field's bytes.
        truncated = False

        if PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT in flags:
            if len(data) >= offset + 2:
                measurement_status = PLXMeasurementStatus(DataParser.parse_int16(data, offset, signed=False))
                offset += 2
            else:
                truncated = True

        if not truncated and PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT in flags:
            if len(data) >= offset + 3:
                # The field is 3 bytes wide; pad to 4 bytes for the 32-bit parser.
                device_and_sensor_status = PLXDeviceAndSensorStatus(
                    DataParser.parse_int32(data[offset : offset + 3] + b"\x00", 0, signed=False)
                )
                offset += 3
            else:
                truncated = True

        if not truncated and PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags:
            if len(data) >= offset + 2:
                pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset)
            else:
                truncated = True

        if truncated:
            logger.debug(
                "PLX spot-check payload shorter than its flags announce (flags=%r, length=%d)",
                flags,
                len(data),
            )

        # Context enhancement: PLX Features
        supported_features: PLXFeatureFlags | None = None
        if ctx:
            plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES)
            if plx_features_value is not None:
                # PLX Features returns PLXFeatureFlags enum
                supported_features = plx_features_value

        # Create immutable struct with all values
        return PLXSpotCheckData(
            spot_check_flags=flags,
            spo2=spo2,
            pulse_rate=pulse_rate,
            measurement_status=measurement_status,
            device_and_sensor_status=device_and_sensor_status,
            pulse_amplitude_index=pulse_amplitude_index,
            supported_features=supported_features,
        )

    def _encode_value(self, data: PLXSpotCheckData) -> bytearray:
        """Encode PLX spot-check measurement value back to bytes.

        The three "field present" bits of the flags byte are derived from
        which optional fields are actually written, so the flags always
        describe the payload that follows. All other bits of
        ``data.spot_check_flags`` are passed through unchanged.

        Args:
            data: PLXSpotCheckData instance to encode

        Returns:
            Encoded bytes representing the measurement

        """
        # Build flags: work on the plain integer so bits outside the declared
        # members survive the masking below.
        raw_flags = int(data.spot_check_flags)
        presence = (
            (PLXSpotCheckFlags.MEASUREMENT_STATUS_PRESENT, data.measurement_status),
            (PLXSpotCheckFlags.DEVICE_AND_SENSOR_STATUS_PRESENT, data.device_and_sensor_status),
            (PLXSpotCheckFlags.PULSE_AMPLITUDE_INDEX_PRESENT, data.pulse_amplitude_index),
        )
        for present_bit, field_value in presence:
            if field_value is not None:
                raw_flags |= int(present_bit)
            else:
                raw_flags &= ~int(present_bit)

        # Build result
        result = bytearray([raw_flags])
        result.extend(IEEE11073Parser.encode_sfloat(data.spo2))
        result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))

        # Encode optional measurement status
        if data.measurement_status is not None:
            result.extend(DataParser.encode_int16(int(data.measurement_status), signed=False))

        # Encode optional device and sensor status
        if data.device_and_sensor_status is not None:
            # Device and sensor status is 3 bytes (24-bit value): encode as
            # 32-bit and keep the first three bytes.
            device_status_bytes = DataParser.encode_int32(int(data.device_and_sensor_status), signed=False)[:3]
            result.extend(device_status_bytes)

        # Encode optional pulse amplitude index
        if data.pulse_amplitude_index is not None:
            result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))

        return result