Coverage for src / bluetooth_sig / gatt / characteristics / fitness_machine_common.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Shared parsing/encoding utilities for Fitness Machine Service characteristics.
3All six Fitness Machine data characteristics (treadmill, indoor bike, cross
4trainer, rower, stair climber, step climber) share the same trailing optional
5field blocks: energy triplet, heart rate, metabolic equivalent, elapsed time,
6and remaining time. This module provides reusable helpers so each
7characteristic only has to implement its own unique fields.
9References:
10 Bluetooth SIG Fitness Machine Service 1.0
11 org.bluetooth.characteristic.{treadmill,indoor_bike,cross_trainer,
12 rower,stair_climber,step_climber}_data YAML specs in GSS submodule
13"""
15from __future__ import annotations
17from .utils import DataParser
19# ---------------------------------------------------------------------------
20# Scaling constants (from YAML M/d/b parameters)
21# ---------------------------------------------------------------------------
# Divisor applied to the raw uint8 Metabolic Equivalent value when decoding
# (and multiplier when encoding) to convert between wire and real values.
MET_RESOLUTION = 10.0  # M=1, d=-1, b=0 -> raw / 10
24# ---------------------------------------------------------------------------
25# Decode helpers
26# ---------------------------------------------------------------------------
def decode_energy_triplet(data: bytearray, offset: int) -> tuple[int | None, int | None, int | None, int]:
    """Read the Total / Per Hour / Per Minute energy fields as one unit.

    The three values travel together on the wire (uint16, uint16, uint8 --
    five bytes in total) and are gated by a single flag bit, so they are
    either all decoded or all reported as missing.

    Args:
        data: Raw BLE bytes.
        offset: Position of the first byte of the triplet.

    Returns:
        ``(total_energy, energy_per_hour, energy_per_minute, new_offset)``.
        When fewer than five bytes are available at ``offset`` the three
        values are ``None`` and ``new_offset`` equals the ``offset`` passed in.

    """
    triplet_size = 5
    end = offset + triplet_size
    if end > len(data):
        # Truncated payload: report nothing and leave the cursor untouched.
        return None, None, None, offset

    total = DataParser.parse_int16(data, offset, signed=False)
    per_hour = DataParser.parse_int16(data, offset + 2, signed=False)
    per_minute = DataParser.parse_int8(data, offset + 4, signed=False)
    return total, per_hour, per_minute, end
def decode_heart_rate(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the one-byte Heart Rate field (uint8, bpm).

    Args:
        data: Raw BLE bytes.
        offset: Position of the heart rate byte.

    Returns:
        ``(heart_rate, new_offset)``. If no byte is available at ``offset``
        the value is ``None`` and ``new_offset`` equals the ``offset`` passed in.

    """
    next_offset = offset + 1
    if next_offset > len(data):
        # Nothing left to read; the cursor stays where it was.
        return None, offset

    bpm = DataParser.parse_int8(data, offset, signed=False)
    return bpm, next_offset
def decode_metabolic_equivalent(data: bytearray, offset: int) -> tuple[float | None, int]:
    """Read the one-byte Metabolic Equivalent field (uint8, M=1 d=-1 b=0).

    The raw unsigned byte is divided by ``MET_RESOLUTION`` to obtain the
    real value.

    Args:
        data: Raw BLE bytes.
        offset: Position of the metabolic equivalent byte.

    Returns:
        ``(metabolic_equivalent, new_offset)``. If no byte is available at
        ``offset`` the value is ``None`` and ``new_offset`` equals the
        ``offset`` passed in.

    """
    next_offset = offset + 1
    if len(data) >= next_offset:
        raw_value = DataParser.parse_int8(data, offset, signed=False)
        return raw_value / MET_RESOLUTION, next_offset

    # Not enough data: signal "absent" without advancing.
    return None, offset
def decode_elapsed_time(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the two-byte Elapsed Time field (uint16, seconds).

    Args:
        data: Raw BLE bytes.
        offset: Position of the first elapsed-time byte.

    Returns:
        ``(elapsed_time, new_offset)``. If fewer than two bytes are available
        at ``offset`` the value is ``None`` and ``new_offset`` equals the
        ``offset`` passed in.

    """
    field_end = offset + 2
    if field_end > len(data):
        # Field is missing or truncated; do not consume anything.
        return None, offset

    seconds = DataParser.parse_int16(data, offset, signed=False)
    return seconds, field_end
def decode_remaining_time(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the two-byte Remaining Time field (uint16, seconds).

    Args:
        data: Raw BLE bytes.
        offset: Position of the first remaining-time byte.

    Returns:
        ``(remaining_time, new_offset)``. If fewer than two bytes are
        available at ``offset`` the value is ``None`` and ``new_offset``
        equals the ``offset`` passed in.

    """
    field_end = offset + 2
    if len(data) >= field_end:
        seconds_left = DataParser.parse_int16(data, offset, signed=False)
        return seconds_left, field_end

    # Field is missing or truncated; do not consume anything.
    return None, offset
116# ---------------------------------------------------------------------------
117# Encode helpers
118# ---------------------------------------------------------------------------
def encode_energy_triplet(
    total_energy: int | None,
    energy_per_hour: int | None,
    energy_per_minute: int | None,
) -> bytearray:
    """Serialise the Total / Per Hour / Per Minute energy fields together.

    Any component passed as ``None`` is written as ``0``.

    Args:
        total_energy: Total energy in kcal (uint16).
        energy_per_hour: Energy per hour in kcal (uint16).
        energy_per_minute: Energy per minute in kcal (uint8).

    Returns:
        5-byte bytearray (uint16 + uint16 + uint8).

    """
    # (encoder, value) pairs in wire order.
    fields = (
        (DataParser.encode_int16, total_energy),
        (DataParser.encode_int16, energy_per_hour),
        (DataParser.encode_int8, energy_per_minute),
    )

    payload = bytearray()
    for encoder, value in fields:
        payload.extend(encoder(0 if value is None else value, signed=False))
    return payload
def encode_heart_rate(heart_rate: int) -> bytearray:
    """Serialise the Heart Rate field as an unsigned 8-bit value (bpm).

    Args:
        heart_rate: Heart rate in bpm (uint8).

    Returns:
        1-byte bytearray.

    """
    encoded = DataParser.encode_int8(heart_rate, signed=False)
    return encoded
def encode_metabolic_equivalent(metabolic_equivalent: float) -> bytearray:
    """Serialise the Metabolic Equivalent field (uint8, M=1 d=-1 b=0).

    The real value is multiplied by ``MET_RESOLUTION`` and passed through the
    built-in ``round()`` to obtain the raw integer that is written.

    Args:
        metabolic_equivalent: Metabolic equivalent value (real).

    Returns:
        1-byte bytearray.

    """
    scaled = metabolic_equivalent * MET_RESOLUTION
    raw_value = round(scaled)
    return DataParser.encode_int8(raw_value, signed=False)
def encode_elapsed_time(elapsed_time: int) -> bytearray:
    """Serialise the Elapsed Time field as an unsigned 16-bit value (seconds).

    Args:
        elapsed_time: Elapsed time in seconds (uint16).

    Returns:
        2-byte bytearray.

    """
    encoded = DataParser.encode_int16(elapsed_time, signed=False)
    return encoded
def encode_remaining_time(remaining_time: int) -> bytearray:
    """Serialise the Remaining Time field as an unsigned 16-bit value (seconds).

    Args:
        remaining_time: Remaining time in seconds (uint16).

    Returns:
        2-byte bytearray.

    """
    encoded = DataParser.encode_int16(remaining_time, signed=False)
    return encoded