Coverage for src/bluetooth_sig/gatt/characteristics/location_and_speed.py: 91%

136 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Location and Speed characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum, IntFlag 

7 

8import msgspec 

9 

10from ...types.gatt_enums import ValueType 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .utils import DataParser, IEEE11073Parser 

14 

15 

16class LocationAndSpeedFlags(IntFlag): 

17 """Location and Speed flags as per Bluetooth SIG specification.""" 

18 

19 INSTANTANEOUS_SPEED_PRESENT = 0x0001 

20 TOTAL_DISTANCE_PRESENT = 0x0002 

21 LOCATION_PRESENT = 0x0004 

22 ELEVATION_PRESENT = 0x0008 

23 HEADING_PRESENT = 0x0010 

24 ROLLING_TIME_PRESENT = 0x0020 

25 UTC_TIME_PRESENT = 0x0040 

26 

27 

28class PositionStatus(IntEnum): 

29 """Position status enumeration.""" 

30 

31 NO_POSITION = 0 

32 POSITION_OK = 1 

33 ESTIMATED_POSITION = 2 

34 LAST_KNOWN_POSITION = 3 

35 

36 

37class SpeedAndDistanceFormat(IntEnum): 

38 """Speed and distance format enumeration.""" 

39 

40 FORMAT_2D = 0 

41 FORMAT_3D = 1 

42 

43 

44class ElevationSource(IntEnum): 

45 """Elevation source enumeration.""" 

46 

47 POSITIONING_SYSTEM = 0 

48 BAROMETRIC_AIR_PRESSURE = 1 

49 DATABASE_SERVICE = 2 

50 OTHER = 3 

51 

52 

53class HeadingSource(IntEnum): 

54 """Heading source enumeration.""" 

55 

56 HEADING_BASED_ON_MOVEMENT = 0 

57 HEADING_BASED_ON_MAGNETIC_COMPASS = 1 

58 

59 

60class LocationAndSpeedData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes 

61 """Parsed data from Location and Speed characteristic.""" 

62 

63 flags: LocationAndSpeedFlags 

64 instantaneous_speed: float | None = None 

65 total_distance: float | None = None 

66 latitude: float | None = None 

67 longitude: float | None = None 

68 elevation: float | None = None 

69 heading: float | None = None 

70 rolling_time: int | None = None 

71 utc_time: datetime | None = None 

72 position_status: PositionStatus | None = None 

73 speed_and_distance_format: SpeedAndDistanceFormat | None = None 

74 elevation_source: ElevationSource | None = None 

75 heading_source: HeadingSource | None = None 

76 

77 

78class LocationAndSpeedCharacteristic(BaseCharacteristic): 

79 """Location and Speed characteristic. 

80 

81 Used to represent data related to a location and speed sensor. 

82 Note that it is possible for this characteristic to exceed the default LE ATT_MTU size. 

83 """ 

84 

85 _manual_value_type: ValueType | str | None = ValueType.DICT # Override since decode_value returns dataclass 

86 

87 min_length = 2 # Flags(2) minimum 

88 max_length = 28 # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) + 

89 # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum 

90 allow_variable_length: bool = True # Variable optional fields 

91 

92 # Bit masks and shifts for status information in flags 

93 POSITION_STATUS_MASK = 0x0300 

94 POSITION_STATUS_SHIFT = 8 

95 SPEED_DISTANCE_FORMAT_MASK = 0x0400 

96 SPEED_DISTANCE_FORMAT_SHIFT = 10 

97 ELEVATION_SOURCE_MASK = 0x1800 

98 ELEVATION_SOURCE_SHIFT = 11 

99 HEADING_SOURCE_MASK = 0x2000 

100 HEADING_SOURCE_SHIFT = 13 

101 

102 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> LocationAndSpeedData: # pylint: disable=too-many-locals 

103 """Parse location and speed data according to Bluetooth specification. 

104 

105 Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] + 

106 [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)]. 

107 

108 Args: 

109 data: Raw bytearray from BLE characteristic 

110 ctx: Optional context providing surrounding context (may be None) 

111 

112 Returns: 

113 LocationAndSpeedData containing parsed location and speed data 

114 

115 """ 

116 if len(data) < 2: 

117 raise ValueError("Location and Speed data must be at least 2 bytes") 

118 

119 flags = LocationAndSpeedFlags(DataParser.parse_int16(data, 0, signed=False)) 

120 

121 # Extract status information from flags 

