Coverage for src/bluetooth_sig/gatt/characteristics/location_and_speed.py: 91%
136 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Location and Speed characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.gatt_enums import ValueType
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .utils import DataParser, IEEE11073Parser
16class LocationAndSpeedFlags(IntFlag):
17 """Location and Speed flags as per Bluetooth SIG specification."""
19 INSTANTANEOUS_SPEED_PRESENT = 0x0001
20 TOTAL_DISTANCE_PRESENT = 0x0002
21 LOCATION_PRESENT = 0x0004
22 ELEVATION_PRESENT = 0x0008
23 HEADING_PRESENT = 0x0010
24 ROLLING_TIME_PRESENT = 0x0020
25 UTC_TIME_PRESENT = 0x0040
28class PositionStatus(IntEnum):
29 """Position status enumeration."""
31 NO_POSITION = 0
32 POSITION_OK = 1
33 ESTIMATED_POSITION = 2
34 LAST_KNOWN_POSITION = 3
37class SpeedAndDistanceFormat(IntEnum):
38 """Speed and distance format enumeration."""
40 FORMAT_2D = 0
41 FORMAT_3D = 1
44class ElevationSource(IntEnum):
45 """Elevation source enumeration."""
47 POSITIONING_SYSTEM = 0
48 BAROMETRIC_AIR_PRESSURE = 1
49 DATABASE_SERVICE = 2
50 OTHER = 3
53class HeadingSource(IntEnum):
54 """Heading source enumeration."""
56 HEADING_BASED_ON_MOVEMENT = 0
57 HEADING_BASED_ON_MAGNETIC_COMPASS = 1
60class LocationAndSpeedData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
61 """Parsed data from Location and Speed characteristic."""
63 flags: LocationAndSpeedFlags
64 instantaneous_speed: float | None = None
65 total_distance: float | None = None
66 latitude: float | None = None
67 longitude: float | None = None
68 elevation: float | None = None
69 heading: float | None = None
70 rolling_time: int | None = None
71 utc_time: datetime | None = None
72 position_status: PositionStatus | None = None
73 speed_and_distance_format: SpeedAndDistanceFormat | None = None
74 elevation_source: ElevationSource | None = None
75 heading_source: HeadingSource | None = None
78class LocationAndSpeedCharacteristic(BaseCharacteristic):
79 """Location and Speed characteristic.
81 Used to represent data related to a location and speed sensor.
82 Note that it is possible for this characteristic to exceed the default LE ATT_MTU size.
83 """
85 _manual_value_type: ValueType | str | None = ValueType.DICT # Override since decode_value returns dataclass
87 min_length = 2 # Flags(2) minimum
88 max_length = 28 # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) +
89 # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum
90 allow_variable_length: bool = True # Variable optional fields
92 # Bit masks and shifts for status information in flags
93 POSITION_STATUS_MASK = 0x0300
94 POSITION_STATUS_SHIFT = 8
95 SPEED_DISTANCE_FORMAT_MASK = 0x0400
96 SPEED_DISTANCE_FORMAT_SHIFT = 10
97 ELEVATION_SOURCE_MASK = 0x1800
98 ELEVATION_SOURCE_SHIFT = 11
99 HEADING_SOURCE_MASK = 0x2000
100 HEADING_SOURCE_SHIFT = 13
102 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> LocationAndSpeedData: # pylint: disable=too-many-locals
103 """Parse location and speed data according to Bluetooth specification.
105 Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] +
106 [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)].
108 Args:
109 data: Raw bytearray from BLE characteristic
110 ctx: Optional context providing surrounding context (may be None)
112 Returns:
113 LocationAndSpeedData containing parsed location and speed data
115 """
116 if len(data) < 2:
117 raise ValueError("Location and Speed data must be at least 2 bytes")
119 flags = LocationAndSpeedFlags(DataParser.parse_int16(data, 0, signed=False))
121 # Extract status information from flags
122 position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
123 position_status = PositionStatus(position_status_bits) if position_status_bits <= 3 else None
125 speed_distance_format_bit = (flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT
126 speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit)
128 elevation_source_bits = (flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT
129 elevation_source = ElevationSource(elevation_source_bits) if elevation_source_bits <= 3 else None
131 heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
132 heading_source = HeadingSource(heading_source_bit)
134 # Parse optional fields
135 instantaneous_speed: float | None = None
136 total_distance: float | None = None
137 latitude: float | None = None
138 longitude: float | None = None
139 elevation: float | None = None
140 heading: float | None = None
141 rolling_time: int | None = None
142 utc_time: datetime | None = None
143 offset = 2
145 if (flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT) and len(data) >= offset + 2:
146 # Unit is 1/100 of a m/s
147 instantaneous_speed = DataParser.parse_int16(data, offset, signed=False) / 100.0
148 offset += 2
150 if (flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
151 # Unit is 1/10 m
152 total_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
153 offset += 3
155 if (flags & LocationAndSpeedFlags.LOCATION_PRESENT) and len(data) >= offset + 8:
156 # Unit is 1*10^-7 degrees
157 latitude = DataParser.parse_int32(data, offset, signed=True) / 10000000.0
158 longitude = DataParser.parse_int32(data, offset + 4, signed=True) / 10000000.0
159 offset += 8
161 if (flags & LocationAndSpeedFlags.ELEVATION_PRESENT) and len(data) >= offset + 3:
162 # Unit is 1/100 m
163 elevation = DataParser.parse_int24(data, offset, signed=True) / 100.0
164 offset += 3
166 if (flags & LocationAndSpeedFlags.HEADING_PRESENT) and len(data) >= offset + 2:
167 # Unit is 1*10^-2 degrees
168 heading = DataParser.parse_int16(data, offset, signed=False) / 100.0
169 offset += 2
171 if (flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT) and len(data) >= offset + 1:
172 rolling_time = data[offset]
173 offset += 1
175 if (flags & LocationAndSpeedFlags.UTC_TIME_PRESENT) and len(data) >= offset + 7:
176 utc_time = IEEE11073Parser.parse_timestamp(data, offset)
178 return LocationAndSpeedData(
179 flags=flags,
180 instantaneous_speed=instantaneous_speed,
181 total_distance=total_distance,
182 latitude=latitude,
183 longitude=longitude,
184 elevation=elevation,
185 heading=heading,
186 rolling_time=rolling_time,
187 utc_time=utc_time,
188 position_status=position_status,
189 speed_and_distance_format=speed_and_distance_format,
190 elevation_source=elevation_source,
191 heading_source=heading_source,
192 )
194 def encode_value(self, data: LocationAndSpeedData) -> bytearray:
195 """Encode LocationAndSpeedData back to bytes.
197 Args:
198 data: LocationAndSpeedData instance to encode
200 Returns:
201 Encoded bytes representing the location and speed data
203 """
204 result = bytearray()
206 flags = int(data.flags)
208 # Set status bits in flags
209 if data.position_status is not None:
210 flags |= data.position_status.value << self.POSITION_STATUS_SHIFT
212 if data.speed_and_distance_format is not None:
213 flags |= data.speed_and_distance_format.value << self.SPEED_DISTANCE_FORMAT_SHIFT
215 if data.elevation_source is not None:
216 flags |= data.elevation_source.value << self.ELEVATION_SOURCE_SHIFT
218 if data.heading_source is not None:
219 flags |= data.heading_source.value << self.HEADING_SOURCE_SHIFT
221 result.extend(DataParser.encode_int16(flags, signed=False))
223 if data.instantaneous_speed is not None:
224 speed_value = int(data.instantaneous_speed * 100)
225 result.extend(DataParser.encode_int16(speed_value, signed=False))
227 if data.total_distance is not None:
228 distance_value = int(data.total_distance * 10)
229 result.extend(DataParser.encode_int24(distance_value, signed=False))
231 if data.latitude is not None and data.longitude is not None:
232 lat_value = int(data.latitude * 10000000)
233 lon_value = int(data.longitude * 10000000)
234 result.extend(DataParser.encode_int32(lat_value, signed=True))
235 result.extend(DataParser.encode_int32(lon_value, signed=True))
237 if data.elevation is not None:
238 elevation_value = int(data.elevation * 100)
239 result.extend(DataParser.encode_int24(elevation_value, signed=True))
241 if data.heading is not None:
242 heading_value = int(data.heading * 100)
243 result.extend(DataParser.encode_int16(heading_value, signed=False))
245 if data.rolling_time is not None:
246 result.append(data.rolling_time)
248 if data.utc_time is not None:
249 result.extend(IEEE11073Parser.encode_timestamp(data.utc_time))
251 return result