Coverage for src / bluetooth_sig / gatt / characteristics / heart_rate_measurement.py: 89%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Heart Rate Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from enum import IntEnum, IntFlag 

7 

8import msgspec 

9 

10from ...types.gatt_enums import CharacteristicName 

11from ..constants import UINT16_MAX 

12from ..context import CharacteristicContext 

13from .base import BaseCharacteristic 

14from .body_sensor_location import BodySensorLocation, BodySensorLocationCharacteristic 

15from .utils import DataParser 

16 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Number of raw RR-Interval units per second (one unit is 1/1024 s).
# Raw values are divided by this to obtain seconds.
RR_INTERVAL_RESOLUTION: float = 1024.0

21 

22 

class HeartRateMeasurementFlags(IntFlag):
    """Bit masks for the flags byte of a Heart Rate Measurement value.

    Every member occupies a single bit, so members can be combined and tested
    with the usual bitwise operators.
    """

    HEART_RATE_VALUE_FORMAT_UINT16 = 1 << 0  # 0x01
    SENSOR_CONTACT_SUPPORTED = 1 << 1  # 0x02
    SENSOR_CONTACT_DETECTED = 1 << 2  # 0x04
    ENERGY_EXPENDED_PRESENT = 1 << 3  # 0x08
    RR_INTERVAL_PRESENT = 1 << 4  # 0x10

31 

32 

class SensorContactState(IntEnum):
    """Tri-state sensor contact status derived from the measurement flags.

    For backward compatibility, members also compare equal to their lowercase
    string form (``"not_supported"``, ``"not_detected"``, ``"detected"``).
    Hashing follows the integer value, so string keys do NOT find enum members
    in dicts or sets; the string support is for ``==`` / ``!=`` only.
    """

    NOT_SUPPORTED = 0
    NOT_DETECTED = 1
    DETECTED = 2

    def __str__(self) -> str:
        """Return human-readable sensor contact state."""
        return {0: "not_supported", 1: "not_detected", 2: "detected"}[self.value]

    def __eq__(self, other: object) -> bool:
        """Support comparison with string values for backward compatibility."""
        if isinstance(other, str):
            return str(self) == other
        return super().__eq__(other)

    def __ne__(self, other: object) -> bool:
        """Mirror ``__eq__`` so that ``!=`` agrees with ``==`` for strings.

        ``int`` provides its own ``__ne__``, which does not consult the
        overridden ``__eq__``. Without this method ``state != "detected"``
        would be True even when ``state == "detected"`` is also True.
        """
        if isinstance(other, str):
            return str(self) != other
        return super().__ne__(other)

    def __hash__(self) -> int:
        """Make enum hashable (defining ``__eq__`` would otherwise disable hashing)."""
        return super().__hash__()

    @classmethod
    def from_flags(cls, flags: int) -> SensorContactState:
        """Create enum from heart rate flags.

        Args:
            flags: Flags byte (or ``HeartRateMeasurementFlags`` value) of a measurement.

        Returns:
            ``NOT_SUPPORTED`` when the Sensor Contact Supported bit is clear;
            otherwise ``DETECTED`` or ``NOT_DETECTED`` according to the
            Sensor Contact Detected bit.
        """
        if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED:
            # The detected bit is only meaningful when contact sensing is supported.
            return cls.NOT_SUPPORTED
        if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED:
            return cls.DETECTED
        return cls.NOT_DETECTED

62 

63 

