Coverage for src/bluetooth_sig/gatt/characteristics/location_and_speed.py: 90%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 01:26 +0000

1"""Location and Speed characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum, IntFlag 

7from typing import Any, ClassVar 

8 

9import msgspec 

10 

11from ...types.gatt_enums import CharacteristicName 

12from ...types.location import PositionStatus 

13from ..context import CharacteristicContext 

14from .base import BaseCharacteristic 

15from .ln_feature import LNFeatureCharacteristic, LNFeatureData 

16from .utils import DataParser, IEEE11073Parser 

17 

18 

class LocationAndSpeedFlags(IntFlag):
    """Presence bits of the Location and Speed flags field.

    Each member marks one optional field of the characteristic payload.
    The remaining bits of the 16-bit flags value carry status sub-fields
    that are extracted separately by the characteristic class.
    """

    INSTANTANEOUS_SPEED_PRESENT = 1 << 0  # bit 0
    TOTAL_DISTANCE_PRESENT = 1 << 1  # bit 1
    LOCATION_PRESENT = 1 << 2  # bit 2 (latitude and longitude together)
    ELEVATION_PRESENT = 1 << 3  # bit 3
    HEADING_PRESENT = 1 << 4  # bit 4
    ROLLING_TIME_PRESENT = 1 << 5  # bit 5
    UTC_TIME_PRESENT = 1 << 6  # bit 6

29 

30 

class SpeedAndDistanceFormat(IntEnum):
    """Value of the single-bit speed-and-distance format field in the flags."""

    FORMAT_2D = 0  # format bit clear
    FORMAT_3D = 1  # format bit set

36 

37 

class ElevationSource(IntEnum):
    """Value of the two-bit elevation source field in the flags."""

    POSITIONING_SYSTEM = 0
    BAROMETRIC_AIR_PRESSURE = 1
    DATABASE_SERVICE = 2
    OTHER = 3

45 

46 

class HeadingSource(IntEnum):
    """Value of the single-bit heading source field in the flags."""

    HEADING_BASED_ON_MOVEMENT = 0  # source bit clear
    HEADING_BASED_ON_MAGNETIC_COMPASS = 1  # source bit set

52 

53 

class LocationAndSpeedData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only result of parsing a Location and Speed value.

    Only ``flags`` is required. Every other attribute defaults to ``None``,
    meaning the field was not available in the payload.

    Attributes:
        flags: Flags value read from the start of the payload.
        instantaneous_speed: Speed in m/s (raw value scaled by 1/100).
        total_distance: Distance in metres (raw value scaled by 1/10).
        latitude: Latitude in degrees (raw value scaled by 1e-7).
        longitude: Longitude in degrees (raw value scaled by 1e-7).
        elevation: Elevation in metres (raw value scaled by 1/100).
        heading: Heading in degrees (raw value scaled by 1/100).
        rolling_time: Raw single-byte rolling time value, unscaled.
        utc_time: Timestamp parsed from the payload.
        position_status: Position status sub-field taken from the flags.
        speed_and_distance_format: 2D/3D format bit taken from the flags.
        elevation_source: Elevation source sub-field taken from the flags.
        heading_source: Heading source bit taken from the flags.

    """

    flags: LocationAndSpeedFlags
    instantaneous_speed: float | None = None
    total_distance: float | None = None
    latitude: float | None = None
    longitude: float | None = None
    elevation: float | None = None
    heading: float | None = None
    rolling_time: int | None = None
    utc_time: datetime | None = None
    position_status: PositionStatus | None = None
    speed_and_distance_format: SpeedAndDistanceFormat | None = None
    elevation_source: ElevationSource | None = None
    heading_source: HeadingSource | None = None

70 

71 

