Coverage for src/bluetooth_sig/gatt/characteristics/cycling_power_vector.py: 92%

88 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Cycling Power Vector characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import SINT16_MAX, SINT16_MIN, UINT8_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14 

class CyclingPowerVectorFlags(IntFlag):
    """Bit flags describing which optional arrays a Cycling Power Vector value holds."""

    # Bit 0: an instantaneous force magnitude array is present.
    INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 1 << 0
    # Bit 1: an instantaneous torque magnitude array is present.
    INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 1 << 1

20 

21 

class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Immutable crank revolution sample carried inside a cycling power vector.

    Instances are frozen and must be built with keyword arguments.
    """

    # Crank revolution count.
    crank_revolutions: int
    # Time of the last crank event, expressed in seconds.
    last_crank_event_time: float

27 

28 

class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Decoded contents of a Cycling Power Vector characteristic value.

    The same structure is produced by the parser and consumed by the encoder.
    Both magnitude arrays are optional and default to ``None``.
    """

    flags: CyclingPowerVectorFlags
    crank_revolution_data: CrankRevolutionData
    first_crank_measurement_angle: float
    instantaneous_force_magnitude_array: tuple[float, ...] | None = None
    instantaneous_torque_magnitude_array: tuple[float, ...] | None = None

    def __post_init__(self) -> None:
        """Reject field values that fall outside their permitted ranges.

        Raises:
            ValueError: If the flags do not fit in an unsigned byte, or the
                first crank measurement angle is not within 0-360 degrees.
        """
        flag_bits = int(self.flags)
        flags_fit_in_byte = 0 <= flag_bits <= UINT8_MAX
        if not flags_fit_in_byte:
            raise ValueError("Flags must be a uint8 value (0-UINT8_MAX)")

        # Kept as a chained comparison so that NaN is rejected as well.
        angle_in_range = 0 <= self.first_crank_measurement_angle <= 360
        if not angle_in_range:
            raise ValueError("First crank measurement angle must be 0-360 degrees")

47 

48 

