Coverage for src / bluetooth_sig / gatt / characteristics / location_and_speed.py: 92%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Location and Speed characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum, IntFlag 

7 

8import msgspec 

9 

10from ...types.location import PositionStatus 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .utils import DataParser, IEEE11073Parser 

14 

15 

class LocationAndSpeedFlags(IntFlag):
    """Presence bits carried in the Location and Speed flags word.

    Each member marks one optional field of the payload; the decoder only
    attempts to read a field when its bit is set.
    """

    INSTANTANEOUS_SPEED_PRESENT = 1 << 0
    TOTAL_DISTANCE_PRESENT = 1 << 1
    LOCATION_PRESENT = 1 << 2
    ELEVATION_PRESENT = 1 << 3
    HEADING_PRESENT = 1 << 4
    ROLLING_TIME_PRESENT = 1 << 5
    UTC_TIME_PRESENT = 1 << 6

26 

27 

class SpeedAndDistanceFormat(IntEnum):
    """Value of the single-bit speed and distance format field of the flags word."""

    FORMAT_2D = 0b0
    FORMAT_3D = 0b1

33 

34 

class ElevationSource(IntEnum):
    """Value of the two-bit elevation source field of the flags word."""

    POSITIONING_SYSTEM = 0b00
    BAROMETRIC_AIR_PRESSURE = 0b01
    DATABASE_SERVICE = 0b10
    OTHER = 0b11

42 

43 

class HeadingSource(IntEnum):
    """Value of the single-bit heading source field of the flags word."""

    HEADING_BASED_ON_MOVEMENT = 0b0
    HEADING_BASED_ON_MAGNETIC_COMPASS = 0b1

49 

50 

