Coverage for src/bluetooth_sig/gatt/characteristics/activity_goal.py: 69%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Activity Goal characteristic (0x2B4E).""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from ..constants import UINT16_MAX 

10from ..context import CharacteristicContext 

11from ..exceptions import InsufficientDataError, ValueRangeError 

12from .base import BaseCharacteristic 

13from .utils import DataParser 

14 

15 

16# Hard type for presence flags bit field 

class ActivityGoalPresenceFlags(IntFlag):
    """Bit mask naming the optional fields of an Activity Goal value.

    Each member occupies one bit (bits 0 through 6). The characteristic's
    decoder tests these bits against the first payload byte to decide which
    optional fields follow, and the encoder tests them to decide which fields
    to write.
    """

    TOTAL_ENERGY_EXPENDITURE = 0x01  # bit 0
    NORMAL_WALKING_STEPS = 0x02  # bit 1
    INTENSITY_STEPS = 0x04  # bit 2
    FLOOR_STEPS = 0x08  # bit 3
    DISTANCE = 0x10  # bit 4
    DURATION_NORMAL_WALKING = 0x20  # bit 5
    DURATION_INTENSITY_WALKING = 0x40  # bit 6

27 

28 

class ActivityGoalData(msgspec.Struct, frozen=True, kw_only=True):
    """Decoded contents of one Activity Goal characteristic value.

    Instances are immutable and must be built with keyword arguments.
    ``presence_flags`` is required; every goal field is optional and defaults
    to ``None``, which is what the decoder leaves in place when the matching
    presence flag bit is not set.
    """

    # Which of the optional goal fields below are carried by the value.
    presence_flags: ActivityGoalPresenceFlags

    # Optional goal fields, listed in the order they appear on the wire.
    total_energy_expenditure: int | None = None
    normal_walking_steps: int | None = None
    intensity_steps: int | None = None
    floor_steps: int | None = None
    distance: int | None = None
    duration_normal_walking: int | None = None
    duration_intensity_walking: int | None = None

40 

41 

class ActivityGoalCharacteristic(BaseCharacteristic[ActivityGoalData]):
    """Activity Goal characteristic (0x2B4E).

    org.bluetooth.characteristic.activity_goal

    The Activity Goal characteristic is used to represent the goal or target of a user,
    such as number of steps or total energy expenditure, related to a physical activity session.

    Layout handled by this class: one presence-flags byte followed by the
    optional fields whose flag bit is set, always in this order:

        total energy expenditure      2 bytes
        normal walking steps          3 bytes
        intensity steps               3 bytes
        floor steps                   3 bytes
        distance                      3 bytes
        duration of normal walking    3 bytes
        duration of intensity walking 3 bytes

    Encoding writes every field as an unsigned little-endian integer; decoding
    delegates to ``DataParser.parse_int16`` / ``DataParser.parse_int24`` with
    ``signed=False``.
    """

    min_length: int = 1  # At least presence flags byte
    # NOTE(review): the fields parsed below add up to 1 + 2 + 6 * 3 = 21 bytes.
    # The value 22 is kept unchanged from the original; confirm against the
    # specification before tightening it.
    max_length: int = 22  # Max length with all optional fields present
    allow_variable_length: bool = True  # Variable based on presence flags

    # Largest value that fits in the 3-byte unsigned fields.
    _UINT24_MAX = (1 << 24) - 1

    # Single source of truth for the optional fields, in wire order:
    # (presence flag, ActivityGoalData attribute, width in bytes, largest encodable value).
    # Both _decode_value and _encode_value walk this table, so the two
    # directions cannot drift apart.
    _FIELD_LAYOUT = (
        (ActivityGoalPresenceFlags.TOTAL_ENERGY_EXPENDITURE, "total_energy_expenditure", 2, UINT16_MAX),
        (ActivityGoalPresenceFlags.NORMAL_WALKING_STEPS, "normal_walking_steps", 3, _UINT24_MAX),
        (ActivityGoalPresenceFlags.INTENSITY_STEPS, "intensity_steps", 3, _UINT24_MAX),
        (ActivityGoalPresenceFlags.FLOOR_STEPS, "floor_steps", 3, _UINT24_MAX),
        (ActivityGoalPresenceFlags.DISTANCE, "distance", 3, _UINT24_MAX),
        (ActivityGoalPresenceFlags.DURATION_NORMAL_WALKING, "duration_normal_walking", 3, _UINT24_MAX),
        (ActivityGoalPresenceFlags.DURATION_INTENSITY_WALKING, "duration_intensity_walking", 3, _UINT24_MAX),
    )

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> ActivityGoalData:
        """Decode Activity Goal from raw bytes.

        Args:
            data: Raw bytes from BLE characteristic
            ctx: Optional context for parsing (not consulted by this decoder)
            validate: Whether to validate ranges (default True); accepted for
                interface compatibility, this decoder performs no range checks

        Returns:
            ActivityGoalData: Parsed activity goal. Fields whose presence flag
            is not set are left as ``None``.

        Raises:
            InsufficientDataError: If ``data`` is empty, or ends before a field
                announced by the presence flags
        """
        # Guard the flags byte explicitly so an empty payload reports the
        # documented error instead of an IndexError from data[0].
        if len(data) < 1:
            raise InsufficientDataError("Activity Goal", data, 1)

        presence_flags = data[0]

        # Start after presence flags
        pos = 1

        parsed: dict[str, int] = {}
        for flag, field_name, width, _max_value in self._FIELD_LAYOUT:
            if not presence_flags & flag:
                continue

            end = pos + width
            if len(data) < end:
                raise InsufficientDataError("Activity Goal", data, end)

            # Only the first field is 2 bytes wide; all others are 3 bytes.
            if width == 2:
                parsed[field_name] = DataParser.parse_int16(data, offset=pos, signed=False)
            else:
                parsed[field_name] = DataParser.parse_int24(data, offset=pos, signed=False)
            pos = end

        # Fields absent from ``parsed`` fall back to the struct default (None).
        return ActivityGoalData(
            presence_flags=ActivityGoalPresenceFlags(presence_flags),
            **parsed,
        )

    def _encode_value(self, data: ActivityGoalData) -> bytearray:
        """Encode Activity Goal to raw bytes.

        Args:
            data: ActivityGoalData to encode

        Returns:
            bytearray: Presence-flags byte followed by each flagged field as an
            unsigned little-endian integer of its wire width

        Raises:
            ValueRangeError: If a field announced by ``data.presence_flags`` is
                ``None`` or does not fit its wire width (0..UINT16_MAX for the
                2-byte field, 0..0xFFFFFF for the 3-byte fields)
        """
        flags = data.presence_flags

        result = bytearray()
        result.append(flags)

        for flag, field_name, width, max_value in self._FIELD_LAYOUT:
            if not flags & flag:
                continue

            value: int | None = getattr(data, field_name)
            # The upper bound follows the field's wire width. The 3-byte fields
            # were previously checked against UINT16_MAX, which rejected values
            # that the decoder itself can produce.
            if value is None or not 0 <= value <= max_value:
                raise ValueRangeError(field_name, value, 0, max_value)
            result.extend(value.to_bytes(width, byteorder="little", signed=False))

        return result