Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_measurement.py: 86%
138 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Cycling Power Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
6from typing import Any, ClassVar
8import msgspec
10from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT32_MAX
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .cycling_power_feature import CyclingPowerFeatureCharacteristic
14from .utils import DataParser
17class CyclingPowerMeasurementFlags(IntFlag):
18 """Cycling Power Measurement Flags as per Bluetooth SIG specification."""
20 PEDAL_POWER_BALANCE_PRESENT = 0x0001
21 PEDAL_POWER_BALANCE_REFERENCE = 0x0002 # 0 = Unknown, 1 = Left
22 ACCUMULATED_TORQUE_PRESENT = 0x0004
23 ACCUMULATED_ENERGY_PRESENT = 0x0008
24 WHEEL_REVOLUTION_DATA_PRESENT = 0x0010
25 CRANK_REVOLUTION_DATA_PRESENT = 0x0020
26 EXTREME_FORCE_MAGNITUDES_PRESENT = 0x0040
27 EXTREME_TORQUE_MAGNITUDES_PRESENT = 0x0080
28 EXTREME_ANGLES_PRESENT = 0x0100
29 TOP_DEAD_SPOT_ANGLE_PRESENT = 0x0200
30 BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 0x0400
31 ACCUMULATED_ENERGY_RESERVED = 0x0800
34class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
35 """Parsed data from Cycling Power Measurement characteristic."""
37 flags: CyclingPowerMeasurementFlags
38 instantaneous_power: int # Watts
39 pedal_power_balance: float | None = None # Percentage (0.5% resolution)
40 accumulated_energy: int | None = None # kJ
41 cumulative_wheel_revolutions: int | None = None # Changed to match decode_value
42 last_wheel_event_time: float | None = None # seconds
43 cumulative_crank_revolutions: int | None = None # Changed to match decode_value
44 last_crank_event_time: float | None = None # seconds
46 def __post_init__(self) -> None:
47 """Validate cycling power measurement data."""
48 flags_value = int(self.flags)
49 if not 0 <= flags_value <= UINT16_MAX:
50 raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")
51 if not 0 <= self.instantaneous_power <= UINT16_MAX:
52 raise ValueError("Instantaneous power must be a uint16 value (0-UINT16_MAX)")
55class CyclingPowerMeasurementCharacteristic(BaseCharacteristic[CyclingPowerMeasurementData]):
56 """Cycling Power Measurement characteristic (0x2A63).
58 Used to transmit cycling power measurement data including
59 instantaneous power, pedal power balance, accumulated energy, and
60 revolution data.
61 """
63 # Special values
64 UNKNOWN_PEDAL_POWER_BALANCE = 0xFF # Value indicating unknown power balance
66 # Time resolution constants
67 WHEEL_TIME_RESOLUTION = 2048.0 # 1/2048 second resolution
68 CRANK_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution
69 PEDAL_POWER_BALANCE_RESOLUTION = 2.0 # 0.5% resolution
71 _manual_unit: str = "W" # Watts unit for power measurement
73 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [CyclingPowerFeatureCharacteristic]
75 min_length: int = 4 # Flags(2) + Instantaneous Power(2)
76 allow_variable_length: bool = True # Many optional fields based on flags
78 def _decode_value(
79 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
80 ) -> CyclingPowerMeasurementData: # pylint: disable=too-many-locals # Complex parsing with many optional fields
81 """Parse cycling power measurement data according to Bluetooth specification.
83 Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
84 [Accumulated Energy(2)] + [Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
85 [Crank Revolutions(2)] + [Last Crank Event Time(2)]
87 Args:
88 data: Raw bytearray from BLE characteristic.
89 ctx: Optional CharacteristicContext providing surrounding context (may be None).
90 validate: Whether to validate ranges (default True)
92 Returns:
93 CyclingPowerMeasurementData containing parsed power measurement data.
95 Raises:
96 ValueError: If data format is invalid.
98 """
99 # Parse flags (16-bit)
100 flags = DataParser.parse_int16(data, 0, signed=False)
102 # Parse instantaneous power (16-bit signed integer in watts)
103 instantaneous_power = DataParser.parse_int16(data, 2, signed=True)
105 offset = 4
107 # Parse optional fields
108 pedal_power_balance = None
109 accumulated_energy = None
110 cumulative_wheel_revolutions = None
111 last_wheel_event_time = None
112 cumulative_crank_revolutions = None
113 last_crank_event_time = None
115 # Parse optional pedal power balance (1 byte) if present
116 if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and len(data) >= offset + 1:
117 pedal_power_balance_raw = data[offset]
118 # Value UNKNOWN_PEDAL_POWER_BALANCE indicates unknown, otherwise percentage (0-100)
119 if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
120 pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION # 0.5% resolution
121 offset += 1
123 # Parse optional accumulated energy (2 bytes) if present
124 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and len(data) >= offset + 2:
125 accumulated_energy = DataParser.parse_int16(data, offset, signed=False) # kJ
126 offset += 2
128 # Parse optional wheel revolution data (6 bytes total) if present
129 if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
130 cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
131 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
132 # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units
133 last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
134 offset += 6
136 # Parse optional crank revolution data (4 bytes total) if present
137 if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
138 cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
139 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
140 # Crank event time is in 1/CRANK_TIME_RESOLUTION second units
141 last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION
142 offset += 4
144 # Validate flags against Cycling Power Feature if available
145 if ctx is not None:
146 feature_data = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
147 if feature_data is not None:
148 # feature_data is the CyclingPowerFeatureData struct
150 # Check if reported features are supported
151 reported_features = int(flags)
153 # Validate that reported features are actually supported
154 if (
155 reported_features & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
156 ) and not feature_data.pedal_power_balance_supported:
157 raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
158 if (
159 reported_features & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
160 ) and not feature_data.accumulated_energy_supported:
161 raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
162 if (
163 reported_features & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
164 ) and not feature_data.wheel_revolution_data_supported:
165 raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
166 if (
167 reported_features & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
168 ) and not feature_data.crank_revolution_data_supported:
169 raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")
171 # Create struct with all parsed values
172 return CyclingPowerMeasurementData(
173 flags=CyclingPowerMeasurementFlags(flags),
174 instantaneous_power=instantaneous_power,
175 pedal_power_balance=pedal_power_balance,
176 accumulated_energy=accumulated_energy,
177 cumulative_wheel_revolutions=cumulative_wheel_revolutions,
178 last_wheel_event_time=last_wheel_event_time,
179 cumulative_crank_revolutions=cumulative_crank_revolutions,
180 last_crank_event_time=last_crank_event_time,
181 )
183 def _encode_value(self, data: CyclingPowerMeasurementData) -> bytearray: # pylint: disable=too-many-locals,too-many-branches,too-many-statements # Complex cycling power measurement with numerous optional fields
184 """Encode cycling power measurement value back to bytes.
186 Args:
187 data: CyclingPowerMeasurementData containing cycling power measurement data
189 Returns:
190 Encoded bytes representing the power measurement
192 """
193 instantaneous_power = data.instantaneous_power
194 pedal_power_balance = data.pedal_power_balance
195 accumulated_energy = data.accumulated_energy
196 wheel_revolutions = data.cumulative_wheel_revolutions # Updated field name
197 wheel_event_time = data.last_wheel_event_time
198 crank_revolutions = data.cumulative_crank_revolutions # Updated field name
199 crank_event_time = data.last_crank_event_time
201 # Build flags based on available data
202 flags = 0
203 if pedal_power_balance is not None:
204 flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT # Pedal power balance present
205 if accumulated_energy is not None:
206 flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT # Accumulated energy present
207 if wheel_revolutions is not None and wheel_event_time is not None:
208 flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT # Wheel revolution data present
209 if crank_revolutions is not None and crank_event_time is not None:
210 flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT # Crank revolution data present
212 # Validate instantaneous power (sint16 range)
213 if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
214 raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")
216 # Start with flags and instantaneous power
217 result = bytearray()
218 result.extend(DataParser.encode_int16(flags, signed=False)) # Flags (16-bit)
219 result.extend(DataParser.encode_int16(instantaneous_power, signed=True)) # Power (sint16)
221 # Add optional fields based on flags
222 if pedal_power_balance is not None:
223 balance = int(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION) # Convert back to raw value
224 if not 0 <= balance <= UINT8_MAX:
225 raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
226 result.append(balance)
228 if accumulated_energy is not None:
229 energy = int(accumulated_energy)
230 if not 0 <= energy <= UINT16_MAX:
231 raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
232 result.extend(DataParser.encode_int16(energy, signed=False))
234 if wheel_revolutions is not None and wheel_event_time is not None:
235 wheel_rev = int(wheel_revolutions)
236 wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
237 if not 0 <= wheel_rev <= UINT32_MAX:
238 raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
239 if not 0 <= wheel_time <= UINT16_MAX:
240 raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
241 result.extend(DataParser.encode_int32(wheel_rev, signed=False))
242 result.extend(DataParser.encode_int16(wheel_time, signed=False))
244 if crank_revolutions is not None and crank_event_time is not None:
245 crank_rev = int(crank_revolutions)
246 crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
247 if not 0 <= crank_rev <= UINT16_MAX:
248 raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
249 if not 0 <= crank_time <= UINT16_MAX:
250 raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
251 result.extend(DataParser.encode_int16(crank_rev, signed=False))
252 result.extend(DataParser.encode_int16(crank_time, signed=False))
254 return result