Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_measurement.py: 79%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""Cycling Power Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6from typing import Any, ClassVar 

7 

8import msgspec 

9 

10from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT32_MAX 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .cycling_power_feature import CyclingPowerFeatureCharacteristic 

14from .utils import DataParser 

15 

16 

17class CyclingPowerMeasurementFlags(IntFlag): 

18 """Cycling Power Measurement Flags as per CPS v1.1 Table 3.2.""" 

19 

20 PEDAL_POWER_BALANCE_PRESENT = 0x0001 # bit 0 

21 PEDAL_POWER_BALANCE_REFERENCE = 0x0002 # bit 1 — 0=Unknown, 1=Left 

22 ACCUMULATED_TORQUE_PRESENT = 0x0004 # bit 2 

23 ACCUMULATED_TORQUE_SOURCE = 0x0008 # bit 3 — 0=Wheel Based, 1=Crank Based 

24 WHEEL_REVOLUTION_DATA_PRESENT = 0x0010 # bit 4 

25 CRANK_REVOLUTION_DATA_PRESENT = 0x0020 # bit 5 

26 EXTREME_FORCE_MAGNITUDES_PRESENT = 0x0040 # bit 6 

27 EXTREME_TORQUE_MAGNITUDES_PRESENT = 0x0080 # bit 7 

28 EXTREME_ANGLES_PRESENT = 0x0100 # bit 8 

29 TOP_DEAD_SPOT_ANGLE_PRESENT = 0x0200 # bit 9 

30 BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 0x0400 # bit 10 

31 ACCUMULATED_ENERGY_PRESENT = 0x0800 # bit 11 

32 OFFSET_COMPENSATION_INDICATOR = 0x1000 # bit 12 

33 

34 

35class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

36 """Parsed data from Cycling Power Measurement characteristic.""" 

37 

38 flags: CyclingPowerMeasurementFlags 

39 instantaneous_power: int # Watts (sint16) 

40 pedal_power_balance: float | None = None # Percentage (0.5% resolution) 

41 accumulated_torque: float | None = None # Newton metres (1/32 Nm resolution) 

42 accumulated_energy: int | None = None # kJ 

43 cumulative_wheel_revolutions: int | None = None 

44 last_wheel_event_time: float | None = None # seconds 

45 cumulative_crank_revolutions: int | None = None 

46 last_crank_event_time: float | None = None # seconds 

47 maximum_force_magnitude: int | None = None # Newtons (sint16) 

48 minimum_force_magnitude: int | None = None # Newtons (sint16) 

49 maximum_torque_magnitude: float | None = None # Nm (sint16, 1/32 resolution) 

50 minimum_torque_magnitude: float | None = None # Nm (sint16, 1/32 resolution) 

51 maximum_angle: int | None = None # degrees (uint16) 

52 minimum_angle: int | None = None # degrees (uint16) 

53 top_dead_spot_angle: int | None = None # degrees (uint16) 

54 bottom_dead_spot_angle: int | None = None # degrees (uint16) 

55 

56 def __post_init__(self) -> None: 

57 """Validate cycling power measurement data.""" 

58 flags_value = int(self.flags) 

59 if not 0 <= flags_value <= UINT16_MAX: 

60 raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)") 

61 if not SINT16_MIN <= self.instantaneous_power <= SINT16_MAX: 

62 raise ValueError("Instantaneous power must be a sint16 value") 

63 

64 

65class CyclingPowerMeasurementCharacteristic(BaseCharacteristic[CyclingPowerMeasurementData]): 

66 """Cycling Power Measurement characteristic (0x2A63). 

67 

68 Used to transmit cycling power measurement data including 

69 instantaneous power, pedal power balance, accumulated energy, and 

70 revolution data. 

71 """ 

72 

73 # Special values 

74 UNKNOWN_PEDAL_POWER_BALANCE = 0xFF # Value indicating unknown power balance 

75 

76 # Time resolution constants 

77 WHEEL_TIME_RESOLUTION = 2048.0 # 1/2048 second resolution 

78 CRANK_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution 

79 PEDAL_POWER_BALANCE_RESOLUTION = 2.0 # 0.5% resolution 

