Coverage for src / bluetooth_sig / gatt / characteristics / heart_rate_measurement.py: 91%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Heart Rate Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from enum import IntEnum, IntFlag 

7from typing import Any, ClassVar 

8 

9import msgspec 

10 

11from ...types.gatt_enums import CharacteristicName 

12from ..constants import UINT16_MAX 

13from ..context import CharacteristicContext 

14from .base import BaseCharacteristic 

15from .body_sensor_location import BodySensorLocation, BodySensorLocationCharacteristic 

16from .utils import DataParser 

17 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# RR-Interval scale factor: number of raw units per second.
# A raw RR-Interval value is divided by this constant to obtain seconds
# (i.e. one raw unit is 1/1024 s) and multiplied by it when encoding.
RR_INTERVAL_RESOLUTION = 1024.0

22 

23 

class HeartRateMeasurementFlags(IntFlag):
    """Bit masks for the flags octet that leads a Heart Rate Measurement value.

    Each member occupies a single bit, so members can be combined with ``|``
    and tested with ``&``.
    """

    HEART_RATE_VALUE_FORMAT_UINT16 = 1 << 0  # bit 0
    SENSOR_CONTACT_SUPPORTED = 1 << 1  # bit 1
    SENSOR_CONTACT_DETECTED = 1 << 2  # bit 2
    ENERGY_EXPENDED_PRESENT = 1 << 3  # bit 3
    RR_INTERVAL_PRESENT = 1 << 4  # bit 4

32 

33 

class SensorContactState(IntEnum):
    """Tri-state sensor contact status derived from the measurement flags.

    Members compare equal to their integer value (normal ``IntEnum``
    behaviour) and, for backward compatibility, to their lowercase string
    form: ``"not_supported"``, ``"not_detected"`` and ``"detected"``.
    """

    NOT_SUPPORTED = 0
    NOT_DETECTED = 1
    DETECTED = 2

    def __str__(self) -> str:
        """Return the human-readable state, i.e. the lowercase member name."""
        return self.name.lower()

    def __eq__(self, other: object) -> bool:
        """Compare with another value, additionally accepting the string form.

        Args:
            other: Value to compare against. Strings are matched against
                ``str(self)``; anything else uses the inherited comparison.

        Returns:
            True when equal, False otherwise (or ``NotImplemented`` when the
            inherited comparison cannot handle ``other``).

        """
        if isinstance(other, str):
            return str(self) == other
        return super().__eq__(other)

    def __ne__(self, other: object) -> bool:
        """Return the negation of ``__eq__``.

        ``int`` supplies its own ``__ne__``, which is found before the default
        "invert ``__eq__``" implementation. Without this override a member
        would be both ``==`` and ``!=`` to its own string form.
        """
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome

    def __hash__(self) -> int:
        """Keep the inherited hash; defining ``__eq__`` would otherwise drop it."""
        return super().__hash__()

    @classmethod
    def from_flags(cls, flags: int) -> SensorContactState:
        """Derive the contact state from a Heart Rate Measurement flags value.

        Args:
            flags: Flags value (int or ``HeartRateMeasurementFlags``).

        Returns:
            ``NOT_SUPPORTED`` when the "supported" bit is clear; otherwise
            ``DETECTED`` or ``NOT_DETECTED`` according to the "detected" bit.

        """
        # The "detected" bit is only meaningful when "supported" is set.
        if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED:
            return cls.NOT_SUPPORTED
        if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED:
            return cls.DETECTED
        return cls.NOT_DETECTED

63 

64 

