Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_vector.py: 92%
88 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Cycling Power Vector characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser
class CyclingPowerVectorFlags(IntFlag):
    """Bit masks for the flags byte of the Cycling Power Vector characteristic.

    Each member marks the presence of one optional trailing array in the
    characteristic payload.
    """

    # NOTE(review): confirm these mask positions against the published
    # Cycling Power Vector flags layout before relying on interoperability.
    INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 1 << 0
    INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 1 << 1
class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable pair of crank revolution fields carried by the power vector.

    Both fields are keyword-only; instances cannot be modified after creation.
    """

    # Crank revolution counter value.
    crank_revolutions: int
    # Time of the last crank event, expressed in seconds.
    last_crank_event_time: float
class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Structured form of a Cycling Power Vector characteristic value.

    The same type is produced when parsing and consumed when encoding.
    The two magnitude arrays are optional and default to ``None``.
    """

    flags: CyclingPowerVectorFlags
    crank_revolution_data: CrankRevolutionData
    first_crank_measurement_angle: float
    instantaneous_force_magnitude_array: tuple[float, ...] | None = None
    instantaneous_torque_magnitude_array: tuple[float, ...] | None = None

    def __post_init__(self) -> None:
        """Reject flag values outside one byte and angles outside 0-360.

        Raises:
            ValueError: If ``flags`` does not fit in a uint8 or the first
                crank measurement angle is not within 0-360 (inclusive).
        """
        flag_bits = int(self.flags)
        if flag_bits < 0 or flag_bits > UINT8_MAX:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")

        angle = self.first_crank_measurement_angle
        # Keep the chained comparison: it also rejects NaN, which would slip
        # through separate "<" / ">" checks.
        angle_in_range = 0 <= angle <= 360
        if not angle_in_range:
            raise ValueError("First crank measurement angle must be 0-360 degrees")
