Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_control_point.py: 89%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Cycling Power Control Point characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntEnum 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14 

15class OpCodeParameters(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

16 """Parsed operation code parameters.""" 

17 

18 cumulative_value: int | None 

19 sensor_location: int | None 

20 crank_length: float | None 

21 chain_length: float | None 

22 chain_weight: float | None 

23 span_length: int | None 

24 measurement_mask: int | None 

25 request_op_code: CyclingPowerOpCode | None 

26 response_value: CyclingPowerResponseValue | None 

27 

28 

29class CyclingPowerControlPointData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

30 """Parsed data from Cycling Power Control Point characteristic.""" 

31 

32 op_code: CyclingPowerOpCode 

33 # Optional parameters based on op_code 

34 cumulative_value: int | None = None 

35 sensor_location: int | None = None 

36 crank_length: float | None = None # mm 

37 chain_length: float | None = None # mm 

38 chain_weight: float | None = None # g 

39 span_length: int | None = None # mm 

40 measurement_mask: int | None = None 

41 request_op_code: CyclingPowerOpCode | None = None 

42 response_value: CyclingPowerResponseValue | None = None 

43 

44 def __post_init__(self) -> None: 

45 """Validate cycling power control point data.""" 

46 if not 0 <= self.op_code <= UINT8_MAX: 

47 raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)") 

48 

49 

50class CyclingPowerOpCode(IntEnum): 

51 """Cycling Power Control Point operation codes as per Bluetooth SIG specification.""" 

52 

53 # Value 0x00 is Reserved for Future Use 

54 SET_CUMULATIVE_VALUE = 0x01 

55 UPDATE_SENSOR_LOCATION = 0x02 

56 REQUEST_SUPPORTED_SENSOR_LOCATIONS = 0x03 

57 SET_CRANK_LENGTH = 0x04 

58 REQUEST_CRANK_LENGTH = 0x05 

59 SET_CHAIN_LENGTH = 0x06 

60 REQUEST_CHAIN_LENGTH = 0x07 

61 SET_CHAIN_WEIGHT = 0x08 

62 REQUEST_CHAIN_WEIGHT = 0x09 

63 SET_SPAN_LENGTH = 0x0A 

64 REQUEST_SPAN_LENGTH = 0x0B 

65 START_OFFSET_COMPENSATION = 0x0C 

66 MASK_CYCLING_POWER_MEASUREMENT = 0x0D 

67 REQUEST_SAMPLING_RATE = 0x0E 

68 REQUEST_FACTORY_CALIBRATION_DATE = 0x0F 

69 # Values 0x10-0x1F are Reserved for Future Use 

70 RESPONSE_CODE = 0x20 

71 # Values 0x21-0xFF are Reserved for Future Use 

72 

73 def __str__(self) -> str: 

74 """Return human-readable operation code name.""" 

75 names = { 

76 self.SET_CUMULATIVE_VALUE: "Set Cumulative Value", 

77 self.UPDATE_SENSOR_LOCATION: "Update Sensor Location", 

78 self.REQUEST_SUPPORTED_SENSOR_LOCATIONS: "Request Supported Sensor Locations", 

79 self.SET_CRANK_LENGTH: "Set Crank Length", 

80 self.REQUEST_CRANK_LENGTH: "Request Crank Length", 

81 self.SET_CHAIN_LENGTH: "Set Chain Length", 

82 self.REQUEST_CHAIN_LENGTH: "Request Chain Length", 

83 self.SET_CHAIN_WEIGHT: "Set Chain Weight", 

84 self.REQUEST_CHAIN_WEIGHT: "Request Chain Weight", 

85 self.SET_SPAN_LENGTH: "Set Span Length", 

86 self.REQUEST_SPAN_LENGTH: "Request Span Length", 

87 self.START_OFFSET_COMPENSATION: "Start Offset Compensation", 

88 self.MASK_CYCLING_POWER_MEASUREMENT: "Mask Cycling Power Measurement", 

89 self.REQUEST_SAMPLING_RATE: "Request Sampling Rate", 

90 self.REQUEST_FACTORY_CALIBRATION_DATE: "Request Factory Calibration Date", 

91 self.RESPONSE_CODE: "Response Code", 

92 } 

93 return names[self] 

94 

95 

96# Constants 

97MIN_OP_CODE_LENGTH = 1 # Minimum length for op code data 

98 

99 

100class CyclingPowerResponseValue(IntEnum): 

101 """Cycling Power Control Point response values as per Bluetooth SIG specification.""" 