class HeartRateData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable, keyword-only result of decoding a Heart Rate Measurement value.

    Attributes:
        heart_rate: Heart rate in beats per minute (0-UINT16_MAX)
        sensor_contact: State of sensor contact detection
        energy_expended: Optional energy expended in kilojoules
        rr_intervals: Tuple of R-R intervals in seconds (immutable)
        flags: Raw flags byte for reference
        sensor_location: Optional body sensor location from context (BodySensorLocation enum)
    """

    heart_rate: int  # beats per minute, 0-UINT16_MAX
    sensor_contact: SensorContactState
    energy_expended: int | None = None  # kilojoules
    rr_intervals: tuple[float, ...] = ()  # seconds; tuple keeps the struct immutable
    flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0)  # raw flags byte, kept for reference
    sensor_location: BodySensorLocation | None = None  # from context characteristic 0x2A38

    def __post_init__(self) -> None:
        """Reject field values that fall outside their allowed ranges.

        Raises:
            ValueError: If the heart rate, the energy expended (when given) or
                any RR interval is out of range.

        """
        bpm = self.heart_rate
        if not 0 <= bpm <= UINT16_MAX:
            raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {bpm}")

        energy = self.energy_expended
        if energy is not None and not 0 <= energy <= UINT16_MAX:
            raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {energy}")

        # Largest interval (in seconds) that still fits the 16-bit raw value.
        rr_ceiling = UINT16_MAX / RR_INTERVAL_RESOLUTION
        offending = next((rr for rr in self.rr_intervals if not 0.0 <= rr <= rr_ceiling), None)
        if offending is not None:
            raise ValueError(f"RR interval must be 0.0-{rr_ceiling} seconds, got {offending}")

95 

96 

class HeartRateMeasurementCharacteristic(BaseCharacteristic[HeartRateData]):
    """Heart Rate Measurement characteristic (0x2A37).

    Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0)
    to transmit instantaneous heart rate plus optional energy expended and
    RR-Interval metrics.

    Specification summary (flags byte bit assignments - see adopted spec & Errata 23224):
      Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16)
      Bit 1 (0x02): Sensor Contact Supported
      Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set)
      Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules)
      Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s)
      Bits 5-7: Reserved (must be 0)

    Parsing Rules:
      * Minimum length: 2 bytes (flags + at least one byte of heart rate value)
      * If Bit0 set, heart rate is 16 bits; else 8 bits
      * Energy Expended only parsed if flag set AND 2 bytes remain
      * RR-Intervals parsed greedily in pairs until buffer end when flag set
      * RR-Interval raw value converted to seconds: raw / 1024.0
      * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected)

    Encoding Rules:
      * The layout bits (Bit0, Bit3, Bit4) are normalised to match the fields that are
        actually written, so the produced payload can always be decoded again
      * All other flag bits are written exactly as stored in ``HeartRateData.flags``

    Validation:
      * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound)
      * RR interval constrained to 0.0..UINT16_MAX/1024 s (about 64 s; the largest 16-bit raw value)
      * Energy expended 0..UINT16_MAX

    References:
      * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/)
      * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/)
      * Errata Correction 23224 (mandatory for compliance)

    """

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [BodySensorLocationCharacteristic]

    # RR-Interval scale factor (raw units per second), re-exported on the class
    RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION

    min_length: int = 2  # Flags(1) + HR(1/2)
    allow_variable_length: bool = True  # Optional energy expended and RR intervals

    def _decode_value(  # pylint: disable=too-many-branches  # Branches needed for spec-compliant parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> HeartRateData:
        """Parse heart rate measurement data according to Bluetooth specification.

        Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)]

        Context Enhancement:
            If ctx is provided, this method will attempt to enhance the parsed data with:
            - Body Sensor Location (0x2A38): Location where the sensor is positioned

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to perform validation (currently unused).

        Returns:
            HeartRateData containing parsed heart rate data with metadata and optional
            context-enhanced information.

        Raises:
            ValueError: If the 16-bit format flag is set but fewer than two heart rate
                bytes are present, or if a parsed value fails HeartRateData validation.

        """
        flags = HeartRateMeasurementFlags(data[0])
        offset = 1

        # Heart rate value: 16-bit when the format flag is set, otherwise 8-bit
        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
            if len(data) < offset + 2:
                raise ValueError("Insufficient data for 16-bit heart rate value")
            heart_rate = DataParser.parse_int16(data, offset, signed=False)
            offset += 2
        else:
            heart_rate = DataParser.parse_int8(data, offset, signed=False)
            offset += 1

        sensor_contact = SensorContactState.from_flags(flags)

        # Optional Energy Expended: skipped (left as None) when the payload is too short
        energy_expended = None
        if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2:
            energy_expended = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Optional RR-Intervals: consume every complete 2-byte pair that remains;
        # a trailing odd byte is ignored
        rr_intervals: tuple[float, ...] = ()
        if flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT:
            rr_intervals = tuple(
                DataParser.parse_int16(data, position, signed=False) / RR_INTERVAL_RESOLUTION
                for position in range(offset, len(data) - 1, 2)
            )

        # Context enhancement: Body Sensor Location
        sensor_location = None
        if ctx:
            location_value = self.get_context_characteristic(ctx, CharacteristicName.BODY_SENSOR_LOCATION)
            if location_value is not None:
                # Body Sensor Location is a uint8 enum value (0-6)
                try:
                    sensor_location = BodySensorLocation(location_value)
                except ValueError:
                    # Invalid value outside enum range, leave as None
                    logger.warning(
                        "Invalid Body Sensor Location value %s in context (valid range: 0-6), ignoring",
                        location_value,
                    )

        return HeartRateData(
            heart_rate=heart_rate,
            sensor_contact=sensor_contact,
            energy_expended=energy_expended,
            rr_intervals=rr_intervals,
            flags=flags,
            sensor_location=sensor_location,
        )

    def _encode_value(self, data: HeartRateData) -> bytearray:
        """Encode HeartRateData back to bytes.

        The inverse of decode_value respecting the same flag semantics. The flags
        byte starts from ``data.flags``, but the three bits that describe the payload
        layout are forced to agree with the fields that are written:

        * the uint16 format bit is kept if already set and is additionally set when
          the heart rate does not fit in 8 bits
        * the Energy Expended bit is set exactly when ``energy_expended`` is not None
        * the RR-Interval bit is set exactly when ``rr_intervals`` is non-empty

        Without this, an instance whose flags disagree with its fields (for example
        one built with the default ``flags`` but with ``energy_expended`` set) would
        produce bytes that cannot be decoded correctly.

        Args:
            data: HeartRateData instance to encode

        Returns:
            Encoded bytes representing the heart rate measurement

        """
        uint8_max = 0xFF
        flag_bits = int(data.flags)  # plain int so reserved/unknown bits survive the masking below

        wide_heart_rate = (
            bool(flag_bits & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16)
            or data.heart_rate > uint8_max
        )
        layout = (
            (HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16, wide_heart_rate),
            (HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT, data.energy_expended is not None),
            (HeartRateMeasurementFlags.RR_INTERVAL_PRESENT, bool(data.rr_intervals)),
        )
        for bit, present in layout:
            if present:
                flag_bits |= int(bit)
            else:
                flag_bits &= ~int(bit)

        result = bytearray([flag_bits])

        if wide_heart_rate:
            result.extend(DataParser.encode_int16(data.heart_rate, signed=False))
        else:
            result.extend(DataParser.encode_int8(data.heart_rate, signed=False))

        if data.energy_expended is not None:
            result.extend(DataParser.encode_int16(data.energy_expended, signed=False))

        for rr_interval in data.rr_intervals:
            # Seconds back to raw 1/1024 s units; clamp in case rounding exceeds the uint16 range
            rr_raw = min(round(rr_interval * RR_INTERVAL_RESOLUTION), UINT16_MAX)
            result.extend(DataParser.encode_int16(rr_raw, signed=False))

        return result