Coverage for src / bluetooth_sig / gatt / characteristics / heart_rate_measurement.py: 91%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Heart Rate Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from enum import IntEnum, IntFlag
7from typing import Any, ClassVar
9import msgspec
11from ...types.gatt_enums import CharacteristicName
12from ..constants import UINT16_MAX
13from ..context import CharacteristicContext
14from .base import BaseCharacteristic
15from .body_sensor_location import BodySensorLocation, BodySensorLocationCharacteristic
16from .utils import DataParser
18logger = logging.getLogger(__name__)
20# RR-Interval scale factor: raw units per second (one raw unit is 1/1024 s); divide raw values by this to get seconds
21RR_INTERVAL_RESOLUTION = 1024.0
class HeartRateMeasurementFlags(IntFlag):
    """Bit assignments of the Heart Rate Measurement flags byte.

    Each member is a single bit; combine them with ``|`` and test them with ``&``.
    """

    HEART_RATE_VALUE_FORMAT_UINT16 = 1 << 0  # heart rate value is 16-bit when set, 8-bit otherwise
    SENSOR_CONTACT_SUPPORTED = 1 << 1
    SENSOR_CONTACT_DETECTED = 1 << 2
    ENERGY_EXPENDED_PRESENT = 1 << 3
    RR_INTERVAL_PRESENT = 1 << 4
class SensorContactState(IntEnum):
    """Tri-state sensor contact status derived from the measurement flags.

    For backward compatibility a member also compares equal to its lowercase
    string form (``"not_supported"``, ``"not_detected"``, ``"detected"``).
    ``==`` and ``!=`` are kept consistent for such string comparisons.

    Note: hashing is inherited unchanged, so a member and its equal string do
    not share a hash; do not mix the two as interchangeable dict/set keys.
    """

    NOT_SUPPORTED = 0
    NOT_DETECTED = 1
    DETECTED = 2

    def __str__(self) -> str:
        """Return the lowercase member name (e.g. ``"not_detected"``)."""
        return self.name.lower()

    def __eq__(self, other: object) -> bool:
        """Compare with strings by name, and with everything else as an int."""
        if isinstance(other, str):
            return str(self) == other
        return super().__eq__(other)

    def __ne__(self, other: object) -> bool:
        """Mirror ``__eq__`` for strings.

        ``int`` provides its own ``__ne__``, so without this override
        ``state != "detected"`` would fall back to an identity comparison and
        be True even when ``state == "detected"`` is also True.
        """
        if isinstance(other, str):
            return str(self) != other
        return super().__ne__(other)

    def __hash__(self) -> int:
        """Keep the enum hashable (defining ``__eq__`` would otherwise remove ``__hash__``)."""
        return super().__hash__()

    @classmethod
    def from_flags(cls, flags: int) -> SensorContactState:
        """Derive the contact state from a Heart Rate Measurement flags value.

        Args:
            flags: Flags value (bits as defined by ``HeartRateMeasurementFlags``).

        Returns:
            ``NOT_SUPPORTED`` when the "supported" bit is clear; otherwise
            ``DETECTED`` or ``NOT_DETECTED`` according to the "detected" bit.
        """
        # The "detected" bit is only meaningful when the "supported" bit is set.
        if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED:
            return cls.NOT_SUPPORTED
        if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED:
            return cls.DETECTED
        return cls.NOT_DETECTED
