Coverage for src / bluetooth_sig / gatt / characteristics / rsc_measurement.py: 87%
90 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""RSC Measurement characteristic implementation."""
3from __future__ import annotations
5from enum import IntFlag
6from typing import Any, ClassVar
8import msgspec
10from ..constants import UINT8_MAX, UINT16_MAX, UINT32_MAX
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .rsc_feature import RSCFeatureCharacteristic
14from .utils import DataParser
17class RSCMeasurementFlags(IntFlag):
18 """RSC Measurement flags as per Bluetooth SIG specification."""
20 INSTANTANEOUS_STRIDE_LENGTH_PRESENT = 0x01
21 TOTAL_DISTANCE_PRESENT = 0x02
22 WALKING_OR_RUNNING_STATUS = 0x04
25class RSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
26 """Parsed data from RSC Measurement characteristic."""
28 instantaneous_speed: float # m/s
29 instantaneous_cadence: int # steps per minute
30 flags: RSCMeasurementFlags
31 is_running: bool = False
32 instantaneous_stride_length: float | None = None # meters
33 total_distance: float | None = None # meters
35 def __post_init__(self) -> None:
36 """Validate RSC measurement data."""
37 if not 0 <= self.flags <= UINT8_MAX:
38 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")
39 if not 0 <= self.instantaneous_cadence <= UINT8_MAX:
40 raise ValueError("Cadence must be a uint8 value (0-UINT8_MAX)")
43class RSCMeasurementCharacteristic(BaseCharacteristic[RSCMeasurementData]):
44 """RSC (Running Speed and Cadence) Measurement characteristic (0x2A53).
46 Used to transmit running speed and cadence data.
47 """
49 # Declare optional dependency on RSC Feature for validation
50 # This ensures RSC Feature is parsed first when both are present
51 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [RSCFeatureCharacteristic]
53 min_length: int = 4 # Flags(1) + Speed(2) + Cadence(1)
54 allow_variable_length: bool = True # Optional stride length and total distance
56 def _validate_against_feature(self, data: RSCMeasurementData, ctx: CharacteristicContext) -> None:
57 """Validate RSC measurement data against supported features.
59 Args:
60 data: Parsed RSC measurement data to validate.
61 ctx: CharacteristicContext containing other characteristics.
62 validate: Whether to validate ranges (default True)
64 Raises:
65 ValueError: If measurement reports unsupported features.
67 """
68 # Get RSC Feature characteristic from context
69 feature_data = self.get_context_characteristic(ctx, RSCFeatureCharacteristic)
70 if feature_data is None:
71 # No feature characteristic available, skip validation
72 return
74 # Validate optional fields against supported features
75 if data.instantaneous_stride_length is not None and not feature_data.instantaneous_stride_length_supported:
76 raise ValueError("Instantaneous stride length reported but not supported by device features")
78 if data.total_distance is not None and not feature_data.total_distance_supported:
79 raise ValueError("Total distance reported but not supported by device features")
81 def _decode_value(
82 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
83 ) -> RSCMeasurementData:
84 """Parse RSC measurement data according to Bluetooth specification.
86 Format: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
87 [Instantaneous Stride Length(2)] + [Total Distance(4)].
89 Args:
90 data: Raw bytearray from BLE characteristic.
91 ctx: Optional CharacteristicContext providing surrounding context (may be None).
92 validate: Whether to validate ranges (default True)
94 Returns:
95 RSCMeasurementData containing parsed RSC data.
97 Raises:
98 ValueError: If data format is invalid.
100 """
101 flags = RSCMeasurementFlags(data[0])
103 # Parse walking or running status (bit 2: 0=Walking, 1=Running)
104 is_running = bool(flags & RSCMeasurementFlags.WALKING_OR_RUNNING_STATUS)
106 # Parse instantaneous speed (uint16, 1/256 m/s units)
107 speed_raw = DataParser.parse_int16(data, 1, signed=False)
108 speed_ms = speed_raw / 256.0 # m/s
110 # Parse instantaneous cadence (uint8, 1/min units)
111 cadence = data[3]
113 # Initialize optional fields
114 instantaneous_stride_length = None
115 total_distance = None
117 offset = 4
119 # Parse optional instantaneous stride length (2 bytes) if present
120 if (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT in flags) and len(data) >= offset + 2:
121 stride_length_raw = DataParser.parse_int16(data, offset, signed=False)
122 instantaneous_stride_length = stride_length_raw / 100.0 # Convert to meters
123 offset += 2
125 # Parse optional total distance (4 bytes) if present
126 if (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT in flags) and len(data) >= offset + 4:
127 total_distance_raw = DataParser.parse_int32(data, offset, signed=False)
128 total_distance = total_distance_raw / 10.0 # Convert to meters
130 measurement_data = RSCMeasurementData(
131 instantaneous_speed=speed_ms,
132 instantaneous_cadence=cadence,
133 flags=flags,
134 is_running=is_running,
135 instantaneous_stride_length=instantaneous_stride_length,
136 total_distance=total_distance,
137 )
139 # Validate against feature characteristic if context is available
140 if ctx is not None:
141 self._validate_against_feature(measurement_data, ctx)
143 return measurement_data
145 def _encode_value(self, data: RSCMeasurementData) -> bytearray:
146 """Encode RSC measurement value back to bytes.
148 Args:
149 data: RSCMeasurementData containing RSC measurement data
151 Returns:
152 Encoded bytes representing the RSC measurement
154 """
155 # Build flags based on available optional data
156 flags = RSCMeasurementFlags(data.flags)
157 has_stride_length = data.instantaneous_stride_length is not None
158 has_total_distance = data.total_distance is not None
160 # Update flags to match available data
161 if has_stride_length:
162 flags |= RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT
163 if has_total_distance:
164 flags |= RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT
166 # Validate required fields
167 speed_raw = round(data.instantaneous_speed * 256) # Convert to 1/256 m/s units
168 if not 0 <= speed_raw <= UINT16_MAX:
169 raise ValueError(f"Speed {data.instantaneous_speed} m/s exceeds uint16 range")
171 if not 0 <= data.instantaneous_cadence <= UINT8_MAX:
172 raise ValueError(f"Cadence {data.instantaneous_cadence} exceeds uint8 range")
174 # Start with flags, speed, and cadence
175 result = bytearray([int(flags)])
176 result.extend(DataParser.encode_int16(speed_raw, signed=False))
177 result.append(data.instantaneous_cadence)
179 # Add optional stride length if present
180 if has_stride_length:
181 if data.instantaneous_stride_length is None:
182 raise ValueError("Stride length is required but None")
183 stride_length = float(data.instantaneous_stride_length)
184 stride_length_raw = round(stride_length * 100) # Convert to cm units
185 if not 0 <= stride_length_raw <= UINT16_MAX:
186 raise ValueError(f"Stride length {stride_length} m exceeds uint16 range")
187 result.extend(DataParser.encode_int16(stride_length_raw, signed=False))
189 # Add optional total distance if present
190 if has_total_distance:
191 if data.total_distance is None:
192 raise ValueError("Total distance is required but None")
193 total_distance = float(data.total_distance)
194 total_distance_raw = round(total_distance * 10) # Convert to dm units
195 if not 0 <= total_distance_raw <= UINT32_MAX:
196 raise ValueError(f"Total distance {total_distance} m exceeds uint32 range")
197 result.extend(DataParser.encode_int32(total_distance_raw, signed=False))
199 return result