Coverage for src/bluetooth_sig/gatt/characteristics/csc_measurement.py: 84%
97 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""CSC Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData
13from .utils import DataParser
class CSCMeasurementFlags(IntFlag):
    """Bit flags carried in the first byte of a CSC Measurement value.

    Each member marks one optional field group as present in the payload.
    """

    # Bit 0: the wheel revolution field group is present.
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 0
    # Bit 1: the crank revolution field group is present.
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 1
class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Parsed data from CSC Measurement characteristic.

    Instances are immutable and must be built with keyword arguments.
    Every field other than ``flags`` defaults to ``None``.
    """

    # Raw flags byte of the measurement.
    flags: CSCMeasurementFlags
    # Wheel revolution count (encoded on the wire as uint32).
    cumulative_wheel_revolutions: int | None = None
    # Time of the last wheel event, in seconds.
    last_wheel_event_time: float | None = None
    # Crank revolution count (encoded on the wire as uint16).
    cumulative_crank_revolutions: int | None = None
    # Time of the last crank event, in seconds.
    last_crank_event_time: float | None = None

    def __post_init__(self) -> None:
        """Validate CSC measurement data.

        Raises:
            ValueError: If ``flags`` does not fit in a single unsigned byte.

        """
        if not 0 <= int(self.flags) <= UINT8_MAX:
            # Interpolate the bound so the message shows the numeric limit
            # rather than the literal constant name.
            raise ValueError(f"Flags must be a uint8 value (0-{UINT8_MAX})")
class CSCMeasurementCharacteristic(BaseCharacteristic):
    """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B).

    Used to transmit cycling speed and cadence data.

    Wire format: a flags byte followed by two optional field groups, in this
    order: wheel data (uint32 revolutions + uint16 event time) and crank data
    (uint16 revolutions + uint16 event time). Event times are exposed in
    seconds and stored on the wire in 1/1024 second units.
    """

    # Override automatic name resolution because "CSC" is an acronym
    _characteristic_name: str | None = "CSC Measurement"

    # Time resolution constants
    CSC_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution for both wheel and crank event times

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CSCMeasurementData:
        """Parse CSC measurement data according to Bluetooth specification.

        Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
        [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)]

        A field group whose presence flag is set but whose bytes are missing
        is left as ``None`` rather than raising. When the wheel group is
        flagged but truncated, the crank group is not parsed either, because
        its starting offset can no longer be determined.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            CSCMeasurementData containing parsed CSC data.

        Raises:
            ValueError: If data is empty, or if the flags report a field group
                that the CSC Feature characteristic found in ``ctx`` does not
                support.

        """
        if len(data) < 1:
            raise ValueError("CSC Measurement data must be at least 1 byte")

        flags = CSCMeasurementFlags(data[0])
        offset = 1

        # Initialize result data
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None

        # Set when a flagged group is cut short. Everything after it would be
        # read from the wrong offset, so later groups must be skipped.
        truncated = False

        # Parse optional wheel revolution data (6 bytes total) if present
        if flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT:
            if len(data) >= offset + 6:
                cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
                wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
                # Wheel event time is in 1/CSC_TIME_RESOLUTION second units
                last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION
                offset += 6
            else:
                truncated = True

        # Parse optional crank revolution data (4 bytes total) if present
        if (
            (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)
            and not truncated
            and len(data) >= offset + 4
        ):
            cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
            crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
            # Crank event time is in 1/CSC_TIME_RESOLUTION second units
            last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION

        # Validate flags against CSC Feature if available
        if ctx is not None:
            feature_char = self.get_context_characteristic(ctx, CSCFeatureCharacteristic)
            if feature_char and feature_char.parse_success and feature_char.value is not None:
                self._validate_against_feature(flags, feature_char.value)

        return CSCMeasurementData(
            flags=flags,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
        )

    def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode wheel revolution data.

        Args:
            data: CSCMeasurementData containing wheel data

        Returns:
            Encoded wheel revolution bytes (uint32 revolutions followed by
            uint16 event time in 1/CSC_TIME_RESOLUTION second units)

        Raises:
            ValueError: If wheel data is invalid or out of range

        """
        if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None:
            raise ValueError("CSC wheel revolution data marked present but missing values")

        wheel_revolutions = int(data.cumulative_wheel_revolutions)
        wheel_event_time = float(data.last_wheel_event_time)

        # Validate ranges
        if not 0 <= wheel_revolutions <= 0xFFFFFFFF:
            raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range")

        # Convert seconds back to raw 1/CSC_TIME_RESOLUTION second ticks
        wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= wheel_event_time_raw <= 0xFFFF:
            raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int32(wheel_revolutions, signed=False))
        result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False))
        return result

    def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode crank revolution data.

        Args:
            data: CSCMeasurementData containing crank data

        Returns:
            Encoded crank revolution bytes (uint16 revolutions followed by
            uint16 event time in 1/CSC_TIME_RESOLUTION second units)

        Raises:
            ValueError: If crank data is invalid or out of range

        """
        if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None:
            raise ValueError("CSC crank revolution data marked present but missing values")

        crank_revolutions = int(data.cumulative_crank_revolutions)
        crank_event_time = float(data.last_crank_event_time)

        # Validate ranges
        if not 0 <= crank_revolutions <= 0xFFFF:
            raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")

        # Convert seconds back to raw 1/CSC_TIME_RESOLUTION second ticks
        crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= crank_event_time_raw <= 0xFFFF:
            raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
        result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
        return result

    def encode_value(self, data: CSCMeasurementData) -> bytearray:
        """Encode CSC measurement value back to bytes.

        The two presence bits of the emitted flags byte are derived from the
        values actually written: a bit is set when both values of its group
        are available and cleared otherwise, so the flags byte never
        advertises a field group that is absent from the payload. All other
        bits of ``data.flags`` are passed through unchanged. A group with only
        one of its two values set is treated as absent.

        Args:
            data: CSCMeasurementData containing CSC measurement data

        Returns:
            Encoded bytes representing the CSC measurement

        Raises:
            ValueError: If a revolution count or event time is out of range

        """
        has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None
        has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None

        # Work on plain ints so that clearing a bit does not depend on how
        # IntFlag implements inversion.
        flag_bits = int(data.flags)
        wheel_bit = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
        crank_bit = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)

        # Update flags to match available data
        if has_wheel_data:
            flag_bits |= wheel_bit
        else:
            flag_bits &= ~wheel_bit
        if has_crank_data:
            flag_bits |= crank_bit
        else:
            flag_bits &= ~crank_bit

        # Start with flags byte
        result = bytearray([flag_bits])

        # Add wheel revolution data if present
        if has_wheel_data:
            result.extend(self._encode_wheel_data(data))

        # Add crank revolution data if present
        if has_crank_data:
            result.extend(self._encode_crank_data(data))

        return result

    def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None:
        """Validate measurement flags against CSC Feature characteristic.

        Args:
            flags: Measurement flags indicating which data is present
            feature_data: CSCFeatureData from CSC Feature characteristic

        Raises:
            ValueError: If reported measurement fields are not supported by device features

        """
        # Validate that reported measurement fields are supported
        wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
        if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported:
            raise ValueError("Wheel revolution data reported but not supported by CSC Feature")

        crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)
        if (flags & crank_flag) and not feature_data.crank_revolution_data_supported:
            raise ValueError("Crank revolution data reported but not supported by CSC Feature")