Coverage for src / bluetooth_sig / advertising / pdu_parser.py: 54%

374 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""BLE Advertising PDU parser. 

2 

3This module provides a parser for BLE advertising PDU data packets, 

4extracting device information, manufacturer data, and service UUIDs 

5from both legacy and extended advertising formats. 

6 

7This is the low-level BLE spec parser. For interpreting vendor-specific 

8sensor data (e.g., Xiaomi, RuuviTag, BTHome), see the AdvertisingDataInterpreter 

9base class. 

10""" 

11 

12from __future__ import annotations 

13 

14import logging 

15 

16from bluetooth_sig.gatt.characteristics.utils import DataParser 

17from bluetooth_sig.gatt.constants import SIZE_UINT16, SIZE_UINT24, SIZE_UINT32, SIZE_UINT48, SIZE_UUID128 

18from bluetooth_sig.registry.core.ad_types import ad_types_registry 

19from bluetooth_sig.registry.core.appearance_values import appearance_values_registry 

20from bluetooth_sig.registry.core.class_of_device import class_of_device_registry 

21from bluetooth_sig.types import ManufacturerData 

22from bluetooth_sig.types.ad_types_constants import ADType 

23from bluetooth_sig.types.advertising.ad_structures import ( 

24 AdvertisingDataStructures, 

25 ConnectionIntervalRange, 

26 ExtendedAdvertisingData, 

27) 

28from bluetooth_sig.types.advertising.channel_map_update import ChannelMapUpdateIndication 

29from bluetooth_sig.types.advertising.extended import ( 

30 AdvertisingDataInfo, 

31 AuxiliaryPointer, 

32 CTEInfo, 

33 CTEType, 

34 PHYType, 

35 SyncInfo, 

36) 

37from bluetooth_sig.types.advertising.features import LEFeatures 

38from bluetooth_sig.types.advertising.flags import BLEAdvertisingFlags 

39from bluetooth_sig.types.advertising.indoor_positioning import IndoorPositioningData 

40from bluetooth_sig.types.advertising.pdu import ( 

41 BLEAdvertisingPDU, 

42 BLEExtendedHeader, 

43 ExtendedHeaderFlags, 

44 PDUHeaderFlags, 

45 PDULayout, 

46 PDUType, 

47) 

48from bluetooth_sig.types.advertising.result import AdvertisingData 

49from bluetooth_sig.types.advertising.three_d_information import ThreeDInformationData 

50from bluetooth_sig.types.advertising.transport_discovery import TransportDiscoveryData 

51from bluetooth_sig.types.appearance import AppearanceData 

52from bluetooth_sig.types.mesh import ( 

53 MeshBeaconType, 

54 MeshMessage, 

55 ProvisioningBearerData, 

56 SecureNetworkBeacon, 

57 UnprovisionedDeviceBeacon, 

58) 

59from bluetooth_sig.types.uri import URIData 

60from bluetooth_sig.types.uuid import BluetoothUUID 

61 

62logger = logging.getLogger(__name__) 

63 

64 

class _ExtendedHeaderBitMasks:
    """Shift and mask constants for unpacking extended header fields.

    Field layouts follow the Bluetooth Core Specification (Vol 6, Part B).
    Each ``*_SHIFT`` is applied first; the matching ``*_MASK`` is then
    applied to the shifted value (masks without a shift apply to bit 0).
    """

    # --- CTEInfo: one byte ---
    CTE_TIME_MASK: int = 0b0001_1111  # bits 0-4 -> CTE time (0-19)
    CTE_TYPE_SHIFT: int = 6
    CTE_TYPE_MASK: int = 0b11  # bits 6-7 -> CTE type

    # --- AdvDataInfo (ADI): two bytes ---
    ADI_DID_MASK: int = (1 << 12) - 1  # bits 0-11 -> advertising Data ID
    ADI_SID_SHIFT: int = 12
    ADI_SID_MASK: int = 0b1111  # bits 12-15 -> advertising Set ID

    # --- AuxPtr: three bytes ---
    AUX_CHANNEL_MASK: int = (1 << 6) - 1  # bits 0-5 -> channel index
    AUX_CA_SHIFT: int = 6  # single-bit field, masked with 0x01 by the parser
    AUX_OFFSET_UNITS_SHIFT: int = 7  # single-bit field, masked with 0x01 by the parser
    AUX_OFFSET_SHIFT: int = 8
    AUX_OFFSET_MASK: int = (1 << 13) - 1  # bits 8-20 -> 13-bit aux offset
    AUX_PHY_SHIFT: int = 21
    AUX_PHY_MASK: int = 0b111  # bits 21-23 -> aux PHY

    # --- SyncInfo: eighteen bytes ---
    SYNC_OFFSET_MASK: int = (1 << 13) - 1  # bits 0-12 -> 13-bit sync packet offset
    SYNC_OFFSET_UNITS_SHIFT: int = 13  # single-bit field, masked with 0x01 by the parser
    SYNC_CHANNEL_MAP_MASK: int = (1 << 37) - 1  # 37-bit channel map
    SYNC_SCA_SHIFT: int = 5
    SYNC_SCA_MASK: int = 0b111  # bits 5-7 -> sleep clock accuracy
    SYNC_ADDR_TYPE_SHIFT: int = 4  # single-bit field, masked with 0x01 by the parser

94 

95 

class AdvertisingPDUParser:  # pylint: disable=too-few-public-methods
    """Parser for BLE advertising PDU data packets.

    Parses raw BLE advertising PDU bytes into structured AdvertisingData,
    handling both legacy and extended advertising formats.

    This is the low-level parsing layer that extracts:
    - Manufacturer data (company_id → payload)
    - Service data (UUID → payload)
    - Flags, local name, appearance, TX power
    - Extended advertising fields (BLE 5.0+)

    For vendor-specific interpretation (e.g., BTHome sensor values),
    use AdvertisingDataInterpreter subclasses.
    """

    def parse_advertising_data(self, raw_data: bytes) -> AdvertisingData:
        """Parse raw advertising data and return structured information.

        Args:
            raw_data: Raw bytes from BLE advertising packet

        Returns:
            AdvertisingData with parsed information

        """
        if self._is_extended_advertising_pdu(raw_data):
            return self._parse_extended_advertising(raw_data)
        return self._parse_legacy_advertising(raw_data)

    def _is_extended_advertising_pdu(self, data: bytes) -> bool:
        """Check if the advertising data is an extended advertising PDU.

        The decision is based solely on the PDU type bits of the first byte.

        Args:
            data: Raw advertising data bytes

        Returns:
            True if extended advertising PDU, False otherwise

        """
        if len(data) < PDULayout.PDU_HEADER:
            return False

        pdu_type = data[0] & PDUHeaderFlags.TYPE_MASK
        return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value)

    def _parse_extended_advertising(self, raw_data: bytes) -> AdvertisingData:
        """Parse extended advertising data.

        Falls back to legacy parsing of ``raw_data`` when the buffer is too
        short or the extended PDU cannot be decoded.

        Args:
            raw_data: Raw extended advertising data

        Returns:
            Parsed AdvertisingData

        """
        if len(raw_data) < PDULayout.MIN_EXTENDED_PDU:
            return self._parse_legacy_advertising(raw_data)

        pdu = self._parse_extended_pdu(raw_data)
        if not pdu:
            return self._parse_legacy_advertising(raw_data)

        if pdu.payload:
            parsed_data = self._parse_ad_structures(pdu.payload)
        else:
            parsed_data = AdvertisingDataStructures()

        auxiliary_packets: list[BLEAdvertisingPDU] = []
        if pdu.extended_header and pdu.extended_header.auxiliary_pointer:
            auxiliary_packets.extend(self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer))

        return AdvertisingData(
            raw_data=raw_data,
            ad_structures=parsed_data,
            extended=ExtendedAdvertisingData(
                extended_payload=pdu.payload,
                auxiliary_packets=auxiliary_packets,
            ),
        )

    def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None:
        """Parse extended PDU header and payload.

        Args:
            data: Raw PDU data

        Returns:
            Parsed BLEAdvertisingPDU or None if invalid

        """
        if len(data) < PDULayout.MIN_EXTENDED_PDU:
            return None

        header = DataParser.parse_int16(data, 0, signed=False)
        pdu_type = header & PDUHeaderFlags.TYPE_MASK
        tx_add = bool(header & PDUHeaderFlags.TX_ADD_MASK)
        rx_add = bool(header & PDUHeaderFlags.RX_ADD_MASK)

        length = data[PDULayout.PDU_LENGTH_OFFSET]
        if len(data) < PDULayout.MIN_EXTENDED_PDU + length:
            return None

        extended_header_start = PDULayout.EXTENDED_HEADER_START
        extended_header = self._parse_extended_header(data[extended_header_start:])
        if not extended_header:
            return None

        header_size = extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH
        payload_start = extended_header_start + header_size
        payload_length = length - header_size

        # A negative payload length means the extended header claims more
        # bytes than the PDU length field allows: the PDU is malformed.
        if payload_length < 0 or payload_start + payload_length > len(data):
            return None

        return BLEAdvertisingPDU(
            pdu_type=PDUType(pdu_type),
            tx_add=tx_add,
            rx_add=rx_add,
            length=length,
            advertiser_address=extended_header.extended_advertiser_address,
            target_address=extended_header.extended_target_address,
            payload=data[payload_start : payload_start + payload_length],
            extended_header=extended_header,
        )

    @staticmethod
    def _parse_address_to_string(addr_bytes: bytes) -> str:
        """Convert 6-byte Bluetooth address to formatted string.

        Args:
            addr_bytes: 6-byte address in little-endian order

        Returns:
            Formatted address string (XX:XX:XX:XX:XX:XX)
        """
        # Bytes arrive least-significant first; display order is reversed.
        return ":".join(f"{octet:02X}" for octet in reversed(addr_bytes))

    @staticmethod
    def _decode_text(ad_data: bytes) -> str:
        """Decode a UTF-8 text field, falling back to a hex dump.

        Args:
            ad_data: Raw text bytes

        Returns:
            Decoded string, or the lowercase hex representation of
            ``ad_data`` when it is not valid UTF-8
        """
        try:
            return ad_data.decode("utf-8")
        except UnicodeDecodeError:
            return ad_data.hex()

    @staticmethod
    def _store_service_data(service_uuid: BluetoothUUID, payload: bytes, parsed: AdvertisingDataStructures) -> None:
        """Record a service-data payload and make sure its UUID is listed.

        Args:
            service_uuid: UUID the payload belongs to
            payload: Service data bytes following the UUID
            parsed: AdvertisingDataStructures object to update
        """
        parsed.core.service_data[service_uuid] = payload
        if service_uuid not in parsed.core.service_uuids:
            parsed.core.service_uuids.append(service_uuid)

    @staticmethod
    def _parse_cte_info(data: bytes) -> CTEInfo:
        """Parse CTE info from raw byte.

        Args:
            data: 1-byte CTE info

        Returns:
            Parsed CTEInfo struct
        """
        cte_byte = data[0]
        cte_time = cte_byte & _ExtendedHeaderBitMasks.CTE_TIME_MASK
        cte_type_raw = (cte_byte >> _ExtendedHeaderBitMasks.CTE_TYPE_SHIFT) & _ExtendedHeaderBitMasks.CTE_TYPE_MASK
        return CTEInfo(cte_time=cte_time, cte_type=CTEType(cte_type_raw))

    @staticmethod
    def _parse_advertising_data_info(data: bytes) -> AdvertisingDataInfo:
        """Parse Advertising Data Info (ADI) from raw bytes.

        Args:
            data: 2-byte ADI field

        Returns:
            Parsed AdvertisingDataInfo struct
        """
        adi_value = DataParser.parse_int16(data, 0, signed=False)
        did = adi_value & _ExtendedHeaderBitMasks.ADI_DID_MASK
        sid = (adi_value >> _ExtendedHeaderBitMasks.ADI_SID_SHIFT) & _ExtendedHeaderBitMasks.ADI_SID_MASK
        return AdvertisingDataInfo(advertising_data_id=did, advertising_set_id=sid)

    @staticmethod
    def _parse_auxiliary_pointer_struct(data: bytes) -> AuxiliaryPointer:
        """Parse Auxiliary Pointer from raw bytes.

        Args:
            data: 3-byte AuxPtr field

        Returns:
            Parsed AuxiliaryPointer struct
        """
        masks = _ExtendedHeaderBitMasks
        aux_value = DataParser.parse_int24(data, 0, signed=False)
        aux_phy_raw = (aux_value >> masks.AUX_PHY_SHIFT) & masks.AUX_PHY_MASK
        return AuxiliaryPointer(
            channel_index=aux_value & masks.AUX_CHANNEL_MASK,
            ca=bool((aux_value >> masks.AUX_CA_SHIFT) & 0x01),
            offset_units=(aux_value >> masks.AUX_OFFSET_UNITS_SHIFT) & 0x01,
            aux_offset=(aux_value >> masks.AUX_OFFSET_SHIFT) & masks.AUX_OFFSET_MASK,
            # Values above LE_CODED are clamped so the enum lookup cannot fail.
            aux_phy=PHYType(min(aux_phy_raw, PHYType.LE_CODED)),
        )

    def _parse_sync_info(self, data: bytes) -> SyncInfo:
        """Parse SyncInfo from raw bytes.

        Args:
            data: 18-byte SyncInfo field

        Returns:
            Parsed SyncInfo struct
        """
        # NOTE(review): the byte layout used below (SCA in byte 9, a 6-byte
        # address in bytes 10-15) appears to differ from the SyncInfo layout
        # in the Core Specification (SCA packed with the channel map, then
        # Access Address / CRCInit / event counter) - confirm against the
        # SyncInfo type before relying on these fields.
        masks = _ExtendedHeaderBitMasks

        # Bytes 0-1: sync packet offset (13 bits) followed by the units bit.
        offset_field = DataParser.parse_int16(data, 0, signed=False)
        sync_packet_offset = offset_field & masks.SYNC_OFFSET_MASK
        offset_units = (offset_field >> masks.SYNC_OFFSET_UNITS_SHIFT) & 0x01

        # Bytes 2-3: interval.
        interval = DataParser.parse_int16(data, 2, signed=False)

        # Bytes 4-8: channel map (37 bits in 5 bytes). The 40-bit read
        # exceeds DataParser's 32-bit limit, so int.from_bytes is used.
        channel_map = int.from_bytes(data[4:9], byteorder="little") & masks.SYNC_CHANNEL_MAP_MASK

        # Byte 9: SCA (bits 5-7) and the address-type bit (bit 4).
        sca_byte = DataParser.parse_int8(data, 9, signed=False)
        sleep_clock_accuracy = (sca_byte >> masks.SYNC_SCA_SHIFT) & masks.SYNC_SCA_MASK
        advertising_address_type = (sca_byte >> masks.SYNC_ADDR_TYPE_SHIFT) & 0x01

        # Bytes 10-15: interpreted here as the advertiser address.
        advertising_address = self._parse_address_to_string(data[10:16])

        # Bytes 16-17: sync counter.
        sync_counter = DataParser.parse_int16(data, 16, signed=False)

        return SyncInfo(
            sync_packet_offset=sync_packet_offset,
            offset_units=offset_units,
            interval=interval,
            channel_map=channel_map,
            sleep_clock_accuracy=sleep_clock_accuracy,
            advertising_address=advertising_address,
            advertising_address_type=advertising_address_type,
            sync_counter=sync_counter,
        )

    def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None:
        """Parse extended header from PDU data.

        ``data[0]`` is read as the header length and, when that length is
        non-zero, ``data[1]`` as the flags byte announcing which optional
        fields follow. All optional fields, including ACAD, are confined to
        the declared header extent so they can never consume AdvData bytes.

        Args:
            data: Extended header data (may be followed by AdvData)

        Returns:
            Parsed BLEExtendedHeader or None if invalid

        """
        # pylint: disable=too-many-return-statements,too-many-branches
        if len(data) < 1:
            return None

        extended_header_length = data[0]
        header_end = extended_header_length + 1  # length byte + header bytes
        if len(data) < header_end:
            return None

        # NOTE(review): the Core Specification packs a 2-bit AdvMode into the
        # top bits of the length byte; this parser treats the whole byte as
        # the length - confirm whether masking to 6 bits is wanted.
        header = data[:header_end]

        # A zero-length header has no flags byte; whatever follows the length
        # byte is AdvData and must not be interpreted as flags.
        adv_mode = header[1] if extended_header_length > 0 else 0
        offset = PDULayout.ADV_ADDR_OFFSET  # Start after length and mode bytes

        def take(size: int) -> bytes | None:
            """Consume ``size`` header bytes, or return None if they do not fit."""
            nonlocal offset
            end = offset + size
            if end > len(header):
                return None
            field = header[offset:end]
            offset = end
            return field

        extended_advertiser_address = ""
        extended_target_address = ""
        cte_info: CTEInfo | None = None
        advertising_data_info: AdvertisingDataInfo | None = None
        auxiliary_pointer: AuxiliaryPointer | None = None
        sync_info: SyncInfo | None = None
        tx_power: int | None = None
        additional_controller_advertising_data = b""

        # Optional fields appear in this fixed order when their flag is set.
        if adv_mode & ExtendedHeaderFlags.ADV_ADDR:
            field = take(PDULayout.BLE_ADDR)
            if field is None:
                return None
            extended_advertiser_address = self._parse_address_to_string(field)

        if adv_mode & ExtendedHeaderFlags.TARGET_ADDR:
            field = take(PDULayout.BLE_ADDR)
            if field is None:
                return None
            extended_target_address = self._parse_address_to_string(field)

        if adv_mode & ExtendedHeaderFlags.CTE_INFO:
            field = take(PDULayout.CTE_INFO)
            if field is None:
                return None
            cte_info = self._parse_cte_info(field)

        if adv_mode & ExtendedHeaderFlags.ADV_DATA_INFO:
            field = take(PDULayout.ADV_DATA_INFO)
            if field is None:
                return None
            advertising_data_info = self._parse_advertising_data_info(field)

        if adv_mode & ExtendedHeaderFlags.AUX_PTR:
            field = take(PDULayout.AUX_PTR)
            if field is None:
                return None
            auxiliary_pointer = self._parse_auxiliary_pointer_struct(field)

        if adv_mode & ExtendedHeaderFlags.SYNC_INFO:
            field = take(PDULayout.SYNC_INFO)
            if field is None:
                return None
            sync_info = self._parse_sync_info(field)

        if adv_mode & ExtendedHeaderFlags.TX_POWER:
            field = take(PDULayout.TX_POWER)
            if field is None:
                return None
            tx_power = DataParser.parse_int8(field, 0, signed=True)

        # ACAD is whatever remains of the header, not of the whole PDU.
        if adv_mode & ExtendedHeaderFlags.ACAD:
            additional_controller_advertising_data = header[offset:]

        return BLEExtendedHeader(
            extended_header_length=extended_header_length,
            adv_mode=adv_mode,
            extended_advertiser_address=extended_advertiser_address,
            extended_target_address=extended_target_address,
            cte_info=cte_info,
            advertising_data_info=advertising_data_info,
            auxiliary_pointer=auxiliary_pointer,
            sync_info=sync_info,
            tx_power=tx_power,
            additional_controller_advertising_data=additional_controller_advertising_data,
        )

    def _parse_auxiliary_packets(self, aux_ptr: AuxiliaryPointer) -> list[BLEAdvertisingPDU]:
        """Parse auxiliary packets referenced by auxiliary pointer.

        Args:
            aux_ptr: Parsed AuxiliaryPointer struct

        Returns:
            List of auxiliary packets (currently returns empty list)

        Note:
            This is a placeholder. Full implementation would require
            receiving raw PDU data from the auxiliary channel specified
            in the AuxiliaryPointer (channel_index, aux_offset, aux_phy).
        """
        # Auxiliary packets are received over the air on a different channel
        # at a later time. This cannot be parsed from a single PDU capture.
        # The AuxiliaryPointer tells us where to listen, not the data itself.
        _ = aux_ptr  # Acknowledge the parameter for future implementation
        return []

    def _parse_legacy_advertising(self, raw_data: bytes) -> AdvertisingData:
        """Parse legacy advertising data.

        Args:
            raw_data: Raw legacy advertising data

        Returns:
            Parsed AdvertisingData

        """
        return AdvertisingData(
            raw_data=raw_data,
            ad_structures=self._parse_ad_structures(raw_data),
        )

    @staticmethod
    def _parse_address_list(ad_data: bytes) -> list[str]:
        """Parse list of 6-byte Bluetooth addresses from raw data.

        A trailing fragment shorter than 6 bytes is ignored.

        Args:
            ad_data: Raw address data (multiple 6-byte addresses)

        Returns:
            List of formatted address strings (XX:XX:XX:XX:XX:XX)
        """
        return [
            AdvertisingPDUParser._parse_address_to_string(ad_data[start : start + 6])
            for start in range(0, len(ad_data) - 5, 6)
        ]

    @staticmethod
    def _parse_16bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
        """Parse list of 16-bit service UUIDs from raw data.

        A trailing odd byte is ignored.

        Args:
            ad_data: Raw UUID data

        Returns:
            List of BluetoothUUID objects
        """
        return [
            BluetoothUUID(DataParser.parse_int16(ad_data, start, signed=False))
            for start in range(0, len(ad_data) - 1, 2)
        ]

    @staticmethod
    def _parse_32bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
        """Parse list of 32-bit service UUIDs from raw data.

        A trailing fragment shorter than 4 bytes is ignored.

        Args:
            ad_data: Raw UUID data

        Returns:
            List of BluetoothUUID objects
        """
        return [
            BluetoothUUID(DataParser.parse_int32(ad_data, start, signed=False))
            for start in range(0, len(ad_data) - 3, 4)
        ]

    @staticmethod
    def _parse_128bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
        """Parse list of 128-bit service UUIDs from raw data.

        A trailing fragment shorter than 16 bytes is ignored.

        Args:
            ad_data: Raw UUID data

        Returns:
            List of BluetoothUUID objects
        """
        # NOTE(review): the bytes are hex-encoded in wire order. Advertising
        # data normally carries 128-bit UUIDs little-endian, so this may yield
        # byte-reversed UUIDs unless BluetoothUUID compensates - confirm.
        return [
            BluetoothUUID(ad_data[start : start + 16].hex().upper())
            for start in range(0, len(ad_data) - 15, 16)
        ]

    def _parse_manufacturer_data(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
        """Parse manufacturer-specific data and resolve company name.

        Args:
            ad_data: Raw manufacturer-specific data bytes (company ID + payload)
            parsed: AdvertisingDataStructures object to update

        """
        mfr_data = ManufacturerData.from_bytes(ad_data)
        parsed.core.manufacturer_data[mfr_data.company.id] = mfr_data

    def _handle_core_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle core advertising data types (service UUIDs, names, etc).

        Service-data entries shorter than their UUID are not claimed here
        and fall through to the remaining handlers.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        core = parsed.core
        if ad_type in (ADType.INCOMPLETE_16BIT_SERVICE_UUIDS, ADType.COMPLETE_16BIT_SERVICE_UUIDS):
            core.service_uuids.extend(self._parse_16bit_uuids(ad_data))
        elif ad_type in (ADType.INCOMPLETE_32BIT_SERVICE_UUIDS, ADType.COMPLETE_32BIT_SERVICE_UUIDS):
            core.service_uuids.extend(self._parse_32bit_uuids(ad_data))
        elif ad_type in (ADType.INCOMPLETE_128BIT_SERVICE_UUIDS, ADType.COMPLETE_128BIT_SERVICE_UUIDS):
            core.service_uuids.extend(self._parse_128bit_uuids(ad_data))
        elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_16BIT:
            core.solicited_service_uuids.extend(self._parse_16bit_uuids(ad_data))
        elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_32BIT:
            core.solicited_service_uuids.extend(self._parse_32bit_uuids(ad_data))
        elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_128BIT:
            core.solicited_service_uuids.extend(self._parse_128bit_uuids(ad_data))
        elif ad_type in (ADType.SHORTENED_LOCAL_NAME, ADType.COMPLETE_LOCAL_NAME):
            core.local_name = self._decode_text(ad_data)
        elif ad_type == ADType.URI:
            core.uri_data = URIData.from_raw_data(ad_data)
        elif ad_type == ADType.SERVICE_DATA_16BIT and len(ad_data) >= SIZE_UINT16:
            service_uuid = BluetoothUUID(DataParser.parse_int16(ad_data, 0, signed=False))
            self._store_service_data(service_uuid, ad_data[2:], parsed)
        elif ad_type == ADType.SERVICE_DATA_32BIT and len(ad_data) >= SIZE_UINT32:
            service_uuid = BluetoothUUID(DataParser.parse_int32(ad_data, 0, signed=False))
            self._store_service_data(service_uuid, ad_data[4:], parsed)
        elif ad_type == ADType.SERVICE_DATA_128BIT and len(ad_data) >= SIZE_UUID128:
            # NOTE(review): same wire-order question as _parse_128bit_uuids.
            service_uuid = BluetoothUUID(ad_data[:16].hex().upper())
            self._store_service_data(service_uuid, ad_data[16:], parsed)
        else:
            return False
        return True

    def _handle_property_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle device property advertising data types (flags, power, appearance, etc).

        Entries that are too short for their type are not claimed.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        props = parsed.properties
        if ad_type == ADType.FLAGS and len(ad_data) >= 1:
            props.flags = BLEAdvertisingFlags(ad_data[0])
        elif ad_type == ADType.TX_POWER_LEVEL and len(ad_data) >= 1:
            props.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True)
        elif ad_type == ADType.APPEARANCE and len(ad_data) >= SIZE_UINT16:
            raw_value = DataParser.parse_int16(ad_data, 0, signed=False)
            appearance_info = appearance_values_registry.get_appearance_info(raw_value)
            props.appearance = AppearanceData(raw_value=raw_value, info=appearance_info)
        elif ad_type == ADType.LE_SUPPORTED_FEATURES:
            props.le_supported_features = LEFeatures(raw_value=ad_data)
        elif ad_type == ADType.LE_ROLE and len(ad_data) >= 1:
            props.le_role = ad_data[0]
        elif ad_type == ADType.CLASS_OF_DEVICE and len(ad_data) >= SIZE_UINT24:
            raw_cod = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
            props.class_of_device = class_of_device_registry.decode_class_of_device(raw_cod)
        elif ad_type == ADType.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= SIZE_UINT16:
            self._parse_manufacturer_data(ad_data, parsed)
        else:
            return False
        return True

    def _handle_mesh_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle mesh networking advertising data types.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        mesh = parsed.mesh
        if ad_type == ADType.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION:
            mesh.periodic_advertising_response_timing = ad_data
        elif ad_type == ADType.ELECTRONIC_SHELF_LABEL:
            mesh.electronic_shelf_label = ad_data
        elif ad_type == ADType.BROADCAST_NAME:
            mesh.broadcast_name = self._decode_text(ad_data)
        elif ad_type == ADType.BROADCAST_CODE:
            mesh.broadcast_code = ad_data
        elif ad_type == ADType.BIGINFO:
            mesh.biginfo = ad_data
        elif ad_type == ADType.MESH_MESSAGE:
            mesh.mesh_message = MeshMessage.decode(ad_data)
        elif ad_type == ADType.MESH_BEACON:
            self._parse_mesh_beacon(ad_data, parsed)
        elif ad_type == ADType.PB_ADV:
            mesh.provisioning_bearer = ProvisioningBearerData.decode(ad_data)
        else:
            return False
        return True

    def _parse_mesh_beacon(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
        """Parse mesh beacon data into appropriate typed beacon.

        The first byte selects the beacon type; empty data and unrecognised
        beacon types are ignored.

        Args:
            ad_data: Raw beacon advertisement data
            parsed: Advertising data structures to populate

        """
        if len(ad_data) < 1:
            return

        beacon_type = ad_data[0]
        beacon_data = ad_data[1:]

        if beacon_type == MeshBeaconType.SECURE_NETWORK:
            parsed.mesh.secure_network_beacon = SecureNetworkBeacon.decode(beacon_data)
        elif beacon_type == MeshBeaconType.UNPROVISIONED_DEVICE:
            parsed.mesh.unprovisioned_device_beacon = UnprovisionedDeviceBeacon.decode(beacon_data)

    def _handle_security_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle security/pairing advertising data types.

        Payloads are stored as raw bytes without further decoding.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        if ad_type == ADType.ENCRYPTED_ADVERTISING_DATA:
            parsed.security.encrypted_advertising_data = ad_data
        elif ad_type == ADType.RESOLVABLE_SET_IDENTIFIER:
            parsed.security.resolvable_set_identifier = ad_data
        elif ad_type == ADType.SIMPLE_PAIRING_HASH_C:
            parsed.oob_security.simple_pairing_hash_c = ad_data
        elif ad_type == ADType.SIMPLE_PAIRING_RANDOMIZER_R:
            parsed.oob_security.simple_pairing_randomizer_r = ad_data
        elif ad_type == ADType.SECURITY_MANAGER_TK_VALUE:
            parsed.oob_security.security_manager_tk_value = ad_data
        elif ad_type == ADType.SECURITY_MANAGER_OUT_OF_BAND_FLAGS:
            parsed.oob_security.security_manager_oob_flags = ad_data
        elif ad_type == ADType.SECURE_CONNECTIONS_CONFIRMATION_VALUE:
            parsed.oob_security.secure_connections_confirmation = ad_data
        elif ad_type == ADType.SECURE_CONNECTIONS_RANDOM_VALUE:
            parsed.oob_security.secure_connections_random = ad_data
        else:
            return False
        return True

    def _handle_directed_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle directed/connection advertising data types.

        Entries that are too short for their type are not claimed.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        directed = parsed.directed
        if ad_type == ADType.PUBLIC_TARGET_ADDRESS:
            directed.public_target_address.extend(self._parse_address_list(ad_data))
        elif ad_type == ADType.RANDOM_TARGET_ADDRESS:
            directed.random_target_address.extend(self._parse_address_list(ad_data))
        elif ad_type == ADType.ADVERTISING_INTERVAL and len(ad_data) >= SIZE_UINT16:
            directed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False)
        elif ad_type == ADType.ADVERTISING_INTERVAL_LONG and len(ad_data) >= SIZE_UINT24:
            directed.advertising_interval_long = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
        elif ad_type == ADType.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= SIZE_UINT48:
            directed.le_bluetooth_device_address = self._parse_address_to_string(ad_data[:6])
        elif ad_type == ADType.SLAVE_CONNECTION_INTERVAL_RANGE and len(ad_data) >= SIZE_UINT32:
            directed.peripheral_connection_interval_range = ConnectionIntervalRange(
                min_interval=DataParser.parse_int16(ad_data, 0, signed=False),
                max_interval=DataParser.parse_int16(ad_data, 2, signed=False),
            )
        else:
            return False
        return True

    def _handle_location_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
        """Handle location/positioning advertising data types.

        Args:
            ad_type: AD type code of the structure
            ad_data: Payload of the structure (without length and type bytes)
            parsed: AdvertisingDataStructures object to update

        Returns:
            True if ad_type was handled, False otherwise
        """
        location = parsed.location
        if ad_type == ADType.INDOOR_POSITIONING:
            location.indoor_positioning = IndoorPositioningData.decode(ad_data)
        elif ad_type == ADType.TRANSPORT_DISCOVERY_DATA:
            location.transport_discovery_data = TransportDiscoveryData.decode(ad_data)
        elif ad_type == ADType.THREE_D_INFORMATION_DATA:
            location.three_d_information = ThreeDInformationData.decode(ad_data)
        elif ad_type == ADType.CHANNEL_MAP_UPDATE_INDICATION:
            location.channel_map_update_indication = ChannelMapUpdateIndication.decode(ad_data)
        else:
            return False
        return True

    def _parse_ad_structures(self, data: bytes) -> AdvertisingDataStructures:
        """Parse advertising data structures from raw bytes.

        Walks the length/type/value records in ``data``. Parsing stops at the
        first zero-length record or at a record that would run past the end
        of the buffer; everything decoded up to that point is returned.

        Args:
            data: Raw advertising data payload

        Returns:
            AdvertisingDataStructures object with extracted data

        """
        parsed = AdvertisingDataStructures()

        # Tried in order; the first handler that claims the AD type wins.
        handlers = (
            self._handle_core_ad_types,
            self._handle_property_ad_types,
            self._handle_mesh_ad_types,
            self._handle_security_ad_types,
            self._handle_directed_ad_types,
            self._handle_location_ad_types,
        )

        total = len(data)
        i = 0
        # Each record needs at least a length byte and a type byte.
        while i + 1 < total:
            length = data[i]
            if length == 0 or i + length + 1 > total:
                break

            ad_type = data[i + 1]
            # The length byte counts the type byte plus the payload.
            ad_data = data[i + 2 : i + length + 1]

            if not ad_types_registry.is_known_ad_type(ad_type):
                logger.warning("Unknown AD type encountered: 0x%02X", ad_type)

            for handler in handlers:
                if handler(ad_type, ad_data, parsed):
                    break

            i += length + 1

        return parsed