Coverage for src / bluetooth_sig / gatt / characteristics / step_climber_data.py: 92%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Step Climber Data characteristic implementation. 

2 

3Implements the Step Climber Data characteristic (0x2ACF) from the Fitness 

4Machine Service. A 16-bit flags field controls the presence of optional 

5data fields. 

6 

7Bit 0 ("More Data") uses **inverted logic**: when bit 0 is 0 **both** 

8Floors and Step Count are present; when bit 0 is 1 they are absent. 

9All other bits use normal logic (1 = present). 

10 

11References: 

12 Bluetooth SIG Fitness Machine Service 1.0 

13 org.bluetooth.characteristic.step_climber_data (GSS YAML) 

14""" 

15 

16from __future__ import annotations 

17 

18from enum import IntFlag 

19 

20import msgspec 

21 

22from ..constants import UINT8_MAX, UINT16_MAX 

23from ..context import CharacteristicContext 

24from .base import BaseCharacteristic 

25from .fitness_machine_common import ( 

26 MET_RESOLUTION, 

27 decode_elapsed_time, 

28 decode_energy_triplet, 

29 decode_heart_rate, 

30 decode_metabolic_equivalent, 

31 decode_remaining_time, 

32 encode_elapsed_time, 

33 encode_energy_triplet, 

34 encode_heart_rate, 

35 encode_metabolic_equivalent, 

36 encode_remaining_time, 

37) 

38from .utils import DataParser 

39 

40 

41class StepClimberDataFlags(IntFlag): 

42 """Step Climber Data flags as per Bluetooth SIG specification. 

43 

44 Bit 0 uses inverted logic: 0 -> Floors + Step Count present, 1 -> absent. 

45 """ 

46 

47 MORE_DATA = 0x0001 # Inverted: 0 -> Floors + Step Count present 

48 STEPS_PER_MINUTE_PRESENT = 0x0002 

49 AVERAGE_STEP_RATE_PRESENT = 0x0004 

50 POSITIVE_ELEVATION_GAIN_PRESENT = 0x0008 

51 EXPENDED_ENERGY_PRESENT = 0x0010 

52 HEART_RATE_PRESENT = 0x0020 

53 METABOLIC_EQUIVALENT_PRESENT = 0x0040 

54 ELAPSED_TIME_PRESENT = 0x0080 

55 REMAINING_TIME_PRESENT = 0x0100 

56 

57 

58class StepClimberData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

59 """Parsed data from Step Climber Data characteristic. 

60 

61 Attributes: 

62 flags: Raw 16-bit flags field. 

63 floors: Total floors counted (present when bit 0 is 0). 

64 step_count: Total steps counted (present when bit 0 is 0). 

65 steps_per_minute: Step rate in steps/min. 

66 average_step_rate: Average step rate in steps/min. 

67 positive_elevation_gain: Positive elevation gain in metres. 

68 total_energy: Total expended energy in kcal. 

69 energy_per_hour: Expended energy per hour in kcal. 

70 energy_per_minute: Expended energy per minute in kcal. 

71 heart_rate: Heart rate in bpm. 

72 metabolic_equivalent: MET value (0.1 resolution). 

73 elapsed_time: Elapsed time in seconds. 

74 remaining_time: Remaining time in seconds. 

75 