122 position_status_bits = (flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT 

123 position_status = PositionStatus(position_status_bits) if position_status_bits <= 3 else None 

124 

125 speed_distance_format_bit = (flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT 

126 speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit) 

127 

128 elevation_source_bits = (flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT 

129 elevation_source = ElevationSource(elevation_source_bits) if elevation_source_bits <= 3 else None 

130 

131 heading_source_bit = (flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT 

132 heading_source = HeadingSource(heading_source_bit) 

133 

134 # Parse optional fields 

135 instantaneous_speed: float | None = None 

136 total_distance: float | None = None 

137 latitude: float | None = None 

138 longitude: float | None = None 

139 elevation: float | None = None 

140 heading: float | None = None 

141 rolling_time: int | None = None 

142 utc_time: datetime | None = None 

143 offset = 2 

144 

145 if (flags & LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT) and len(data) >= offset + 2: 

146 # Unit is 1/100 of a m/s 

147 instantaneous_speed = DataParser.parse_int16(data, offset, signed=False) / 100.0 

148 offset += 2 

149 

150 if (flags & LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT) and len(data) >= offset + 3: 

151 # Unit is 1/10 m 

152 total_distance = DataParser.parse_int24(data, offset, signed=False) / 10.0 

153 offset += 3 

154 

155 if (flags & LocationAndSpeedFlags.LOCATION_PRESENT) and len(data) >= offset + 8: 

156 # Unit is 1*10^-7 degrees 

157 latitude = DataParser.parse_int32(data, offset, signed=True) / 10000000.0 

158 longitude = DataParser.parse_int32(data, offset + 4, signed=True) / 10000000.0 

159 offset += 8 

160 

161 if (flags & LocationAndSpeedFlags.ELEVATION_PRESENT) and len(data) >= offset + 3: 

162 # Unit is 1/100 m 

163 elevation = DataParser.parse_int24(data, offset, signed=True) / 100.0 

164 offset += 3 

165 

166 if (flags & LocationAndSpeedFlags.HEADING_PRESENT) and len(data) >= offset + 2: 

167 # Unit is 1*10^-2 degrees 

168 heading = DataParser.parse_int16(data, offset, signed=False) / 100.0 

169 offset += 2 

170 

171 if (flags & LocationAndSpeedFlags.ROLLING_TIME_PRESENT) and len(data) >= offset + 1: 

172 rolling_time = data[offset] 

173 offset += 1 

174 

175 if (flags & LocationAndSpeedFlags.UTC_TIME_PRESENT) and len(data) >= offset + 7: 

176 utc_time = IEEE11073Parser.parse_timestamp(data, offset) 

177 

178 return LocationAndSpeedData( 

179 flags=flags, 

180 instantaneous_speed=instantaneous_speed, 

181 total_distance=total_distance, 

182 latitude=latitude, 

183 longitude=longitude, 

184 elevation=elevation, 

185 heading=heading, 

186 rolling_time=rolling_time, 

187 utc_time=utc_time, 

188 position_status=position_status, 

189 speed_and_distance_format=speed_and_distance_format, 

190 elevation_source=elevation_source, 

191 heading_source=heading_source, 

192 ) 

193 

194 def encode_value(self, data: LocationAndSpeedData) -> bytearray: 

195 """Encode LocationAndSpeedData back to bytes. 

196 

197 Args: 

198 data: LocationAndSpeedData instance to encode 

199 

200 Returns: 

201 Encoded bytes representing the location and speed data 

202 

203 """ 

204 result = bytearray() 

205 

206 flags = int(data.flags) 

207 

208 # Set status bits in flags 

209 if data.position_status is not None: 

210 flags |= data.position_status.value << self.POSITION_STATUS_SHIFT 

211 

212 if data.speed_and_distance_format is not None: 

213 flags |= data.speed_and_distance_format.value << self.SPEED_DISTANCE_FORMAT_SHIFT 

214 

215 if data.elevation_source is not None: 

216 flags |= data.elevation_source.value << self.ELEVATION_SOURCE_SHIFT 

217 

218 if data.heading_source is not None: 

219 flags |= data.heading_source.value << self.HEADING_SOURCE_SHIFT 

220 

221 result.extend(DataParser.encode_int16(flags, signed=False)) 

222 

223 if data.instantaneous_speed is not None: 

224 speed_value = int(data.instantaneous_speed * 100) 

225 result.extend(DataParser.encode_int16(speed_value, signed=False)) 

226 

227 if data.total_distance is not None: 

228 distance_value = int(data.total_distance * 10) 

229 result.extend(DataParser.encode_int24(distance_value, signed=False)) 

230 

231 if data.latitude is not None and data.longitude is not None: 

232 lat_value = int(data.latitude * 10000000) 

233 lon_value = int(data.longitude * 10000000) 

234 result.extend(DataParser.encode_int32(lat_value, signed=True)) 

235 result.extend(DataParser.encode_int32(lon_value, signed=True)) 

236 

237 if data.elevation is not None: 

238 elevation_value = int(data.elevation * 100) 

239 result.extend(DataParser.encode_int24(elevation_value, signed=True)) 

240 

241 if data.heading is not None: 

242 heading_value = int(data.heading * 100) 

243 result.extend(DataParser.encode_int16(heading_value, signed=False)) 

244 

245 if data.rolling_time is not None: 

246 result.append(data.rolling_time) 

247 

248 if data.utc_time is not None: 

249 result.extend(IEEE11073Parser.encode_timestamp(data.utc_time)) 

250 

251 return result