Coverage for src / bluetooth_sig / gatt / characteristics / rsc_measurement.py: 85%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""RSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .rsc_feature import RSCFeatureCharacteristic 

13from .utils import DataParser 

14 

15 

class RSCMeasurementFlags(IntFlag):
    """Bit flags of the RSC Measurement flags field.

    Each member marks one optional field of the measurement as present.
    """

    # Bit 0: an Instantaneous Stride Length field is included.
    INSTANTANEOUS_STRIDE_LENGTH_PRESENT = 1 << 0
    # Bit 1: a Total Distance field is included.
    TOTAL_DISTANCE_PRESENT = 1 << 1

21 

22 

class RSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable, keyword-only record holding one decoded RSC Measurement."""

    instantaneous_speed: float  # m/s
    instantaneous_cadence: int  # steps per minute
    flags: RSCMeasurementFlags
    instantaneous_stride_length: float | None = None  # meters
    total_distance: float | None = None  # meters

    def __post_init__(self) -> None:
        """Check that the single-byte fields fit in a uint8.

        Raises:
            ValueError: If ``flags`` or ``instantaneous_cadence`` lies outside
                the inclusive range 0..UINT8_MAX.

        """
        flag_value = self.flags
        cadence_value = self.instantaneous_cadence

        flags_in_range = 0 <= flag_value <= UINT8_MAX
        if not flags_in_range:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")

        cadence_in_range = 0 <= cadence_value <= UINT8_MAX
        if not cadence_in_range:
            raise ValueError("Cadence must be a uint8 value (0-UINT8_MAX)")

38 

39 

class RSCMeasurementCharacteristic(BaseCharacteristic[RSCMeasurementData]):
    """RSC (Running Speed and Cadence) Measurement characteristic (0x2A53).

    Used to transmit running speed and cadence data.

    Payload layout: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
    [Instantaneous Stride Length(2)] + [Total Distance(4)], where the bracketed
    fields are announced by the corresponding bits of the flags byte.
    """

    # Declare optional dependency on RSC Feature for validation
    # This ensures RSC Feature is parsed first when both are present
    _optional_dependencies = [RSCFeatureCharacteristic]

    min_length: int = 4  # Flags(1) + Speed(2) + Cadence(1)
    allow_variable_length: bool = True  # Optional stride length and total distance

    def _validate_against_feature(self, data: RSCMeasurementData, ctx: CharacteristicContext) -> None:
        """Validate RSC measurement data against supported features.

        Validation is skipped when the context holds no RSC Feature data.

        Args:
            data: Parsed RSC measurement data to validate.
            ctx: CharacteristicContext containing other characteristics.

        Raises:
            ValueError: If measurement reports unsupported features.

        """
        # Get RSC Feature characteristic from context
        feature_data = self.get_context_characteristic(ctx, RSCFeatureCharacteristic)
        if feature_data is None:
            # No feature characteristic available, skip validation
            return

        # Validate optional fields against supported features
        if data.instantaneous_stride_length is not None and not feature_data.instantaneous_stride_length_supported:
            raise ValueError("Instantaneous stride length reported but not supported by device features")

        if data.total_distance is not None and not feature_data.total_distance_supported:
            raise ValueError("Total distance reported but not supported by device features")

    def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> RSCMeasurementData:
        """Parse RSC measurement data according to Bluetooth specification.

        Format: Flags(1) + Instantaneous Speed(2) + Instantaneous Cadence(1) +
        [Instantaneous Stride Length(2)] + [Total Distance(4)].

        An optional field is decoded only when its flag bit is set and enough
        bytes remain for it; otherwise the field is left as ``None`` while the
        returned ``flags`` keep the raw value of the first byte.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            RSCMeasurementData containing parsed RSC data.

        Raises:
            ValueError: If data is shorter than the 4 mandatory bytes, or if
                feature validation against ``ctx`` fails.

        """
        if len(data) < 4:
            raise ValueError("RSC Measurement data must be at least 4 bytes")

        flags = RSCMeasurementFlags(data[0])

        # Parse instantaneous speed (uint16, 1/256 m/s units)
        speed_raw = DataParser.parse_int16(data, 1, signed=False)
        speed_ms = speed_raw / 256.0  # m/s

        # Parse instantaneous cadence (uint8, 1/min units)
        cadence = data[3]

        # Initialize optional fields
        instantaneous_stride_length = None
        total_distance = None

        # Optional fields start right after the 4 mandatory bytes
        offset = 4

        # Parse optional instantaneous stride length (2 bytes) if present
        if (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT in flags) and len(data) >= offset + 2:
            stride_length_raw = DataParser.parse_int16(data, offset, signed=False)
            instantaneous_stride_length = stride_length_raw / 100.0  # Convert to meters
            offset += 2

        # Parse optional total distance (4 bytes) if present
        if (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT in flags) and len(data) >= offset + 4:
            total_distance_raw = DataParser.parse_int32(data, offset, signed=False)
            total_distance = total_distance_raw / 10.0  # Convert to meters

        measurement_data = RSCMeasurementData(
            instantaneous_speed=speed_ms,
            instantaneous_cadence=cadence,
            flags=flags,
            instantaneous_stride_length=instantaneous_stride_length,
            total_distance=total_distance,
        )

        # Validate against feature characteristic if context is available
        if ctx is not None:
            self._validate_against_feature(measurement_data, ctx)

        return measurement_data

    def _encode_value(self, data: RSCMeasurementData) -> bytearray:
        """Encode RSC measurement value back to bytes.

        The two "field present" bits of the emitted flags byte are derived from
        which optional fields are actually set on ``data``: a bit is set when
        its field is encoded and cleared when the field is ``None``. All other
        bits of ``data.flags`` are passed through unchanged. This keeps the
        flags byte consistent with the payload that follows it.

        Args:
            data: RSCMeasurementData containing RSC measurement data

        Returns:
            Encoded bytes representing the RSC measurement

        Raises:
            ValueError: If speed, cadence, stride length or total distance does
                not fit the unsigned integer width of its field.

        """
        stride_length = data.instantaneous_stride_length
        total_distance = data.total_distance

        # Work on a plain int so clearing a bit cannot disturb undefined flag bits.
        flag_bits = int(data.flags)
        presence = (
            (RSCMeasurementFlags.INSTANTANEOUS_STRIDE_LENGTH_PRESENT, stride_length),
            (RSCMeasurementFlags.TOTAL_DISTANCE_PRESENT, total_distance),
        )
        for bit, value in presence:
            if value is None:
                # A stale presence bit would make a decoder misread the payload.
                flag_bits &= ~int(bit)
            else:
                flag_bits |= int(bit)

        # Validate required fields
        speed_raw = round(data.instantaneous_speed * 256)  # Convert to 1/256 m/s units
        if not 0 <= speed_raw <= 0xFFFF:
            raise ValueError(f"Speed {data.instantaneous_speed} m/s exceeds uint16 range")

        if not 0 <= data.instantaneous_cadence <= UINT8_MAX:
            raise ValueError(f"Cadence {data.instantaneous_cadence} exceeds uint8 range")

        # Start with flags, speed, and cadence
        result = bytearray([flag_bits])
        result.extend(DataParser.encode_int16(speed_raw, signed=False))
        result.append(data.instantaneous_cadence)

        # Add optional stride length if present
        if stride_length is not None:
            stride_length = float(stride_length)
            stride_length_raw = round(stride_length * 100)  # Convert to 1/100 m units
            if not 0 <= stride_length_raw <= 0xFFFF:
                raise ValueError(f"Stride length {stride_length} m exceeds uint16 range")
            result.extend(DataParser.encode_int16(stride_length_raw, signed=False))

        # Add optional total distance if present
        if total_distance is not None:
            total_distance = float(total_distance)
            total_distance_raw = round(total_distance * 10)  # Convert to 1/10 m units
            if not 0 <= total_distance_raw <= 0xFFFFFFFF:
                raise ValueError(f"Total distance {total_distance} m exceeds uint32 range")
            result.extend(DataParser.encode_int32(total_distance_raw, signed=False))

        return result