Coverage for src / bluetooth_sig / gatt / characteristics / location_and_speed.py: 92%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Location and Speed characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum, IntFlag
8import msgspec
10from ...types.location import PositionStatus
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .utils import DataParser, IEEE11073Parser
class LocationAndSpeedFlags(IntFlag):
    """Presence bits of the Location and Speed flags field (Bluetooth SIG).

    Each member occupies one of the seven low-order bits of the 16-bit flags
    word and announces that the corresponding optional field is included in
    the characteristic value.
    """

    INSTANTANEOUS_SPEED_PRESENT = 1 << 0
    TOTAL_DISTANCE_PRESENT = 1 << 1
    LOCATION_PRESENT = 1 << 2
    ELEVATION_PRESENT = 1 << 3
    HEADING_PRESENT = 1 << 4
    ROLLING_TIME_PRESENT = 1 << 5
    UTC_TIME_PRESENT = 1 << 6
class SpeedAndDistanceFormat(IntEnum):
    """Single-bit selector between the 2D and 3D speed/distance formats."""

    FORMAT_2D = 0b0
    FORMAT_3D = 0b1
class ElevationSource(IntEnum):
    """Two-bit code naming where the reported elevation comes from."""

    POSITIONING_SYSTEM = 0b00
    BAROMETRIC_AIR_PRESSURE = 0b01
    DATABASE_SERVICE = 0b10
    OTHER = 0b11
class HeadingSource(IntEnum):
    """Single-bit code naming how the reported heading was obtained."""

    HEADING_BASED_ON_MOVEMENT = 0b0
    HEADING_BASED_ON_MAGNETIC_COMPASS = 0b1
class LocationAndSpeedData(  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    msgspec.Struct,
    frozen=True,
    kw_only=True,
):
    """Immutable, keyword-only result of parsing a Location and Speed value.

    Only ``flags`` is required. Every other attribute defaults to ``None``,
    meaning the value was not supplied (when building the struct by hand) or
    was not present in the decoded payload.
    """

    # Raw 16-bit flags word, including the status bits above the presence bits.
    flags: LocationAndSpeedFlags

    # Optional measurement fields, in the order they appear on the wire.
    instantaneous_speed: float | None = None  # m/s (wire resolution 1/100 m/s)
    total_distance: float | None = None  # metres (wire resolution 1/10 m)
    latitude: float | None = None  # degrees (wire resolution 1e-7 degrees)
    longitude: float | None = None  # degrees (wire resolution 1e-7 degrees)
    elevation: float | None = None  # metres (wire resolution 1/100 m)
    heading: float | None = None  # degrees (wire resolution 1/100 degree)
    rolling_time: int | None = None  # raw single byte; presumably seconds -- confirm against the spec
    utc_time: datetime | None = None

    # Status information unpacked from bits 8-13 of the flags word.
    position_status: PositionStatus | None = None
    speed_and_distance_format: SpeedAndDistanceFormat | None = None
    elevation_source: ElevationSource | None = None
    heading_source: HeadingSource | None = None