class HeartRateData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable result of parsing a Heart Rate Measurement value.

    Attributes:
        heart_rate: Beats per minute, 0..UINT16_MAX.
        sensor_contact: Sensor contact state.
        energy_expended: Energy expended in kilojoules, or None when absent.
        rr_intervals: R-R intervals in seconds, stored as a tuple.
        flags: The flags value the measurement was parsed from / will be encoded with.
        sensor_location: Body sensor location taken from context, or None.
    """

    heart_rate: int
    sensor_contact: SensorContactState
    energy_expended: int | None = None
    rr_intervals: tuple[float, ...] = ()
    flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0)
    sensor_location: BodySensorLocation | None = None

    def __post_init__(self) -> None:
        """Reject field values that cannot be represented in the wire format.

        Raises:
            ValueError: If heart rate, energy expended, or any RR interval is
                outside its permitted range.
        """
        bpm = self.heart_rate
        if not 0 <= bpm <= UINT16_MAX:
            raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {bpm}")

        energy = self.energy_expended
        if energy is not None and not 0 <= energy <= UINT16_MAX:
            raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {energy}")

        # Largest interval (in seconds) that still fits a 16-bit raw value.
        longest = UINT16_MAX / RR_INTERVAL_RESOLUTION
        offender = next((rr for rr in self.rr_intervals if not 0.0 <= rr <= longest), None)
        if offender is not None:
            raise ValueError(f"RR interval must be 0.0-{longest} seconds, got {offender}")
class HeartRateMeasurementCharacteristic(BaseCharacteristic[HeartRateData]):
    """Heart Rate Measurement characteristic (0x2A37).

    Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0)
    to transmit instantaneous heart rate plus optional energy expended and
    RR-Interval metrics.

    Specification summary (flags byte bit assignments - see adopted spec & Errata 23224):
      Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16)
      Bit 1 (0x02): Sensor Contact Supported
      Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set)
      Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules)
      Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s)
      Bits 5-7: Reserved (must be 0)

    Parsing Rules:
      * Minimum length: 2 bytes (flags + at least one byte of heart rate value)
      * If Bit0 set, heart rate is 16 bits; else 8 bits
      * Energy Expended only parsed if flag set AND 2 bytes remain
      * RR-Intervals parsed greedily in pairs until buffer end when flag set
      * RR-Interval raw value converted to seconds: raw / 1024.0
      * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected)

    Encoding Rules:
      * Heart rate width and sensor-contact bits are taken from ``HeartRateData.flags``
      * The Energy Expended / RR-Interval presence bits are always derived from the
        payload actually written, so the flags byte never contradicts the frame

    Validation:
      * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound)
      * RR interval constrained to 0.0..UINT16_MAX/1024 s (about 63.999 s; the largest 16-bit raw value)
      * Energy expended 0..UINT16_MAX

    References:
      * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/)
      * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/)
      * Errata Correction 23224 (mandatory for compliance)

    """

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [BodySensorLocationCharacteristic]

    # Raw RR-interval units per second (one raw unit is 1/1024 s); re-exported from the module constant.
    RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION

    min_length: int = 2  # Flags(1) + HR(1/2)
    allow_variable_length: bool = True  # Optional energy expended and RR intervals

    def _decode_value(  # pylint: disable=too-many-branches  # Branches needed for spec-compliant parsing
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> HeartRateData:
        """Parse heart rate measurement data according to Bluetooth specification.

        Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)]

        Context Enhancement:
            If ctx is provided, this method will attempt to enhance the parsed data with:
            - Body Sensor Location (0x2A38): Location where the sensor is positioned

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to perform validation (currently unused).

        Returns:
            HeartRateData containing parsed heart rate data with metadata and optional
            context-enhanced information.

        Raises:
            ValueError: If the 16-bit format flag is set but fewer than two bytes
                follow the flags byte.

        """
        flags = HeartRateMeasurementFlags(data[0])
        offset = 1

        # Parse heart rate value (8-bit or 16-bit depending on flag)
        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
            if len(data) < offset + 2:
                raise ValueError("Insufficient data for 16-bit heart rate value")
            heart_rate = DataParser.parse_int16(data, offset, signed=False)
            offset += 2
        else:
            heart_rate = DataParser.parse_int8(data, offset, signed=False)
            offset += 1

        sensor_contact = SensorContactState.from_flags(flags)

        # Optional Energy Expended: tolerated as absent when the frame is too short.
        energy_expended = None
        if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2:
            energy_expended = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Optional RR-Intervals: consume every complete 16-bit value that remains;
        # a trailing odd byte is ignored.
        rr_intervals: list[float] = []
        if flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT:
            while offset + 2 <= len(data):
                rr_interval_raw = DataParser.parse_int16(data, offset, signed=False)
                rr_intervals.append(rr_interval_raw / RR_INTERVAL_RESOLUTION)
                offset += 2

        # Context enhancement: Body Sensor Location
        sensor_location = None
        if ctx:
            location_value = self.get_context_characteristic(ctx, CharacteristicName.BODY_SENSOR_LOCATION)
            if location_value is not None:
                # Body Sensor Location is a uint8 enum value (0-6)
                try:
                    sensor_location = BodySensorLocation(location_value)
                except ValueError:
                    # Invalid value outside enum range, leave as None
                    logger.warning(
                        "Invalid Body Sensor Location value %s in context (valid range: 0-6), ignoring",
                        location_value,
                    )

        return HeartRateData(
            heart_rate=heart_rate,
            sensor_contact=sensor_contact,
            energy_expended=energy_expended,
            rr_intervals=tuple(rr_intervals),  # Convert list to tuple for immutable struct
            flags=flags,
            sensor_location=sensor_location,
        )

    def _encode_value(self, data: HeartRateData) -> bytearray:
        """Encode HeartRateData back to bytes.

        The inverse of decode_value respecting the same flag semantics. The
        heart-rate width and sensor-contact bits come from ``data.flags``; the
        Energy Expended and RR-Interval presence bits are recomputed from the
        fields that are actually written, so the emitted flags byte always
        describes the frame that follows it.

        Args:
            data: HeartRateData instance to encode

        Returns:
            Encoded bytes representing the heart rate measurement

        """
        energy_bit = int(HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT)
        rr_bit = int(HeartRateMeasurementFlags.RR_INTERVAL_PRESENT)

        # Start from the caller's flags, then make the presence bits agree with
        # the payload (previously a payload could be written with its bit clear,
        # producing a frame the decoder would not read back).
        flags = int(data.flags) & ~(energy_bit | rr_bit)
        if data.energy_expended is not None:
            flags |= energy_bit
        if data.rr_intervals:
            flags |= rr_bit

        result = bytearray([flags])

        if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
            result.extend(DataParser.encode_int16(data.heart_rate, signed=False))
        else:
            result.extend(DataParser.encode_int8(data.heart_rate, signed=False))

        if data.energy_expended is not None:
            result.extend(DataParser.encode_int16(data.energy_expended, signed=False))

        for rr_interval in data.rr_intervals:
            # Convert seconds back to raw units; clamp in case rounding exceeds 16 bits.
            rr_raw = min(round(rr_interval * RR_INTERVAL_RESOLUTION), UINT16_MAX)
            result.extend(DataParser.encode_int16(rr_raw, signed=False))

        return result