class CyclingPowerVectorCharacteristic(BaseCharacteristic):
    """Cycling Power Vector characteristic (0x2A64).

    Converts between the raw characteristic payload and
    ``CyclingPowerVectorData``. The payload handled here is a fixed
    seven-byte header (flags, crank revolutions, last crank event time,
    first crank measurement angle) optionally followed by an array of
    signed 16-bit force or torque samples.
    """

    _manual_unit: str = "various"  # Multiple units in vector data

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> CyclingPowerVectorData:
        """Parse a raw Cycling Power Vector payload.

        Format: Flags(1) + Crank Revolution Data(2) + Last Crank Event Time(2) +
        First Crank Measurement Angle(2) + [Instantaneous Force Magnitude Array] +
        [Instantaneous Torque Magnitude Array]

        Samples after the header are read two bytes at a time; a trailing odd
        byte is ignored. When the torque flag is set, every trailing sample is
        decoded as torque (even if the force flag is also set). Force samples
        are decoded only when the force flag is set and the torque flag is
        clear. An array with no samples is reported as ``None``.

        Args:
            data: Raw bytearray from BLE characteristic.
            ctx: Optional CharacteristicContext providing surrounding context (may be None).

        Returns:
            CyclingPowerVectorData containing parsed cycling power vector data.

        Raises:
            ValueError: If ``data`` is shorter than the seven-byte header, or
                the decoded values fail ``CyclingPowerVectorData`` validation.

        """
        # `ctx` is intentionally unused in this implementation; mark as used
        # so linters do not report an unused-argument error.
        del ctx

        header_length = 7
        if len(data) < header_length:
            raise ValueError("Cycling Power Vector data must be at least 7 bytes")

        flags = CyclingPowerVectorFlags(data[0])

        # Crank revolution count (2 bytes, unsigned).
        crank_revolutions = DataParser.parse_int16(data, 1, signed=False)

        # Last crank event time (2 bytes, unsigned, 1/1024 second units).
        crank_event_time = DataParser.parse_int16(data, 3, signed=False) / 1024.0

        # First crank measurement angle (2 bytes, unsigned), scaled by 1/180.
        # NOTE(review): confirm this scaling and the flag bit assignments
        # against the Bluetooth SIG definition of this characteristic.
        first_angle = DataParser.parse_int16(data, 5, signed=False) / 180.0

        crank_revolution_data = CrankRevolutionData(
            crank_revolutions=crank_revolutions,
            last_crank_event_time=crank_event_time,
        )

        force_present = bool(flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT)
        torque_present = bool(flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT)

        # Start offsets of every complete 2-byte sample after the header; the
        # upper bound of len(data) - 1 excludes a dangling final byte.
        sample_offsets = range(header_length, len(data) - 1, 2)

        force_magnitudes: tuple[float, ...] = ()
        if force_present and not torque_present:
            # Signed 16-bit samples, 1 N units.
            force_magnitudes = tuple(
                float(DataParser.parse_int16(data, position, signed=True)) for position in sample_offsets
            )

        torque_magnitudes: tuple[float, ...] = ()
        if torque_present:
            # Signed 16-bit samples, 1/32 Nm units.
            torque_magnitudes = tuple(
                DataParser.parse_int16(data, position, signed=True) / 32.0 for position in sample_offsets
            )

        return CyclingPowerVectorData(
            flags=flags,
            crank_revolution_data=crank_revolution_data,
            first_crank_measurement_angle=first_angle,
            instantaneous_force_magnitude_array=force_magnitudes or None,
            instantaneous_torque_magnitude_array=torque_magnitudes or None,
        )

    def encode_value(self, data: CyclingPowerVectorData) -> bytearray:  # pylint: disable=too-many-branches  # Complex cycling power vector with optional fields
        """Encode cycling power vector value back to bytes.

        The presence flag for each magnitude array is set automatically when
        the corresponding array is not ``None``. Scaled fields are rounded to
        the nearest raw unit before being range-checked.

        Args:
            data: CyclingPowerVectorData containing cycling power vector data

        Returns:
            Encoded bytes representing the power vector

        Raises:
            TypeError: If ``data`` is not a ``CyclingPowerVectorData``.
            ValueError: If a header field does not fit in an unsigned 16-bit
                integer, or a force/torque sample does not fit in a signed
                16-bit integer once converted to raw units.

        """
        if not isinstance(data, CyclingPowerVectorData):
            raise TypeError(f"Cycling power vector data must be a CyclingPowerVectorData, got {type(data).__name__}")

        crank_revolutions = data.crank_revolution_data.crank_revolutions
        crank_event_time = data.crank_revolution_data.last_crank_event_time
        first_angle = data.first_crank_measurement_angle
        force_samples = data.instantaneous_force_magnitude_array
        torque_samples = data.instantaneous_torque_magnitude_array

        # Start from the caller's flags and add a presence bit for each array supplied.
        flags = data.flags
        if force_samples is not None:
            flags |= CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT
        if torque_samples is not None:
            flags |= CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT

        # Convert values to raw format
        crank_event_time_raw = round(crank_event_time * 1024)  # 1/1024 second units
        first_angle_raw = round(first_angle * 180)  # 1/180 degree units

        # Validate ranges
        if not 0 <= crank_revolutions <= 0xFFFF:
            raise ValueError(f"Crank revolutions {crank_revolutions} exceeds uint16 range")
        if not 0 <= crank_event_time_raw <= 0xFFFF:
            raise ValueError(f"Crank event time {crank_event_time_raw} exceeds uint16 range")
        if not 0 <= first_angle_raw <= 0xFFFF:
            raise ValueError(f"First angle {first_angle_raw} exceeds uint16 range")

        # Build result
        result = bytearray([int(flags)])
        result.extend(DataParser.encode_int16(crank_revolutions, signed=False))
        result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False))
        result.extend(DataParser.encode_int16(first_angle_raw, signed=False))

        # Add force magnitude array if present (1 N units).
        if force_samples is not None:
            for force in force_samples:
                force_raw = round(force)
                # Dropping a sample would silently shorten and shift the array,
                # so an unrepresentable value is reported instead.
                if not SINT16_MIN <= force_raw <= SINT16_MAX:
                    raise ValueError(f"Force magnitude {force_raw} exceeds sint16 range")
                result.extend(DataParser.encode_int16(force_raw, signed=True))

        # Add torque magnitude array if present (1/32 Nm units).
        if torque_samples is not None:
            for torque in torque_samples:
                torque_raw = round(torque * 32)
                if not SINT16_MIN <= torque_raw <= SINT16_MAX:
                    raise ValueError(f"Torque magnitude {torque_raw} exceeds sint16 range")
                result.extend(DataParser.encode_int16(torque_raw, signed=True))

        return result