Coverage for src / bluetooth_sig / gatt / characteristics / cycling_power_vector.py: 95%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""Cycling Power Vector characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import SINT16_MAX, SINT16_MIN 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

14 

15class CyclingPowerVectorFlags(IntFlag): 

16 """Cycling Power Vector flags as per CPS v1.1 Table 3.7.""" 

17 

18 CRANK_REVOLUTION_DATA_PRESENT = 0x01 

19 FIRST_CRANK_MEASUREMENT_ANGLE_PRESENT = 0x02 

20 INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT = 0x04 

21 INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT = 0x08 

22 INSTANTANEOUS_MEASUREMENT_DIRECTION_BIT0 = 0x10 

23 INSTANTANEOUS_MEASUREMENT_DIRECTION_BIT1 = 0x20 

24 

25 

26class CrankRevolutionData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

27 """Crank revolution data from cycling power vector.""" 

28 

29 crank_revolutions: int 

30 last_crank_event_time: float # in seconds 

31 

32 

33class CyclingPowerVectorData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods 

34 """Parsed data from Cycling Power Vector characteristic. 

35 

36 Used for both parsing and encoding - all fields are properly typed. 

37 """ 

38 

39 flags: CyclingPowerVectorFlags 

40 crank_revolution_data: CrankRevolutionData | None = None 

41 first_crank_measurement_angle: float | None = None 

42 instantaneous_force_magnitude_array: tuple[float, ...] | None = None 

43 instantaneous_torque_magnitude_array: tuple[float, ...] | None = None 

44 instantaneous_measurement_direction: int = 0 

45 

46 

47class CyclingPowerVectorCharacteristic(BaseCharacteristic[CyclingPowerVectorData]): 

48 """Cycling Power Vector characteristic (0x2A64). 

49 

50 Used to transmit detailed cycling power vector data including force 

51 and torque measurements at different crank angles. 

52 """ 

53 

54 # Variable length: min 1 byte (flags only), optional crank data + angle + arrays 

55 min_length = 1 

56 allow_variable_length = True 

57 

58 _manual_unit: str = "various" # Multiple units in vector data 

59 

60 _DIRECTION_MASK = 0x30 

61 _DIRECTION_SHIFT = 4 

62 

63 def _decode_value( 

64 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True 

65 ) -> CyclingPowerVectorData: # pylint: disable=too-many-locals # Vector data with multiple array fields 

66 """Parse cycling power vector data according to CPS v1.1. 

67 

68 Format: Flags(1) + [Crank Revolutions(2) + Last Crank Event Time(2)] + 

69 [First Crank Measurement Angle(2)] + [Force Magnitude Array(sint16[])] + 

70 [Torque Magnitude Array(sint16[])] 

71 

72 Args: 

73 data: Raw bytearray from BLE characteristic. 

74 ctx: Optional CharacteristicContext providing surrounding context (may be None). 

75 validate: Whether to validate ranges (default True) 

76 

77 Returns: 

78 CyclingPowerVectorData containing parsed cycling power vector data. 

79 

80 """ 

81 flags = CyclingPowerVectorFlags(data[0]) 

82 offset = 1 

83 

84 crank_revolution_data: CrankRevolutionData | None = None 

85 first_crank_measurement_angle: float | None = None 

86 

87 # Parse crank revolution data if present (bit 0) 

88 if (flags & CyclingPowerVectorFlags.CRANK_REVOLUTION_DATA_PRESENT) and len(data) >= offset + 4: 

89 crank_revolutions = DataParser.parse_int16(data, offset, signed=False) 

90 crank_event_time_raw = DataParser.parse_int16(data, offset + 2, signed=False) 

91 crank_revolution_data = CrankRevolutionData( 

92 crank_revolutions=crank_revolutions, 

93 last_crank_event_time=crank_event_time_raw / 1024.0, 

94 ) 

95 offset += 4 

96 