76 """ 

77 

78 flags: StepClimberDataFlags 

79 floors: int | None = None 

80 step_count: int | None = None 

81 steps_per_minute: int | None = None 

82 average_step_rate: int | None = None 

83 positive_elevation_gain: int | None = None 

84 total_energy: int | None = None 

85 energy_per_hour: int | None = None 

86 energy_per_minute: int | None = None 

87 heart_rate: int | None = None 

88 metabolic_equivalent: float | None = None 

89 elapsed_time: int | None = None 

90 remaining_time: int | None = None 

91 

92 def __post_init__(self) -> None: 

93 """Validate field ranges.""" 

94 if self.floors is not None and not 0 <= self.floors <= UINT16_MAX: 

95 raise ValueError(f"Floors must be 0-{UINT16_MAX}, got {self.floors}") 

96 if self.step_count is not None and not 0 <= self.step_count <= UINT16_MAX: 

97 raise ValueError(f"Step count must be 0-{UINT16_MAX}, got {self.step_count}") 

98 if self.steps_per_minute is not None and not 0 <= self.steps_per_minute <= UINT16_MAX: 

99 raise ValueError(f"Steps per minute must be 0-{UINT16_MAX}, got {self.steps_per_minute}") 

100 if self.average_step_rate is not None and not 0 <= self.average_step_rate <= UINT16_MAX: 

101 raise ValueError(f"Average step rate must be 0-{UINT16_MAX}, got {self.average_step_rate}") 

102 if self.positive_elevation_gain is not None and not 0 <= self.positive_elevation_gain <= UINT16_MAX: 

103 raise ValueError(f"Positive elevation gain must be 0-{UINT16_MAX}, got {self.positive_elevation_gain}") 

104 if self.total_energy is not None and not 0 <= self.total_energy <= UINT16_MAX: 

105 raise ValueError(f"Total energy must be 0-{UINT16_MAX}, got {self.total_energy}") 

106 if self.energy_per_hour is not None and not 0 <= self.energy_per_hour <= UINT16_MAX: 

107 raise ValueError(f"Energy per hour must be 0-{UINT16_MAX}, got {self.energy_per_hour}") 

108 if self.energy_per_minute is not None and not 0 <= self.energy_per_minute <= UINT8_MAX: 

109 raise ValueError(f"Energy per minute must be 0-{UINT8_MAX}, got {self.energy_per_minute}") 

110 if self.heart_rate is not None and not 0 <= self.heart_rate <= UINT8_MAX: 

111 raise ValueError(f"Heart rate must be 0-{UINT8_MAX}, got {self.heart_rate}") 

112 if self.metabolic_equivalent is not None and not 0.0 <= self.metabolic_equivalent <= UINT8_MAX / MET_RESOLUTION: 

113 raise ValueError( 

114 f"Metabolic equivalent must be 0.0-{UINT8_MAX / MET_RESOLUTION}, got {self.metabolic_equivalent}" 

115 ) 

116 if self.elapsed_time is not None and not 0 <= self.elapsed_time <= UINT16_MAX: 

117 raise ValueError(f"Elapsed time must be 0-{UINT16_MAX}, got {self.elapsed_time}") 

118 if self.remaining_time is not None and not 0 <= self.remaining_time <= UINT16_MAX: 

119 raise ValueError(f"Remaining time must be 0-{UINT16_MAX}, got {self.remaining_time}") 

120 

121 

122class StepClimberDataCharacteristic(BaseCharacteristic[StepClimberData]): 

123 """Step Climber Data characteristic (0x2ACF). 

124 

125 Used in the Fitness Machine Service to transmit step climber workout 

126 data. A 16-bit flags field controls which optional fields are present. 

127 

128 Flag-bit assignments (from GSS YAML): 

129 Bit 0: More Data -- **inverted**: 0 -> Floors + Step Count present 

130 Bit 1: Steps Per Minute present 

131 Bit 2: Average Step Rate present 

132 Bit 3: Positive Elevation Gain present 

133 Bit 4: Expended Energy present (gates triplet: total + /hr + /min) 

134 Bit 5: Heart Rate present 

135 Bit 6: Metabolic Equivalent present 

136 Bit 7: Elapsed Time present 

137 Bit 8: Remaining Time present 

138 Bits 9-15: Reserved for Future Use 

139 

140 """ 

141 

142 expected_type = StepClimberData 

143 min_length: int = 2 # Flags only (all optional fields absent) 

144 allow_variable_length: bool = True 

145 

146 def _decode_value( 

147 self, 

148 data: bytearray, 

149 ctx: CharacteristicContext | None = None, 

150 *, 

151 validate: bool = True, 

152 ) -> StepClimberData: 

153 """Parse Step Climber Data from raw BLE bytes. 

154 

155 Args: 

156 data: Raw bytearray from BLE characteristic. 

157 ctx: Optional context (unused). 

158 validate: Whether to validate ranges. 

159 

160 Returns: 

161 StepClimberData with all present fields populated. 

162 

