Coverage for src / bluetooth_sig / gatt / characteristics / navigation.py: 92%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Navigation characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.gatt_enums import ValueType
11from ...types.location import PositionStatus
12from ..context import CharacteristicContext
13from .base import BaseCharacteristic
14from .utils import DataParser, IEEE11073Parser
17class NavigationFlags(IntFlag):
18 """Navigation flags as per Bluetooth SIG specification."""
20 REMAINING_DISTANCE_PRESENT = 0x0001
21 REMAINING_VERTICAL_DISTANCE_PRESENT = 0x0002
22 ESTIMATED_TIME_OF_ARRIVAL_PRESENT = 0x0004
25class NavigationIndicatorType(IntEnum):
26 """Navigation indicator type enumeration."""
28 TO_WAYPOINT = 0
29 TO_DESTINATION = 1
32class NavigationData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
33 """Parsed data from Navigation characteristic."""
35 flags: NavigationFlags
36 bearing: float
37 heading: float
38 remaining_distance: float | None = None
39 remaining_vertical_distance: float | None = None
40 estimated_time_of_arrival: datetime | None = None
41 position_status: PositionStatus | None = None
42 heading_source: HeadingSource | None = None
43 navigation_indicator_type: NavigationIndicatorType | None = None
44 waypoint_reached: bool | None = None
45 destination_reached: bool | None = None
48class HeadingSource(IntEnum):
49 """Heading source enumeration."""
51 HEADING_BASED_ON_MOVEMENT = 0
52 HEADING_BASED_ON_MAGNETIC_COMPASS = 1
55class NavigationCharacteristic(BaseCharacteristic[NavigationData]):
56 """Navigation characteristic.
58 Used to represent data related to a navigation sensor.
59 """
61 _manual_value_type: ValueType | str | None = ValueType.DICT # Override since decode_value returns dataclass
63 min_length = 6 # Flags(2) + Bearing(2) + Heading(2) minimum
64 max_length = 19 # + RemainingDistance(3) + RemainingVerticalDistance(3) + EstimatedTimeOfArrival(7) maximum
65 allow_variable_length: bool = True # Variable optional fields
67 # Bit masks and shifts for status information in flags
68 POSITION_STATUS_MASK = 0x0006
69 POSITION_STATUS_SHIFT = 1
70 HEADING_SOURCE_MASK = 0x0020
71 HEADING_SOURCE_SHIFT = 5
72 NAVIGATION_INDICATOR_TYPE_MASK = 0x0040
73 NAVIGATION_INDICATOR_TYPE_SHIFT = 6
74 WAYPOINT_REACHED_MASK = 0x0080
75 DESTINATION_REACHED_MASK = 0x0100
77 def _decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> NavigationData: # pylint: disable=too-many-locals
78 """Parse navigation data according to Bluetooth specification.
80 Format: Flags(2) + Bearing(2) + Heading(2) + [Remaining Distance(3)] +
81 [Remaining Vertical Distance(3)] + [Estimated Time of Arrival(7)].
83 Args:
84 data: Raw bytearray from BLE characteristic
85 ctx: Optional context providing surrounding context (may be None)
87 Returns:
88 NavigationData containing parsed navigation data
90 """
91 if len(data) < 6:
92 raise ValueError("Navigation data must be at least 6 bytes")
94 flags = NavigationFlags(DataParser.parse_int16(data, 0, signed=False))
96 # Unit is 1*10^-2 degrees
97 bearing = DataParser.parse_int16(data, 2, signed=False) / 100.0
98 heading = DataParser.parse_int16(data, 4, signed=False) / 100.0
100 # Extract status information from flags
101 position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
102 position_status = PositionStatus(position_status_bits) if position_status_bits <= 3 else None
104 heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
105 heading_source = HeadingSource(heading_source_bit)
107 navigation_indicator_type_bit = (
108 flags & self.NAVIGATION_INDICATOR_TYPE_MASK
109 ) >> self.NAVIGATION_INDICATOR_TYPE_SHIFT
110 navigation_indicator_type = NavigationIndicatorType(navigation_indicator_type_bit)
112 waypoint_reached = bool(flags & self.WAYPOINT_REACHED_MASK)
113 destination_reached = bool(flags & self.DESTINATION_REACHED_MASK)
115 # Parse optional fields
116 remaining_distance: float | None = None
117 remaining_vertical_distance: float | None = None
118 estimated_time_of_arrival: datetime | None = None
119 offset = 6
121 if (flags & NavigationFlags.REMAINING_DISTANCE_PRESENT) and len(data) >= offset + 3:
122 # Unit is 1/10 m
123 remaining_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
124 offset += 3
126 if (flags & NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
127 # Unit is 1/100 m
128 remaining_vertical_distance = DataParser.parse_int24(data, offset, signed=True) / 100.0
129 offset += 3
131 if (flags & NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT) and len(data) >= offset + 7:
132 estimated_time_of_arrival = IEEE11073Parser.parse_timestamp(data, offset)
134 return NavigationData(
135 flags=flags,
136 bearing=bearing,
137 heading=heading,
138 remaining_distance=remaining_distance,
139 remaining_vertical_distance=remaining_vertical_distance,
140 estimated_time_of_arrival=estimated_time_of_arrival,
141 position_status=position_status,
142 heading_source=heading_source,
143 navigation_indicator_type=navigation_indicator_type,
144 waypoint_reached=waypoint_reached,
145 destination_reached=destination_reached,
146 )
148 def _encode_value(self, data: NavigationData) -> bytearray:
149 """Encode NavigationData back to bytes.
151 Args:
152 data: NavigationData instance to encode
154 Returns:
155 Encoded bytes representing the navigation data
157 """
158 result = bytearray()
160 flags = int(data.flags)
162 # Set status bits in flags
163 if data.position_status is not None:
164 flags |= data.position_status.value << self.POSITION_STATUS_SHIFT
166 if data.heading_source is not None:
167 flags |= data.heading_source.value << self.HEADING_SOURCE_SHIFT
169 if data.navigation_indicator_type is not None:
170 flags |= data.navigation_indicator_type.value << self.NAVIGATION_INDICATOR_TYPE_SHIFT
172 if data.waypoint_reached is not None and data.waypoint_reached:
173 flags |= self.WAYPOINT_REACHED_MASK
175 if data.destination_reached is not None and data.destination_reached:
176 flags |= self.DESTINATION_REACHED_MASK
178 result.extend(DataParser.encode_int16(flags, signed=False))
180 # Unit is 1*10^-2 degrees
181 bearing_value = int(data.bearing * 100)
182 heading_value = int(data.heading * 100)
183 result.extend(DataParser.encode_int16(bearing_value, signed=False))
184 result.extend(DataParser.encode_int16(heading_value, signed=False))
186 if data.remaining_distance is not None:
187 distance_value = int(data.remaining_distance * 10)
188 result.extend(DataParser.encode_int24(distance_value, signed=False))
190 if data.remaining_vertical_distance is not None:
191 vertical_distance_value = int(data.remaining_vertical_distance * 100)
192 result.extend(DataParser.encode_int24(vertical_distance_value, signed=True))
194 if data.estimated_time_of_arrival is not None:
195 result.extend(IEEE11073Parser.encode_timestamp(data.estimated_time_of_arrival))
197 return result