Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_measurement.py: 86%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Cycling Power Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .cycling_power_feature import CyclingPowerFeatureCharacteristic 

13from .utils import DataParser 

14 

15 

class CyclingPowerMeasurementFlags(IntFlag):
    """Bit masks for the 16-bit Flags field of Cycling Power Measurement.

    Each member occupies a single bit; members can be combined with ``|``
    and tested with ``&``. Bit positions are written as shifts so the bit
    index of every member is visible at a glance.

    NOTE(review): the Bluetooth SIG definition of this characteristic
    appears to assign bit 3 to "Accumulated Torque Source" and bit 11 to
    "Accumulated Energy Present"; the names used here for bits 3 and 11
    differ from that. Confirm against the specification before relying on
    these two members when talking to real devices.
    """

    PEDAL_POWER_BALANCE_PRESENT = 1 << 0
    PEDAL_POWER_BALANCE_REFERENCE = 1 << 1  # 0 = Unknown, 1 = Left
    ACCUMULATED_TORQUE_PRESENT = 1 << 2
    ACCUMULATED_ENERGY_PRESENT = 1 << 3
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 4
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 5
    EXTREME_FORCE_MAGNITUDES_PRESENT = 1 << 6
    EXTREME_TORQUE_MAGNITUDES_PRESENT = 1 << 7
    EXTREME_ANGLES_PRESENT = 1 << 8
    TOP_DEAD_SPOT_ANGLE_PRESENT = 1 << 9
    BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 1 << 10
    ACCUMULATED_ENERGY_RESERVED = 1 << 11

31 

32 

