Coverage for src / bluetooth_sig / advertising / pdu_parser.py: 50%

253 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""BLE Advertising PDU parser. 

2 

3This module provides a parser for BLE advertising PDU data packets, 

4extracting device information, manufacturer data, and service UUIDs 

5from both legacy and extended advertising formats. 

6 

7This is the low-level BLE spec parser. For interpreting vendor-specific 

8sensor data (e.g., Xiaomi, RuuviTag, BTHome), see the AdvertisingDataInterpreter 

9base class. 

10""" 

11 

12from __future__ import annotations 

13 

14import logging 

15 

16from bluetooth_sig.gatt.characteristics.utils import DataParser 

17from bluetooth_sig.registry.company_identifiers import company_identifiers_registry 

18from bluetooth_sig.registry.core.ad_types import ad_types_registry 

19from bluetooth_sig.registry.core.appearance_values import appearance_values_registry 

20from bluetooth_sig.registry.core.class_of_device import class_of_device_registry 

21from bluetooth_sig.types import ( 

22 AdvertisingData, 

23 AdvertisingDataStructures, 

24 BLEAdvertisingFlags, 

25 BLEAdvertisingPDU, 

26 BLEExtendedHeader, 

27 ExtendedAdvertisingData, 

28 PDUHeaderFlags, 

29 PDULayout, 

30 PDUType, 

31) 

32from bluetooth_sig.types.ad_types_constants import ADType 

33from bluetooth_sig.types.appearance import AppearanceData 

34from bluetooth_sig.types.uri import URIData 

35from bluetooth_sig.types.uuid import BluetoothUUID 

36 

37logger = logging.getLogger(__name__) 

38 

39 

class AdvertisingPDUParser:  # pylint: disable=too-few-public-methods
    """Parser for BLE advertising PDU data packets.

    Parses raw BLE advertising PDU bytes into structured AdvertisingData,
    handling both legacy and extended advertising formats.

    This is the low-level parsing layer that extracts:
    - Manufacturer data (company_id → payload)
    - Service data (UUID → payload)
    - Flags, local name, appearance, TX power
    - Extended advertising fields (BLE 5.0+)

    For vendor-specific interpretation (e.g., BTHome sensor values),
    use AdvertisingDataInterpreter subclasses.
    """

    def parse_advertising_data(self, raw_data: bytes) -> AdvertisingData:
        """Parse raw advertising data and return structured information.

        Dispatches to the extended parser when the first byte carries an
        extended PDU type, otherwise treats the buffer as a legacy sequence
        of AD structures.

        Args:
            raw_data: Raw bytes from BLE advertising packet

        Returns:
            AdvertisingData with parsed information

        """
        if self._is_extended_advertising_pdu(raw_data):
            return self._parse_extended_advertising(raw_data)
        return self._parse_legacy_advertising(raw_data)

    def _is_extended_advertising_pdu(self, data: bytes) -> bool:
        """Check if the advertising data is an extended advertising PDU.

        Args:
            data: Raw advertising data bytes

        Returns:
            True if extended advertising PDU, False otherwise

        """
        if len(data) < PDULayout.PDU_HEADER:
            return False

        # Only the first byte is inspected; the type lives in its masked bits.
        pdu_type = data[0] & PDUHeaderFlags.TYPE_MASK
        return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value)

    def _parse_extended_advertising(self, raw_data: bytes) -> AdvertisingData:
        """Parse extended advertising data.

        Falls back to legacy parsing of the whole buffer whenever the
        extended PDU cannot be decoded (too short or malformed).

        Args:
            raw_data: Raw extended advertising data

        Returns:
            Parsed AdvertisingData

        """
        # _parse_extended_pdu performs the minimum-length check itself, so a
        # too-short buffer takes the same fallback path as a malformed one.
        pdu = self._parse_extended_pdu(raw_data)
        if not pdu:
            return self._parse_legacy_advertising(raw_data)

        if pdu.payload:
            parsed_data = self._parse_ad_structures(pdu.payload)
        else:
            parsed_data = AdvertisingDataStructures()

        auxiliary_packets: list[BLEAdvertisingPDU] = []
        if pdu.extended_header and pdu.extended_header.auxiliary_pointer:
            auxiliary_packets.extend(self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer))

        return AdvertisingData(
            raw_data=raw_data,
            ad_structures=parsed_data,
            extended=ExtendedAdvertisingData(
                extended_payload=pdu.payload,
                auxiliary_packets=auxiliary_packets,
            ),
        )

    def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None:
        """Parse extended PDU header and payload.

        Args:
            data: Raw PDU data

        Returns:
            Parsed BLEAdvertisingPDU or None if invalid

        """
        if len(data) < PDULayout.MIN_EXTENDED_PDU:
            return None

        header = int.from_bytes(data[0 : PDULayout.PDU_HEADER], byteorder="little")
        pdu_type = header & PDUHeaderFlags.TYPE_MASK
        tx_add = bool(header & PDUHeaderFlags.TX_ADD_MASK)
        rx_add = bool(header & PDUHeaderFlags.RX_ADD_MASK)

        length = data[PDULayout.PDU_LENGTH_OFFSET]
        if len(data) < PDULayout.MIN_EXTENDED_PDU + length:
            return None

        extended_header_start = PDULayout.EXTENDED_HEADER_START
        extended_header = self._parse_extended_header(data[extended_header_start:])
        if not extended_header:
            return None

        # The payload is whatever the declared PDU length leaves after the
        # extended header. If the header claims more bytes than `length`,
        # payload_length is negative and the slice below is simply empty.
        header_span = extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH
        payload_start = extended_header_start + header_span
        payload_length = length - header_span
        payload_end = payload_start + payload_length
        if payload_end > len(data):
            return None

        return BLEAdvertisingPDU(
            pdu_type=PDUType(pdu_type),
            tx_add=tx_add,
            rx_add=rx_add,
            length=length,
            advertiser_address=extended_header.extended_advertiser_address,
            target_address=extended_header.extended_target_address,
            payload=data[payload_start:payload_end],
            extended_header=extended_header,
        )

    def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None:
        """Parse extended header from PDU data.

        Optional fields are read in a fixed order, each one only when the
        corresponding ``has_*`` flag on the header is set.

        Args:
            data: Extended header data, starting at the header length byte

        Returns:
            Parsed BLEExtendedHeader or None if invalid

        """
        # pylint: disable=too-many-return-statements,too-many-branches
        # Both the length byte (data[0]) and the mode byte (data[1]) are read
        # unconditionally below. Without this guard a single byte of 0x00
        # passed the length check and then raised IndexError on data[1].
        if len(data) < 2:
            return None

        header = BLEExtendedHeader()
        # NOTE(review): the Core Spec's Common Extended Advertising Payload
        # packs a 6-bit header length and the 2-bit AdvMode into one octet -
        # confirm whether data[0] should be masked with 0x3F here.
        header.extended_header_length = data[0]

        if len(data) < header.extended_header_length + 1:
            return None

        header.adv_mode = data[1]

        offset = PDULayout.ADV_ADDR_OFFSET  # Start after length and mode bytes

        if header.has_extended_advertiser_address:
            end = offset + PDULayout.BLE_ADDR
            if end > len(data):
                return None
            header.extended_advertiser_address = data[offset:end]
            offset = end

        if header.has_extended_target_address:
            end = offset + PDULayout.BLE_ADDR
            if end > len(data):
                return None
            header.extended_target_address = data[offset:end]
            offset = end

        if header.has_cte_info:
            end = offset + PDULayout.CTE_INFO
            if end > len(data):
                return None
            header.cte_info = data[offset:end]
            offset = end

        if header.has_advertising_data_info:
            end = offset + PDULayout.ADV_DATA_INFO
            if end > len(data):
                return None
            header.advertising_data_info = data[offset:end]
            offset = end

        if header.has_auxiliary_pointer:
            end = offset + PDULayout.AUX_PTR
            if end > len(data):
                return None
            header.auxiliary_pointer = data[offset:end]
            offset = end

        if header.has_sync_info:
            end = offset + PDULayout.SYNC_INFO
            if end > len(data):
                return None
            header.sync_info = data[offset:end]
            offset = end

        if header.has_tx_power:
            end = offset + PDULayout.TX_POWER
            if end > len(data):
                return None
            header.tx_power = int.from_bytes(data[offset:end], byteorder="little", signed=True)
            offset = end

        if header.has_additional_controller_data:
            # Everything left in the buffer is handed over untouched.
            header.additional_controller_advertising_data = data[offset:]

        return header

    def _parse_auxiliary_packets(self, aux_ptr: bytes) -> list[BLEAdvertisingPDU]:
        """Parse auxiliary packets referenced by auxiliary pointer.

        Following the pointer is not implemented, so no packets are ever
        produced regardless of whether ``aux_ptr`` is well-formed.

        Args:
            aux_ptr: Auxiliary pointer data

        Returns:
            List of auxiliary packets (currently always an empty list)

        """
        del aux_ptr  # unused until auxiliary-chain support is implemented
        return []

    def _parse_legacy_advertising(self, raw_data: bytes) -> AdvertisingData:
        """Parse legacy advertising data.

        Args:
            raw_data: Raw legacy advertising data

        Returns:
            Parsed AdvertisingData

        """
        return AdvertisingData(
            raw_data=raw_data,
            ad_structures=self._parse_ad_structures(raw_data),
        )

    @staticmethod
    def _format_address(addr_bytes: bytes) -> str:
        """Format address bytes as ``XX:XX:XX:XX:XX:XX``.

        Args:
            addr_bytes: Address bytes in transmission (least-significant first) order

        Returns:
            Upper-case, colon-separated hex string with the byte order reversed
        """
        return ":".join(f"{b:02X}" for b in reversed(addr_bytes))

    @staticmethod
    def _decode_text(ad_data: bytes) -> str:
        """Decode an AD text field, falling back to hex when it is not UTF-8.

        Args:
            ad_data: Raw text bytes

        Returns:
            Decoded string, or the lower-case hex dump of the bytes if decoding fails
        """
        try:
            return ad_data.decode("utf-8")
        except UnicodeDecodeError:
            return ad_data.hex()

    @staticmethod
    def _uuid_from_128bit_le(raw: bytes) -> BluetoothUUID:
        """Build a BluetoothUUID from 16 little-endian bytes.

        Advertising data carries multi-octet values least-significant octet
        first, while the textual UUID form is most-significant first, so the
        bytes are reversed before being rendered as hex.

        Args:
            raw: Exactly 16 bytes as they appear in the AD structure

        Returns:
            BluetoothUUID for the canonical (big-endian) hex representation
        """
        return BluetoothUUID(raw[::-1].hex().upper())

    @staticmethod
    def _parse_address_list(ad_data: bytes) -> list[str]:
        """Parse list of 6-byte Bluetooth addresses from raw data.

        Trailing bytes that do not form a complete address are ignored.

        Args:
            ad_data: Raw address data (multiple 6-byte addresses)

        Returns:
            List of formatted address strings (XX:XX:XX:XX:XX:XX)
        """
        # Stopping the range at len - 5 visits only offsets with 6 full bytes.
        return [
            ":".join(f"{b:02X}" for b in reversed(ad_data[start : start + 6]))
            for start in range(0, len(ad_data) - 5, 6)
        ]

    @staticmethod
    def _parse_16bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
        """Parse list of 16-bit service UUIDs from raw data.

        A trailing odd byte is ignored.

        Args:
            ad_data: Raw UUID data

        Returns:
            List of BluetoothUUID objects
        """
        return [
            BluetoothUUID(DataParser.parse_int16(ad_data, start, signed=False))
            for start in range(0, len(ad_data) - 1, 2)
        ]

    @staticmethod
    def _parse_128bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
        """Parse list of 128-bit service UUIDs from raw data.

        Each UUID is read as 16 little-endian bytes and reversed into the
        canonical textual order. Trailing bytes that do not form a complete
        UUID are ignored.

        Args:
            ad_data: Raw UUID data

        Returns:
            List of BluetoothUUID objects
        """
        return [
            BluetoothUUID(ad_data[start : start + 16][::-1].hex().upper())
            for start in range(0, len(ad_data) - 15, 16)
        ]

    def _parse_manufacturer_data(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
        """Parse manufacturer-specific data and resolve company name.

        The first two bytes are the company identifier; the remainder is
        stored as that company's payload. A later structure with the same
        identifier overwrites an earlier one.

        Args:
            ad_data: Raw manufacturer-specific data bytes (at least 2 bytes)
            parsed: AdvertisingDataStructures object to update

        """
        company_id = DataParser.parse_int16(ad_data, 0, signed=False)
        parsed.core.manufacturer_data[company_id] = ad_data[2:]

        # Resolve company name from registry; unknown identifiers get no name entry.
        company_name = company_identifiers_registry.get_company_name(company_id)
        if company_name is not None:
            parsed.core.manufacturer_names[company_id] = company_name

    def _parse_ad_structures(self, data: bytes) -> AdvertisingDataStructures:
        """Parse advertising data structures from raw bytes.

        Walks the buffer as a sequence of ``[length][type][data...]`` records
        where ``length`` counts the type byte plus the data bytes. Parsing
        stops at the first zero length or at a record that would run past the
        end of the buffer; everything decoded up to that point is kept.

        Args:
            data: Raw advertising data payload

        Returns:
            AdvertisingDataStructures object with extracted data

        """
        parsed = AdvertisingDataStructures()
        total = len(data)

        i = 0
        # A record needs at least a length byte and a type byte.
        while i + 1 < total:
            length = data[i]
            next_i = i + length + 1
            if length == 0 or next_i > total:
                break

            ad_type = data[i + 1]
            ad_data = data[i + 2 : next_i]

            # Warn about unknown AD types
            if not ad_types_registry.is_known_ad_type(ad_type):
                logger.warning("Unknown AD type encountered: 0x%02X", ad_type)

            self._apply_ad_structure(ad_type, ad_data, parsed)
            i = next_i

        return parsed

    def _apply_ad_structure(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
        """Store one decoded AD structure on ``parsed``.

        Types whose payload is shorter than the minimum needed to decode them,
        and types with no branch here, are silently skipped.

        Args:
            ad_type: AD type byte of the structure
            ad_data: Payload bytes of the structure (without length and type)
            parsed: AdvertisingDataStructures object to update

        """
        # pylint: disable=too-many-branches,too-many-statements
        if ad_type == ADType.FLAGS and len(ad_data) >= 1:
            parsed.properties.flags = BLEAdvertisingFlags(ad_data[0])
        elif ad_type in (
            ADType.INCOMPLETE_16BIT_SERVICE_UUIDS,
            ADType.COMPLETE_16BIT_SERVICE_UUIDS,
        ):
            parsed.core.service_uuids.extend(self._parse_16bit_uuids(ad_data))
        elif ad_type in (
            ADType.INCOMPLETE_128BIT_SERVICE_UUIDS,
            ADType.COMPLETE_128BIT_SERVICE_UUIDS,
        ):
            parsed.core.service_uuids.extend(self._parse_128bit_uuids(ad_data))
        elif ad_type in (ADType.SHORTENED_LOCAL_NAME, ADType.COMPLETE_LOCAL_NAME):
            parsed.core.local_name = self._decode_text(ad_data)
        elif ad_type == ADType.TX_POWER_LEVEL and len(ad_data) >= 1:
            parsed.properties.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True)
        elif ad_type == ADType.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= 2:
            self._parse_manufacturer_data(ad_data, parsed)
        elif ad_type == ADType.APPEARANCE and len(ad_data) >= 2:
            raw_value = DataParser.parse_int16(ad_data, 0, signed=False)
            appearance_info = appearance_values_registry.get_appearance_info(raw_value)
            parsed.properties.appearance = AppearanceData(raw_value=raw_value, info=appearance_info)
        elif ad_type == ADType.SERVICE_DATA_16BIT and len(ad_data) >= 2:
            service_uuid = BluetoothUUID(DataParser.parse_int16(ad_data, 0, signed=False))
            parsed.core.service_data[service_uuid] = ad_data[2:]
            if service_uuid not in parsed.core.service_uuids:
                parsed.core.service_uuids.append(service_uuid)
        elif ad_type == ADType.SERVICE_DATA_128BIT and len(ad_data) >= 16:
            # Same byte-order handling as the 128-bit UUID lists so that the
            # key here compares equal to the entry in service_uuids.
            service_uuid = self._uuid_from_128bit_le(ad_data[:16])
            parsed.core.service_data[service_uuid] = ad_data[16:]
            if service_uuid not in parsed.core.service_uuids:
                parsed.core.service_uuids.append(service_uuid)
        elif ad_type == ADType.URI:
            parsed.core.uri_data = URIData.from_raw_data(ad_data)
        elif ad_type == ADType.INDOOR_POSITIONING:
            parsed.location.indoor_positioning = ad_data
        elif ad_type == ADType.TRANSPORT_DISCOVERY_DATA:
            parsed.location.transport_discovery_data = ad_data
        elif ad_type == ADType.LE_SUPPORTED_FEATURES:
            parsed.properties.le_supported_features = ad_data
        elif ad_type == ADType.ENCRYPTED_ADVERTISING_DATA:
            parsed.security.encrypted_advertising_data = ad_data
        elif ad_type == ADType.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION:
            parsed.mesh.periodic_advertising_response_timing = ad_data
        elif ad_type == ADType.ELECTRONIC_SHELF_LABEL:
            parsed.mesh.electronic_shelf_label = ad_data
        elif ad_type == ADType.THREE_D_INFORMATION_DATA:
            parsed.location.three_d_information = ad_data
        elif ad_type == ADType.BROADCAST_NAME:
            parsed.mesh.broadcast_name = self._decode_text(ad_data)
        elif ad_type == ADType.BROADCAST_CODE:
            parsed.mesh.broadcast_code = ad_data
        elif ad_type == ADType.BIGINFO:
            parsed.mesh.biginfo = ad_data
        elif ad_type == ADType.MESH_MESSAGE:
            parsed.mesh.mesh_message = ad_data
        elif ad_type == ADType.MESH_BEACON:
            parsed.mesh.mesh_beacon = ad_data
        elif ad_type == ADType.PUBLIC_TARGET_ADDRESS:
            parsed.directed.public_target_address.extend(self._parse_address_list(ad_data))
        elif ad_type == ADType.RANDOM_TARGET_ADDRESS:
            parsed.directed.random_target_address.extend(self._parse_address_list(ad_data))
        elif ad_type == ADType.ADVERTISING_INTERVAL and len(ad_data) >= 2:
            parsed.directed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False)
        elif ad_type == ADType.ADVERTISING_INTERVAL_LONG and len(ad_data) >= 3:
            parsed.directed.advertising_interval_long = int.from_bytes(
                ad_data[:3], byteorder="little", signed=False
            )
        elif ad_type == ADType.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= 6:
            # Only the first six bytes are formatted; any bytes after them are ignored.
            parsed.directed.le_bluetooth_device_address = self._format_address(ad_data[:6])
        elif ad_type == ADType.LE_ROLE and len(ad_data) >= 1:
            parsed.properties.le_role = ad_data[0]
        elif ad_type == ADType.CLASS_OF_DEVICE and len(ad_data) >= 3:
            raw_cod = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
            parsed.properties.class_of_device = class_of_device_registry.decode_class_of_device(raw_cod)
        elif ad_type == ADType.SIMPLE_PAIRING_HASH_C:
            parsed.oob_security.simple_pairing_hash_c = ad_data
        elif ad_type == ADType.SIMPLE_PAIRING_RANDOMIZER_R:
            parsed.oob_security.simple_pairing_randomizer_r = ad_data
        elif ad_type == ADType.SECURITY_MANAGER_TK_VALUE:
            parsed.oob_security.security_manager_tk_value = ad_data
        elif ad_type == ADType.SECURITY_MANAGER_OUT_OF_BAND_FLAGS:
            parsed.oob_security.security_manager_oob_flags = ad_data
        elif ad_type == ADType.SLAVE_CONNECTION_INTERVAL_RANGE:
            parsed.directed.peripheral_connection_interval_range = ad_data
        elif ad_type == ADType.SECURE_CONNECTIONS_CONFIRMATION_VALUE:
            parsed.oob_security.secure_connections_confirmation = ad_data
        elif ad_type == ADType.SECURE_CONNECTIONS_RANDOM_VALUE:
            parsed.oob_security.secure_connections_random = ad_data
        elif ad_type == ADType.CHANNEL_MAP_UPDATE_INDICATION:
            parsed.location.channel_map_update_indication = ad_data
        elif ad_type == ADType.PB_ADV:
            parsed.mesh.pb_adv = ad_data
        elif ad_type == ADType.RESOLVABLE_SET_IDENTIFIER:
            parsed.security.resolvable_set_identifier = ad_data