Coverage for src/bluetooth_sig/gatt/characteristics/pulse_oximetry_measurement.py: 83%

70 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Pulse Oximetry Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntFlag 

7 

8import msgspec 

9 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser, IEEE11073Parser 

13 

14# TODO: Implement CharacteristicContext support 

15# This characteristic should access Pulse Oximetry Control Point (0x2A60) and Pulse Oximetry Features (0x2A61) 

16# from ctx.other_characteristics to determine supported measurement types and calibration data 

17 

18 

19class PulseOximetryFlags(IntFlag): 

20 """Pulse Oximetry measurement flags.""" 

21 

22 TIMESTAMP_PRESENT = 0x01 

23 MEASUREMENT_STATUS_PRESENT = 0x02 

24 DEVICE_STATUS_PRESENT = 0x04 

25 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08 

26 

27 

28class PulseOximetryData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

29 """Parsed pulse oximetry measurement data.""" 

30 

31 spo2: float 

32 pulse_rate: float 

33 timestamp: datetime | None = None 

34 measurement_status: int | None = None 

35 device_status: int | None = None 

36 pulse_amplitude_index: float | None = None 

37 

38 

39class PulseOximetryMeasurementCharacteristic(BaseCharacteristic): 

40 """PLX Continuous Measurement characteristic (0x2A5F). 

41 

42 Used to transmit SpO2 (blood oxygen saturation) and pulse rate 

43 measurements. 

44 """ 

45 

46 _characteristic_name: str = "PLX Continuous Measurement" 

47 

48 # Declarative validation (automatic) 

49 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum 

50 max_length: int | None = 16 # + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) maximum 

51 allow_variable_length: bool = True # Variable optional fields 

52 

53 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> PulseOximetryData: # pylint: disable=too-many-locals 

54 """Parse pulse oximetry measurement data according to Bluetooth specification. 

55 

56 Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] + 

57 [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)] 

58 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT. 

59 

60 Args: 

61 data: Raw bytearray from BLE characteristic. 

62 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

63 

64 Returns: 

65 PulseOximetryData containing parsed pulse oximetry data. 

66 

67 """ 

68 if len(data) < 5: 

69 raise ValueError("Pulse Oximetry Measurement data must be at least 5 bytes") 

70 

71 flags = PulseOximetryFlags(data[0]) 

72 

73 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format 

74 spo2 = IEEE11073Parser.parse_sfloat(data, 1) 

75 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3) 

76 

77 # Parse optional fields 

78 timestamp: datetime | None = None 

79 measurement_status: int | None = None 

80 device_status: int | None = None 

81 pulse_amplitude_index: float | None = None 

82 offset = 5 

83 

84 if PulseOximetryFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7: 

85 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

86 offset += 7 

87 

88 if PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2: 

89 measurement_status = DataParser.parse_int16(data, offset, signed=False) 

90 offset += 2 

91 

92 if PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags and len(data) >= offset + 3: 

93 device_status = DataParser.parse_int32( 

94 data[offset : offset + 3] + b"\x00", 0, signed=False 

95 ) # Pad to 4 bytes 

96 offset += 3 

97 

98 if PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2: 

99 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset) 

100 

101 # Create immutable struct with all values 

102 return PulseOximetryData( 

103 spo2=spo2, 

104 pulse_rate=pulse_rate, 

105 timestamp=timestamp, 

106 measurement_status=measurement_status, 

107 device_status=device_status, 

108 pulse_amplitude_index=pulse_amplitude_index, 

109 ) 

110 

111 def encode_value(self, data: PulseOximetryData) -> bytearray: 

112 """Encode pulse oximetry measurement value back to bytes. 

113 

114 Args: 

115 data: PulseOximetryData instance to encode 

116 

117 Returns: 

118 Encoded bytes representing the measurement 

119 

120 """ 

121 # Build flags 

122 flags = PulseOximetryFlags(0) 

123 if data.timestamp is not None: 

124 flags |= PulseOximetryFlags.TIMESTAMP_PRESENT 

125 if data.measurement_status is not None: 

126 flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT 

127 if data.device_status is not None: 

128 flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT 

129 if data.pulse_amplitude_index is not None: 

130 flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT 

131 

132 # Build result 

133 result = bytearray([int(flags)]) 

134 result.extend(IEEE11073Parser.encode_sfloat(data.spo2)) 

135 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate)) 

136 

137 # Encode optional timestamp 

138 if data.timestamp is not None: 

139 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp)) 

140 

141 # Encode optional measurement status 

142 if data.measurement_status is not None: 

143 result.extend(DataParser.encode_int16(data.measurement_status, signed=False)) 

144 

145 # Encode optional device status 

146 if data.device_status is not None: 

147 # Device status is 3 bytes (24-bit value) 

148 device_status_bytes = DataParser.encode_int32(data.device_status, signed=False)[:3] 

149 result.extend(device_status_bytes) 

150 

151 # Encode optional pulse amplitude index 

152 if data.pulse_amplitude_index is not None: 

153 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index)) 

154 

155 return result