Coverage for src / bluetooth_sig / gatt / characteristics / csc_measurement.py: 85%
101 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""CSC Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData
13from .utils import DataParser
16class CSCMeasurementFlags(IntFlag):
17 """CSC Measurement flags as per Bluetooth SIG specification."""
19 WHEEL_REVOLUTION_DATA_PRESENT = 0x01
20 CRANK_REVOLUTION_DATA_PRESENT = 0x02
23class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
24 """Parsed data from CSC Measurement characteristic."""
26 flags: CSCMeasurementFlags
27 cumulative_wheel_revolutions: int | None = None
28 last_wheel_event_time: float | None = None
29 cumulative_crank_revolutions: int | None = None
30 last_crank_event_time: float | None = None
32 def __post_init__(self) -> None:
33 """Validate CSC measurement data."""
34 if not 0 <= int(self.flags) <= UINT8_MAX:
35 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")
38class CSCMeasurementCharacteristic(BaseCharacteristic[CSCMeasurementData]):
39 """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B).
41 Used to transmit cycling speed and cadence data.
42 """
44 # Override automatic name resolution because "CSC" is an acronym
45 _characteristic_name: str | None = "CSC Measurement"
47 # Declare optional dependency on CSC Feature for validation
48 # This ensures CSC Feature is parsed first when both are present
49 _optional_dependencies = [CSCFeatureCharacteristic]
51 # Validation: min 1 byte (flags), max 11 bytes (flags + wheel + crank data)
52 min_length = 1
53 allow_variable_length = True
54 max_length = 11 # flags:1 + wheel:6 + crank:4
56 # Time resolution constants
57 CSC_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution for both wheel and crank event times
59 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CSCMeasurementData:
60 """Parse CSC measurement data according to Bluetooth specification.
62 Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
63 [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)]
65 Args:
66 data: Raw bytearray from BLE characteristic.
67 ctx: Optional CharacteristicContext providing surrounding context (may be None).
69 Returns:
70 CSCMeasurementData containing parsed CSC data.
72 Raises:
73 ValueError: If data format is invalid.
75 """
76 if len(data) < 1:
77 raise ValueError("CSC Measurement data must be at least 1 byte")
79 flags = CSCMeasurementFlags(data[0])
80 offset = 1
82 # Initialize result data
83 cumulative_wheel_revolutions = None
84 last_wheel_event_time = None
85 cumulative_crank_revolutions = None
86 last_crank_event_time = None
88 # Parse optional wheel revolution data (6 bytes total) if present
89 if (flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
90 wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
91 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
92 # Wheel event time is in 1/CSC_TIME_RESOLUTION second units
93 cumulative_wheel_revolutions = wheel_revolutions
94 last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION
95 offset += 6
97 # Parse optional crank revolution data (4 bytes total) if present
98 if (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
99 crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
100 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
101 # Crank event time is in 1/CSC_TIME_RESOLUTION second units
102 cumulative_crank_revolutions = crank_revolutions
103 last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION
105 # Validate flags against CSC Feature if available
106 if ctx is not None:
107 feature_value = self.get_context_characteristic(ctx, CSCFeatureCharacteristic)
108 if feature_value is not None:
109 self._validate_against_feature(flags, feature_value)
111 return CSCMeasurementData(
112 flags=flags,
113 cumulative_wheel_revolutions=cumulative_wheel_revolutions,
114 last_wheel_event_time=last_wheel_event_time,
115 cumulative_crank_revolutions=cumulative_crank_revolutions,
116 last_crank_event_time=last_crank_event_time,
117 )
119 def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray:
120 """Encode wheel revolution data.
122 Args:
123 data: CSCMeasurementData containing wheel data
125 Returns:
126 Encoded wheel revolution bytes
128 Raises:
129 ValueError: If wheel data is invalid or out of range
131 """
132 if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None:
133 raise ValueError("CSC wheel revolution data marked present but missing values")
135 wheel_revolutions = int(data.cumulative_wheel_revolutions)
136 wheel_event_time = float(data.last_wheel_event_time)
138 # Validate ranges
139 if not 0 <= wheel_revolutions <= 0xFFFFFFFF:
140 raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range")
142 wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION)
143 if not 0 <= wheel_event_time_raw <= 0xFFFF:
144 raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range")
146 result = bytearray()
147 result.extend(DataParser.encode_int32(wheel_revolutions, signed=False))
148 result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False))
149 return result
151 def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray:
152 """Encode crank revolution data.
154 Args:
155 data: CSCMeasurementData containing crank data
157 Returns:
158 Encoded crank revolution bytes
160 Raises:
161 ValueError: If crank data is invalid or out of range
163 """
164 if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None:
165 raise ValueError("CSC crank revolution data marked present but missing values")
167 crank_revolutions = int(data.cumulative_crank_revolutions)
168 crank_event_time = float(data.last_crank_event_time)
170 # Validate ranges
171 if not 0 <= crank_revolutions <= 0xFFFF:
172 raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")
174 crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION)
175 if not 0 <= crank_event_time_raw <= 0xFFFF:
176 raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")
178 result = bytearray()
179 result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
180 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
181 return result
183 def _encode_value(self, data: CSCMeasurementData) -> bytearray:
184 """Encode CSC measurement value back to bytes.
186 Args:
187 data: CSCMeasurementData containing CSC measurement data
189 Returns:
190 Encoded bytes representing the CSC measurement
192 """
193 # Build flags based on available data
194 flags = data.flags
195 has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None
196 has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None
198 # Update flags to match available data
199 if has_wheel_data:
200 flags |= CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
201 if has_crank_data:
202 flags |= CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT
204 # Start with flags byte
205 result = bytearray([int(flags)])
207 # Add wheel revolution data if present
208 if has_wheel_data:
209 result.extend(self._encode_wheel_data(data))
211 # Add crank revolution data if present
212 if has_crank_data:
213 result.extend(self._encode_crank_data(data))
215 return result
217 def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None:
218 """Validate measurement flags against CSC Feature characteristic.
220 Args:
221 flags: Measurement flags indicating which data is present
222 feature_data: CSCFeatureData from CSC Feature characteristic
224 Raises:
225 ValueError: If reported measurement fields are not supported by device features
227 """
228 # Validate that reported measurement fields are supported
229 wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
230 if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported:
231 raise ValueError("Wheel revolution data reported but not supported by CSC Feature")
233 crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)
234 if (flags & crank_flag) and not feature_data.crank_revolution_data_supported:
235 raise ValueError("Crank revolution data reported but not supported by CSC Feature")