Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_vector.py: 95%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""Cycling Power Vector characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import SINT16_MAX, SINT16_MIN
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser
class CyclingPowerVectorFlags(IntFlag):
    """Bit flags carried in the first byte of a Cycling Power Vector value.

    Layout as per CPS v1.1 Table 3.7. Bits 0-3 announce which optional
    fields follow the flags byte; bits 4-5 together form the 2-bit
    instantaneous measurement direction.
    """

    # Presence bits for the optional fields.
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 0
    FIRST_CRANK_MEASUREMENT_ANGLE_PRESENT = 1 << 1
    INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 1 << 2
    INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 1 << 3

    # Low and high bit of the measurement direction field.
    INSTANTANEOUS_MEASUREMENT_DIRECTION_BIT0 = 1 << 4
    INSTANTANEOUS_MEASUREMENT_DIRECTION_BIT1 = 1 << 5
class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Crank revolution counter and timestamp from a cycling power vector.

    Immutable, keyword-only record holding the optional crank revolution
    field of the characteristic.
    """

    # Crank revolution count; read from / written to the wire as an unsigned 16-bit value.
    crank_revolutions: int
    # Time of the last crank event in seconds (the wire value is in 1/1024 s units).
    last_crank_event_time: float
class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Decoded contents of a Cycling Power Vector characteristic value.

    The same structure is produced by parsing and consumed by encoding.
    Every optional field is ``None`` when it is absent from the value.
    """

    # Flags byte exactly as received / to be sent.
    flags: CyclingPowerVectorFlags
    # Crank revolution counter and event time, when present.
    crank_revolution_data: CrankRevolutionData | None = None
    # Unscaled unsigned 16-bit wire value as a float (presumably degrees - confirm against the CPS spec).
    first_crank_measurement_angle: float | None = None
    # Unscaled signed 16-bit samples (presumably newtons - confirm against the CPS spec).
    instantaneous_force_magnitude_array: tuple[float, ...] | None = None
    # Signed 16-bit samples divided by 32 (presumably newton metres - confirm against the CPS spec).
    instantaneous_torque_magnitude_array: tuple[float, ...] | None = None
    # Numeric value (0-3) of flag bits 4-5.
    instantaneous_measurement_direction: int = 0