102 

103 # Value 0x00 is Reserved for Future Use 

104 SUCCESS = 0x01 

105 OP_CODE_NOT_SUPPORTED = 0x02 

106 INVALID_PARAMETER = 0x03 

107 OPERATION_FAILED = 0x04 

108 # Values 0x05-0xFF are Reserved for Future Use 

109 

110 def __str__(self) -> str: 

111 """Return human-readable response value name.""" 

112 names = { 

113 self.SUCCESS: "Success", 

114 self.OP_CODE_NOT_SUPPORTED: "Op Code Not Supported", 

115 self.INVALID_PARAMETER: "Invalid Parameter", 

116 self.OPERATION_FAILED: "Operation Failed", 

117 } 

118 return names[self] 

119 

120 

121class CyclingPowerControlPointCharacteristic(BaseCharacteristic[CyclingPowerControlPointData]): 

122 """Cycling Power Control Point characteristic (0x2A66). 

123 

124 Used for control and configuration of cycling power sensors. 

125 Provides commands for calibration, configuration, and sensor 

126 control. 

127 """ 

128 

129 # Variable length: min 1 byte (op_code), variable parameters per op_code 

130 min_length = 1 

131 allow_variable_length = True 

132 

133 # Resolution constants for parameter calculations 

134 CRANK_LENGTH_RESOLUTION = 2.0 # 0.5mm resolution (value / 2) 

135 CHAIN_LENGTH_RESOLUTION = 10.0 # 0.1mm resolution (value / 10) 

136 CHAIN_WEIGHT_RESOLUTION = 10.0 # 0.1g resolution (value / 10) 

137 

138 # Data length constants 

139 MIN_OP_CODE_LENGTH = 1 

140 CUMULATIVE_VALUE_LENGTH = 5 # op_code(1) + value(4) 

141 SENSOR_LOCATION_LENGTH = 2 # op_code(1) + location(1) 

142 TWO_BYTE_PARAM_LENGTH = 3 # op_code(1) + param(2) 

143 RESPONSE_CODE_LENGTH = 3 # op_code(1) + request(1) + response(1) 

144 

145 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerControlPointData: 

146 """Parse cycling power control point data. 

147 

148 Format: Op Code(1) + [Request Parameter] or Response Code(1) + [Response Parameter]. 

149 

150 Args: 

151 data: Raw bytearray from BLE characteristic. 

152 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

153 

154 Returns: 

155 CyclingPowerControlPointData containing parsed control point data. 

156 

157 Raises: 

158 ValueError: If data format is invalid. 

159 

160 """ 

161 if len(data) < MIN_OP_CODE_LENGTH: 

162 raise ValueError("Cycling Power Control Point data must be at least 1 byte") 

163 

164 op_code = data[0] 

165 

166 # Parse additional data based on op code 

167 params = ( 

168 self._parse_op_code_parameters(op_code, data) 

169 if len(data) > 1 

170 else OpCodeParameters( 

171 cumulative_value=None, 

172 sensor_location=None, 

173 crank_length=None, 

174 chain_length=None, 

175 chain_weight=None, 

176 span_length=None, 

177 measurement_mask=None, 

178 request_op_code=None, 

179 response_value=None, 

180 ) 

181 ) 

182 

183 # Create struct with all parsed values 

184 return CyclingPowerControlPointData( 

185 op_code=CyclingPowerOpCode(op_code), 

186 cumulative_value=params.cumulative_value, 

187 sensor_location=params.sensor_location, 

188 crank_length=params.crank_length, 

189 chain_length=params.chain_length, 

190 chain_weight=params.chain_weight, 

191 span_length=params.span_length, 

192 measurement_mask=params.measurement_mask, 

193 request_op_code=params.request_op_code, 

194 response_value=params.response_value, 

195 ) 

196 

197 def _encode_value(self, data: CyclingPowerControlPointData | int) -> bytearray: 

198 """Encode cycling power control point value back to bytes. 

199 

200 Args: 

201 data: CyclingPowerControlPointData with op_code and optional parameters, or raw op_code integer 

202 

203 Returns: 

204 Encoded bytes representing the control point command 

205 

206 """ 

207 if isinstance(data, int): 

208 # Simple op code only 

209 op_code = data 

210 if not 0 <= op_code <= UINT8_MAX: 

211 raise ValueError(f"Op code {op_code} exceeds uint8 range") 

212 return bytearray([op_code]) 

213 

214 # Handle dataclass case 

215 op_code = data.op_code 

