Coverage for src / bluetooth_sig / gatt / characteristics / fitness_machine_common.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Shared parsing/encoding utilities for Fitness Machine Service characteristics. 

2 

3All six Fitness Machine data characteristics (treadmill, indoor bike, cross 

4trainer, rower, stair climber, step climber) share the same trailing optional 

5field blocks: energy triplet, heart rate, metabolic equivalent, elapsed time, 

6and remaining time. This module provides reusable helpers so each 

7characteristic only has to implement its own unique fields. 

8 

9References: 

10 Bluetooth SIG Fitness Machine Service 1.0 

11 org.bluetooth.characteristic.{treadmill,indoor_bike,cross_trainer, 

12 rower,stair_climber,step_climber}_data YAML specs in GSS submodule 

13""" 

14 

15from __future__ import annotations 

16 

17from .utils import DataParser 

18 

# ---------------------------------------------------------------------------
# Scaling constants (from YAML M/d/b parameters)
# ---------------------------------------------------------------------------
# Metabolic Equivalent scaling: M=1, d=-1, b=0 -> value = raw / 10.
# The decode helper divides the raw uint8 by this constant; the encode helper
# multiplies by it and rounds to the nearest integer before writing.
MET_RESOLUTION = 10.0

23 

24# --------------------------------------------------------------------------- 

25# Decode helpers 

26# --------------------------------------------------------------------------- 

27 

28 

def decode_energy_triplet(data: bytearray, offset: int) -> tuple[int | None, int | None, int | None, int]:
    """Read the Energy triplet shared by the Fitness Machine data characteristics.

    Total Energy, Energy Per Hour and Energy Per Minute are gated by a single
    flag bit and occupy 5 consecutive bytes: uint16 + uint16 + uint8.

    Args:
        data: Raw BLE bytes.
        offset: Index of the first byte of the triplet.

    Returns:
        ``(total_energy, energy_per_hour, energy_per_minute, new_offset)``.
        If fewer than 5 bytes are available at ``offset``, the three values
        are ``None`` and ``new_offset`` equals the ``offset`` passed in.

    """
    available = len(data) - offset
    if available < 5:
        # Truncated payload: report nothing and leave the read cursor untouched.
        return None, None, None, offset

    total = DataParser.parse_int16(data, offset, signed=False)
    per_hour = DataParser.parse_int16(data, offset + 2, signed=False)
    per_minute = DataParser.parse_int8(data, offset + 4, signed=False)
    return total, per_hour, per_minute, offset + 5

49 

50 

def decode_heart_rate(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the shared Heart Rate field (one unsigned byte, bpm).

    Args:
        data: Raw BLE bytes.
        offset: Index of the Heart Rate byte.

    Returns:
        ``(heart_rate, new_offset)``. If no byte is available at ``offset``
        the value is ``None`` and the offset is returned unchanged.

    """
    if len(data) >= offset + 1:
        bpm = DataParser.parse_int8(data, offset, signed=False)
        return bpm, offset + 1
    # Payload ends before this field.
    return None, offset

65 

66 

def decode_metabolic_equivalent(data: bytearray, offset: int) -> tuple[float | None, int]:
    """Read the shared Metabolic Equivalent field (uint8, M=1 d=-1 b=0).

    The raw unsigned byte is divided by ``MET_RESOLUTION`` to obtain the
    real-valued result.

    Args:
        data: Raw BLE bytes.
        offset: Index of the Metabolic Equivalent byte.

    Returns:
        ``(metabolic_equivalent, new_offset)``. If no byte is available at
        ``offset`` the value is ``None`` and the offset is returned unchanged.

    """
    if len(data) - offset < 1:
        # Payload ends before this field.
        return None, offset

    raw_value = DataParser.parse_int8(data, offset, signed=False)
    scaled = raw_value / MET_RESOLUTION
    return scaled, offset + 1

82 

83 