class CyclingPowerVectorCharacteristic(BaseCharacteristic[CyclingPowerVectorData]):
    """Cycling Power Vector characteristic (0x2A64).

    Used to transmit detailed cycling power vector data including force
    and torque measurements at different crank angles.
    """

    # Variable length: min 1 byte (flags only), optional crank data + angle + arrays
    min_length = 1
    allow_variable_length = True

    _manual_unit: str = "various"  # Multiple units in vector data

    # Bits 4-5 of the flags byte carry the instantaneous measurement direction.
    _DIRECTION_MASK = 0x30
    _DIRECTION_SHIFT = 4

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> CyclingPowerVectorData:  # pylint: disable=too-many-locals,too-many-branches  # Vector data with multiple array fields
        """Parse cycling power vector data according to CPS v1.1.

        Format: Flags(1) + [Crank Revolutions(2) + Last Crank Event Time(2)] +
        [First Crank Measurement Angle(2)] + [Force Magnitude Array(sint16[])] +
        [Torque Magnitude Array(sint16[])]

        Parsing is best-effort: a flagged field that does not fit in the
        remaining bytes is reported as ``None``. Once a fixed-size field is
        found to be truncated, the fields after it are not parsed either,
        because their offsets can no longer be trusted.

        Args:
            data: Raw bytearray from BLE characteristic (at least the flags byte).
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
                Not used by this parser.
            validate: Whether to validate ranges (default True). Not used by this parser.

        Returns:
            CyclingPowerVectorData containing parsed cycling power vector data.

        """
        flags = CyclingPowerVectorFlags(data[0])
        offset = 1
        # Set when a flagged fixed-size field is cut short. The leftover bytes
        # belong to that field, so nothing after it may be read from them.
        truncated = False

        crank_revolution_data: CrankRevolutionData | None = None
        first_crank_measurement_angle: float | None = None

        # Parse crank revolution data if present (bit 0)
        if flags & CyclingPowerVectorFlags.CRANK_REVOLUTION_DATA_PRESENT:
            if len(data) >= offset + 4:
                crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
                crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
                crank_revolution_data = CrankRevolutionData(
                    crank_revolutions=crank_revolutions,
                    # Wire value is in 1/1024 second units.
                    last_crank_event_time=crank_event_time_raw / 1024.0,
                )
                offset += 4
            else:
                truncated = True

        # Parse first crank measurement angle if present (bit 1)
        if not truncated and flags & CyclingPowerVectorFlags.FIRST_CRANK_MEASUREMENT_ANGLE_PRESENT:
            if len(data) >= offset + 2:
                first_crank_measurement_angle = float(DataParser.parse_int16(data, offset, signed=False))
                offset += 2
            else:
                truncated = True

        # Extract instantaneous measurement direction (bits 4-5)
        direction = (int(flags) & self._DIRECTION_MASK) >> self._DIRECTION_SHIFT

        force_magnitudes_list: list[float] = []
        torque_magnitudes_list: list[float] = []

        # Parse force magnitude array if present (bit 2).
        # The array has no length prefix, so it consumes every remaining
        # complete sint16 sample; a trailing odd byte is ignored.
        if not truncated and flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT:
            while offset + 2 <= len(data):
                force_raw = DataParser.parse_int16(data, offset, signed=True)
                force_magnitudes_list.append(float(force_raw))
                offset += 2

        # Parse torque magnitude array if present (bit 3).
        # If the force array was also flagged it has already consumed all
        # remaining samples, so this array will then be empty.
        if not truncated and flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT:
            while offset + 2 <= len(data):
                torque_raw = DataParser.parse_int16(data, offset, signed=True)
                # Wire value is in 1/32 units.
                torque_magnitudes_list.append(torque_raw / 32.0)
                offset += 2

        return CyclingPowerVectorData(
            flags=flags,
            crank_revolution_data=crank_revolution_data,
            first_crank_measurement_angle=first_crank_measurement_angle,
            # Empty arrays are reported as None, the same as absent ones.
            instantaneous_force_magnitude_array=tuple(force_magnitudes_list) if force_magnitudes_list else None,
            instantaneous_torque_magnitude_array=tuple(torque_magnitudes_list) if torque_magnitudes_list else None,
            instantaneous_measurement_direction=direction,
        )

    def _encode_value(self, data: CyclingPowerVectorData) -> bytearray:  # pylint: disable=too-many-branches  # Complex cycling power vector with optional fields
        """Encode cycling power vector value back to bytes.

        The flags byte is taken from ``data.flags``. Presence bits are added
        for any optional field that is supplied without its bit, and the
        direction field is filled from ``instantaneous_measurement_direction``
        when ``data.flags`` carries no direction bits, so the emitted payload
        always describes the bytes that follow it. Bits already set in
        ``data.flags`` are never cleared.

        Args:
            data: CyclingPowerVectorData containing cycling power vector data

        Returns:
            Encoded bytes representing the power vector

        """
        crank = data.crank_revolution_data
        angle = data.first_crank_measurement_angle
        forces = data.instantaneous_force_magnitude_array
        torques = data.instantaneous_torque_magnitude_array

        flags = int(data.flags)
        # Make the presence bits agree with the fields that will be written.
        if crank is not None:
            flags |= int(CyclingPowerVectorFlags.CRANK_REVOLUTION_DATA_PRESENT)
        if angle is not None:
            flags |= int(CyclingPowerVectorFlags.FIRST_CRANK_MEASUREMENT_ANGLE_PRESENT)
        if forces is not None:
            flags |= int(CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT)
        if torques is not None:
            flags |= int(CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT)
        # Explicit direction bits in data.flags win; otherwise use the numeric field.
        if not flags & self._DIRECTION_MASK:
            flags |= (data.instantaneous_measurement_direction << self._DIRECTION_SHIFT) & self._DIRECTION_MASK

        result = bytearray([flags])

        # Encode crank revolution data if present
        if crank is not None:
            # Seconds -> 1/1024 second units.
            crank_event_time_raw = round(crank.last_crank_event_time * 1024)
            result.extend(DataParser.encode_int16(crank.crank_revolutions, signed=False))
            result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))

        # Encode first crank measurement angle if present
        if angle is not None:
            result.extend(DataParser.encode_int16(round(angle), signed=False))

        # Encode force magnitude array if present.
        # Values are rounded to the nearest wire unit, like the other fields.
        # Samples that do not fit in a sint16 are left out (best-effort).
        if forces is not None:
            for force in forces:
                force_val = round(force)
                if SINT16_MIN <= force_val <= SINT16_MAX:
                    result.extend(DataParser.encode_int16(force_val, signed=True))

        # Encode torque magnitude array if present (1/32 units on the wire).
        # Same rounding and out-of-range policy as the force array.
        if torques is not None:
            for torque in torques:
                torque_val = round(torque * 32)
                if SINT16_MIN <= torque_val <= SINT16_MAX:
                    result.extend(DataParser.encode_int16(torque_val, signed=True))

        return result