Coverage for src/bluetooth_sig/gatt/characteristics/location_and_speed.py: 90%
167 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
1"""Location and Speed characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
7from typing import Any, ClassVar
9import msgspec
11from ...types.gatt_enums import CharacteristicName
12from ...types.location import PositionStatus
13from ..context import CharacteristicContext
14from .base import BaseCharacteristic
15from .ln_feature import LNFeatureCharacteristic, LNFeatureData
16from .utils import DataParser, IEEE11073Parser
19class LocationAndSpeedFlags(IntFlag):
20 """Location and Speed flags as per Bluetooth SIG specification."""
22 INSTANTANEOUS_SPEED_PRESENT = 0x0001
23 TOTAL_DISTANCE_PRESENT = 0x0002
24 LOCATION_PRESENT = 0x0004
25 ELEVATION_PRESENT = 0x0008
26 HEADING_PRESENT = 0x0010
27 ROLLING_TIME_PRESENT = 0x0020
28 UTC_TIME_PRESENT = 0x0040
31class SpeedAndDistanceFormat(IntEnum):
32 """Speed and distance format enumeration."""
34 FORMAT_2D = 0
35 FORMAT_3D = 1
38class ElevationSource(IntEnum):
39 """Elevation source enumeration."""
41 POSITIONING_SYSTEM = 0
42 BAROMETRIC_AIR_PRESSURE = 1
43 DATABASE_SERVICE = 2
44 OTHER = 3
47class HeadingSource(IntEnum):
48 """Heading source enumeration."""
50 HEADING_BASED_ON_MOVEMENT = 0
51 HEADING_BASED_ON_MAGNETIC_COMPASS = 1
54class LocationAndSpeedData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
55 """Parsed data from Location and Speed characteristic."""
57 flags: LocationAndSpeedFlags
58 instantaneous_speed: float | None = None
59 total_distance: float | None = None
60 latitude: float | None = None
61 longitude: float | None = None
62 elevation: float | None = None
63 heading: float | None = None
64 rolling_time: int | None = None
65 utc_time: datetime | None = None
66 position_status: PositionStatus | None = None
67 speed_and_distance_format: SpeedAndDistanceFormat | None = None
68 elevation_source: ElevationSource | None = None
69 heading_source: HeadingSource | None = None
72class LocationAndSpeedCharacteristic(BaseCharacteristic[LocationAndSpeedData]):
73 """Location and Speed characteristic.
75 Used to represent data related to a location and speed sensor.
76 Note that it is possible for this characteristic to exceed the default LE ATT_MTU size.
77 """
79 min_length = 2 # Flags(2) minimum
80 max_length = 28 # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) +
81 # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum
82 allow_variable_length: bool = True # Variable optional fields
84 _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [LNFeatureCharacteristic]
86 # Bit masks and shifts for status information in flags
87 POSITION_STATUS_MASK = 0x0180
88 POSITION_STATUS_SHIFT = 7
89 SPEED_DISTANCE_FORMAT_MASK = 0x0200
90 SPEED_DISTANCE_FORMAT_SHIFT = 9
91 ELEVATION_SOURCE_MASK = 0x0C00
92 ELEVATION_SOURCE_SHIFT = 10
93 HEADING_SOURCE_MASK = 0x1000
94 HEADING_SOURCE_SHIFT = 12
96 # Maximum valid enum values
97 _MAX_POSITION_STATUS_VALUE = 3
98 _MAX_ELEVATION_SOURCE_VALUE = 3
100 def _decode_value( # pylint: disable=too-many-locals # Location spec with many positional fields
101 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
102 ) -> LocationAndSpeedData:
103 """Parse location and speed data according to Bluetooth specification.
105 Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] +
106 [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)].
108 Args:
109 data: Raw bytearray from BLE characteristic
110 ctx: Optional context providing surrounding context (may be None)
111 validate: Whether to validate ranges (default True)
113 Returns:
114 LocationAndSpeedData containing parsed location and speed data
116 """
117 flags = LocationAndSpeedFlags(DataParser.parse_int16(data, 0, signed=False))
119 # Extract status information from flags
120 position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
121 position_status = (
122 PositionStatus(position_status_bits) if position_status_bits <= self._MAX_POSITION_STATUS_VALUE else None
123 )
125 speed_distance_format_bit = (flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT
126 speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit)
128 elevation_source_bits = (flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT
129 elevation_source = (
130 ElevationSource(elevation_source_bits)
131 if elevation_source_bits <= self._MAX_ELEVATION_SOURCE_VALUE
132 else None
133 )
135 heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
136 heading_source = HeadingSource(heading_source_bit)
138 # Parse optional fields
139 instantaneous_speed: float | None = None
140 total_distance: float | None = None
141 latitude: float | None = None
142 longitude: float | None = None
143 elevation: float | None = None
144 heading: float | None = None
145 rolling_time: int | None = None
146 utc_time: datetime | None = None
147 offset = 2
149 if (flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT) and len(data) >= offset + 2:
150 # Unit is 1/100 of a m/s
151 instantaneous_speed = DataParser.parse_int16(data, offset, signed=False) / 100.0
152 offset += 2
154 if (flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3:
155 # Unit is 1/10 m
156 total_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
157 offset += 3
159 if (flags & LocationAndSpeedFlags.LOCATION_PRESENT) and len(data) >= offset + 8:
160 # Unit is 1*10^-7 degrees
161 latitude = DataParser.parse_int32(data, offset, signed=True) / 10000000.0
162 longitude = DataParser.parse_int32(data, offset + 4, signed=True) / 10000000.0
163 offset += 8
165 if (flags & LocationAndSpeedFlags.ELEVATION_PRESENT) and len(data) >= offset + 3:
166 # Unit is 1/100 m
167 elevation = DataParser.parse_int24(data, offset, signed=True) / 100.0
168 offset += 3
170 if (flags & LocationAndSpeedFlags.HEADING_PRESENT) and len(data) >= offset + 2:
171 # Unit is 1*10^-2 degrees
172 heading = DataParser.parse_int16(data, offset, signed=False) / 100.0
173 offset += 2
175 if (flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT) and len(data) >= offset + 1:
176 rolling_time = data[offset]
177 offset += 1
179 if (flags & LocationAndSpeedFlags.UTC_TIME_PRESENT) and len(data) >= offset + 7:
180 utc_time = IEEE11073Parser.parse_timestamp(data, offset)
182 if ctx is not None:
183 feature_data = self.get_context_characteristic(ctx, CharacteristicName.LN_FEATURE)
184 if feature_data is not None:
185 self._validate_flags_against_ln_feature(flags, feature_data)
187 return LocationAndSpeedData(
188 flags=flags,
189 instantaneous_speed=instantaneous_speed,
190 total_distance=total_distance,
191 latitude=latitude,
192 longitude=longitude,
193 elevation=elevation,
194 heading=heading,
195 rolling_time=rolling_time,
196 utc_time=utc_time,
197 position_status=position_status,
198 speed_and_distance_format=speed_and_distance_format,
199 elevation_source=elevation_source,
200 heading_source=heading_source,
201 )
203 def _encode_value(self, data: LocationAndSpeedData) -> bytearray:
204 """Encode LocationAndSpeedData back to bytes.
206 Args:
207 data: LocationAndSpeedData instance to encode
209 Returns:
210 Encoded bytes representing the location and speed data
212 """
213 result = bytearray()
215 flags = int(data.flags)
217 # Set status bits in flags
218 if data.position_status is not None:
219 flags |= data.position_status.value << self.POSITION_STATUS_SHIFT
221 if data.speed_and_distance_format is not None:
222 flags |= data.speed_and_distance_format.value << self.SPEED_DISTANCE_FORMAT_SHIFT
224 if data.elevation_source is not None:
225 flags |= data.elevation_source.value << self.ELEVATION_SOURCE_SHIFT
227 if data.heading_source is not None:
228 flags |= data.heading_source.value << self.HEADING_SOURCE_SHIFT
230 result.extend(DataParser.encode_int16(flags, signed=False))
232 if data.instantaneous_speed is not None:
233 speed_value = int(data.instantaneous_speed * 100)
234 result.extend(DataParser.encode_int16(speed_value, signed=False))
236 if data.total_distance is not None:
237 distance_value = int(data.total_distance * 10)
238 result.extend(DataParser.encode_int24(distance_value, signed=False))
240 if data.latitude is not None and data.longitude is not None:
241 lat_value = int(data.latitude * 10000000)
242 lon_value = int(data.longitude * 10000000)
243 result.extend(DataParser.encode_int32(lat_value, signed=True))
244 result.extend(DataParser.encode_int32(lon_value, signed=True))
246 if data.elevation is not None:
247 elevation_value = int(data.elevation * 100)
248 result.extend(DataParser.encode_int24(elevation_value, signed=True))
250 if data.heading is not None:
251 heading_value = int(data.heading * 100)
252 result.extend(DataParser.encode_int16(heading_value, signed=False))
254 if data.rolling_time is not None:
255 result.append(data.rolling_time)
257 if data.utc_time is not None:
258 result.extend(IEEE11073Parser.encode_timestamp(data.utc_time))
260 return result
262 @staticmethod
263 def _validate_flags_against_ln_feature(flags: LocationAndSpeedFlags, feature_data: LNFeatureData) -> None:
264 """Validate measurement flags against LN Feature capability bits."""
265 if (
266 flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT
267 ) and not feature_data.instantaneous_speed_supported:
268 raise ValueError("Instantaneous speed reported but not supported by LN Feature")
269 if (flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT) and not feature_data.total_distance_supported:
270 raise ValueError("Total distance reported but not supported by LN Feature")
271 if (flags & LocationAndSpeedFlags.LOCATION_PRESENT) and not feature_data.location_supported:
272 raise ValueError("Location reported but not supported by LN Feature")
273 if (flags & LocationAndSpeedFlags.ELEVATION_PRESENT) and not feature_data.elevation_supported:
274 raise ValueError("Elevation reported but not supported by LN Feature")
275 if (flags & LocationAndSpeedFlags.HEADING_PRESENT) and not feature_data.heading_supported:
276 raise ValueError("Heading reported but not supported by LN Feature")
277 if (flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT) and not feature_data.rolling_time_supported:
278 raise ValueError("Rolling time reported but not supported by LN Feature")
279 if (flags & LocationAndSpeedFlags.UTC_TIME_PRESENT) and not feature_data.utc_time_supported:
280 raise ValueError("UTC time reported but not supported by LN Feature")
282 position_status_flags = flags & LocationAndSpeedCharacteristic.POSITION_STATUS_MASK
283 if position_status_flags and not feature_data.position_status_supported:
284 raise ValueError("Position status reported but not supported by LN Feature")
286 speed_distance_format_flag = flags & LocationAndSpeedCharacteristic.SPEED_DISTANCE_FORMAT_MASK
287 speed_distance_supported = feature_data.instantaneous_speed_supported and feature_data.total_distance_supported
288 if speed_distance_format_flag and not speed_distance_supported:
289 raise ValueError("Speed and distance format reported but not supported by LN Feature")
291 elevation_source_flags = flags & LocationAndSpeedCharacteristic.ELEVATION_SOURCE_MASK
292 if elevation_source_flags and not feature_data.elevation_supported:
293 raise ValueError("Elevation source reported but not supported by LN Feature")
295 heading_source_flag = flags & LocationAndSpeedCharacteristic.HEADING_SOURCE_MASK
296 if heading_source_flag and not feature_data.heading_supported:
297 raise ValueError("Heading source reported but not supported by LN Feature")