Coverage for src / bluetooth_sig / advertising / pdu_parser.py: 50%
253 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""BLE Advertising PDU parser.
3This module provides a parser for BLE advertising PDU data packets,
4extracting device information, manufacturer data, and service UUIDs
5from both legacy and extended advertising formats.
7This is the low-level BLE spec parser. For interpreting vendor-specific
8sensor data (e.g., Xiaomi, RuuviTag, BTHome), see the AdvertisingDataInterpreter
9base class.
10"""
12from __future__ import annotations
14import logging
16from bluetooth_sig.gatt.characteristics.utils import DataParser
17from bluetooth_sig.registry.company_identifiers import company_identifiers_registry
18from bluetooth_sig.registry.core.ad_types import ad_types_registry
19from bluetooth_sig.registry.core.appearance_values import appearance_values_registry
20from bluetooth_sig.registry.core.class_of_device import class_of_device_registry
21from bluetooth_sig.types import (
22 AdvertisingData,
23 AdvertisingDataStructures,
24 BLEAdvertisingFlags,
25 BLEAdvertisingPDU,
26 BLEExtendedHeader,
27 ExtendedAdvertisingData,
28 PDUHeaderFlags,
29 PDULayout,
30 PDUType,
31)
32from bluetooth_sig.types.ad_types_constants import ADType
33from bluetooth_sig.types.appearance import AppearanceData
34from bluetooth_sig.types.uri import URIData
35from bluetooth_sig.types.uuid import BluetoothUUID
37logger = logging.getLogger(__name__)
40class AdvertisingPDUParser: # pylint: disable=too-few-public-methods
41 """Parser for BLE advertising PDU data packets.
43 Parses raw BLE advertising PDU bytes into structured AdvertisingData,
44 handling both legacy and extended advertising formats.
46 This is the low-level parsing layer that extracts:
47 - Manufacturer data (company_id → payload)
48 - Service data (UUID → payload)
49 - Flags, local name, appearance, TX power
50 - Extended advertising fields (BLE 5.0+)
52 For vendor-specific interpretation (e.g., BTHome sensor values),
53 use AdvertisingDataInterpreter subclasses.
54 """
56 def parse_advertising_data(self, raw_data: bytes) -> AdvertisingData:
57 """Parse raw advertising data and return structured information.
59 Args:
60 raw_data: Raw bytes from BLE advertising packet
62 Returns:
63 AdvertisingData with parsed information
65 """
66 if self._is_extended_advertising_pdu(raw_data):
67 return self._parse_extended_advertising(raw_data)
68 return self._parse_legacy_advertising(raw_data)
70 def _is_extended_advertising_pdu(self, data: bytes) -> bool:
71 """Check if the advertising data is an extended advertising PDU.
73 Args:
74 data: Raw advertising data bytes
76 Returns:
77 True if extended advertising PDU, False otherwise
79 """
80 if len(data) < PDULayout.PDU_HEADER:
81 return False
83 pdu_header = data[0]
84 pdu_type = pdu_header & PDUHeaderFlags.TYPE_MASK
86 return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value)
88 def _parse_extended_advertising(self, raw_data: bytes) -> AdvertisingData:
89 """Parse extended advertising data.
91 Args:
92 raw_data: Raw extended advertising data
94 Returns:
95 Parsed AdvertisingData
97 """
98 if len(raw_data) < PDULayout.MIN_EXTENDED_PDU:
99 return self._parse_legacy_advertising(raw_data)
101 pdu = self._parse_extended_pdu(raw_data)
103 if not pdu:
104 return self._parse_legacy_advertising(raw_data)
106 parsed_data = AdvertisingDataStructures()
108 if pdu.payload:
109 parsed_data = self._parse_ad_structures(pdu.payload)
111 auxiliary_packets: list[BLEAdvertisingPDU] = []
112 if pdu.extended_header and pdu.extended_header.auxiliary_pointer:
113 aux_packets = self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer)
114 auxiliary_packets.extend(aux_packets)
116 return AdvertisingData(
117 raw_data=raw_data,
118 ad_structures=parsed_data,
119 extended=ExtendedAdvertisingData(
120 extended_payload=pdu.payload,
121 auxiliary_packets=auxiliary_packets,
122 ),
123 )
125 def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None:
126 """Parse extended PDU header and payload.
128 Args:
129 data: Raw PDU data
131 Returns:
132 Parsed BLEAdvertisingPDU or None if invalid
134 """
135 if len(data) < PDULayout.MIN_EXTENDED_PDU:
136 return None
138 header = int.from_bytes(data[0 : PDULayout.PDU_HEADER], byteorder="little")
139 pdu_type = header & PDUHeaderFlags.TYPE_MASK
140 tx_add = bool(header & PDUHeaderFlags.TX_ADD_MASK)
141 rx_add = bool(header & PDUHeaderFlags.RX_ADD_MASK)
143 length = data[PDULayout.PDU_LENGTH_OFFSET]
145 if len(data) < PDULayout.MIN_EXTENDED_PDU + length:
146 return None
148 extended_header_start = PDULayout.EXTENDED_HEADER_START
150 extended_header = self._parse_extended_header(data[extended_header_start:])
152 if not extended_header:
153 return None
155 payload_start = extended_header_start + extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH
156 payload_length = length - (extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH)
158 if payload_start + payload_length > len(data):
159 return None
161 payload = data[payload_start : payload_start + payload_length]
163 adva = extended_header.extended_advertiser_address
164 targeta = extended_header.extended_target_address
166 return BLEAdvertisingPDU(
167 pdu_type=PDUType(pdu_type),
168 tx_add=tx_add,
169 rx_add=rx_add,
170 length=length,
171 advertiser_address=adva,
172 target_address=targeta,
173 payload=payload,
174 extended_header=extended_header,
175 )
177 def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None:
178 """Parse extended header from PDU data.
180 Args:
181 data: Extended header data
183 Returns:
184 Parsed BLEExtendedHeader or None if invalid
186 """
187 # pylint: disable=too-many-return-statements,too-many-branches
188 if len(data) < 1:
189 return None
191 header = BLEExtendedHeader()
192 header.extended_header_length = data[0]
194 if len(data) < header.extended_header_length + 1:
195 return None
197 adv_mode = data[1]
198 header.adv_mode = adv_mode
200 offset = PDULayout.ADV_ADDR_OFFSET # Start after length and mode bytes
202 if header.has_extended_advertiser_address:
203 if offset + PDULayout.BLE_ADDR > len(data):
204 return None
205 header.extended_advertiser_address = data[offset : offset + PDULayout.BLE_ADDR]
206 offset += PDULayout.BLE_ADDR
208 if header.has_extended_target_address:
209 if offset + PDULayout.BLE_ADDR > len(data):
210 return None
211 header.extended_target_address = data[offset : offset + PDULayout.BLE_ADDR]
212 offset += PDULayout.BLE_ADDR
214 if header.has_cte_info:
215 if offset + PDULayout.CTE_INFO > len(data):
216 return None
217 header.cte_info = data[offset : offset + PDULayout.CTE_INFO]
218 offset += PDULayout.CTE_INFO
220 if header.has_advertising_data_info:
221 if offset + PDULayout.ADV_DATA_INFO > len(data):
222 return None
223 header.advertising_data_info = data[offset : offset + PDULayout.ADV_DATA_INFO]
224 offset += PDULayout.ADV_DATA_INFO
226 if header.has_auxiliary_pointer:
227 if offset + PDULayout.AUX_PTR > len(data):
228 return None
229 header.auxiliary_pointer = data[offset : offset + PDULayout.AUX_PTR]
230 offset += PDULayout.AUX_PTR
232 if header.has_sync_info:
233 if offset + PDULayout.SYNC_INFO > len(data):
234 return None
235 header.sync_info = data[offset : offset + PDULayout.SYNC_INFO]
236 offset += PDULayout.SYNC_INFO
238 if header.has_tx_power:
239 if offset + PDULayout.TX_POWER > len(data):
240 return None
241 header.tx_power = int.from_bytes(
242 data[offset : offset + PDULayout.TX_POWER],
243 byteorder="little",
244 signed=True,
245 )
246 offset += PDULayout.TX_POWER
248 if header.has_additional_controller_data:
249 header.additional_controller_advertising_data = data[offset:]
251 return header
253 def _parse_auxiliary_packets(self, aux_ptr: bytes) -> list[BLEAdvertisingPDU]:
254 """Parse auxiliary packets referenced by auxiliary pointer.
256 Args:
257 aux_ptr: Auxiliary pointer data
259 Returns:
260 List of auxiliary packets (currently returns empty list)
262 """
263 if len(aux_ptr) != PDULayout.AUX_PTR:
264 return []
266 return []
268 def _parse_legacy_advertising(self, raw_data: bytes) -> AdvertisingData:
269 """Parse legacy advertising data.
271 Args:
272 raw_data: Raw legacy advertising data
274 Returns:
275 Parsed AdvertisingData
277 """
278 parsed_data = self._parse_ad_structures(raw_data)
279 return AdvertisingData(
280 raw_data=raw_data,
281 ad_structures=parsed_data,
282 )
284 @staticmethod
285 def _parse_address_list(ad_data: bytes) -> list[str]:
286 """Parse list of 6-byte Bluetooth addresses from raw data.
288 Args:
289 ad_data: Raw address data (multiple 6-byte addresses)
291 Returns:
292 List of formatted address strings (XX:XX:XX:XX:XX:XX)
293 """
294 addresses: list[str] = []
295 for j in range(0, len(ad_data), 6):
296 if j + 5 < len(ad_data):
297 addr_bytes = ad_data[j : j + 6]
298 addresses.append(":".join(f"{b:02X}" for b in addr_bytes[::-1]))
299 return addresses
301 @staticmethod
302 def _parse_16bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
303 """Parse list of 16-bit service UUIDs from raw data.
305 Args:
306 ad_data: Raw UUID data
308 Returns:
309 List of BluetoothUUID objects
310 """
311 uuids: list[BluetoothUUID] = []
312 for j in range(0, len(ad_data), 2):
313 if j + 1 < len(ad_data):
314 uuid_short = DataParser.parse_int16(ad_data, j, signed=False)
315 uuids.append(BluetoothUUID(uuid_short))
316 return uuids
318 @staticmethod
319 def _parse_128bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
320 """Parse list of 128-bit service UUIDs from raw data.
322 Args:
323 ad_data: Raw UUID data
325 Returns:
326 List of BluetoothUUID objects
327 """
328 uuids: list[BluetoothUUID] = []
329 for j in range(0, len(ad_data), 16):
330 if j + 15 < len(ad_data):
331 uuids.append(BluetoothUUID(ad_data[j : j + 16].hex().upper()))
332 return uuids
334 def _parse_manufacturer_data(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
335 """Parse manufacturer-specific data and resolve company name.
337 Args:
338 ad_data: Raw manufacturer-specific data bytes
339 parsed: AdvertisingDataStructures object to update
341 """
342 company_id = DataParser.parse_int16(ad_data, 0, signed=False)
343 parsed.core.manufacturer_data[company_id] = ad_data[2:]
344 # Resolve company name from registry
345 company_name = company_identifiers_registry.get_company_name(company_id)
346 if company_name is not None:
347 parsed.core.manufacturer_names[company_id] = company_name
349 def _parse_ad_structures(self, data: bytes) -> AdvertisingDataStructures:
350 """Parse advertising data structures from raw bytes.
352 Args:
353 data: Raw advertising data payload
355 Returns:
356 AdvertisingDataStructures object with extracted data
358 """
359 # pylint: disable=too-many-branches,too-many-statements
360 parsed = AdvertisingDataStructures()
362 i = 0
363 while i < len(data):
364 if i + 1 >= len(data):
365 break
367 length = data[i]
368 if length == 0 or i + length + 1 > len(data):
369 break
371 ad_type = data[i + 1]
372 ad_data = data[i + 2 : i + length + 1]
374 # Warn about unknown AD types
375 if not ad_types_registry.is_known_ad_type(ad_type):
376 logger.warning("Unknown AD type encountered: 0x%02X", ad_type)
378 if ad_type == ADType.FLAGS and len(ad_data) >= 1:
379 parsed.properties.flags = BLEAdvertisingFlags(ad_data[0])
380 elif ad_type in (
381 ADType.INCOMPLETE_16BIT_SERVICE_UUIDS,
382 ADType.COMPLETE_16BIT_SERVICE_UUIDS,
383 ):
384 parsed.core.service_uuids.extend(self._parse_16bit_uuids(ad_data))
385 elif ad_type in (
386 ADType.INCOMPLETE_128BIT_SERVICE_UUIDS,
387 ADType.COMPLETE_128BIT_SERVICE_UUIDS,
388 ):
389 parsed.core.service_uuids.extend(self._parse_128bit_uuids(ad_data))
390 elif ad_type in (ADType.SHORTENED_LOCAL_NAME, ADType.COMPLETE_LOCAL_NAME):
391 try:
392 parsed.core.local_name = ad_data.decode("utf-8")
393 except UnicodeDecodeError:
394 parsed.core.local_name = ad_data.hex()
395 elif ad_type == ADType.TX_POWER_LEVEL and len(ad_data) >= 1:
396 parsed.properties.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True)
397 elif ad_type == ADType.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= 2:
398 self._parse_manufacturer_data(ad_data, parsed)
399 elif ad_type == ADType.APPEARANCE and len(ad_data) >= 2:
400 raw_value = DataParser.parse_int16(ad_data, 0, signed=False)
401 appearance_info = appearance_values_registry.get_appearance_info(raw_value)
402 parsed.properties.appearance = AppearanceData(raw_value=raw_value, info=appearance_info)
403 elif ad_type == ADType.SERVICE_DATA_16BIT and len(ad_data) >= 2:
404 service_uuid = BluetoothUUID(DataParser.parse_int16(ad_data, 0, signed=False))
405 parsed.core.service_data[service_uuid] = ad_data[2:]
406 if service_uuid not in parsed.core.service_uuids:
407 parsed.core.service_uuids.append(service_uuid)
408 elif ad_type == ADType.SERVICE_DATA_128BIT and len(ad_data) >= 16:
409 service_uuid = BluetoothUUID(ad_data[:16].hex().upper())
410 parsed.core.service_data[service_uuid] = ad_data[16:]
411 if service_uuid not in parsed.core.service_uuids:
412 parsed.core.service_uuids.append(service_uuid)
413 elif ad_type == ADType.URI:
414 parsed.core.uri_data = URIData.from_raw_data(ad_data)
415 elif ad_type == ADType.INDOOR_POSITIONING:
416 parsed.location.indoor_positioning = ad_data
417 elif ad_type == ADType.TRANSPORT_DISCOVERY_DATA:
418 parsed.location.transport_discovery_data = ad_data
419 elif ad_type == ADType.LE_SUPPORTED_FEATURES:
420 parsed.properties.le_supported_features = ad_data
421 elif ad_type == ADType.ENCRYPTED_ADVERTISING_DATA:
422 parsed.security.encrypted_advertising_data = ad_data
423 elif ad_type == ADType.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION:
424 parsed.mesh.periodic_advertising_response_timing = ad_data
425 elif ad_type == ADType.ELECTRONIC_SHELF_LABEL:
426 parsed.mesh.electronic_shelf_label = ad_data
427 elif ad_type == ADType.THREE_D_INFORMATION_DATA:
428 parsed.location.three_d_information = ad_data
429 elif ad_type == ADType.BROADCAST_NAME:
430 try:
431 parsed.mesh.broadcast_name = ad_data.decode("utf-8")
432 except UnicodeDecodeError:
433 parsed.mesh.broadcast_name = ad_data.hex()
434 elif ad_type == ADType.BROADCAST_CODE:
435 parsed.mesh.broadcast_code = ad_data
436 elif ad_type == ADType.BIGINFO:
437 parsed.mesh.biginfo = ad_data
438 elif ad_type == ADType.MESH_MESSAGE:
439 parsed.mesh.mesh_message = ad_data
440 elif ad_type == ADType.MESH_BEACON:
441 parsed.mesh.mesh_beacon = ad_data
442 elif ad_type == ADType.PUBLIC_TARGET_ADDRESS:
443 parsed.directed.public_target_address.extend(self._parse_address_list(ad_data))
444 elif ad_type == ADType.RANDOM_TARGET_ADDRESS:
445 parsed.directed.random_target_address.extend(self._parse_address_list(ad_data))
446 elif ad_type == ADType.ADVERTISING_INTERVAL and len(ad_data) >= 2:
447 parsed.directed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False)
448 elif ad_type == ADType.ADVERTISING_INTERVAL_LONG and len(ad_data) >= 3:
449 parsed.directed.advertising_interval_long = int.from_bytes(
450 ad_data[:3], byteorder="little", signed=False
451 )
452 elif ad_type == ADType.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= 6:
453 addr_bytes = ad_data[:6]
454 parsed.directed.le_bluetooth_device_address = ":".join(f"{b:02X}" for b in addr_bytes[::-1])
455 elif ad_type == ADType.LE_ROLE and len(ad_data) >= 1:
456 parsed.properties.le_role = ad_data[0]
457 elif ad_type == ADType.CLASS_OF_DEVICE and len(ad_data) >= 3:
458 raw_cod = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
459 parsed.properties.class_of_device = class_of_device_registry.decode_class_of_device(raw_cod)
460 elif ad_type == ADType.SIMPLE_PAIRING_HASH_C:
461 parsed.oob_security.simple_pairing_hash_c = ad_data
462 elif ad_type == ADType.SIMPLE_PAIRING_RANDOMIZER_R:
463 parsed.oob_security.simple_pairing_randomizer_r = ad_data
464 elif ad_type == ADType.SECURITY_MANAGER_TK_VALUE:
465 parsed.oob_security.security_manager_tk_value = ad_data
466 elif ad_type == ADType.SECURITY_MANAGER_OUT_OF_BAND_FLAGS:
467 parsed.oob_security.security_manager_oob_flags = ad_data
468 elif ad_type == ADType.SLAVE_CONNECTION_INTERVAL_RANGE:
469 parsed.directed.peripheral_connection_interval_range = ad_data
470 elif ad_type == ADType.SECURE_CONNECTIONS_CONFIRMATION_VALUE:
471 parsed.oob_security.secure_connections_confirmation = ad_data
472 elif ad_type == ADType.SECURE_CONNECTIONS_RANDOM_VALUE:
473 parsed.oob_security.secure_connections_random = ad_data
474 elif ad_type == ADType.CHANNEL_MAP_UPDATE_INDICATION:
475 parsed.location.channel_map_update_indication = ad_data
476 elif ad_type == ADType.PB_ADV:
477 parsed.mesh.pb_adv = ad_data
478 elif ad_type == ADType.RESOLVABLE_SET_IDENTIFIER:
479 parsed.security.resolvable_set_identifier = ad_data
481 i += length + 1
483 return parsed