Coverage for src/bluetooth_sig/gatt/characteristics/navigation.py: 93%
102 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Navigation characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.gatt_enums import ValueType
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .utils import DataParser, IEEE11073Parser
class NavigationFlags(IntFlag):
    """Bit flags found in the Navigation characteristic's 16-bit Flags field.

    Each member defined here signals that one of the optional trailing
    fields of the payload is present.
    """

    # Bit 0: a Remaining Distance field follows the mandatory fields.
    REMAINING_DISTANCE_PRESENT = 1 << 0
    # Bit 1: a Remaining Vertical Distance field is included.
    REMAINING_VERTICAL_DISTANCE_PRESENT = 1 << 1
    # Bit 2: an Estimated Time of Arrival field is included.
    ESTIMATED_TIME_OF_ARRIVAL_PRESENT = 1 << 2
class NavigationIndicatorType(IntEnum):
    """What the navigation indication refers to (single-bit value)."""

    # Bit clear: the indication refers to the next waypoint.
    TO_WAYPOINT = 0x0
    # Bit set: the indication refers to the final destination.
    TO_DESTINATION = 0x1
class NavigationData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only record holding a decoded Navigation value.

    ``flags``, ``bearing`` and ``heading`` are required; every other field
    defaults to ``None``.
    """

    # Flags value as read from the first two payload bytes.
    flags: NavigationFlags
    # Bearing in degrees (wire value is in units of 1/100 degree).
    bearing: float
    # Heading in degrees (wire value is in units of 1/100 degree).
    heading: float
    # Remaining distance in metres (wire value is in units of 1/10 m).
    remaining_distance: float | None = None
    # Remaining vertical distance in metres (wire value is in units of 1/100 m).
    remaining_vertical_distance: float | None = None
    # Estimated time of arrival, when the payload carries one.
    estimated_time_of_arrival: datetime | None = None
    # Status values extracted from the Flags field.
    position_status: PositionStatus | None = None
    heading_source: HeadingSource | None = None
    navigation_indicator_type: NavigationIndicatorType | None = None
    waypoint_reached: bool | None = None
    destination_reached: bool | None = None
class PositionStatus(IntEnum):
    """Quality of the position fix, encoded as a two-bit value."""

    NO_POSITION = 0b00
    POSITION_OK = 0b01
    ESTIMATED_POSITION = 0b10
    LAST_KNOWN_POSITION = 0b11
class HeadingSource(IntEnum):
    """Origin of the reported heading (single-bit value)."""

    # Bit clear: heading is derived from movement.
    HEADING_BASED_ON_MOVEMENT = 0x0
    # Bit set: heading is taken from a magnetic compass.
    HEADING_BASED_ON_MAGNETIC_COMPASS = 0x1
class NavigationCharacteristic(BaseCharacteristic):
    """Navigation characteristic.

    Used to represent data related to a navigation sensor.

    Payload layout handled by this class::

        Flags(2) + Bearing(2) + Heading(2) + [Remaining Distance(3)] +
        [Remaining Vertical Distance(3)] + [Estimated Time of Arrival(7)]

    The 16-bit Flags field carries the presence bits for the optional fields
    (bits 0-2, see ``NavigationFlags``) and status information (bits 3-8),
    described by the ``*_MASK`` / ``*_SHIFT`` constants below.
    """

    _manual_value_type: ValueType | str | None = ValueType.DICT  # Override since decode_value returns dataclass

    min_length = 6  # Flags(2) + Bearing(2) + Heading(2) minimum
    # 6 + RemainingDistance(3) + RemainingVerticalDistance(3) + EstimatedTimeOfArrival(7) = 19.
    # NOTE(review): presumably enforced by BaseCharacteristic; the previous value (16)
    # was smaller than a payload carrying every optional field.
    max_length = 19
    allow_variable_length: bool = True  # Variable optional fields

    # Bit masks and shifts for status information in flags.
    # Position status occupies bits 3-4; it must not overlap the presence
    # bits 0-2 defined by NavigationFlags.
    POSITION_STATUS_MASK = 0x0018
    POSITION_STATUS_SHIFT = 3
    HEADING_SOURCE_MASK = 0x0020
    HEADING_SOURCE_SHIFT = 5
    NAVIGATION_INDICATOR_TYPE_MASK = 0x0040
    NAVIGATION_INDICATOR_TYPE_SHIFT = 6
    WAYPOINT_REACHED_MASK = 0x0080
    DESTINATION_REACHED_MASK = 0x0100

    @staticmethod
    def _replace_bits(flags: int, mask: int, value: int) -> int:
        """Return ``flags`` with the bits selected by ``mask`` replaced by ``value``."""
        return (flags & ~mask) | (value & mask)

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> NavigationData:  # pylint: disable=too-many-locals
        """Parse navigation data according to Bluetooth specification.

        Format: Flags(2) + Bearing(2) + Heading(2) + [Remaining Distance(3)] +
        [Remaining Vertical Distance(3)] + [Estimated Time of Arrival(7)].

        An optional field whose presence bit is set but whose bytes are
        missing from ``data`` is left as ``None`` rather than raising.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None);
                not used by this parser

        Returns:
            NavigationData containing parsed navigation data

        Raises:
            ValueError: If ``data`` is shorter than the 6 mandatory bytes

        """
        if len(data) < 6:
            raise ValueError("Navigation data must be at least 6 bytes")

        flags = NavigationFlags(DataParser.parse_int16(data, 0, signed=False))
        # Work on a plain int for the status fields so the shifts below are
        # ordinary integer arithmetic rather than IntFlag operations.
        flag_bits = int(flags)

        # Unit is 1*10^-2 degrees
        bearing = DataParser.parse_int16(data, 2, signed=False) / 100.0
        heading = DataParser.parse_int16(data, 4, signed=False) / 100.0

        # Extract status information from flags. Every masked value is within
        # the range of its enum (2 bits -> 0..3, 1 bit -> 0..1), so the enum
        # constructors cannot fail here.
        position_status = PositionStatus((flag_bits & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT)
        heading_source = HeadingSource((flag_bits & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT)
        navigation_indicator_type = NavigationIndicatorType(
            (flag_bits & self.NAVIGATION_INDICATOR_TYPE_MASK) >> self.NAVIGATION_INDICATOR_TYPE_SHIFT
        )
        waypoint_reached = bool(flag_bits & self.WAYPOINT_REACHED_MASK)
        destination_reached = bool(flag_bits & self.DESTINATION_REACHED_MASK)

        # Parse optional fields
        remaining_distance: float | None = None
        remaining_vertical_distance: float | None = None
        estimated_time_of_arrival: datetime | None = None
        offset = 6

        if (flags & NavigationFlags.REMAINING_DISTANCE_PRESENT) and len(data) >= offset + 3:
            # Unit is 1/10 m
            remaining_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
            offset += 3

        if (flags & NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
            # Unit is 1/100 m
            remaining_vertical_distance = DataParser.parse_int24(data, offset, signed=True) / 100.0
            offset += 3

        if (flags & NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT) and len(data) >= offset + 7:
            estimated_time_of_arrival = IEEE11073Parser.parse_timestamp(data, offset)

        return NavigationData(
            flags=flags,
            bearing=bearing,
            heading=heading,
            remaining_distance=remaining_distance,
            remaining_vertical_distance=remaining_vertical_distance,
            estimated_time_of_arrival=estimated_time_of_arrival,
            position_status=position_status,
            heading_source=heading_source,
            navigation_indicator_type=navigation_indicator_type,
            waypoint_reached=waypoint_reached,
            destination_reached=destination_reached,
        )

    def encode_value(self, data: NavigationData) -> bytearray:
        """Encode NavigationData back to bytes.

        The presence bits written to the Flags field are derived from which
        optional fields are actually set on ``data``, so the flags always
        describe the bytes that follow. Status fields that are not ``None``
        replace the corresponding bits of ``data.flags``; status fields that
        are ``None`` leave those bits as given in ``data.flags``.

        Args:
            data: NavigationData instance to encode

        Returns:
            Encoded bytes representing the navigation data

        """
        result = bytearray()

        flags = int(data.flags)

        # Keep presence bits in sync with the optional fields emitted below;
        # a mismatch would make the payload impossible to parse correctly.
        for presence_bit, field_value in (
            (NavigationFlags.REMAINING_DISTANCE_PRESENT, data.remaining_distance),
            (NavigationFlags.REMAINING_VERTICAL_DISTANCE_PRESENT, data.remaining_vertical_distance),
            (NavigationFlags.ESTIMATED_TIME_OF_ARRIVAL_PRESENT, data.estimated_time_of_arrival),
        ):
            bit = int(presence_bit)
            flags = self._replace_bits(flags, bit, 0 if field_value is None else bit)

        # Set status bits in flags. Each field is cleared before being written
        # so a stale value in data.flags cannot be OR-ed into the new one.
        if data.position_status is not None:
            flags = self._replace_bits(
                flags, self.POSITION_STATUS_MASK, data.position_status.value << self.POSITION_STATUS_SHIFT
            )

        if data.heading_source is not None:
            flags = self._replace_bits(
                flags, self.HEADING_SOURCE_MASK, data.heading_source.value << self.HEADING_SOURCE_SHIFT
            )

        if data.navigation_indicator_type is not None:
            flags = self._replace_bits(
                flags,
                self.NAVIGATION_INDICATOR_TYPE_MASK,
                data.navigation_indicator_type.value << self.NAVIGATION_INDICATOR_TYPE_SHIFT,
            )

        if data.waypoint_reached is not None:
            flags = self._replace_bits(
                flags, self.WAYPOINT_REACHED_MASK, self.WAYPOINT_REACHED_MASK if data.waypoint_reached else 0
            )

        if data.destination_reached is not None:
            flags = self._replace_bits(
                flags, self.DESTINATION_REACHED_MASK, self.DESTINATION_REACHED_MASK if data.destination_reached else 0
            )

        result.extend(DataParser.encode_int16(flags, signed=False))

        # Unit is 1*10^-2 degrees. round() rather than int(): truncation turns
        # values such as 0.29 (0.29 * 100 == 28.999999999999996) into 28.
        bearing_value = round(data.bearing * 100)
        heading_value = round(data.heading * 100)
        result.extend(DataParser.encode_int16(bearing_value, signed=False))
        result.extend(DataParser.encode_int16(heading_value, signed=False))

        if data.remaining_distance is not None:
            # Unit is 1/10 m
            distance_value = round(data.remaining_distance * 10)
            result.extend(DataParser.encode_int24(distance_value, signed=False))

        if data.remaining_vertical_distance is not None:
            # Unit is 1/100 m
            vertical_distance_value = round(data.remaining_vertical_distance * 100)
            result.extend(DataParser.encode_int24(vertical_distance_value, signed=True))

        if data.estimated_time_of_arrival is not None:
            result.extend(IEEE11073Parser.encode_timestamp(data.estimated_time_of_arrival))

        return result