216 result = bytearray([op_code]) 

217 

218 # Add parameters based on op_code and available data 

219 if data.cumulative_value is not None: 

220 result.extend(DataParser.encode_int32(data.cumulative_value, signed=False)) 

221 elif data.sensor_location is not None: 

222 result.append(data.sensor_location) 

223 elif data.crank_length is not None: 

224 result.extend(DataParser.encode_int16(int(data.crank_length * self.CRANK_LENGTH_RESOLUTION), signed=False)) 

225 elif data.chain_length is not None: 

226 result.extend(DataParser.encode_int16(int(data.chain_length * self.CHAIN_LENGTH_RESOLUTION), signed=False)) 

227 elif data.chain_weight is not None: 

228 result.extend(DataParser.encode_int16(int(data.chain_weight * self.CHAIN_WEIGHT_RESOLUTION), signed=False)) 

229 elif data.span_length is not None: 

230 result.extend(DataParser.encode_int16(data.span_length, signed=False)) 

231 elif data.measurement_mask is not None: 

232 result.extend(DataParser.encode_int16(data.measurement_mask, signed=False)) 

233 elif data.request_op_code is not None and data.response_value is not None: 

234 result.extend([data.request_op_code.value, data.response_value.value]) 

235 

236 return result 

237 

238 def _parse_op_code_parameters( # pylint: disable=too-many-branches 

239 self, op_code: int, data: bytearray 

240 ) -> OpCodeParameters: 

241 """Parse operation code specific parameters. 

242 

243 Args: 

244 op_code: Operation code 

245 data: Raw data 

246 

247 Returns: 

248 OpCodeParameters containing all parsed parameters 

249 

250 """ 

251 cumulative_value: int | None = None 

252 sensor_location: int | None = None 

253 crank_length: float | None = None 

254 chain_length: float | None = None 

255 chain_weight: float | None = None 

256 span_length: int | None = None 

257 measurement_mask: int | None = None 

258 request_op_code: CyclingPowerOpCode | None = None 

259 response_value: CyclingPowerResponseValue | None = None 

260 

261 if op_code == CyclingPowerOpCode.SET_CUMULATIVE_VALUE: 

262 if len(data) >= self.CUMULATIVE_VALUE_LENGTH: 

263 cumulative_value = DataParser.parse_int32(data, offset=1, signed=False) 

264 elif op_code == CyclingPowerOpCode.UPDATE_SENSOR_LOCATION: 

265 if len(data) >= self.SENSOR_LOCATION_LENGTH: 

266 sensor_location = int(data[1]) 

267 elif op_code == CyclingPowerOpCode.SET_CRANK_LENGTH: 

268 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

269 crank_length_raw = DataParser.parse_int16(data, offset=1, signed=False) 

270 crank_length = crank_length_raw / self.CRANK_LENGTH_RESOLUTION 

271 elif op_code == CyclingPowerOpCode.SET_CHAIN_LENGTH: 

272 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

273 chain_length_raw = DataParser.parse_int16(data, offset=1, signed=False) 

274 chain_length = chain_length_raw / self.CHAIN_LENGTH_RESOLUTION 

275 elif op_code == CyclingPowerOpCode.SET_CHAIN_WEIGHT: 

276 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

277 chain_weight_raw = DataParser.parse_int16(data, offset=1, signed=False) 

278 chain_weight = chain_weight_raw / self.CHAIN_WEIGHT_RESOLUTION 

279 elif op_code == CyclingPowerOpCode.SET_SPAN_LENGTH: 

280 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

281 span_length = DataParser.parse_int16(data, offset=1, signed=False) # mm 

282 elif op_code == CyclingPowerOpCode.MASK_CYCLING_POWER_MEASUREMENT: 

283 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

284 measurement_mask = DataParser.parse_int16(data, offset=1, signed=False) 

285 elif op_code == CyclingPowerOpCode.RESPONSE_CODE: 

286 if len(data) >= self.RESPONSE_CODE_LENGTH: 

287 request_op_code = CyclingPowerOpCode(data[1]) 

288 response_value = CyclingPowerResponseValue(data[2]) 

289 

290 return OpCodeParameters( 

291 cumulative_value=cumulative_value, 

292 sensor_location=sensor_location, 

293 crank_length=crank_length, 

294 chain_length=chain_length, 

295 chain_weight=chain_weight, 

296 span_length=span_length, 

297 measurement_mask=measurement_mask, 

298 request_op_code=request_op_code, 

299 response_value=response_value, 

300 )