Coverage for src/bluetooth_sig/device/advertising_parser.py: 21%

215 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Advertising data parser for BLE devices. 

2 

3This module provides a dedicated parser for BLE advertising data 

4packets, extracting device information, manufacturer data, and service 

5UUIDs from both legacy and extended advertising formats. 

6""" 

7 

8from __future__ import annotations 

9 

10from ..gatt.characteristics.utils import DataParser 

11from ..types import ( 

12 BLEAdvertisementTypes, 

13 BLEAdvertisingFlags, 

14 BLEAdvertisingPDU, 

15 BLEExtendedHeader, 

16 DeviceAdvertiserData, 

17 ParsedADStructures, 

18 PDUConstants, 

19 PDUFlags, 

20 PDUType, 

21) 

22 

23 

24class AdvertisingParser: # pylint: disable=too-few-public-methods 

25 """Parser for BLE advertising data packets. 

26 

27 Handles both legacy and extended advertising PDU formats, extracting 

28 device information, manufacturer data, and service UUIDs. 

29 """ 

30 

31 def parse_advertising_data(self, raw_data: bytes) -> DeviceAdvertiserData: 

32 """Parse raw advertising data and return structured information. 

33 

34 Args: 

35 raw_data: Raw bytes from BLE advertising packet 

36 

37 Returns: 

38 DeviceAdvertiserData with parsed information 

39 

40 """ 

41 if self._is_extended_advertising_pdu(raw_data): 

42 return self._parse_extended_advertising(raw_data) 

43 return self._parse_legacy_advertising(raw_data) 

44 

45 def _is_extended_advertising_pdu(self, data: bytes) -> bool: 

46 """Check if the advertising data is an extended advertising PDU. 

47 

48 Args: 

49 data: Raw advertising data bytes 

50 

51 Returns: 

52 True if extended advertising PDU, False otherwise 

53 

54 """ 

55 if len(data) < PDUConstants.PDU_HEADER: 

56 return False 

57 

58 pdu_header = data[0] 

59 pdu_type = pdu_header & PDUFlags.TYPE_MASK 

60 

61 return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value) 

62 

63 def _parse_extended_advertising(self, raw_data: bytes) -> DeviceAdvertiserData: 

64 """Parse extended advertising data. 

65 

66 Args: 

67 raw_data: Raw extended advertising data 

68 

69 Returns: 

70 Parsed DeviceAdvertiserData 

71 

72 """ 

73 if len(raw_data) < PDUConstants.MIN_EXTENDED_PDU: 

74 return self._parse_legacy_advertising(raw_data) 

75 

76 pdu = self._parse_extended_pdu(raw_data) 

77 

78 if not pdu: 

79 return self._parse_legacy_advertising(raw_data) 

80 

81 parsed_data = ParsedADStructures() 

82 

83 if pdu.payload: 

84 parsed_data = self._parse_ad_structures(pdu.payload) 

85 

86 auxiliary_packets: list[BLEAdvertisingPDU] = [] 

87 if pdu.extended_header and pdu.extended_header.auxiliary_pointer: 

88 aux_packets = self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer) 

89 auxiliary_packets.extend(aux_packets) 

90 

91 return DeviceAdvertiserData( 

92 raw_data=raw_data, 

93 local_name=parsed_data.local_name, 

94 manufacturer_data=parsed_data.manufacturer_data, 

95 service_uuids=parsed_data.service_uuids, 

96 tx_power=parsed_data.tx_power, 

97 flags=parsed_data.flags, 

98 extended_payload=pdu.payload, 

99 auxiliary_packets=auxiliary_packets, 

100 ) 

101 

102 def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None: 

103 """Parse extended PDU header and payload. 

104 

105 Args: 

106 data: Raw PDU data 

107 

108 Returns: 

109 Parsed BLEAdvertisingPDU or None if invalid 

110 

