Coverage for src/bluetooth_sig/gatt/characteristics/rsc_measurement.py: 80%
88 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""RSC Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
7import msgspec
9from ..constants import UINT8_MAX
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .rsc_feature import RSCFeatureCharacteristic, RSCFeatureData
13from .utils import DataParser
class RSCMeasurementFlags(IntFlag):
    """RSC Measurement flags as per Bluetooth SIG specification.

    Bit masks for the flags byte that leads an RSC Measurement value. The two
    ``*_PRESENT`` bits announce which optional fields follow the mandatory
    speed and cadence fields; the status bit reports the activity type.
    """

    INSTANTANEOUS_STRIDE_LENGTH_PRESENT = 0x01
    TOTAL_DISTANCE_PRESENT = 0x02
    # Bit 2 of the flags field: clear = walking, set = running.
    WALKING_OR_RUNNING_STATUS = 0x04
class RSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Frozen, keyword-only container for a decoded RSC Measurement value.

    Speed, cadence and flags are always required; the stride length and total
    distance fields default to ``None``.
    """

    instantaneous_speed: float  # m/s
    instantaneous_cadence: int  # steps per minute
    flags: RSCMeasurementFlags
    instantaneous_stride_length: float | None = None  # meters
    total_distance: float | None = None  # meters

    def __post_init__(self) -> None:
        """Reject flags or cadence values that do not fit in a uint8.

        Raises:
            ValueError: If ``flags`` or ``instantaneous_cadence`` lies outside
                the inclusive range 0..UINT8_MAX.

        """
        # Checked in declaration order so the flags error wins when both fail.
        uint8_fields = (
            (self.flags, "Flags must be a uint8 value (0-UINT8_MAX)"),
            (self.instantaneous_cadence, "Cadence must be a uint8 value (0-UINT8_MAX)"),
        )
        for value, message in uint8_fields:
            if 0 <= value <= UINT8_MAX:
                continue
            raise ValueError(message)
class RSCMeasurementCharacteristic(BaseCharacteristic):
    """RSC (Running Speed and Cadence) Measurement characteristic (0x2A53).

    Used to transmit running speed and cadence data.
    """

    def _validate_against_feature(self, data: RSCMeasurementData, ctx: CharacteristicContext) -> None:
        """Validate RSC measurement data against supported features.

        Looks up the RSC Feature characteristic in ``ctx``; when none is found
        the check is skipped entirely.

        Args:
            data: Parsed RSC measurement data to validate.
            ctx: CharacteristicContext containing other characteristics.

        Raises:
            ValueError: If measurement reports unsupported features.

        """
        feature_char = self.get_context_characteristic(ctx, RSCFeatureCharacteristic)
        if feature_char is None:
            # No feature characteristic available, nothing to validate against.
            return

        feature_data: RSCFeatureData = feature_char.decode_value(feature_char.value, ctx)

        # An optional field may only be reported if the feature data says so.
        if data.instantaneous_stride_length is not None and not feature_data.instantaneous_stride_length_supported:
            raise ValueError("Instantaneous stride length reported but not supported by device features")

        if data.total_distance is not None and not feature_data.total_distance_supported:
            raise ValueError("Total distance reported but not supported by device features")

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> RSCMeasurementData:
        """Parse RSC measurement data according to Bluetooth specification.

        Format: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
        [Instantaneous Stride Length(2)] + [Total Distance(4)].

        An optional field whose presence flag is set but whose bytes are
        missing from ``data`` is left as ``None`` instead of being rejected.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            RSCMeasurementData containing parsed RSC data.

        Raises:
            ValueError: If data is shorter than the 4 mandatory bytes, or if a
                feature characteristic found in ``ctx`` does not support a
                reported optional field.

        """
        if len(data) < 4:
            raise ValueError("RSC Measurement data must be at least 4 bytes")

        flags = RSCMeasurementFlags(data[0])

        # Instantaneous speed: uint16 in 1/256 m/s units.
        speed_raw = DataParser.parse_int16(data, 1, signed=False)
        speed_ms = speed_raw / 256.0

        # Instantaneous cadence: uint8 in 1/min units.
        cadence = data[3]

        instantaneous_stride_length = None
        total_distance = None

        # Optional fields start right after the 4 mandatory bytes.
        offset = 4

        # Stride length: uint16 in 1/100 m units, only when flagged and present.
        if (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT in flags) and len(data) >= offset + 2:
            stride_length_raw = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_stride_length = stride_length_raw / 100.0
            offset += 2

        # Total distance: uint32 in 1/10 m units, only when flagged and present.
        if (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT in flags) and len(data) >= offset + 4:
            total_distance_raw = DataParser.parse_int32(data, offset, signed=False)
            total_distance = total_distance_raw / 10.0

        measurement_data = RSCMeasurementData(
            instantaneous_speed=speed_ms,
            instantaneous_cadence=cadence,
            flags=flags,
            instantaneous_stride_length=instantaneous_stride_length,
            total_distance=total_distance,
        )

        # Cross-check against the feature characteristic when context is given.
        if ctx is not None:
            self._validate_against_feature(measurement_data, ctx)

        return measurement_data

    def encode_value(self, data: RSCMeasurementData) -> bytearray:
        """Encode RSC measurement value back to bytes.

        The two presence bits of the emitted flags byte are derived from which
        optional fields are actually set on ``data``: a bit is set when the
        field is written and cleared when it is not, so the flags always
        describe the payload that follows. All other bits of ``data.flags``
        are passed through unchanged.

        Args:
            data: RSCMeasurementData containing RSC measurement data

        Returns:
            Encoded bytes representing the RSC measurement

        Raises:
            TypeError: If ``data`` is not an RSCMeasurementData.
            ValueError: If speed, cadence, stride length or total distance
                does not fit its wire-format integer range.

        """
        if not isinstance(data, RSCMeasurementData):
            raise TypeError(f"RSC measurement data must be a RSCMeasurementData, got {type(data).__name__}")

        # Synchronise presence bits with the fields that will be written.
        # Only OR-ing the bits in would leave a stale "present" bit behind
        # when the matching field is None, yielding a malformed packet.
        flag_bits = int(data.flags)
        for presence_bit, field_value in (
            (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT, data.instantaneous_stride_length),
            (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT, data.total_distance),
        ):
            if field_value is None:
                flag_bits &= ~int(presence_bit)
            else:
                flag_bits |= int(presence_bit)

        # Validate required fields
        speed_raw = round(data.instantaneous_speed * 256)  # Convert to 1/256 m/s units
        if not 0 <= speed_raw <= 0xFFFF:
            raise ValueError(f"Speed {data.instantaneous_speed} m/s exceeds uint16 range")

        if not 0 <= data.instantaneous_cadence <= UINT8_MAX:
            raise ValueError(f"Cadence {data.instantaneous_cadence} exceeds uint8 range")

        # Mandatory part: flags, speed, cadence.
        result = bytearray([flag_bits])
        result.extend(DataParser.encode_int16(speed_raw, signed=False))
        result.append(data.instantaneous_cadence)

        # Optional stride length, written in 1/100 m units.
        if data.instantaneous_stride_length is not None:
            stride_length = float(data.instantaneous_stride_length)
            stride_length_raw = round(stride_length * 100)
            if not 0 <= stride_length_raw <= 0xFFFF:
                raise ValueError(f"Stride length {stride_length} m exceeds uint16 range")
            result.extend(DataParser.encode_int16(stride_length_raw, signed=False))

        # Optional total distance, written in 1/10 m units.
        if data.total_distance is not None:
            total_distance = float(data.total_distance)
            total_distance_raw = round(total_distance * 10)
            if not 0 <= total_distance_raw <= 0xFFFFFFFF:
                raise ValueError(f"Total distance {total_distance} m exceeds uint32 range")
            result.extend(DataParser.encode_int32(total_distance_raw, signed=False))

        return result