Coverage for src / bluetooth_sig / gatt / characteristics / pulse_oximetry_measurement.py: 84%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Pulse Oximetry Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from datetime import datetime 

7from enum import IntFlag 

8 

9import msgspec 

10 

11from ...types.gatt_enums import CharacteristicName 

12from ..context import CharacteristicContext 

13from .base import BaseCharacteristic 

14from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class PulseOximetryFlags(IntFlag): 

21 """Pulse Oximetry measurement flags.""" 

22 

23 TIMESTAMP_PRESENT = 0x01 

24 MEASUREMENT_STATUS_PRESENT = 0x02 

25 DEVICE_STATUS_PRESENT = 0x04 

26 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08 

27 

28 

29class PulseOximetryData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

30 """Parsed pulse oximetry measurement data. 

31 

32 Attributes: 

33 spo2: Blood oxygen saturation percentage (SpO2) 

34 pulse_rate: Pulse rate in beats per minute 

35 timestamp: Optional timestamp of the measurement 

36 measurement_status: Optional measurement status flags 

37 device_status: Optional device status flags 

38 pulse_amplitude_index: Optional pulse amplitude index value 

39 supported_features: Optional PLX features from context (PLXFeatureFlags enum) 

40 """ 

41 

42 spo2: float 

43 pulse_rate: float 

44 timestamp: datetime | None = None 

45 measurement_status: int | None = None 

46 device_status: int | None = None 

47 pulse_amplitude_index: float | None = None 

48 supported_features: PLXFeatureFlags | None = None # PLX Features from context (0x2A60) 

49 

50 

51class PulseOximetryMeasurementCharacteristic(BaseCharacteristic[PulseOximetryData]): 

52 """PLX Continuous Measurement characteristic (0x2A5F). 

53 

54 Used to transmit SpO2 (blood oxygen saturation) and pulse rate 

55 measurements. 

56 """ 

57 

58 _characteristic_name: str = "PLX Continuous Measurement" 

59 

60 _optional_dependencies = [PLXFeaturesCharacteristic] 

61 

62 # Declarative validation (automatic) 

63 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum 

64 max_length: int | None = 16 # + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) maximum 

65 allow_variable_length: bool = True # Variable optional fields 

66 

67 def _decode_value( # pylint: disable=too-many-locals,too-many-branches # Complexity needed for spec parsing 

68 self, data: bytearray, ctx: CharacteristicContext | None = None 

69 ) -> PulseOximetryData: 

70 """Parse pulse oximetry measurement data according to Bluetooth specification. 

71 

72 Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] + 

73 [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)] 

74 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT. 

75 

76 Context Enhancement: 

77 If ctx is provided, this method will attempt to enhance the parsed data with: 

78 - PLX Features (0x2A60): Device capabilities and supported measurement types 

79 

80 Args: 

81 data: Raw bytearray from BLE characteristic. 

82 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

83 

84 Returns: 

85 PulseOximetryData containing parsed pulse oximetry data with optional 

86 context-enhanced information. 

87 

88 """ 

89 if len(data) < 5: 

90 raise ValueError("Pulse Oximetry Measurement data must be at least 5 bytes") 

91 

92 flags = PulseOximetryFlags(data[0]) 

93 

94 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format 

95 spo2 = IEEE11073Parser.parse_sfloat(data, 1) 

96 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3) 

97 

98 # Parse optional fields 

99 timestamp: datetime | None = None 

100 measurement_status: int | None = None 

101 device_status: int | None = None 

102 pulse_amplitude_index: float | None = None 

103 offset = 5 

104 

105 if PulseOximetryFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7: 

106 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

107 offset += 7 

108 

109 if PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2: 

110 measurement_status = DataParser.parse_int16(data, offset, signed=False) 

111 offset += 2 

112 

113 if PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags and len(data) >= offset + 3: 

114 device_status = DataParser.parse_int32( 

115 data[offset : offset + 3] + b"\x00", 0, signed=False 

116 ) # Pad to 4 bytes 

117 offset += 3 

118 

119 if PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2: 

120 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset) 

121 

122 # Context enhancement: PLX Features 

123 supported_features: PLXFeatureFlags | None = None 

124 if ctx: 

125 plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES) 

126 if plx_features_value is not None: 

127 # PLX Features returns PLXFeatureFlags enum 

128 supported_features = plx_features_value 

129 

130 # Create immutable struct with all values 

131 return PulseOximetryData( 

132 spo2=spo2, 

133 pulse_rate=pulse_rate, 

134 timestamp=timestamp, 

135 measurement_status=measurement_status, 

136 device_status=device_status, 

137 pulse_amplitude_index=pulse_amplitude_index, 

138 supported_features=supported_features, 

139 ) 

140 

141 def _encode_value(self, data: PulseOximetryData) -> bytearray: 

142 """Encode pulse oximetry measurement value back to bytes. 

143 

144 Args: 

145 data: PulseOximetryData instance to encode 

146 

147 Returns: 

148 Encoded bytes representing the measurement 

149 

150 """ 

151 # Build flags 

152 flags = PulseOximetryFlags(0) 

153 if data.timestamp is not None: 

154 flags |= PulseOximetryFlags.TIMESTAMP_PRESENT 

155 if data.measurement_status is not None: 

156 flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT 

157 if data.device_status is not None: 

158 flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT 

159 if data.pulse_amplitude_index is not None: 

160 flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT 

161 

162 # Build result 

163 result = bytearray([int(flags)]) 

164 result.extend(IEEE11073Parser.encode_sfloat(data.spo2)) 

165 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate)) 

166 

167 # Encode optional timestamp 

168 if data.timestamp is not None: 

169 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp)) 

170 

171 # Encode optional measurement status 

172 if data.measurement_status is not None: 

173 result.extend(DataParser.encode_int16(data.measurement_status, signed=False)) 

174 

175 # Encode optional device status 

176 if data.device_status is not None: 

177 # Device status is 3 bytes (24-bit value) 

178 device_status_bytes = DataParser.encode_int32(data.device_status, signed=False)[:3] 

179 result.extend(device_status_bytes) 

180 

181 # Encode optional pulse amplitude index 

182 if data.pulse_amplitude_index is not None: 

183 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index)) 

184 

185 return result