Coverage for src / bluetooth_sig / gatt / characteristics / navigation.py: 93%
95 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Navigation characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.location import PositionStatus
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .utils import DataParser, IEEE11073Parser
class NavigationFlags(IntFlag):
    """Presence bits carried in the Navigation characteristic flags field.

    Each member marks one optional field as being included in the payload.
    """

    # Bit 0: a Remaining Distance field is included.
    REMAINING_DISTANCE_PRESENT = 1 << 0
    # Bit 1: a Remaining Vertical Distance field is included.
    REMAINING_VERTICAL_DISTANCE_PRESENT = 1 << 1
    # Bit 2: an Estimated Time of Arrival field is included.
    ESTIMATED_TIME_OF_ARRIVAL_PRESENT = 1 << 2
class NavigationIndicatorType(IntEnum):
    """What the reported navigation values are measured towards."""

    # Values refer to the next waypoint.
    TO_WAYPOINT = 0x00
    # Values refer to the final destination.
    TO_DESTINATION = 0x01
class NavigationData(
    msgspec.Struct, frozen=True, kw_only=True
):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only result of decoding a Navigation characteristic.

    ``flags``, ``bearing`` and ``heading`` are always required; every other
    attribute defaults to ``None``.
    """

    # Flags value as read from the first two payload bytes.
    flags: NavigationFlags
    # Bearing in degrees (the wire value is in hundredths of a degree).
    bearing: float
    # Heading in degrees (the wire value is in hundredths of a degree).
    heading: float
    # Remaining distance in metres (the wire value is in tenths of a metre).
    remaining_distance: float | None = None
    # Signed remaining vertical distance in metres (the wire value is in hundredths of a metre).
    remaining_vertical_distance: float | None = None
    # Estimated time of arrival, when the payload carries one.
    estimated_time_of_arrival: datetime | None = None
    # Status values below are carried inside the flags field.
    position_status: PositionStatus | None = None
    heading_source: HeadingSource | None = None
    navigation_indicator_type: NavigationIndicatorType | None = None
    waypoint_reached: bool | None = None
    destination_reached: bool | None = None
class HeadingSource(IntEnum):
    """Where the reported heading value comes from."""

    # Heading is derived from the direction of movement.
    HEADING_BASED_ON_MOVEMENT = 0x00
    # Heading is taken from a magnetic compass.
    HEADING_BASED_ON_MAGNETIC_COMPASS = 0x01
class NavigationCharacteristic(BaseCharacteristic[NavigationData]):
    """Navigation characteristic.

    Used to represent data related to a navigation sensor.

    Payload layout: Flags(2) + Bearing(2) + Heading(2), followed by the
    optional Remaining Distance(3), Remaining Vertical Distance(3) and
    Estimated Time of Arrival(7) fields, each announced by its presence bit
    in the flags field.

    Flags bit layout used by this class:
        bit 0     Remaining Distance present
        bit 1     Remaining Vertical Distance present
        bit 2     Estimated Time of Arrival present
        bits 3-4  Position Status
        bit 5     Heading Source
        bit 6     Navigation Indicator Type
        bit 7     Waypoint Reached
        bit 8     Destination Reached
    """

    min_length = 6  # Flags(2) + Bearing(2) + Heading(2) minimum
    max_length = 19  # + RemainingDistance(3) + RemainingVerticalDistance(3) + EstimatedTimeOfArrival(7) maximum
    allow_variable_length: bool = True  # Variable optional fields

    # Bit masks and shifts for status information in flags.
    # Bits 0-2 are the presence bits (see NavigationFlags), so the two-bit
    # position status must sit above them in bits 3-4; placing it at bits 1-2
    # would alias the vertical-distance and ETA presence bits.
    POSITION_STATUS_MASK = 0x0018
    POSITION_STATUS_SHIFT = 3
    HEADING_SOURCE_MASK = 0x0020
    HEADING_SOURCE_SHIFT = 5
    NAVIGATION_INDICATOR_TYPE_MASK = 0x0040
    NAVIGATION_INDICATOR_TYPE_SHIFT = 6
    WAYPOINT_REACHED_MASK = 0x0080
    DESTINATION_REACHED_MASK = 0x0100

    # Maximum valid enum value for PositionStatus
    _MAX_POSITION_STATUS_VALUE = 3

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> NavigationData:  # pylint: disable=too-many-locals  # Navigation spec requires many fields
        """Parse navigation data according to Bluetooth specification.

        Format: Flags(2) + Bearing(2) + Heading(2) + [Remaining Distance(3)] +
        [Remaining Vertical Distance(3)] + [Estimated Time of Arrival(7)].

        An optional field is only read when its presence bit is set AND enough
        bytes remain; otherwise it is left as ``None``.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None);
                not used by this parser
            validate: Whether to validate ranges (default True); not used by
                this parser

        Returns:
            NavigationData containing parsed navigation data

        """
        raw_flags = DataParser.parse_int16(data, 0, signed=False)
        flags = NavigationFlags(raw_flags)

        # Unit is 1*10^-2 degrees
        bearing = DataParser.parse_int16(data, 2, signed=False) / 100.0
        heading = DataParser.parse_int16(data, 4, signed=False) / 100.0

        # Extract status information from flags
        position_status_bits = (raw_flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
        position_status = (
            PositionStatus(position_status_bits) if position_status_bits <= self._MAX_POSITION_STATUS_VALUE else None
        )

        heading_source_bit = (raw_flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
        heading_source = HeadingSource(heading_source_bit)

        navigation_indicator_type_bit = (
            raw_flags & self.NAVIGATION_INDICATOR_TYPE_MASK
        ) >> self.NAVIGATION_INDICATOR_TYPE_SHIFT
        navigation_indicator_type = NavigationIndicatorType(navigation_indicator_type_bit)

        waypoint_reached = bool(raw_flags & self.WAYPOINT_REACHED_MASK)
        destination_reached = bool(raw_flags & self.DESTINATION_REACHED_MASK)

        # Parse optional fields; offset tracks the start of the next one.
        remaining_distance: float | None = None
        remaining_vertical_distance: float | None = None
        estimated_time_of_arrival: datetime | None = None
        offset = 6

        if (flags & NavigationFlags.REMAINING_DISTANCE_PRESENT) and len(data) >= offset + 3:
            # Unit is 1/10 m
            remaining_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
            offset += 3

        if (flags & NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
            # Unit is 1/100 m
            remaining_vertical_distance = DataParser.parse_int24(data, offset, signed=True) / 100.0
            offset += 3

        if (flags & NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT) and len(data) >= offset + 7:
            estimated_time_of_arrival = IEEE11073Parser.parse_timestamp(data, offset)

        return NavigationData(
            flags=flags,
            bearing=bearing,
            heading=heading,
            remaining_distance=remaining_distance,
            remaining_vertical_distance=remaining_vertical_distance,
            estimated_time_of_arrival=estimated_time_of_arrival,
            position_status=position_status,
            heading_source=heading_source,
            navigation_indicator_type=navigation_indicator_type,
            waypoint_reached=waypoint_reached,
            destination_reached=destination_reached,
        )

    def _encode_value(self, data: NavigationData) -> bytearray:
        """Encode NavigationData back to bytes.

        The flags word starts from ``data.flags`` and is then reconciled with
        the explicit attributes so the payload is self-consistent:

        * each presence bit is set exactly when the matching optional field
          is not ``None`` (that is the condition under which it is written);
        * each status attribute that is not ``None`` replaces its bit(s);
          a ``None`` status attribute leaves the bits from ``data.flags``.

        Args:
            data: NavigationData instance to encode

        Returns:
            Encoded bytes representing the navigation data

        """
        flags = int(data.flags)

        # Presence bits must mirror what is actually appended below, otherwise
        # a decoder would misinterpret the bytes after the heading.
        presence = (
            (NavigationFlags.REMAINING_DISTANCE_PRESENT, data.remaining_distance),
            (NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT, data.remaining_vertical_distance),
            (NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT, data.estimated_time_of_arrival),
        )
        for presence_bit, field_value in presence:
            if field_value is None:
                flags &= ~int(presence_bit)
            else:
                flags |= int(presence_bit)

        # Multi-valued status fields: clear the field's bits, then write the value.
        status_fields = (
            (data.position_status, self.POSITION_STATUS_MASK, self.POSITION_STATUS_SHIFT),
            (data.heading_source, self.HEADING_SOURCE_MASK, self.HEADING_SOURCE_SHIFT),
            (
                data.navigation_indicator_type,
                self.NAVIGATION_INDICATOR_TYPE_MASK,
                self.NAVIGATION_INDICATOR_TYPE_SHIFT,
            ),
        )
        for status, mask, shift in status_fields:
            if status is not None:
                flags = (flags & ~mask) | ((status.value << shift) & mask)

        # Boolean status fields: an explicit True/False decides the bit.
        boolean_fields = (
            (data.waypoint_reached, self.WAYPOINT_REACHED_MASK),
            (data.destination_reached, self.DESTINATION_REACHED_MASK),
        )
        for reached, mask in boolean_fields:
            if reached is None:
                continue
            flags = (flags | mask) if reached else (flags & ~mask)

        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))

        # Unit is 1*10^-2 degrees. round() rather than int(): truncation turns
        # e.g. 0.29 * 100 == 28.999... into 28 and breaks decode/encode round trips.
        result.extend(DataParser.encode_int16(round(data.bearing * 100), signed=False))
        result.extend(DataParser.encode_int16(round(data.heading * 100), signed=False))

        if data.remaining_distance is not None:
            # Unit is 1/10 m
            result.extend(DataParser.encode_int24(round(data.remaining_distance * 10), signed=False))

        if data.remaining_vertical_distance is not None:
            # Unit is 1/100 m
            result.extend(DataParser.encode_int24(round(data.remaining_vertical_distance * 100), signed=True))

        if data.estimated_time_of_arrival is not None:
            result.extend(IEEE11073Parser.encode_timestamp(data.estimated_time_of_arrival))

        return result