class LocationAndSpeedCharacteristic(BaseCharacteristic[LocationAndSpeedData]):
    """Location and Speed characteristic.

    Used to represent data related to a location and speed sensor.
    Note that it is possible for this characteristic to exceed the default LE ATT_MTU size.

    The value is a 16-bit flags word followed by optional fields. The seven
    low-order flag bits announce which optional fields follow; bits 8-13 carry
    status information (position status, speed/distance format, elevation
    source and heading source).
    """

    min_length = 2  # Flags(2) minimum
    max_length = 28  # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) +
    # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum
    allow_variable_length: bool = True  # Variable optional fields

    # Bit masks and shifts for status information in flags
    POSITION_STATUS_MASK = 0x0300
    POSITION_STATUS_SHIFT = 8
    SPEED_DISTANCE_FORMAT_MASK = 0x0400
    SPEED_DISTANCE_FORMAT_SHIFT = 10
    ELEVATION_SOURCE_MASK = 0x1800
    ELEVATION_SOURCE_SHIFT = 11
    HEADING_SOURCE_MASK = 0x2000
    HEADING_SOURCE_SHIFT = 13

    # Maximum valid enum values
    _MAX_POSITION_STATUS_VALUE = 3
    _MAX_ELEVATION_SOURCE_VALUE = 3

    def _decode_value(  # pylint: disable=too-many-locals  # Location spec with many positional fields
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> LocationAndSpeedData:
        """Parse location and speed data according to Bluetooth specification.

        Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] +
        [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)].

        Parsing is best-effort: if the flags announce a field that the payload
        is too short to contain, that field and every field after it are left
        as ``None``. Later fields are deliberately not read, because their
        position depends on the missing field and they would otherwise be
        decoded from the wrong bytes.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None);
                not used by this implementation
            validate: Whether to validate ranges (default True); not used by
                this implementation

        Returns:
            LocationAndSpeedData containing parsed location and speed data

        """
        raw_flags = DataParser.parse_int16(data, 0, signed=False)
        flags = LocationAndSpeedFlags(raw_flags)

        # Extract status information from flags
        position_status_bits = (raw_flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
        position_status = (
            PositionStatus(position_status_bits) if position_status_bits <= self._MAX_POSITION_STATUS_VALUE else None
        )

        speed_distance_format_bit = (raw_flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT
        speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit)

        elevation_source_bits = (raw_flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT
        elevation_source = (
            ElevationSource(elevation_source_bits)
            if elevation_source_bits <= self._MAX_ELEVATION_SOURCE_VALUE
            else None
        )

        heading_source_bit = (raw_flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
        heading_source = HeadingSource(heading_source_bit)

        # Parse optional fields
        instantaneous_speed: float | None = None
        total_distance: float | None = None
        latitude: float | None = None
        longitude: float | None = None
        elevation: float | None = None
        heading: float | None = None
        rolling_time: int | None = None
        utc_time: datetime | None = None

        offset = 2  # first byte after the flags word
        truncated = False  # set once a flagged field does not fit in the payload

        def take(flag: LocationAndSpeedFlags, size: int) -> int | None:
            """Return the start offset of a present field and advance, else None."""
            nonlocal offset, truncated
            if truncated or not flags & flag:
                return None
            if len(data) < offset + size:
                # The field is announced but missing: everything after it is
                # misaligned, so stop consuming optional fields altogether.
                truncated = True
                return None
            start = offset
            offset += size
            return start

        start = take(LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT, 2)
        if start is not None:
            # Unit is 1/100 of a m/s
            instantaneous_speed = DataParser.parse_int16(data, start, signed=False) / 100.0

        start = take(LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT, 3)
        if start is not None:
            # Unit is 1/10 m
            total_distance = DataParser.parse_int24(data, start, signed=False) / 10.0

        start = take(LocationAndSpeedFlags.LOCATION_PRESENT, 8)
        if start is not None:
            # Unit is 1*10^-7 degrees; latitude comes first, longitude second
            latitude = DataParser.parse_int32(data, start, signed=True) / 10000000.0
            longitude = DataParser.parse_int32(data, start + 4, signed=True) / 10000000.0

        start = take(LocationAndSpeedFlags.ELEVATION_PRESENT, 3)
        if start is not None:
            # Unit is 1/100 m
            elevation = DataParser.parse_int24(data, start, signed=True) / 100.0

        start = take(LocationAndSpeedFlags.HEADING_PRESENT, 2)
        if start is not None:
            # Unit is 1*10^-2 degrees
            heading = DataParser.parse_int16(data, start, signed=False) / 100.0

        start = take(LocationAndSpeedFlags.ROLLING_TIME_PRESENT, 1)
        if start is not None:
            rolling_time = data[start]

        start = take(LocationAndSpeedFlags.UTC_TIME_PRESENT, 7)
        if start is not None:
            utc_time = IEEE11073Parser.parse_timestamp(data, start)

        return LocationAndSpeedData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            total_distance=total_distance,
            latitude=latitude,
            longitude=longitude,
            elevation=elevation,
            heading=heading,
            rolling_time=rolling_time,
            utc_time=utc_time,
            position_status=position_status,
            speed_and_distance_format=speed_and_distance_format,
            elevation_source=elevation_source,
            heading_source=heading_source,
        )

    def _encode_value(self, data: LocationAndSpeedData) -> bytearray:
        """Encode LocationAndSpeedData back to bytes.

        The emitted flags word is always consistent with the emitted payload:

        * Presence bits (the seven low-order bits) are derived from which
          optional attributes are actually written, not copied from
          ``data.flags``. Location is written only when both latitude and
          longitude are set.
        * A status attribute that is not ``None`` replaces the corresponding
          bit field of ``data.flags``; when it is ``None`` the bits already in
          ``data.flags`` are kept.
        * Scaled values are rounded to the nearest wire unit, so that e.g.
          0.29 m/s encodes as 29 rather than 28.

        Args:
            data: LocationAndSpeedData instance to encode

        Returns:
            Encoded bytes representing the location and speed data

        """
        flags = int(data.flags)

        # Replace (not OR onto) each status bit field so stale bits carried in
        # data.flags cannot corrupt the encoded value.
        status_fields = (
            (data.position_status, self.POSITION_STATUS_MASK, self.POSITION_STATUS_SHIFT),
            (data.speed_and_distance_format, self.SPEED_DISTANCE_FORMAT_MASK, self.SPEED_DISTANCE_FORMAT_SHIFT),
            (data.elevation_source, self.ELEVATION_SOURCE_MASK, self.ELEVATION_SOURCE_SHIFT),
            (data.heading_source, self.HEADING_SOURCE_MASK, self.HEADING_SOURCE_SHIFT),
        )
        for member, mask, shift in status_fields:
            if member is not None:
                flags = (flags & ~mask) | ((member.value << shift) & mask)

        # Build the optional fields first so the presence bits can mirror
        # exactly what ends up in the payload.
        payload = bytearray()
        present = 0

        if data.instantaneous_speed is not None:
            present |= int(LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT)
            # Unit is 1/100 of a m/s
            payload.extend(DataParser.encode_int16(round(data.instantaneous_speed * 100), signed=False))

        if data.total_distance is not None:
            present |= int(LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT)
            # Unit is 1/10 m
            payload.extend(DataParser.encode_int24(round(data.total_distance * 10), signed=False))

        if data.latitude is not None and data.longitude is not None:
            present |= int(LocationAndSpeedFlags.LOCATION_PRESENT)
            # Unit is 1*10^-7 degrees
            payload.extend(DataParser.encode_int32(round(data.latitude * 10000000), signed=True))
            payload.extend(DataParser.encode_int32(round(data.longitude * 10000000), signed=True))

        if data.elevation is not None:
            present |= int(LocationAndSpeedFlags.ELEVATION_PRESENT)
            # Unit is 1/100 m
            payload.extend(DataParser.encode_int24(round(data.elevation * 100), signed=True))

        if data.heading is not None:
            present |= int(LocationAndSpeedFlags.HEADING_PRESENT)
            # Unit is 1*10^-2 degrees
            payload.extend(DataParser.encode_int16(round(data.heading * 100), signed=False))

        if data.rolling_time is not None:
            present |= int(LocationAndSpeedFlags.ROLLING_TIME_PRESENT)
            payload.append(data.rolling_time)

        if data.utc_time is not None:
            present |= int(LocationAndSpeedFlags.UTC_TIME_PRESENT)
            payload.extend(IEEE11073Parser.encode_timestamp(data.utc_time))

        presence_mask = 0
        for presence_flag in LocationAndSpeedFlags:
            presence_mask |= int(presence_flag)
        flags = (flags & ~presence_mask) | present

        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))
        result.extend(payload)
        return result