80 ACCUMULATED_TORQUE_RESOLUTION = 32.0 # 1/32 Nm resolution 

81 

82 _manual_unit: str = "W" # Watts unit for power measurement 

83 

84 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [CyclingPowerFeatureCharacteristic] 

85 

86 min_length: int = 4 # Flags(2) + Instantaneous Power(2) 

87 allow_variable_length: bool = True # Many optional fields based on flags 

88 

89 def _decode_value( 

90 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True 

91 ) -> CyclingPowerMeasurementData: # pylint: disable=too-many-locals # Complex parsing with many optional fields 

92 """Parse cycling power measurement data according to Bluetooth specification. 

93 

94 Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] + 

95 [Accumulated Torque(2)] + [Wheel Revolutions(4) + Last Wheel Event Time(2)] + 

96 [Crank Revolutions(2) + Last Crank Event Time(2)] + [Accumulated Energy(2)] 

97 

98 Args: 

99 data: Raw bytearray from BLE characteristic. 

100 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

101 validate: Whether to validate ranges (default True) 

102 

103 Returns: 

104 CyclingPowerMeasurementData containing parsed power measurement data. 

105 

106 Raises: 

107 ValueError: If data format is invalid. 

108 

109 """ 

110 # Parse flags (16-bit) 

111 flags = DataParser.parse_int16(data, 0, signed=False) 

112 

113 # Parse instantaneous power (16-bit signed integer in watts) 

114 instantaneous_power = DataParser.parse_int16(data, 2, signed=True) 

115 

116 offset = 4 

117 

118 # Parse optional fields 

119 pedal_power_balance = None 

120 accumulated_torque = None 

121 accumulated_energy = None 

122 cumulative_wheel_revolutions = None 

123 last_wheel_event_time = None 

124 cumulative_crank_revolutions = None 

125 last_crank_event_time = None 

126 maximum_force_magnitude = None 

127 minimum_force_magnitude = None 

128 maximum_torque_magnitude = None 

129 minimum_torque_magnitude = None 

130 maximum_angle = None 

131 minimum_angle = None 

132 top_dead_spot_angle = None 

133 bottom_dead_spot_angle = None 

134 

135 # Parse optional pedal power balance (1 byte) if present (bit 0) 

136 if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and len(data) >= offset + 1: 

137 pedal_power_balance_raw = data[offset] 

138 # Value UNKNOWN_PEDAL_POWER_BALANCE indicates unknown, otherwise percentage 

139 if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE: 

140 pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION # 0.5% resolution 

141 offset += 1 

142 

143 # Parse optional accumulated torque (2 bytes, uint16, 1/32 Nm resolution) if present (bit 2) 

144 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_TORQUE_PRESENT) and len(data) >= offset + 2: 

145 accumulated_torque_raw = DataParser.parse_int16(data, offset, signed=False) 

146 accumulated_torque = accumulated_torque_raw / self.ACCUMULATED_TORQUE_RESOLUTION 

147 offset += 2 

148 

149 # Parse optional wheel revolution data (6 bytes total) if present (bit 4) 

150 if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6: 

151 cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False) 

152 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False) 

153 # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units 

154 last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION 

155 offset += 6 

156 

157 # Parse optional crank revolution data (4 bytes total) if present (bit 5) 

158 if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4: 

159 cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False) 

160 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False) 

161 # Crank event time is in 1/CRANK_TIME_RESOLUTION second units 

162 last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION 

163 offset += 4 

164 

165 # Parse optional extreme force magnitudes (4 bytes: max sint16 + min sint16) if present (bit 6) 

166 if (flags & CyclingPowerMeasurementFlags.EXTREME_FORCE_MAGNITUDES_PRESENT) and len(data) >= offset + 4: 

167 maximum_force_magnitude = DataParser.parse_int16(data, offset, signed=True) 

168 minimum_force_magnitude = DataParser.parse_int16(data, offset + 2, signed=True) 

169 offset += 4 

170 

171 # Parse optional extreme torque magnitudes (4 bytes: max sint16 + min sint16, 1/32 Nm) if present (bit 7) 

