Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_measurement.py: 85%

137 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Cycling Power Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import ( 

10 SINT16_MAX, 

11 SINT16_MIN, 

12 UINT8_MAX, 

13 UINT16_MAX, 

14) 

15from ..context import CharacteristicContext 

16from .base import BaseCharacteristic 

17from .cycling_power_feature import CyclingPowerFeatureCharacteristic, CyclingPowerFeatureData 

18from .utils import DataParser 

19 

20 

class CyclingPowerMeasurementFlags(IntFlag):
    """Bit masks for the 16-bit Flags field of Cycling Power Measurement.

    Every member occupies exactly one bit, written below as ``1 << bit`` so the
    bit position is visible at a glance. Members ending in ``_PRESENT`` announce
    an optional field in the payload.

    NOTE(review): the Bluetooth SIG layout is believed to assign bit 3 to
    "Accumulated Torque Source" and bit 11 to "Accumulated Energy Present",
    i.e. the opposite of ``ACCUMULATED_ENERGY_PRESENT`` /
    ``ACCUMULATED_ENERGY_RESERVED`` here. Confirm against the specification
    before changing any value: the decoder and encoder in this module rely on
    the current assignments.
    """

    PEDAL_POWER_BALANCE_PRESENT = 1 << 0
    PEDAL_POWER_BALANCE_REFERENCE = 1 << 1  # 0 = Unknown, 1 = Left
    ACCUMULATED_TORQUE_PRESENT = 1 << 2
    ACCUMULATED_ENERGY_PRESENT = 1 << 3
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 4
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 5
    EXTREME_FORCE_MAGNITUDES_PRESENT = 1 << 6
    EXTREME_TORQUE_MAGNITUDES_PRESENT = 1 << 7
    EXTREME_ANGLES_PRESENT = 1 << 8
    TOP_DEAD_SPOT_ANGLE_PRESENT = 1 << 9
    BOTTOM_DEAD_SPOT_ANGLE_PRESENT = 1 << 10
    ACCUMULATED_ENERGY_RESERVED = 1 << 11

36 

37 

class CyclingPowerMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Parsed data from Cycling Power Measurement characteristic.

    ``flags`` and ``instantaneous_power`` are always set; every other field is
    ``None`` when the corresponding optional field was not decoded.
    """

    flags: CyclingPowerMeasurementFlags  # Raw 16-bit flags field
    instantaneous_power: int  # Watts, signed 16-bit on the wire
    pedal_power_balance: float | None = None  # Percentage (0.5% resolution)
    accumulated_energy: int | None = None  # kJ
    cumulative_wheel_revolutions: int | None = None  # Revolution count (uint32 on the wire)
    last_wheel_event_time: float | None = None  # seconds
    cumulative_crank_revolutions: int | None = None  # Revolution count (uint16 on the wire)
    last_crank_event_time: float | None = None  # seconds

    def __post_init__(self) -> None:
        """Validate cycling power measurement data.

        Instantaneous power is checked against the signed 16-bit range because
        the characteristic decodes and encodes it as a sint16; validating it as
        an unsigned value rejected every negative reading produced by the
        decoder.

        Raises:
            ValueError: If ``flags`` does not fit in a uint16 or
                ``instantaneous_power`` does not fit in a sint16.

        """
        flags_value = int(self.flags)
        if not 0 <= flags_value <= UINT16_MAX:
            raise ValueError("Flags must be a uint16 value (0-UINT16_MAX)")
        if not SINT16_MIN <= self.instantaneous_power <= SINT16_MAX:
            raise ValueError("Instantaneous power must be a sint16 value (SINT16_MIN-SINT16_MAX)")

57 

58 

class CyclingPowerMeasurementCharacteristic(BaseCharacteristic):
    """Cycling Power Measurement characteristic (0x2A63).

    Used to transmit cycling power measurement data including
    instantaneous power, pedal power balance, accumulated energy, and
    revolution data.

    NOTE(review): only the optional fields listed in ``decode_value`` are
    handled. A payload that sets ``ACCUMULATED_TORQUE_PRESENT`` (or any of the
    extreme/dead-spot flags) carries bytes this class does not consume, so
    later fields would be read from shifted offsets — confirm the field layout
    against the Bluetooth SIG specification.
    """

    # Special values
    UNKNOWN_PEDAL_POWER_BALANCE = 0xFF  # Value indicating unknown power balance

    # Time resolution constants
    WHEEL_TIME_RESOLUTION = 2048.0  # 1/2048 second resolution
    CRANK_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution
    PEDAL_POWER_BALANCE_RESOLUTION = 2.0  # 0.5% resolution

    _manual_unit: str = "W"  # Watts unit for power measurement

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerMeasurementData:  # pylint: disable=too-many-locals,too-many-branches  # Complex parsing with many optional fields
        """Parse cycling power measurement data according to Bluetooth specification.

        Format: Flags(2) + Instantaneous Power(2) + [Pedal Power Balance(1)] +
        [Accumulated Energy(2)] + [Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
        [Crank Revolutions(2)] + [Last Crank Event Time(2)]

        Optional fields are decoded leniently: when a flag announces a field
        that does not fit in the remaining bytes, that field and every field
        after it are left as ``None`` instead of raising. Decoding stops at the
        first truncated field because the offsets of all later fields are no
        longer trustworthy.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            CyclingPowerMeasurementData containing parsed power measurement data.

        Raises:
            ValueError: If data is shorter than the 4 mandatory bytes, or if a
                reported optional field is not supported according to the
                Cycling Power Feature characteristic found in ``ctx``.

        """
        if len(data) < 4:
            raise ValueError("Cycling Power Measurement data must be at least 4 bytes")

        # Mandatory header: flags (uint16) followed by instantaneous power (sint16, watts)
        flags = DataParser.parse_int16(data, 0, signed=False)
        instantaneous_power = DataParser.parse_int16(data, 2, signed=True)

        offset = 4

        # Optional fields default to "not present"
        pedal_power_balance = None
        accumulated_energy = None
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None

        # Cleared as soon as a flagged field does not fit in the payload. From
        # that point the remaining bytes belong to the truncated field, so
        # reading later fields from them would produce fabricated values.
        aligned = True

        # Pedal power balance (1 byte)
        if flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT:
            if len(data) >= offset + 1:
                pedal_power_balance_raw = data[offset]
                # UNKNOWN_PEDAL_POWER_BALANCE means "unknown"; leave the field as None
                if pedal_power_balance_raw != self.UNKNOWN_PEDAL_POWER_BALANCE:
                    pedal_power_balance = pedal_power_balance_raw / self.PEDAL_POWER_BALANCE_RESOLUTION
                offset += 1
            else:
                aligned = False

        # Accumulated energy (2 bytes, kJ)
        if aligned and flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT:
            if len(data) >= offset + 2:
                accumulated_energy = DataParser.parse_int16(data, offset, signed=False)
                offset += 2
            else:
                aligned = False

        # Wheel revolution data (uint32 revolutions + uint16 event time, 6 bytes total)
        if aligned and flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT:
            if len(data) >= offset + 6:
                cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
                wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
                # Raw event time is in 1/WHEEL_TIME_RESOLUTION second units
                last_wheel_event_time = wheel_event_time_raw / self.WHEEL_TIME_RESOLUTION
                offset += 6
            else:
                aligned = False

        # Crank revolution data (uint16 revolutions + uint16 event time, 4 bytes total)
        if (
            aligned
            and flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
            and len(data) >= offset + 4
        ):
            cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
            crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
            # Raw event time is in 1/CRANK_TIME_RESOLUTION second units
            last_crank_event_time = crank_event_time_raw / self.CRANK_TIME_RESOLUTION

        # Validate flags against Cycling Power Feature if available
        if ctx is not None:
            feature_char = self.get_context_characteristic(ctx, CyclingPowerFeatureCharacteristic)
            if feature_char and feature_char.parse_success and feature_char.value is not None:
                # feature_char.value is the CyclingPowerFeatureData struct
                feature_data: CyclingPowerFeatureData = feature_char.value

                # A field announced in the flags must also be advertised as supported
                if (
                    flags & CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
                ) and not feature_data.pedal_power_balance_supported:
                    raise ValueError("Pedal power balance reported but not supported by Cycling Power Feature")
                if (
                    flags & CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
                ) and not feature_data.accumulated_energy_supported:
                    raise ValueError("Accumulated energy reported but not supported by Cycling Power Feature")
                if (
                    flags & CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
                ) and not feature_data.wheel_revolution_data_supported:
                    raise ValueError("Wheel revolution data reported but not supported by Cycling Power Feature")
                if (
                    flags & CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
                ) and not feature_data.crank_revolution_data_supported:
                    raise ValueError("Crank revolution data reported but not supported by Cycling Power Feature")

        # Create struct with all parsed values
        return CyclingPowerMeasurementData(
            flags=CyclingPowerMeasurementFlags(flags),
            instantaneous_power=instantaneous_power,
            pedal_power_balance=pedal_power_balance,
            accumulated_energy=accumulated_energy,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
        )

    def encode_value(self, data: CyclingPowerMeasurementData) -> bytearray:  # pylint: disable=too-many-locals,too-many-branches,too-many-statements  # Complex cycling power measurement with numerous optional fields
        """Encode cycling power measurement value back to bytes.

        The flags word is rebuilt from which optional values are set on
        ``data``; ``data.flags`` itself is not written out. Wheel and crank
        data are only emitted when both the revolution count and the event
        time of the pair are set.

        Args:
            data: CyclingPowerMeasurementData containing cycling power measurement data

        Returns:
            Encoded bytes representing the power measurement

        Raises:
            ValueError: If any value does not fit the integer width of its
                field.

        """
        instantaneous_power = data.instantaneous_power
        pedal_power_balance = data.pedal_power_balance
        accumulated_energy = data.accumulated_energy
        wheel_revolutions = data.cumulative_wheel_revolutions
        wheel_event_time = data.last_wheel_event_time
        crank_revolutions = data.cumulative_crank_revolutions
        crank_event_time = data.last_crank_event_time

        has_wheel_data = wheel_revolutions is not None and wheel_event_time is not None
        has_crank_data = crank_revolutions is not None and crank_event_time is not None

        # Build flags based on available data
        flags = 0
        if pedal_power_balance is not None:
            flags |= CyclingPowerMeasurementFlags.PEDAL_POWER_BALANCE_PRESENT
        if accumulated_energy is not None:
            flags |= CyclingPowerMeasurementFlags.ACCUMULATED_ENERGY_PRESENT
        if has_wheel_data:
            flags |= CyclingPowerMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
        if has_crank_data:
            flags |= CyclingPowerMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT

        # Validate instantaneous power (sint16 range)
        if not SINT16_MIN <= instantaneous_power <= SINT16_MAX:
            raise ValueError(f"Instantaneous power {instantaneous_power} W exceeds sint16 range")

        # Start with flags and instantaneous power
        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))  # Flags (16-bit)
        result.extend(DataParser.encode_int16(instantaneous_power, signed=True))  # Power (sint16)

        # Optional fields follow in the same order the decoder reads them
        if pedal_power_balance is not None:
            # Round to the nearest raw step (as the event times below do)
            # rather than truncating toward zero.
            balance = round(pedal_power_balance * self.PEDAL_POWER_BALANCE_RESOLUTION)
            if not 0 <= balance <= UINT8_MAX:
                raise ValueError(f"Pedal power balance {balance} exceeds uint8 range")
            result.append(balance)

        if accumulated_energy is not None:
            energy = int(accumulated_energy)
            if not 0 <= energy <= UINT16_MAX:
                raise ValueError(f"Accumulated energy {energy} exceeds uint16 range")
            result.extend(DataParser.encode_int16(energy, signed=False))

        if has_wheel_data:
            wheel_rev = int(wheel_revolutions)
            wheel_time = round(wheel_event_time * self.WHEEL_TIME_RESOLUTION)
            if not 0 <= wheel_rev <= 0xFFFFFFFF:
                raise ValueError(f"Wheel revolutions {wheel_rev} exceeds uint32 range")
            if not 0 <= wheel_time <= UINT16_MAX:
                raise ValueError(f"Wheel event time {wheel_time} exceeds uint16 range")
            result.extend(DataParser.encode_int32(wheel_rev, signed=False))
            result.extend(DataParser.encode_int16(wheel_time, signed=False))

        if has_crank_data:
            crank_rev = int(crank_revolutions)
            crank_time = round(crank_event_time * self.CRANK_TIME_RESOLUTION)
            if not 0 <= crank_rev <= UINT16_MAX:
                raise ValueError(f"Crank revolutions {crank_rev} exceeds uint16 range")
            if not 0 <= crank_time <= UINT16_MAX:
                raise ValueError(f"Crank event time {crank_time} exceeds uint16 range")
            result.extend(DataParser.encode_int16(crank_rev, signed=False))
            result.extend(DataParser.encode_int16(crank_time, signed=False))

        return result