Coverage for src / bluetooth_sig / gatt / characteristics / rsc_measurement.py: 86%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""RSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6from typing import Any, ClassVar 

7 

8import msgspec 

9 

10from ..constants import UINT8_MAX, UINT16_MAX, UINT32_MAX 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .rsc_feature import RSCFeatureCharacteristic 

14from .utils import DataParser 

15 

16 

class RSCMeasurementFlags(IntFlag):
    """Bit flags carried in the first byte of an RSC Measurement value.

    The two ``*_PRESENT`` bits announce which optional fields follow the
    mandatory speed and cadence fields. ``WALKING_OR_RUNNING_STATUS`` is a
    status bit rather than a presence bit: when set the user is running,
    when clear the user is walking. Remaining bits are reserved.
    """

    INSTANTANEOUS_STRIDE_LENGTH_PRESENT = 0x01
    TOTAL_DISTANCE_PRESENT = 0x02
    WALKING_OR_RUNNING_STATUS = 0x04  # 0 = walking, 1 = running

22 

23 

class RSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable, keyword-only record of one decoded RSC Measurement.

    The two optional fields default to ``None``, meaning the value was not
    supplied.
    """

    instantaneous_speed: float  # m/s
    instantaneous_cadence: int  # steps per minute
    flags: RSCMeasurementFlags
    instantaneous_stride_length: float | None = None  # meters
    total_distance: float | None = None  # meters

    def __post_init__(self) -> None:
        """Reject flags or cadence values outside the 0..UINT8_MAX range.

        Raises:
            ValueError: If ``flags`` or ``instantaneous_cadence`` is out of range.

        """
        # Flags are checked first so the error reported for a doubly-invalid
        # record is always the flags one.
        flags_in_range = 0 <= self.flags <= UINT8_MAX
        if not flags_in_range:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")

        cadence_in_range = 0 <= self.instantaneous_cadence <= UINT8_MAX
        if not cadence_in_range:
            raise ValueError("Cadence must be a uint8 value (0-UINT8_MAX)")

39 

40 

class RSCMeasurementCharacteristic(BaseCharacteristic[RSCMeasurementData]):
    """RSC (Running Speed and Cadence) Measurement characteristic (0x2A53).

    Used to transmit running speed and cadence data.

    Wire layout handled by this class::

        Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1)
        + [Instantaneous Stride Length(2)] + [Total Distance(4)]
    """

    # Declare optional dependency on RSC Feature for validation
    # This ensures RSC Feature is parsed first when both are present
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [RSCFeatureCharacteristic]

    min_length: int = 4  # Flags(1) + Speed(2) + Cadence(1)
    allow_variable_length: bool = True  # Optional stride length and total distance

    def _validate_against_feature(self, data: RSCMeasurementData, ctx: CharacteristicContext) -> None:
        """Validate RSC measurement data against supported features.

        Args:
            data: Parsed RSC measurement data to validate.
            ctx: CharacteristicContext containing other characteristics.

        Raises:
            ValueError: If measurement reports unsupported features.

        """
        # Get RSC Feature characteristic from context
        feature_data = self.get_context_characteristic(ctx, RSCFeatureCharacteristic)
        if feature_data is None:
            # No feature characteristic available, skip validation
            return

        # Validate optional fields against supported features
        if data.instantaneous_stride_length is not None and not feature_data.instantaneous_stride_length_supported:
            raise ValueError("Instantaneous stride length reported but not supported by device features")

        if data.total_distance is not None and not feature_data.total_distance_supported:
            raise ValueError("Total distance reported but not supported by device features")

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> RSCMeasurementData:
        """Parse RSC measurement data according to Bluetooth specification.

        Format: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
        [Instantaneous Stride Length(2)] + [Total Distance(4)].

        Parsing of the optional fields is lenient: when a presence flag is set
        but the payload is too short to hold that field, the field is left as
        ``None`` and the flags are returned exactly as received.

        Args:
            data: Raw bytearray from BLE characteristic. The first four bytes
                are read unconditionally.
                NOTE(review): presumably the base class enforces ``min_length``
                before calling this method - confirm.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Accepted for interface compatibility; this method body
                does not consult it.

        Returns:
            RSCMeasurementData containing parsed RSC data.

        Raises:
            ValueError: If ``ctx`` is given and the measurement carries a field
                that the RSC Feature characteristic in ``ctx`` marks as unsupported.

        """
        flags = RSCMeasurementFlags(data[0])

        # Parse instantaneous speed (uint16, 1/256 m/s units)
        speed_raw = DataParser.parse_int16(data, 1, signed=False)
        speed_ms = speed_raw / 256.0  # m/s

        # Parse instantaneous cadence (uint8, 1/min units)
        cadence = data[3]

        # Initialize optional fields
        instantaneous_stride_length = None
        total_distance = None

        # Optional fields start right after the 4 mandatory bytes
        offset = 4

        # Parse optional instantaneous stride length (2 bytes) if present
        if (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT in flags) and len(data) >= offset + 2:
            stride_length_raw = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_stride_length = stride_length_raw / 100.0  # Convert to meters
            offset += 2

        # Parse optional total distance (4 bytes) if present
        if (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT in flags) and len(data) >= offset + 4:
            total_distance_raw = DataParser.parse_int32(data, offset, signed=False)
            total_distance = total_distance_raw / 10.0  # Convert to meters

        measurement_data = RSCMeasurementData(
            instantaneous_speed=speed_ms,
            instantaneous_cadence=cadence,
            flags=flags,
            instantaneous_stride_length=instantaneous_stride_length,
            total_distance=total_distance,
        )

        # Validate against feature characteristic if context is available
        if ctx is not None:
            self._validate_against_feature(measurement_data, ctx)

        return measurement_data

    def _encode_value(self, data: RSCMeasurementData) -> bytearray:
        """Encode RSC measurement value back to bytes.

        The two presence bits in the emitted flags byte are derived from which
        optional fields actually hold a value; all other bits of ``data.flags``
        are passed through unchanged. This keeps the flags byte consistent with
        the bytes that follow even if ``data.flags`` claims a field that is
        ``None``.

        Args:
            data: RSCMeasurementData containing RSC measurement data

        Returns:
            Encoded bytes representing the RSC measurement

        Raises:
            ValueError: If speed, cadence, stride length or total distance does
                not fit the unsigned integer width of its wire field.

        """
        stride_length_m = data.instantaneous_stride_length
        total_distance_m = data.total_distance

        # Rebuild the presence bits from the payload instead of trusting
        # data.flags: a set bit with no field behind it would make the
        # receiver read past the end of the packet.
        stride_bit = int(RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT)
        distance_bit = int(RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT)
        flag_bits = int(RSCMeasurementFlags(data.flags)) & ~(stride_bit | distance_bit)
        if stride_length_m is not None:
            flag_bits |= stride_bit
        if total_distance_m is not None:
            flag_bits |= distance_bit

        # Validate required fields
        speed_raw = round(data.instantaneous_speed * 256)  # Convert to 1/256 m/s units
        if not 0 <= speed_raw <= UINT16_MAX:
            raise ValueError(f"Speed {data.instantaneous_speed} m/s exceeds uint16 range")

        if not 0 <= data.instantaneous_cadence <= UINT8_MAX:
            raise ValueError(f"Cadence {data.instantaneous_cadence} exceeds uint8 range")

        # Start with flags, speed, and cadence
        result = bytearray([flag_bits])
        result.extend(DataParser.encode_int16(speed_raw, signed=False))
        result.append(data.instantaneous_cadence)

        # Add optional stride length if present
        if stride_length_m is not None:
            stride_length = float(stride_length_m)
            stride_length_raw = round(stride_length * 100)  # Convert to cm units
            if not 0 <= stride_length_raw <= UINT16_MAX:
                raise ValueError(f"Stride length {stride_length} m exceeds uint16 range")
            result.extend(DataParser.encode_int16(stride_length_raw, signed=False))

        # Add optional total distance if present
        if total_distance_m is not None:
            total_distance = float(total_distance_m)
            total_distance_raw = round(total_distance * 10)  # Convert to dm units
            if not 0 <= total_distance_raw <= UINT32_MAX:
                raise ValueError(f"Total distance {total_distance} m exceeds uint32 range")
            result.extend(DataParser.encode_int32(total_distance_raw, signed=False))

        return result