class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Parsed data from Cycling Power Measurement characteristic.

    ``flags`` and ``instantaneous_power`` are always present; every other
    field defaults to ``None`` and is only populated when the corresponding
    data was available.
    """

    flags: CyclingPowerMeasurementFlags
    instantaneous_power: int  # Watts, signed 16-bit on the wire
    pedal_power_balance: float | None = None  # Percentage (0.5% resolution)
    accumulated_energy: int | None = None  # kJ
    cumulative_wheel_revolutions: int | None = None  # Revolution counter
    last_wheel_event_time: float | None = None  # seconds
    cumulative_crank_revolutions: int | None = None  # Revolution counter
    last_crank_event_time: float | None = None  # seconds

    def __post_init__(self) -> None:
        """Validate cycling power measurement data.

        Raises:
            ValueError: If ``flags`` does not fit in an unsigned 16-bit
                integer or ``instantaneous_power`` does not fit in a signed
                16-bit integer.

        """
        flags_value = int(self.flags)
        if not 0 <= flags_value <= UINT16_MAX:
            raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")
        # Instantaneous power is decoded with signed=True and encoded as a
        # sint16 elsewhere in this module, so negative values are legitimate
        # and values above SINT16_MAX can never be put on the wire.
        if not SINT16_MIN <= self.instantaneous_power <= SINT16_MAX:
            raise ValueError("Instantaneous power must be a sint16 value (SINT16_MIN-SINT16_MAX)")

52 

53 

class CyclingPowerMeasurementCharacteristic(BaseCharacteristic[CyclingPowerMeasurementData]):
    """Cycling Power Measurement characteristic (0x2A63).

    Used to transmit cycling power measurement data including
    instantaneous power, pedal power balance, accumulated energy, and
    revolution data.
    """

    # Special values
    UNKNOWN_PEDAL_POWER_BALANCE = 0xFF  # Raw value indicating unknown power balance

    # Divisors that convert raw wire values into seconds / percent
    WHEEL_TIME_RESOLUTION = 2048.0  # 1/2048 second resolution
    CRANK_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution
    PEDAL_POWER_BALANCE_RESOLUTION = 2.0  # 0.5% resolution

    _manual_unit: str = "W"  # Watts unit for power measurement

    _optional_dependencies = [CyclingPowerFeatureCharacteristic]

    min_length: int = 4  # Flags(2) + Instantaneous Power(2)
    allow_variable_length: bool = True  # Many optional fields based on flags

    def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerMeasurementData:  # pylint: disable=too-many-locals,too-many-branches  # Complex parsing with many optional fields
        """Parse cycling power measurement data according to Bluetooth specification.

        Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
        [Accumulated Energy(2)] + [Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
        [Crank Revolutions(2)] + [Last Crank Event Time(2)]

        Optional fields are packed back to back. If a field announced by the
        flags does not fit in the remaining bytes, that field and every
        later optional field are left as ``None``: the position of later
        fields is no longer known, so decoding them would read the wrong
        bytes.

        NOTE(review): ACCUMULATED_TORQUE_PRESENT and the extreme/dead-spot
        flags are not consumed here. Presumably a payload that carries those
        fields would shift the offsets of the fields parsed below; confirm
        against the specification.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            CyclingPowerMeasurementData containing parsed power measurement data.

        Raises:
            ValueError: If data is shorter than 4 bytes, or if the flags
                report a field that the Cycling Power Feature found in
                ``ctx`` marks as unsupported.

        """
        if len(data) < 4:
            raise ValueError("Cycling Power Measurement data must be at least 4 bytes")

        # Mandatory header: flags (uint16) followed by power (sint16, watts)
        flags = DataParser.parse_int16(data, 0, signed=False)
        instantaneous_power = DataParser.parse_int16(data, 2, signed=True)

        offset = 4

        pedal_power_balance = None
        accumulated_energy = None
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None

        # Cleared as soon as a flagged field is truncated; from then on the
        # offsets of all following fields are unreliable and they are skipped.
        intact = True

        # Optional pedal power balance (1 byte)
        if flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT:
            if len(data) >= offset + 1:
                pedal_power_balance_raw = data[offset]
                # UNKNOWN_PEDAL_POWER_BALANCE means "unknown" and is reported as None
                if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
                    pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION
                offset += 1
            else:
                intact = False

        # Optional accumulated energy (2 bytes, kJ)
        if intact and (flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT):
            if len(data) >= offset + 2:
                accumulated_energy = DataParser.parse_int16(data, offset, signed=False)
                offset += 2
            else:
                intact = False

        # Optional wheel revolution data (uint32 counter + uint16 event time)
        if intact and (flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT):
            if len(data) >= offset + 6:
                cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
                wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
                # Wheel event time is in 1/WHEEL_TIME_RESOLUTION second units
                last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
                offset += 6
            else:
                intact = False

        # Optional crank revolution data (uint16 counter + uint16 event time)
        if intact and (flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT):
            if len(data) >= offset + 4:
                cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
                crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
                # Crank event time is in 1/CRANK_TIME_RESOLUTION second units
                last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION
                offset += 4

        # Validate flags against Cycling Power Feature if available
        if ctx is not None:
            feature_data = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
            if feature_data is not None:
                reported_features = int(flags)

                # A measurement must not report data the feature set marks as unsupported
                if (
                    reported_features & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
                ) and not feature_data.pedal_power_balance_supported:
                    raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
                if (
                    reported_features & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
                ) and not feature_data.accumulated_energy_supported:
                    raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
                if (
                    reported_features & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
                ) and not feature_data.wheel_revolution_data_supported:
                    raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
                if (
                    reported_features & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
                ) and not feature_data.crank_revolution_data_supported:
                    raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")

        return CyclingPowerMeasurementData(
            flags=CyclingPowerMeasurementFlags(flags),
            instantaneous_power=instantaneous_power,
            pedal_power_balance=pedal_power_balance,
            accumulated_energy=accumulated_energy,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
        )

    def _encode_value(self, data: CyclingPowerMeasurementData) -> bytearray:  # pylint: disable=too-many-locals,too-many-branches,too-many-statements  # Complex cycling power measurement with numerous optional fields
        """Encode cycling power measurement value back to bytes.

        The "present" flag bits are derived from which optional fields are
        set on ``data`` (wheel and crank data each require both their
        counter and their event time). The pedal power balance reference
        bit cannot be derived from the fields, so it is copied from
        ``data.flags`` whenever a balance value is emitted.

        Args:
            data: CyclingPowerMeasurementData containing cycling power measurement data

        Returns:
            Encoded bytes representing the power measurement

        Raises:
            ValueError: If any value does not fit the integer width of its
                field on the wire.

        """
        instantaneous_power = data.instantaneous_power
        pedal_power_balance = data.pedal_power_balance
        accumulated_energy = data.accumulated_energy
        wheel_revolutions = data.cumulative_wheel_revolutions
        wheel_event_time = data.last_wheel_event_time
        crank_revolutions = data.cumulative_crank_revolutions
        crank_event_time = data.last_crank_event_time

        has_wheel_data = wheel_revolutions is not None and wheel_event_time is not None
        has_crank_data = crank_revolutions is not None and crank_event_time is not None

        # Build flags based on available data
        flags = 0
        if pedal_power_balance is not None:
            flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
            # The reference bit qualifies the balance value; keep it so a
            # decode/encode round trip does not silently drop it.
            if data.flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_REFERENCE:
                flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_REFERENCE
        if accumulated_energy is not None:
            flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
        if has_wheel_data:
            flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
        if has_crank_data:
            flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT

        # Validate instantaneous power (sint16 range)
        if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
            raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")

        # Mandatory header: flags (uint16) followed by power (sint16)
        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))
        result.extend(DataParser.encode_int16(instantaneous_power, signed=True))

        # Optional fields follow in the same order the decoder reads them
        if pedal_power_balance is not None:
            balance = int(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION)  # Convert back to raw value
            if not 0 <= balance <= UINT8_MAX:
                raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
            result.append(balance)

        if accumulated_energy is not None:
            energy = int(accumulated_energy)
            if not 0 <= energy <= UINT16_MAX:
                raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
            result.extend(DataParser.encode_int16(energy, signed=False))

        if has_wheel_data:
            wheel_rev = int(wheel_revolutions)
            wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
            if not 0 <= wheel_rev <= 0xFFFFFFFF:
                raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
            if not 0 <= wheel_time <= UINT16_MAX:
                raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
            result.extend(DataParser.encode_int32(wheel_rev, signed=False))
            result.extend(DataParser.encode_int16(wheel_time, signed=False))

        if has_crank_data:
            crank_rev = int(crank_revolutions)
            crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
            if not 0 <= crank_rev <= UINT16_MAX:
                raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
            if not 0 <= crank_time <= UINT16_MAX:
                raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
            result.extend(DataParser.encode_int16(crank_rev, signed=False))
            result.extend(DataParser.encode_int16(crank_time, signed=False))

        return result