Coverage for src / bluetooth_sig / gatt / characteristics / csc_measurement.py: 86%
100 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""CSC Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
6from typing import Any, ClassVar
8import msgspec
10from ..constants import UINT8_MAX, UINT16_MAX, UINT32_MAX
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData
14from .utils import DataParser
class CSCMeasurementFlags(IntFlag):
    """Presence bits carried in the flags byte of a CSC Measurement value.

    Each member marks one optional section of the measurement as present.
    """

    # Bit 0: wheel revolution section is present.
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 0
    # Bit 1: crank revolution section is present.
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 1
class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Parsed data from CSC Measurement characteristic.

    Attributes:
        flags: Flags byte of the measurement; must fit in an unsigned 8-bit value.
        cumulative_wheel_revolutions: Cumulative wheel revolution count, or None when absent.
        last_wheel_event_time: Last wheel event time in seconds, or None when absent.
        cumulative_crank_revolutions: Cumulative crank revolution count, or None when absent.
        last_crank_event_time: Last crank event time in seconds, or None when absent.
    """

    flags: CSCMeasurementFlags
    cumulative_wheel_revolutions: int | None = None
    last_wheel_event_time: float | None = None
    cumulative_crank_revolutions: int | None = None
    last_crank_event_time: float | None = None

    def __post_init__(self) -> None:
        """Validate CSC measurement data.

        Raises:
            ValueError: If ``flags`` does not fit in an unsigned 8-bit value.
        """
        if not 0 <= int(self.flags) <= UINT8_MAX:
            # Interpolate the bound so the message shows the real limit rather
            # than the constant's name.
            raise ValueError(f"Flags must be a uint8 value (0-{UINT8_MAX})")
class CSCMeasurementCharacteristic(BaseCharacteristic[CSCMeasurementData]):
    """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B).

    Used to transmit cycling speed and cadence data.

    The value is a one-byte flags field followed by an optional wheel section
    (4-byte cumulative revolutions + 2-byte last event time) and an optional
    crank section (2-byte cumulative revolutions + 2-byte last event time).
    Event times are carried in 1/1024 second units and exposed in seconds.
    """

    # Override automatic name resolution because "CSC" is an acronym
    _characteristic_name: str | None = "CSC Measurement"

    # Declare optional dependency on CSC Feature for validation
    # This ensures CSC Feature is parsed first when both are present
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [CSCFeatureCharacteristic]

    # Validation: min 1 byte (flags), max 11 bytes (flags + wheel + crank data)
    min_length = 1
    allow_variable_length = True
    max_length = 11  # flags:1 + wheel:6 + crank:4

    # Time resolution constants
    CSC_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution for both wheel and crank event times

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> CSCMeasurementData:
        """Parse CSC measurement data according to Bluetooth specification.

        Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
        [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)]

        A section whose presence flag is set but whose bytes are missing from
        ``data`` is left as ``None`` instead of raising (lenient parsing of
        truncated payloads); the returned ``flags`` still carry the bit as received.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to validate (default True). When False, the
                cross-check of the flags against the CSC Feature characteristic
                found in ``ctx`` is skipped.

        Returns:
            CSCMeasurementData containing parsed CSC data.

        Raises:
            ValueError: If ``validate`` is true and the flags report data that the
                CSC Feature characteristic from ``ctx`` marks as unsupported.

        """
        flags = CSCMeasurementFlags(data[0])
        offset = 1

        # Initialize result data
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None

        # Parse optional wheel revolution data (6 bytes total) if present
        if (flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
            cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
            wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
            # Wheel event time is in 1/CSC_TIME_RESOLUTION second units
            last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION
            offset += 6

        # Parse optional crank revolution data (4 bytes total) if present
        if (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
            cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
            crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
            # Crank event time is in 1/CSC_TIME_RESOLUTION second units
            last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION

        # Validate flags against CSC Feature if available. Honour ``validate`` so a
        # caller that asked for lenient parsing does not get a ValueError here.
        if validate and ctx is not None:
            feature_value = self.get_context_characteristic(ctx, CSCFeatureCharacteristic)
            if feature_value is not None:
                self._validate_against_feature(flags, feature_value)

        return CSCMeasurementData(
            flags=flags,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
        )

    def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode wheel revolution data.

        Args:
            data: CSCMeasurementData containing wheel data

        Returns:
            Encoded wheel revolution bytes: revolutions (4 bytes) then event time (2 bytes)

        Raises:
            ValueError: If wheel data is invalid or out of range

        """
        if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None:
            raise ValueError("CSC wheel revolution data marked present but missing values")

        wheel_revolutions = int(data.cumulative_wheel_revolutions)
        wheel_event_time = float(data.last_wheel_event_time)

        # Validate ranges
        if not 0 <= wheel_revolutions <= UINT32_MAX:
            raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range")

        # Convert seconds back to 1/CSC_TIME_RESOLUTION second units before range-checking
        wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= wheel_event_time_raw <= UINT16_MAX:
            raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int32(wheel_revolutions, signed=False))
        result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False))
        return result

    def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode crank revolution data.

        Args:
            data: CSCMeasurementData containing crank data

        Returns:
            Encoded crank revolution bytes: revolutions (2 bytes) then event time (2 bytes)

        Raises:
            ValueError: If crank data is invalid or out of range

        """
        if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None:
            raise ValueError("CSC crank revolution data marked present but missing values")

        crank_revolutions = int(data.cumulative_crank_revolutions)
        crank_event_time = float(data.last_crank_event_time)

        # Validate ranges
        if not 0 <= crank_revolutions <= UINT16_MAX:
            raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")

        # Convert seconds back to 1/CSC_TIME_RESOLUTION second units before range-checking
        crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= crank_event_time_raw <= UINT16_MAX:
            raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
        result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
        return result

    def _encode_value(self, data: CSCMeasurementData) -> bytearray:
        """Encode CSC measurement value back to bytes.

        The two presence bits of the emitted flags byte are derived from the
        values that are actually written, so the flags byte never advertises a
        section that is missing from the payload. All other bits of
        ``data.flags`` are passed through unchanged. A section is written only
        when both of its values are set.

        Args:
            data: CSCMeasurementData containing CSC measurement data

        Returns:
            Encoded bytes representing the CSC measurement

        Raises:
            ValueError: If a value that is written is outside its unsigned range

        """
        has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None
        has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None

        wheel_bit = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
        crank_bit = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)

        # Drop both presence bits, then set each one back only if its section is
        # going to be encoded; plain ints keep the non-presence bits untouched.
        flag_bits = int(data.flags) & ~(wheel_bit | crank_bit)
        if has_wheel_data:
            flag_bits |= wheel_bit
        if has_crank_data:
            flag_bits |= crank_bit

        # Start with flags byte
        result = bytearray([flag_bits])

        # Add wheel revolution data if present
        if has_wheel_data:
            result.extend(self._encode_wheel_data(data))

        # Add crank revolution data if present
        if has_crank_data:
            result.extend(self._encode_crank_data(data))

        return result

    def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None:
        """Validate measurement flags against CSC Feature characteristic.

        Args:
            flags: Measurement flags indicating which data is present
            feature_data: CSCFeatureData from CSC Feature characteristic

        Raises:
            ValueError: If reported measurement fields are not supported by device features

        """
        # Validate that reported measurement fields are supported
        wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
        if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported:
            raise ValueError("Wheel revolution data reported but not supported by CSC Feature")

        crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)
        if (flags & crank_flag) and not feature_data.crank_revolution_data_supported:
            raise ValueError("Crank revolution data reported but not supported by CSC Feature")