Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_vector.py: 91%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Cycling Power Vector characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14 

15class CyclingPowerVectorFlags(IntFlag): 

16 """Cycling Power Vector flags as per Bluetooth SIG specification.""" 

17 

18 INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 0x01 

19 INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 0x02 

20 

21 

22class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

23 """Crank revolution data from cycling power vector.""" 

24 

25 crank_revolutions: int 

26 last_crank_event_time: float # in seconds 

27 

28 

29class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

30 """Parsed data from Cycling Power Vector characteristic. 

31 

32 Used for both parsing and encoding - all fields are properly typed. 

33 """ 

34 

35 flags: CyclingPowerVectorFlags 

36 crank_revolution_data: CrankRevolutionData 

37 first_crank_measurement_angle: float 

38 instantaneous_force_magnitude_array: tuple[float, ...] | None = None 

39 instantaneous_torque_magnitude_array: tuple[float, ...] | None = None 

40 

41 def __post_init__(self) -> None: 

42 """Validate cycling power vector data.""" 

43 if not 0 <= int(self.flags) <= UINT8_MAX: 

44 raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)") 

45 if not 0 <= self.first_crank_measurement_angle <= 360: 

46 raise ValueError("First crank measurement angle must be 0-360 degrees") 

47 

48 

49class CyclingPowerVectorCharacteristic(BaseCharacteristic[CyclingPowerVectorData]): 

50 """Cycling Power Vector characteristic (0x2A64). 

51 

52 Used to transmit detailed cycling power vector data including force 

53 and torque measurements at different crank angles. 

54 """ 

55 

56 # Variable length: min 7 bytes (flags:1 + crank_revs:2 + crank_time:2 + angle:2), plus optional arrays 

57 min_length = 7 

58 allow_variable_length = True 

59 

60 _manual_unit: str = "various" # Multiple units in vector data 

61 

62 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerVectorData: 

63 """Parse cycling power vector data according to Bluetooth specification. 

64 

65 Format: Flags(1) + Crank Revolution Data(2) + Last Crank Event Time(2) + 

66 First Crank Measurement Angle(2) + [Instantaneous Force Magnitude Array] + 

67 [Instantaneous Torque Magnitude Array] 

68 

69 Args: 

70 data: Raw bytearray from BLE characteristic. 

71 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

72 

73 Returns: 

74 CyclingPowerVectorData containing parsed cycling power vector data. 

75 

76 # `ctx` is intentionally unused in this implementation; mark as used 

77 # so linters do not report an unused-argument error. 

78 del ctx 

79 Raises: 

80 ValueError: If data format is invalid. 

81 

82 """ 

83 if len(data) < 7: 

84 raise ValueError("Cycling Power Vector data must be at least 7 bytes") 

85 

86 flags = CyclingPowerVectorFlags(data[0]) 

87 

88 # Parse crank revolution data (2 bytes) 

89 crank_revolutions = DataParser.parse_int16(data, 1, signed=False) 

90 

91 # Parse last crank event time (2 bytes, 1/1024 second units) 

92 crank_event_time_raw = DataParser.parse_int16(data, 3, signed=False) 

93 crank_event_time = crank_event_time_raw / 1024.0 

94 

95 # Parse first crank measurement angle (2 bytes, 1/180 degree units) 

96 first_angle_raw = DataParser.parse_int16(data, 5, signed=False) 

97 first_angle = first_angle_raw / 180.0 # Convert to degrees 

98 

99 # Create crank revolution data 

100 crank_revolution_data = CrankRevolutionData( 

101 crank_revolutions=crank_revolutions, last_crank_event_time=crank_event_time 

102 ) 

103 

104 offset = 7 

105 force_magnitudes_list: list[float] = [] 

106 torque_magnitudes_list: list[float] = [] 

107 

108 # Parse optional instantaneous force magnitude array if present 

109 if (flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT) and len(data) > offset: 

110 # Each force magnitude is 2 bytes (signed 16-bit, 1 N units) 

111 while offset + 1 < len(data) and not ( 

112 flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT 

113 ): # Stop if torque data follows 

