Coverage for src/bluetooth_sig/gatt/characteristics/navigation.py: 93%

102 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Navigation characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum, IntFlag 

7 

8import msgspec 

9 

10from ...types.gatt_enums import ValueType 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .utils import DataParser, IEEE11073Parser 

14 

15 

16class NavigationFlags(IntFlag): 

17 """Navigation flags as per Bluetooth SIG specification.""" 

18 

19 REMAINING_DISTANCE_PRESENT = 0x0001 

20 REMAINING_VERTICAL_DISTANCE_PRESENT = 0x0002 

21 ESTIMATED_TIME_OF_ARRIVAL_PRESENT = 0x0004 

22 

23 

24class NavigationIndicatorType(IntEnum): 

25 """Navigation indicator type enumeration.""" 

26 

27 TO_WAYPOINT = 0 

28 TO_DESTINATION = 1 

29 

30 

31class NavigationData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

32 """Parsed data from Navigation characteristic.""" 

33 

34 flags: NavigationFlags 

35 bearing: float 

36 heading: float 

37 remaining_distance: float | None = None 

38 remaining_vertical_distance: float | None = None 

39 estimated_time_of_arrival: datetime | None = None 

40 position_status: PositionStatus | None = None 

41 heading_source: HeadingSource | None = None 

42 navigation_indicator_type: NavigationIndicatorType | None = None 

43 waypoint_reached: bool | None = None 

44 destination_reached: bool | None = None 

45 

46 

47class PositionStatus(IntEnum): 

48 """Position status enumeration.""" 

49 

50 NO_POSITION = 0 

51 POSITION_OK = 1 

52 ESTIMATED_POSITION = 2 

53 LAST_KNOWN_POSITION = 3 

54 

55 

56class HeadingSource(IntEnum): 

57 """Heading source enumeration.""" 

58 

59 HEADING_BASED_ON_MOVEMENT = 0 

60 HEADING_BASED_ON_MAGNETIC_COMPASS = 1 

61 

62 

63class NavigationCharacteristic(BaseCharacteristic): 

64 """Navigation characteristic. 

65 

66 Used to represent data related to a navigation sensor. 

67 """ 

68 

69 _manual_value_type: ValueType | str | None = ValueType.DICT # Override since decode_value returns dataclass 

70 

71 min_length = 6 # Flags(2) + Bearing(2) + Heading(2) minimum 

72 max_length = 16 # + RemainingDistance(3) + RemainingVerticalDistance(3) + EstimatedTimeOfArrival(7) maximum 

73 allow_variable_length: bool = True # Variable optional fields 

74 

75 # Bit masks and shifts for status information in flags 

76 POSITION_STATUS_MASK = 0x0006 

77 POSITION_STATUS_SHIFT = 1 

78 HEADING_SOURCE_MASK = 0x0020 

79 HEADING_SOURCE_SHIFT = 5 

80 NAVIGATION_INDICATOR_TYPE_MASK = 0x0040 

81 NAVIGATION_INDICATOR_TYPE_SHIFT = 6 

82 WAYPOINT_REACHED_MASK = 0x0080 

83 DESTINATION_REACHED_MASK = 0x0100 

84 

85 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> NavigationData: # pylint: disable=too-many-locals 

86 """Parse navigation data according to Bluetooth specification. 

87 

88 Format: Flags(2) + Bearing(2) + Heading(2) + [Remaining Distance(3)] + 

89 [Remaining Vertical Distance(3)] + [Estimated Time of Arrival(7)]. 

90 

91 Args: 

92 data: Raw bytearray from BLE characteristic 

93 ctx: Optional context providing surrounding context (may be None) 

94 

95 Returns: 

96 NavigationData containing parsed navigation data 

97 

98 """ 

99 if len(data) < 6: 

100 raise ValueError("Navigation data must be at least 6 bytes") 

101 

102 flags = NavigationFlags(DataParser.parse_int16(data, 0, signed=False)) 

103 

104 # Unit is 1*10^-2 degrees 

105 bearing = DataParser.parse_int16(data, 2, signed=False) / 100.0 

106 heading = DataParser.parse_int16(data, 4, signed=False) / 100.0 

107 

