Coverage for src / bluetooth_sig / gatt / characteristics / indoor_bike_data.py: 92%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Indoor Bike Data characteristic implementation. 

2 

3Implements the Indoor Bike Data characteristic (0x2AD2) from the Fitness 

4Machine Service. A 16-bit flags field controls the presence of optional 

5data fields. 

6 

7Bit 0 ("More Data") uses **inverted logic**: when bit 0 is 0 the 

8Instantaneous Speed field IS present; when bit 0 is 1 it is absent. 

9All other bits use normal logic (1 = present). 

10 

11References: 

12 Bluetooth SIG Fitness Machine Service 1.0 

13 org.bluetooth.characteristic.indoor_bike_data (GSS YAML) 

14""" 

15 

16from __future__ import annotations 

17 

18from enum import IntFlag 

19 

20import msgspec 

21 

22from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT24_MAX 

23from ..context import CharacteristicContext 

24from .base import BaseCharacteristic 

25from .fitness_machine_common import ( 

26 MET_RESOLUTION, 

27 decode_elapsed_time, 

28 decode_energy_triplet, 

29 decode_heart_rate, 

30 decode_metabolic_equivalent, 

31 decode_remaining_time, 

32 encode_elapsed_time, 

33 encode_energy_triplet, 

34 encode_heart_rate, 

35 encode_metabolic_equivalent, 

36 encode_remaining_time, 

37) 

38from .utils import DataParser 

39 

# Scaling constants for the fixed-point fields of Indoor Bike Data.
# The "M, d, b" triples below give the multiplier, decimal exponent and
# binary exponent of each field: actual = raw * M * 10**d * 2**b.

# Speed: M=1, d=-2, b=0 -> actual = raw / 100 km/h
# Used as a divisor when decoding and as a multiplier when encoding.
_SPEED_RESOLUTION = 100.0

# Cadence: M=1, d=0, b=-1 -> actual = raw / 2 rpm
# Used as a divisor when decoding and as a multiplier when encoding.
_CADENCE_DIVISOR = 2.0

# Resistance level: M=1, d=1, b=0 -> actual = raw * 10
# Used as a multiplier when decoding and as a divisor when encoding.
_RESISTANCE_RESOLUTION = 10.0

48 

49 

class IndoorBikeDataFlags(IntFlag):
    """Presence bits of the 16-bit flags field in Indoor Bike Data.

    Bits 1-12 use the usual convention: a set bit means the matching
    field is present. Bit 0 ("More Data") is inverted: when it is clear
    the Instantaneous Speed field is present, and when it is set the
    field is absent.
    """

    MORE_DATA = 1 << 0  # Inverted: 0 -> Speed present, 1 -> absent
    AVERAGE_SPEED_PRESENT = 1 << 1
    INSTANTANEOUS_CADENCE_PRESENT = 1 << 2
    AVERAGE_CADENCE_PRESENT = 1 << 3
    TOTAL_DISTANCE_PRESENT = 1 << 4
    RESISTANCE_LEVEL_PRESENT = 1 << 5
    INSTANTANEOUS_POWER_PRESENT = 1 << 6
    AVERAGE_POWER_PRESENT = 1 << 7
    EXPENDED_ENERGY_PRESENT = 1 << 8
    HEART_RATE_PRESENT = 1 << 9
    METABOLIC_EQUIVALENT_PRESENT = 1 << 10
    ELAPSED_TIME_PRESENT = 1 << 11
    REMAINING_TIME_PRESENT = 1 << 12

70 

71 

class IndoorBikeData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Decoded contents of an Indoor Bike Data characteristic value.

    Only ``flags`` is required. Every other attribute defaults to ``None``
    and is range-checked on construction when it is populated.

    Attributes:
        flags: Raw 16-bit flags field.
        instantaneous_speed: Instantaneous speed in km/h (0.01 resolution).
        average_speed: Average speed in km/h (0.01 resolution).
        instantaneous_cadence: Instantaneous cadence in rpm (0.5 resolution).
        average_cadence: Average cadence in rpm (0.5 resolution).
        total_distance: Total distance in metres (uint24).
        resistance_level: Resistance level (unitless, resolution 10).
        instantaneous_power: Instantaneous power in watts (signed).
        average_power: Average power in watts (signed).
        total_energy: Total expended energy in kcal.
        energy_per_hour: Expended energy per hour in kcal.
        energy_per_minute: Expended energy per minute in kcal.
        heart_rate: Heart rate in bpm.
        metabolic_equivalent: MET value (0.1 resolution).
        elapsed_time: Elapsed time in seconds.
        remaining_time: Remaining time in seconds.

    """

    flags: IndoorBikeDataFlags
    instantaneous_speed: float | None = None
    average_speed: float | None = None
    instantaneous_cadence: float | None = None
    average_cadence: float | None = None
    total_distance: int | None = None
    resistance_level: float | None = None
    instantaneous_power: int | None = None
    average_power: int | None = None
    total_energy: int | None = None
    energy_per_hour: int | None = None
    energy_per_minute: int | None = None
    heart_rate: int | None = None
    metabolic_equivalent: float | None = None
    elapsed_time: int | None = None
    remaining_time: int | None = None

    def __post_init__(self) -> None:
        """Reject any populated field that lies outside its allowed range.

        Fields left as ``None`` are skipped. Checks run in declaration
        order, so the first offending field determines the error.

        Raises:
            ValueError: If a populated field is below its lower bound or
                above its upper bound.

        """
        # Upper bounds of the scaled fields, expressed in scaled units.
        speed_limit = UINT16_MAX / _SPEED_RESOLUTION
        cadence_limit = UINT16_MAX / _CADENCE_DIVISOR
        resistance_limit = UINT8_MAX * _RESISTANCE_RESOLUTION
        met_limit = UINT8_MAX / MET_RESOLUTION

        # (label for the error message, value, lower bound, upper bound)
        bounds: tuple[tuple[str, float | None, float, float], ...] = (
            ("Instantaneous speed", self.instantaneous_speed, 0.0, speed_limit),
            ("Average speed", self.average_speed, 0.0, speed_limit),
            ("Instantaneous cadence", self.instantaneous_cadence, 0.0, cadence_limit),
            ("Average cadence", self.average_cadence, 0.0, cadence_limit),
            ("Total distance", self.total_distance, 0, UINT24_MAX),
            ("Resistance level", self.resistance_level, 0.0, resistance_limit),
            ("Instantaneous power", self.instantaneous_power, SINT16_MIN, SINT16_MAX),
            ("Average power", self.average_power, SINT16_MIN, SINT16_MAX),
            ("Total energy", self.total_energy, 0, UINT16_MAX),
            ("Energy per hour", self.energy_per_hour, 0, UINT16_MAX),
            ("Energy per minute", self.energy_per_minute, 0, UINT8_MAX),
            ("Heart rate", self.heart_rate, 0, UINT8_MAX),
            ("Metabolic equivalent", self.metabolic_equivalent, 0.0, met_limit),
            ("Elapsed time", self.elapsed_time, 0, UINT16_MAX),
            ("Remaining time", self.remaining_time, 0, UINT16_MAX),
        )
        for label, value, low, high in bounds:
            if value is None or low <= value <= high:
                continue
            raise ValueError(f"{label} must be {low}-{high}, got {value}")