def decode_elapsed_time(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the shared Elapsed Time field (unsigned 16-bit, seconds).

    Args:
        data: Raw BLE bytes.
        offset: Index of the first byte of the field.

    Returns:
        ``(elapsed_time, new_offset)``. If fewer than 2 bytes are available
        at ``offset`` the value is ``None`` and the offset is returned
        unchanged.

    """
    next_offset = offset + 2
    if len(data) < next_offset:
        # Not enough bytes left for a uint16.
        return None, offset

    seconds = DataParser.parse_int16(data, offset, signed=False)
    return seconds, next_offset

98 

99 

def decode_remaining_time(data: bytearray, offset: int) -> tuple[int | None, int]:
    """Read the shared Remaining Time field (unsigned 16-bit, seconds).

    Args:
        data: Raw BLE bytes.
        offset: Index of the first byte of the field.

    Returns:
        ``(remaining_time, new_offset)``. If fewer than 2 bytes are available
        at ``offset`` the value is ``None`` and the offset is returned
        unchanged.

    """
    if len(data) >= offset + 2:
        seconds_left = DataParser.parse_int16(data, offset, signed=False)
        return seconds_left, offset + 2
    # Not enough bytes left for a uint16.
    return None, offset

114 

115 

116# --------------------------------------------------------------------------- 

117# Encode helpers 

118# --------------------------------------------------------------------------- 

119 

120 

def encode_energy_triplet(
    total_energy: int | None,
    energy_per_hour: int | None,
    energy_per_minute: int | None,
) -> bytearray:
    """Serialise the shared Energy triplet (Total + Per Hour + Per Minute).

    Any component passed as ``None`` is written as 0.

    NOTE(review): a missing component is therefore indistinguishable from a
    real zero on the wire. The FTMS spec appears to reserve 0xFFFF / 0xFF as
    "Data Not Available" for these fields -- confirm that 0 is intended.

    Args:
        total_energy: Total energy in kcal (uint16).
        energy_per_hour: Energy per hour in kcal (uint16).
        energy_per_minute: Energy per minute in kcal (uint8).

    Returns:
        5-byte bytearray (uint16 + uint16 + uint8).

    """
    # Normalise absent components to zero before encoding.
    total = 0 if total_energy is None else total_energy
    per_hour = 0 if energy_per_hour is None else energy_per_hour
    per_minute = 0 if energy_per_minute is None else energy_per_minute

    encoded = bytearray()
    encoded.extend(DataParser.encode_int16(total, signed=False))
    encoded.extend(DataParser.encode_int16(per_hour, signed=False))
    encoded.extend(DataParser.encode_int8(per_minute, signed=False))
    return encoded

142 

143 

def encode_heart_rate(heart_rate: int) -> bytearray:
    """Serialise the shared Heart Rate field as one unsigned byte.

    Args:
        heart_rate: Heart rate in bpm (uint8).

    Returns:
        1-byte bytearray.

    """
    encoded = DataParser.encode_int8(heart_rate, signed=False)
    return encoded

155 

156 

def encode_metabolic_equivalent(metabolic_equivalent: float) -> bytearray:
    """Serialise the shared Metabolic Equivalent field (uint8, M=1 d=-1 b=0).

    The real value is multiplied by ``MET_RESOLUTION`` and rounded with the
    built-in ``round`` before being written as an unsigned byte.

    Args:
        metabolic_equivalent: Metabolic equivalent value (real).

    Returns:
        1-byte bytearray.

    """
    scaled = metabolic_equivalent * MET_RESOLUTION
    raw_value = round(scaled)
    return DataParser.encode_int8(raw_value, signed=False)

169 

170 

def encode_elapsed_time(elapsed_time: int) -> bytearray:
    """Serialise the shared Elapsed Time field as an unsigned 16-bit value.

    Args:
        elapsed_time: Elapsed time in seconds (uint16).

    Returns:
        2-byte bytearray.

    """
    encoded = DataParser.encode_int16(elapsed_time, signed=False)
    return encoded

182 

183 

def encode_remaining_time(remaining_time: int) -> bytearray:
    """Serialise the shared Remaining Time field as an unsigned 16-bit value.

    Args:
        remaining_time: Remaining time in seconds (uint16).

    Returns:
        2-byte bytearray.

    """
    encoded = DataParser.encode_int16(remaining_time, signed=False)
    return encoded