Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_measurement.py: 79%
231 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""Cycling Power Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
6from typing import Any, ClassVar
8import msgspec
10from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT32_MAX
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .cycling_power_feature import CyclingPowerFeatureCharacteristic
14from .utils import DataParser
class CyclingPowerMeasurementFlags(IntFlag):
    """Bit assignments of the Cycling Power Measurement flags field (CPS v1.1 Table 3.2)."""

    # "Present" bits announce an optional field in the payload; the remaining
    # bits qualify how a field (or the measurement) is to be interpreted.
    PEDAL_POWER_BALANCE_PRESENT = 1 << 0
    PEDAL_POWER_BALANCE_REFERENCE = 1 << 1  # 0 = Unknown, 1 = Left
    ACCUMULATED_TORQUE_PRESENT = 1 << 2
    ACCUMULATED_TORQUE_SOURCE = 1 << 3  # 0 = Wheel Based, 1 = Crank Based
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 4
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 5
    EXTREME_FORCE_MAGNITUDES_PRESENT = 1 << 6
    EXTREME_TORQUE_MAGNITUDES_PRESENT = 1 << 7
    EXTREME_ANGLES_PRESENT = 1 << 8
    TOP_DEAD_SPOT_ANGLE_PRESENT = 1 << 9
    BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 1 << 10
    ACCUMULATED_ENERGY_PRESENT = 1 << 11
    OFFSET_COMPENSATION_INDICATOR = 1 << 12
