Coverage for src/bluetooth_sig/gatt/characteristics/heart_rate_measurement.py: 91%

86 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Heart Rate Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntEnum, IntFlag 

6 

7import msgspec 

8 

9from ..constants import UINT16_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14# RR-Interval resolution: 1/1024 seconds per unit 

15RR_INTERVAL_RESOLUTION = 1024.0 

16 

17# TODO: Implement CharacteristicContext support 

18# This characteristic should access Heart Rate Control Point (0x2A39) from ctx.other_characteristics 

19# to provide calibration factors and control commands for enhanced heart rate monitoring 

20 

21 

22class HeartRateMeasurementFlags(IntFlag): 

23 """Heart Rate Measurement flags as per Bluetooth SIG specification.""" 

24 

25 HEART_RATE_VALUE_FORMAT_UINT16 = 0x01 

26 SENSOR_CONTACT_SUPPORTED = 0x02 

27 SENSOR_CONTACT_DETECTED = 0x04 

28 ENERGY_EXPENDED_PRESENT = 0x08 

29 RR_INTERVAL_PRESENT = 0x10 

30 

31 

32class SensorContactState(IntEnum): 

33 """Sensor contact state enumeration.""" 

34 

35 NOT_SUPPORTED = 0 

36 NOT_DETECTED = 1 

37 DETECTED = 2 

38 

39 def __str__(self) -> str: 

40 """Return human-readable sensor contact state.""" 

41 return {0: "not_supported", 1: "not_detected", 2: "detected"}[self.value] 

42 

43 def __eq__(self, other: object) -> bool: 

44 """Support comparison with string values for backward compatibility.""" 

45 if isinstance(other, str): 

46 return str(self) == other 

47 return super().__eq__(other) 

48 

49 def __hash__(self) -> int: 

50 """Make enum hashable.""" 

51 return super().__hash__() 

52 

53 @classmethod 

54 def from_flags(cls, flags: int) -> SensorContactState: 

55 """Create enum from heart rate flags.""" 

56 if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED: # Sensor Contact Supported bit not set 

57 return cls.NOT_SUPPORTED 

58 if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED: # Sensor Contact Detected bit set 

59 return cls.DETECTED 

60 return cls.NOT_DETECTED 

61 

62 

63class HeartRateData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

64 """Parsed data from Heart Rate Measurement characteristic.""" 

65 

66 heart_rate: int # BPM (0-UINT16_MAX) 

67 sensor_contact: SensorContactState 

68 energy_expended: int | None = None # kJ 

69 rr_intervals: tuple[float, ...] = () # R-R intervals in seconds, immutable tuple 

70 flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0) # Raw flags byte for reference 

71 

72 def __post_init__(self) -> None: 

73 """Validate heart rate measurement data.""" 

74 if not 0 <= self.heart_rate <= UINT16_MAX: 

75 raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {self.heart_rate}") 

76 if self.energy_expended is not None and not 0 <= self.energy_expended <= UINT16_MAX: 

77 raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {self.energy_expended}") 

78 for interval in self.rr_intervals: 

79 if not 0.0 <= interval <= (UINT16_MAX / RR_INTERVAL_RESOLUTION): # Max RR interval in seconds 

80 raise ValueError( 

81 f"RR interval must be 0.0-{UINT16_MAX / RR_INTERVAL_RESOLUTION} seconds, got {interval}" 

82 ) 

83 

84 

85class HeartRateMeasurementCharacteristic(BaseCharacteristic): 