172 if (flags & CyclingPowerMeasurementFlags.EXTREME_TORQUE_MAGNITUDES_PRESENT) and len(data) >= offset + 4: 

173 maximum_torque_magnitude = DataParser.parse_int16(data, offset, signed=True) / 32.0 

174 minimum_torque_magnitude = DataParser.parse_int16(data, offset + 2, signed=True) / 32.0 

175 offset += 4 

176 

177 # Parse optional extreme angles (3 bytes packed: max uint12 + min uint12) if present (bit 8) 

178 if (flags & CyclingPowerMeasurementFlags.EXTREME_ANGLES_PRESENT) and len(data) >= offset + 3: 

179 raw_bytes = data[offset : offset + 3] 

180 combined = raw_bytes[0] | (raw_bytes[1] << 8) | (raw_bytes[2] << 16) 

181 maximum_angle = combined & 0x0FFF 

182 minimum_angle = (combined >> 12) & 0x0FFF 

183 offset += 3 

184 

185 # Parse optional top dead spot angle (2 bytes, uint16 degrees) if present (bit 9) 

186 if (flags & CyclingPowerMeasurementFlags.TOP_DEAD_SPOT_ANGLE_PRESENT) and len(data) >= offset + 2: 

187 top_dead_spot_angle = DataParser.parse_int16(data, offset, signed=False) 

188 offset += 2 

189 

190 # Parse optional bottom dead spot angle (2 bytes, uint16 degrees) if present (bit 10) 

191 if (flags & CyclingPowerMeasurementFlags.BOTTOM_DEAD_SPOT_ANGLE_PRESENT) and len(data) >= offset + 2: 

192 bottom_dead_spot_angle = DataParser.parse_int16(data, offset, signed=False) 

193 offset += 2 

194 

195 # Parse optional accumulated energy (2 bytes, uint16, kJ) if present (bit 11) 

196 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and len(data) >= offset + 2: 

197 accumulated_energy = DataParser.parse_int16(data, offset, signed=False) # kJ 

198 offset += 2 

199 

200 # Validate flags against Cycling Power Feature if available 

201 if ctx is not None: 

202 feature_data = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic) 

203 if feature_data is not None: 

204 from .cycling_power_feature import CyclingPowerFeatures # noqa: PLC0415 — local import to avoid cycle 

205 

206 # Validate that reported features are actually supported 

207 if (flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT) and not ( 

208 feature_data.features & CyclingPowerFeatures.PEDAL_POWER_BALANCE_SUPPORTED 

209 ): 

210 raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature") 

211 if (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT) and not ( 

212 feature_data.features & CyclingPowerFeatures.ACCUMULATED_ENERGY_SUPPORTED 

213 ): 

214 raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature") 

