Coverage for src / bluetooth_sig / gatt / characteristics / treadmill_data.py: 92%

221 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Treadmill Data characteristic implementation. 

2 

3Implements the Treadmill Data characteristic (0x2ACD) from the Fitness Machine 

4Service. A 16-bit flags field controls the presence of optional data fields. 

5 

6Bit 0 ("More Data") uses **inverted logic**: when bit 0 is 0 the 

7Instantaneous Speed field IS present; when bit 0 is 1 it is absent. 

8All other bits use normal logic (1 = present). 

9 

10References: 

11 Bluetooth SIG Fitness Machine Service 1.0 

12 org.bluetooth.characteristic.treadmill_data (GSS YAML) 

13""" 

14 

15from __future__ import annotations 

16 

17from enum import IntFlag 

18 

19import msgspec 

20 

21from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT24_MAX 

22from ..context import CharacteristicContext 

23from .base import BaseCharacteristic 

24from .fitness_machine_common import ( 

25 MET_RESOLUTION, 

26 decode_elapsed_time, 

27 decode_energy_triplet, 

28 decode_heart_rate, 

29 decode_metabolic_equivalent, 

30 decode_remaining_time, 

31 encode_elapsed_time, 

32 encode_energy_triplet, 

33 encode_heart_rate, 

34 encode_metabolic_equivalent, 

35 encode_remaining_time, 

36) 

37from .utils import DataParser 

38 

39# Speed: M=1, d=-2, b=0 -> actual = raw / 100 km/h 

40_SPEED_RESOLUTION = 100.0 

41 

42# Inclination: M=1, d=-1, b=0 -> actual = raw / 10 % 

43# Ramp Angle: M=1, d=-1, b=0 -> actual = raw / 10 degrees 

44# Elevation: M=1, d=-1, b=0 -> actual = raw / 10 metres 

45_TENTH_RESOLUTION = 10.0 

46 

47 

class TreadmillDataFlags(IntFlag):
    """Bit masks for the 16-bit flags field of Treadmill Data.

    Each member other than ``MORE_DATA`` marks its field(s) as present when
    the bit is set. ``MORE_DATA`` (bit 0) is inverted: Instantaneous Speed is
    present when the bit is 0 and absent when it is 1.
    """

    MORE_DATA = 1 << 0  # Inverted: 0 -> Speed present, 1 -> absent
    AVERAGE_SPEED_PRESENT = 1 << 1
    TOTAL_DISTANCE_PRESENT = 1 << 2
    INCLINATION_AND_RAMP_PRESENT = 1 << 3
    ELEVATION_GAIN_PRESENT = 1 << 4
    INSTANTANEOUS_PACE_PRESENT = 1 << 5
    AVERAGE_PACE_PRESENT = 1 << 6
    EXPENDED_ENERGY_PRESENT = 1 << 7
    HEART_RATE_PRESENT = 1 << 8
    METABOLIC_EQUIVALENT_PRESENT = 1 << 9
    ELAPSED_TIME_PRESENT = 1 << 10
    REMAINING_TIME_PRESENT = 1 << 11
    FORCE_AND_POWER_PRESENT = 1 << 12

67 

68 

class TreadmillData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Decoded contents of one Treadmill Data notification.

    Only ``flags`` is mandatory; every other attribute defaults to ``None``
    and is filled in when the corresponding field is available.

    Attributes:
        flags: Raw 16-bit flags field.
        instantaneous_speed: Instantaneous belt speed in km/h (0.01 resolution).
        average_speed: Average speed in km/h (0.01 resolution).
        total_distance: Total distance in metres (uint24).
        inclination: Current inclination in % (0.1 resolution, signed).
        ramp_angle_setting: Current ramp angle in degrees (0.1 resolution, signed).
        positive_elevation_gain: Positive elevation gain in metres (0.1 resolution).
        negative_elevation_gain: Negative elevation gain in metres (0.1 resolution).
        instantaneous_pace: Instantaneous pace in seconds per 500 m.
        average_pace: Average pace in seconds per 500 m.
        total_energy: Total expended energy in kcal.
        energy_per_hour: Expended energy per hour in kcal.
        energy_per_minute: Expended energy per minute in kcal.
        heart_rate: Heart rate in bpm.
        metabolic_equivalent: MET value (0.1 resolution).
        elapsed_time: Elapsed time in seconds.
        remaining_time: Remaining time in seconds.
        force_on_belt: Force on belt in newtons (signed).
        power_output: Power output in watts (signed).

    """

    flags: TreadmillDataFlags
    instantaneous_speed: float | None = None
    average_speed: float | None = None
    total_distance: int | None = None
    inclination: float | None = None
    ramp_angle_setting: float | None = None
    positive_elevation_gain: float | None = None
    negative_elevation_gain: float | None = None
    instantaneous_pace: int | None = None
    average_pace: int | None = None
    total_energy: int | None = None
    energy_per_hour: int | None = None
    energy_per_minute: int | None = None
    heart_rate: int | None = None
    metabolic_equivalent: float | None = None
    elapsed_time: int | None = None
    remaining_time: int | None = None
    force_on_belt: int | None = None
    power_output: int | None = None

    def __post_init__(self) -> None:
        """Reject any populated field that lies outside its permitted range.

        Raises:
            ValueError: For the first field (in declaration order) whose value
                is not ``None`` and falls outside its inclusive bounds.
        """
        # Scaled limits: the raw integer extremes divided by the field resolution.
        speed_upper = UINT16_MAX / _SPEED_RESOLUTION
        signed_tenth_lower = SINT16_MIN / _TENTH_RESOLUTION
        signed_tenth_upper = SINT16_MAX / _TENTH_RESOLUTION
        unsigned_tenth_upper = UINT16_MAX / _TENTH_RESOLUTION
        met_upper = UINT8_MAX / MET_RESOLUTION

        # (label, value, inclusive lower bound, inclusive upper bound).
        # Row order is the order in which violations are reported, and the
        # int/float type of each lower bound is what appears in the message.
        limits = (
            ("Instantaneous speed", self.instantaneous_speed, 0.0, speed_upper),
            ("Average speed", self.average_speed, 0.0, speed_upper),
            ("Total distance", self.total_distance, 0, UINT24_MAX),
            ("Inclination", self.inclination, signed_tenth_lower, signed_tenth_upper),
            ("Ramp angle", self.ramp_angle_setting, signed_tenth_lower, signed_tenth_upper),
            ("Positive elevation", self.positive_elevation_gain, 0.0, unsigned_tenth_upper),
            ("Negative elevation", self.negative_elevation_gain, 0.0, unsigned_tenth_upper),
            ("Instantaneous pace", self.instantaneous_pace, 0, UINT16_MAX),
            ("Average pace", self.average_pace, 0, UINT16_MAX),
            ("Total energy", self.total_energy, 0, UINT16_MAX),
            ("Energy per hour", self.energy_per_hour, 0, UINT16_MAX),
            ("Energy per minute", self.energy_per_minute, 0, UINT8_MAX),
            ("Heart rate", self.heart_rate, 0, UINT8_MAX),
            ("Metabolic equivalent", self.metabolic_equivalent, 0.0, met_upper),
            ("Elapsed time", self.elapsed_time, 0, UINT16_MAX),
            ("Remaining time", self.remaining_time, 0, UINT16_MAX),
            ("Force on belt", self.force_on_belt, SINT16_MIN, SINT16_MAX),
            ("Power output", self.power_output, SINT16_MIN, SINT16_MAX),
        )
        for label, value, lower, upper in limits:
            if value is None or lower <= value <= upper:
                continue
            raise ValueError(f"{label} must be {lower}-{upper}, got {value}")