class HeartRateData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable result of parsing a Heart Rate Measurement characteristic.

    Attributes:
        heart_rate: Heart rate in beats per minute (0-UINT16_MAX)
        sensor_contact: State of sensor contact detection
        energy_expended: Energy expended in kilojoules, or None when absent
        rr_intervals: R-R intervals in seconds, stored as a tuple
        flags: Raw flags byte, kept for reference
        sensor_location: Body sensor location taken from context, or None
    """

    # Required fields
    heart_rate: int
    sensor_contact: SensorContactState

    # Optional fields
    energy_expended: int | None = None
    rr_intervals: tuple[float, ...] = ()
    flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0)
    sensor_location: BodySensorLocation | None = None

    def __post_init__(self) -> None:
        """Check every field against its permitted range.

        Raises:
            ValueError: If the heart rate, the energy expended or any RR
                interval lies outside its permitted range.
        """
        heart_rate_ok = 0 <= self.heart_rate <= UINT16_MAX
        if not heart_rate_ok:
            raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {self.heart_rate}")

        if self.energy_expended is not None:
            energy_ok = 0 <= self.energy_expended <= UINT16_MAX
            if not energy_ok:
                raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {self.energy_expended}")

        # Largest RR interval in seconds that a 16-bit raw value can represent.
        longest_interval = UINT16_MAX / RR_INTERVAL_RESOLUTION
        for interval in self.rr_intervals:
            if 0.0 <= interval <= longest_interval:
                continue
            raise ValueError(
                f"RR interval must be 0.0-{UINT16_MAX / RR_INTERVAL_RESOLUTION} seconds, got {interval}"
            )

94 

95 

class HeartRateMeasurementCharacteristic(BaseCharacteristic[HeartRateData]):
    """Heart Rate Measurement characteristic (0x2A37).

    Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0)
    to transmit instantaneous heart rate plus optional energy expended and
    RR-Interval metrics.

    Specification summary (flags byte bit assignments - see adopted spec & Errata 23224):
      Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16)
      Bit 1 (0x02): Sensor Contact Supported
      Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set)
      Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules)
      Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s)
      Bits 5-7: Reserved (must be 0)

    Parsing Rules:
      * Minimum length: 2 bytes (flags + at least one byte of heart rate value)
      * If Bit0 set, heart rate is 16 bits; else 8 bits
      * Energy Expended only parsed if flag set AND 2 bytes remain
      * RR-Intervals parsed greedily in pairs until buffer end when flag set
      * RR-Interval raw value converted to seconds: raw / 1024.0
      * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected)

    Encoding Rules:
      * The Energy Expended and RR-Interval presence bits are derived from the
        fields actually present, so the flags byte always matches the payload
      * The uint16 heart rate format is forced when the value does not fit in 8 bits

    Validation:
      * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound)
      * RR interval constrained to 0.0..UINT16_MAX/1024 s (about 63.999 s; guards against malformed data)
      * Energy expended 0..UINT16_MAX

    References:
      * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/)
      * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/)
      * Errata Correction 23224 (mandatory for compliance)

    """

    _optional_dependencies = [BodySensorLocationCharacteristic]

    # RR-Interval resolution: 1/1024 seconds per unit
    RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION

    min_length: int = 2  # Flags(1) + HR(1/2)
    allow_variable_length: bool = True  # Optional energy expended and RR intervals

    def _decode_value(  # pylint: disable=too-many-branches  # Branches needed for spec-compliant parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None
    ) -> HeartRateData:
        """Parse heart rate measurement data according to Bluetooth specification.

        Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)]

        Context Enhancement:
            If ctx is provided, this method will attempt to enhance the parsed data with:
            - Body Sensor Location (0x2A38): Location where the sensor is positioned

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            HeartRateData containing parsed heart rate data with metadata and optional
            context-enhanced information.

        Raises:
            ValueError: If ``data`` is shorter than 2 bytes, or the uint16 format
                flag is set but fewer than 2 bytes follow the flags byte.

        """
        if len(data) < 2:
            raise ValueError("Heart rate measurement data must be at least 2 bytes")

        flags = HeartRateMeasurementFlags(data[0])
        offset = 1

        # Parse heart rate value (8-bit or 16-bit depending on flag)
        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
            if len(data) < offset + 2:
                raise ValueError("Insufficient data for 16-bit heart rate value")
            heart_rate = DataParser.parse_int16(data, offset, signed=False)
            offset += 2
        else:
            heart_rate = DataParser.parse_int8(data, offset, signed=False)
            offset += 1

        sensor_contact = SensorContactState.from_flags(flags)

        # Optional Energy Expended; a truncated field is tolerated and left as None.
        energy_expended = None
        if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2:
            energy_expended = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Optional RR-Intervals: consume complete 2-byte values until the buffer
        # ends; a trailing odd byte is ignored.
        rr_intervals: list[float] = []
        if flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT:
            while offset + 2 <= len(data):
                rr_interval_raw = DataParser.parse_int16(data, offset, signed=False)
                rr_intervals.append(rr_interval_raw / RR_INTERVAL_RESOLUTION)
                offset += 2

        # Context enhancement: Body Sensor Location
        sensor_location = None
        if ctx:
            location_value = self.get_context_characteristic(ctx, CharacteristicName.BODY_SENSOR_LOCATION)
            if location_value is not None:
                # Body Sensor Location is a uint8 enum value (0-6)
                try:
                    sensor_location = BodySensorLocation(location_value)
                except ValueError:
                    # Invalid value outside enum range, leave as None
                    logger.warning(
                        "Invalid Body Sensor Location value %s in context (valid range: 0-6), ignoring",
                        location_value,
                    )

        return HeartRateData(
            heart_rate=heart_rate,
            sensor_contact=sensor_contact,
            energy_expended=energy_expended,
            rr_intervals=tuple(rr_intervals),  # Convert list to tuple for immutable struct
            flags=flags,
            sensor_location=sensor_location,
        )

    def _encode_value(self, data: HeartRateData) -> bytearray:
        """Encode HeartRateData back to bytes.

        The inverse of ``_decode_value``. The flags byte starts from ``data.flags``
        and is then reconciled with the fields that are actually written:

        * Energy Expended Present is set exactly when ``energy_expended`` is not None
        * RR-Interval Present is set exactly when ``rr_intervals`` is non-empty
        * the uint16 heart rate format is forced when ``heart_rate`` exceeds 255

        Without this, a HeartRateData built with the default ``flags`` but with
        optional fields populated would produce a payload whose flags byte does
        not describe its contents.

        Args:
            data: HeartRateData instance to encode

        Returns:
            Encoded bytes representing the heart rate measurement

        """
        flags = int(data.flags)

        energy_bit = int(HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT)
        rr_bit = int(HeartRateMeasurementFlags.RR_INTERVAL_PRESENT)
        uint16_bit = int(HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16)

        # Presence bits must mirror the payload that follows.
        if data.energy_expended is not None:
            flags |= energy_bit
        else:
            flags &= ~energy_bit
        if data.rr_intervals:
            flags |= rr_bit
        else:
            flags &= ~rr_bit

        # A value above 0xFF cannot be carried in the 8-bit format.
        if data.heart_rate > 0xFF:
            flags |= uint16_bit

        result = bytearray([flags])

        if flags & uint16_bit:
            result.extend(DataParser.encode_int16(data.heart_rate, signed=False))
        else:
            result.extend(DataParser.encode_int8(data.heart_rate, signed=False))

        if data.energy_expended is not None:
            result.extend(DataParser.encode_int16(data.energy_expended, signed=False))

        for rr_interval in data.rr_intervals:
            # Convert seconds back to 1/1024 s units; clamp in case rounding
            # pushes the value past the 16-bit maximum.
            rr_raw = min(round(rr_interval * RR_INTERVAL_RESOLUTION), UINT16_MAX)
            result.extend(DataParser.encode_int16(rr_raw, signed=False))

        return result