Coverage for src / bluetooth_sig / gatt / characteristics / rsc_measurement.py: 87%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""RSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6from typing import Any, ClassVar 

7 

8import msgspec 

9 

10from ..constants import UINT8_MAX, UINT16_MAX, UINT32_MAX 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .rsc_feature import RSCFeatureCharacteristic 

14from .utils import DataParser 

15 

16 

class RSCMeasurementFlags(IntFlag):
    """RSC Measurement flags as per Bluetooth SIG specification.

    Each member is a single-bit mask within the flags byte of an RSC
    Measurement value.
    """

    # Bit 0: an instantaneous stride length field is present.
    INSTANTANEOUS_STRIDE_LENGTH_PRESENT = 0x01
    # Bit 1: a total distance field is present.
    TOTAL_DISTANCE_PRESENT = 0x02
    # Bit 2: walking or running status (0 = walking, 1 = running).
    WALKING_OR_RUNNING_STATUS = 0x04

23 

24 

class RSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Parsed data from RSC Measurement characteristic.

    Frozen, keyword-only record. Speed, cadence and flags are required;
    the remaining fields have defaults and stay ``None`` / ``False`` when
    not supplied.
    """

    instantaneous_speed: float  # m/s
    instantaneous_cadence: int  # steps per minute
    flags: RSCMeasurementFlags
    is_running: bool = False
    instantaneous_stride_length: float | None = None  # meters
    total_distance: float | None = None  # meters

    def __post_init__(self) -> None:
        """Validate RSC measurement data.

        Raises:
            ValueError: If ``flags`` or ``instantaneous_cadence`` is outside
                the inclusive range 0..UINT8_MAX.

        """
        if not 0 <= self.flags <= UINT8_MAX:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")
        if not 0 <= self.instantaneous_cadence <= UINT8_MAX:
            raise ValueError("Cadence must be a uint8 value (0-UINT8_MAX)")

41 

42 

class RSCMeasurementCharacteristic(BaseCharacteristic[RSCMeasurementData]):
    """RSC (Running Speed and Cadence) Measurement characteristic (0x2A53).

    Used to transmit running speed and cadence data.
    """

    # Declare optional dependency on RSC Feature for validation
    # This ensures RSC Feature is parsed first when both are present
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [RSCFeatureCharacteristic]

    min_length: int = 4  # Flags(1) + Speed(2) + Cadence(1)
    allow_variable_length: bool = True  # Optional stride length and total distance

    def _validate_against_feature(self, data: RSCMeasurementData, ctx: CharacteristicContext) -> None:
        """Validate RSC measurement data against supported features.

        Validation is skipped when the context holds no RSC Feature data.

        Args:
            data: Parsed RSC measurement data to validate.
            ctx: CharacteristicContext containing other characteristics.

        Raises:
            ValueError: If measurement reports unsupported features.

        """
        # Get RSC Feature characteristic from context
        feature_data = self.get_context_characteristic(ctx, RSCFeatureCharacteristic)
        if feature_data is None:
            # No feature characteristic available, skip validation
            return

        # Validate optional fields against supported features
        if data.instantaneous_stride_length is not None and not feature_data.instantaneous_stride_length_supported:
            raise ValueError("Instantaneous stride length reported but not supported by device features")

        if data.total_distance is not None and not feature_data.total_distance_supported:
            raise ValueError("Total distance reported but not supported by device features")

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> RSCMeasurementData:
        """Parse RSC measurement data according to Bluetooth specification.

        Format: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
        [Instantaneous Stride Length(2)] + [Total Distance(4)].

        An optional field is parsed only when its flag bit is set AND enough
        bytes remain; otherwise it is left as ``None`` (best-effort parsing of
        short payloads).

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to validate ranges (default True). This method's
                body does not consult it.

        Returns:
            RSCMeasurementData containing parsed RSC data.

        Raises:
            ValueError: If data format is invalid, or if ``ctx`` is given and
                the measurement reports a field the RSC Feature data marks as
                unsupported.

        """
        flags = RSCMeasurementFlags(data[0])

        # Parse walking or running status (bit 2: 0=Walking, 1=Running)
        is_running = bool(flags & RSCMeasurementFlags.WALKING_OR_RUNNING_STATUS)

        # Parse instantaneous speed (uint16, 1/256 m/s units)
        speed_raw = DataParser.parse_int16(data, 1, signed=False)
        speed_ms = speed_raw / 256.0  # m/s

        # Parse instantaneous cadence (uint8, 1/min units)
        cadence = data[3]

        # Optional fields stay None unless flagged and fully present
        instantaneous_stride_length = None
        total_distance = None

        # Optional fields start right after the 4 mandatory bytes
        offset = 4

        # Parse optional instantaneous stride length (2 bytes) if present
        if (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT in flags) and len(data) >= offset + 2:
            stride_length_raw = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_stride_length = stride_length_raw / 100.0  # Convert to meters
            offset += 2

        # Parse optional total distance (4 bytes) if present
        if (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT in flags) and len(data) >= offset + 4:
            total_distance_raw = DataParser.parse_int32(data, offset, signed=False)
            total_distance = total_distance_raw / 10.0  # Convert to meters

        measurement_data = RSCMeasurementData(
            instantaneous_speed=speed_ms,
            instantaneous_cadence=cadence,
            flags=flags,
            is_running=is_running,
            instantaneous_stride_length=instantaneous_stride_length,
            total_distance=total_distance,
        )

        # Validate against feature characteristic if context is available
        if ctx is not None:
            self._validate_against_feature(measurement_data, ctx)

        return measurement_data

    def _encode_value(self, data: RSCMeasurementData) -> bytearray:
        """Encode RSC measurement value back to bytes.

        The two "field present" flag bits are derived from the optional
        fields themselves: set when the field has a value, cleared when it is
        ``None``. This keeps the flags byte in step with the bytes actually
        written. All other bits of ``data.flags`` (including the
        walking/running bit) are emitted unchanged; ``data.is_running`` is not
        read here.

        Args:
            data: RSCMeasurementData containing RSC measurement data

        Returns:
            Encoded bytes representing the RSC measurement

        Raises:
            ValueError: If speed, cadence, stride length or total distance
                does not fit its unsigned wire field.

        """
        stride_length = data.instantaneous_stride_length
        total_distance = data.total_distance

        # Synchronise the presence bits with the optional data. Plain int
        # arithmetic is used so that any other bits in data.flags are preserved.
        flag_bits = int(data.flags)
        presence = (
            (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT, stride_length),
            (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT, total_distance),
        )
        for mask, value in presence:
            if value is None:
                flag_bits &= ~int(mask)
            else:
                flag_bits |= int(mask)

        # Validate required fields
        speed_raw = round(data.instantaneous_speed * 256)  # Convert to 1/256 m/s units
        if not 0 <= speed_raw <= UINT16_MAX:
            raise ValueError(f"Speed {data.instantaneous_speed} m/s exceeds uint16 range")

        if not 0 <= data.instantaneous_cadence <= UINT8_MAX:
            raise ValueError(f"Cadence {data.instantaneous_cadence} exceeds uint8 range")

        # Start with flags, speed, and cadence
        result = bytearray([flag_bits])
        result.extend(DataParser.encode_int16(speed_raw, signed=False))
        result.append(data.instantaneous_cadence)

        # Add optional stride length if present
        if stride_length is not None:
            stride_length = float(stride_length)
            stride_length_raw = round(stride_length * 100)  # Convert to cm units
            if not 0 <= stride_length_raw <= UINT16_MAX:
                raise ValueError(f"Stride length {stride_length} m exceeds uint16 range")
            result.extend(DataParser.encode_int16(stride_length_raw, signed=False))

        # Add optional total distance if present
        if total_distance is not None:
            total_distance = float(total_distance)
            total_distance_raw = round(total_distance * 10)  # Convert to dm units
            if not 0 <= total_distance_raw <= UINT32_MAX:
                raise ValueError(f"Total distance {total_distance} m exceeds uint32 range")
            result.extend(DataParser.encode_int32(total_distance_raw, signed=False))

        return result