Coverage for src/bluetooth_sig/gatt/characteristics/heart_rate_measurement.py: 91%
86 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Heart Rate Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntEnum, IntFlag
7import msgspec
9from ..constants import UINT16_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser
14# RR-Interval resolution: 1/1024 seconds per unit
15RR_INTERVAL_RESOLUTION = 1024.0
17# TODO: Implement CharacteristicContext support
18# This characteristic should access Heart Rate Control Point (0x2A39) from ctx.other_characteristics
19# to provide calibration factors and control commands for enhanced heart rate monitoring
22class HeartRateMeasurementFlags(IntFlag):
23 """Heart Rate Measurement flags as per Bluetooth SIG specification."""
25 HEART_RATE_VALUE_FORMAT_UINT16 = 0x01
26 SENSOR_CONTACT_SUPPORTED = 0x02
27 SENSOR_CONTACT_DETECTED = 0x04
28 ENERGY_EXPENDED_PRESENT = 0x08
29 RR_INTERVAL_PRESENT = 0x10
32class SensorContactState(IntEnum):
33 """Sensor contact state enumeration."""
35 NOT_SUPPORTED = 0
36 NOT_DETECTED = 1
37 DETECTED = 2
39 def __str__(self) -> str:
40 """Return human-readable sensor contact state."""
41 return {0: "not_supported", 1: "not_detected", 2: "detected"}[self.value]
43 def __eq__(self, other: object) -> bool:
44 """Support comparison with string values for backward compatibility."""
45 if isinstance(other, str):
46 return str(self) == other
47 return super().__eq__(other)
49 def __hash__(self) -> int:
50 """Make enum hashable."""
51 return super().__hash__()
53 @classmethod
54 def from_flags(cls, flags: int) -> SensorContactState:
55 """Create enum from heart rate flags."""
56 if not flags & HeartRateMeasurementFlags.SENSOR_CONTACT_SUPPORTED: # Sensor Contact Supported bit not set
57 return cls.NOT_SUPPORTED
58 if flags & HeartRateMeasurementFlags.SENSOR_CONTACT_DETECTED: # Sensor Contact Detected bit set
59 return cls.DETECTED
60 return cls.NOT_DETECTED
63class HeartRateData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
64 """Parsed data from Heart Rate Measurement characteristic."""
66 heart_rate: int # BPM (0-UINT16_MAX)
67 sensor_contact: SensorContactState
68 energy_expended: int | None = None # kJ
69 rr_intervals: tuple[float, ...] = () # R-R intervals in seconds, immutable tuple
70 flags: HeartRateMeasurementFlags = HeartRateMeasurementFlags(0) # Raw flags byte for reference
72 def __post_init__(self) -> None:
73 """Validate heart rate measurement data."""
74 if not 0 <= self.heart_rate <= UINT16_MAX:
75 raise ValueError(f"Heart rate must be 0-{UINT16_MAX} bpm, got {self.heart_rate}")
76 if self.energy_expended is not None and not 0 <= self.energy_expended <= UINT16_MAX:
77 raise ValueError(f"Energy expended must be 0-{UINT16_MAX} kJ, got {self.energy_expended}")
78 for interval in self.rr_intervals:
79 if not 0.0 <= interval <= (UINT16_MAX / RR_INTERVAL_RESOLUTION): # Max RR interval in seconds
80 raise ValueError(
81 f"RR interval must be 0.0-{UINT16_MAX / RR_INTERVAL_RESOLUTION} seconds, got {interval}"
82 )
85class HeartRateMeasurementCharacteristic(BaseCharacteristic):
86 """Heart Rate Measurement characteristic (0x2A37).
88 Used in Heart Rate Service (spec: Heart Rate Service 1.0, Heart Rate Profile 1.0)
89 to transmit instantaneous heart rate plus optional energy expended and
90 RR-Interval metrics.
92 Specification summary (flags byte bit assignments - see adopted spec & Errata 23224):
93 Bit 0 (0x01): Heart Rate Value Format (0 = uint8, 1 = uint16)
94 Bit 1 (0x02): Sensor Contact Supported
95 Bit 2 (0x04): Sensor Contact Detected (valid only when Bit1 set)
96 Bit 3 (0x08): Energy Expended Present (adds 2 bytes, little-endian, in kilo Joules)
97 Bit 4 (0x10): RR-Interval(s) Present (sequence of 16-bit values, units 1/1024 s)
98 Bits 5-7: Reserved (must be 0)
100 Parsing Rules:
101 * Minimum length: 2 bytes (flags + at least one byte of heart rate value)
102 * If Bit0 set, heart rate is 16 bits; else 8 bits
103 * Energy Expended only parsed if flag set AND 2 bytes remain
104 * RR-Intervals parsed greedily in pairs until buffer end when flag set
105 * RR-Interval raw value converted to seconds: raw / 1024.0
106 * Sensor contact state derived from Bits1/2 tri-state (not supported, not detected, detected)
108 Validation:
109 * Heart rate validated within 0..UINT16_MAX (spec does not strictly define upper physiological bound)
110 * RR interval constrained to 0.0-65.535 s (fits 16-bit / 1024 scaling and guards against malformed data)
111 * Energy expended 0..UINT16_MAX
113 References:
114 * Bluetooth SIG Heart Rate Service 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-service-1-0/)
115 * Bluetooth SIG Heart Rate Profile 1.0 (https://www.bluetooth.com/specifications/specs/heart-rate-profile-1-0/)
116 * Errata Correction 23224 (mandatory for compliance)
118 """
120 # RR-Interval resolution: 1/1024 seconds per unit
121 RR_INTERVAL_RESOLUTION = RR_INTERVAL_RESOLUTION
123 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> HeartRateData:
124 """Parse heart rate measurement data according to Bluetooth specification.
126 Format: Flags(1) + Heart Rate Value(1-2) + [Energy Expended(2)] + [RR-Intervals(2*n)]
128 Args:
129 data: Raw bytearray from BLE characteristic.
130 ctx: Optional CharacteristicContext providing surrounding context (may be None).
132 Returns:
133 HeartRateData containing parsed heart rate data with metadata.
135 """
136 if len(data) < 2:
137 raise ValueError("Heart rate measurement data must be at least 2 bytes")
139 flags = HeartRateMeasurementFlags(data[0])
140 offset = 1
142 # Parse heart rate value (8-bit or 16-bit depending on flag)
143 if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16: # 16-bit heart rate value
144 if len(data) < offset + 2:
145 raise ValueError("Insufficient data for 16-bit heart rate value")
146 heart_rate = DataParser.parse_int16(data, offset, signed=False)
147 offset += 2
148 else: # 8-bit heart rate value
149 heart_rate = DataParser.parse_int8(data, offset, signed=False)
150 offset += 1
152 sensor_contact = SensorContactState.from_flags(flags)
154 # Optional Energy Expended
155 energy_expended = None
156 if (flags & HeartRateMeasurementFlags.ENERGY_EXPENDED_PRESENT) and len(data) >= offset + 2:
157 energy_expended = DataParser.parse_int16(data, offset, signed=False)
158 offset += 2
160 # Optional RR-Intervals
161 rr_intervals: list[float] = []
162 if (flags & HeartRateMeasurementFlags.RR_INTERVAL_PRESENT) and len(data) >= offset + 2:
163 while offset + 2 <= len(data):
164 rr_interval_raw = DataParser.parse_int16(data, offset, signed=False)
165 rr_intervals.append(rr_interval_raw / RR_INTERVAL_RESOLUTION)
166 offset += 2
168 return HeartRateData(
169 heart_rate=heart_rate,
170 sensor_contact=sensor_contact,
171 energy_expended=energy_expended,
172 rr_intervals=tuple(rr_intervals), # Convert list to tuple for immutable struct
173 flags=flags,
174 )
176 def encode_value(self, data: HeartRateData) -> bytearray:
177 """Encode HeartRateData back to bytes.
179 The inverse of decode_value respecting the same flag semantics.
181 Args:
182 data: HeartRateData instance to encode
184 Returns:
185 Encoded bytes representing the heart rate measurement
187 """
188 flags = int(data.flags) # Use the flags from the data structure
190 result = bytearray([flags])
192 if flags & HeartRateMeasurementFlags.HEART_RATE_VALUE_FORMAT_UINT16:
193 result.extend(DataParser.encode_int16(data.heart_rate, signed=False))
194 else:
195 result.extend(DataParser.encode_int8(data.heart_rate, signed=False))
197 if data.energy_expended is not None:
198 result.extend(DataParser.encode_int16(data.energy_expended, signed=False))
200 for rr_interval in data.rr_intervals:
201 rr_raw = round(rr_interval * RR_INTERVAL_RESOLUTION)
202 rr_raw = min(rr_raw, UINT16_MAX)
203 result.extend(DataParser.encode_int16(rr_raw, signed=False))
205 return result