Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_control_point.py: 89%
139 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Cycling Power Control Point characteristic implementation."""
3from __future__ import annotations
5from enum import IntEnum
7import msgspec
9from ..constants import UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser
15class OpCodeParameters(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
16 """Parsed operation code parameters."""
18 cumulative_value: int | None
19 sensor_location: int | None
20 crank_length: float | None
21 chain_length: float | None
22 chain_weight: float | None
23 span_length: int | None
24 measurement_mask: int | None
25 request_op_code: CyclingPowerOpCode | None
26 response_value: CyclingPowerResponseValue | None
29class CyclingPowerControlPointData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
30 """Parsed data from Cycling Power Control Point characteristic."""
32 op_code: CyclingPowerOpCode
33 # Optional parameters based on op_code
34 cumulative_value: int | None = None
35 sensor_location: int | None = None
36 crank_length: float | None = None # mm
37 chain_length: float | None = None # mm
38 chain_weight: float | None = None # g
39 span_length: int | None = None # mm
40 measurement_mask: int | None = None
41 request_op_code: CyclingPowerOpCode | None = None
42 response_value: CyclingPowerResponseValue | None = None
44 def __post_init__(self) -> None:
45 """Validate cycling power control point data."""
46 if not 0 <= self.op_code <= UINT8_MAX:
47 raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)")
50class CyclingPowerOpCode(IntEnum):
51 """Cycling Power Control Point operation codes as per Bluetooth SIG specification."""
53 # Value 0x00 is Reserved for Future Use
54 SET_CUMULATIVE_VALUE = 0x01
55 UPDATE_SENSOR_LOCATION = 0x02
56 REQUEST_SUPPORTED_SENSOR_LOCATIONS = 0x03
57 SET_CRANK_LENGTH = 0x04
58 REQUEST_CRANK_LENGTH = 0x05
59 SET_CHAIN_LENGTH = 0x06
60 REQUEST_CHAIN_LENGTH = 0x07
61 SET_CHAIN_WEIGHT = 0x08
62 REQUEST_CHAIN_WEIGHT = 0x09
63 SET_SPAN_LENGTH = 0x0A
64 REQUEST_SPAN_LENGTH = 0x0B
65 START_OFFSET_COMPENSATION = 0x0C
66 MASK_CYCLING_POWER_MEASUREMENT = 0x0D
67 REQUEST_SAMPLING_RATE = 0x0E
68 REQUEST_FACTORY_CALIBRATION_DATE = 0x0F
69 # Values 0x10-0x1F are Reserved for Future Use
70 RESPONSE_CODE = 0x20
71 # Values 0x21-0xFF are Reserved for Future Use
73 def __str__(self) -> str:
74 """Return human-readable operation code name."""
75 names = {
76 self.SET_CUMULATIVE_VALUE: "Set Cumulative Value",
77 self.UPDATE_SENSOR_LOCATION: "Update Sensor Location",
78 self.REQUEST_SUPPORTED_SENSOR_LOCATIONS: "Request Supported Sensor Locations",
79 self.SET_CRANK_LENGTH: "Set Crank Length",
80 self.REQUEST_CRANK_LENGTH: "Request Crank Length",
81 self.SET_CHAIN_LENGTH: "Set Chain Length",
82 self.REQUEST_CHAIN_LENGTH: "Request Chain Length",
83 self.SET_CHAIN_WEIGHT: "Set Chain Weight",
84 self.REQUEST_CHAIN_WEIGHT: "Request Chain Weight",
85 self.SET_SPAN_LENGTH: "Set Span Length",
86 self.REQUEST_SPAN_LENGTH: "Request Span Length",
87 self.START_OFFSET_COMPENSATION: "Start Offset Compensation",
88 self.MASK_CYCLING_POWER_MEASUREMENT: "Mask Cycling Power Measurement",
89 self.REQUEST_SAMPLING_RATE: "Request Sampling Rate",
90 self.REQUEST_FACTORY_CALIBRATION_DATE: "Request Factory Calibration Date",
91 self.RESPONSE_CODE: "Response Code",
92 }
93 return names[self]
96# Constants
97MIN_OP_CODE_LENGTH = 1 # Minimum length for op code data
100class CyclingPowerResponseValue(IntEnum):
101 """Cycling Power Control Point response values as per Bluetooth SIG specification."""
103 # Value 0x00 is Reserved for Future Use
104 SUCCESS = 0x01
105 OP_CODE_NOT_SUPPORTED = 0x02
106 INVALID_PARAMETER = 0x03
107 OPERATION_FAILED = 0x04
108 # Values 0x05-0xFF are Reserved for Future Use
110 def __str__(self) -> str:
111 """Return human-readable response value name."""
112 names = {
113 self.SUCCESS: "Success",
114 self.OP_CODE_NOT_SUPPORTED: "Op Code Not Supported",
115 self.INVALID_PARAMETER: "Invalid Parameter",
116 self.OPERATION_FAILED: "Operation Failed",
117 }
118 return names[self]
121class CyclingPowerControlPointCharacteristic(BaseCharacteristic):
122 """Cycling Power Control Point characteristic (0x2A66).
124 Used for control and configuration of cycling power sensors.
125 Provides commands for calibration, configuration, and sensor
126 control.
127 """
129 # Resolution constants for parameter calculations
130 CRANK_LENGTH_RESOLUTION = 2.0 # 0.5mm resolution (value / 2)
131 CHAIN_LENGTH_RESOLUTION = 10.0 # 0.1mm resolution (value / 10)
132 CHAIN_WEIGHT_RESOLUTION = 10.0 # 0.1g resolution (value / 10)
134 # Data length constants
135 MIN_OP_CODE_LENGTH = 1
136 CUMULATIVE_VALUE_LENGTH = 5 # op_code(1) + value(4)
137 SENSOR_LOCATION_LENGTH = 2 # op_code(1) + location(1)
138 TWO_BYTE_PARAM_LENGTH = 3 # op_code(1) + param(2)
139 RESPONSE_CODE_LENGTH = 3 # op_code(1) + request(1) + response(1)
141 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerControlPointData:
142 """Parse cycling power control point data.
144 Format: Op Code(1) + [Request Parameter] or Response Code(1) + [Response Parameter].
146 Args:
147 data: Raw bytearray from BLE characteristic.
148 ctx: Optional CharacteristicContext providing surrounding context (may be None).
150 Returns:
151 CyclingPowerControlPointData containing parsed control point data.
153 Raises:
154 ValueError: If data format is invalid.
156 """
157 if len(data) < MIN_OP_CODE_LENGTH:
158 raise ValueError("Cycling Power Control Point data must be at least 1 byte")
160 op_code = data[0]
162 # Parse additional data based on op code
163 params = (
164 self._parse_op_code_parameters(op_code, data)
165 if len(data) > 1
166 else OpCodeParameters(
167 cumulative_value=None,
168 sensor_location=None,
169 crank_length=None,
170 chain_length=None,
171 chain_weight=None,
172 span_length=None,
173 measurement_mask=None,
174 request_op_code=None,
175 response_value=None,
176 )
177 )
179 # Create struct with all parsed values
180 return CyclingPowerControlPointData(
181 op_code=CyclingPowerOpCode(op_code),
182 cumulative_value=params.cumulative_value,
183 sensor_location=params.sensor_location,
184 crank_length=params.crank_length,
185 chain_length=params.chain_length,
186 chain_weight=params.chain_weight,
187 span_length=params.span_length,
188 measurement_mask=params.measurement_mask,
189 request_op_code=params.request_op_code,
190 response_value=params.response_value,
191 )
193 def encode_value(self, data: CyclingPowerControlPointData | int) -> bytearray:
194 """Encode cycling power control point value back to bytes.
196 Args:
197 data: CyclingPowerControlPointData with op_code and optional parameters, or raw op_code integer
199 Returns:
200 Encoded bytes representing the control point command
202 """
203 if isinstance(data, int):
204 # Simple op code only
205 op_code = data
206 if not 0 <= op_code <= UINT8_MAX:
207 raise ValueError(f"Op code {op_code} exceeds uint8 range")
208 return bytearray([op_code])
210 # Handle dataclass case
211 op_code = data.op_code
212 result = bytearray([op_code])
214 # Add parameters based on op_code and available data
215 if data.cumulative_value is not None:
216 result.extend(DataParser.encode_int32(data.cumulative_value, signed=False))
217 elif data.sensor_location is not None:
218 result.append(data.sensor_location)
219 elif data.crank_length is not None:
220 result.extend(DataParser.encode_int16(int(data.crank_length * self.CRANK_LENGTH_RESOLUTION), signed=False))
221 elif data.chain_length is not None:
222 result.extend(DataParser.encode_int16(int(data.chain_length * self.CHAIN_LENGTH_RESOLUTION), signed=False))
223 elif data.chain_weight is not None:
224 result.extend(DataParser.encode_int16(int(data.chain_weight * self.CHAIN_WEIGHT_RESOLUTION), signed=False))
225 elif data.span_length is not None:
226 result.extend(DataParser.encode_int16(data.span_length, signed=False))
227 elif data.measurement_mask is not None:
228 result.extend(DataParser.encode_int16(data.measurement_mask, signed=False))
229 elif data.request_op_code is not None and data.response_value is not None:
230 result.extend([data.request_op_code.value, data.response_value.value])
232 return result
234 def _parse_op_code_parameters( # pylint: disable=too-many-branches
235 self, op_code: int, data: bytearray
236 ) -> OpCodeParameters:
237 """Parse operation code specific parameters.
239 Args:
240 op_code: Operation code
241 data: Raw data
243 Returns:
244 OpCodeParameters containing all parsed parameters
246 """
247 cumulative_value: int | None = None
248 sensor_location: int | None = None
249 crank_length: float | None = None
250 chain_length: float | None = None
251 chain_weight: float | None = None
252 span_length: int | None = None
253 measurement_mask: int | None = None
254 request_op_code: CyclingPowerOpCode | None = None
255 response_value: CyclingPowerResponseValue | None = None
257 if op_code == CyclingPowerOpCode.SET_CUMULATIVE_VALUE:
258 if len(data) >= self.CUMULATIVE_VALUE_LENGTH:
259 cumulative_value = DataParser.parse_int32(data, offset=1, signed=False)
260 elif op_code == CyclingPowerOpCode.UPDATE_SENSOR_LOCATION:
261 if len(data) >= self.SENSOR_LOCATION_LENGTH:
262 sensor_location = int(data[1])
263 elif op_code == CyclingPowerOpCode.SET_CRANK_LENGTH:
264 if len(data) >= self.TWO_BYTE_PARAM_LENGTH:
265 crank_length_raw = DataParser.parse_int16(data, offset=1, signed=False)
266 crank_length = crank_length_raw / self.CRANK_LENGTH_RESOLUTION
267 elif op_code == CyclingPowerOpCode.SET_CHAIN_LENGTH:
268 if len(data) >= self.TWO_BYTE_PARAM_LENGTH:
269 chain_length_raw = DataParser.parse_int16(data, offset=1, signed=False)
270 chain_length = chain_length_raw / self.CHAIN_LENGTH_RESOLUTION
271 elif op_code == CyclingPowerOpCode.SET_CHAIN_WEIGHT:
272 if len(data) >= self.TWO_BYTE_PARAM_LENGTH:
273 chain_weight_raw = DataParser.parse_int16(data, offset=1, signed=False)
274 chain_weight = chain_weight_raw / self.CHAIN_WEIGHT_RESOLUTION
275 elif op_code == CyclingPowerOpCode.SET_SPAN_LENGTH:
276 if len(data) >= self.TWO_BYTE_PARAM_LENGTH:
277 span_length = DataParser.parse_int16(data, offset=1, signed=False) # mm
278 elif op_code == CyclingPowerOpCode.MASK_CYCLING_POWER_MEASUREMENT:
279 if len(data) >= self.TWO_BYTE_PARAM_LENGTH:
280 measurement_mask = DataParser.parse_int16(data, offset=1, signed=False)
281 elif op_code == CyclingPowerOpCode.RESPONSE_CODE:
282 if len(data) >= self.RESPONSE_CODE_LENGTH:
283 request_op_code = CyclingPowerOpCode(data[1])
284 response_value = CyclingPowerResponseValue(data[2])
286 return OpCodeParameters(
287 cumulative_value=cumulative_value,
288 sensor_location=sensor_location,
289 crank_length=crank_length,
290 chain_length=chain_length,
291 chain_weight=chain_weight,
292 span_length=span_length,
293 measurement_mask=measurement_mask,
294 request_op_code=request_op_code,
295 response_value=response_value,
296 )