Coverage for src / bluetooth_sig / gatt / characteristics / treadmill_data.py: 92%
221 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
"""Treadmill Data characteristic implementation.

Implements the Treadmill Data characteristic (0x2ACD) from the Fitness Machine
Service. A 16-bit flags field controls the presence of optional data fields.

Bit 0 ("More Data") uses **inverted logic**: when bit 0 is 0 the
Instantaneous Speed field IS present; when bit 0 is 1 it is absent.
All other bits use normal logic (1 = present).

References:
    Bluetooth SIG Fitness Machine Service 1.0
    org.bluetooth.characteristic.treadmill_data (GSS YAML)
"""
15from __future__ import annotations
17from enum import IntFlag
19import msgspec
21from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT24_MAX
22from ..context import CharacteristicContext
23from .base import BaseCharacteristic
24from .fitness_machine_common import (
25 MET_RESOLUTION,
26 decode_elapsed_time,
27 decode_energy_triplet,
28 decode_heart_rate,
29 decode_metabolic_equivalent,
30 decode_remaining_time,
31 encode_elapsed_time,
32 encode_energy_triplet,
33 encode_heart_rate,
34 encode_metabolic_equivalent,
35 encode_remaining_time,
36)
37from .utils import DataParser
# Divisors that turn raw wire integers into scaled values (actual = raw / divisor).

# Speed: M=1, d=-2, b=0 -> actual = raw / 100 km/h
_SPEED_RESOLUTION = 100.0

# Inclination: M=1, d=-1, b=0 -> actual = raw / 10 %
# Ramp Angle: M=1, d=-1, b=0 -> actual = raw / 10 degrees
# Elevation: M=1, d=-1, b=0 -> actual = raw / 10 metres
_TENTH_RESOLUTION = 10.0
class TreadmillDataFlags(IntFlag):
    """Treadmill Data flags as per Bluetooth SIG specification.

    Each member is a single bit of the 16-bit flags field that opens the
    characteristic value. Bit 0 uses inverted logic: 0 = Instantaneous Speed
    present, 1 = absent. Every other bit uses normal logic (1 = the field, or
    group of fields, is present).
    """

    MORE_DATA = 0x0001  # Inverted: 0 -> Speed present, 1 -> absent
    AVERAGE_SPEED_PRESENT = 0x0002
    TOTAL_DISTANCE_PRESENT = 0x0004
    INCLINATION_AND_RAMP_PRESENT = 0x0008  # Gates two fields: inclination + ramp angle
    ELEVATION_GAIN_PRESENT = 0x0010  # Gates two fields: positive + negative gain
    INSTANTANEOUS_PACE_PRESENT = 0x0020
    AVERAGE_PACE_PRESENT = 0x0040
    EXPENDED_ENERGY_PRESENT = 0x0080  # Gates the energy triplet: total + per hour + per minute
    HEART_RATE_PRESENT = 0x0100
    METABOLIC_EQUIVALENT_PRESENT = 0x0200
    ELAPSED_TIME_PRESENT = 0x0400
    REMAINING_TIME_PRESENT = 0x0800
    FORCE_AND_POWER_PRESENT = 0x1000  # Gates two fields: force on belt + power output
def _require_in_range(label: str, value: float | None, low: float, high: float) -> None:
    """Raise ``ValueError`` when *value* is set but lies outside ``[low, high]``.

    Args:
        label: Human-readable field name used as the start of the error message.
        value: Field value to check; ``None`` means "absent" and is always accepted.
        low: Inclusive lower bound.
        high: Inclusive upper bound.

    Raises:
        ValueError: If *value* is not ``None`` and is not within the bounds.

    """
    if value is not None and not low <= value <= high:
        raise ValueError(f"{label} must be {low}-{high}, got {value}")


