Coverage for src / bluetooth_sig / advertising / pdu_parser.py: 54%
374 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""BLE Advertising PDU parser.
3This module provides a parser for BLE advertising PDU data packets,
4extracting device information, manufacturer data, and service UUIDs
5from both legacy and extended advertising formats.
7This is the low-level BLE spec parser. For interpreting vendor-specific
8sensor data (e.g., Xiaomi, RuuviTag, BTHome), see the AdvertisingDataInterpreter
9base class.
10"""
12from __future__ import annotations
14import logging
16from bluetooth_sig.gatt.characteristics.utils import DataParser
17from bluetooth_sig.gatt.constants import SIZE_UINT16, SIZE_UINT24, SIZE_UINT32, SIZE_UINT48, SIZE_UUID128
18from bluetooth_sig.registry.core.ad_types import ad_types_registry
19from bluetooth_sig.registry.core.appearance_values import appearance_values_registry
20from bluetooth_sig.registry.core.class_of_device import class_of_device_registry
21from bluetooth_sig.types import ManufacturerData
22from bluetooth_sig.types.ad_types_constants import ADType
23from bluetooth_sig.types.advertising.ad_structures import (
24 AdvertisingDataStructures,
25 ConnectionIntervalRange,
26 ExtendedAdvertisingData,
27)
28from bluetooth_sig.types.advertising.channel_map_update import ChannelMapUpdateIndication
29from bluetooth_sig.types.advertising.extended import (
30 AdvertisingDataInfo,
31 AuxiliaryPointer,
32 CTEInfo,
33 CTEType,
34 PHYType,
35 SyncInfo,
36)
37from bluetooth_sig.types.advertising.features import LEFeatures
38from bluetooth_sig.types.advertising.flags import BLEAdvertisingFlags
39from bluetooth_sig.types.advertising.indoor_positioning import IndoorPositioningData
40from bluetooth_sig.types.advertising.pdu import (
41 BLEAdvertisingPDU,
42 BLEExtendedHeader,
43 ExtendedHeaderFlags,
44 PDUHeaderFlags,
45 PDULayout,
46 PDUType,
47)
48from bluetooth_sig.types.advertising.result import AdvertisingData
49from bluetooth_sig.types.advertising.three_d_information import ThreeDInformationData
50from bluetooth_sig.types.advertising.transport_discovery import TransportDiscoveryData
51from bluetooth_sig.types.appearance import AppearanceData
52from bluetooth_sig.types.mesh import (
53 MeshBeaconType,
54 MeshMessage,
55 ProvisioningBearerData,
56 SecureNetworkBeacon,
57 UnprovisionedDeviceBeacon,
58)
59from bluetooth_sig.types.uri import URIData
60from bluetooth_sig.types.uuid import BluetoothUUID
62logger = logging.getLogger(__name__)
65class _ExtendedHeaderBitMasks:
66 """Bit masks for parsing extended header fields (Core Spec Vol 6, Part B)."""
68 # CTE Info field (1 byte)
69 CTE_TIME_MASK: int = 0x1F # Bits 0-4: CTE time (0-19)
70 CTE_TYPE_SHIFT: int = 6
71 CTE_TYPE_MASK: int = 0x03 # Bits 6-7: CTE type
73 # Advertising Data Info (ADI) field (2 bytes)
74 ADI_DID_MASK: int = 0x0FFF # Bits 0-11: Data ID
75 ADI_SID_SHIFT: int = 12
76 ADI_SID_MASK: int = 0x0F # Bits 12-15: Set ID
78 # Auxiliary Pointer field (3 bytes)
79 AUX_CHANNEL_MASK: int = 0x3F # Bits 0-5: Channel index
80 AUX_CA_SHIFT: int = 6
81 AUX_OFFSET_UNITS_SHIFT: int = 7
82 AUX_OFFSET_SHIFT: int = 8
83 AUX_OFFSET_MASK: int = 0x1FFF # Bits 8-20: 13-bit offset
84 AUX_PHY_SHIFT: int = 21
85 AUX_PHY_MASK: int = 0x07 # Bits 21-23: PHY
87 # SyncInfo field (18 bytes)
88 SYNC_OFFSET_MASK: int = 0x1FFF # Bits 0-12: 13-bit offset
89 SYNC_OFFSET_UNITS_SHIFT: int = 13
90 SYNC_CHANNEL_MAP_MASK: int = 0x1FFFFFFFFF # 37-bit channel map
91 SYNC_SCA_SHIFT: int = 5
92 SYNC_SCA_MASK: int = 0x07 # Bits 5-7: SCA
93 SYNC_ADDR_TYPE_SHIFT: int = 4
96class AdvertisingPDUParser: # pylint: disable=too-few-public-methods
97 """Parser for BLE advertising PDU data packets.
99 Parses raw BLE advertising PDU bytes into structured AdvertisingData,
100 handling both legacy and extended advertising formats.
102 This is the low-level parsing layer that extracts:
103 - Manufacturer data (company_id → payload)
104 - Service data (UUID → payload)
105 - Flags, local name, appearance, TX power
106 - Extended advertising fields (BLE 5.0+)
108 For vendor-specific interpretation (e.g., BTHome sensor values),
109 use AdvertisingDataInterpreter subclasses.
110 """
112 def parse_advertising_data(self, raw_data: bytes) -> AdvertisingData:
113 """Parse raw advertising data and return structured information.
115 Args:
116 raw_data: Raw bytes from BLE advertising packet
118 Returns:
119 AdvertisingData with parsed information
121 """
122 if self._is_extended_advertising_pdu(raw_data):
123 return self._parse_extended_advertising(raw_data)
124 return self._parse_legacy_advertising(raw_data)
126 def _is_extended_advertising_pdu(self, data: bytes) -> bool:
127 """Check if the advertising data is an extended advertising PDU.
129 Args:
130 data: Raw advertising data bytes
132 Returns:
133 True if extended advertising PDU, False otherwise
135 """
136 if len(data) < PDULayout.PDU_HEADER:
137 return False
139 pdu_header = data[0]
140 pdu_type = pdu_header & PDUHeaderFlags.TYPE_MASK
142 return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value)
144 def _parse_extended_advertising(self, raw_data: bytes) -> AdvertisingData:
145 """Parse extended advertising data.
147 Args:
148 raw_data: Raw extended advertising data
150 Returns:
151 Parsed AdvertisingData
153 """
154 if len(raw_data) < PDULayout.MIN_EXTENDED_PDU:
155 return self._parse_legacy_advertising(raw_data)
157 pdu = self._parse_extended_pdu(raw_data)
159 if not pdu:
160 return self._parse_legacy_advertising(raw_data)
162 parsed_data = AdvertisingDataStructures()
164 if pdu.payload:
165 parsed_data = self._parse_ad_structures(pdu.payload)
167 auxiliary_packets: list[BLEAdvertisingPDU] = []
168 if pdu.extended_header and pdu.extended_header.auxiliary_pointer:
169 aux_packets = self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer)
170 auxiliary_packets.extend(aux_packets)
172 return AdvertisingData(
173 raw_data=raw_data,
174 ad_structures=parsed_data,
175 extended=ExtendedAdvertisingData(
176 extended_payload=pdu.payload,
177 auxiliary_packets=auxiliary_packets,
178 ),
179 )
181 def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None:
182 """Parse extended PDU header and payload.
184 Args:
185 data: Raw PDU data
187 Returns:
188 Parsed BLEAdvertisingPDU or None if invalid
190 """
191 if len(data) < PDULayout.MIN_EXTENDED_PDU:
192 return None
194 header = DataParser.parse_int16(data, 0, signed=False)
195 pdu_type = header & PDUHeaderFlags.TYPE_MASK
196 tx_add = bool(header & PDUHeaderFlags.TX_ADD_MASK)
197 rx_add = bool(header & PDUHeaderFlags.RX_ADD_MASK)
199 length = data[PDULayout.PDU_LENGTH_OFFSET]
201 if len(data) < PDULayout.MIN_EXTENDED_PDU + length:
202 return None
204 extended_header_start = PDULayout.EXTENDED_HEADER_START
206 extended_header = self._parse_extended_header(data[extended_header_start:])
208 if not extended_header:
209 return None
211 payload_start = extended_header_start + extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH
212 payload_length = length - (extended_header.extended_header_length + PDULayout.EXT_HEADER_LENGTH)
214 if payload_start + payload_length > len(data):
215 return None
217 payload = data[payload_start : payload_start + payload_length]
219 adva = extended_header.extended_advertiser_address
220 targeta = extended_header.extended_target_address
222 return BLEAdvertisingPDU(
223 pdu_type=PDUType(pdu_type),
224 tx_add=tx_add,
225 rx_add=rx_add,
226 length=length,
227 advertiser_address=adva,
228 target_address=targeta,
229 payload=payload,
230 extended_header=extended_header,
231 )
233 @staticmethod
234 def _parse_address_to_string(addr_bytes: bytes) -> str:
235 """Convert 6-byte Bluetooth address to formatted string.
237 Args:
238 addr_bytes: 6-byte address in little-endian order
240 Returns:
241 Formatted address string (XX:XX:XX:XX:XX:XX)
242 """
243 return ":".join(f"{b:02X}" for b in addr_bytes[::-1])
245 @staticmethod
246 def _parse_cte_info(data: bytes) -> CTEInfo:
247 """Parse CTE info from raw byte.
249 Args:
250 data: 1-byte CTE info
252 Returns:
253 Parsed CTEInfo struct
254 """
255 cte_byte = data[0]
256 cte_time = cte_byte & _ExtendedHeaderBitMasks.CTE_TIME_MASK
257 cte_type_raw = (cte_byte >> _ExtendedHeaderBitMasks.CTE_TYPE_SHIFT) & _ExtendedHeaderBitMasks.CTE_TYPE_MASK
258 return CTEInfo(cte_time=cte_time, cte_type=CTEType(cte_type_raw))
260 @staticmethod
261 def _parse_advertising_data_info(data: bytes) -> AdvertisingDataInfo:
262 """Parse Advertising Data Info (ADI) from raw bytes.
264 Args:
265 data: 2-byte ADI field
267 Returns:
268 Parsed AdvertisingDataInfo struct
269 """
270 adi_value = DataParser.parse_int16(data, 0, signed=False)
271 did = adi_value & _ExtendedHeaderBitMasks.ADI_DID_MASK
272 sid = (adi_value >> _ExtendedHeaderBitMasks.ADI_SID_SHIFT) & _ExtendedHeaderBitMasks.ADI_SID_MASK
273 return AdvertisingDataInfo(advertising_data_id=did, advertising_set_id=sid)
275 @staticmethod
276 def _parse_auxiliary_pointer_struct(data: bytes) -> AuxiliaryPointer:
277 """Parse Auxiliary Pointer from raw bytes.
279 Args:
280 data: 3-byte AuxPtr field
282 Returns:
283 Parsed AuxiliaryPointer struct
284 """
285 aux_value = DataParser.parse_int24(data, 0, signed=False)
286 channel_index = aux_value & _ExtendedHeaderBitMasks.AUX_CHANNEL_MASK
287 ca = bool((aux_value >> _ExtendedHeaderBitMasks.AUX_CA_SHIFT) & 0x01)
288 offset_units = (aux_value >> _ExtendedHeaderBitMasks.AUX_OFFSET_UNITS_SHIFT) & 0x01
289 aux_offset = (aux_value >> _ExtendedHeaderBitMasks.AUX_OFFSET_SHIFT) & _ExtendedHeaderBitMasks.AUX_OFFSET_MASK
290 aux_phy_raw = (aux_value >> _ExtendedHeaderBitMasks.AUX_PHY_SHIFT) & _ExtendedHeaderBitMasks.AUX_PHY_MASK
291 aux_phy = PHYType(min(aux_phy_raw, PHYType.LE_CODED)) # Clamp to valid range
292 return AuxiliaryPointer(
293 channel_index=channel_index,
294 ca=ca,
295 offset_units=offset_units,
296 aux_offset=aux_offset,
297 aux_phy=aux_phy,
298 )
300 def _parse_sync_info(self, data: bytes) -> SyncInfo:
301 """Parse SyncInfo from raw bytes.
303 Args:
304 data: 18-byte SyncInfo field
306 Returns:
307 Parsed SyncInfo struct
308 """
309 # Bytes 0-1: Sync packet offset and units (13 bits offset + 1 bit units + 2 reserved)
310 offset_field = DataParser.parse_int16(data, 0, signed=False)
311 sync_packet_offset = offset_field & _ExtendedHeaderBitMasks.SYNC_OFFSET_MASK
312 offset_units = (offset_field >> _ExtendedHeaderBitMasks.SYNC_OFFSET_UNITS_SHIFT) & 0x01
314 # Bytes 2-3: Interval
315 interval = DataParser.parse_int16(data, 2, signed=False)
317 # Bytes 4-8: Channel map (37 bits, stored in 5 bytes)
318 # Note: 40-bit field exceeds DataParser's 32-bit limit, use int.from_bytes directly
319 channel_map = int.from_bytes(data[4:9], byteorder="little") & _ExtendedHeaderBitMasks.SYNC_CHANNEL_MAP_MASK
321 # Byte 9: SCA (3 bits) and other fields
322 sca_byte = DataParser.parse_int8(data, 9, signed=False)
323 sleep_clock_accuracy = (
324 sca_byte >> _ExtendedHeaderBitMasks.SYNC_SCA_SHIFT
325 ) & _ExtendedHeaderBitMasks.SYNC_SCA_MASK
327 # Bytes 10-15: Access Address (we interpret this as advertiser address)
328 advertising_address = self._parse_address_to_string(data[10:16])
329 advertising_address_type = (sca_byte >> _ExtendedHeaderBitMasks.SYNC_ADDR_TYPE_SHIFT) & 0x01
331 # Bytes 16-17: Sync counter/CRC init
332 sync_counter = DataParser.parse_int16(data, 16, signed=False)
334 return SyncInfo(
335 sync_packet_offset=sync_packet_offset,
336 offset_units=offset_units,
337 interval=interval,
338 channel_map=channel_map,
339 sleep_clock_accuracy=sleep_clock_accuracy,
340 advertising_address=advertising_address,
341 advertising_address_type=advertising_address_type,
342 sync_counter=sync_counter,
343 )
345 def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None:
346 """Parse extended header from PDU data.
348 Args:
349 data: Extended header data
351 Returns:
352 Parsed BLEExtendedHeader or None if invalid
354 """
355 # pylint: disable=too-many-return-statements,too-many-branches
356 if len(data) < 1:
357 return None
359 extended_header_length = data[0]
361 if len(data) < extended_header_length + 1:
362 return None
364 adv_mode = data[1]
365 offset = PDULayout.ADV_ADDR_OFFSET # Start after length and mode bytes
367 extended_advertiser_address = ""
368 extended_target_address = ""
369 cte_info: CTEInfo | None = None
370 advertising_data_info: AdvertisingDataInfo | None = None
371 auxiliary_pointer: AuxiliaryPointer | None = None
372 sync_info: SyncInfo | None = None
373 tx_power: int | None = None
374 additional_controller_advertising_data = b""
376 # Check ADV_ADDR flag
377 if adv_mode & ExtendedHeaderFlags.ADV_ADDR:
378 if offset + PDULayout.BLE_ADDR > len(data):
379 return None
380 extended_advertiser_address = self._parse_address_to_string(data[offset : offset + PDULayout.BLE_ADDR])
381 offset += PDULayout.BLE_ADDR
383 # Check TARGET_ADDR flag
384 if adv_mode & ExtendedHeaderFlags.TARGET_ADDR:
385 if offset + PDULayout.BLE_ADDR > len(data):
386 return None
387 extended_target_address = self._parse_address_to_string(data[offset : offset + PDULayout.BLE_ADDR])
388 offset += PDULayout.BLE_ADDR
390 # Check CTE_INFO flag
391 if adv_mode & ExtendedHeaderFlags.CTE_INFO:
392 if offset + PDULayout.CTE_INFO > len(data):
393 return None
394 cte_info = self._parse_cte_info(data[offset : offset + PDULayout.CTE_INFO])
395 offset += PDULayout.CTE_INFO
397 # Check ADV_DATA_INFO flag
398 if adv_mode & ExtendedHeaderFlags.ADV_DATA_INFO:
399 if offset + PDULayout.ADV_DATA_INFO > len(data):
400 return None
401 advertising_data_info = self._parse_advertising_data_info(data[offset : offset + PDULayout.ADV_DATA_INFO])
402 offset += PDULayout.ADV_DATA_INFO
404 # Check AUX_PTR flag
405 if adv_mode & ExtendedHeaderFlags.AUX_PTR:
406 if offset + PDULayout.AUX_PTR > len(data):
407 return None
408 auxiliary_pointer = self._parse_auxiliary_pointer_struct(data[offset : offset + PDULayout.AUX_PTR])
409 offset += PDULayout.AUX_PTR
411 # Check SYNC_INFO flag
412 if adv_mode & ExtendedHeaderFlags.SYNC_INFO:
413 if offset + PDULayout.SYNC_INFO > len(data):
414 return None
415 sync_info = self._parse_sync_info(data[offset : offset + PDULayout.SYNC_INFO])
416 offset += PDULayout.SYNC_INFO
418 # Check TX_POWER flag
419 if adv_mode & ExtendedHeaderFlags.TX_POWER:
420 if offset + PDULayout.TX_POWER > len(data):
421 return None
422 tx_power = DataParser.parse_int8(data, offset, signed=True)
423 offset += PDULayout.TX_POWER
425 # Check ACAD flag
426 if adv_mode & ExtendedHeaderFlags.ACAD:
427 additional_controller_advertising_data = data[offset:]
429 return BLEExtendedHeader(
430 extended_header_length=extended_header_length,
431 adv_mode=adv_mode,
432 extended_advertiser_address=extended_advertiser_address,
433 extended_target_address=extended_target_address,
434 cte_info=cte_info,
435 advertising_data_info=advertising_data_info,
436 auxiliary_pointer=auxiliary_pointer,
437 sync_info=sync_info,
438 tx_power=tx_power,
439 additional_controller_advertising_data=additional_controller_advertising_data,
440 )
442 def _parse_auxiliary_packets(self, aux_ptr: AuxiliaryPointer) -> list[BLEAdvertisingPDU]:
443 """Parse auxiliary packets referenced by auxiliary pointer.
445 Args:
446 aux_ptr: Parsed AuxiliaryPointer struct
448 Returns:
449 List of auxiliary packets (currently returns empty list)
451 Note:
452 This is a placeholder. Full implementation would require
453 receiving raw PDU data from the auxiliary channel specified
454 in the AuxiliaryPointer (channel_index, aux_offset, aux_phy).
455 """
456 # Auxiliary packets are received over the air on a different channel
457 # at a later time. This cannot be parsed from a single PDU capture.
458 # The AuxiliaryPointer tells us where to listen, not the data itself.
459 _ = aux_ptr # Acknowledge the parameter for future implementation
460 return []
462 def _parse_legacy_advertising(self, raw_data: bytes) -> AdvertisingData:
463 """Parse legacy advertising data.
465 Args:
466 raw_data: Raw legacy advertising data
468 Returns:
469 Parsed AdvertisingData
471 """
472 parsed_data = self._parse_ad_structures(raw_data)
473 return AdvertisingData(
474 raw_data=raw_data,
475 ad_structures=parsed_data,
476 )
478 @staticmethod
479 def _parse_address_list(ad_data: bytes) -> list[str]:
480 """Parse list of 6-byte Bluetooth addresses from raw data.
482 Args:
483 ad_data: Raw address data (multiple 6-byte addresses)
485 Returns:
486 List of formatted address strings (XX:XX:XX:XX:XX:XX)
487 """
488 addresses: list[str] = []
489 for j in range(0, len(ad_data), 6):
490 if j + 5 < len(ad_data):
491 addr_bytes = ad_data[j : j + 6]
492 addresses.append(":".join(f"{b:02X}" for b in addr_bytes[::-1]))
493 return addresses
495 @staticmethod
496 def _parse_16bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
497 """Parse list of 16-bit service UUIDs from raw data.
499 Args:
500 ad_data: Raw UUID data
502 Returns:
503 List of BluetoothUUID objects
504 """
505 uuids: list[BluetoothUUID] = []
506 for j in range(0, len(ad_data), 2):
507 if j + 1 < len(ad_data):
508 uuid_short = DataParser.parse_int16(ad_data, j, signed=False)
509 uuids.append(BluetoothUUID(uuid_short))
510 return uuids
512 @staticmethod
513 def _parse_32bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
514 """Parse list of 32-bit service UUIDs from raw data.
516 Args:
517 ad_data: Raw UUID data
519 Returns:
520 List of BluetoothUUID objects
521 """
522 uuids: list[BluetoothUUID] = []
523 for j in range(0, len(ad_data), 4):
524 if j + 3 < len(ad_data):
525 uuid_32 = DataParser.parse_int32(ad_data, j, signed=False)
526 uuids.append(BluetoothUUID(uuid_32))
527 return uuids
529 @staticmethod
530 def _parse_128bit_uuids(ad_data: bytes) -> list[BluetoothUUID]:
531 """Parse list of 128-bit service UUIDs from raw data.
533 Args:
534 ad_data: Raw UUID data
536 Returns:
537 List of BluetoothUUID objects
538 """
539 uuids: list[BluetoothUUID] = []
540 for j in range(0, len(ad_data), 16):
541 if j + 15 < len(ad_data):
542 uuids.append(BluetoothUUID(ad_data[j : j + 16].hex().upper()))
543 return uuids
545 def _parse_manufacturer_data(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
546 """Parse manufacturer-specific data and resolve company name.
548 Args:
549 ad_data: Raw manufacturer-specific data bytes (company ID + payload)
550 parsed: AdvertisingDataStructures object to update
552 """
553 mfr_data = ManufacturerData.from_bytes(ad_data)
554 parsed.core.manufacturer_data[mfr_data.company.id] = mfr_data
556 def _handle_core_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
557 """Handle core advertising data types (service UUIDs, names, etc).
559 Returns:
560 True if ad_type was handled, False otherwise
561 """
562 if ad_type in (ADType.INCOMPLETE_16BIT_SERVICE_UUIDS, ADType.COMPLETE_16BIT_SERVICE_UUIDS):
563 parsed.core.service_uuids.extend(self._parse_16bit_uuids(ad_data))
564 elif ad_type in (ADType.INCOMPLETE_32BIT_SERVICE_UUIDS, ADType.COMPLETE_32BIT_SERVICE_UUIDS):
565 parsed.core.service_uuids.extend(self._parse_32bit_uuids(ad_data))
566 elif ad_type in (ADType.INCOMPLETE_128BIT_SERVICE_UUIDS, ADType.COMPLETE_128BIT_SERVICE_UUIDS):
567 parsed.core.service_uuids.extend(self._parse_128bit_uuids(ad_data))
568 elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_16BIT:
569 parsed.core.solicited_service_uuids.extend(self._parse_16bit_uuids(ad_data))
570 elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_32BIT:
571 parsed.core.solicited_service_uuids.extend(self._parse_32bit_uuids(ad_data))
572 elif ad_type == ADType.SOLICITED_SERVICE_UUIDS_128BIT:
573 parsed.core.solicited_service_uuids.extend(self._parse_128bit_uuids(ad_data))
574 elif ad_type in (ADType.SHORTENED_LOCAL_NAME, ADType.COMPLETE_LOCAL_NAME):
575 try:
576 parsed.core.local_name = ad_data.decode("utf-8")
577 except UnicodeDecodeError:
578 parsed.core.local_name = ad_data.hex()
579 elif ad_type == ADType.URI:
580 parsed.core.uri_data = URIData.from_raw_data(ad_data)
581 elif ad_type == ADType.SERVICE_DATA_16BIT and len(ad_data) >= SIZE_UINT16:
582 service_uuid = BluetoothUUID(DataParser.parse_int16(ad_data, 0, signed=False))
583 parsed.core.service_data[service_uuid] = ad_data[2:]
584 if service_uuid not in parsed.core.service_uuids:
585 parsed.core.service_uuids.append(service_uuid)
586 elif ad_type == ADType.SERVICE_DATA_32BIT and len(ad_data) >= SIZE_UINT32:
587 service_uuid = BluetoothUUID(DataParser.parse_int32(ad_data, 0, signed=False))
588 parsed.core.service_data[service_uuid] = ad_data[4:]
589 if service_uuid not in parsed.core.service_uuids:
590 parsed.core.service_uuids.append(service_uuid)
591 elif ad_type == ADType.SERVICE_DATA_128BIT and len(ad_data) >= SIZE_UUID128:
592 service_uuid = BluetoothUUID(ad_data[:16].hex().upper())
593 parsed.core.service_data[service_uuid] = ad_data[16:]
594 if service_uuid not in parsed.core.service_uuids:
595 parsed.core.service_uuids.append(service_uuid)
596 else:
597 return False
598 return True
600 def _handle_property_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
601 """Handle device property advertising data types (flags, power, appearance, etc).
603 Returns:
604 True if ad_type was handled, False otherwise
605 """
606 if ad_type == ADType.FLAGS and len(ad_data) >= 1:
607 parsed.properties.flags = BLEAdvertisingFlags(ad_data[0])
608 elif ad_type == ADType.TX_POWER_LEVEL and len(ad_data) >= 1:
609 parsed.properties.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True)
610 elif ad_type == ADType.APPEARANCE and len(ad_data) >= SIZE_UINT16:
611 raw_value = DataParser.parse_int16(ad_data, 0, signed=False)
612 appearance_info = appearance_values_registry.get_appearance_info(raw_value)
613 parsed.properties.appearance = AppearanceData(raw_value=raw_value, info=appearance_info)
614 elif ad_type == ADType.LE_SUPPORTED_FEATURES:
615 parsed.properties.le_supported_features = LEFeatures(raw_value=ad_data)
616 elif ad_type == ADType.LE_ROLE and len(ad_data) >= 1:
617 parsed.properties.le_role = ad_data[0]
618 elif ad_type == ADType.CLASS_OF_DEVICE and len(ad_data) >= SIZE_UINT24:
619 raw_cod = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
620 parsed.properties.class_of_device = class_of_device_registry.decode_class_of_device(raw_cod)
621 elif ad_type == ADType.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= SIZE_UINT16:
622 self._parse_manufacturer_data(ad_data, parsed)
623 else:
624 return False
625 return True
627 def _handle_mesh_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
628 """Handle mesh networking advertising data types.
630 Returns:
631 True if ad_type was handled, False otherwise
632 """
633 if ad_type == ADType.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION:
634 parsed.mesh.periodic_advertising_response_timing = ad_data
635 elif ad_type == ADType.ELECTRONIC_SHELF_LABEL:
636 parsed.mesh.electronic_shelf_label = ad_data
637 elif ad_type == ADType.BROADCAST_NAME:
638 try:
639 parsed.mesh.broadcast_name = ad_data.decode("utf-8")
640 except UnicodeDecodeError:
641 parsed.mesh.broadcast_name = ad_data.hex()
642 elif ad_type == ADType.BROADCAST_CODE:
643 parsed.mesh.broadcast_code = ad_data
644 elif ad_type == ADType.BIGINFO:
645 parsed.mesh.biginfo = ad_data
646 elif ad_type == ADType.MESH_MESSAGE:
647 parsed.mesh.mesh_message = MeshMessage.decode(ad_data)
648 elif ad_type == ADType.MESH_BEACON:
649 self._parse_mesh_beacon(ad_data, parsed)
650 elif ad_type == ADType.PB_ADV:
651 parsed.mesh.provisioning_bearer = ProvisioningBearerData.decode(ad_data)
652 else:
653 return False
654 return True
656 def _parse_mesh_beacon(self, ad_data: bytes, parsed: AdvertisingDataStructures) -> None:
657 """Parse mesh beacon data into appropriate typed beacon.
659 Args:
660 ad_data: Raw beacon advertisement data
661 parsed: Advertising data structures to populate
663 """
664 if len(ad_data) < 1:
665 return
667 beacon_type = ad_data[0]
668 beacon_data = ad_data[1:]
670 if beacon_type == MeshBeaconType.SECURE_NETWORK:
671 parsed.mesh.secure_network_beacon = SecureNetworkBeacon.decode(beacon_data)
672 elif beacon_type == MeshBeaconType.UNPROVISIONED_DEVICE:
673 parsed.mesh.unprovisioned_device_beacon = UnprovisionedDeviceBeacon.decode(beacon_data)
675 def _handle_security_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
676 """Handle security/pairing advertising data types.
678 Returns:
679 True if ad_type was handled, False otherwise
680 """
681 if ad_type == ADType.ENCRYPTED_ADVERTISING_DATA:
682 parsed.security.encrypted_advertising_data = ad_data
683 elif ad_type == ADType.RESOLVABLE_SET_IDENTIFIER:
684 parsed.security.resolvable_set_identifier = ad_data
685 elif ad_type == ADType.SIMPLE_PAIRING_HASH_C:
686 parsed.oob_security.simple_pairing_hash_c = ad_data
687 elif ad_type == ADType.SIMPLE_PAIRING_RANDOMIZER_R:
688 parsed.oob_security.simple_pairing_randomizer_r = ad_data
689 elif ad_type == ADType.SECURITY_MANAGER_TK_VALUE:
690 parsed.oob_security.security_manager_tk_value = ad_data
691 elif ad_type == ADType.SECURITY_MANAGER_OUT_OF_BAND_FLAGS:
692 parsed.oob_security.security_manager_oob_flags = ad_data
693 elif ad_type == ADType.SECURE_CONNECTIONS_CONFIRMATION_VALUE:
694 parsed.oob_security.secure_connections_confirmation = ad_data
695 elif ad_type == ADType.SECURE_CONNECTIONS_RANDOM_VALUE:
696 parsed.oob_security.secure_connections_random = ad_data
697 else:
698 return False
699 return True
701 def _handle_directed_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
702 """Handle directed/connection advertising data types.
704 Returns:
705 True if ad_type was handled, False otherwise
706 """
707 if ad_type == ADType.PUBLIC_TARGET_ADDRESS:
708 parsed.directed.public_target_address.extend(self._parse_address_list(ad_data))
709 elif ad_type == ADType.RANDOM_TARGET_ADDRESS:
710 parsed.directed.random_target_address.extend(self._parse_address_list(ad_data))
711 elif ad_type == ADType.ADVERTISING_INTERVAL and len(ad_data) >= SIZE_UINT16:
712 parsed.directed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False)
713 elif ad_type == ADType.ADVERTISING_INTERVAL_LONG and len(ad_data) >= SIZE_UINT24:
714 parsed.directed.advertising_interval_long = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
715 elif ad_type == ADType.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= SIZE_UINT48:
716 addr_bytes = ad_data[:6]
717 parsed.directed.le_bluetooth_device_address = ":".join(f"{b:02X}" for b in addr_bytes[::-1])
718 elif ad_type == ADType.SLAVE_CONNECTION_INTERVAL_RANGE and len(ad_data) >= SIZE_UINT32:
719 min_interval = DataParser.parse_int16(ad_data, 0, signed=False)
720 max_interval = DataParser.parse_int16(ad_data, 2, signed=False)
721 parsed.directed.peripheral_connection_interval_range = ConnectionIntervalRange(
722 min_interval=min_interval,
723 max_interval=max_interval,
724 )
725 else:
726 return False
727 return True
729 def _handle_location_ad_types(self, ad_type: int, ad_data: bytes, parsed: AdvertisingDataStructures) -> bool:
730 """Handle location/positioning advertising data types.
732 Returns:
733 True if ad_type was handled, False otherwise
734 """
735 if ad_type == ADType.INDOOR_POSITIONING:
736 parsed.location.indoor_positioning = IndoorPositioningData.decode(ad_data)
737 elif ad_type == ADType.TRANSPORT_DISCOVERY_DATA:
738 parsed.location.transport_discovery_data = TransportDiscoveryData.decode(ad_data)
739 elif ad_type == ADType.THREE_D_INFORMATION_DATA:
740 parsed.location.three_d_information = ThreeDInformationData.decode(ad_data)
741 elif ad_type == ADType.CHANNEL_MAP_UPDATE_INDICATION:
742 parsed.location.channel_map_update_indication = ChannelMapUpdateIndication.decode(ad_data)
743 else:
744 return False
745 return True
747 def _parse_ad_structures(self, data: bytes) -> AdvertisingDataStructures:
748 """Parse advertising data structures from raw bytes.
750 Args:
751 data: Raw advertising data payload
753 Returns:
754 AdvertisingDataStructures object with extracted data
756 """
757 parsed = AdvertisingDataStructures()
759 i = 0
760 while i < len(data):
761 if i + 1 >= len(data):
762 break
764 length = data[i]
765 if length == 0 or i + length + 1 > len(data):
766 break
768 ad_type = data[i + 1]
769 ad_data = data[i + 2 : i + length + 1]
771 if not ad_types_registry.is_known_ad_type(ad_type):
772 logger.warning("Unknown AD type encountered: 0x%02X", ad_type)
774 # Dispatch to category handlers
775 self._handle_core_ad_types(ad_type, ad_data, parsed) or self._handle_property_ad_types(
776 ad_type, ad_data, parsed
777 ) or self._handle_mesh_ad_types(ad_type, ad_data, parsed) or self._handle_security_ad_types(
778 ad_type, ad_data, parsed
779 ) or self._handle_directed_ad_types(ad_type, ad_data, parsed) or self._handle_location_ad_types(
780 ad_type, ad_data, parsed
781 )
783 i += length + 1
785 return parsed