215 if (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and not ( 

216 feature_data.features & CyclingPowerFeatures.WHEEL_REVOLUTION_DATA_SUPPORTED 

217 ): 

218 raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature") 

219 if (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and not ( 

220 feature_data.features & CyclingPowerFeatures.CRANK_REVOLUTION_DATA_SUPPORTED 

221 ): 

222 raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature") 

223 

224 # Create struct with all parsed values 

225 return CyclingPowerMeasurementData( 

226 flags=CyclingPowerMeasurementFlags(flags), 

227 instantaneous_power=instantaneous_power, 

228 pedal_power_balance=pedal_power_balance, 

229 accumulated_torque=accumulated_torque, 

230 accumulated_energy=accumulated_energy, 

231 cumulative_wheel_revolutions=cumulative_wheel_revolutions, 

232 last_wheel_event_time=last_wheel_event_time, 

233 cumulative_crank_revolutions=cumulative_crank_revolutions, 

234 last_crank_event_time=last_crank_event_time, 

235 maximum_force_magnitude=maximum_force_magnitude, 

236 minimum_force_magnitude=minimum_force_magnitude, 

237 maximum_torque_magnitude=maximum_torque_magnitude, 

238 minimum_torque_magnitude=minimum_torque_magnitude, 

239 maximum_angle=maximum_angle, 

240 minimum_angle=minimum_angle, 

241 top_dead_spot_angle=top_dead_spot_angle, 

242 bottom_dead_spot_angle=bottom_dead_spot_angle, 

243 ) 

244 

245 def _encode_value(self, data: CyclingPowerMeasurementData) -> bytearray: # noqa: PLR0912 # pylint: disable=too-many-locals,too-many-branches,too-many-statements 

246 """Encode cycling power measurement value back to bytes. 

247 

248 Args: 

249 data: CyclingPowerMeasurementData containing cycling power measurement data 

250 

251 Returns: 

252 Encoded bytes representing the power measurement 

253 

254 """ 

255 instantaneous_power = data.instantaneous_power 

256 pedal_power_balance = data.pedal_power_balance 

257 accumulated_torque = data.accumulated_torque 

258 accumulated_energy = data.accumulated_energy 

259 wheel_revolutions = data.cumulative_wheel_revolutions 

260 wheel_event_time = data.last_wheel_event_time 

261 crank_revolutions = data.cumulative_crank_revolutions 

262 crank_event_time = data.last_crank_event_time 

263 

264 # Build flags based on available data 

265 flags = 0 

266 if pedal_power_balance is not None: 

267 flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT 

268 if accumulated_torque is not None: 

269 flags |= CyclingPowerMeasurementFlags.ACCUMULATED_TORQUE_PRESENT 

270 if wheel_revolutions is not None and wheel_event_time is not None: 

271 flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT 

272 if crank_revolutions is not None and crank_event_time is not None: 

273 flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT 

274 if data.maximum_force_magnitude is not None and data.minimum_force_magnitude is not None: 

275 flags |= CyclingPowerMeasurementFlags.EXTREME_FORCE_MAGNITUDES_PRESENT 

276 if data.maximum_torque_magnitude is not None and data.minimum_torque_magnitude is not None: 

277 flags |= CyclingPowerMeasurementFlags.EXTREME_TORQUE_MAGNITUDES_PRESENT 

278 if data.maximum_angle is not None and data.minimum_angle is not None: 

279 flags |= CyclingPowerMeasurementFlags.EXTREME_ANGLES_PRESENT 

280 if data.top_dead_spot_angle is not None: 

281 flags |= CyclingPowerMeasurementFlags.TOP_DEAD_SPOT_ANGLE_PRESENT 

282 if data.bottom_dead_spot_angle is not None: 

283 flags |= CyclingPowerMeasurementFlags.BOTTOM_DEAD_SPOT_ANGLE_PRESENT 

284 if accumulated_energy is not None: 

285 flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT 

286 

287 # Validate instantaneous power (sint16 range) 

288 if not SINT16_MIN <= instantaneous_power <= SINT16_MAX: 

289 raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range") 

290 

291 # Start with flags and instantaneous power 

292 result = bytearray() 

293 result.extend(DataParser.encode_int16(flags, signed=False)) # Flags (16-bit) 

294 result.extend(DataParser.encode_int16(instantaneous_power, signed=True)) # Power (sint16) 

295 

296 # Add optional fields in spec order (per CPS v1.1 §3.2.1) 

297 if pedal_power_balance is not None: 

298 balance = int(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION) 

299 if not 0 <= balance <= UINT8_MAX: 

300 raise ValueError(f"Pedal power balance {balance} exceeds uint8 range") 

301 result.append(balance) 

302 

303 if accumulated_torque is not None: 

304 torque_raw = round(accumulated_torque * self.ACCUMULATED_TORQUE_RESOLUTION) 

305 if not 0 <= torque_raw <= UINT16_MAX: 

306 raise ValueError(f"Accumulated torque {torque_raw} exceeds uint16 range") 

307 result.extend(DataParser.encode_int16(torque_raw, signed=False)) 

308 

309 if wheel_revolutions is not None and wheel_event_time is not None: 

310 wheel_rev = int(wheel_revolutions) 

311 wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION) 

312 if not 0 <= wheel_rev <= UINT32_MAX: 

313 raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range") 

314 if not 0 <= wheel_time <= UINT16_MAX: 

315 raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range") 

316 result.extend(DataParser.encode_int32(wheel_rev, signed=False)) 

317 result.extend(DataParser.encode_int16(wheel_time, signed=False)) 

318 

319 if crank_revolutions is not None and crank_event_time is not None: 

320 crank_rev = int(crank_revolutions) 

321 crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION) 

