Coverage for src/bluetooth_sig/gatt/characteristics/csc_measurement.py: 84%

97 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""CSC Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .csc_feature import CSCFeatureCharacteristic, CSCFeatureData 

13from .utils import DataParser 

14 

15 

16class CSCMeasurementFlags(IntFlag): 

17 """CSC Measurement flags as per Bluetooth SIG specification.""" 

18 

19 WHEEL_REVOLUTION_DATA_PRESENT = 0x01 

20 CRANK_REVOLUTION_DATA_PRESENT = 0x02 

21 

22 

23class CSCMeasurementData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

24 """Parsed data from CSC Measurement characteristic.""" 

25 

26 flags: CSCMeasurementFlags 

27 cumulative_wheel_revolutions: int | None = None 

28 last_wheel_event_time: float | None = None 

29 cumulative_crank_revolutions: int | None = None 

30 last_crank_event_time: float | None = None 

31 

32 def __post_init__(self) -> None: 

33 """Validate CSC measurement data.""" 

34 if not 0 <= int(self.flags) <= UINT8_MAX: 

35 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)") 

36 

37 

38class CSCMeasurementCharacteristic(BaseCharacteristic): 

39 """CSC (Cycling Speed and Cadence) Measurement characteristic (0x2A5B). 

40 

41 Used to transmit cycling speed and cadence data. 

42 """ 

43 

44 # Override automatic name resolution because "CSC" is an acronym 

45 _characteristic_name: str | None = "CSC Measurement" 

46 

47 # Time resolution constants 

48 CSC_TIME_RESOLUTION = 1024.0 # 1/1024 second resolution for both wheel and crank event times 

49 

50 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CSCMeasurementData: 

51 """Parse CSC measurement data according to Bluetooth specification. 

52 

53 Format: Flags(1) + [Cumulative Wheel Revolutions(4)] + [Last Wheel Event Time(2)] + 

54 [Cumulative Crank Revolutions(2)] + [Last Crank Event Time(2)] 

55 

56 Args: 

57 data: Raw bytearray from BLE characteristic. 

58 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

59 

60 Returns: 

61 CSCMeasurementData containing parsed CSC data. 

62 

63 Raises: 

64 ValueError: If data format is invalid. 

65 

66 """ 

67 if len(data) < 1: 

68 raise ValueError("CSC Measurement data must be at least 1 byte") 

69 

70 flags = CSCMeasurementFlags(data[0]) 

71 offset = 1 

72 

73 # Initialize result data 

74 cumulative_wheel_revolutions = None 

75 last_wheel_event_time = None 

76 cumulative_crank_revolutions = None 

77 last_crank_event_time = None 

78 

79 # Parse optional wheel revolution data (6 bytes total) if present 

