Coverage for src / bluetooth_sig / advertising / ead_decryptor.py: 92%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Pure decryption functions for BLE Encrypted Advertising Data (EAD).
3This module provides framework-agnostic decryption functions for EAD
4per Bluetooth Core Spec Supplement Section 1.23.
6The decryption uses AES-CCM with:
7- 128-bit (16-byte) session key
8- 13-byte nonce (5-byte randomizer + 6-byte device address + 2-byte padding)
9- 4-byte MIC (authentication tag)
11Requires the 'cryptography' package: pip install bluetooth-sig[ead]
12"""
14from __future__ import annotations
16import logging
18from cryptography.exceptions import InvalidTag
19from cryptography.hazmat.primitives.ciphers.aead import AESCCM
21from bluetooth_sig.advertising.encryption import EADKeyProvider
22from bluetooth_sig.types.address import mac_address_to_bytes
23from bluetooth_sig.types.ead import (
24 EAD_MIN_SIZE,
25 EAD_RANDOMIZER_SIZE,
26 EADDecryptResult,
27 EADError,
28 EncryptedAdvertisingData,
29)
31logger = logging.getLogger(__name__)
33# EAD cryptographic constants per Bluetooth Core Spec Supplement
34EAD_KEY_SIZE: int = 16 # 128-bit AES key
35EAD_NONCE_SIZE: int = 13 # Randomizer(5) + Address(6) + Padding(2)
36EAD_MIC_SIZE: int = 4 # 4-byte authentication tag
37EAD_ADDRESS_SIZE: int = 6 # BLE device address size
40def _get_aesccm_cipher(session_key: bytes) -> AESCCM:
41 """Create an AESCCM cipher instance.
43 Args:
44 session_key: 16-byte AES-128 session key
46 Returns:
47 AESCCM cipher instance configured for EAD (4-byte tag)
49 Raises:
50 ValueError: If session_key is not 16 bytes
51 """
52 if len(session_key) != EAD_KEY_SIZE:
53 msg = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}"
54 raise ValueError(msg)
56 return AESCCM(session_key, tag_length=EAD_MIC_SIZE)
59def build_ead_nonce(
60 randomizer: bytes,
61 device_address: bytes,
62) -> bytes:
63 """Build the 13-byte nonce for EAD decryption.
65 The nonce is constructed as:
66 Randomizer[5] + DeviceAddress[6] + Padding[2]
68 Args:
69 randomizer: 5-byte randomizer from EAD advertisement
70 device_address: 6-byte BLE device address (little-endian)
72 Returns:
73 13-byte nonce for AES-CCM decryption
75 Raises:
76 ValueError: If randomizer is not 5 bytes or address is not 6 bytes
78 Example:
79 >>> randomizer = bytes.fromhex("0102030405")
80 >>> address = bytes.fromhex("aabbccddeeff")
81 >>> nonce = build_ead_nonce(randomizer, address)
82 >>> len(nonce)
83 13
84 """
85 if len(randomizer) != EAD_RANDOMIZER_SIZE:
86 msg = f"Randomizer must be {EAD_RANDOMIZER_SIZE} bytes, got {len(randomizer)}"
87 raise ValueError(msg)
89 if len(device_address) != EAD_ADDRESS_SIZE:
90 msg = f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}"
91 raise ValueError(msg)
93 # Nonce: Randomizer(5) + Address(6) + Padding(2)
94 padding = b"\x00\x00"
95 return randomizer + device_address + padding
98def decrypt_ead( # pylint: disable=too-many-return-statements
99 encrypted_data: EncryptedAdvertisingData,
100 session_key: bytes,
101 device_address: bytes,
102 associated_data: bytes | None = None,
103) -> EADDecryptResult:
104 """Decrypt BLE Encrypted Advertising Data.
106 Performs AES-CCM decryption per Bluetooth Core Spec Supplement Section 1.23.
107 This function never raises exceptions on decryption failure; instead,
108 it returns a result with appropriate error information.
110 Args:
111 encrypted_data: Parsed EAD structure containing randomizer,
112 encrypted payload, and MIC
113 session_key: 16-byte AES-128 session key
114 device_address: 6-byte BLE device address (little-endian bytes)
115 associated_data: Optional additional authenticated data (AAD).
116 Must match the AAD used during encryption.
118 Returns:
119 EADDecryptResult with success status and plaintext or error details
121 Example:
122 >>> ead = EncryptedAdvertisingData.from_bytes(raw_advertisement)
123 >>> result = decrypt_ead(ead, session_key, device_address)
124 >>> if result.success:
125 ... process_sensor_data(result.plaintext)
126 ... elif result.error_type == EADError.INVALID_KEY:
127 ... logger.warning("Incorrect encryption key for device")
128 """
129 # Validate session key
130 if len(session_key) != EAD_KEY_SIZE:
131 return EADDecryptResult(
132 success=False,
133 error=f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}",
134 error_type=EADError.CORRUPTED_DATA,
135 )
137 # Validate device address
138 if len(device_address) != EAD_ADDRESS_SIZE:
139 return EADDecryptResult(
140 success=False,
141 error=f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}",
142 error_type=EADError.CORRUPTED_DATA,
143 )
145 try:
146 # Build nonce and cipher
147 nonce = build_ead_nonce(encrypted_data.randomizer, device_address)
148 cipher = _get_aesccm_cipher(session_key)
150 # AES-CCM expects ciphertext with MIC appended
151 ciphertext_with_mic = encrypted_data.encrypted_payload + encrypted_data.mic
153 # Decrypt and verify
154 plaintext = cipher.decrypt(nonce, ciphertext_with_mic, associated_data)
156 return EADDecryptResult(success=True, plaintext=plaintext)
158 except InvalidTag:
159 # MIC verification failed - wrong key or corrupted data
160 logger.debug("EAD decryption failed: MIC verification failed")
161 return EADDecryptResult(
162 success=False,
163 error="MIC verification failed - incorrect key or corrupted data",
164 error_type=EADError.INVALID_KEY,
165 )
166 except ValueError as err:
167 # Invalid parameters
168 logger.debug("EAD decryption failed: %s", err)
169 return EADDecryptResult(
170 success=False,
171 error=str(err),
172 error_type=EADError.CORRUPTED_DATA,
173 )
176def decrypt_ead_from_raw(
177 raw_ead_data: bytes,
178 session_key: bytes,
179 mac_address: str,
180 associated_data: bytes | None = None,
181) -> EADDecryptResult:
182 """Convenience function to decrypt raw EAD bytes.
184 Combines parsing and decryption in a single call for simpler usage.
186 Args:
187 raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload)
188 session_key: 16-byte AES-128 session key
189 mac_address: Device MAC address string (e.g., "AA:BB:CC:DD:EE:FF")
190 associated_data: Optional additional authenticated data
192 Returns:
193 EADDecryptResult with success status and plaintext or error details
195 Example:
196 >>> result = decrypt_ead_from_raw(
197 ... raw_ead_data=advertisement_payload,
198 ... session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"),
199 ... mac_address="AA:BB:CC:DD:EE:FF",
200 ... )
201 >>> if result.success:
202 ... print(f"Decrypted: {result.plaintext.hex()}")
203 """
204 # Parse raw bytes
205 if len(raw_ead_data) < EAD_MIN_SIZE:
206 return EADDecryptResult(
207 success=False,
208 error=(f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required"),
209 error_type=EADError.INSUFFICIENT_DATA,
210 )
212 try:
213 encrypted_data = EncryptedAdvertisingData.from_bytes(raw_ead_data)
214 device_address = mac_address_to_bytes(mac_address)
215 except ValueError as err:
216 return EADDecryptResult(
217 success=False,
218 error=str(err),
219 error_type=EADError.CORRUPTED_DATA,
220 )
222 return decrypt_ead(encrypted_data, session_key, device_address, associated_data)
225class EADDecryptor:
226 """Stateful EAD decryptor with cipher caching and key provider support.
228 For one-off decryption, use the module-level `decrypt_ead()` function.
229 For repeated decryption with the same key, this class caches the AESCCM
230 cipher instance for better performance.
232 This class also integrates with `EADKeyProvider` for automatic key lookup
233 by MAC address.
235 Use the factory methods `from_key()` or `from_provider()` to create instances.
237 Attributes:
238 _key_provider: Optional key provider for MAC-based key lookup
239 _static_key: Static session key (used if no provider)
240 _cipher_cache: Cached cipher instances keyed by session key
242 Example with static key:
243 >>> from bluetooth_sig.advertising import EADDecryptor
244 >>>
245 >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef"))
246 >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF")
247 >>> if result.success:
248 ... print(result.plaintext)
250 Example with key provider:
251 >>> from bluetooth_sig.advertising import EADDecryptor, DictKeyProvider
252 >>> from bluetooth_sig.types.ead import EADKeyMaterial
253 >>>
254 >>> provider = DictKeyProvider()
255 >>> provider.set_ead_key(
256 ... "AA:BB:CC:DD:EE:FF",
257 ... EADKeyMaterial(
258 ... session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"),
259 ... iv=bytes.fromhex("0102030405060708"),
260 ... ),
261 ... )
262 >>> decryptor = EADDecryptor.from_provider(provider)
263 >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF")
264 """
266 def __init__(
267 self,
268 *,
269 _key_provider: EADKeyProvider | None = None,
270 _static_key: bytes | None = None,
271 ) -> None:
272 """Private constructor. Use `from_key()` or `from_provider()` instead."""
273 self._key_provider = _key_provider
274 self._static_key = _static_key
275 self._cipher_cache: dict[bytes, AESCCM] = {}
277 @classmethod
278 def from_key(cls, session_key: bytes) -> EADDecryptor:
279 """Create decryptor with a static session key.
281 Args:
282 session_key: 16-byte AES-128 session key
284 Returns:
285 Configured EADDecryptor instance
287 Raises:
288 ValueError: If session_key is not 16 bytes
290 Example:
291 >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef"))
292 """
293 if len(session_key) != EAD_KEY_SIZE:
294 msg = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}"
295 raise ValueError(msg)
296 return cls(_static_key=session_key)
298 @classmethod
299 def from_provider(cls, key_provider: EADKeyProvider) -> EADDecryptor:
300 """Create decryptor with a key provider for MAC-based lookup.
302 Args:
303 key_provider: Provider that looks up keys by MAC address
305 Returns:
306 Configured EADDecryptor instance
308 Example:
309 >>> provider = DictKeyProvider()
310 >>> provider.set_ead_key("AA:BB:CC:DD:EE:FF", key_material)
311 >>> decryptor = EADDecryptor.from_provider(provider)
312 """
313 return cls(_key_provider=key_provider)
315 def _get_cached_cipher(self, session_key: bytes) -> AESCCM:
316 """Get or create a cached cipher for the session key.
318 Args:
319 session_key: 16-byte AES session key
321 Returns:
322 AESCCM cipher instance
323 """
324 if session_key not in self._cipher_cache:
325 self._cipher_cache[session_key] = _get_aesccm_cipher(session_key)
326 return self._cipher_cache[session_key]
328 def decrypt(
329 self,
330 raw_ead_data: bytes,
331 mac_address: str,
332 associated_data: bytes | None = None,
333 ) -> EADDecryptResult:
334 """Decrypt EAD data using cached cipher.
336 Looks up the session key from the key provider (if configured)
337 or uses the static key. Caches the cipher instance for reuse.
339 Args:
340 raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload)
341 mac_address: Device MAC address (e.g., "AA:BB:CC:DD:EE:FF")
342 associated_data: Optional additional authenticated data (AAD)
344 Returns:
345 EADDecryptResult with success status and plaintext or error details
346 """
347 # Determine session key
348 if self._key_provider is not None:
349 key_material = self._key_provider.get_ead_key(mac_address)
350 if key_material is None:
351 return EADDecryptResult(
352 success=False,
353 error=f"No EAD key available for {mac_address}",
354 error_type=EADError.NO_KEY_AVAILABLE,
355 )
356 session_key = key_material.session_key
357 else:
358 # Static key guaranteed non-None by __init__ validation
359 session_key = self._static_key # type: ignore[assignment]
361 # Parse raw data
362 if len(raw_ead_data) < EAD_MIN_SIZE:
363 return EADDecryptResult(
364 success=False,
365 error=f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required",
366 error_type=EADError.INSUFFICIENT_DATA,
367 )
369 try:
370 encrypted_data = EncryptedAdvertisingData.from_bytes(raw_ead_data)
371 device_address = mac_address_to_bytes(mac_address)
372 except ValueError as err:
373 return EADDecryptResult(
374 success=False,
375 error=str(err),
376 error_type=EADError.CORRUPTED_DATA,
377 )
379 try:
380 # Get cached cipher
381 cipher = self._get_cached_cipher(session_key)
383 # Build nonce and decrypt
384 nonce = build_ead_nonce(encrypted_data.randomizer, device_address)
385 ciphertext_with_mic = encrypted_data.encrypted_payload + encrypted_data.mic
386 plaintext = cipher.decrypt(nonce, ciphertext_with_mic, associated_data)
388 return EADDecryptResult(success=True, plaintext=plaintext)
390 except InvalidTag:
391 logger.debug("EAD decryption failed: MIC verification failed")
392 return EADDecryptResult(
393 success=False,
394 error="MIC verification failed - incorrect key or corrupted data",
395 error_type=EADError.INVALID_KEY,
396 )
397 except ValueError as err:
398 logger.debug("EAD decryption failed: %s", err)
399 return EADDecryptResult(
400 success=False,
401 error=str(err),
402 error_type=EADError.CORRUPTED_DATA,
403 )
405 def clear_cache(self) -> None:
406 """Clear the cipher cache.
408 Call this if you need to free memory or if keys have been rotated.
409 """
410 self._cipher_cache.clear()