class TreadmillData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Parsed data from Treadmill Data characteristic.

    Every field except ``flags`` defaults to ``None``, meaning the field is
    absent. Ranges are validated in ``__post_init__``.

    Attributes:
        flags: Raw 16-bit flags field.
        instantaneous_speed: Instantaneous belt speed in km/h (0.01 resolution).
        average_speed: Average speed in km/h (0.01 resolution).
        total_distance: Total distance in metres (uint24).
        inclination: Current inclination in % (0.1 resolution, signed).
        ramp_angle_setting: Current ramp angle in degrees (0.1 resolution, signed).
        positive_elevation_gain: Positive elevation gain in metres (0.1 resolution).
        negative_elevation_gain: Negative elevation gain in metres (0.1 resolution).
        instantaneous_pace: Instantaneous pace in seconds per 500 m.
        average_pace: Average pace in seconds per 500 m.
        total_energy: Total expended energy in kcal.
        energy_per_hour: Expended energy per hour in kcal.
        energy_per_minute: Expended energy per minute in kcal.
        heart_rate: Heart rate in bpm.
        metabolic_equivalent: MET value (0.1 resolution).
        elapsed_time: Elapsed time in seconds.
        remaining_time: Remaining time in seconds.
        force_on_belt: Force on belt in newtons (signed).
        power_output: Power output in watts (signed).

    NOTE(review): the pace fields are stored as the raw uint16 wire value; the
    "seconds per 500 m" unit above is carried over from the existing docs and
    should be confirmed against the Fitness Machine Service specification.

    """

    flags: TreadmillDataFlags
    instantaneous_speed: float | None = None
    average_speed: float | None = None
    total_distance: int | None = None
    inclination: float | None = None
    ramp_angle_setting: float | None = None
    positive_elevation_gain: float | None = None
    negative_elevation_gain: float | None = None
    instantaneous_pace: int | None = None
    average_pace: int | None = None
    total_energy: int | None = None
    energy_per_hour: int | None = None
    energy_per_minute: int | None = None
    heart_rate: int | None = None
    metabolic_equivalent: float | None = None
    elapsed_time: int | None = None
    remaining_time: int | None = None
    force_on_belt: int | None = None
    power_output: int | None = None

    def __post_init__(self) -> None:
        """Validate field ranges.

        Each populated field must fit the integer width used for it on the
        wire, scaled by the field's resolution. Absent (``None``) fields are
        skipped. Checks run in declaration order, so the first out-of-range
        field is the one reported.

        Raises:
            ValueError: If any populated field is outside its allowed range.

        """
        # Scaled bounds shared by several fields.
        speed_max = UINT16_MAX / _SPEED_RESOLUTION
        signed_tenth_min = SINT16_MIN / _TENTH_RESOLUTION
        signed_tenth_max = SINT16_MAX / _TENTH_RESOLUTION
        unsigned_tenth_max = UINT16_MAX / _TENTH_RESOLUTION

        _require_in_range("Instantaneous speed", self.instantaneous_speed, 0.0, speed_max)
        _require_in_range("Average speed", self.average_speed, 0.0, speed_max)
        _require_in_range("Total distance", self.total_distance, 0, UINT24_MAX)
        _require_in_range("Inclination", self.inclination, signed_tenth_min, signed_tenth_max)
        _require_in_range("Ramp angle", self.ramp_angle_setting, signed_tenth_min, signed_tenth_max)
        _require_in_range("Positive elevation", self.positive_elevation_gain, 0.0, unsigned_tenth_max)
        _require_in_range("Negative elevation", self.negative_elevation_gain, 0.0, unsigned_tenth_max)
        _require_in_range("Instantaneous pace", self.instantaneous_pace, 0, UINT16_MAX)
        _require_in_range("Average pace", self.average_pace, 0, UINT16_MAX)
        _require_in_range("Total energy", self.total_energy, 0, UINT16_MAX)
        _require_in_range("Energy per hour", self.energy_per_hour, 0, UINT16_MAX)
        _require_in_range("Energy per minute", self.energy_per_minute, 0, UINT8_MAX)
        _require_in_range("Heart rate", self.heart_rate, 0, UINT8_MAX)
        _require_in_range("Metabolic equivalent", self.metabolic_equivalent, 0.0, UINT8_MAX / MET_RESOLUTION)
        _require_in_range("Elapsed time", self.elapsed_time, 0, UINT16_MAX)
        _require_in_range("Remaining time", self.remaining_time, 0, UINT16_MAX)
        _require_in_range("Force on belt", self.force_on_belt, SINT16_MIN, SINT16_MAX)
        _require_in_range("Power output", self.power_output, SINT16_MIN, SINT16_MAX)
def _require_both_or_neither(first_label: str, first: object, second_label: str, second: object) -> None:
    """Raise ``ValueError`` when exactly one of two flag-paired fields is set.

    A single flag bit gates both fields of such a pair, so the wire format
    cannot represent one without the other.

    Args:
        first_label: Attribute name of the first field, used in the error message.
        first: Value of the first field (``None`` means absent).
        second_label: Attribute name of the second field, used in the error message.
        second: Value of the second field (``None`` means absent).

    Raises:
        ValueError: If one field is ``None`` and the other is not.

    """
    if (first is None) != (second is None):
        raise ValueError(
            f"{first_label} and {second_label} share one flag bit and must be provided together"
        )


class TreadmillDataCharacteristic(BaseCharacteristic[TreadmillData]):
    """Treadmill Data characteristic (0x2ACD).

    Used in the Fitness Machine Service to transmit treadmill workout data.
    A 16-bit flags field controls which optional fields are present.

    Flag-bit assignments (from GSS YAML):
        Bit 0: More Data -- **inverted**: 0 -> Inst. Speed present, 1 -> absent
        Bit 1: Average Speed present
        Bit 2: Total Distance present
        Bit 3: Inclination and Ramp Angle Setting present (gates 2 fields)
        Bit 4: Elevation Gain present (gates 2 fields: pos + neg)
        Bit 5: Instantaneous Pace present
        Bit 6: Average Pace present
        Bit 7: Expended Energy present (gates triplet: total + /hr + /min)
        Bit 8: Heart Rate present
        Bit 9: Metabolic Equivalent present
        Bit 10: Elapsed Time present
        Bit 11: Remaining Time present
        Bit 12: Force On Belt and Power Output present (gates 2 fields)
        Bits 13-15: Reserved for Future Use

    """

    expected_type = TreadmillData
    min_length: int = 2  # Flags only (all optional fields absent)
    allow_variable_length: bool = True

    def _decode_value(
        self,
        data: bytearray,
        ctx: CharacteristicContext | None = None,
        *,
        validate: bool = True,
    ) -> TreadmillData:
        """Parse Treadmill Data from raw BLE bytes.

        Fields are read in flag-bit order, advancing a running byte offset.
        For the fields parsed directly here, a flagged field whose bytes are
        missing from ``data`` is left as ``None`` and the offset is not
        advanced. Fields handled by the shared ``decode_*`` helpers follow
        whatever those helpers do on short input.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional context (unused).
            validate: Whether to validate ranges. Not consulted in this
                method; range checks run when ``TreadmillData`` is built.

        Returns:
            TreadmillData with all present fields populated.

        """
        flags = TreadmillDataFlags(DataParser.parse_int16(data, 0, signed=False))
        offset = 2

        # Bit 0 -- inverted: Instantaneous Speed present when bit is NOT set
        instantaneous_speed = None
        if not (flags & TreadmillDataFlags.MORE_DATA) and len(data) >= offset + 2:
            raw_speed = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_speed = raw_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 1 -- Average Speed
        average_speed = None
        if (flags & TreadmillDataFlags.AVERAGE_SPEED_PRESENT) and len(data) >= offset + 2:
            raw_avg_speed = DataParser.parse_int16(data, offset, signed=False)
            average_speed = raw_avg_speed / _SPEED_RESOLUTION
            offset += 2

        # Bit 2 -- Total Distance (uint24)
        total_distance = None
        if (flags & TreadmillDataFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
            total_distance = DataParser.parse_int24(data, offset, signed=False)
            offset += 3

        # Bit 3 -- Inclination (sint16) + Ramp Angle Setting (sint16)
        inclination = None
        ramp_angle_setting = None
        if (flags & TreadmillDataFlags.INCLINATION_AND_RAMP_PRESENT) and len(data) >= offset + 4:
            raw_incl = DataParser.parse_int16(data, offset, signed=True)
            inclination = raw_incl / _TENTH_RESOLUTION
            offset += 2
            raw_ramp = DataParser.parse_int16(data, offset, signed=True)
            ramp_angle_setting = raw_ramp / _TENTH_RESOLUTION
            offset += 2

        # Bit 4 -- Positive Elevation Gain (uint16) + Negative Elevation Gain (uint16)
        positive_elevation_gain = None
        negative_elevation_gain = None
        if (flags & TreadmillDataFlags.ELEVATION_GAIN_PRESENT) and len(data) >= offset + 4:
            raw_pos = DataParser.parse_int16(data, offset, signed=False)
            positive_elevation_gain = raw_pos / _TENTH_RESOLUTION
            offset += 2
            raw_neg = DataParser.parse_int16(data, offset, signed=False)
            negative_elevation_gain = raw_neg / _TENTH_RESOLUTION
            offset += 2

        # Bit 5 -- Instantaneous Pace
        instantaneous_pace = None
        if (flags & TreadmillDataFlags.INSTANTANEOUS_PACE_PRESENT) and len(data) >= offset + 2:
            instantaneous_pace = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Bit 6 -- Average Pace
        average_pace = None
        if (flags & TreadmillDataFlags.AVERAGE_PACE_PRESENT) and len(data) >= offset + 2:
            average_pace = DataParser.parse_int16(data, offset, signed=False)
            offset += 2

        # Bit 7 -- Energy triplet (Total + Per Hour + Per Minute)
        total_energy = None
        energy_per_hour = None
        energy_per_minute = None
        if flags & TreadmillDataFlags.EXPENDED_ENERGY_PRESENT:
            total_energy, energy_per_hour, energy_per_minute, offset = decode_energy_triplet(data, offset)

        # Bit 8 -- Heart Rate
        heart_rate = None
        if flags & TreadmillDataFlags.HEART_RATE_PRESENT:
            heart_rate, offset = decode_heart_rate(data, offset)

        # Bit 9 -- Metabolic Equivalent
        metabolic_equivalent = None
        if flags & TreadmillDataFlags.METABOLIC_EQUIVALENT_PRESENT:
            metabolic_equivalent, offset = decode_metabolic_equivalent(data, offset)

        # Bit 10 -- Elapsed Time
        elapsed_time = None
        if flags & TreadmillDataFlags.ELAPSED_TIME_PRESENT:
            elapsed_time, offset = decode_elapsed_time(data, offset)

        # Bit 11 -- Remaining Time
        remaining_time = None
        if flags & TreadmillDataFlags.REMAINING_TIME_PRESENT:
            remaining_time, offset = decode_remaining_time(data, offset)

        # Bit 12 -- Force On Belt (sint16) + Power Output (sint16)
        force_on_belt = None
        power_output = None
        if (flags & TreadmillDataFlags.FORCE_AND_POWER_PRESENT) and len(data) >= offset + 4:
            force_on_belt = DataParser.parse_int16(data, offset, signed=True)
            offset += 2
            power_output = DataParser.parse_int16(data, offset, signed=True)
            offset += 2

        return TreadmillData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            average_speed=average_speed,
            total_distance=total_distance,
            inclination=inclination,
            ramp_angle_setting=ramp_angle_setting,
            positive_elevation_gain=positive_elevation_gain,
            negative_elevation_gain=negative_elevation_gain,
            instantaneous_pace=instantaneous_pace,
            average_pace=average_pace,
            total_energy=total_energy,
            energy_per_hour=energy_per_hour,
            energy_per_minute=energy_per_minute,
            heart_rate=heart_rate,
            metabolic_equivalent=metabolic_equivalent,
            elapsed_time=elapsed_time,
            remaining_time=remaining_time,
            force_on_belt=force_on_belt,
            power_output=power_output,
        )

    def _encode_value(self, data: TreadmillData) -> bytearray:  # noqa: PLR0912
        """Encode TreadmillData back to BLE bytes.

        Reconstructs flags from present fields so round-trip encoding
        preserves the original wire format. ``data.flags`` itself is not
        read; only field presence decides which bits are set.

        Args:
            data: TreadmillData instance.

        Returns:
            Encoded bytearray matching the BLE wire format.

        Raises:
            ValueError: If only one field of a flag-paired group is set
                (inclination / ramp angle, positive / negative elevation
                gain, force on belt / power output).

        """
        # One flag bit covers both fields of each pair. Writing only half of a
        # pair would produce a payload whose length disagrees with its flags,
        # so reject that before building any bytes.
        _require_both_or_neither("inclination", data.inclination, "ramp_angle_setting", data.ramp_angle_setting)
        _require_both_or_neither(
            "positive_elevation_gain",
            data.positive_elevation_gain,
            "negative_elevation_gain",
            data.negative_elevation_gain,
        )
        _require_both_or_neither("force_on_belt", data.force_on_belt, "power_output", data.power_output)

        flags = TreadmillDataFlags(0)

        # Bit 0 -- inverted: set MORE_DATA when Speed is absent
        if data.instantaneous_speed is None:
            flags |= TreadmillDataFlags.MORE_DATA
        if data.average_speed is not None:
            flags |= TreadmillDataFlags.AVERAGE_SPEED_PRESENT
        if data.total_distance is not None:
            flags |= TreadmillDataFlags.TOTAL_DISTANCE_PRESENT
        if data.inclination is not None:
            flags |= TreadmillDataFlags.INCLINATION_AND_RAMP_PRESENT
        if data.positive_elevation_gain is not None:
            flags |= TreadmillDataFlags.ELEVATION_GAIN_PRESENT
        if data.instantaneous_pace is not None:
            flags |= TreadmillDataFlags.INSTANTANEOUS_PACE_PRESENT
        if data.average_pace is not None:
            flags |= TreadmillDataFlags.AVERAGE_PACE_PRESENT
        if data.total_energy is not None:
            flags |= TreadmillDataFlags.EXPENDED_ENERGY_PRESENT
        if data.heart_rate is not None:
            flags |= TreadmillDataFlags.HEART_RATE_PRESENT
        if data.metabolic_equivalent is not None:
            flags |= TreadmillDataFlags.METABOLIC_EQUIVALENT_PRESENT
        if data.elapsed_time is not None:
            flags |= TreadmillDataFlags.ELAPSED_TIME_PRESENT
        if data.remaining_time is not None:
            flags |= TreadmillDataFlags.REMAINING_TIME_PRESENT
        if data.force_on_belt is not None:
            flags |= TreadmillDataFlags.FORCE_AND_POWER_PRESENT

        result = DataParser.encode_int16(int(flags), signed=False)

        # Fields are appended in flag-bit order; scaled values are converted
        # back to raw integers by multiplying by the resolution and rounding.
        if data.instantaneous_speed is not None:
            raw_speed = round(data.instantaneous_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_speed, signed=False))
        if data.average_speed is not None:
            raw_avg_speed = round(data.average_speed * _SPEED_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_avg_speed, signed=False))
        if data.total_distance is not None:
            result.extend(DataParser.encode_int24(data.total_distance, signed=False))
        if data.inclination is not None:
            raw_incl = round(data.inclination * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_incl, signed=True))
        if data.ramp_angle_setting is not None:
            raw_ramp = round(data.ramp_angle_setting * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_ramp, signed=True))
        if data.positive_elevation_gain is not None:
            raw_pos = round(data.positive_elevation_gain * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_pos, signed=False))
        if data.negative_elevation_gain is not None:
            raw_neg = round(data.negative_elevation_gain * _TENTH_RESOLUTION)
            result.extend(DataParser.encode_int16(raw_neg, signed=False))
        if data.instantaneous_pace is not None:
            result.extend(DataParser.encode_int16(data.instantaneous_pace, signed=False))
        if data.average_pace is not None:
            result.extend(DataParser.encode_int16(data.average_pace, signed=False))
        if data.total_energy is not None:
            result.extend(encode_energy_triplet(data.total_energy, data.energy_per_hour, data.energy_per_minute))
        if data.heart_rate is not None:
            result.extend(encode_heart_rate(data.heart_rate))
        if data.metabolic_equivalent is not None:
            result.extend(encode_metabolic_equivalent(data.metabolic_equivalent))
        if data.elapsed_time is not None:
            result.extend(encode_elapsed_time(data.elapsed_time))
        if data.remaining_time is not None:
            result.extend(encode_remaining_time(data.remaining_time))
        if data.force_on_belt is not None:
            result.extend(DataParser.encode_int16(data.force_on_belt, signed=True))
        if data.power_output is not None:
            result.extend(DataParser.encode_int16(data.power_output, signed=True))

        return result