182 

183 

class TreadmillDataCharacteristic(BaseCharacteristic[TreadmillData]):
    """Treadmill Data characteristic (0x2ACD).

    Used in the Fitness Machine Service to transmit treadmill workout data.
    A 16-bit flags field controls which optional fields are present.

    Flag-bit assignments (from GSS YAML):
        Bit 0:  More Data -- **inverted**: 0 -> Inst. Speed present, 1 -> absent
        Bit 1:  Average Speed present
        Bit 2:  Total Distance present
        Bit 3:  Inclination and Ramp Angle Setting present (gates 2 fields)
        Bit 4:  Elevation Gain present (gates 2 fields: pos + neg)
        Bit 5:  Instantaneous Pace present
        Bit 6:  Average Pace present
        Bit 7:  Expended Energy present (gates triplet: total + /hr + /min)
        Bit 8:  Heart Rate present
        Bit 9:  Metabolic Equivalent present
        Bit 10: Elapsed Time present
        Bit 11: Remaining Time present
        Bit 12: Force On Belt and Power Output present (gates 2 fields)
        Bits 13-15: Reserved for Future Use

    """

    expected_type = TreadmillData
    min_length: int = 2  # Flags only (all optional fields absent)
    allow_variable_length: bool = True

    def _decode_value(
        self,
        data: bytearray,
        ctx: CharacteristicContext | None = None,
        *,
        validate: bool = True,
    ) -> TreadmillData:
        """Parse Treadmill Data from raw BLE bytes.

        Fields are read in flag-bit order, advancing a running offset. For
        the fields parsed inline here, a flagged field whose bytes are not
        all available is left as ``None`` instead of raising; the fields
        delegated to the ``decode_*`` helpers follow whatever those helpers do.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional context (unused).
            validate: Part of the method signature; not referenced in this
                method body.

        Returns:
            TreadmillData with all present fields populated.

        """
        flags = TreadmillDataFlags(DataParser.parse_int16(data, 0, signed=False))
        offset = 2

        # Bit 0 -- inverted: Instantaneous Speed present when bit is NOT set
        instantaneous_speed = None
        if not (flags & TreadmillDataFlags.MORE_DATA) and len(data) >= offset + 2:
            raw_speed = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_speed = raw_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 1 -- Average Speed
        average_speed = None
        if (flags & TreadmillDataFlags.AVERAGE_SPEED_PRESENT) and len(data) >= offset + 2:
            raw_avg_speed = DataParser.parse_int16(data, offset, signed=False)
            average_speed = raw_avg_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 2 -- Total Distance (uint24)
        total_distance = None
        if (flags & TreadmillDataFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
            total_distance = DataParser.parse_int24(data, offset, signed=False)
            offset += 3

        # Bit 3 -- Inclination (sint16) + Ramp Angle Setting (sint16)
        inclination = None
        ramp_angle_setting = None
        if (flags & TreadmillDataFlags.INCLINATION_AND_RAMP_PRESENT) and len(data) >= offset + 4:
            raw_incl = DataParser.parse_int16(data, offset, signed=True)
            inclination = raw_incl / _TENTH_RESOLUTION
            offset += 2
            raw_ramp = DataParser.parse_int16(data, offset, signed=True)
            ramp_angle_setting = raw_ramp / _TENTH_RESOLUTION
            offset += 2

        # Bit 4 -- Positive Elevation Gain (uint16) + Negative Elevation Gain (uint16)
        positive_elevation_gain = None
        negative_elevation_gain = None
        if (flags & TreadmillDataFlags.ELEVATION_GAIN_PRESENT) and len(data) >= offset + 4:
            raw_pos = DataParser.parse_int16(data, offset, signed=False)
            positive_elevation_gain = raw_pos / _TENTH_RESOLUTION
            offset += 2
            raw_neg = DataParser.parse_int16(data, offset, signed=False)
            negative_elevation_gain = raw_neg / _TENTH_RESOLUTION
            offset += 2

        # Bit 5 -- Instantaneous Pace
        instantaneous_pace = None
        if (flags & TreadmillDataFlags.INSTANTANEOUS_PACE_PRESENT) and len(data) >= offset + 2:
            instantaneous_pace = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Bit 6 -- Average Pace
        average_pace = None
        if (flags & TreadmillDataFlags.AVERAGE_PACE_PRESENT) and len(data) >= offset + 2:
            average_pace = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Bit 7 -- Energy triplet (Total + Per Hour + Per Minute)
        total_energy = None
        energy_per_hour = None
        energy_per_minute = None
        if flags & TreadmillDataFlags.EXPENDED_ENERGY_PRESENT:
            total_energy, energy_per_hour, energy_per_minute, offset = decode_energy_triplet(data, offset)

        # Bit 8 -- Heart Rate
        heart_rate = None
        if flags & TreadmillDataFlags.HEART_RATE_PRESENT:
            heart_rate, offset = decode_heart_rate(data, offset)

        # Bit 9 -- Metabolic Equivalent
        metabolic_equivalent = None
        if flags & TreadmillDataFlags.METABOLIC_EQUIVALENT_PRESENT:
            metabolic_equivalent, offset = decode_metabolic_equivalent(data, offset)

        # Bit 10 -- Elapsed Time
        elapsed_time = None
        if flags & TreadmillDataFlags.ELAPSED_TIME_PRESENT:
            elapsed_time, offset = decode_elapsed_time(data, offset)

        # Bit 11 -- Remaining Time
        remaining_time = None
        if flags & TreadmillDataFlags.REMAINING_TIME_PRESENT:
            remaining_time, offset = decode_remaining_time(data, offset)

        # Bit 12 -- Force On Belt (sint16) + Power Output (sint16)
        force_on_belt = None
        power_output = None
        if (flags & TreadmillDataFlags.FORCE_AND_POWER_PRESENT) and len(data) >= offset + 4:
            force_on_belt = DataParser.parse_int16(data, offset, signed=True)
            offset += 2
            power_output = DataParser.parse_int16(data, offset, signed=True)
            offset += 2

        return TreadmillData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            average_speed=average_speed,
            total_distance=total_distance,
            inclination=inclination,
            ramp_angle_setting=ramp_angle_setting,
            positive_elevation_gain=positive_elevation_gain,
            negative_elevation_gain=negative_elevation_gain,
            instantaneous_pace=instantaneous_pace,
            average_pace=average_pace,
            total_energy=total_energy,
            energy_per_hour=energy_per_hour,
            energy_per_minute=energy_per_minute,
            heart_rate=heart_rate,
            metabolic_equivalent=metabolic_equivalent,
            elapsed_time=elapsed_time,
            remaining_time=remaining_time,
            force_on_belt=force_on_belt,
            power_output=power_output,
        )

    def _encode_value(self, data: TreadmillData) -> bytearray:  # noqa: PLR0912
        """Encode TreadmillData back to BLE bytes.

        The flags field is rebuilt from which attributes are populated (the
        ``data.flags`` attribute is not consulted), then each populated field
        is appended in flag-bit order.

        Args:
            data: TreadmillData instance.

        Returns:
            Encoded bytearray matching the BLE wire format.

        Raises:
            ValueError: If only one member of a pair that shares a flag bit
                (inclination / ramp angle setting, positive / negative
                elevation gain, force on belt / power output) is populated.

        """
        # Bits 3, 4 and 12 each announce two fields at once. A half-populated
        # pair cannot be written without the flags and the payload
        # disagreeing, which would shift every later field for the receiver,
        # so it is rejected before any bytes are produced.
        gated_pairs = (
            ("Inclination and ramp angle setting", data.inclination, data.ramp_angle_setting),
            ("Positive and negative elevation gain", data.positive_elevation_gain, data.negative_elevation_gain),
            ("Force on belt and power output", data.force_on_belt, data.power_output),
        )
        for label, first, second in gated_pairs:
            if (first is None) != (second is None):
                raise ValueError(f"{label} must both be set or both be None; a single flag bit gates the pair")

        flags = TreadmillDataFlags(0)

        # Bit 0 -- inverted: set MORE_DATA when Speed is absent
        if data.instantaneous_speed is None:
            flags |= TreadmillDataFlags.MORE_DATA
        if data.average_speed is not None:
            flags |= TreadmillDataFlags.AVERAGE_SPEED_PRESENT
        if data.total_distance is not None:
            flags |= TreadmillDataFlags.TOTAL_DISTANCE_PRESENT
        if data.inclination is not None:
            flags |= TreadmillDataFlags.INCLINATION_AND_RAMP_PRESENT
        if data.positive_elevation_gain is not None:
            flags |= TreadmillDataFlags.ELEVATION_GAIN_PRESENT
        if data.instantaneous_pace is not None:
            flags |= TreadmillDataFlags.INSTANTANEOUS_PACE_PRESENT
        if data.average_pace is not None:
            flags |= TreadmillDataFlags.AVERAGE_PACE_PRESENT
        if data.total_energy is not None:
            flags |= TreadmillDataFlags.EXPENDED_ENERGY_PRESENT
        if data.heart_rate is not None:
            flags |= TreadmillDataFlags.HEART_RATE_PRESENT
        if data.metabolic_equivalent is not None:
            flags |= TreadmillDataFlags.METABOLIC_EQUIVALENT_PRESENT
        if data.elapsed_time is not None:
            flags |= TreadmillDataFlags.ELAPSED_TIME_PRESENT
        if data.remaining_time is not None:
            flags |= TreadmillDataFlags.REMAINING_TIME_PRESENT
        if data.force_on_belt is not None:
            flags |= TreadmillDataFlags.FORCE_AND_POWER_PRESENT

        result = DataParser.encode_int16(int(flags), signed=False)

        if data.instantaneous_speed is not None:
            raw_speed = round(data.instantaneous_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_speed, signed=False))
        if data.average_speed is not None:
            raw_avg_speed = round(data.average_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_avg_speed, signed=False))
        if data.total_distance is not None:
            result.extend(DataParser.encode_int24(data.total_distance, signed=False))
        # Pairs are written as a unit; the check above guarantees both members
        # are populated whenever one is.
        if data.inclination is not None and data.ramp_angle_setting is not None:
            raw_incl = round(data.inclination * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_incl, signed=True))
            raw_ramp = round(data.ramp_angle_setting * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_ramp, signed=True))
        if data.positive_elevation_gain is not None and data.negative_elevation_gain is not None:
            raw_pos = round(data.positive_elevation_gain * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_pos, signed=False))
            raw_neg = round(data.negative_elevation_gain * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_neg, signed=False))
        if data.instantaneous_pace is not None:
            result.extend(DataParser.encode_int16(data.instantaneous_pace, signed=False))
        if data.average_pace is not None:
            result.extend(DataParser.encode_int16(data.average_pace, signed=False))
        if data.total_energy is not None:
            result.extend(encode_energy_triplet(data.total_energy, data.energy_per_hour, data.energy_per_minute))
        if data.heart_rate is not None:
            result.extend(encode_heart_rate(data.heart_rate))
        if data.metabolic_equivalent is not None:
            result.extend(encode_metabolic_equivalent(data.metabolic_equivalent))
        if data.elapsed_time is not None:
            result.extend(encode_elapsed_time(data.elapsed_time))
        if data.remaining_time is not None:
            result.extend(encode_remaining_time(data.remaining_time))
        if data.force_on_belt is not None and data.power_output is not None:
            result.extend(DataParser.encode_int16(data.force_on_belt, signed=True))
            result.extend(DataParser.encode_int16(data.power_output, signed=True))

        return result