97 # Parse first crank measurement angle if present (bit 1) 

98 if (flags & CyclingPowerVectorFlags.FIRST_CRANK_MEASUREMENT_ANGLE_PRESENT) and len(data) >= offset + 2: 

99 first_crank_measurement_angle = DataParser.parse_int16(data, offset, signed=False) / 1.0 

100 offset += 2 

101 

102 # Extract instantaneous measurement direction (bits 4-5) 

103 direction = (int(flags) & self._DIRECTION_MASK) >> self._DIRECTION_SHIFT 

104 

105 force_magnitudes_list: list[float] = [] 

106 torque_magnitudes_list: list[float] = [] 

107 

108 # Parse force magnitude array if present (bit 2) 

109 if flags & CyclingPowerVectorFlags.INSTANTANEOUS_FORCE_MAGNITUDE_ARRAY_PRESENT: 

110 while offset + 2 <= len(data): 

111 force_raw = DataParser.parse_int16(data, offset, signed=True) 

112 force_magnitudes_list.append(float(force_raw)) 

113 offset += 2 

114 

115 # Parse torque magnitude array if present (bit 3) 

116 if flags & CyclingPowerVectorFlags.INSTANTANEOUS_TORQUE_MAGNITUDE_ARRAY_PRESENT: 

117 while offset + 2 <= len(data): 

118 torque_raw = DataParser.parse_int16(data, offset, signed=True) 

119 torque_magnitudes_list.append(torque_raw / 32.0) 

120 offset += 2 

121 

122 return CyclingPowerVectorData( 

123 flags=flags, 

124 crank_revolution_data=crank_revolution_data, 

125 first_crank_measurement_angle=first_crank_measurement_angle, 

126 instantaneous_force_magnitude_array=tuple(force_magnitudes_list) if force_magnitudes_list else None, 

127 instantaneous_torque_magnitude_array=tuple(torque_magnitudes_list) if torque_magnitudes_list else None, 

128 instantaneous_measurement_direction=direction, 

129 ) 

130 

131 def _encode_value(self, data: CyclingPowerVectorData) -> bytearray: # pylint: disable=too-many-branches # Complex cycling power vector with optional fields 

132 """Encode cycling power vector value back to bytes. 

133 

134 Args: 

135 data: CyclingPowerVectorData containing cycling power vector data 

136 

137 Returns: 

138 Encoded bytes representing the power vector 

139 

140 """ 

141 flags = int(data.flags) 

142 

143 result = bytearray([flags]) 

144 

145 # Encode crank revolution data if present 

146 if data.crank_revolution_data is not None: 

147 crank_revolutions = data.crank_revolution_data.crank_revolutions 

148 crank_event_time_raw = round(data.crank_revolution_data.last_crank_event_time * 1024) 

149 result.extend(DataParser.encode_int16(crank_revolutions, signed=False)) 

150 result.extend(DataParser.encode_int16(crank_event_time_raw, signed=False)) 

151 

152 # Encode first crank measurement angle if present 

153 if data.first_crank_measurement_angle is not None: 

154 result.extend(DataParser.encode_int16(round(data.first_crank_measurement_angle), signed=False)) 

155 

156 # Encode force magnitude array if present 

157 if data.instantaneous_force_magnitude_array is not None: 

158 for force in data.instantaneous_force_magnitude_array: 

159 force_val = int(force) 

160 if SINT16_MIN <= force_val <= SINT16_MAX: 

161 result.extend(DataParser.encode_int16(force_val, signed=True)) 

162 

163 # Encode torque magnitude array if present 

164 if data.instantaneous_torque_magnitude_array is not None: 

165 for torque in data.instantaneous_torque_magnitude_array: 

166 torque_val = int(torque * 32) 

167 if SINT16_MIN <= torque_val <= SINT16_MAX: 

168 result.extend(DataParser.encode_int16(torque_val, signed=True)) 

169 

170 return result