class CyclingPowerVectorCharacteristic(BaseCharacteristic):
    """Cycling Power Vector characteristic (0x2A64).

    Used to transmit detailed cycling power vector data including force
    and torque measurements at different crank angles.
    """

    _manual_unit: str = "various"  # Multiple units in vector data

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerVectorData:
        """Parse cycling power vector data into a CyclingPowerVectorData.

        Layout read by this implementation:
        Flags(1) + Crank Revolutions(2) + Last Crank Event Time(2) +
        First Crank Measurement Angle(2) + trailing signed 16-bit samples.

        The trailing samples are interpreted as ONE array: as torque values
        when the torque flag is set (even if the force flag is set as well),
        otherwise as force values when the force flag is set. When neither
        flag is set the trailing bytes are ignored. A dangling odd byte at
        the end of the payload is ignored.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context
                (may be None). Not used by this implementation.

        Returns:
            CyclingPowerVectorData containing the parsed values. An array
            field is ``None`` when no samples were parsed for it.

        Raises:
            ValueError: If data is shorter than 7 bytes, or if the parsed
                values fail CyclingPowerVectorData validation.

        """
        # `ctx` is intentionally unused in this implementation; mark as used
        # so linters do not report an unused-argument error.
        del ctx

        if len(data) < 7:
            raise ValueError("Cycling Power Vector data must be at least 7 bytes")

        flags = CyclingPowerVectorFlags(data[0])

        # Crank revolution counter (2 bytes, unsigned)
        crank_revolutions = DataParser.parse_int16(data, 1, signed=False)

        # Last crank event time (2 bytes, 1/1024 second units)
        crank_event_time = DataParser.parse_int16(data, 3, signed=False) / 1024.0

        # First crank measurement angle (2 bytes, scaled by 1/180 here).
        # NOTE(review): confirm the 1/180 scale against the characteristic
        # definition; the encoder uses the matching inverse scale.
        first_angle = DataParser.parse_int16(data, 5, signed=False) / 180.0

        crank_revolution_data = CrankRevolutionData(
            crank_revolutions=crank_revolutions, last_crank_event_time=crank_event_time
        )

        has_force = bool(flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT)
        has_torque = bool(flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT)

        # Every complete 2-byte pair after the fixed 7-byte header is one
        # signed sample; range() stops before a dangling final byte.
        raw_samples: list[int] = []
        if has_force or has_torque:
            raw_samples = [
                DataParser.parse_int16(data, position, signed=True) for position in range(7, len(data) - 1, 2)
            ]

        force_magnitudes: tuple[float, ...] = ()
        torque_magnitudes: tuple[float, ...] = ()
        if has_torque:
            # The payload carries no array lengths, so when torque is flagged
            # all trailing samples are read as torque (1/32 Nm units).
            torque_magnitudes = tuple(raw / 32.0 for raw in raw_samples)
        elif has_force:
            # Force samples are in 1 N units.
            force_magnitudes = tuple(float(raw) for raw in raw_samples)

        return CyclingPowerVectorData(
            flags=flags,
            crank_revolution_data=crank_revolution_data,
            first_crank_measurement_angle=first_angle,
            instantaneous_force_magnitude_array=force_magnitudes or None,
            instantaneous_torque_magnitude_array=torque_magnitudes or None,
        )

    def encode_value(self, data: CyclingPowerVectorData) -> bytearray:  # pylint: disable=too-many-branches  # Complex cycling power vector with optional fields
        """Encode cycling power vector value back to bytes.

        The presence flags for the force and torque arrays are set
        automatically whenever the corresponding array is not ``None``, in
        addition to whatever bits ``data.flags`` already carries. When both
        arrays are supplied they are written force first, then torque; note
        that ``decode_value`` reads all trailing samples as torque in that
        case.

        Args:
            data: CyclingPowerVectorData containing cycling power vector data

        Returns:
            Encoded bytes representing the power vector

        Raises:
            TypeError: If data is not a CyclingPowerVectorData.
            ValueError: If any scalar field does not fit in an unsigned
                16-bit value, or any force/torque sample does not fit in a
                signed 16-bit value once converted to raw units.

        """
        if not isinstance(data, CyclingPowerVectorData):
            raise TypeError(f"Cycling power vector data must be a CyclingPowerVectorData, got {type(data).__name__}")

        crank_revolutions = data.crank_revolution_data.crank_revolutions
        crank_event_time = data.crank_revolution_data.last_crank_event_time
        first_angle = data.first_crank_measurement_angle
        force_array = data.instantaneous_force_magnitude_array
        torque_array = data.instantaneous_torque_magnitude_array

        # Build flags based on optional arrays
        flags = data.flags
        if force_array is not None:
            flags |= CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT
        if torque_array is not None:
            flags |= CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT

        # Convert values to raw format
        crank_event_time_raw = round(crank_event_time * 1024)  # 1/1024 second units
        first_angle_raw = round(first_angle * 180)  # inverse of the decode scale

        # Validate ranges
        if not 0 <= crank_revolutions <= 0xFFFF:
            raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")
        if not 0 <= crank_event_time_raw <= 0xFFFF:
            raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")
        if not 0 <= first_angle_raw <= 0xFFFF:
            raise ValueError(f"First angle {first_angle_raw} exceeds uint16 range")

        # Build result
        result = bytearray([int(flags)])
        result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
        result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
        result.extend(DataParser.encode_int16(first_angle_raw, signed=False))

        # Out-of-range samples raise instead of being skipped: silently
        # dropping one would shorten the array and shift later samples.
        if force_array is not None:
            for force in force_array:
                force_raw = round(force)  # 1 N units
                if not SINT16_MIN <= force_raw <= SINT16_MAX:
                    raise ValueError(f"Force magnitude {force} exceeds sint16 range")
                result.extend(DataParser.encode_int16(force_raw, signed=True))

        if torque_array is not None:
            for torque in torque_array:
                torque_raw = round(torque * 32)  # Convert back to 1/32 Nm units
                if not SINT16_MIN <= torque_raw <= SINT16_MAX:
                    raise ValueError(f"Torque magnitude {torque} exceeds sint16 range")
                result.extend(DataParser.encode_int16(torque_raw, signed=True))

        return result