322 if not 0 <= crank_rev <= UINT16_MAX: 

323 raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range") 

324 if not 0 <= crank_time <= UINT16_MAX: 

325 raise ValueError(f"Crank event time {crank_time} exceeds uint16 range") 

326 result.extend(DataParser.encode_int16(crank_rev, signed=False)) 

327 result.extend(DataParser.encode_int16(crank_time, signed=False)) 

328 

329 # Encode extreme force magnitudes (bit 6): max sint16 + min sint16 

330 if data.maximum_force_magnitude is not None and data.minimum_force_magnitude is not None: 

331 if not SINT16_MIN <= data.maximum_force_magnitude <= SINT16_MAX: 

332 raise ValueError(f"Maximum force magnitude {data.maximum_force_magnitude} exceeds sint16 range") 

333 if not SINT16_MIN <= data.minimum_force_magnitude <= SINT16_MAX: 

334 raise ValueError(f"Minimum force magnitude {data.minimum_force_magnitude} exceeds sint16 range") 

335 result.extend(DataParser.encode_int16(data.maximum_force_magnitude, signed=True)) 

336 result.extend(DataParser.encode_int16(data.minimum_force_magnitude, signed=True)) 

337 

338 # Encode extreme torque magnitudes (bit 7): max sint16 + min sint16, 1/32 Nm resolution 

339 if data.maximum_torque_magnitude is not None and data.minimum_torque_magnitude is not None: 

340 max_torque_raw = round(data.maximum_torque_magnitude * 32) 

341 min_torque_raw = round(data.minimum_torque_magnitude * 32) 

342 if not SINT16_MIN <= max_torque_raw <= SINT16_MAX: 

343 raise ValueError(f"Maximum torque magnitude raw {max_torque_raw} exceeds sint16 range") 

344 if not SINT16_MIN <= min_torque_raw <= SINT16_MAX: 

345 raise ValueError(f"Minimum torque magnitude raw {min_torque_raw} exceeds sint16 range") 

346 result.extend(DataParser.encode_int16(max_torque_raw, signed=True)) 

347 result.extend(DataParser.encode_int16(min_torque_raw, signed=True)) 

348 

349 # Encode extreme angles (bit 8): two uint12 packed in 3 bytes 

350 if data.maximum_angle is not None and data.minimum_angle is not None: 

351 max_ang = data.maximum_angle & 0x0FFF 

352 min_ang = data.minimum_angle & 0x0FFF 

353 combined = max_ang | (min_ang << 12) 

354 result.append(combined & 0xFF) 

355 result.append((combined >> 8) & 0xFF) 

356 result.append((combined >> 16) & 0xFF) 

357 

358 # Encode top dead spot angle (bit 9): uint16 degrees 

359 if data.top_dead_spot_angle is not None: 

360 if not 0 <= data.top_dead_spot_angle <= UINT16_MAX: 

361 raise ValueError(f"Top dead spot angle {data.top_dead_spot_angle} exceeds uint16 range") 

362 result.extend(DataParser.encode_int16(data.top_dead_spot_angle, signed=False)) 

363 

364 # Encode bottom dead spot angle (bit 10): uint16 degrees 

365 if data.bottom_dead_spot_angle is not None: 

366 if not 0 <= data.bottom_dead_spot_angle <= UINT16_MAX: 

367 raise ValueError(f"Bottom dead spot angle {data.bottom_dead_spot_angle} exceeds uint16 range") 

368 result.extend(DataParser.encode_int16(data.bottom_dead_spot_angle, signed=False)) 

369 

370 if accumulated_energy is not None: 

371 energy = int(accumulated_energy) 

372 if not 0 <= energy <= UINT16_MAX: 

373 raise ValueError(f"Accumulated energy {energy} exceeds uint16 range") 

374 result.extend(DataParser.encode_int16(energy, signed=False)) 

375 

376 return result