class LocationAndSpeedCharacteristic(BaseCharacteristic[LocationAndSpeedData]):
    """Location and Speed characteristic.

    Used to represent data related to a location and speed sensor.
    Note that it is possible for this characteristic to exceed the default LE ATT_MTU size.

    The payload starts with a 2-byte flags field. Its low seven bits say which
    optional fields follow (see ``LocationAndSpeedFlags``); bits 7-12 carry the
    position status, speed/distance format, elevation source and heading source
    sub-fields described by the ``*_MASK`` / ``*_SHIFT`` constants below.
    """

    min_length = 2  # Flags(2) minimum
    max_length = 28  # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) +
    # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum
    allow_variable_length: bool = True  # Variable optional fields

    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [LNFeatureCharacteristic]

    # Bit masks and shifts for status information in flags
    POSITION_STATUS_MASK = 0x0180
    POSITION_STATUS_SHIFT = 7
    SPEED_DISTANCE_FORMAT_MASK = 0x0200
    SPEED_DISTANCE_FORMAT_SHIFT = 9
    ELEVATION_SOURCE_MASK = 0x0C00
    ELEVATION_SOURCE_SHIFT = 10
    HEADING_SOURCE_MASK = 0x1000
    HEADING_SOURCE_SHIFT = 12

    # Union of every LocationAndSpeedFlags presence bit (bits 0-6)
    _PRESENCE_FLAGS_MASK = 0x007F

    # Maximum valid enum values
    _MAX_POSITION_STATUS_VALUE = 3
    _MAX_ELEVATION_SOURCE_VALUE = 3

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches  # Location spec with many positional fields
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> LocationAndSpeedData:
        """Parse location and speed data according to Bluetooth specification.

        Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] +
        [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)].

        Optional fields are read in that order. If a field is flagged as present
        but the payload is too short to hold it, that field and every field
        after it are left as ``None``: their positions can no longer be
        determined, so reading on would return bytes of the wrong field.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None).
                When it yields LN Feature data, the flags are checked against it.
            validate: Accepted for interface compatibility; not consulted by this method.

        Returns:
            LocationAndSpeedData containing parsed location and speed data

        Raises:
            ValueError: If the flags report something the LN Feature data from
                ``ctx`` marks as unsupported.

        """
        flags = LocationAndSpeedFlags(DataParser.parse_int16(data, 0, signed=False))

        # Extract status information from flags
        position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
        position_status = (
            PositionStatus(position_status_bits) if position_status_bits <= self._MAX_POSITION_STATUS_VALUE else None
        )

        speed_distance_format_bit = (flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT
        speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit)

        elevation_source_bits = (flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT
        elevation_source = (
            ElevationSource(elevation_source_bits)
            if elevation_source_bits <= self._MAX_ELEVATION_SOURCE_VALUE
            else None
        )

        heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
        heading_source = HeadingSource(heading_source_bit)

        # Parse optional fields
        instantaneous_speed: float | None = None
        total_distance: float | None = None
        latitude: float | None = None
        longitude: float | None = None
        elevation: float | None = None
        heading: float | None = None
        rolling_time: int | None = None
        utc_time: datetime | None = None
        offset = 2
        # Becomes False at the first flagged field that does not fit in ``data``.
        intact = True

        if flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT:
            intact = len(data) >= offset + 2
            if intact:
                # Unit is 1/100 of a m/s
                instantaneous_speed = DataParser.parse_int16(data, offset, signed=False) / 100.0
                offset += 2

        if intact and flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT:
            intact = len(data) >= offset + 3
            if intact:
                # Unit is 1/10 m
                total_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0
                offset += 3

        if intact and flags & LocationAndSpeedFlags.LOCATION_PRESENT:
            intact = len(data) >= offset + 8
            if intact:
                # Unit is 1*10^-7 degrees
                latitude = DataParser.parse_int32(data, offset, signed=True) / 10000000.0
                longitude = DataParser.parse_int32(data, offset + 4, signed=True) / 10000000.0
                offset += 8

        if intact and flags & LocationAndSpeedFlags.ELEVATION_PRESENT:
            intact = len(data) >= offset + 3
            if intact:
                # Unit is 1/100 m
                elevation = DataParser.parse_int24(data, offset, signed=True) / 100.0
                offset += 3

        if intact and flags & LocationAndSpeedFlags.HEADING_PRESENT:
            intact = len(data) >= offset + 2
            if intact:
                # Unit is 1*10^-2 degrees
                heading = DataParser.parse_int16(data, offset, signed=False) / 100.0
                offset += 2

        if intact and flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT:
            intact = len(data) >= offset + 1
            if intact:
                rolling_time = data[offset]
                offset += 1

        if intact and flags & LocationAndSpeedFlags.UTC_TIME_PRESENT and len(data) >= offset + 7:
            utc_time = IEEE11073Parser.parse_timestamp(data, offset)

        if ctx is not None:
            feature_data = self.get_context_characteristic(ctx, CharacteristicName.LN_FEATURE)
            if feature_data is not None:
                self._validate_flags_against_ln_feature(flags, feature_data)

        return LocationAndSpeedData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            total_distance=total_distance,
            latitude=latitude,
            longitude=longitude,
            elevation=elevation,
            heading=heading,
            rolling_time=rolling_time,
            utc_time=utc_time,
            position_status=position_status,
            speed_and_distance_format=speed_and_distance_format,
            elevation_source=elevation_source,
            heading_source=heading_source,
        )

    def _encode_value(self, data: LocationAndSpeedData) -> bytearray:
        """Encode LocationAndSpeedData back to bytes.

        The emitted flags start from ``data.flags`` and are then made consistent
        with the rest of ``data``:

        * each status sub-field that is not ``None`` replaces the corresponding
          bits (the old bits are cleared first, not OR-ed over);
        * the presence bits are recomputed from the fields actually written, so
          the flags always describe the bytes that follow.

        Scaled values are rounded to the nearest raw unit rather than
        truncated, so a decoded value such as ``0.29`` encodes back to the raw
        value it came from. Location is written only when both ``latitude``
        and ``longitude`` are set.

        Args:
            data: LocationAndSpeedData instance to encode

        Returns:
            Encoded bytes representing the location and speed data

        """
        flags = int(data.flags)

        # Set status bits in flags, clearing each sub-field before writing it
        if data.position_status is not None:
            flags &= ~self.POSITION_STATUS_MASK
            flags |= (data.position_status.value << self.POSITION_STATUS_SHIFT) & self.POSITION_STATUS_MASK

        if data.speed_and_distance_format is not None:
            flags &= ~self.SPEED_DISTANCE_FORMAT_MASK
            flags |= (
                data.speed_and_distance_format.value << self.SPEED_DISTANCE_FORMAT_SHIFT
            ) & self.SPEED_DISTANCE_FORMAT_MASK

        if data.elevation_source is not None:
            flags &= ~self.ELEVATION_SOURCE_MASK
            flags |= (data.elevation_source.value << self.ELEVATION_SOURCE_SHIFT) & self.ELEVATION_SOURCE_MASK

        if data.heading_source is not None:
            flags &= ~self.HEADING_SOURCE_MASK
            flags |= (data.heading_source.value << self.HEADING_SOURCE_SHIFT) & self.HEADING_SOURCE_MASK

        # Optional fields are collected first so the presence bits can be
        # derived from what is really written.
        present = 0
        body = bytearray()

        if data.instantaneous_speed is not None:
            present |= LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT
            speed_value = round(data.instantaneous_speed * 100)
            body.extend(DataParser.encode_int16(speed_value, signed=False))

        if data.total_distance is not None:
            present |= LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT
            distance_value = round(data.total_distance * 10)
            body.extend(DataParser.encode_int24(distance_value, signed=False))

        if data.latitude is not None and data.longitude is not None:
            present |= LocationAndSpeedFlags.LOCATION_PRESENT
            lat_value = round(data.latitude * 10000000)
            lon_value = round(data.longitude * 10000000)
            body.extend(DataParser.encode_int32(lat_value, signed=True))
            body.extend(DataParser.encode_int32(lon_value, signed=True))

        if data.elevation is not None:
            present |= LocationAndSpeedFlags.ELEVATION_PRESENT
            elevation_value = round(data.elevation * 100)
            body.extend(DataParser.encode_int24(elevation_value, signed=True))

        if data.heading is not None:
            present |= LocationAndSpeedFlags.HEADING_PRESENT
            heading_value = round(data.heading * 100)
            body.extend(DataParser.encode_int16(heading_value, signed=False))

        if data.rolling_time is not None:
            present |= LocationAndSpeedFlags.ROLLING_TIME_PRESENT
            body.append(data.rolling_time)

        if data.utc_time is not None:
            present |= LocationAndSpeedFlags.UTC_TIME_PRESENT
            body.extend(IEEE11073Parser.encode_timestamp(data.utc_time))

        flags = (flags & ~self._PRESENCE_FLAGS_MASK) | int(present)

        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))
        result.extend(body)
        return result

    @staticmethod
    def _validate_flags_against_ln_feature(flags: LocationAndSpeedFlags, feature_data: LNFeatureData) -> None:
        """Validate measurement flags against LN Feature capability bits.

        Presence bits are checked first, in payload order, followed by the
        status sub-fields. The first mismatch found is reported.

        Args:
            flags: Flags value decoded from the Location and Speed payload
            feature_data: Parsed LN Feature data describing what is supported

        Raises:
            ValueError: If a flag or non-zero status sub-field is reported while
                the matching capability is not supported.

        """
        if (
            flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT
        ) and not feature_data.instantaneous_speed_supported:
            raise ValueError("Instantaneous speed reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT) and not feature_data.total_distance_supported:
            raise ValueError("Total distance reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.LOCATION_PRESENT) and not feature_data.location_supported:
            raise ValueError("Location reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.ELEVATION_PRESENT) and not feature_data.elevation_supported:
            raise ValueError("Elevation reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.HEADING_PRESENT) and not feature_data.heading_supported:
            raise ValueError("Heading reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT) and not feature_data.rolling_time_supported:
            raise ValueError("Rolling time reported but not supported by LN Feature")
        if (flags & LocationAndSpeedFlags.UTC_TIME_PRESENT) and not feature_data.utc_time_supported:
            raise ValueError("UTC time reported but not supported by LN Feature")

        # A zero sub-field is indistinguishable from "not reported", so only
        # non-zero status values are checked.
        position_status_flags = flags & LocationAndSpeedCharacteristic.POSITION_STATUS_MASK
        if position_status_flags and not feature_data.position_status_supported:
            raise ValueError("Position status reported but not supported by LN Feature")

        speed_distance_format_flag = flags & LocationAndSpeedCharacteristic.SPEED_DISTANCE_FORMAT_MASK
        speed_distance_supported = feature_data.instantaneous_speed_supported and feature_data.total_distance_supported
        if speed_distance_format_flag and not speed_distance_supported:
            raise ValueError("Speed and distance format reported but not supported by LN Feature")

        elevation_source_flags = flags & LocationAndSpeedCharacteristic.ELEVATION_SOURCE_MASK
        if elevation_source_flags and not feature_data.elevation_supported:
            raise ValueError("Elevation source reported but not supported by LN Feature")

        heading_source_flag = flags & LocationAndSpeedCharacteristic.HEADING_SOURCE_MASK
        if heading_source_flag and not feature_data.heading_supported:
            raise ValueError("Heading source reported but not supported by LN Feature")