163 """ 

164 flags = StepClimberDataFlags(DataParser.parse_int16(data, 0, signed=False)) 

165 offset = 2 

166 

167 # Bit 0 -- inverted: Floors + Step Count present when bit is NOT set 

168 floors = None 

169 step_count = None 

170 if not (flags & StepClimberDataFlags.MORE_DATA) and len(data) >= offset + 4: 

171 floors = DataParser.parse_int16(data, offset, signed=False) 

172 step_count = DataParser.parse_int16(data, offset + 2, signed=False) 

173 offset += 4 

174 

175 # Bit 1 -- Steps Per Minute 

176 steps_per_minute = None 

177 if (flags & StepClimberDataFlags.STEPS_PER_MINUTE_PRESENT) and len(data) >= offset + 2: 

178 steps_per_minute = DataParser.parse_int16(data, offset, signed=False) 

179 offset += 2 

180 

181 # Bit 2 -- Average Step Rate 

182 average_step_rate = None 

183 if (flags & StepClimberDataFlags.AVERAGE_STEP_RATE_PRESENT) and len(data) >= offset + 2: 

184 average_step_rate = DataParser.parse_int16(data, offset, signed=False) 

185 offset += 2 

186 

187 # Bit 3 -- Positive Elevation Gain 

188 positive_elevation_gain = None 

189 if (flags & StepClimberDataFlags.POSITIVE_ELEVATION_GAIN_PRESENT) and len(data) >= offset + 2: 

190 positive_elevation_gain = DataParser.parse_int16(data, offset, signed=False) 

191 offset += 2 

192 

193 # Bit 4 -- Energy triplet (Total + Per Hour + Per Minute) 

194 total_energy = None 

195 energy_per_hour = None 

196 energy_per_minute = None 

197 if flags & StepClimberDataFlags.EXPENDED_ENERGY_PRESENT: 

198 total_energy, energy_per_hour, energy_per_minute, offset = decode_energy_triplet(data, offset) 

199 

200 # Bit 5 -- Heart Rate 

201 heart_rate = None 

202 if flags & StepClimberDataFlags.HEART_RATE_PRESENT: 

203 heart_rate, offset = decode_heart_rate(data, offset) 

204 

205 # Bit 6 -- Metabolic Equivalent 

206 metabolic_equivalent = None 

207 if flags & StepClimberDataFlags.METABOLIC_EQUIVALENT_PRESENT: 

208 metabolic_equivalent, offset = decode_metabolic_equivalent(data, offset) 

209 

210 # Bit 7 -- Elapsed Time 

211 elapsed_time = None 

212 if flags & StepClimberDataFlags.ELAPSED_TIME_PRESENT: 

213 elapsed_time, offset = decode_elapsed_time(data, offset) 

214 

215 # Bit 8 -- Remaining Time 

216 remaining_time = None 

217 if flags & StepClimberDataFlags.REMAINING_TIME_PRESENT: 

218 remaining_time, offset = decode_remaining_time(data, offset) 

219 

220 return StepClimberData( 

221 flags=flags, 

222 floors=floors, 

223 step_count=step_count, 

224 steps_per_minute=steps_per_minute, 

225 average_step_rate=average_step_rate, 

226 positive_elevation_gain=positive_elevation_gain, 

227 total_energy=total_energy, 

228 energy_per_hour=energy_per_hour, 

229 energy_per_minute=energy_per_minute, 

230 heart_rate=heart_rate, 

231 metabolic_equivalent=metabolic_equivalent, 

232 elapsed_time=elapsed_time, 

233 remaining_time=remaining_time, 

234 ) 

235 

236 def _encode_value(self, data: StepClimberData) -> bytearray: 

237 """Encode StepClimberData back to BLE bytes. 

238 

239 Reconstructs flags from present fields so round-trip encoding 

240 preserves the original wire format. 

241 

242 Args: 

243 data: StepClimberData instance. 

244 

245 Returns: 

246 Encoded bytearray matching the BLE wire format. 

247 

248 """ 

249 flags = StepClimberDataFlags(0) 

250 

251 # Bit 0 -- inverted: set MORE_DATA when Floors/Step Count absent 

252 if data.floors is None: 

253 flags |= StepClimberDataFlags.MORE_DATA 

254 if data.steps_per_minute is not None: 

255 flags |= StepClimberDataFlags.STEPS_PER_MINUTE_PRESENT 

256 if data.average_step_rate is not None: 

257 flags |= StepClimberDataFlags.AVERAGE_STEP_RATE_PRESENT 

258 if data.positive_elevation_gain is not None: 

259 flags |= StepClimberDataFlags.POSITIVE_ELEVATION_GAIN_PRESENT 

260 if data.total_energy is not None: 

261 flags |= StepClimberDataFlags.EXPENDED_ENERGY_PRESENT 

262 if data.heart_rate is not None: 

263 flags |= StepClimberDataFlags.HEART_RATE_PRESENT 

264 if data.metabolic_equivalent is not None: 

265 flags |= StepClimberDataFlags.METABOLIC_EQUIVALENT_PRESENT 

266 if data.elapsed_time is not None: 

267 flags |= StepClimberDataFlags.ELAPSED_TIME_PRESENT 

268 if data.remaining_time is not None: 

269 flags |= StepClimberDataFlags.REMAINING_TIME_PRESENT 

270 

271 result = DataParser.encode_int16(int(flags), signed=False) 

272 

273 if data.floors is not None: 

274 result.extend(DataParser.encode_int16(data.floors, signed=False)) 

275 if data.step_count is not None: 

276 result.extend(DataParser.encode_int16(data.step_count, signed=False)) 

277 if data.steps_per_minute is not None: 

278 result.extend(DataParser.encode_int16(data.steps_per_minute, signed=False)) 

279 if data.average_step_rate is not None: 

280 result.extend(DataParser.encode_int16(data.average_step_rate, signed=False)) 

281 if data.positive_elevation_gain is not None: 

282 result.extend(DataParser.encode_int16(data.positive_elevation_gain, signed=False)) 

283 if data.total_energy is not None: 

284 result.extend(encode_energy_triplet(data.total_energy, data.energy_per_hour, data.energy_per_minute)) 

285 if data.heart_rate is not None: 

286 result.extend(encode_heart_rate(data.heart_rate)) 

287 if data.metabolic_equivalent is not None: 

288 result.extend(encode_metabolic_equivalent(data.metabolic_equivalent)) 

289 if data.elapsed_time is not None: 

290 result.extend(encode_elapsed_time(data.elapsed_time)) 

291 if data.remaining_time is not None: 

292 result.extend(encode_remaining_time(data.remaining_time)) 

293 

294 return result