Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_measurement.py: 86%
139 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Cycling Power Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .cycling_power_feature import CyclingPowerFeatureCharacteristic
13from .utils import DataParser
16class CyclingPowerMeasurementFlags(IntFlag):
17 """Cycling Power Measurement Flags as per Bluetooth SIG specification."""
19 PEDAL_POWER_BALANCE_PRESENT = 0x0001
20 PEDAL_POWER_BALANCE_REFERENCE = 0x0002 # 0 = Unknown, 1 = Left
21 ACCUMULATED_TORQUE_PRESENT = 0x0004
22 ACCUMULATED_ENERGY_PRESENT = 0x0008
23 WHEEL_REVOLUTION_DATA_PRESENT = 0x0010
24 CRANK_REVOLUTION_DATA_PRESENT = 0x0020
25 EXTREME_FORCE_MAGNITUDES_PRESENT = 0x0040
26 EXTREME_TORQUE_MAGNITUDES_PRESENT = 0x0080
27 EXTREME_ANGLES_PRESENT = 0x0100
28 TOP_DEAD_SPOT_ANGLE_PRESENT = 0x0200
29 BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 0x0400
30 ACCUMULATED_ENERGY_RESERVED = 0x0800
33class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
34 """Parsed data from Cycling Power Measurement characteristic."""
36 flags: CyclingPowerMeasurementFlags
37 instantaneous_power: int # Watts
38 pedal_power_balance: float | None = None # Percentage (0.5% resolution)
39 accumulated_energy: int | None = None # kJ
40 cumulative_wheel_revolutions: int | None = None # Changed to match decode_value
41 last_wheel_event_time: float | None = None # seconds
42 cumulative_crank_revolutions: int | None = None # Changed to match decode_value
43 last_crank_event_time: float | None = None # seconds
45 def __post_init__(self) -> None:
46 """Validate cycling power measurement data."""
47 flags_value = int(self.flags)
48 if not 0 <= flags_value <= UINT16_MAX:
49 raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")
50 if not 0 <= self.instantaneous_power <= UINT16_MAX:
51 raise ValueError("Instantaneous power must be a uint16 value (0-UINT16_MAX)")
54class CyclingPowerMeasurementCharacteristic(BaseCharacteristic[CyclingPowerMeasurementData]):
55 """Cycling Power Measurement characteristic (0x2A63).
57 Used to transmit cycling power measurement data including
58 instantaneous power, pedal power balance, accumulated energy, and
59 revolution data.
60 """
62 # Special values
63 UNKNOWN_PEDAL_POWER_BALANCE = 0xFF # Value indicating unknown power balance
65 # Time resolution constants
66 WHEEL_TIME_RESOLUTION = 2048.0 # 1/2048 second resolution
67 CRANK_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution
68 PEDAL_POWER_BALANCE_RESOLUTION = 2.0 # 0.5% resolution
70 _manual_unit: str = "W" # Watts unit for power measurement
72 _optional_dependencies = [CyclingPowerFeatureCharacteristic]
74 min_length: int = 4 # Flags(2) + Instantaneous Power(2)
75 allow_variable_length: bool = True # Many optional fields based on flags
77 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerMeasurementData: # pylint: disable=too-many-locals # Complex parsing with many optional fields
78 """Parse cycling power measurement data according to Bluetooth specification.
80 Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
81 [Accumulated Energy(2)] + [Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
82 [Crank Revolutions(2)] + [Last Crank Event Time(2)]
84 Args:
85 data: Raw bytearray from BLE characteristic.
86 ctx: Optional CharacteristicContext providing surrounding context (may be None).
88 Returns:
89 CyclingPowerMeasurementData containing parsed power measurement data.
91 Raises:
92 ValueError: If data format is invalid.
94 """
95 if len(data) < 4:
96 raise ValueError("Cycling Power Measurement data must be at least 4 bytes")
98 # Parse flags (16-bit)
99 flags = DataParser.parse_int16(data, 0, signed=False)
101 # Parse instantaneous power (16-bit signed integer in watts)
102 instantaneous_power = DataParser.parse_int16(data, 2, signed=True)
104 offset = 4
106 # Parse optional fields
107 pedal_power_balance = None
108 accumulated_energy = None
109 cumulative_wheel_revolutions = None
110 last_wheel_event_time = None
111 cumulative_crank_revolutions = None
112 last_crank_event_time = None
114 # Parse optional pedal power balance (1 byte) if present
115 if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and len(data) >= offset + 1:
116 pedal_power_balance_raw = data[offset]
117 # Value UNKNOWN_PEDAL_POWER_BALANCE indicates unknown, otherwise percentage (0-100)
118 if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
119 pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION # 0.5% resolution
120 offset += 1
122 # Parse optional accumulated energy (2 bytes) if present
123 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and len(data) >= offset + 2:
124 accumulated_energy = DataParser.parse_int16(data, offset, signed=False) # kJ
125 offset += 2
127 # Parse optional wheel revolution data (6 bytes total) if present
128 if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
129 cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
130 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
131 # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units
132 last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
133 offset += 6
135 # Parse optional crank revolution data (4 bytes total) if present
136 if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
137 cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
138 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
139 # Crank event time is in 1/CRANK_TIME_RESOLUTION second units
140 last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION
141 offset += 4
143 # Validate flags against Cycling Power Feature if available
144 if ctx is not None:
145 feature_data = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
146 if feature_data is not None:
147 # feature_data is the CyclingPowerFeatureData struct
149 # Check if reported features are supported
150 reported_features = int(flags)
152 # Validate that reported features are actually supported
153 if (
154 reported_features & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
155 ) and not feature_data.pedal_power_balance_supported:
156 raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
157 if (
158 reported_features & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
159 ) and not feature_data.accumulated_energy_supported:
160 raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
161 if (
162 reported_features & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
163 ) and not feature_data.wheel_revolution_data_supported:
164 raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
165 if (
166 reported_features & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
167 ) and not feature_data.crank_revolution_data_supported:
168 raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")
170 # Create struct with all parsed values
171 return CyclingPowerMeasurementData(
172 flags=CyclingPowerMeasurementFlags(flags),
173 instantaneous_power=instantaneous_power,
174 pedal_power_balance=pedal_power_balance,
175 accumulated_energy=accumulated_energy,
176 cumulative_wheel_revolutions=cumulative_wheel_revolutions,
177 last_wheel_event_time=last_wheel_event_time,
178 cumulative_crank_revolutions=cumulative_crank_revolutions,
179 last_crank_event_time=last_crank_event_time,
180 )
182 def _encode_value(self, data: CyclingPowerMeasurementData) -> bytearray: # pylint: disable=too-many-locals,too-many-branches,too-many-statements # Complex cycling power measurement with numerous optional fields
183 """Encode cycling power measurement value back to bytes.
185 Args:
186 data: CyclingPowerMeasurementData containing cycling power measurement data
188 Returns:
189 Encoded bytes representing the power measurement
191 """
192 instantaneous_power = data.instantaneous_power
193 pedal_power_balance = data.pedal_power_balance
194 accumulated_energy = data.accumulated_energy
195 wheel_revolutions = data.cumulative_wheel_revolutions # Updated field name
196 wheel_event_time = data.last_wheel_event_time
197 crank_revolutions = data.cumulative_crank_revolutions # Updated field name
198 crank_event_time = data.last_crank_event_time
200 # Build flags based on available data
201 flags = 0
202 if pedal_power_balance is not None:
203 flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT # Pedal power balance present
204 if accumulated_energy is not None:
205 flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT # Accumulated energy present
206 if wheel_revolutions is not None and wheel_event_time is not None:
207 flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT # Wheel revolution data present
208 if crank_revolutions is not None and crank_event_time is not None:
209 flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT # Crank revolution data present
211 # Validate instantaneous power (sint16 range)
212 if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
213 raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")
215 # Start with flags and instantaneous power
216 result = bytearray()
217 result.extend(DataParser.encode_int16(flags, signed=False)) # Flags (16-bit)
218 result.extend(DataParser.encode_int16(instantaneous_power, signed=True)) # Power (sint16)
220 # Add optional fields based on flags
221 if pedal_power_balance is not None:
222 balance = int(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION) # Convert back to raw value
223 if not 0 <= balance <= UINT8_MAX:
224 raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
225 result.append(balance)
227 if accumulated_energy is not None:
228 energy = int(accumulated_energy)
229 if not 0 <= energy <= 0xFFFF:
230 raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
231 result.extend(DataParser.encode_int16(energy, signed=False))
233 if wheel_revolutions is not None and wheel_event_time is not None:
234 wheel_rev = int(wheel_revolutions)
235 wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
236 if not 0 <= wheel_rev <= 0xFFFFFFFF:
237 raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
238 if not 0 <= wheel_time <= 0xFFFF:
239 raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
240 result.extend(DataParser.encode_int32(wheel_rev, signed=False))
241 result.extend(DataParser.encode_int16(wheel_time, signed=False))
243 if crank_revolutions is not None and crank_event_time is not None:
244 crank_rev = int(crank_revolutions)
245 crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
246 if not 0 <= crank_rev <= 0xFFFF:
247 raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
248 if not 0 <= crank_time <= 0xFFFF:
249 raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
250 result.extend(DataParser.encode_int16(crank_rev, signed=False))
251 result.extend(DataParser.encode_int16(crank_time, signed=False))
253 return result