Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_vector.py: 91%
90 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Cycling Power Vector characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser
15class CyclingPowerVectorFlags(IntFlag):
16 """Cycling Power Vector flags as per Bluetooth SIG specification."""
18 INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 0x01
19 INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 0x02
22class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
23 """Crank revolution data from cycling power vector."""
25 crank_revolutions: int
26 last_crank_event_time: float # in seconds
29class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
30 """Parsed data from Cycling Power Vector characteristic.
32 Used for both parsing and encoding - all fields are properly typed.
33 """
35 flags: CyclingPowerVectorFlags
36 crank_revolution_data: CrankRevolutionData
37 first_crank_measurement_angle: float
38 instantaneous_force_magnitude_array: tuple[float, ...] | None = None
39 instantaneous_torque_magnitude_array: tuple[float, ...] | None = None
41 def __post_init__(self) -> None:
42 """Validate cycling power vector data."""
43 if not 0 <= int(self.flags) <= UINT8_MAX:
44 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")
45 if not 0 <= self.first_crank_measurement_angle <= 360:
46 raise ValueError("First crank measurement angle must be 0-360 degrees")
49class CyclingPowerVectorCharacteristic(BaseCharacteristic[CyclingPowerVectorData]):
50 """Cycling Power Vector characteristic (0x2A64).
52 Used to transmit detailed cycling power vector data including force
53 and torque measurements at different crank angles.
54 """
56 # Variable length: min 7 bytes (flags:1 + crank_revs:2 + crank_time:2 + angle:2), plus optional arrays
57 min_length = 7
58 allow_variable_length = True
60 _manual_unit: str = "various" # Multiple units in vector data
62 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerVectorData:
63 """Parse cycling power vector data according to Bluetooth specification.
65 Format: Flags(1) + Crank Revolution Data(2) + Last Crank Event Time(2) +
66 First Crank Measurement Angle(2) + [Instantaneous Force Magnitude Array] +
67 [Instantaneous Torque Magnitude Array]
69 Args:
70 data: Raw bytearray from BLE characteristic.
71 ctx: Optional CharacteristicContext providing surrounding context (may be None).
73 Returns:
74 CyclingPowerVectorData containing parsed cycling power vector data.
76 # `ctx` is intentionally unused in this implementation; mark as used
77 # so linters do not report an unused-argument error.
78 del ctx
79 Raises:
80 ValueError: If data format is invalid.
82 """
83 if len(data) < 7:
84 raise ValueError("Cycling Power Vector data must be at least 7 bytes")
86 flags = CyclingPowerVectorFlags(data[0])
88 # Parse crank revolution data (2 bytes)
89 crank_revolutions = DataParser.parse_int16(data, 1, signed=False)
91 # Parse last crank event time (2 bytes, 1/1024 second units)
92 crank_event_time_raw = DataParser.parse_int16(data, 3, signed=False)
93 crank_event_time = crank_event_time_raw / 1024.0
95 # Parse first crank measurement angle (2 bytes, 1/180 degree units)
96 first_angle_raw = DataParser.parse_int16(data, 5, signed=False)
97 first_angle = first_angle_raw / 180.0 # Convert to degrees
99 # Create crank revolution data
100 crank_revolution_data = CrankRevolutionData(
101 crank_revolutions=crank_revolutions, last_crank_event_time=crank_event_time
102 )
104 offset = 7
105 force_magnitudes_list: list[float] = []
106 torque_magnitudes_list: list[float] = []
108 # Parse optional instantaneous force magnitude array if present
109 if (flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT) and len(data) > offset:
110 # Each force magnitude is 2 bytes (signed 16-bit, 1 N units)
111 while offset + 1 < len(data) and not (
112 flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT
113 ): # Stop if torque data follows
114 if offset + 2 > len(data):
115 break
116 force_raw = DataParser.parse_int16(data, offset, signed=True)
117 force_magnitudes_list.append(float(force_raw)) # Force in Newtons
118 offset += 2
120 # Parse optional instantaneous torque magnitude array if present
121 if (flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT) and len(data) > offset:
122 # Each torque magnitude is 2 bytes (signed 16-bit, 1/32 Nm units)
123 while offset + 1 < len(data):
124 if offset + 2 > len(data):
125 break
126 torque_raw = DataParser.parse_int16(data, offset, signed=True)
127 torque_magnitudes_list.append(torque_raw / 32.0) # Convert to Nm
128 offset += 2
130 return CyclingPowerVectorData(
131 flags=flags,
132 crank_revolution_data=crank_revolution_data,
133 first_crank_measurement_angle=first_angle,
134 instantaneous_force_magnitude_array=tuple(force_magnitudes_list) if force_magnitudes_list else None,
135 instantaneous_torque_magnitude_array=tuple(torque_magnitudes_list) if torque_magnitudes_list else None,
136 )
138 def _encode_value(self, data: CyclingPowerVectorData) -> bytearray: # pylint: disable=too-many-branches # Complex cycling power vector with optional fields
139 """Encode cycling power vector value back to bytes.
141 Args:
142 data: CyclingPowerVectorData containing cycling power vector data
144 Returns:
145 Encoded bytes representing the power vector
147 """
148 if not isinstance(data, CyclingPowerVectorData):
149 raise TypeError(f"Cycling power vector data must be a CyclingPowerVectorData, got {type(data).__name__}")
151 # Extract values from dataclass
152 crank_revolutions = data.crank_revolution_data.crank_revolutions
153 crank_event_time = data.crank_revolution_data.last_crank_event_time
154 first_angle = data.first_crank_measurement_angle
156 # Build flags based on optional arrays
157 flags = data.flags
158 if data.instantaneous_force_magnitude_array is not None:
159 flags |= (
160 CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT
161 ) # Force magnitude array present
162 if data.instantaneous_torque_magnitude_array is not None:
163 flags |= (
164 CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT
165 ) # Torque magnitude array present
167 # Convert values to raw format
168 crank_event_time_raw = round(crank_event_time * 1024) # 1/1024 second units
169 first_angle_raw = round(first_angle * 180) # 1/180 degree units
171 # Validate ranges
172 if not 0 <= crank_revolutions <= 0xFFFF:
173 raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")
174 if not 0 <= crank_event_time_raw <= 0xFFFF:
175 raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")
176 if not 0 <= first_angle_raw <= 0xFFFF:
177 raise ValueError(f"First angle {first_angle_raw} exceeds uint16 range")
179 # Build result
180 result = bytearray([int(flags)])
181 result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
182 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
183 result.extend(DataParser.encode_int16(first_angle_raw, signed=False))
185 # Add force magnitude array if present
186 if data.instantaneous_force_magnitude_array is not None:
187 for force in data.instantaneous_force_magnitude_array:
188 force_val = int(force)
189 if SINT16_MIN <= force_val <= SINT16_MAX: # signed 16-bit range
190 result.extend(DataParser.encode_int16(force_val, signed=True))
192 # Add torque magnitude array if present
193 if data.instantaneous_torque_magnitude_array is not None:
194 for torque in data.instantaneous_torque_magnitude_array:
195 torque_val = int(torque * 32) # Convert back to 1/32 Nm units
196 if SINT16_MIN <= torque_val <= SINT16_MAX: # signed 16-bit range
197 result.extend(DataParser.encode_int16(torque_val, signed=True))
199 return result