158 

159 

class IndoorBikeDataCharacteristic(BaseCharacteristic[IndoorBikeData]):
    """Indoor Bike Data characteristic (0x2AD2).

    Used in the Fitness Machine Service to transmit indoor bike workout
    data. A 16-bit flags field controls which optional fields are present.
    Fields follow the flags back-to-back, in flag-bit order.

    Flag-bit assignments (from GSS YAML):
        Bit 0: More Data -- **inverted**: 0 -> Inst. Speed present, 1 -> absent
        Bit 1: Average Speed present
        Bit 2: Instantaneous Cadence present
        Bit 3: Average Cadence present
        Bit 4: Total Distance present
        Bit 5: Resistance Level present
        Bit 6: Instantaneous Power present
        Bit 7: Average Power present
        Bit 8: Expended Energy present (gates triplet: total + /hr + /min)
        Bit 9: Heart Rate present
        Bit 10: Metabolic Equivalent present
        Bit 11: Elapsed Time present
        Bit 12: Remaining Time present
        Bits 13-15: Reserved for Future Use

    """

    expected_type = IndoorBikeData
    min_length: int = 2  # Flags only (all optional fields absent)
    allow_variable_length: bool = True

    def _decode_value(
        self,
        data: bytearray,
        ctx: CharacteristicContext | None = None,
        *,
        validate: bool = True,
    ) -> IndoorBikeData:
        """Parse Indoor Bike Data from raw BLE bytes.

        Decoding is best-effort for short payloads: a flagged field that
        does not fit in the remaining bytes is reported as ``None``.
        Because the fields are packed sequentially, every field after the
        first truncated one is also reported as ``None`` instead of being
        read from bytes that belong to the truncated field.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional context (unused).
            validate: Not consulted by this method; range checks run when
                the returned IndoorBikeData is constructed.

        Returns:
            IndoorBikeData with all present fields populated.

        """
        flags = IndoorBikeDataFlags(DataParser.parse_int16(data, 0, signed=False))
        offset = 2
        # Becomes True once a flagged field cannot be read; nothing after
        # that point can be located reliably in the payload.
        exhausted = False

        def can_read(present: object, width: int) -> bool:
            """Return True when a flagged field of ``width`` bytes fits at ``offset``."""
            nonlocal exhausted
            if exhausted or not present:
                return False
            if len(data) < offset + width:
                exhausted = True
                return False
            return True

        # Bit 0 -- inverted: Instantaneous Speed present when bit is NOT set
        instantaneous_speed: float | None = None
        if can_read(not (flags & IndoorBikeDataFlags.MORE_DATA), 2):
            raw_speed = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_speed = raw_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 1 -- Average Speed
        average_speed: float | None = None
        if can_read(flags & IndoorBikeDataFlags.AVERAGE_SPEED_PRESENT, 2):
            raw_avg_speed = DataParser.parse_int16(data, offset, signed=False)
            average_speed = raw_avg_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 2 -- Instantaneous Cadence
        instantaneous_cadence: float | None = None
        if can_read(flags & IndoorBikeDataFlags.INSTANTANEOUS_CADENCE_PRESENT, 2):
            raw_cadence = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_cadence = raw_cadence / _CADENCE_DIVISOR
            offset += 2

        # Bit 3 -- Average Cadence
        average_cadence: float | None = None
        if can_read(flags & IndoorBikeDataFlags.AVERAGE_CADENCE_PRESENT, 2):
            raw_avg_cadence = DataParser.parse_int16(data, offset, signed=False)
            average_cadence = raw_avg_cadence / _CADENCE_DIVISOR
            offset += 2

        # Bit 4 -- Total Distance (uint24)
        total_distance: int | None = None
        if can_read(flags & IndoorBikeDataFlags.TOTAL_DISTANCE_PRESENT, 3):
            total_distance = DataParser.parse_int24(data, offset, signed=False)
            offset += 3

        # Bit 5 -- Resistance Level
        resistance_level: float | None = None
        if can_read(flags & IndoorBikeDataFlags.RESISTANCE_LEVEL_PRESENT, 1):
            raw_resistance = DataParser.parse_int8(data, offset, signed=False)
            resistance_level = raw_resistance * _RESISTANCE_RESOLUTION
            offset += 1

        # Bit 6 -- Instantaneous Power (sint16)
        instantaneous_power: int | None = None
        if can_read(flags & IndoorBikeDataFlags.INSTANTANEOUS_POWER_PRESENT, 2):
            instantaneous_power = DataParser.parse_int16(data, offset, signed=True)
            offset += 2

        # Bit 7 -- Average Power (sint16)
        average_power: int | None = None
        if can_read(flags & IndoorBikeDataFlags.AVERAGE_POWER_PRESENT, 2):
            average_power = DataParser.parse_int16(data, offset, signed=True)
            offset += 2

        # Bits 8-12 are decoded by the shared fitness-machine helpers, which
        # do their own length handling.  A helper that hands back the offset
        # unchanged consumed nothing, so the payload ends here as well.

        # Bit 8 -- Energy triplet (Total + Per Hour + Per Minute)
        total_energy = None
        energy_per_hour = None
        energy_per_minute = None
        if not exhausted and flags & IndoorBikeDataFlags.EXPENDED_ENERGY_PRESENT:
            total_energy, energy_per_hour, energy_per_minute, next_offset = decode_energy_triplet(data, offset)
            exhausted = next_offset == offset
            offset = next_offset

        # Bit 9 -- Heart Rate
        heart_rate = None
        if not exhausted and flags & IndoorBikeDataFlags.HEART_RATE_PRESENT:
            heart_rate, next_offset = decode_heart_rate(data, offset)
            exhausted = next_offset == offset
            offset = next_offset

        # Bit 10 -- Metabolic Equivalent
        metabolic_equivalent = None
        if not exhausted and flags & IndoorBikeDataFlags.METABOLIC_EQUIVALENT_PRESENT:
            metabolic_equivalent, next_offset = decode_metabolic_equivalent(data, offset)
            exhausted = next_offset == offset
            offset = next_offset

        # Bit 11 -- Elapsed Time
        elapsed_time = None
        if not exhausted and flags & IndoorBikeDataFlags.ELAPSED_TIME_PRESENT:
            elapsed_time, next_offset = decode_elapsed_time(data, offset)
            exhausted = next_offset == offset
            offset = next_offset

        # Bit 12 -- Remaining Time
        remaining_time = None
        if not exhausted and flags & IndoorBikeDataFlags.REMAINING_TIME_PRESENT:
            remaining_time, offset = decode_remaining_time(data, offset)

        return IndoorBikeData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            average_speed=average_speed,
            instantaneous_cadence=instantaneous_cadence,
            average_cadence=average_cadence,
            total_distance=total_distance,
            resistance_level=resistance_level,
            instantaneous_power=instantaneous_power,
            average_power=average_power,
            total_energy=total_energy,
            energy_per_hour=energy_per_hour,
            energy_per_minute=energy_per_minute,
            heart_rate=heart_rate,
            metabolic_equivalent=metabolic_equivalent,
            elapsed_time=elapsed_time,
            remaining_time=remaining_time,
        )

    def _encode_value(self, data: IndoorBikeData) -> bytearray:  # noqa: PLR0912
        """Encode IndoorBikeData back to BLE bytes.

        The flags word is rebuilt from which fields are populated;
        ``data.flags`` itself is not consulted, so any bits without a
        matching populated field are not carried over. The energy triplet
        is gated on ``total_energy`` alone: ``energy_per_hour`` and
        ``energy_per_minute`` are only written when ``total_energy`` is set.

        Args:
            data: IndoorBikeData instance.

        Returns:
            Encoded bytearray: the 16-bit flags followed by each populated
            field in flag-bit order.

        """
        flags = IndoorBikeDataFlags(0)

        # Bit 0 -- inverted: set MORE_DATA when Speed is absent
        if data.instantaneous_speed is None:
            flags |= IndoorBikeDataFlags.MORE_DATA
        if data.average_speed is not None:
            flags |= IndoorBikeDataFlags.AVERAGE_SPEED_PRESENT
        if data.instantaneous_cadence is not None:
            flags |= IndoorBikeDataFlags.INSTANTANEOUS_CADENCE_PRESENT
        if data.average_cadence is not None:
            flags |= IndoorBikeDataFlags.AVERAGE_CADENCE_PRESENT
        if data.total_distance is not None:
            flags |= IndoorBikeDataFlags.TOTAL_DISTANCE_PRESENT
        if data.resistance_level is not None:
            flags |= IndoorBikeDataFlags.RESISTANCE_LEVEL_PRESENT
        if data.instantaneous_power is not None:
            flags |= IndoorBikeDataFlags.INSTANTANEOUS_POWER_PRESENT
        if data.average_power is not None:
            flags |= IndoorBikeDataFlags.AVERAGE_POWER_PRESENT
        if data.total_energy is not None:
            flags |= IndoorBikeDataFlags.EXPENDED_ENERGY_PRESENT
        if data.heart_rate is not None:
            flags |= IndoorBikeDataFlags.HEART_RATE_PRESENT
        if data.metabolic_equivalent is not None:
            flags |= IndoorBikeDataFlags.METABOLIC_EQUIVALENT_PRESENT
        if data.elapsed_time is not None:
            flags |= IndoorBikeDataFlags.ELAPSED_TIME_PRESENT
        if data.remaining_time is not None:
            flags |= IndoorBikeDataFlags.REMAINING_TIME_PRESENT

        result = DataParser.encode_int16(int(flags), signed=False)

        # Payload fields, in the same order as their flag bits.  Scaled
        # values are rounded back to the nearest raw integer.
        if data.instantaneous_speed is not None:
            raw_speed = round(data.instantaneous_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_speed, signed=False))
        if data.average_speed is not None:
            raw_avg_speed = round(data.average_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_avg_speed, signed=False))
        if data.instantaneous_cadence is not None:
            raw_cadence = round(data.instantaneous_cadence * _CADENCE_DIVISOR)
            result.extend(DataParser.encode_int16(raw_cadence, signed=False))
        if data.average_cadence is not None:
            raw_avg_cadence = round(data.average_cadence * _CADENCE_DIVISOR)
            result.extend(DataParser.encode_int16(raw_avg_cadence, signed=False))
        if data.total_distance is not None:
            result.extend(DataParser.encode_int24(data.total_distance, signed=False))
        if data.resistance_level is not None:
            raw_resistance = round(data.resistance_level / _RESISTANCE_RESOLUTION)
            result.extend(DataParser.encode_int8(raw_resistance, signed=False))
        if data.instantaneous_power is not None:
            result.extend(DataParser.encode_int16(data.instantaneous_power, signed=True))
        if data.average_power is not None:
            result.extend(DataParser.encode_int16(data.average_power, signed=True))
        if data.total_energy is not None:
            result.extend(encode_energy_triplet(data.total_energy, data.energy_per_hour, data.energy_per_minute))
        if data.heart_rate is not None:
            result.extend(encode_heart_rate(data.heart_rate))
        if data.metabolic_equivalent is not None:
            result.extend(encode_metabolic_equivalent(data.metabolic_equivalent))
        if data.elapsed_time is not None:
            result.extend(encode_elapsed_time(data.elapsed_time))
        if data.remaining_time is not None:
            result.extend(encode_remaining_time(data.remaining_time))

        return result