111 """ 

112 if len(data) < PDUConstants.MIN_EXTENDED_PDU: 

113 return None 

114 

115 header = int.from_bytes(data[0 : PDUConstants.PDU_HEADER], byteorder="little") 

116 pdu_type = header & PDUFlags.TYPE_MASK 

117 tx_add = bool(header & PDUFlags.TX_ADD_MASK) 

118 rx_add = bool(header & PDUFlags.RX_ADD_MASK) 

119 

120 length = data[PDUConstants.PDU_LENGTH_OFFSET] 

121 

122 if len(data) < PDUConstants.MIN_EXTENDED_PDU + length: 

123 return None 

124 

125 extended_header_start = PDUConstants.EXTENDED_HEADER_START 

126 

127 extended_header = self._parse_extended_header(data[extended_header_start:]) 

128 

129 if not extended_header: 

130 return None 

131 

132 payload_start = extended_header_start + extended_header.extended_header_length + PDUConstants.EXT_HEADER_LENGTH 

133 payload_length = length - (extended_header.extended_header_length + PDUConstants.EXT_HEADER_LENGTH) 

134 

135 if payload_start + payload_length > len(data): 

136 return None 

137 

138 payload = data[payload_start : payload_start + payload_length] 

139 

140 adva = extended_header.extended_advertiser_address 

141 targeta = extended_header.extended_target_address 

142 

143 return BLEAdvertisingPDU( 

144 pdu_type=PDUType(pdu_type), 

145 tx_add=tx_add, 

146 rx_add=rx_add, 

147 length=length, 

148 advertiser_address=adva, 

149 target_address=targeta, 

150 payload=payload, 

151 extended_header=extended_header, 

152 ) 

153 

154 def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None: 

155 """Parse extended header from PDU data. 

156 

157 Args: 

158 data: Extended header data 

159 

160 Returns: 

161 Parsed BLEExtendedHeader or None if invalid 

162 

163 """ 

164 # pylint: disable=too-many-return-statements,too-many-branches 

165 if len(data) < 1: 

166 return None 

167 

168 header = BLEExtendedHeader() 

169 header.extended_header_length = data[0] 

170 

171 if len(data) < header.extended_header_length + 1: 

172 return None 

173 

174 adv_mode = data[1] 

175 header.adv_mode = adv_mode 

176 

177 offset = PDUConstants.ADV_ADDR_OFFSET # Start after length and mode bytes 

178 

179 if header.has_extended_advertiser_address: 

180 if offset + PDUConstants.BLE_ADDR > len(data): 

181 return None 

182 header.extended_advertiser_address = data[offset : offset + PDUConstants.BLE_ADDR] 

183 offset += PDUConstants.BLE_ADDR 

184 

185 if header.has_extended_target_address: 

186 if offset + PDUConstants.BLE_ADDR > len(data): 

187 return None 

188 header.extended_target_address = data[offset : offset + PDUConstants.BLE_ADDR] 

189 offset += PDUConstants.BLE_ADDR 

190 

191 if header.has_cte_info: 

192 if offset + PDUConstants.CTE_INFO > len(data): 

193 return None 

194 header.cte_info = data[offset : offset + PDUConstants.CTE_INFO] 

195 offset += PDUConstants.CTE_INFO 

196 

197 if header.has_advertising_data_info: 

198 if offset + PDUConstants.ADV_DATA_INFO > len(data): 

199 return None 

200 header.advertising_data_info = data[offset : offset + PDUConstants.ADV_DATA_INFO] 

201 offset += PDUConstants.ADV_DATA_INFO 

202 

203 if header.has_auxiliary_pointer: 

204 if offset + PDUConstants.AUX_PTR > len(data): 

205 return None 

206 header.auxiliary_pointer = data[offset : offset + PDUConstants.AUX_PTR] 

207 offset += PDUConstants.AUX_PTR 

208 

209 if header.has_sync_info: 

210 if offset + PDUConstants.SYNC_INFO > len(data): 

211 return None 

212 header.sync_info = data[offset : offset + PDUConstants.SYNC_INFO] 

213 offset += PDUConstants.SYNC_INFO 

214 

215 if header.has_tx_power: 

216 if offset + PDUConstants.TX_POWER > len(data): 

217 return None 

218 header.tx_power = int.from_bytes( 

219 data[offset : offset + PDUConstants.TX_POWER], 

220 byteorder="little", 

221 signed=True, 

222 ) 

223 offset += PDUConstants.TX_POWER 

224 

225 if header.has_additional_controller_data: 

226 header.additional_controller_advertising_data = data[offset:] 

227 

228 return header 

229 

230 def _parse_auxiliary_packets(self, aux_ptr: bytes) -> list[BLEAdvertisingPDU]: 

231 """Parse auxiliary packets referenced by auxiliary pointer. 

