Coverage for src / bluetooth_sig / gatt / characteristics / pulse_oximetry_measurement.py: 85%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Pulse Oximetry Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from datetime import datetime 

7from enum import IntFlag 

8from typing import Any, ClassVar 

9 

10import msgspec 

11 

12from ...types.gatt_enums import CharacteristicName 

13from ..context import CharacteristicContext 

14from .base import BaseCharacteristic 

15from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic 

16from .utils import DataParser, IEEE11073Parser 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21class PulseOximetryFlags(IntFlag): 

22 """Pulse Oximetry measurement flags.""" 

23 

24 TIMESTAMP_PRESENT = 0x01 

25 MEASUREMENT_STATUS_PRESENT = 0x02 

26 DEVICE_STATUS_PRESENT = 0x04 

27 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08 

28 

29 

30class PulseOximetryData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

31 """Parsed pulse oximetry measurement data. 

32 

33 Attributes: 

34 spo2: Blood oxygen saturation percentage (SpO2) 

35 pulse_rate: Pulse rate in beats per minute 

36 timestamp: Optional timestamp of the measurement 

37 measurement_status: Optional measurement status flags 

38 device_status: Optional device status flags 

39 pulse_amplitude_index: Optional pulse amplitude index value 

40 supported_features: Optional PLX features from context (PLXFeatureFlags enum) 

41 """ 

42 

43 spo2: float 

44 pulse_rate: float 

45 timestamp: datetime | None = None 

46 measurement_status: int | None = None 

47 device_status: int | None = None 

48 pulse_amplitude_index: float | None = None 

49 supported_features: PLXFeatureFlags | None = None # PLX Features from context (0x2A60) 

50 

51 

52class PulseOximetryMeasurementCharacteristic(BaseCharacteristic[PulseOximetryData]): 

53 """PLX Continuous Measurement characteristic (0x2A5F). 

54 

55 Used to transmit SpO2 (blood oxygen saturation) and pulse rate 

56 measurements. 

57 """ 

58 

59 _characteristic_name: str = "PLX Continuous Measurement" 

60 

61 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [PLXFeaturesCharacteristic] 

62 

63 # Declarative validation (automatic) 

64 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum 

65 max_length: int | None = 16 # + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) maximum 

66 allow_variable_length: bool = True # Variable optional fields 

67 

68 def _decode_value( # pylint: disable=too-many-locals,too-many-branches # Complexity needed for spec parsing 

69 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True 

70 ) -> PulseOximetryData: 

71 """Parse pulse oximetry measurement data according to Bluetooth specification. 

72 

73 Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] + 

74 [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)] 

75 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT. 

76 

77 Context Enhancement: 

78 If ctx is provided, this method will attempt to enhance the parsed data with: 

79 - PLX Features (0x2A60): Device capabilities and supported measurement types 

80 

81 Args: 

82 data: Raw bytearray from BLE characteristic. 

83 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

84 validate: Whether to validate ranges (default True) 

85 

86 Returns: 

87 PulseOximetryData containing parsed pulse oximetry data with optional 

88 context-enhanced information. 

89 

90 """ 

91 flags = PulseOximetryFlags(data[0]) 

92 

93 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format 

94 spo2 = IEEE11073Parser.parse_sfloat(data, 1) 

95 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3) 

96 

97 # Parse optional fields 

98 timestamp: datetime | None = None 

99 measurement_status: int | None = None 

100 device_status: int | None = None 

101 pulse_amplitude_index: float | None = None 

102 offset = 5 

103 

104 if PulseOximetryFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7: 

105 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

106 offset += 7 

107 

108 if PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2: 

109 measurement_status = DataParser.parse_int16(data, offset, signed=False) 

110 offset += 2 

111 

112 if PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags and len(data) >= offset + 3: 

113 device_status = DataParser.parse_int32( 

114 data[offset : offset + 3] + b"\x00", 0, signed=False 

115 ) # Pad to 4 bytes 

116 offset += 3 

117 

118 if PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2: 

119 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset) 

120 

121 # Context enhancement: PLX Features 

122 supported_features: PLXFeatureFlags | None = None 

123 if ctx: 

124 plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES) 

125 if plx_features_value is not None: 

126 # PLX Features returns PLXFeatureFlags enum 

127 supported_features = plx_features_value 

128 

129 # Create immutable struct with all values 

130 return PulseOximetryData( 

131 spo2=spo2, 

132 pulse_rate=pulse_rate, 

133 timestamp=timestamp, 

134 measurement_status=measurement_status, 

135 device_status=device_status, 

136 pulse_amplitude_index=pulse_amplitude_index, 

137 supported_features=supported_features, 

138 ) 

139 

140 def _encode_value(self, data: PulseOximetryData) -> bytearray: 

141 """Encode pulse oximetry measurement value back to bytes. 

142 

143 Args: 

144 data: PulseOximetryData instance to encode 

145 

146 Returns: 

147 Encoded bytes representing the measurement 

148 

149 """ 

150 # Build flags 

151 flags = PulseOximetryFlags(0) 

152 if data.timestamp is not None: 

153 flags |= PulseOximetryFlags.TIMESTAMP_PRESENT 

154 if data.measurement_status is not None: 

155 flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT 

156 if data.device_status is not None: 

157 flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT 

158 if data.pulse_amplitude_index is not None: 

159 flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT 

160 

161 # Build result 

162 result = bytearray([int(flags)]) 

163 result.extend(IEEE11073Parser.encode_sfloat(data.spo2)) 

164 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate)) 

165 

166 # Encode optional timestamp 

167 if data.timestamp is not None: 

168 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp)) 

169 

170 # Encode optional measurement status 

171 if data.measurement_status is not None: 

172 result.extend(DataParser.encode_int16(data.measurement_status, signed=False)) 

173 

174 # Encode optional device status 

175 if data.device_status is not None: 

176 # Device status is 3 bytes (24-bit value) 

177 device_status_bytes = DataParser.encode_int32(data.device_status, signed=False)[:3] 

178 result.extend(device_status_bytes) 

179 

180 # Encode optional pulse amplitude index 

181 if data.pulse_amplitude_index is not None: 

182 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index)) 

183 

184 return result