Coverage for src/bluetooth_sig/device/advertising_parser.py: 21%
215 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Advertising data parser for BLE devices.
3This module provides a dedicated parser for BLE advertising data
4packets, extracting device information, manufacturer data, and service
5UUIDs from both legacy and extended advertising formats.
6"""
8from __future__ import annotations
10from ..gatt.characteristics.utils import DataParser
11from ..types import (
12 BLEAdvertisementTypes,
13 BLEAdvertisingFlags,
14 BLEAdvertisingPDU,
15 BLEExtendedHeader,
16 DeviceAdvertiserData,
17 ParsedADStructures,
18 PDUConstants,
19 PDUFlags,
20 PDUType,
21)
class AdvertisingParser:  # pylint: disable=too-few-public-methods
    """Parser for BLE advertising data packets.

    Handles both legacy and extended advertising PDU formats, extracting
    device information, manufacturer data, and service UUIDs.
    """

    def parse_advertising_data(self, raw_data: bytes) -> DeviceAdvertiserData:
        """Parse raw advertising data and return structured information.

        Dispatches to the extended parser when the first byte carries an
        extended advertising PDU type, and to the legacy parser otherwise.

        Args:
            raw_data: Raw bytes from BLE advertising packet

        Returns:
            DeviceAdvertiserData with parsed information

        """
        if self._is_extended_advertising_pdu(raw_data):
            return self._parse_extended_advertising(raw_data)
        return self._parse_legacy_advertising(raw_data)

    def _is_extended_advertising_pdu(self, data: bytes) -> bool:
        """Check if the advertising data is an extended advertising PDU.

        Args:
            data: Raw advertising data bytes

        Returns:
            True if the masked type of the first byte is ADV_EXT_IND or
            ADV_AUX_IND, False otherwise (including when the data is shorter
            than a PDU header)

        """
        if len(data) < PDUConstants.PDU_HEADER:
            return False

        pdu_header = data[0]
        pdu_type = pdu_header & PDUFlags.TYPE_MASK

        return pdu_type in (PDUType.ADV_EXT_IND.value, PDUType.ADV_AUX_IND.value)

    def _parse_extended_advertising(self, raw_data: bytes) -> DeviceAdvertiserData:
        """Parse extended advertising data.

        Falls back to legacy parsing of the whole buffer when the data is too
        short for an extended PDU or the extended PDU cannot be parsed.

        Args:
            raw_data: Raw extended advertising data

        Returns:
            Parsed DeviceAdvertiserData

        """
        if len(raw_data) < PDUConstants.MIN_EXTENDED_PDU:
            return self._parse_legacy_advertising(raw_data)

        pdu = self._parse_extended_pdu(raw_data)

        if not pdu:
            return self._parse_legacy_advertising(raw_data)

        # An empty payload leaves every field at the ParsedADStructures defaults.
        parsed_data = ParsedADStructures()

        if pdu.payload:
            parsed_data = self._parse_ad_structures(pdu.payload)

        auxiliary_packets: list[BLEAdvertisingPDU] = []
        if pdu.extended_header and pdu.extended_header.auxiliary_pointer:
            aux_packets = self._parse_auxiliary_packets(pdu.extended_header.auxiliary_pointer)
            auxiliary_packets.extend(aux_packets)

        # NOTE(review): only a subset of the parsed AD fields is forwarded here,
        # whereas _parse_legacy_advertising forwards many more (appearance,
        # service_data, uri, ...). tx_power/flags are also passed through
        # unmodified here, while the legacy path maps 0 to None. Confirm this
        # asymmetry is intentional.
        return DeviceAdvertiserData(
            raw_data=raw_data,
            local_name=parsed_data.local_name,
            manufacturer_data=parsed_data.manufacturer_data,
            service_uuids=parsed_data.service_uuids,
            tx_power=parsed_data.tx_power,
            flags=parsed_data.flags,
            extended_payload=pdu.payload,
            auxiliary_packets=auxiliary_packets,
        )

    def _parse_extended_pdu(self, data: bytes) -> BLEAdvertisingPDU | None:
        """Parse extended PDU header and payload.

        Args:
            data: Raw PDU data

        Returns:
            Parsed BLEAdvertisingPDU or None if invalid

        """
        if len(data) < PDUConstants.MIN_EXTENDED_PDU:
            return None

        header = int.from_bytes(data[0 : PDUConstants.PDU_HEADER], byteorder="little")
        pdu_type = header & PDUFlags.TYPE_MASK
        tx_add = bool(header & PDUFlags.TX_ADD_MASK)
        rx_add = bool(header & PDUFlags.RX_ADD_MASK)

        length = data[PDUConstants.PDU_LENGTH_OFFSET]

        # The declared length must fit inside the buffer that was received.
        if len(data) < PDUConstants.MIN_EXTENDED_PDU + length:
            return None

        extended_header_start = PDUConstants.EXTENDED_HEADER_START

        extended_header = self._parse_extended_header(data[extended_header_start:])

        if not extended_header:
            return None

        # The payload follows the extended header; whatever the declared PDU
        # length leaves after the extended header is treated as AD payload.
        header_span = extended_header.extended_header_length + PDUConstants.EXT_HEADER_LENGTH
        payload_start = extended_header_start + header_span
        payload_length = length - header_span

        if payload_start + payload_length > len(data):
            return None

        # If the extended header claims more bytes than the PDU length allows,
        # payload_length is negative and this slice is simply empty.
        payload = data[payload_start : payload_start + payload_length]

        adva = extended_header.extended_advertiser_address
        targeta = extended_header.extended_target_address

        return BLEAdvertisingPDU(
            pdu_type=PDUType(pdu_type),
            tx_add=tx_add,
            rx_add=rx_add,
            length=length,
            advertiser_address=adva,
            target_address=targeta,
            payload=payload,
            extended_header=extended_header,
        )

    def _parse_extended_header(self, data: bytes) -> BLEExtendedHeader | None:
        """Parse extended header from PDU data.

        Reads the length byte and the mode byte, then each optional field in
        a fixed order, but only when the corresponding ``has_*`` property of
        the header reports it as present.

        Args:
            data: Extended header data

        Returns:
            Parsed BLEExtendedHeader or None if invalid (too short for the
            length and mode bytes, shorter than the declared header length,
            or truncated in the middle of an optional field)

        """
        # pylint: disable=too-many-return-statements,too-many-branches
        # Both data[0] (length) and data[1] (mode) are read unconditionally
        # below, so at least two bytes are required. Checking for only one
        # byte would raise IndexError on a lone zero length byte.
        min_header_bytes = 2
        if len(data) < min_header_bytes:
            return None

        header = BLEExtendedHeader()
        header.extended_header_length = data[0]

        if len(data) < header.extended_header_length + 1:
            return None

        adv_mode = data[1]
        header.adv_mode = adv_mode

        offset = PDUConstants.ADV_ADDR_OFFSET  # Start after length and mode bytes

        if header.has_extended_advertiser_address:
            if offset + PDUConstants.BLE_ADDR > len(data):
                return None
            header.extended_advertiser_address = data[offset : offset + PDUConstants.BLE_ADDR]
            offset += PDUConstants.BLE_ADDR

        if header.has_extended_target_address:
            if offset + PDUConstants.BLE_ADDR > len(data):
                return None
            header.extended_target_address = data[offset : offset + PDUConstants.BLE_ADDR]
            offset += PDUConstants.BLE_ADDR

        if header.has_cte_info:
            if offset + PDUConstants.CTE_INFO > len(data):
                return None
            header.cte_info = data[offset : offset + PDUConstants.CTE_INFO]
            offset += PDUConstants.CTE_INFO

        if header.has_advertising_data_info:
            if offset + PDUConstants.ADV_DATA_INFO > len(data):
                return None
            header.advertising_data_info = data[offset : offset + PDUConstants.ADV_DATA_INFO]
            offset += PDUConstants.ADV_DATA_INFO

        if header.has_auxiliary_pointer:
            if offset + PDUConstants.AUX_PTR > len(data):
                return None
            header.auxiliary_pointer = data[offset : offset + PDUConstants.AUX_PTR]
            offset += PDUConstants.AUX_PTR

        if header.has_sync_info:
            if offset + PDUConstants.SYNC_INFO > len(data):
                return None
            header.sync_info = data[offset : offset + PDUConstants.SYNC_INFO]
            offset += PDUConstants.SYNC_INFO

        if header.has_tx_power:
            if offset + PDUConstants.TX_POWER > len(data):
                return None
            header.tx_power = int.from_bytes(
                data[offset : offset + PDUConstants.TX_POWER],
                byteorder="little",
                signed=True,
            )
            offset += PDUConstants.TX_POWER

        if header.has_additional_controller_data:
            # NOTE(review): this takes everything to the end of the buffer that
            # was passed in, not just up to extended_header_length - confirm.
            header.additional_controller_advertising_data = data[offset:]

        return header

    def _parse_auxiliary_packets(self, aux_ptr: bytes) -> list[BLEAdvertisingPDU]:
        """Parse auxiliary packets referenced by auxiliary pointer.

        Args:
            aux_ptr: Auxiliary pointer data

        Returns:
            List of auxiliary packets (currently returns empty list)

        """
        if len(aux_ptr) != PDUConstants.AUX_PTR:
            return []

        # Placeholder: a well-formed pointer is accepted but nothing is
        # resolved from it yet, so the result is empty either way.
        return []

    def _parse_legacy_advertising(self, raw_data: bytes) -> DeviceAdvertiserData:
        """Parse legacy advertising data.

        Treats the whole buffer as a sequence of AD structures.

        Args:
            raw_data: Raw legacy advertising data

        Returns:
            Parsed DeviceAdvertiserData

        """
        parsed_data = self._parse_ad_structures(raw_data)

        # NOTE(review): a parsed tx_power or flags value equal to 0 is reported
        # as None below, so a genuine 0 cannot be told apart from "absent".
        # NOTE(review): parsed_data.broadcast_code is populated by
        # _parse_ad_structures but is not forwarded here - confirm.
        return DeviceAdvertiserData(
            raw_data=raw_data,
            local_name=parsed_data.local_name,
            manufacturer_data=parsed_data.manufacturer_data,
            service_uuids=parsed_data.service_uuids,
            tx_power=parsed_data.tx_power if parsed_data.tx_power != 0 else None,
            flags=parsed_data.flags if parsed_data.flags != 0 else None,
            appearance=parsed_data.appearance,
            service_data=parsed_data.service_data,
            solicited_service_uuids=parsed_data.solicited_service_uuids,
            uri=parsed_data.uri,
            indoor_positioning=parsed_data.indoor_positioning,
            transport_discovery_data=parsed_data.transport_discovery_data,
            le_supported_features=parsed_data.le_supported_features,
            encrypted_advertising_data=parsed_data.encrypted_advertising_data,
            periodic_advertising_response_timing=parsed_data.periodic_advertising_response_timing,
            electronic_shelf_label=parsed_data.electronic_shelf_label,
            three_d_information=parsed_data.three_d_information,
            broadcast_name=parsed_data.broadcast_name,
            biginfo=parsed_data.biginfo,
            mesh_message=parsed_data.mesh_message,
            mesh_beacon=parsed_data.mesh_beacon,
            public_target_address=parsed_data.public_target_address,
            random_target_address=parsed_data.random_target_address,
            advertising_interval=parsed_data.advertising_interval,
            advertising_interval_long=parsed_data.advertising_interval_long,
            le_bluetooth_device_address=parsed_data.le_bluetooth_device_address,
            le_role=parsed_data.le_role,
            class_of_device=parsed_data.class_of_device,
            simple_pairing_hash_c=parsed_data.simple_pairing_hash_c,
            simple_pairing_randomizer_r=parsed_data.simple_pairing_randomizer_r,
            security_manager_tk_value=parsed_data.security_manager_tk_value,
            security_manager_out_of_band_flags=parsed_data.security_manager_out_of_band_flags,
            slave_connection_interval_range=parsed_data.slave_connection_interval_range,
            secure_connections_confirmation=parsed_data.secure_connections_confirmation,
            secure_connections_random=parsed_data.secure_connections_random,
            channel_map_update_indication=parsed_data.channel_map_update_indication,
            pb_adv=parsed_data.pb_adv,
            resolvable_set_identifier=parsed_data.resolvable_set_identifier,
        )

    @staticmethod
    def _decode_text(raw: bytes) -> str:
        """Decode bytes as UTF-8, falling back to a hex string if invalid."""
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.hex()

    @staticmethod
    def _format_address(addr_bytes: bytes) -> str:
        """Format address bytes as colon-separated upper-case hex, byte order reversed."""
        return ":".join(f"{b:02X}" for b in addr_bytes[::-1])

    def _parse_ad_structures(self, data: bytes) -> ParsedADStructures:
        """Parse advertising data structures from raw bytes.

        Walks the buffer as consecutive ``[length][type][payload]`` records,
        where ``length`` counts the type byte plus the payload. Parsing stops
        at the first zero length or at a record that would run past the end
        of the buffer. Unrecognised types, and recognised types whose payload
        is too short, are skipped.

        Args:
            data: Raw advertising data payload

        Returns:
            ParsedADStructures object with extracted data

        """
        # pylint: disable=too-many-branches,too-many-statements
        parsed = ParsedADStructures()

        i = 0
        # A record needs at least its length byte and its type byte.
        while i + 1 < len(data):
            length = data[i]
            if length == 0 or i + length + 1 > len(data):
                break

            ad_type = data[i + 1]
            ad_data = data[i + 2 : i + length + 1]

            if ad_type == BLEAdvertisementTypes.FLAGS and len(ad_data) >= 1:
                parsed.flags = BLEAdvertisingFlags(ad_data[0])
            elif ad_type in (
                BLEAdvertisementTypes.INCOMPLETE_16BIT_SERVICE_UUIDS,
                BLEAdvertisementTypes.COMPLETE_16BIT_SERVICE_UUIDS,
            ):
                # Only complete 2-byte groups are consumed; a trailing odd byte is ignored.
                for j in range(0, len(ad_data) - 1, 2):
                    uuid_short = DataParser.parse_int16(ad_data, j, signed=False)
                    parsed.service_uuids.append(f"{uuid_short:04X}")
            elif ad_type in (
                BLEAdvertisementTypes.SHORTENED_LOCAL_NAME,
                BLEAdvertisementTypes.COMPLETE_LOCAL_NAME,
            ):
                parsed.local_name = self._decode_text(ad_data)
            elif ad_type == BLEAdvertisementTypes.TX_POWER_LEVEL and len(ad_data) >= 1:
                parsed.tx_power = int.from_bytes(ad_data[:1], byteorder="little", signed=True)
            elif ad_type == BLEAdvertisementTypes.MANUFACTURER_SPECIFIC_DATA and len(ad_data) >= 2:
                # First two bytes are the company identifier; the rest is stored under it.
                company_id = DataParser.parse_int16(ad_data, 0, signed=False)
                parsed.manufacturer_data[company_id] = ad_data[2:]
            elif ad_type == BLEAdvertisementTypes.APPEARANCE and len(ad_data) >= 2:
                parsed.appearance = DataParser.parse_int16(ad_data, 0, signed=False)
            elif ad_type == BLEAdvertisementTypes.SERVICE_DATA_16BIT and len(ad_data) >= 2:
                service_uuid = f"{DataParser.parse_int16(ad_data, 0, signed=False):04X}"
                parsed.service_data[service_uuid] = ad_data[2:]
            elif ad_type == BLEAdvertisementTypes.URI:
                parsed.uri = self._decode_text(ad_data)
            elif ad_type == BLEAdvertisementTypes.INDOOR_POSITIONING:
                parsed.indoor_positioning = ad_data
            elif ad_type == BLEAdvertisementTypes.TRANSPORT_DISCOVERY_DATA:
                parsed.transport_discovery_data = ad_data
            elif ad_type == BLEAdvertisementTypes.LE_SUPPORTED_FEATURES:
                parsed.le_supported_features = ad_data
            elif ad_type == BLEAdvertisementTypes.ENCRYPTED_ADVERTISING_DATA:
                parsed.encrypted_advertising_data = ad_data
            elif ad_type == BLEAdvertisementTypes.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION:
                parsed.periodic_advertising_response_timing = ad_data
            elif ad_type == BLEAdvertisementTypes.ELECTRONIC_SHELF_LABEL:
                parsed.electronic_shelf_label = ad_data
            elif ad_type == BLEAdvertisementTypes.THREE_D_INFORMATION_DATA:
                parsed.three_d_information = ad_data
            elif ad_type == BLEAdvertisementTypes.BROADCAST_NAME:
                parsed.broadcast_name = self._decode_text(ad_data)
            elif ad_type == BLEAdvertisementTypes.BROADCAST_CODE:
                parsed.broadcast_code = ad_data
            elif ad_type == BLEAdvertisementTypes.BIGINFO:
                parsed.biginfo = ad_data
            elif ad_type == BLEAdvertisementTypes.MESH_MESSAGE:
                parsed.mesh_message = ad_data
            elif ad_type == BLEAdvertisementTypes.MESH_BEACON:
                parsed.mesh_beacon = ad_data
            elif ad_type == BLEAdvertisementTypes.PUBLIC_TARGET_ADDRESS:
                # Only complete 6-byte groups are consumed; trailing bytes are ignored.
                for j in range(0, len(ad_data) - 5, 6):
                    parsed.public_target_address.append(self._format_address(ad_data[j : j + 6]))
            elif ad_type == BLEAdvertisementTypes.RANDOM_TARGET_ADDRESS:
                for j in range(0, len(ad_data) - 5, 6):
                    parsed.random_target_address.append(self._format_address(ad_data[j : j + 6]))
            elif ad_type == BLEAdvertisementTypes.ADVERTISING_INTERVAL and len(ad_data) >= 2:
                parsed.advertising_interval = DataParser.parse_int16(ad_data, 0, signed=False)
            elif ad_type == BLEAdvertisementTypes.ADVERTISING_INTERVAL_LONG and len(ad_data) >= 3:
                parsed.advertising_interval_long = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
            elif ad_type == BLEAdvertisementTypes.LE_BLUETOOTH_DEVICE_ADDRESS and len(ad_data) >= 6:
                # Only the first six bytes are used; any bytes after them are ignored.
                parsed.le_bluetooth_device_address = self._format_address(ad_data[:6])
            elif ad_type == BLEAdvertisementTypes.LE_ROLE and len(ad_data) >= 1:
                parsed.le_role = ad_data[0]
            elif ad_type == BLEAdvertisementTypes.CLASS_OF_DEVICE and len(ad_data) >= 3:
                parsed.class_of_device = int.from_bytes(ad_data[:3], byteorder="little", signed=False)
            elif ad_type == BLEAdvertisementTypes.SIMPLE_PAIRING_HASH_C:
                parsed.simple_pairing_hash_c = ad_data
            elif ad_type == BLEAdvertisementTypes.SIMPLE_PAIRING_RANDOMIZER_R:
                parsed.simple_pairing_randomizer_r = ad_data
            elif ad_type == BLEAdvertisementTypes.SECURITY_MANAGER_TK_VALUE:
                parsed.security_manager_tk_value = ad_data
            elif ad_type == BLEAdvertisementTypes.SECURITY_MANAGER_OUT_OF_BAND_FLAGS:
                parsed.security_manager_out_of_band_flags = ad_data
            elif ad_type == BLEAdvertisementTypes.SLAVE_CONNECTION_INTERVAL_RANGE:
                parsed.slave_connection_interval_range = ad_data
            elif ad_type == BLEAdvertisementTypes.SECURE_CONNECTIONS_CONFIRMATION_VALUE:
                parsed.secure_connections_confirmation = ad_data
            elif ad_type == BLEAdvertisementTypes.SECURE_CONNECTIONS_RANDOM_VALUE:
                parsed.secure_connections_random = ad_data
            elif ad_type == BLEAdvertisementTypes.CHANNEL_MAP_UPDATE_INDICATION:
                parsed.channel_map_update_indication = ad_data
            elif ad_type == BLEAdvertisementTypes.PB_ADV:
                parsed.pb_adv = ad_data
            elif ad_type == BLEAdvertisementTypes.RESOLVABLE_SET_IDENTIFIER:
                parsed.resolvable_set_identifier = ad_data

            # Advance past the length byte plus the `length` bytes it describes.
            i += length + 1

        return parsed