86 """Heart Rate Measurement characteristic (0x2A37). 

87 

88 Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0) 

89 to transmit instantaneous heart rate plus optional energy expended and 

90 RR-Interval metrics. 

91 

92 Specification summary (flags byte bit assignments - see adopted spec & Errata 23224): 

93 Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16) 

94 Bit 1 (0x02): Sensor Contact Supported 

95 Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set) 

96 Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules) 

97 Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s) 

98 Bits 5-7: Reserved (must be 0) 

99 

100 Parsing Rules: 

101 * Minimum length: 2 bytes (flags + at least one byte of heart rate value) 

102 * If Bit0 set, heart rate is 16 bits; else 8 bits 

103 * Energy Expended only parsed if flag set AND 2 bytes remain 

104 * RR-Intervals parsed greedily in pairs until buffer end when flag set 

105 * RR-Interval raw value converted to seconds: raw / 1024.0 

106 * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected) 

107 

108 Validation: 

109 * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound) 

110 * RR interval constrained to 0.0-65.535 s (fits 16-bit / 1024 scaling and guards against malformed data) 

111 * Energy expended 0..UINT16_MAX 

112 

113 References: 

114 * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/) 

115 * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/) 

116 * Errata Correction 23224 (mandatory for compliance) 

117 

118 """ 

119 

120 # RR-Interval resolution: 1/1024 seconds per unit 

121 RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION 

122 

123 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> HeartRateData: 

124 """Parse heart rate measurement data according to Bluetooth specification. 

125 

126 Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)] 

127 

128 Args: 

129 data: Raw bytearray from BLE characteristic. 

130 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

131 

132 Returns: 

133 HeartRateData containing parsed heart rate data with metadata. 

134 

135 """ 

136 if len(data) < 2: 

137 raise ValueError("Heart rate measurement data must be at least 2 bytes") 

138 

139 flags = HeartRateMeasurementFlags(data[0]) 

140 offset = 1 

141 

142 # Parse heart rate value (8-bit or 16-bit depending on flag) 

143 if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16: # 16-bit heart rate value 

144 if len(data) < offset + 2: 

145 raise ValueError("Insufficient data for 16-bit heart rate value") 

146 heart_rate = DataParser.parse_int16(data, offset, signed=False) 

147 offset += 2 

148 else: # 8-bit heart rate value 

149 heart_rate = DataParser.parse_int8(data, offset, signed=False) 

150 offset += 1 

151 

152 sensor_contact = SensorContactState.from_flags(flags) 

153 

154 # Optional Energy Expended 

155 energy_expended = None 

156 if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2: 

157 energy_expended = DataParser.parse_int16(data, offset, signed=False) 

158 offset += 2 

159 

160 # Optional RR-Intervals 

161 rr_intervals: list[float] = [] 

162 if (flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT) and len(data) >= offset + 2: 

163 while offset + 2 <= len(data): 

164 rr_interval_raw = DataParser.parse_int16(data, offset, signed=False) 

165 rr_intervals.append(rr_interval_raw / RR_INTERVAL_RESOLUTION) 

166 offset += 2 

167 

168 return HeartRateData( 

169 heart_rate=heart_rate, 

170 sensor_contact=sensor_contact, 

171 energy_expended=energy_expended, 

172 rr_intervals=tuple(rr_intervals), # Convert list to tuple for immutable struct 

173 flags=flags, 

174 ) 

175 

176 def encode_value(self, data: HeartRateData) -> bytearray: 

177 """Encode HeartRateData back to bytes. 

178 

179 The inverse of decode_value respecting the same flag semantics. 

180 

181 Args: 

182 data: HeartRateData instance to encode 

183 

184 Returns: 

185 Encoded bytes representing the heart rate measurement 

186 

187 """ 

188 flags = int(data.flags) # Use the flags from the data structure 

189 

190 result = bytearray([flags]) 

191 

192 if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16: 

193 result.extend(DataParser.encode_int16(data.heart_rate, signed=False)) 

194 else: 

195 result.extend(DataParser.encode_int8(data.heart_rate, signed=False)) 

196 

197 if data.energy_expended is not None: 

198 result.extend(DataParser.encode_int16(data.energy_expended, signed=False)) 

199 

200 for rr_interval in data.rr_intervals: 

201 rr_raw = round(rr_interval * RR_INTERVAL_RESOLUTION) 

202 rr_raw = min(rr_raw, UINT16_MAX) 

203 result.extend(DataParser.encode_int16(rr_raw, signed=False)) 

204 

205 return result