Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_control_point.py: 89%

139 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Cycling Power Control Point characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntEnum 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14 

15class OpCodeParameters(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

16 """Parsed operation code parameters.""" 

17 

18 cumulative_value: int | None 

19 sensor_location: int | None 

20 crank_length: float | None 

21 chain_length: float | None 

22 chain_weight: float | None 

23 span_length: int | None 

24 measurement_mask: int | None 

25 request_op_code: CyclingPowerOpCode | None 

26 response_value: CyclingPowerResponseValue | None 

27 

28 

29class CyclingPowerControlPointData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

30 """Parsed data from Cycling Power Control Point characteristic.""" 

31 

32 op_code: CyclingPowerOpCode 

33 # Optional parameters based on op_code 

34 cumulative_value: int | None = None 

35 sensor_location: int | None = None 

36 crank_length: float | None = None # mm 

37 chain_length: float | None = None # mm 

38 chain_weight: float | None = None # g 

39 span_length: int | None = None # mm 

40 measurement_mask: int | None = None 

41 request_op_code: CyclingPowerOpCode | None = None 

42 response_value: CyclingPowerResponseValue | None = None 

43 

44 def __post_init__(self) -> None: 

45 """Validate cycling power control point data.""" 

46 if not 0 <= self.op_code <= UINT8_MAX: 

47 raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)") 

48 

49 

50class CyclingPowerOpCode(IntEnum): 

51 """Cycling Power Control Point operation codes as per Bluetooth SIG specification.""" 

52 

53 # Value 0x00 is Reserved for Future Use 

54 SET_CUMULATIVE_VALUE = 0x01 

55 UPDATE_SENSOR_LOCATION = 0x02 

56 REQUEST_SUPPORTED_SENSOR_LOCATIONS = 0x03 

57 SET_CRANK_LENGTH = 0x04 

58 REQUEST_CRANK_LENGTH = 0x05 

59 SET_CHAIN_LENGTH = 0x06 

60 REQUEST_CHAIN_LENGTH = 0x07 

61 SET_CHAIN_WEIGHT = 0x08 

62 REQUEST_CHAIN_WEIGHT = 0x09 

63 SET_SPAN_LENGTH = 0x0A 

64 REQUEST_SPAN_LENGTH = 0x0B 

65 START_OFFSET_COMPENSATION = 0x0C 

66 MASK_CYCLING_POWER_MEASUREMENT = 0x0D 

67 REQUEST_SAMPLING_RATE = 0x0E 

68 REQUEST_FACTORY_CALIBRATION_DATE = 0x0F 

69 # Values 0x10-0x1F are Reserved for Future Use 

70 RESPONSE_CODE = 0x20 

71 # Values 0x21-0xFF are Reserved for Future Use 

72 

73 def __str__(self) -> str: 

74 """Return human-readable operation code name.""" 

75 names = { 

76 self.SET_CUMULATIVE_VALUE: "Set Cumulative Value", 

77 self.UPDATE_SENSOR_LOCATION: "Update Sensor Location", 

78 self.REQUEST_SUPPORTED_SENSOR_LOCATIONS: "Request Supported Sensor Locations", 

79 self.SET_CRANK_LENGTH: "Set Crank Length", 

80 self.REQUEST_CRANK_LENGTH: "Request Crank Length", 

81 self.SET_CHAIN_LENGTH: "Set Chain Length", 

82 self.REQUEST_CHAIN_LENGTH: "Request Chain Length", 

83 self.SET_CHAIN_WEIGHT: "Set Chain Weight", 

84 self.REQUEST_CHAIN_WEIGHT: "Request Chain Weight", 

85 self.SET_SPAN_LENGTH: "Set Span Length", 

86 self.REQUEST_SPAN_LENGTH: "Request Span Length", 

87 self.START_OFFSET_COMPENSATION: "Start Offset Compensation", 

88 self.MASK_CYCLING_POWER_MEASUREMENT: "Mask Cycling Power Measurement", 

89 self.REQUEST_SAMPLING_RATE: "Request Sampling Rate", 

90 self.REQUEST_FACTORY_CALIBRATION_DATE: "Request Factory Calibration Date", 

91 self.RESPONSE_CODE: "Response Code", 

92 } 

93 return names[self] 

94 

95 

96# Constants 

97MIN_OP_CODE_LENGTH = 1 # Minimum length for op code data 

98 

99 

100class CyclingPowerResponseValue(IntEnum): 

