Coverage for src / bluetooth_sig / gatt / characteristics / csc_measurement.py: 86%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""CSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6from typing import Any, ClassVar 

7 

8import msgspec 

9 

10from ..constants import UINT8_MAX, UINT16_MAX, UINT32_MAX 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData 

14from .utils import DataParser 

15 

16 

class CSCMeasurementFlags(IntFlag):
    """Presence bits used in the flags field of a CSC Measurement value.

    Each member marks one optional data group as included in the payload.
    """

    # Bit 0: the wheel revolution group is present.
    WHEEL_REVOLUTION_DATA_PRESENT = 1 << 0
    # Bit 1: the crank revolution group is present.
    CRANK_REVOLUTION_DATA_PRESENT = 1 << 1

22 

23 

class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable container for the fields of a CSC Measurement value.

    Only ``flags`` is required; every other field defaults to ``None``.
    """

    flags: CSCMeasurementFlags
    cumulative_wheel_revolutions: int | None = None
    last_wheel_event_time: float | None = None
    cumulative_crank_revolutions: int | None = None
    last_crank_event_time: float | None = None

    def __post_init__(self) -> None:
        """Reject a flags value that does not fit in one unsigned byte.

        Raises:
            ValueError: If ``int(self.flags)`` is negative or greater than ``UINT8_MAX``.

        """
        flag_bits = int(self.flags)
        if flag_bits < 0 or flag_bits > UINT8_MAX:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")

37 

38 

class CSCMeasurementCharacteristic(BaseCharacteristic[CSCMeasurementData]):
    """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B).

    Used to transmit cycling speed and cadence data.

    The value handled here is a one-byte flags field, optionally followed by a
    6-byte wheel group (revolution count + event time) and a 4-byte crank group
    (revolution count + event time), in that order.
    """

    # Override automatic name resolution because "CSC" is an acronym
    _characteristic_name: str | None = "CSC Measurement"

    # Declare optional dependency on CSC Feature for validation
    # This ensures CSC Feature is parsed first when both are present
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [CSCFeatureCharacteristic]

    # Validation: min 1 byte (flags), max 11 bytes (flags + wheel + crank data)
    min_length = 1
    allow_variable_length = True
    max_length = 11  # flags:1 + wheel:6 + crank:4

    # Time resolution constants
    CSC_TIME_RESOLUTION = 1024.0  # 1/1024 second resolution for both wheel and crank event times

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> CSCMeasurementData:
        """Parse CSC measurement data according to Bluetooth specification.

        Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] +
        [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)]

        A group whose presence flag is set but whose bytes are missing from
        ``data`` is skipped: its fields stay ``None`` while the returned flags
        still report it.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).
            validate: Whether to validate ranges (default True). This method
                does not consult it itself.

        Returns:
            CSCMeasurementData containing parsed CSC data. Event times are the
            raw 16-bit values divided by ``CSC_TIME_RESOLUTION``.

        Raises:
            ValueError: If the flags report a data group that the CSC Feature
                value found through ``ctx`` does not support.

        """
        flags = CSCMeasurementFlags(data[0])
        offset = 1

        # Fields stay None unless the matching group is flagged and fully present.
        cumulative_wheel_revolutions = None
        last_wheel_event_time = None
        cumulative_crank_revolutions = None
        last_crank_event_time = None

        # Parse optional wheel revolution data (6 bytes total) if present
        if (flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6:
            cumulative_wheel_revolutions = DataParser.parse_int32(data, offset, signed=False)
            wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False)
            # Wheel event time is in 1/CSC_TIME_RESOLUTION second units
            last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION
            offset += 6

        # Parse optional crank revolution data (4 bytes total) if present
        if (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4:
            cumulative_crank_revolutions = DataParser.parse_int16(data, offset, signed=False)
            crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False)
            # Crank event time is in 1/CSC_TIME_RESOLUTION second units
            last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION

        # Validate flags against CSC Feature if available
        if ctx is not None:
            feature_value = self.get_context_characteristic(ctx, CSCFeatureCharacteristic)
            if feature_value is not None:
                self._validate_against_feature(flags, feature_value)

        return CSCMeasurementData(
            flags=flags,
            cumulative_wheel_revolutions=cumulative_wheel_revolutions,
            last_wheel_event_time=last_wheel_event_time,
            cumulative_crank_revolutions=cumulative_crank_revolutions,
            last_crank_event_time=last_crank_event_time,
        )

    def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode wheel revolution data.

        Args:
            data: CSCMeasurementData containing wheel data

        Returns:
            Encoded wheel revolution bytes: the revolution count (32-bit
            unsigned) followed by the event time (16-bit unsigned).

        Raises:
            ValueError: If wheel data is invalid or out of range

        """
        if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None:
            raise ValueError("CSC wheel revolution data marked present but missing values")

        wheel_revolutions = int(data.cumulative_wheel_revolutions)
        wheel_event_time = float(data.last_wheel_event_time)

        # Validate ranges
        if not 0 <= wheel_revolutions <= UINT32_MAX:
            raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range")

        # Convert seconds back to 1/CSC_TIME_RESOLUTION second units before range-checking.
        wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= wheel_event_time_raw <= UINT16_MAX:
            raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int32(wheel_revolutions, signed=False))
        result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False))
        return result

    def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray:
        """Encode crank revolution data.

        Args:
            data: CSCMeasurementData containing crank data

        Returns:
            Encoded crank revolution bytes: the revolution count followed by
            the event time, both 16-bit unsigned.

        Raises:
            ValueError: If crank data is invalid or out of range

        """
        if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None:
            raise ValueError("CSC crank revolution data marked present but missing values")

        crank_revolutions = int(data.cumulative_crank_revolutions)
        crank_event_time = float(data.last_crank_event_time)

        # Validate ranges
        if not 0 <= crank_revolutions <= UINT16_MAX:
            raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")

        # Convert seconds back to 1/CSC_TIME_RESOLUTION second units before range-checking.
        crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION)
        if not 0 <= crank_event_time_raw <= UINT16_MAX:
            raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")

        result = bytearray()
        result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
        result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
        return result

    def _encode_value(self, data: CSCMeasurementData) -> bytearray:
        """Encode CSC measurement value back to bytes.

        A data group is written when its presence flag is set, either already
        in ``data.flags`` or because both of its values are supplied. The
        payload is driven by the final flag bits so the flags byte and the
        bytes that follow it always agree.

        Args:
            data: CSCMeasurementData containing CSC measurement data

        Returns:
            Encoded bytes representing the CSC measurement

        Raises:
            ValueError: If a presence flag is set but the matching values are
                missing, or if a value is outside its encodable range.

        """
        flags = data.flags
        has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None
        has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None

        # Update flags to match available data
        if has_wheel_data:
            flags |= CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT
        if has_crank_data:
            flags |= CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT

        # Start with flags byte
        result = bytearray([int(flags)])

        # Emit each group based on the flag bits just written, not only on the
        # values being non-None. A caller-set flag with missing values then
        # reaches the helper's guard and raises instead of silently producing
        # a packet that advertises data it does not contain.
        if flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT:
            result.extend(self._encode_wheel_data(data))

        if flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT:
            result.extend(self._encode_crank_data(data))

        return result

    def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None:
        """Validate measurement flags against CSC Feature characteristic.

        Args:
            flags: Measurement flags indicating which data is present
            feature_data: CSCFeatureData from CSC Feature characteristic

        Raises:
            ValueError: If reported measurement fields are not supported by device features

        """
        # Validate that reported measurement fields are supported
        wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT)
        if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported:
            raise ValueError("Wheel revolution data reported but not supported by CSC Feature")

        crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT)
        if (flags & crank_flag) and not feature_data.crank_revolution_data_supported:
            raise ValueError("Crank revolution data reported but not supported by CSC Feature")