Coverage for src / bluetooth_sig / gatt / characteristics / csc_measurement.py: 85%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""CSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData 

13from .utils import DataParser 

14 

15 

16class CSCMeasurementFlags(IntFlag): 

17 """CSC Measurement flags as per Bluetooth SIG specification.""" 

18 

19 WHEEL_REVOLUTION_DATA_PRESENT = 0x01 

20 CRANK_REVOLUTION_DATA_PRESENT = 0x02 

21 

22 

23class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

24 """Parsed data from CSC Measurement characteristic.""" 

25 

26 flags: CSCMeasurementFlags 

27 cumulative_wheel_revolutions: int | None = None 

28 last_wheel_event_time: float | None = None 

29 cumulative_crank_revolutions: int | None = None 

30 last_crank_event_time: float | None = None 

31 

32 def __post_init__(self) -> None: 

33 """Validate CSC measurement data.""" 

34 if not 0 <= int(self.flags) <= UINT8_MAX: 

35 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)") 

36 

37 

38class CSCMeasurementCharacteristic(BaseCharacteristic[CSCMeasurementData]): 

39 """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B). 

40 

41 Used to transmit cycling speed and cadence data. 

42 """ 

43 

44 # Override automatic name resolution because "CSC" is an acronym 

45 _characteristic_name: str | None = "CSC Measurement" 

46 

47 # Declare optional dependency on CSC Feature for validation 

48 # This ensures CSC Feature is parsed first when both are present 

49 _optional_dependencies = [CSCFeatureCharacteristic] 

50 

51 # Validation: min 1 byte (flags), max 11 bytes (flags + wheel + crank data) 

52 min_length = 1 

53 allow_variable_length = True 

54 max_length = 11 # flags:1 + wheel:6 + crank:4 

55 

56 # Time resolution constants 

57 CSC_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution for both wheel and crank event times 

58 

59 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CSCMeasurementData: 

60 """Parse CSC measurement data according to Bluetooth specification. 

61 

62 Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] + 

63 [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)] 

64 

65 Args: 

66 data: Raw bytearray from BLE characteristic. 

67 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

68 

69 Returns: 

70 CSCMeasurementData containing parsed CSC data. 

71 

72 Raises: 

73 ValueError: If data format is invalid. 

74 

75 """ 

76 if len(data) < 1: 

77 raise ValueError("CSC Measurement data must be at least 1 byte") 

78 

79 flags = CSCMeasurementFlags(data[0]) 

80 offset = 1 

81 

82 # Initialize result data 

83 cumulative_wheel_revolutions = None 

84 last_wheel_event_time = None 

85 cumulative_crank_revolutions = None 

86 last_crank_event_time = None 

87 

88 # Parse optional wheel revolution data (6 bytes total) if present 

89 if (flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6: 

90 wheel_revolutions = DataParser.parse_int32(data, offset, signed=False) 

91 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False) 

92 # Wheel event time is in 1/CSC_TIME_RESOLUTION second units 

93 cumulative_wheel_revolutions = wheel_revolutions 

94 last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION 

95 offset += 6 

96 

97 # Parse optional crank revolution data (4 bytes total) if present 

98 if (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4: 

99 crank_revolutions = DataParser.parse_int16(data, offset, signed=False) 

100 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False) 

101 # Crank event time is in 1/CSC_TIME_RESOLUTION second units 

102 cumulative_crank_revolutions = crank_revolutions 

103 last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION 

104 

105 # Validate flags against CSC Feature if available 

106 if ctx is not None: 

107 feature_value = self.get_context_characteristic(ctx, CSCFeatureCharacteristic) 

108 if feature_value is not None: 

109 self._validate_against_feature(flags, feature_value) 

110 

111 return CSCMeasurementData( 

112 flags=flags, 

113 cumulative_wheel_revolutions=cumulative_wheel_revolutions, 

114 last_wheel_event_time=last_wheel_event_time, 

115 cumulative_crank_revolutions=cumulative_crank_revolutions, 

116 last_crank_event_time=last_crank_event_time, 

117 ) 

118 

119 def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray: 

120 """Encode wheel revolution data. 

121 

122 Args: 

123 data: CSCMeasurementData containing wheel data 

124 

125 Returns: 

126 Encoded wheel revolution bytes 

127 

128 Raises: 

129 ValueError: If wheel data is invalid or out of range 

130 

131 """ 

132 if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None: 

133 raise ValueError("CSC wheel revolution data marked present but missing values") 

134 

135 wheel_revolutions = int(data.cumulative_wheel_revolutions) 

136 wheel_event_time = float(data.last_wheel_event_time) 

137 

138 # Validate ranges 

139 if not 0 <= wheel_revolutions <= 0xFFFFFFFF: 

140 raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range") 

141 

142 wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION) 

143 if not 0 <= wheel_event_time_raw <= 0xFFFF: 

144 raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range") 

145 

146 result = bytearray() 

147 result.extend(DataParser.encode_int32(wheel_revolutions, signed=False)) 

148 result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False)) 

149 return result 

150 

151 def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray: 

152 """Encode crank revolution data. 

153 

154 Args: 

155 data: CSCMeasurementData containing crank data 

156 

157 Returns: 

158 Encoded crank revolution bytes 

159 

160 Raises: 

161 ValueError: If crank data is invalid or out of range 

162 

163 """ 

164 if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None: 

165 raise ValueError("CSC crank revolution data marked present but missing values") 

166 

167 crank_revolutions = int(data.cumulative_crank_revolutions) 

168 crank_event_time = float(data.last_crank_event_time) 

169 

170 # Validate ranges 

171 if not 0 <= crank_revolutions <= 0xFFFF: 

172 raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range") 

173 

174 crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION) 

175 if not 0 <= crank_event_time_raw <= 0xFFFF: 

176 raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range") 

177 

178 result = bytearray() 

179 result.extend(DataParser.encode_int16(crank_revolutions, signed=False)) 

180 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False)) 

181 return result 

182 

183 def _encode_value(self, data: CSCMeasurementData) -> bytearray: 

184 """Encode CSC measurement value back to bytes. 

185 

186 Args: 

187 data: CSCMeasurementData containing CSC measurement data 

188 

189 Returns: 

190 Encoded bytes representing the CSC measurement 

191 

192 """ 

193 # Build flags based on available data 

194 flags = data.flags 

195 has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None 

196 has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None 

197 

198 # Update flags to match available data 

199 if has_wheel_data: 

200 flags |= CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT 

201 if has_crank_data: 

202 flags |= CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT 

203 

204 # Start with flags byte 

205 result = bytearray([int(flags)]) 

206 

207 # Add wheel revolution data if present 

208 if has_wheel_data: 

209 result.extend(self._encode_wheel_data(data)) 

210 

211 # Add crank revolution data if present 

212 if has_crank_data: 

213 result.extend(self._encode_crank_data(data)) 

214 

215 return result 

216 

217 def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None: 

218 """Validate measurement flags against CSC Feature characteristic. 

219 

220 Args: 

221 flags: Measurement flags indicating which data is present 

222 feature_data: CSCFeatureData from CSC Feature characteristic 

223 

224 Raises: 

225 ValueError: If reported measurement fields are not supported by device features 

226 

227 """ 

228 # Validate that reported measurement fields are supported 

229 wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) 

230 if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported: 

231 raise ValueError("Wheel revolution data reported but not supported by CSC Feature") 

232 

233 crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) 

234 if (flags & crank_flag) and not feature_data.crank_revolution_data_supported: 

235 raise ValueError("Crank revolution data reported but not supported by CSC Feature")