232 

233 Args: 

234 aux_ptr: Auxiliary pointer data 

235 

236 Returns: 

237 List of auxiliary packets (currently returns empty list) 

238 

239 """ 

240 if len(aux_ptr) != PDUConstants.AUX_PTR: 

241 return [] 

242 

243 return [] 

244 

245 def _parse_legacy_advertising(self, raw_data: bytes) -> DeviceAdvertiserData: 

246 """Parse legacy advertising data. 

247 

248 Args: 

249 raw_data: Raw legacy advertising data 

250 

251 Returns: 

252 Parsed DeviceAdvertiserData 

253 

254 """ 

255 parsed_data = self._parse_ad_structures(raw_data) 

256 

257 return DeviceAdvertiserData( 

258 raw_data=raw_data, 

259 local_name=parsed_data.local_name, 

260 manufacturer_data=parsed_data.manufacturer_data, 

261 service_uuids=parsed_data.service_uuids, 

262 tx_power=parsed_data.tx_power if parsed_data.tx_power != 0 else None, 

263 flags=parsed_data.flags if parsed_data.flags != 0 else None, 

264 appearance=parsed_data.appearance, 

265 service_data=parsed_data.service_data, 

266 solicited_service_uuids=parsed_data.solicited_service_uuids, 

267 uri=parsed_data.uri, 

268 indoor_positioning=parsed_data.indoor_positioning, 

269 transport_discovery_data=parsed_data.transport_discovery_data, 

270 le_supported_features=parsed_data.le_supported_features, 

271 encrypted_advertising_data=parsed_data.encrypted_advertising_data, 

272 periodic_advertising_response_timing=parsed_data.periodic_advertising_response_timing, 

273 electronic_shelf_label=parsed_data.electronic_shelf_label, 

274 three_d_information=parsed_data.three_d_information, 

275 broadcast_name=parsed_data.broadcast_name, 

276 biginfo=parsed_data.biginfo, 

277 mesh_message=parsed_data.mesh_message, 

278 mesh_beacon=parsed_data.mesh_beacon, 

279 public_target_address=parsed_data.public_target_address, 

280 random_target_address=parsed_data.random_target_address, 

281 advertising_interval=parsed_data.advertising_interval, 

282 advertising_interval_long=parsed_data.advertising_interval_long, 

283 le_bluetooth_device_address=parsed_data.le_bluetooth_device_address, 

284 le_role=parsed_data.le_role, 

285 class_of_device=parsed_data.class_of_device, 

286 simple_pairing_hash_c=parsed_data.simple_pairing_hash_c, 

287 simple_pairing_randomizer_r=parsed_data.simple_pairing_randomizer_r, 

288 security_manager_tk_value=parsed_data.security_manager_tk_value, 

289 security_manager_out_of_band_flags=parsed_data.security_manager_out_of_band_flags, 

290 slave_connection_interval_range=parsed_data.slave_connection_interval_range, 

291 secure_connections_confirmation=parsed_data.secure_connections_confirmation, 

292 secure_connections_random=parsed_data.secure_connections_random, 

293 channel_map_update_indication=parsed_data.channel_map_update_indication, 

294 pb_adv=parsed_data.pb_adv, 

295 resolvable_set_identifier=parsed_data.resolvable_set_identifier, 

296 ) 

297 

298 def _parse_ad_structures(self, data: bytes) -> ParsedADStructures: 

299 """Parse advertising data structures from raw bytes. 

300 

301 Args: 

302 data: Raw advertising data payload 

303 

304 Returns: 