101 """Cycling Power Control Point response values as per Bluetooth SIG specification.""" 

102 

103 # Value 0x00 is Reserved for Future Use 

104 SUCCESS = 0x01 

105 OP_CODE_NOT_SUPPORTED = 0x02 

106 INVALID_PARAMETER = 0x03 

107 OPERATION_FAILED = 0x04 

108 # Values 0x05-0xFF are Reserved for Future Use 

109 

110 def __str__(self) -> str: 

111 """Return human-readable response value name.""" 

112 names = { 

113 self.SUCCESS: "Success", 

114 self.OP_CODE_NOT_SUPPORTED: "Op Code Not Supported", 

115 self.INVALID_PARAMETER: "Invalid Parameter", 

116 self.OPERATION_FAILED: "Operation Failed", 

117 } 

118 return names[self] 

119 

120 

121class CyclingPowerControlPointCharacteristic(BaseCharacteristic): 

122 """Cycling Power Control Point characteristic (0x2A66). 

123 

124 Used for control and configuration of cycling power sensors. 

125 Provides commands for calibration, configuration, and sensor 

126 control. 

127 """ 

128 

129 # Resolution constants for parameter calculations 

130 CRANK_LENGTH_RESOLUTION = 2.0 # 0.5mm resolution (value / 2) 

131 CHAIN_LENGTH_RESOLUTION = 10.0 # 0.1mm resolution (value / 10) 

132 CHAIN_WEIGHT_RESOLUTION = 10.0 # 0.1g resolution (value / 10) 

133 

134 # Data length constants 

135 MIN_OP_CODE_LENGTH = 1 

136 CUMULATIVE_VALUE_LENGTH = 5 # op_code(1) + value(4) 

137 SENSOR_LOCATION_LENGTH = 2 # op_code(1) + location(1) 

138 TWO_BYTE_PARAM_LENGTH = 3 # op_code(1) + param(2) 

139 RESPONSE_CODE_LENGTH = 3 # op_code(1) + request(1) + response(1) 

140 

141 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerControlPointData: 

142 """Parse cycling power control point data. 

143 

144 Format: Op Code(1) + [Request Parameter] or Response Code(1) + [Response Parameter]. 

145 

146 Args: 

147 data: Raw bytearray from BLE characteristic. 

148 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

149 

150 Returns: 

151 CyclingPowerControlPointData containing parsed control point data. 

152 

153 Raises: 

154 ValueError: If data format is invalid. 

155 

156 """ 

157 if len(data) < MIN_OP_CODE_LENGTH: 

158 raise ValueError("Cycling Power Control Point data must be at least 1 byte") 

159 

160 op_code = data[0] 

161 

162 # Parse additional data based on op code 

163 params = ( 

164 self._parse_op_code_parameters(op_code, data) 

165 if len(data) > 1 

166 else OpCodeParameters( 

167 cumulative_value=None, 

168 sensor_location=None, 

169 crank_length=None, 

170 chain_length=None, 

171 chain_weight=None, 

172 span_length=None, 

173 measurement_mask=None, 

174 request_op_code=None, 

175 response_value=None, 

176 ) 

177 ) 

178 

179 # Create struct with all parsed values 

180 return CyclingPowerControlPointData( 

181 op_code=CyclingPowerOpCode(op_code), 

182 cumulative_value=params.cumulative_value, 

183 sensor_location=params.sensor_location, 

184 crank_length=params.crank_length, 

185 chain_length=params.chain_length, 

186 chain_weight=params.chain_weight, 

187 span_length=params.span_length, 

188 measurement_mask=params.measurement_mask, 

189 request_op_code=params.request_op_code, 

190 response_value=params.response_value, 

191 ) 

192 

193 def encode_value(self, data: CyclingPowerControlPointData | int) -> bytearray: 

194 """Encode cycling power control point value back to bytes. 

195 

196 Args: 

197 data: CyclingPowerControlPointData with op_code and optional parameters, or raw op_code integer 

198 

199 Returns: 

200 Encoded bytes representing the control point command 

201 

202 """ 

203 if isinstance(data, int): 

204 # Simple op code only 

205 op_code = data 

206 if not 0 <= op_code <= UINT8_MAX: 

207 raise ValueError(f"Op code {op_code} exceeds uint8 range") 

208 return bytearray([op_code]) 

209 

210 # Handle dataclass case 

211 op_code = data.op_code 

212 result = bytearray([op_code]) 

213 

214 # Add parameters based on op_code and available data 