114 if offset + 2 > len(data): 

115 break 

116 force_raw = DataParser.parse_int16(data, offset, signed=True) 

117 force_magnitudes_list.append(float(force_raw)) # Force in Newtons 

118 offset += 2 

119 

120 # Parse optional instantaneous torque magnitude array if present 

121 if (flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT) and len(data) > offset: 

122 # Each torque magnitude is 2 bytes (signed 16-bit, 1/32 Nm units) 

123 while offset + 1 < len(data): 

124 if offset + 2 > len(data): 

125 break 

126 torque_raw = DataParser.parse_int16(data, offset, signed=True) 

127 torque_magnitudes_list.append(torque_raw / 32.0) # Convert to Nm 

128 offset += 2 

129 

130 return CyclingPowerVectorData( 

131 flags=flags, 

132 crank_revolution_data=crank_revolution_data, 

133 first_crank_measurement_angle=first_angle, 

134 instantaneous_force_magnitude_array=tuple(force_magnitudes_list) if force_magnitudes_list else None, 

135 instantaneous_torque_magnitude_array=tuple(torque_magnitudes_list) if torque_magnitudes_list else None, 

136 ) 

137 

138 def _encode_value(self, data: CyclingPowerVectorData) -> bytearray: # pylint: disable=too-many-branches # Complex cycling power vector with optional fields 

139 """Encode cycling power vector value back to bytes. 

140 

141 Args: 

142 data: CyclingPowerVectorData containing cycling power vector data 

143 

144 Returns: 

145 Encoded bytes representing the power vector 

146 

147 """ 

148 if not isinstance(data, CyclingPowerVectorData): 

149 raise TypeError(f"Cycling power vector data must be a CyclingPowerVectorData, got {type(data).__name__}") 

150 

151 # Extract values from dataclass 

152 crank_revolutions = data.crank_revolution_data.crank_revolutions 

153 crank_event_time = data.crank_revolution_data.last_crank_event_time 

154 first_angle = data.first_crank_measurement_angle 

155 

156 # Build flags based on optional arrays 

157 flags = data.flags 

158 if data.instantaneous_force_magnitude_array is not None: 

159 flags |= ( 

160 CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT 

161 ) # Force magnitude array present 

162 if data.instantaneous_torque_magnitude_array is not None: 

163 flags |= ( 

164 CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT 

165 ) # Torque magnitude array present 

166 

167 # Convert values to raw format 

168 crank_event_time_raw = round(crank_event_time * 1024) # 1/1024 second units 

169 first_angle_raw = round(first_angle * 180) # 1/180 degree units 

170 

171 # Validate ranges 

172 if not 0 <= crank_revolutions <= 0xFFFF: 

173 raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range") 

174 if not 0 <= crank_event_time_raw <= 0xFFFF: 

175 raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range") 

176 if not 0 <= first_angle_raw <= 0xFFFF: 

177 raise ValueError(f"First angle {first_angle_raw} exceeds uint16 range") 

178 

179 # Build result 

180 result = bytearray([int(flags)]) 

181 result.extend(DataParser.encode_int16(crank_revolutions, signed=False)) 

182 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False)) 

183 result.extend(DataParser.encode_int16(first_angle_raw, signed=False)) 

184 

185 # Add force magnitude array if present 

186 if data.instantaneous_force_magnitude_array is not None: 

187 for force in data.instantaneous_force_magnitude_array: 

188 force_val = int(force) 

189 if SINT16_MIN <= force_val <= SINT16_MAX: # signed 16-bit range 

190 result.extend(DataParser.encode_int16(force_val, signed=True)) 

191 

192 # Add torque magnitude array if present 

193 if data.instantaneous_torque_magnitude_array is not None: 

194 for torque in data.instantaneous_torque_magnitude_array: 

195 torque_val = int(torque * 32) # Convert back to 1/32 Nm units 

196 if SINT16_MIN <= torque_val <= SINT16_MAX: # signed 16-bit range 

197 result.extend(DataParser.encode_int16(torque_val, signed=True)) 

198 

199 return result