108 # Extract status information from flags 

109 position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT 

110 position_status = PositionStatus(position_status_bits) if position_status_bits <= 3 else None 

111 

112 heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT 

113 heading_source = HeadingSource(heading_source_bit) 

114 

115 navigation_indicator_type_bit = ( 

116 flags & self.NAVIGATION_INDICATOR_TYPE_MASK 

117 ) >> self.NAVIGATION_INDICATOR_TYPE_SHIFT 

118 navigation_indicator_type = NavigationIndicatorType(navigation_indicator_type_bit) 

119 

120 waypoint_reached = bool(flags & self.WAYPOINT_REACHED_MASK) 

121 destination_reached = bool(flags & self.DESTINATION_REACHED_MASK) 

122 

123 # Parse optional fields 

124 remaining_distance: float | None = None 

125 remaining_vertical_distance: float | None = None 

126 estimated_time_of_arrival: datetime | None = None 

127 offset = 6 

128 

129 if (flags & NavigationFlags.REMAINING_DISTANCE_PRESENT) and len(data) >= offset + 3: 

130 # Unit is 1/10 m 

131 remaining_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0 

132 offset += 3 

133 

134 if (flags & NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT) and len(data) >= offset + 3: 

135 # Unit is 1/100 m 

136 remaining_vertical_distance = DataParser.parse_int24(data, offset, signed=True) / 100.0 

137 offset += 3 

138 

139 if (flags & NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT) and len(data) >= offset + 7: 

140 estimated_time_of_arrival = IEEE11073Parser.parse_timestamp(data, offset) 

141 

142 return NavigationData( 

143 flags=flags, 

144 bearing=bearing, 

145 heading=heading, 

146 remaining_distance=remaining_distance, 

147 remaining_vertical_distance=remaining_vertical_distance, 

148 estimated_time_of_arrival=estimated_time_of_arrival, 

149 position_status=position_status, 

150 heading_source=heading_source, 

151 navigation_indicator_type=navigation_indicator_type, 

152 waypoint_reached=waypoint_reached, 

153 destination_reached=destination_reached, 

154 ) 

155 

156 def encode_value(self, data: NavigationData) -> bytearray: 

157 """Encode NavigationData back to bytes. 

158 

159 Args: 

160 data: NavigationData instance to encode 

161 

162 Returns: 

163 Encoded bytes representing the navigation data 

164 

165 """ 

166 result = bytearray() 

167 

168 flags = int(data.flags) 

169 

170 # Set status bits in flags 

171 if data.position_status is not None: 

172 flags |= data.position_status.value << self.POSITION_STATUS_SHIFT 

173 

174 if data.heading_source is not None: 

175 flags |= data.heading_source.value << self.HEADING_SOURCE_SHIFT 

176 

177 if data.navigation_indicator_type is not None: 

178 flags |= data.navigation_indicator_type.value << self.NAVIGATION_INDICATOR_TYPE_SHIFT 

179 

180 if data.waypoint_reached is not None and data.waypoint_reached: 

181 flags |= self.WAYPOINT_REACHED_MASK 

182 

183 if data.destination_reached is not None and data.destination_reached: 

184 flags |= self.DESTINATION_REACHED_MASK 

185 

186 result.extend(DataParser.encode_int16(flags, signed=False)) 

187 

188 # Unit is 1*10^-2 degrees 

189 bearing_value = int(data.bearing * 100) 

190 heading_value = int(data.heading * 100) 

191 result.extend(DataParser.encode_int16(bearing_value, signed=False)) 

192 result.extend(DataParser.encode_int16(heading_value, signed=False)) 

193 

194 if data.remaining_distance is not None: 

195 distance_value = int(data.remaining_distance * 10) 

196 result.extend(DataParser.encode_int24(distance_value, signed=False)) 

197 

198 if data.remaining_vertical_distance is not None: 

199 vertical_distance_value = int(data.remaining_vertical_distance * 100) 

200 result.extend(DataParser.encode_int24(vertical_distance_value, signed=True)) 

201 

202 if data.estimated_time_of_arrival is not None: 

203 result.extend(IEEE11073Parser.encode_timestamp(data.estimated_time_of_arrival)) 

204 

205 return result