Coverage for src / bluetooth_sig / gatt / characteristics / heart_rate_measurement.py: 89%
102 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Heart Rate Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.gatt_enums import CharacteristicName
11from ..constants import UINT16_MAX
12from ..context import CharacteristicContext
13from .base import BaseCharacteristic
14from .body_sensor_location import BodySensorLocation, BodySensorLocationCharacteristic
15from .utils import DataParser
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# RR-Interval values are expressed in units of 1/1024 second, so this is the
# number of raw units per second: divide a raw value by it to obtain seconds,
# multiply seconds by it to obtain the raw value.
RR_INTERVAL_RESOLUTION = 1024.0
class HeartRateMeasurementFlags(IntFlag):
    """Bit masks for the flags byte that leads a Heart Rate Measurement value."""

    # Bit 0: the heart rate value uses the 16-bit format
    HEART_RATE_VALUE_FORMAT_UINT16 = 1 << 0
    # Bit 1: sensor contact supported
    SENSOR_CONTACT_SUPPORTED = 1 << 1
    # Bit 2: sensor contact detected
    SENSOR_CONTACT_DETECTED = 1 << 2
    # Bit 3: energy expended present
    ENERGY_EXPENDED_PRESENT = 1 << 3
    # Bit 4: RR-interval(s) present
    RR_INTERVAL_PRESENT = 1 << 4
class SensorContactState(IntEnum):
    """Sensor contact state enumeration.

    Members compare equal to their integer value and, for backward
    compatibility, to their lowercase name as a string (for example
    ``SensorContactState.DETECTED == "detected"``).

    Note: the hash is the one inherited from the parent classes, so a member
    and its string form compare equal but are not interchangeable as dict
    keys or set elements.
    """

    NOT_SUPPORTED = 0
    NOT_DETECTED = 1
    DETECTED = 2

    def __str__(self) -> str:
        """Return human-readable sensor contact state (the lowercase member name)."""
        return self.name.lower()

    def __eq__(self, other: object) -> bool:
        """Support comparison with string values for backward compatibility."""
        if isinstance(other, str):
            return str(self) == other
        return super().__eq__(other)

    def __ne__(self, other: object) -> bool:
        """Mirror ``__eq__`` so that ``!=`` agrees with ``==`` for strings.

        Without this override ``!=`` resolves to ``int.__ne__``, which does not
        know about the string form and reports a member as unequal to its own
        string representation.
        """
        if isinstance(other, str):
            return str(self) != other
        return super().__ne__(other)

    def __hash__(self) -> int:
        """Make enum hashable (defining ``__eq__`` would otherwise disable hashing)."""
        return super().__hash__()

    @classmethod
    def from_flags(cls, flags: int) -> SensorContactState:
        """Create enum from heart rate flags.

        Args:
            flags: Flags byte of a Heart Rate Measurement value.

        Returns:
            ``NOT_SUPPORTED`` when the Sensor Contact Supported bit is clear;
            otherwise ``DETECTED`` or ``NOT_DETECTED`` according to the Sensor
            Contact Detected bit.
        """
        if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED:
            # The detected bit is only meaningful when the feature is supported.
            return cls.NOT_SUPPORTED
        if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED:
            return cls.DETECTED
        return cls.NOT_DETECTED
