Coverage for src / bluetooth_sig / gatt / characteristics / indoor_bike_data.py: 92%
196 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Indoor Bike Data characteristic implementation.
3Implements the Indoor Bike Data characteristic (0x2AD2) from the Fitness
4Machine Service. A 16-bit flags field controls the presence of optional
5data fields.
7Bit 0 ("More Data") uses **inverted logic**: when bit 0 is 0 the
8Instantaneous Speed field IS present; when bit 0 is 1 it is absent.
9All other bits use normal logic (1 = present).
11References:
12 Bluetooth SIG Fitness Machine Service 1.0
13 org.bluetooth.characteristic.indoor_bike_data (GSS YAML)
14"""
16from __future__ import annotations
18from enum import IntFlag
20import msgspec
22from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX, UINT16_MAX, UINT24_MAX
23from ..context import CharacteristicContext
24from .base import BaseCharacteristic
25from .fitness_machine_common import (
26 MET_RESOLUTION,
27 decode_elapsed_time,
28 decode_energy_triplet,
29 decode_heart_rate,
30 decode_metabolic_equivalent,
31 decode_remaining_time,
32 encode_elapsed_time,
33 encode_energy_triplet,
34 encode_heart_rate,
35 encode_metabolic_equivalent,
36 encode_remaining_time,
37)
38from .utils import DataParser
40# Speed: M=1, d=-2, b=0 -> actual = raw / 100 km/h
41_SPEED_RESOLUTION = 100.0
43# Cadence: M=1, d=0, b=-1 -> actual = raw / 2 rpm
44_CADENCE_DIVISOR = 2.0
46# Resistance level: M=1, d=1, b=0 -> actual = raw * 10
47_RESISTANCE_RESOLUTION = 10.0
50class IndoorBikeDataFlags(IntFlag):
51 """Indoor Bike Data flags as per Bluetooth SIG specification.
53 Bit 0 uses inverted logic: 0 = Instantaneous Speed present,
54 1 = absent.
55 """
57 MORE_DATA = 0x0001 # Inverted: 0 -> Speed present, 1 -> absent
58 AVERAGE_SPEED_PRESENT = 0x0002
59 INSTANTANEOUS_CADENCE_PRESENT = 0x0004
60 AVERAGE_CADENCE_PRESENT = 0x0008
61 TOTAL_DISTANCE_PRESENT = 0x0010
62 RESISTANCE_LEVEL_PRESENT = 0x0020
63 INSTANTANEOUS_POWER_PRESENT = 0x0040
64 AVERAGE_POWER_PRESENT = 0x0080
65 EXPENDED_ENERGY_PRESENT = 0x0100
66 HEART_RATE_PRESENT = 0x0200
67 METABOLIC_EQUIVALENT_PRESENT = 0x0400
68 ELAPSED_TIME_PRESENT = 0x0800
69 REMAINING_TIME_PRESENT = 0x1000
72class IndoorBikeData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
73 """Parsed data from Indoor Bike Data characteristic.
75 Attributes:
76 flags: Raw 16-bit flags field.
77 instantaneous_speed: Instantaneous speed in km/h (0.01 resolution).
78 average_speed: Average speed in km/h (0.01 resolution).
79 instantaneous_cadence: Instantaneous cadence in rpm (0.5 resolution).
80 average_cadence: Average cadence in rpm (0.5 resolution).
81 total_distance: Total distance in metres (uint24).
82 resistance_level: Resistance level (unitless, resolution 10).
83 instantaneous_power: Instantaneous power in watts (signed).
84 average_power: Average power in watts (signed).
85 total_energy: Total expended energy in kcal.
86 energy_per_hour: Expended energy per hour in kcal.
87 energy_per_minute: Expended energy per minute in kcal.
88 heart_rate: Heart rate in bpm.
89 metabolic_equivalent: MET value (0.1 resolution).
90 elapsed_time: Elapsed time in seconds.
91 remaining_time: Remaining time in seconds.
93 """
95 flags: IndoorBikeDataFlags
96 instantaneous_speed: float | None = None
97 average_speed: float | None = None
98 instantaneous_cadence: float | None = None
99 average_cadence: float | None = None
100 total_distance: int | None = None
101 resistance_level: float | None = None
102 instantaneous_power: int | None = None
103 average_power: int | None = None
104 total_energy: int | None = None
105 energy_per_hour: int | None = None
106 energy_per_minute: int | None = None
107 heart_rate: int | None = None
108 metabolic_equivalent: float | None = None
109 elapsed_time: int | None = None
110 remaining_time: int | None = None
112 def __post_init__(self) -> None:
113 """Validate field ranges."""
114 if (
115 self.instantaneous_speed is not None
116 and not 0.0 <= self.instantaneous_speed <= UINT16_MAX / _SPEED_RESOLUTION
117 ):
118 raise ValueError(
119 f"Instantaneous speed must be 0.0-{UINT16_MAX / _SPEED_RESOLUTION}, got {self.instantaneous_speed}"
120 )
121 if self.average_speed is not None and not 0.0 <= self.average_speed <= UINT16_MAX / _SPEED_RESOLUTION:
122 raise ValueError(f"Average speed must be 0.0-{UINT16_MAX / _SPEED_RESOLUTION}, got {self.average_speed}")
123 if (
124 self.instantaneous_cadence is not None
125 and not 0.0 <= self.instantaneous_cadence <= UINT16_MAX / _CADENCE_DIVISOR
126 ):
127 raise ValueError(
128 f"Instantaneous cadence must be 0.0-{UINT16_MAX / _CADENCE_DIVISOR}, got {self.instantaneous_cadence}"
129 )
130 if self.average_cadence is not None and not 0.0 <= self.average_cadence <= UINT16_MAX / _CADENCE_DIVISOR:
131 raise ValueError(f"Average cadence must be 0.0-{UINT16_MAX / _CADENCE_DIVISOR}, got {self.average_cadence}")
132 if self.total_distance is not None and not 0 <= self.total_distance <= UINT24_MAX:
133 raise ValueError(f"Total distance must be 0-{UINT24_MAX}, got {self.total_distance}")
134 if self.resistance_level is not None and not 0.0 <= self.resistance_level <= UINT8_MAX * _RESISTANCE_RESOLUTION:
135 raise ValueError(
136 f"Resistance level must be 0.0-{UINT8_MAX * _RESISTANCE_RESOLUTION}, got {self.resistance_level}"
137 )
138 if self.instantaneous_power is not None and not SINT16_MIN <= self.instantaneous_power <= SINT16_MAX:
139 raise ValueError(f"Instantaneous power must be {SINT16_MIN}-{SINT16_MAX}, got {self.instantaneous_power}")
140 if self.average_power is not None and not SINT16_MIN <= self.average_power <= SINT16_MAX:
141 raise ValueError(f"Average power must be {SINT16_MIN}-{SINT16_MAX}, got {self.average_power}")
142 if self.total_energy is not None and not 0 <= self.total_energy <= UINT16_MAX:
143 raise ValueError(f"Total energy must be 0-{UINT16_MAX}, got {self.total_energy}")
144 if self.energy_per_hour is not None and not 0 <= self.energy_per_hour <= UINT16_MAX:
145 raise ValueError(f"Energy per hour must be 0-{UINT16_MAX}, got {self.energy_per_hour}")
146 if self.energy_per_minute is not None and not 0 <= self.energy_per_minute <= UINT8_MAX:
147 raise ValueError(f"Energy per minute must be 0-{UINT8_MAX}, got {self.energy_per_minute}")
148 if self.heart_rate is not None and not 0 <= self.heart_rate <= UINT8_MAX:
149 raise ValueError(f"Heart rate must be 0-{UINT8_MAX}, got {self.heart_rate}")
150 if self.metabolic_equivalent is not None and not 0.0 <= self.metabolic_equivalent <= UINT8_MAX / MET_RESOLUTION:
151 raise ValueError(
152 f"Metabolic equivalent must be 0.0-{UINT8_MAX / MET_RESOLUTION}, got {self.metabolic_equivalent}"
153 )
154 if self.elapsed_time is not None and not 0 <= self.elapsed_time <= UINT16_MAX:
155 raise ValueError(f"Elapsed time must be 0-{UINT16_MAX}, got {self.elapsed_time}")
156 if self.remaining_time is not None and not 0 <= self.remaining_time <= UINT16_MAX:
157 raise ValueError(f"Remaining time must be 0-{UINT16_MAX}, got {self.remaining_time}")
160class IndoorBikeDataCharacteristic(BaseCharacteristic[IndoorBikeData]):
161 """Indoor Bike Data characteristic (0x2AD2).
163 Used in the Fitness Machine Service to transmit indoor bike workout
164 data. A 16-bit flags field controls which optional fields are present.
166 Flag-bit assignments (from GSS YAML):
167 Bit 0: More Data -- **inverted**: 0 -> Inst. Speed present, 1 -> absent
168 Bit 1: Average Speed present
169 Bit 2: Instantaneous Cadence present
170 Bit 3: Average Cadence present
171 Bit 4: Total Distance present
172 Bit 5: Resistance Level present
173 Bit 6: Instantaneous Power present
174 Bit 7: Average Power present
175 Bit 8: Expended Energy present (gates triplet: total + /hr + /min)
176 Bit 9: Heart Rate present
177 Bit 10: Metabolic Equivalent present
178 Bit 11: Elapsed Time present
179 Bit 12: Remaining Time present
180 Bits 13-15: Reserved for Future Use
182 """
184 expected_type = IndoorBikeData
185 min_length: int = 2 # Flags only (all optional fields absent)
186 allow_variable_length: bool = True
188 def _decode_value(
189 self,
190 data: bytearray,
191 ctx: CharacteristicContext | None = None,
192 *,
193 validate: bool = True,
194 ) -> IndoorBikeData:
195 """Parse Indoor Bike Data from raw BLE bytes.
197 Args:
198 data: Raw bytearray from BLE characteristic.
199 ctx: Optional context (unused).
200 validate: Whether to validate ranges.
202 Returns:
203 IndoorBikeData with all present fields populated.
205 """
206 flags = IndoorBikeDataFlags(DataParser.parse_int16(data, 0, signed=False))
207 offset = 2
209 # Bit 0 -- inverted: Instantaneous Speed present when bit is NOT set
210 instantaneous_speed = None
211 if not (flags & IndoorBikeDataFlags.MORE_DATA) and len(data) >= offset + 2:
212 raw_speed = DataParser.parse_int16(data, offset, signed=False)
213 instantaneous_speed = raw_speed / _SPEED_RESOLUTION
214 offset += 2
216 # Bit 1 -- Average Speed
217 average_speed = None
218 if (flags & IndoorBikeDataFlags.AVERAGE_SPEED_PRESENT) and len(data) >= offset + 2:
219 raw_avg_speed = DataParser.parse_int16(data, offset, signed=False)
220 average_speed = raw_avg_speed / _SPEED_RESOLUTION
221 offset += 2
223 # Bit 2 -- Instantaneous Cadence
224 instantaneous_cadence = None
225 if (flags & IndoorBikeDataFlags.INSTANTANEOUS_CADENCE_PRESENT) and len(data) >= offset + 2:
226 raw_cadence = DataParser.parse_int16(data, offset, signed=False)
227 instantaneous_cadence = raw_cadence / _CADENCE_DIVISOR
228 offset += 2
230 # Bit 3 -- Average Cadence
231 average_cadence = None
232 if (flags & IndoorBikeDataFlags.AVERAGE_CADENCE_PRESENT) and len(data) >= offset + 2:
233 raw_avg_cadence = DataParser.parse_int16(data, offset, signed=False)
234 average_cadence = raw_avg_cadence / _CADENCE_DIVISOR
235 offset += 2
237 # Bit 4 -- Total Distance (uint24)
238 total_distance = None
239 if (flags & IndoorBikeDataFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
240 total_distance = DataParser.parse_int24(data, offset, signed=False)
241 offset += 3
243 # Bit 5 -- Resistance Level
244 resistance_level = None
245 if (flags & IndoorBikeDataFlags.RESISTANCE_LEVEL_PRESENT) and len(data) >= offset + 1:
246 raw_resistance = DataParser.parse_int8(data, offset, signed=False)
247 resistance_level = raw_resistance * _RESISTANCE_RESOLUTION
248 offset += 1
250 # Bit 6 -- Instantaneous Power (sint16)
251 instantaneous_power = None
252 if (flags & IndoorBikeDataFlags.INSTANTANEOUS_POWER_PRESENT) and len(data) >= offset + 2:
253 instantaneous_power = DataParser.parse_int16(data, offset, signed=True)
254 offset += 2
256 # Bit 7 -- Average Power (sint16)
257 average_power = None
258 if (flags & IndoorBikeDataFlags.AVERAGE_POWER_PRESENT) and len(data) >= offset + 2:
259 average_power = DataParser.parse_int16(data, offset, signed=True)
260 offset += 2
262 # Bit 8 -- Energy triplet (Total + Per Hour + Per Minute)
263 total_energy = None
264 energy_per_hour = None
265 energy_per_minute = None
266 if flags & IndoorBikeDataFlags.EXPENDED_ENERGY_PRESENT:
267 total_energy, energy_per_hour, energy_per_minute, offset = decode_energy_triplet(data, offset)
269 # Bit 9 -- Heart Rate
270 heart_rate = None
271 if flags & IndoorBikeDataFlags.HEART_RATE_PRESENT:
272 heart_rate, offset = decode_heart_rate(data, offset)
274 # Bit 10 -- Metabolic Equivalent
275 metabolic_equivalent = None
276 if flags & IndoorBikeDataFlags.METABOLIC_EQUIVALENT_PRESENT:
277 metabolic_equivalent, offset = decode_metabolic_equivalent(data, offset)
279 # Bit 11 -- Elapsed Time
280 elapsed_time = None
281 if flags & IndoorBikeDataFlags.ELAPSED_TIME_PRESENT:
282 elapsed_time, offset = decode_elapsed_time(data, offset)
284 # Bit 12 -- Remaining Time
285 remaining_time = None
286 if flags & IndoorBikeDataFlags.REMAINING_TIME_PRESENT:
287 remaining_time, offset = decode_remaining_time(data, offset)
289 return IndoorBikeData(
290 flags=flags,
291 instantaneous_speed=instantaneous_speed,
292 average_speed=average_speed,
293 instantaneous_cadence=instantaneous_cadence,
294 average_cadence=average_cadence,
295 total_distance=total_distance,
296 resistance_level=resistance_level,
297 instantaneous_power=instantaneous_power,
298 average_power=average_power,
299 total_energy=total_energy,
300 energy_per_hour=energy_per_hour,
301 energy_per_minute=energy_per_minute,
302 heart_rate=heart_rate,
303 metabolic_equivalent=metabolic_equivalent,
304 elapsed_time=elapsed_time,
305 remaining_time=remaining_time,
306 )
308 def _encode_value(self, data: IndoorBikeData) -> bytearray: # noqa: PLR0912
309 """Encode IndoorBikeData back to BLE bytes.
311 Reconstructs flags from present fields so round-trip encoding
312 preserves the original wire format.
314 Args:
315 data: IndoorBikeData instance.
317 Returns:
318 Encoded bytearray matching the BLE wire format.
320 """
321 flags = IndoorBikeDataFlags(0)
323 # Bit 0 -- inverted: set MORE_DATA when Speed is absent
324 if data.instantaneous_speed is None:
325 flags |= IndoorBikeDataFlags.MORE_DATA
326 if data.average_speed is not None:
327 flags |= IndoorBikeDataFlags.AVERAGE_SPEED_PRESENT
328 if data.instantaneous_cadence is not None:
329 flags |= IndoorBikeDataFlags.INSTANTANEOUS_CADENCE_PRESENT
330 if data.average_cadence is not None:
331 flags |= IndoorBikeDataFlags.AVERAGE_CADENCE_PRESENT
332 if data.total_distance is not None:
333 flags |= IndoorBikeDataFlags.TOTAL_DISTANCE_PRESENT
334 if data.resistance_level is not None:
335 flags |= IndoorBikeDataFlags.RESISTANCE_LEVEL_PRESENT
336 if data.instantaneous_power is not None:
337 flags |= IndoorBikeDataFlags.INSTANTANEOUS_POWER_PRESENT
338 if data.average_power is not None:
339 flags |= IndoorBikeDataFlags.AVERAGE_POWER_PRESENT
340 if data.total_energy is not None:
341 flags |= IndoorBikeDataFlags.EXPENDED_ENERGY_PRESENT
342 if data.heart_rate is not None:
343 flags |= IndoorBikeDataFlags.HEART_RATE_PRESENT
344 if data.metabolic_equivalent is not None:
345 flags |= IndoorBikeDataFlags.METABOLIC_EQUIVALENT_PRESENT
346 if data.elapsed_time is not None:
347 flags |= IndoorBikeDataFlags.ELAPSED_TIME_PRESENT
348 if data.remaining_time is not None:
349 flags |= IndoorBikeDataFlags.REMAINING_TIME_PRESENT
351 result = DataParser.encode_int16(int(flags), signed=False)
353 if data.instantaneous_speed is not None:
354 raw_speed = round(data.instantaneous_speed * _SPEED_RESOLUTION)
355 result.extend(DataParser.encode_int16(raw_speed, signed=False))
356 if data.average_speed is not None:
357 raw_avg_speed = round(data.average_speed * _SPEED_RESOLUTION)
358 result.extend(DataParser.encode_int16(raw_avg_speed, signed=False))
359 if data.instantaneous_cadence is not None:
360 raw_cadence = round(data.instantaneous_cadence * _CADENCE_DIVISOR)
361 result.extend(DataParser.encode_int16(raw_cadence, signed=False))
362 if data.average_cadence is not None:
363 raw_avg_cadence = round(data.average_cadence * _CADENCE_DIVISOR)
364 result.extend(DataParser.encode_int16(raw_avg_cadence, signed=False))
365 if data.total_distance is not None:
366 result.extend(DataParser.encode_int24(data.total_distance, signed=False))
367 if data.resistance_level is not None:
368 raw_resistance = round(data.resistance_level / _RESISTANCE_RESOLUTION)
369 result.extend(DataParser.encode_int8(raw_resistance, signed=False))
370 if data.instantaneous_power is not None:
371 result.extend(DataParser.encode_int16(data.instantaneous_power, signed=True))
372 if data.average_power is not None:
373 result.extend(DataParser.encode_int16(data.average_power, signed=True))
374 if data.total_energy is not None:
375 result.extend(encode_energy_triplet(data.total_energy, data.energy_per_hour, data.energy_per_minute))
376 if data.heart_rate is not None:
377 result.extend(encode_heart_rate(data.heart_rate))
378 if data.metabolic_equivalent is not None:
379 result.extend(encode_metabolic_equivalent(data.metabolic_equivalent))
380 if data.elapsed_time is not None:
381 result.extend(encode_elapsed_time(data.elapsed_time))
382 if data.remaining_time is not None:
383 result.extend(encode_remaining_time(data.remaining_time))
385 return result