80 if (flags & CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 6: 

81 wheel_revolutions = DataParser.parse_int32(data, offset, signed=False) 

82 wheel_event_time_raw = DataParser.parse_int16(data, offset + 4, signed=False) 

83 # Wheel event time is in 1/CSC_TIME_RESOLUTION second units 

84 cumulative_wheel_revolutions = wheel_revolutions 

85 last_wheel_event_time = wheel_event_time_raw / self.CSC_TIME_RESOLUTION 

86 offset += 6 

87 

88 # Parse optional crank revolution data (4 bytes total) if present 

89 if (flags & CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4: 

90 crank_revolutions = DataParser.parse_int16(data, offset, signed=False) 

91 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False) 

92 # Crank event time is in 1/CSC_TIME_RESOLUTION second units 

93 cumulative_crank_revolutions = crank_revolutions 

94 last_crank_event_time = crank_event_time_raw / self.CSC_TIME_RESOLUTION 

95 

96 # Validate flags against CSC Feature if available 

97 if ctx is not None: 

98 feature_char = self.get_context_characteristic(ctx, CSCFeatureCharacteristic) 

99 if feature_char and feature_char.parse_success and feature_char.value is not None: 

100 self._validate_against_feature(flags, feature_char.value) 

101 

102 return CSCMeasurementData( 

103 flags=flags, 

104 cumulative_wheel_revolutions=cumulative_wheel_revolutions, 

105 last_wheel_event_time=last_wheel_event_time, 

106 cumulative_crank_revolutions=cumulative_crank_revolutions, 

107 last_crank_event_time=last_crank_event_time, 

108 ) 

109 

110 def _encode_wheel_data(self, data: CSCMeasurementData) -> bytearray: 

111 """Encode wheel revolution data. 

112 

113 Args: 

114 data: CSCMeasurementData containing wheel data 

115 

116 Returns: 

117 Encoded wheel revolution bytes 

118 

119 Raises: 

120 ValueError: If wheel data is invalid or out of range 

121 

122 """ 

123 if data.cumulative_wheel_revolutions is None or data.last_wheel_event_time is None: 

124 raise ValueError("CSC wheel revolution data marked present but missing values") 

125 

126 wheel_revolutions = int(data.cumulative_wheel_revolutions) 

127 wheel_event_time = float(data.last_wheel_event_time) 

128 

129 # Validate ranges 

130 if not 0 <= wheel_revolutions <= 0xFFFFFFFF: 

131 raise ValueError(f"Wheel revolutions {wheel_revolutions} exceeds uint32 range") 

132 

133 wheel_event_time_raw = round(wheel_event_time * self.CSC_TIME_RESOLUTION) 

134 if not 0 <= wheel_event_time_raw <= 0xFFFF: 

135 raise ValueError(f"Wheel event time {wheel_event_time_raw} exceeds uint16 range") 

136 

137 result = bytearray() 

138 result.extend(DataParser.encode_int32(wheel_revolutions, signed=False)) 

139 result.extend(DataParser.encode_int16(wheel_event_time_raw, signed=False)) 

140 return result 

141 

142 def _encode_crank_data(self, data: CSCMeasurementData) -> bytearray: 

143 """Encode crank revolution data. 

144 

145 Args: 

146 data: CSCMeasurementData containing crank data 

147 

148 Returns: 

149 Encoded crank revolution bytes 

150 

151 Raises: 

152 ValueError: If crank data is invalid or out of range 

153 

154 """ 

155 if data.cumulative_crank_revolutions is None or data.last_crank_event_time is None: 

156 raise ValueError("CSC crank revolution data marked present but missing values") 

157 

158 crank_revolutions = int(data.cumulative_crank_revolutions) 

159 crank_event_time = float(data.last_crank_event_time) 

160 

161 # Validate ranges 

162 if not 0 <= crank_revolutions <= 0xFFFF: 

163 raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range") 

164 

165 crank_event_time_raw = round(crank_event_time * self.CSC_TIME_RESOLUTION) 

166 if not 0 <= crank_event_time_raw <= 0xFFFF: 

167 raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range") 

168 

169 result = bytearray() 

170 result.extend(DataParser.encode_int16(crank_revolutions, signed=False)) 

171 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False)) 

172 return result 

173 

174 def encode_value(self, data: CSCMeasurementData) -> bytearray: 

175 """Encode CSC measurement value back to bytes. 

176 

177 Args: 

178 data: CSCMeasurementData containing CSC measurement data 

179 

180 Returns: 

181 Encoded bytes representing the CSC measurement 

182 

183 """ 

184 # Build flags based on available data 

185 flags = data.flags 

186 has_wheel_data = data.cumulative_wheel_revolutions is not None and data.last_wheel_event_time is not None 

187 has_crank_data = data.cumulative_crank_revolutions is not None and data.last_crank_event_time is not None 

188 

189 # Update flags to match available data 

190 if has_wheel_data: 

191 flags |= CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT 

192 if has_crank_data: 

193 flags |= CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT 

194 

195 # Start with flags byte 

196 result = bytearray([int(flags)]) 

197 

198 # Add wheel revolution data if present 

199 if has_wheel_data: 

200 result.extend(self._encode_wheel_data(data)) 

201 

202 # Add crank revolution data if present 

203 if has_crank_data: 

204 result.extend(self._encode_crank_data(data)) 

205 

206 return result 

207 

208 def _validate_against_feature(self, flags: int, feature_data: CSCFeatureData) -> None: 

209 """Validate measurement flags against CSC Feature characteristic. 

210 

211 Args: 

212 flags: Measurement flags indicating which data is present 

213 feature_data: CSCFeatureData from CSC Feature characteristic 

214 

215 Raises: 

216 ValueError: If reported measurement fields are not supported by device features 

217 

218 """ 

219 # Validate that reported measurement fields are supported 

220 wheel_flag = int(CSCMeasurementFlags.WHEEL_REVOLUTION_DATA_PRESENT) 

221 if (flags & wheel_flag) and not feature_data.wheel_revolution_data_supported: 

222 raise ValueError("Wheel revolution data reported but not supported by CSC Feature") 

223 crank_flag = int(CSCMeasurementFlags.CRANK_REVOLUTION_DATA_PRESENT) 

224 if (flags & crank_flag) and not feature_data.crank_revolution_data_supported: 

225 raise ValueError("Crank revolution data reported but not supported by CSC Feature")