class HeartRateData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable result of parsing a Heart Rate Measurement value.

    Attributes:
        heart_rate: Heart rate in beats per minute (0-UINT16_MAX)
        sensor_contact: State of sensor contact detection
        energy_expended: Optional energy expended in kilojoules
        rr_intervals: Tuple of R-R intervals in seconds (immutable)
        flags: Raw flags byte for reference
        sensor_location: Optional body sensor location from context (BodySensorLocation enum)
    """

    heart_rate: int  # BPM (0-UINT16_MAX)
    sensor_contact: SensorContactState
    energy_expended: int | None = None  # kJ
    rr_intervals: tuple[float, ...] = ()  # R-R intervals in seconds, immutable tuple
    flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0)  # Raw flags byte for reference
    sensor_location: BodySensorLocation | None = None  # Body sensor location from context (0x2A38)

    def __post_init__(self) -> None:
        """Reject field values that cannot be represented on the wire.

        Raises:
            ValueError: If the heart rate or energy expended is outside
                0..UINT16_MAX, or any RR interval is outside
                0.0..UINT16_MAX / RR_INTERVAL_RESOLUTION seconds.
        """
        bpm = self.heart_rate
        if not 0 <= bpm <= UINT16_MAX:
            raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {bpm}")

        energy = self.energy_expended
        if energy is not None and not 0 <= energy <= UINT16_MAX:
            raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {energy}")

        # Longest interval a 16-bit raw value can express, in seconds
        longest_rr = UINT16_MAX / RR_INTERVAL_RESOLUTION
        offending = next((rr for rr in self.rr_intervals if not 0.0 <= rr <= longest_rr), None)
        if offending is not None:
            raise ValueError(f"RR interval must be 0.0-{longest_rr} seconds, got {offending}")
class HeartRateMeasurementCharacteristic(BaseCharacteristic[HeartRateData]):
    """Heart Rate Measurement characteristic (0x2A37).

    Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0)
    to transmit instantaneous heart rate plus optional energy expended and
    RR-Interval metrics.

    Specification summary (flags byte bit assignments - see adopted spec & Errata 23224):
      Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16)
      Bit 1 (0x02): Sensor Contact Supported
      Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set)
      Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules)
      Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s)
      Bits 5-7: Reserved (must be 0)

    Parsing Rules:
      * Minimum length: 2 bytes (flags + at least one byte of heart rate value)
      * If Bit0 set, heart rate is 16 bits; else 8 bits
      * Energy Expended only parsed if flag set AND 2 bytes remain
      * RR-Intervals parsed greedily in pairs until buffer end when flag set
      * RR-Interval raw value converted to seconds: raw / 1024.0
      * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected)

    Encoding Rules:
      * The flags byte starts from ``HeartRateData.flags``
      * Bit0 is additionally set when the heart rate does not fit in 8 bits
      * Bit3 and Bit4 are set or cleared to match the optional fields actually written,
        so the encoded value always decodes back to the same fields

    Validation:
      * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound)
      * RR interval constrained to 0.0-65.535 s (fits 16-bit / 1024 scaling and guards against malformed data)
      * Energy expended 0..UINT16_MAX

    References:
      * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/)
      * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/)
      * Errata Correction 23224 (mandatory for compliance)

    """

    _optional_dependencies = [BodySensorLocationCharacteristic]

    # RR-Interval resolution: 1/1024 seconds per unit
    RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION

    min_length: int = 2  # Flags(1) + HR(1/2)
    allow_variable_length: bool = True  # Optional energy expended and RR intervals

    def _decode_value(  # pylint: disable=too-many-branches  # Branches needed for spec-compliant parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None
    ) -> HeartRateData:
        """Parse heart rate measurement data according to Bluetooth specification.

        Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)]

        Context Enhancement:
            If ctx is provided, this method will attempt to enhance the parsed data with:
            - Body Sensor Location (0x2A38): Location where the sensor is positioned

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            HeartRateData containing parsed heart rate data with metadata and optional
            context-enhanced information.

        Raises:
            ValueError: If data is shorter than 2 bytes, or the 16-bit heart rate
                format is flagged but fewer than 2 value bytes follow the flags.

        """
        if len(data) < 2:
            raise ValueError("Heart rate measurement data must be at least 2 bytes")

        flags = HeartRateMeasurementFlags(data[0])
        offset = 1

        # Parse heart rate value (8-bit or 16-bit depending on flag)
        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:  # 16-bit heart rate value
            if len(data) < offset + 2:
                raise ValueError("Insufficient data for 16-bit heart rate value")
            heart_rate = DataParser.parse_int16(data, offset, signed=False)
            offset += 2
        else:  # 8-bit heart rate value
            heart_rate = DataParser.parse_int8(data, offset, signed=False)
            offset += 1

        sensor_contact = SensorContactState.from_flags(flags)

        # Optional Energy Expended: tolerated as absent when the flag is set
        # but the payload is too short to hold it.
        energy_expended = None
        if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2:
            energy_expended = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Optional RR-Intervals: consume every remaining complete 16-bit value;
        # a trailing odd byte is ignored.
        rr_intervals: list[float] = []
        if flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT:
            while offset + 2 <= len(data):
                rr_interval_raw = DataParser.parse_int16(data, offset, signed=False)
                rr_intervals.append(rr_interval_raw / RR_INTERVAL_RESOLUTION)
                offset += 2

        # Context enhancement: Body Sensor Location
        sensor_location = None
        if ctx:
            location_value = self.get_context_characteristic(ctx, CharacteristicName.BODY_SENSOR_LOCATION)
            if location_value is not None:
                # Body Sensor Location is a uint8 enum value (0-6)
                try:
                    sensor_location = BodySensorLocation(location_value)
                except ValueError:
                    # Invalid value outside enum range, leave as None
                    logger.warning(
                        f"Invalid Body Sensor Location value {location_value} in context (valid range: 0-6), ignoring"
                    )

        return HeartRateData(
            heart_rate=heart_rate,
            sensor_contact=sensor_contact,
            energy_expended=energy_expended,
            rr_intervals=tuple(rr_intervals),  # Convert list to tuple for immutable struct
            flags=flags,
            sensor_location=sensor_location,
        )

    @staticmethod
    def _payload_flags(data: HeartRateData) -> int:
        """Return the flags byte describing the payload that will be written for ``data``.

        Starts from ``data.flags`` (sensor-contact and reserved bits are kept
        as given) and forces the format and presence bits to agree with the
        fields, so the flags byte never announces a field that is not written
        or omits one that is.
        """
        flags = int(data.flags)

        # A value above 0xFF cannot be written in the 8-bit format.
        if data.heart_rate > 0xFF:
            flags |= int(HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16)

        energy_bit = int(HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT)
        if data.energy_expended is not None:
            flags |= energy_bit
        else:
            flags &= ~energy_bit

        rr_bit = int(HeartRateMeasurementFlags.RR_INTERVAL_PRESENT)
        if data.rr_intervals:
            flags |= rr_bit
        else:
            flags &= ~rr_bit

        return flags

    def _encode_value(self, data: HeartRateData) -> bytearray:
        """Encode HeartRateData back to bytes.

        The inverse of decode_value respecting the same flag semantics. The
        flags byte is taken from ``data.flags`` and then reconciled with the
        fields being written (see ``_payload_flags``), so a HeartRateData built
        by hand with default flags still produces a decodable value.

        Args:
            data: HeartRateData instance to encode

        Returns:
            Encoded bytes representing the heart rate measurement

        """
        flags = self._payload_flags(data)

        result = bytearray([flags])

        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
            result.extend(DataParser.encode_int16(data.heart_rate, signed=False))
        else:
            result.extend(DataParser.encode_int8(data.heart_rate, signed=False))

        if data.energy_expended is not None:
            result.extend(DataParser.encode_int16(data.energy_expended, signed=False))

        for rr_interval in data.rr_intervals:
            # Convert seconds back to 1/1024 s units; clamp so rounding can
            # never exceed the 16-bit field.
            rr_raw = min(round(rr_interval * RR_INTERVAL_RESOLUTION), UINT16_MAX)
            result.extend(DataParser.encode_int16(rr_raw, signed=False))

        return result