class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable result of parsing a Cycling Power Measurement payload.

    Only ``flags`` and ``instantaneous_power`` are mandatory; every other
    attribute defaults to ``None`` and is populated only when the codec
    found the corresponding optional field.
    """

    # Mandatory fields
    flags: CyclingPowerMeasurementFlags  # raw 16-bit flags field
    instantaneous_power: int  # W, sint16

    # Optional scalar fields
    pedal_power_balance: float | None = None  # %, 0.5 % steps
    accumulated_torque: float | None = None  # Nm, 1/32 Nm steps
    accumulated_energy: int | None = None  # kJ

    # Wheel / crank revolution data (event times already converted to seconds)
    cumulative_wheel_revolutions: int | None = None
    last_wheel_event_time: float | None = None  # s
    cumulative_crank_revolutions: int | None = None
    last_crank_event_time: float | None = None  # s

    # Extreme magnitudes
    maximum_force_magnitude: int | None = None  # N, sint16
    minimum_force_magnitude: int | None = None  # N, sint16
    maximum_torque_magnitude: float | None = None  # Nm, sint16 raw in 1/32 Nm steps
    minimum_torque_magnitude: float | None = None  # Nm, sint16 raw in 1/32 Nm steps

    # Angles
    maximum_angle: int | None = None  # degrees, 12-bit value (packed with minimum_angle on the wire)
    minimum_angle: int | None = None  # degrees, 12-bit value (packed with maximum_angle on the wire)
    top_dead_spot_angle: int | None = None  # degrees, uint16
    bottom_dead_spot_angle: int | None = None  # degrees, uint16

    def __post_init__(self) -> None:
        """Check that the two mandatory fields fit their wire types.

        Raises:
            ValueError: If ``flags`` is outside the uint16 range or
                ``instantaneous_power`` is outside the sint16 range.

        """
        if not 0 <= int(self.flags) <= UINT16_MAX:
            raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")

        power = self.instantaneous_power
        if not SINT16_MIN <= power <= SINT16_MAX:
            raise ValueError("Instantaneous power must be a sint16 value")
class CyclingPowerMeasurementCharacteristic(BaseCharacteristic[CyclingPowerMeasurementData]):
    """Cycling Power Measurement characteristic (0x2A63).

    Used to transmit cycling power measurement data including
    instantaneous power, pedal power balance, accumulated torque/energy,
    wheel and crank revolution data, extreme force/torque magnitudes and
    crank angles.

    The payload is little more than a 16-bit flags field followed by the
    mandatory instantaneous power and a flag-driven sequence of optional
    fields; ``_decode_value`` and ``_encode_value`` walk that sequence in
    the same order.
    """

    # Special values
    UNKNOWN_PEDAL_POWER_BALANCE = 0xFF  # Value indicating unknown power balance

    # Resolution constants (raw value = physical value * resolution)
    WHEEL_TIME_RESOLUTION = 2048.0  # 1/2048 second resolution
    CRANK_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution
    PEDAL_POWER_BALANCE_RESOLUTION = 2.0  # 0.5% resolution
    ACCUMULATED_TORQUE_RESOLUTION = 32.0  # 1/32 Nm resolution
    TORQUE_MAGNITUDE_RESOLUTION = 32.0  # 1/32 Nm resolution (extreme torque magnitudes)

    # Extreme angles are two 12-bit values packed into 3 bytes
    EXTREME_ANGLE_MAX = 0x0FFF

    _manual_unit: str = "W"  # Watts unit for power measurement

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [CyclingPowerFeatureCharacteristic]

    min_length: int = 4  # Flags(2) + Instantaneous Power(2)
    allow_variable_length: bool = True  # Many optional fields based on flags

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> CyclingPowerMeasurementData:  # pylint: disable=too-many-locals # Complex parsing with many optional fields
        """Parse cycling power measurement data according to Bluetooth specification.

        Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
        [Accumulated Torque(2)] + [Wheel Revolutions(4) + Last Wheel Event Time(2)] +
        [Crank Revolutions(2) + Last Crank Event Time(2)] +
        [Max Force(2) + Min Force(2)] + [Max Torque(2) + Min Torque(2)] +
        [Max Angle + Min Angle (2 x 12 bit, 3 bytes)] + [Top Dead Spot Angle(2)] +
        [Bottom Dead Spot Angle(2)] + [Accumulated Energy(2)]

        An optional field is only read when its flag is set *and* enough
        bytes remain; otherwise the attribute stays ``None`` and the read
        offset is not advanced.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
                When it yields Cycling Power Feature data, the measurement flags
                are cross-checked against the advertised features.
            validate: Whether to validate ranges (default True). Not consulted
                by this method itself.

        Returns:
            CyclingPowerMeasurementData containing parsed power measurement data.

        Raises:
            ValueError: If a field is flagged as present that the Cycling Power
                Feature characteristic from ``ctx`` does not report as supported.

        """
        # Parse flags (16-bit)
        flags = DataParser.parse_int16(data, 0, signed=False)

        # Parse instantaneous power (16-bit signed integer in watts)
        instantaneous_power = DataParser.parse_int16(data, 2, signed=True)

        offset = 4

        # Optional fields default to "absent"
        pedal_power_balance = None
        accumulated_torque = None
        accumulated_energy = None
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None
        maximum_force_magnitude = None
        minimum_force_magnitude = None
        maximum_torque_magnitude = None
        minimum_torque_magnitude = None
        maximum_angle = None
        minimum_angle = None
        top_dead_spot_angle = None
        bottom_dead_spot_angle = None

        # Parse optional pedal power balance (1 byte) if present (bit 0)
        if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and len(data) >= offset + 1:
            pedal_power_balance_raw = data[offset]
            # Value UNKNOWN_PEDAL_POWER_BALANCE indicates unknown, otherwise percentage
            if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
                pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION  # 0.5% resolution
            offset += 1

        # Parse optional accumulated torque (2 bytes, uint16, 1/32 Nm resolution) if present (bit 2)
        if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_TORQUE_PRESENT) and len(data) >= offset + 2:
            accumulated_torque_raw = DataParser.parse_int16(data, offset, signed=False)
            accumulated_torque = accumulated_torque_raw / self.ACCUMULATED_TORQUE_RESOLUTION
            offset += 2

        # Parse optional wheel revolution data (6 bytes total) if present (bit 4)
        if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
            cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
            wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
            # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units
            last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
            offset += 6

        # Parse optional crank revolution data (4 bytes total) if present (bit 5)
        if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
            cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
            crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
            # Crank event time is in 1/CRANK_TIME_RESOLUTION second units
            last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION
            offset += 4

        # Parse optional extreme force magnitudes (4 bytes: max sint16 + min sint16) if present (bit 6)
        if (flags & CyclingPowerMeasurementFlags.EXTREME_FORCE_MAGNITUDES_PRESENT) and len(data) >= offset + 4:
            maximum_force_magnitude = DataParser.parse_int16(data, offset, signed=True)
            minimum_force_magnitude = DataParser.parse_int16(data, offset + 2, signed=True)
            offset += 4

        # Parse optional extreme torque magnitudes (4 bytes: max sint16 + min sint16, 1/32 Nm) if present (bit 7)
        if (flags & CyclingPowerMeasurementFlags.EXTREME_TORQUE_MAGNITUDES_PRESENT) and len(data) >= offset + 4:
            maximum_torque_magnitude = (
                DataParser.parse_int16(data, offset, signed=True) / self.TORQUE_MAGNITUDE_RESOLUTION
            )
            minimum_torque_magnitude = (
                DataParser.parse_int16(data, offset + 2, signed=True) / self.TORQUE_MAGNITUDE_RESOLUTION
            )
            offset += 4

        # Parse optional extreme angles (3 bytes packed: max uint12 + min uint12) if present (bit 8)
        if (flags & CyclingPowerMeasurementFlags.EXTREME_ANGLES_PRESENT) and len(data) >= offset + 3:
            raw_bytes = data[offset : offset + 3]
            # Little-endian 24-bit word: low 12 bits = maximum, high 12 bits = minimum
            combined = raw_bytes[0] | (raw_bytes[1] << 8) | (raw_bytes[2] << 16)
            maximum_angle = combined & self.EXTREME_ANGLE_MAX
            minimum_angle = (combined >> 12) & self.EXTREME_ANGLE_MAX
            offset += 3

        # Parse optional top dead spot angle (2 bytes, uint16 degrees) if present (bit 9)
        if (flags & CyclingPowerMeasurementFlags.TOP_DEAD_SPOT_ANGLE_PRESENT) and len(data) >= offset + 2:
            top_dead_spot_angle = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Parse optional bottom dead spot angle (2 bytes, uint16 degrees) if present (bit 10)
        if (flags & CyclingPowerMeasurementFlags.BOTTOM_DEAD_SPOT_ANGLE_PRESENT) and len(data) >= offset + 2:
            bottom_dead_spot_angle = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Parse optional accumulated energy (2 bytes, uint16, kJ) if present (bit 11)
        if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and len(data) >= offset + 2:
            accumulated_energy = DataParser.parse_int16(data, offset, signed=False)  # kJ
            offset += 2

        # Validate flags against Cycling Power Feature if available
        if ctx is not None:
            feature_data = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
            if feature_data is not None:
                from .cycling_power_feature import CyclingPowerFeatures  # noqa: PLC0415 — local import to avoid cycle

                # Validate that reported features are actually supported
                if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and not (
                    feature_data.features & CyclingPowerFeatures.PEDAL_POWER_BALANCE_SUPPORTED
                ):
                    raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
                if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and not (
                    feature_data.features & CyclingPowerFeatures.ACCUMULATED_ENERGY_SUPPORTED
                ):
                    raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
                if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and not (
                    feature_data.features & CyclingPowerFeatures.WHEEL_REVOLUTION_DATA_SUPPORTED
                ):
                    raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
                if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and not (
                    feature_data.features & CyclingPowerFeatures.CRANK_REVOLUTION_DATA_SUPPORTED
                ):
                    raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")

        # Create struct with all parsed values
        return CyclingPowerMeasurementData(
            flags=CyclingPowerMeasurementFlags(flags),
            instantaneous_power=instantaneous_power,
            pedal_power_balance=pedal_power_balance,
            accumulated_torque=accumulated_torque,
            accumulated_energy=accumulated_energy,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
            maximum_force_magnitude=maximum_force_magnitude,
            minimum_force_magnitude=minimum_force_magnitude,
            maximum_torque_magnitude=maximum_torque_magnitude,
            minimum_torque_magnitude=minimum_torque_magnitude,
            maximum_angle=maximum_angle,
            minimum_angle=minimum_angle,
            top_dead_spot_angle=top_dead_spot_angle,
            bottom_dead_spot_angle=bottom_dead_spot_angle,
        )

    def _encode_value(self, data: CyclingPowerMeasurementData) -> bytearray:  # noqa: PLR0912  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """Encode cycling power measurement value back to bytes.

        The "present" flag bits are derived from which optional attributes
        are populated (paired fields need both halves). The informational
        bits are taken from ``data.flags`` so a decoded measurement keeps
        them when re-encoded:

        * ``PEDAL_POWER_BALANCE_REFERENCE`` — kept only when a pedal power
          balance is encoded.
        * ``ACCUMULATED_TORQUE_SOURCE`` — kept only when an accumulated
          torque is encoded.
        * ``OFFSET_COMPENSATION_INDICATOR`` — always kept.

        Args:
            data: CyclingPowerMeasurementData containing cycling power measurement data

        Returns:
            Encoded bytes representing the power measurement

        Raises:
            ValueError: If any value does not fit the wire type of its field.

        """
        instantaneous_power = data.instantaneous_power
        pedal_power_balance = data.pedal_power_balance
        accumulated_torque = data.accumulated_torque
        accumulated_energy = data.accumulated_energy
        wheel_revolutions = data.cumulative_wheel_revolutions
        wheel_event_time = data.last_wheel_event_time
        crank_revolutions = data.cumulative_crank_revolutions
        crank_event_time = data.last_crank_event_time

        # Informational bits come from the caller-supplied flags
        source_flags = int(data.flags)

        # Build flags based on available data
        flags = 0
        if pedal_power_balance is not None:
            flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
            # The reference bit only qualifies a balance value that is actually sent
            flags |= source_flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_REFERENCE
        if accumulated_torque is not None:
            flags |= CyclingPowerMeasurementFlags.ACCUMULATED_TORQUE_PRESENT
            # The source bit only qualifies a torque value that is actually sent
            flags |= source_flags & CyclingPowerMeasurementFlags.ACCUMULATED_TORQUE_SOURCE
        if wheel_revolutions is not None and wheel_event_time is not None:
            flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
        if crank_revolutions is not None and crank_event_time is not None:
            flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
        if data.maximum_force_magnitude is not None and data.minimum_force_magnitude is not None:
            flags |= CyclingPowerMeasurementFlags.EXTREME_FORCE_MAGNITUDES_PRESENT
        if data.maximum_torque_magnitude is not None and data.minimum_torque_magnitude is not None:
            flags |= CyclingPowerMeasurementFlags.EXTREME_TORQUE_MAGNITUDES_PRESENT
        if data.maximum_angle is not None and data.minimum_angle is not None:
            flags |= CyclingPowerMeasurementFlags.EXTREME_ANGLES_PRESENT
        if data.top_dead_spot_angle is not None:
            flags |= CyclingPowerMeasurementFlags.TOP_DEAD_SPOT_ANGLE_PRESENT
        if data.bottom_dead_spot_angle is not None:
            flags |= CyclingPowerMeasurementFlags.BOTTOM_DEAD_SPOT_ANGLE_PRESENT
        if accumulated_energy is not None:
            flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
        flags |= source_flags & CyclingPowerMeasurementFlags.OFFSET_COMPENSATION_INDICATOR

        # Validate instantaneous power (sint16 range)
        if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
            raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")

        # Start with flags and instantaneous power
        result = bytearray()
        result.extend(DataParser.encode_int16(int(flags), signed=False))  # Flags (16-bit)
        result.extend(DataParser.encode_int16(instantaneous_power, signed=True))  # Power (sint16)

        # Add optional fields in spec order (per CPS v1.1 §3.2.1)
        if pedal_power_balance is not None:
            # Quantise to the nearest 0.5 % step, like every other scaled field
            balance = round(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION)
            if not 0 <= balance <= UINT8_MAX:
                raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
            result.append(balance)

        if accumulated_torque is not None:
            torque_raw = round(accumulated_torque * self.ACCUMULATED_TORQUE_RESOLUTION)
            if not 0 <= torque_raw <= UINT16_MAX:
                raise ValueError(f"Accumulated torque {torque_raw} exceeds uint16 range")
            result.extend(DataParser.encode_int16(torque_raw, signed=False))

        if wheel_revolutions is not None and wheel_event_time is not None:
            wheel_rev = int(wheel_revolutions)
            wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
            if not 0 <= wheel_rev <= UINT32_MAX:
                raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
            if not 0 <= wheel_time <= UINT16_MAX:
                raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
            result.extend(DataParser.encode_int32(wheel_rev, signed=False))
            result.extend(DataParser.encode_int16(wheel_time, signed=False))

        if crank_revolutions is not None and crank_event_time is not None:
            crank_rev = int(crank_revolutions)
            crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
            if not 0 <= crank_rev <= UINT16_MAX:
                raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
            if not 0 <= crank_time <= UINT16_MAX:
                raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
            result.extend(DataParser.encode_int16(crank_rev, signed=False))
            result.extend(DataParser.encode_int16(crank_time, signed=False))

        # Encode extreme force magnitudes (bit 6): max sint16 + min sint16
        if data.maximum_force_magnitude is not None and data.minimum_force_magnitude is not None:
            if not SINT16_MIN <= data.maximum_force_magnitude <= SINT16_MAX:
                raise ValueError(f"Maximum force magnitude {data.maximum_force_magnitude} exceeds sint16 range")
            if not SINT16_MIN <= data.minimum_force_magnitude <= SINT16_MAX:
                raise ValueError(f"Minimum force magnitude {data.minimum_force_magnitude} exceeds sint16 range")
            result.extend(DataParser.encode_int16(data.maximum_force_magnitude, signed=True))
            result.extend(DataParser.encode_int16(data.minimum_force_magnitude, signed=True))

        # Encode extreme torque magnitudes (bit 7): max sint16 + min sint16, 1/32 Nm resolution
        if data.maximum_torque_magnitude is not None and data.minimum_torque_magnitude is not None:
            max_torque_raw = round(data.maximum_torque_magnitude * self.TORQUE_MAGNITUDE_RESOLUTION)
            min_torque_raw = round(data.minimum_torque_magnitude * self.TORQUE_MAGNITUDE_RESOLUTION)
            if not SINT16_MIN <= max_torque_raw <= SINT16_MAX:
                raise ValueError(f"Maximum torque magnitude raw {max_torque_raw} exceeds sint16 range")
            if not SINT16_MIN <= min_torque_raw <= SINT16_MAX:
                raise ValueError(f"Minimum torque magnitude raw {min_torque_raw} exceeds sint16 range")
            result.extend(DataParser.encode_int16(max_torque_raw, signed=True))
            result.extend(DataParser.encode_int16(min_torque_raw, signed=True))

        # Encode extreme angles (bit 8): two uint12 packed in 3 bytes
        if data.maximum_angle is not None and data.minimum_angle is not None:
            # Reject rather than mask: masking would silently wrap out-of-range angles
            if not 0 <= data.maximum_angle <= self.EXTREME_ANGLE_MAX:
                raise ValueError(f"Maximum angle {data.maximum_angle} exceeds uint12 range")
            if not 0 <= data.minimum_angle <= self.EXTREME_ANGLE_MAX:
                raise ValueError(f"Minimum angle {data.minimum_angle} exceeds uint12 range")
            combined = data.maximum_angle | (data.minimum_angle << 12)
            result.append(combined & 0xFF)
            result.append((combined >> 8) & 0xFF)
            result.append((combined >> 16) & 0xFF)

        # Encode top dead spot angle (bit 9): uint16 degrees
        if data.top_dead_spot_angle is not None:
            if not 0 <= data.top_dead_spot_angle <= UINT16_MAX:
                raise ValueError(f"Top dead spot angle {data.top_dead_spot_angle} exceeds uint16 range")
            result.extend(DataParser.encode_int16(data.top_dead_spot_angle, signed=False))

        # Encode bottom dead spot angle (bit 10): uint16 degrees
        if data.bottom_dead_spot_angle is not None:
            if not 0 <= data.bottom_dead_spot_angle <= UINT16_MAX:
                raise ValueError(f"Bottom dead spot angle {data.bottom_dead_spot_angle} exceeds uint16 range")
            result.extend(DataParser.encode_int16(data.bottom_dead_spot_angle, signed=False))

        # Encode accumulated energy (bit 11): uint16 kJ
        if accumulated_energy is not None:
            energy = int(accumulated_energy)
            if not 0 <= energy <= UINT16_MAX:
                raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
            result.extend(DataParser.encode_int16(energy, signed=False))

        return result