Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_measurement.py: 85%
137 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Cycling Power Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import (
10 SINT16_MAX,
11 SINT16_MIN,
12 UINT8_MAX,
13 UINT16_MAX,
14)
15from ..context import CharacteristicContext
16from .base import BaseCharacteristic
17from .cycling_power_feature import CyclingPowerFeatureCharacteristic, CyclingPowerFeatureData
18from .utils import DataParser
21class CyclingPowerMeasurementFlags(IntFlag):
22 """Cycling Power Measurement Flags as per Bluetooth SIG specification."""
24 PEDAL_POWER_BALANCE_PRESENT = 0x0001
25 PEDAL_POWER_BALANCE_REFERENCE = 0x0002 # 0 = Unknown, 1 = Left
26 ACCUMULATED_TORQUE_PRESENT = 0x0004
27 ACCUMULATED_ENERGY_PRESENT = 0x0008
28 WHEEL_REVOLUTION_DATA_PRESENT = 0x0010
29 CRANK_REVOLUTION_DATA_PRESENT = 0x0020
30 EXTREME_FORCE_MAGNITUDES_PRESENT = 0x0040
31 EXTREME_TORQUE_MAGNITUDES_PRESENT = 0x0080
32 EXTREME_ANGLES_PRESENT = 0x0100
33 TOP_DEAD_SPOT_ANGLE_PRESENT = 0x0200
34 BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 0x0400
35 ACCUMULATED_ENERGY_RESERVED = 0x0800
38class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
39 """Parsed data from Cycling Power Measurement characteristic."""
41 flags: CyclingPowerMeasurementFlags
42 instantaneous_power: int # Watts
43 pedal_power_balance: float | None = None # Percentage (0.5% resolution)
44 accumulated_energy: int | None = None # kJ
45 cumulative_wheel_revolutions: int | None = None # Changed to match decode_value
46 last_wheel_event_time: float | None = None # seconds
47 cumulative_crank_revolutions: int | None = None # Changed to match decode_value
48 last_crank_event_time: float | None = None # seconds
50 def __post_init__(self) -> None:
51 """Validate cycling power measurement data."""
52 flags_value = int(self.flags)
53 if not 0 <= flags_value <= UINT16_MAX:
54 raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")
55 if not 0 <= self.instantaneous_power <= UINT16_MAX:
56 raise ValueError("Instantaneous power must be a uint16 value (0-UINT16_MAX)")
59class CyclingPowerMeasurementCharacteristic(BaseCharacteristic):
60 """Cycling Power Measurement characteristic (0x2A63).
62 Used to transmit cycling power measurement data including
63 instantaneous power, pedal power balance, accumulated energy, and
64 revolution data.
65 """
67 # Special values
68 UNKNOWN_PEDAL_POWER_BALANCE = 0xFF # Value indicating unknown power balance
70 # Time resolution constants
71 WHEEL_TIME_RESOLUTION = 2048.0 # 1/2048 second resolution
72 CRANK_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution
73 PEDAL_POWER_BALANCE_RESOLUTION = 2.0 # 0.5% resolution
75 _manual_unit: str = "W" # Watts unit for power measurement
77 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerMeasurementData: # pylint: disable=too-many-locals # Complex parsing with many optional fields
78 """Parse cycling power measurement data according to Bluetooth specification.
80 Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
81 [Accumulated Energy(2)] + [Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
82 [Crank Revolutions(2)] + [Last Crank Event Time(2)]
84 Args:
85 data: Raw bytearray from BLE characteristic.
86 ctx: Optional CharacteristicContext providing surrounding context (may be None).
88 Returns:
89 CyclingPowerMeasurementData containing parsed power measurement data.
91 Raises:
92 ValueError: If data format is invalid.
94 """
95 if len(data) < 4:
96 raise ValueError("Cycling Power Measurement data must be at least 4 bytes")
98 # Parse flags (16-bit)
99 flags = DataParser.parse_int16(data, 0, signed=False)
101 # Parse instantaneous power (16-bit signed integer in watts)
102 instantaneous_power = DataParser.parse_int16(data, 2, signed=True)
104 offset = 4
106 # Parse optional fields
107 pedal_power_balance = None
108 accumulated_energy = None
109 cumulative_wheel_revolutions = None
110 last_wheel_event_time = None
111 cumulative_crank_revolutions = None
112 last_crank_event_time = None
114 # Parse optional pedal power balance (1 byte) if present
115 if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and len(data) >= offset + 1:
116 pedal_power_balance_raw = data[offset]
117 # Value UNKNOWN_PEDAL_POWER_BALANCE indicates unknown, otherwise percentage (0-100)
118 if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
119 pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION # 0.5% resolution
120 offset += 1
122 # Parse optional accumulated energy (2 bytes) if present
123 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and len(data) >= offset + 2:
124 accumulated_energy = DataParser.parse_int16(data, offset, signed=False) # kJ
125 offset += 2
127 # Parse optional wheel revolution data (6 bytes total) if present
128 if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
129 cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
130 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
131 # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units
132 last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
133 offset += 6
135 # Parse optional crank revolution data (4 bytes total) if present
136 if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
137 cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
138 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
139 # Crank event time is in 1/CRANK_TIME_RESOLUTION second units
140 last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION
141 offset += 4
143 # Validate flags against Cycling Power Feature if available
144 if ctx is not None:
145 feature_char = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
146 if feature_char and feature_char.parse_success and feature_char.value is not None:
147 # feature_char.value is the CyclingPowerFeatureData struct
148 feature_data: CyclingPowerFeatureData = feature_char.value
150 # Check if reported features are supported
151 reported_features = int(flags)
153 # Validate that reported features are actually supported
154 if (
155 reported_features & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
156 ) and not feature_data.pedal_power_balance_supported:
157 raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
158 if (
159 reported_features & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
160 ) and not feature_data.accumulated_energy_supported:
161 raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
162 if (
163 reported_features & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
164 ) and not feature_data.wheel_revolution_data_supported:
165 raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
166 if (
167 reported_features & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
168 ) and not feature_data.crank_revolution_data_supported:
169 raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")
171 # Create struct with all parsed values
172 return CyclingPowerMeasurementData(
173 flags=CyclingPowerMeasurementFlags(flags),
174 instantaneous_power=instantaneous_power,
175 pedal_power_balance=pedal_power_balance,
176 accumulated_energy=accumulated_energy,
177 cumulative_wheel_revolutions=cumulative_wheel_revolutions,
178 last_wheel_event_time=last_wheel_event_time,
179 cumulative_crank_revolutions=cumulative_crank_revolutions,
180 last_crank_event_time=last_crank_event_time,
181 )
183 def encode_value(self, data: CyclingPowerMeasurementData) -> bytearray: # pylint: disable=too-many-locals,too-many-branches,too-many-statements # Complex cycling power measurement with numerous optional fields
184 """Encode cycling power measurement value back to bytes.
186 Args:
187 data: CyclingPowerMeasurementData containing cycling power measurement data
189 Returns:
190 Encoded bytes representing the power measurement
192 """
193 instantaneous_power = data.instantaneous_power
194 pedal_power_balance = data.pedal_power_balance
195 accumulated_energy = data.accumulated_energy
196 wheel_revolutions = data.cumulative_wheel_revolutions # Updated field name
197 wheel_event_time = data.last_wheel_event_time
198 crank_revolutions = data.cumulative_crank_revolutions # Updated field name
199 crank_event_time = data.last_crank_event_time
201 # Build flags based on available data
202 flags = 0
203 if pedal_power_balance is not None:
204 flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT # Pedal power balance present
205 if accumulated_energy is not None:
206 flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT # Accumulated energy present
207 if wheel_revolutions is not None and wheel_event_time is not None:
208 flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT # Wheel revolution data present
209 if crank_revolutions is not None and crank_event_time is not None:
210 flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT # Crank revolution data present
212 # Validate instantaneous power (sint16 range)
213 if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
214 raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")
216 # Start with flags and instantaneous power
217 result = bytearray()
218 result.extend(DataParser.encode_int16(flags, signed=False)) # Flags (16-bit)
219 result.extend(DataParser.encode_int16(instantaneous_power, signed=True)) # Power (sint16)
221 # Add optional fields based on flags
222 if pedal_power_balance is not None:
223 balance = int(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION) # Convert back to raw value
224 if not 0 <= balance <= UINT8_MAX:
225 raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
226 result.append(balance)
228 if accumulated_energy is not None:
229 energy = int(accumulated_energy)
230 if not 0 <= energy <= 0xFFFF:
231 raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
232 result.extend(DataParser.encode_int16(energy, signed=False))
234 if wheel_revolutions is not None and wheel_event_time is not None:
235 wheel_rev = int(wheel_revolutions)
236 wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
237 if not 0 <= wheel_rev <= 0xFFFFFFFF:
238 raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
239 if not 0 <= wheel_time <= 0xFFFF:
240 raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
241 result.extend(DataParser.encode_int32(wheel_rev, signed=False))
242 result.extend(DataParser.encode_int16(wheel_time, signed=False))
244 if crank_revolutions is not None and crank_event_time is not None:
245 crank_rev = int(crank_revolutions)
246 crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
247 if not 0 <= crank_rev <= 0xFFFF:
248 raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
249 if not 0 <= crank_time <= 0xFFFF:
250 raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
251 result.extend(DataParser.encode_int16(crank_rev, signed=False))
252 result.extend(DataParser.encode_int16(crank_time, signed=False))
254 return result