215 if data.cumulative_value is not None: 

216 result.extend(DataParser.encode_int32(data.cumulative_value, signed=False)) 

217 elif data.sensor_location is not None: 

218 result.append(data.sensor_location) 

219 elif data.crank_length is not None: 

220 result.extend(DataParser.encode_int16(int(data.crank_length * self.CRANK_LENGTH_RESOLUTION), signed=False)) 

221 elif data.chain_length is not None: 

222 result.extend(DataParser.encode_int16(int(data.chain_length * self.CHAIN_LENGTH_RESOLUTION), signed=False)) 

223 elif data.chain_weight is not None: 

224 result.extend(DataParser.encode_int16(int(data.chain_weight * self.CHAIN_WEIGHT_RESOLUTION), signed=False)) 

225 elif data.span_length is not None: 

226 result.extend(DataParser.encode_int16(data.span_length, signed=False)) 

227 elif data.measurement_mask is not None: 

228 result.extend(DataParser.encode_int16(data.measurement_mask, signed=False)) 

229 elif data.request_op_code is not None and data.response_value is not None: 

230 result.extend([data.request_op_code.value, data.response_value.value]) 

231 

232 return result 

233 

234 def _parse_op_code_parameters( # pylint: disable=too-many-branches 

235 self, op_code: int, data: bytearray 

236 ) -> OpCodeParameters: 

237 """Parse operation code specific parameters. 

238 

239 Args: 

240 op_code: Operation code 

241 data: Raw data 

242 

243 Returns: 

244 OpCodeParameters containing all parsed parameters 

245 

246 """ 

247 cumulative_value: int | None = None 

248 sensor_location: int | None = None 

249 crank_length: float | None = None 

250 chain_length: float | None = None 

251 chain_weight: float | None = None 

252 span_length: int | None = None 

253 measurement_mask: int | None = None 

254 request_op_code: CyclingPowerOpCode | None = None 

255 response_value: CyclingPowerResponseValue | None = None 

256 

257 if op_code == CyclingPowerOpCode.SET_CUMULATIVE_VALUE: 

258 if len(data) >= self.CUMULATIVE_VALUE_LENGTH: 

259 cumulative_value = DataParser.parse_int32(data, offset=1, signed=False) 

260 elif op_code == CyclingPowerOpCode.UPDATE_SENSOR_LOCATION: 

261 if len(data) >= self.SENSOR_LOCATION_LENGTH: 

262 sensor_location = int(data[1]) 

263 elif op_code == CyclingPowerOpCode.SET_CRANK_LENGTH: 

264 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

265 crank_length_raw = DataParser.parse_int16(data, offset=1, signed=False) 

266 crank_length = crank_length_raw / self.CRANK_LENGTH_RESOLUTION 

267 elif op_code == CyclingPowerOpCode.SET_CHAIN_LENGTH: 

268 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

269 chain_length_raw = DataParser.parse_int16(data, offset=1, signed=False) 

270 chain_length = chain_length_raw / self.CHAIN_LENGTH_RESOLUTION 

271 elif op_code == CyclingPowerOpCode.SET_CHAIN_WEIGHT: 

272 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

273 chain_weight_raw = DataParser.parse_int16(data, offset=1, signed=False) 

274 chain_weight = chain_weight_raw / self.CHAIN_WEIGHT_RESOLUTION 

275 elif op_code == CyclingPowerOpCode.SET_SPAN_LENGTH: 

276 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

277 span_length = DataParser.parse_int16(data, offset=1, signed=False) # mm 

278 elif op_code == CyclingPowerOpCode.MASK_CYCLING_POWER_MEASUREMENT: 

279 if len(data) >= self.TWO_BYTE_PARAM_LENGTH: 

280 measurement_mask = DataParser.parse_int16(data, offset=1, signed=False) 

281 elif op_code == CyclingPowerOpCode.RESPONSE_CODE: 

282 if len(data) >= self.RESPONSE_CODE_LENGTH: 

283 request_op_code = CyclingPowerOpCode(data[1]) 

284 response_value = CyclingPowerResponseValue(data[2]) 

285 

286 return OpCodeParameters( 

287 cumulative_value=cumulative_value, 

288 sensor_location=sensor_location, 

289 crank_length=crank_length, 

290 chain_length=chain_length, 

291 chain_weight=chain_weight, 

292 span_length=span_length, 

293 measurement_mask=measurement_mask, 

294 request_op_code=request_op_code, 

295 response_value=response_value, 

296 )