class LocationAndSpeedData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only record of one decoded Location and Speed value.

    Only ``flags`` is required. Every other attribute defaults to ``None``,
    which the decoder uses for "field not present in the payload".
    """

    # Flags word; the decoder stores the full 16-bit value read from the payload.
    flags: LocationAndSpeedFlags
    # Speed in m/s (the decoder divides the raw 1/100 m/s integer by 100).
    instantaneous_speed: float | None = None
    # Distance in metres (the decoder divides the raw 1/10 m integer by 10).
    total_distance: float | None = None
    # Latitude and longitude in degrees (raw 1e-7 degree integers divided by 1e7).
    latitude: float | None = None
    longitude: float | None = None
    # Elevation in metres (the decoder divides the raw 1/100 m integer by 100).
    elevation: float | None = None
    # Heading in degrees (the decoder divides the raw 1/100 degree integer by 100).
    heading: float | None = None
    # Raw single-byte rolling time value, stored unscaled.
    rolling_time: int | None = None
    # Timestamp produced by IEEE11073Parser.parse_timestamp.
    utc_time: datetime | None = None
    # Status sub-fields extracted from the flags word.
    position_status: PositionStatus | None = None
    speed_and_distance_format: SpeedAndDistanceFormat | None = None
    elevation_source: ElevationSource | None = None
    heading_source: HeadingSource | None = None

67 

68 

class LocationAndSpeedCharacteristic(BaseCharacteristic[LocationAndSpeedData]):
    """Location and Speed characteristic.

    Used to represent data related to a location and speed sensor.
    Note that it is possible for this characteristic to exceed the default LE ATT_MTU size.

    Payload layout: Flags(2) followed by the optional fields, each present only
    when its bit in the flags word is set, in this fixed order:
    Instantaneous Speed(2), Total Distance(3), Latitude(4) + Longitude(4),
    Elevation(3), Heading(2), Rolling Time(1), UTC Time(7).
    """

    min_length = 2  # Flags(2) minimum
    max_length = 28  # Flags(2) + InstantaneousSpeed(2) + TotalDistance(3) + Location(8) +
    # Elevation(3) + Heading(2) + RollingTime(1) + UTCTime(7) maximum
    allow_variable_length: bool = True  # Variable optional fields

    # Bit masks and shifts for the status information packed into the flags word.
    # The seven presence bits occupy bits 0-6; the status sub-fields follow
    # immediately after them (Bluetooth SIG Location and Speed flags layout):
    #   bits 7-8   Position Status
    #   bit  9     Speed and Distance Format
    #   bits 10-11 Elevation Source
    #   bit  12    Heading Source
    #   bits 13-15 reserved for future use
    POSITION_STATUS_MASK = 0x0180
    POSITION_STATUS_SHIFT = 7
    SPEED_DISTANCE_FORMAT_MASK = 0x0200
    SPEED_DISTANCE_FORMAT_SHIFT = 9
    ELEVATION_SOURCE_MASK = 0x0C00
    ELEVATION_SOURCE_SHIFT = 10
    HEADING_SOURCE_MASK = 0x1000
    HEADING_SOURCE_SHIFT = 12

    # Maximum valid enum values
    _MAX_POSITION_STATUS_VALUE = 3
    _MAX_ELEVATION_SOURCE_VALUE = 3

    @staticmethod
    def _locate_optional_fields(flags: LocationAndSpeedFlags, length: int) -> dict[LocationAndSpeedFlags, int]:
        """Map each decodable optional field to its byte offset in the payload.

        Fields are laid out back to back in a fixed order, so the position of a
        field depends on the sizes of all flagged fields before it. As soon as a
        flagged field does not fit into ``length`` bytes, the position of every
        later field is unknown; locating stops there so that no field is read
        from bytes that belong to a different field.

        Args:
            flags: Decoded flags word.
            length: Total payload length in bytes.

        Returns:
            Mapping from presence flag to the offset of its field, containing
            only the fields that are flagged and fully contained in the payload.

        """
        layout = (
            (LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT, 2),
            (LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT, 3),
            (LocationAndSpeedFlags.LOCATION_PRESENT, 8),  # latitude(4) + longitude(4)
            (LocationAndSpeedFlags.ELEVATION_PRESENT, 3),
            (LocationAndSpeedFlags.HEADING_PRESENT, 2),
            (LocationAndSpeedFlags.ROLLING_TIME_PRESENT, 1),
            (LocationAndSpeedFlags.UTC_TIME_PRESENT, 7),
        )
        offsets: dict[LocationAndSpeedFlags, int] = {}
        offset = 2  # optional fields start right after the 2-byte flags word
        for flag, size in layout:
            if not flags & flag:
                continue
            if length < offset + size:
                break  # truncated payload: later offsets cannot be trusted
            offsets[flag] = offset
            offset += size
        return offsets

    def _decode_value(  # pylint: disable=too-many-locals  # Location spec with many positional fields
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> LocationAndSpeedData:
        """Parse location and speed data according to Bluetooth specification.

        Format: Flags(2) + [Instantaneous Speed(2)] + [Total Distance(3)] + [Location - Latitude(4)] +
        [Location - Longitude(4)] + [Elevation(3)] + [Heading(2)] + [Rolling Time(1)] + [UTC Time(7)].

        A flagged field that is not fully contained in ``data`` is reported as
        ``None``, and so is every optional field after it, because their
        offsets can no longer be determined.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None); not used here
            validate: Whether to validate ranges (default True); not used here

        Returns:
            LocationAndSpeedData containing parsed location and speed data

        """
        raw_flags = DataParser.parse_int16(data, 0, signed=False)
        flags = LocationAndSpeedFlags(raw_flags)

        # Extract status information from the flags word. The range guards are
        # defensive only: a two-bit field cannot exceed 3.
        position_status_bits = (raw_flags & self.POSITION_STATUS_MASK) >> self.POSITION_STATUS_SHIFT
        position_status = (
            PositionStatus(position_status_bits) if position_status_bits <= self._MAX_POSITION_STATUS_VALUE else None
        )

        speed_distance_format_bit = (raw_flags & self.SPEED_DISTANCE_FORMAT_MASK) >> self.SPEED_DISTANCE_FORMAT_SHIFT
        speed_and_distance_format = SpeedAndDistanceFormat(speed_distance_format_bit)

        elevation_source_bits = (raw_flags & self.ELEVATION_SOURCE_MASK) >> self.ELEVATION_SOURCE_SHIFT
        elevation_source = (
            ElevationSource(elevation_source_bits)
            if elevation_source_bits <= self._MAX_ELEVATION_SOURCE_VALUE
            else None
        )

        heading_source_bit = (raw_flags & self.HEADING_SOURCE_MASK) >> self.HEADING_SOURCE_SHIFT
        heading_source = HeadingSource(heading_source_bit)

        # Parse optional fields
        instantaneous_speed: float | None = None
        total_distance: float | None = None
        latitude: float | None = None
        longitude: float | None = None
        elevation: float | None = None
        heading: float | None = None
        rolling_time: int | None = None
        utc_time: datetime | None = None

        offsets = self._locate_optional_fields(flags, len(data))

        if LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT in offsets:
            # Unit is 1/100 of a m/s
            field_at = offsets[LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT]
            instantaneous_speed = DataParser.parse_int16(data, field_at, signed=False) / 100.0

        if LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT in offsets:
            # Unit is 1/10 m
            field_at = offsets[LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT]
            total_distance = DataParser.parse_int24(data, field_at, signed=False) / 10.0

        if LocationAndSpeedFlags.LOCATION_PRESENT in offsets:
            # Unit is 1*10^-7 degrees
            field_at = offsets[LocationAndSpeedFlags.LOCATION_PRESENT]
            latitude = DataParser.parse_int32(data, field_at, signed=True) / 10000000.0
            longitude = DataParser.parse_int32(data, field_at + 4, signed=True) / 10000000.0

        if LocationAndSpeedFlags.ELEVATION_PRESENT in offsets:
            # Unit is 1/100 m
            field_at = offsets[LocationAndSpeedFlags.ELEVATION_PRESENT]
            elevation = DataParser.parse_int24(data, field_at, signed=True) / 100.0

        if LocationAndSpeedFlags.HEADING_PRESENT in offsets:
            # Unit is 1*10^-2 degrees
            field_at = offsets[LocationAndSpeedFlags.HEADING_PRESENT]
            heading = DataParser.parse_int16(data, field_at, signed=False) / 100.0

        if LocationAndSpeedFlags.ROLLING_TIME_PRESENT in offsets:
            rolling_time = data[offsets[LocationAndSpeedFlags.ROLLING_TIME_PRESENT]]

        if LocationAndSpeedFlags.UTC_TIME_PRESENT in offsets:
            utc_time = IEEE11073Parser.parse_timestamp(data, offsets[LocationAndSpeedFlags.UTC_TIME_PRESENT])

        return LocationAndSpeedData(
            flags=flags,
            instantaneous_speed=instantaneous_speed,
            total_distance=total_distance,
            latitude=latitude,
            longitude=longitude,
            elevation=elevation,
            heading=heading,
            rolling_time=rolling_time,
            utc_time=utc_time,
            position_status=position_status,
            speed_and_distance_format=speed_and_distance_format,
            elevation_source=elevation_source,
            heading_source=heading_source,
        )

    def _encode_value(self, data: LocationAndSpeedData) -> bytearray:
        """Encode LocationAndSpeedData back to bytes.

        The emitted flags word starts from ``data.flags`` and is then made
        consistent with the rest of ``data``:

        * each status sub-field whose attribute is not ``None`` replaces the
          corresponding bits (old bits are cleared first); a ``None`` attribute
          leaves the bits from ``data.flags`` untouched;
        * each presence bit is set exactly when the matching field is written,
          so the payload can always be decoded with its own flags. Location is
          written only when both latitude and longitude are provided.

        Scaled values are rounded to the nearest integer rather than truncated,
        so a decoded value encodes back to the same raw integer.

        Args:
            data: LocationAndSpeedData instance to encode

        Returns:
            Encoded bytes representing the location and speed data

        Raises:
            ValueError: If ``rolling_time`` does not fit in a single byte (0-255).

        """
        flags = int(data.flags)

        # Replace (not merely OR) the status sub-fields so stale bits that are
        # already present in data.flags cannot leak into the result.
        status_fields = (
            (data.position_status, self.POSITION_STATUS_MASK, self.POSITION_STATUS_SHIFT),
            (data.speed_and_distance_format, self.SPEED_DISTANCE_FORMAT_MASK, self.SPEED_DISTANCE_FORMAT_SHIFT),
            (data.elevation_source, self.ELEVATION_SOURCE_MASK, self.ELEVATION_SOURCE_SHIFT),
            (data.heading_source, self.HEADING_SOURCE_MASK, self.HEADING_SOURCE_SHIFT),
        )
        for member, mask, shift in status_fields:
            if member is not None:
                flags = (flags & ~mask) | ((member.value << shift) & mask)

        # Keep the presence bits in step with the fields actually written below.
        has_location = data.latitude is not None and data.longitude is not None
        presence = (
            (LocationAndSpeedFlags.INSTANTANEOUS_SPEED_PRESENT, data.instantaneous_speed is not None),
            (LocationAndSpeedFlags.TOTAL_DISTANCE_PRESENT, data.total_distance is not None),
            (LocationAndSpeedFlags.LOCATION_PRESENT, has_location),
            (LocationAndSpeedFlags.ELEVATION_PRESENT, data.elevation is not None),
            (LocationAndSpeedFlags.HEADING_PRESENT, data.heading is not None),
            (LocationAndSpeedFlags.ROLLING_TIME_PRESENT, data.rolling_time is not None),
            (LocationAndSpeedFlags.UTC_TIME_PRESENT, data.utc_time is not None),
        )
        for flag, is_present in presence:
            if is_present:
                flags |= int(flag)
            else:
                flags &= ~int(flag)

        result = bytearray()
        result.extend(DataParser.encode_int16(flags, signed=False))

        if data.instantaneous_speed is not None:
            # Unit is 1/100 of a m/s
            speed_value = round(data.instantaneous_speed * 100)
            result.extend(DataParser.encode_int16(speed_value, signed=False))

        if data.total_distance is not None:
            # Unit is 1/10 m
            distance_value = round(data.total_distance * 10)
            result.extend(DataParser.encode_int24(distance_value, signed=False))

        if has_location:
            # Unit is 1*10^-7 degrees
            lat_value = round(data.latitude * 10000000)
            lon_value = round(data.longitude * 10000000)
            result.extend(DataParser.encode_int32(lat_value, signed=True))
            result.extend(DataParser.encode_int32(lon_value, signed=True))

        if data.elevation is not None:
            # Unit is 1/100 m
            elevation_value = round(data.elevation * 100)
            result.extend(DataParser.encode_int24(elevation_value, signed=True))

        if data.heading is not None:
            # Unit is 1*10^-2 degrees
            heading_value = round(data.heading * 100)
            result.extend(DataParser.encode_int16(heading_value, signed=False))

        if data.rolling_time is not None:
            result.append(data.rolling_time)

        if data.utc_time is not None:
            result.extend(IEEE11073Parser.encode_timestamp(data.utc_time))

        return result