305 ParsedADStructures object with extracted data 

306 

307 """ 

308 # pylint: disable=too-many-branches,too-many-statements 

309 parsed = ParsedADStructures() 

310 

311 i = 0 

312 while i < len(data): 

313 if i + 1 >= len(data): 

314 break 

315 

316 length = data[i] 

317 if length == 0 or i + length + 1 > len(data): 

318 break 

319 

320 ad_type = data[i + 1] 

321 ad_data = data[i + 2 : i + length + 1] 

322 

323 if ad_type == BLEAdvertisementTypes.FLAGS and len(ad_data) >= 1: 

324 parsed.flags = BLEAdvertisingFlags(ad_data[0]) 

325 elif ad_type in ( 

326 BLEAdvertisementTypes.INCOMPLETE_16BIT_SERVICE_UUIDS, 

327 BLEAdvertisementTypes.COMPLETE_16BIT_SERVICE_UUIDS, 

328 ): 

329 for j in range(0, len(ad_data), 2): 

330 if j + 1 < len(ad_data): 

331 uuid_short = DataParser.parse_int16(ad_data, j, signed=False) 

332 parsed.service_uuids.append(f"{uuid_short:04X}") 

333 elif ad_type in ( 

334 BLEAdvertisementTypes.SHORTENED_LOCAL_NAME, 

335 BLEAdvertisementTypes.COMPLETE_LOCAL_NAME, 

336 ): 

337 try: 

338 parsed.local_name = ad_data.decode("utf-8") 

339 except UnicodeDecodeError: 

340 parsed.local_name = ad_data.hex() 

341 elif ad_type == BLEAdvertisementTypes.TX_POWER_LEVEL and len(ad_data) >= 1: 

342 parsed.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True) 

343 elif ad_type == BLEAdvertisementTypes.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= 2: 

344 company_id = DataParser.parse_int16(ad_data, 0, signed=False) 

345 parsed.manufacturer_data[company_id] = ad_data[2:] 

346 elif ad_type == BLEAdvertisementTypes.APPEARANCE and len(ad_data) >= 2: 

347 parsed.appearance = DataParser.parse_int16(ad_data, 0, signed=False) 

348 elif ad_type == BLEAdvertisementTypes.SERVICE_DATA_16BIT and len(ad_data) >= 2: 

349 service_uuid = f"{DataParser.parse_int16(ad_data, 0, signed=False):04X}" 

350 parsed.service_data[service_uuid] = ad_data[2:] 

351 elif ad_type == BLEAdvertisementTypes.URI: 

352 try: 

353 parsed.uri = ad_data.decode("utf-8") 

354 except UnicodeDecodeError: 

355 parsed.uri = ad_data.hex() 

356 elif ad_type == BLEAdvertisementTypes.INDOOR_POSITIONING: 

357 parsed.indoor_positioning = ad_data 

358 elif ad_type == BLEAdvertisementTypes.TRANSPORT_DISCOVERY_DATA: 

359 parsed.transport_discovery_data = ad_data 

360 elif ad_type == BLEAdvertisementTypes.LE_SUPPORTED_FEATURES: 

361 parsed.le_supported_features = ad_data 

362 elif ad_type == BLEAdvertisementTypes.ENCRYPTED_ADVERTISING_DATA: 

363 parsed.encrypted_advertising_data = ad_data 

364 elif ad_type == BLEAdvertisementTypes.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 

365 parsed.periodic_advertising_response_timing = ad_data 

366 elif ad_type == BLEAdvertisementTypes.ELECTRONIC_SHELF_LABEL: 

367 parsed.electronic_shelf_label = ad_data 

368 elif ad_type == BLEAdvertisementTypes.THREE_D_INFORMATION_DATA: 

369 parsed.three_d_information = ad_data 

370 elif ad_type == BLEAdvertisementTypes.BROADCAST_NAME: 

371 try: 

372 parsed.broadcast_name = ad_data.decode("utf-8") 

373 except UnicodeDecodeError: 

374 parsed.broadcast_name = ad_data.hex() 

375 elif ad_type == BLEAdvertisementTypes.BROADCAST_CODE: 

376 parsed.broadcast_code = ad_data 

377 elif ad_type == BLEAdvertisementTypes.BIGINFO: 

378 parsed.biginfo = ad_data 

379 elif ad_type == BLEAdvertisementTypes.MESH_MESSAGE: 

380 parsed.mesh_message = ad_data 

381 elif ad_type == BLEAdvertisementTypes.MESH_BEACON: 

382 parsed.mesh_beacon = ad_data 

383 elif ad_type == BLEAdvertisementTypes.PUBLIC_TARGET_ADDRESS: 

384 for j in range(0, len(ad_data), 6): 

385 if j + 5 < len(ad_data): 

386 addr_bytes = ad_data[j : j + 6] 

387 addr_str = ":".join(f"{b:02X}" for b in addr_bytes[::-1]) 

388 parsed.public_target_address.append(addr_str) 

389 elif ad_type == BLEAdvertisementTypes.RANDOM_TARGET_ADDRESS: 

390 for j in range(0, len(ad_data), 6): 

391 if j + 5 < len(ad_data): 

392 addr_bytes = ad_data[j : j + 6] 

393 addr_str = ":".join(f"{b:02X}" for b in addr_bytes[::-1]) 

394 parsed.random_target_address.append(addr_str) 

395 elif ad_type == BLEAdvertisementTypes.ADVERTISING_INTERVAL and len(ad_data) >= 2: 

396 parsed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False) 

397 elif ad_type == BLEAdvertisementTypes.ADVERTISING_INTERVAL_LONG and len(ad_data) >= 3: 

398 parsed.advertising_interval_long = int.from_bytes(ad_data[:3], byteorder="little", signed=False) 

399 elif ad_type == BLEAdvertisementTypes.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= 6: 

400 addr_bytes = ad_data[:6] 

401 parsed.le_bluetooth_device_address = ":".join(f"{b:02X}" for b in addr_bytes[::-1]) 

402 elif ad_type == BLEAdvertisementTypes.LE_ROLE and len(ad_data) >= 1: 

403 parsed.le_role = ad_data[0] 

404 elif ad_type == BLEAdvertisementTypes.CLASS_OF_DEVICE and len(ad_data) >= 3: 

405 parsed.class_of_device = int.from_bytes(ad_data[:3], byteorder="little", signed=False) 

406 elif ad_type == BLEAdvertisementTypes.SIMPLE_PAIRING_HASH_C: 

407 parsed.simple_pairing_hash_c = ad_data 

408 elif ad_type == BLEAdvertisementTypes.SIMPLE_PAIRING_RANDOMIZER_R: 

409 parsed.simple_pairing_randomizer_r = ad_data 

410 elif ad_type == BLEAdvertisementTypes.SECURITY_MANAGER_TK_VALUE: 

411 parsed.security_manager_tk_value = ad_data 

412 elif ad_type == BLEAdvertisementTypes.SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 

413 parsed.security_manager_out_of_band_flags = ad_data 

414 elif ad_type == BLEAdvertisementTypes.SLAVE_CONNECTION_INTERVAL_RANGE: 

415 parsed.slave_connection_interval_range = ad_data 

416 elif ad_type == BLEAdvertisementTypes.SECURE_CONNECTIONS_CONFIRMATION_VALUE: 

417 parsed.secure_connections_confirmation = ad_data 

418 elif ad_type == BLEAdvertisementTypes.SECURE_CONNECTIONS_RANDOM_VALUE: 

419 parsed.secure_connections_random = ad_data 

420 elif ad_type == BLEAdvertisementTypes.CHANNEL_MAP_UPDATE_INDICATION: 

421 parsed.channel_map_update_indication = ad_data 

422 elif ad_type == BLEAdvertisementTypes.PB_ADV: 

423 parsed.pb_adv = ad_data 

424 elif ad_type == BLEAdvertisementTypes.RESOLVABLE_SET_IDENTIFIER: 

425 parsed.resolvable_set_identifier = ad_data 

426 

427 i += length + 1 

428 

429 return parsed