Coverage for src / bluetooth_sig / advertising / ead_decryptor.py: 92%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Pure decryption functions for BLE Encrypted Advertising Data (EAD). 

2 

3This module provides framework-agnostic decryption functions for EAD 

4per Bluetooth Core Spec Supplement Section 1.23. 

5 

6The decryption uses AES-CCM with: 

7- 128-bit (16-byte) session key 

8- 13-byte nonce (5-byte randomizer + 6-byte device address + 2-byte padding) 

9- 4-byte MIC (authentication tag) 

10 

11Requires the 'cryptography' package: pip install bluetooth-sig[ead] 

12""" 

13 

14from __future__ import annotations 

15 

16import logging 

17 

18from cryptography.exceptions import InvalidTag 

19from cryptography.hazmat.primitives.ciphers.aead import AESCCM 

20 

21from bluetooth_sig.advertising.encryption import EADKeyProvider 

22from bluetooth_sig.types.address import mac_address_to_bytes 

23from bluetooth_sig.types.ead import ( 

24 EAD_MIN_SIZE, 

25 EAD_RANDOMIZER_SIZE, 

26 EADDecryptResult, 

27 EADError, 

28 EncryptedAdvertisingData, 

29) 

30 

31logger = logging.getLogger(__name__) 

32 

33# EAD cryptographic constants per Bluetooth Core Spec Supplement 

34EAD_KEY_SIZE: int = 16 # 128-bit AES key 

35EAD_NONCE_SIZE: int = 13 # Randomizer(5) + Address(6) + Padding(2) 

36EAD_MIC_SIZE: int = 4 # 4-byte authentication tag 

37EAD_ADDRESS_SIZE: int = 6 # BLE device address size 

38 

39 

40def _get_aesccm_cipher(session_key: bytes) -> AESCCM: 

41 """Create an AESCCM cipher instance. 

42 

43 Args: 

44 session_key: 16-byte AES-128 session key 

45 

46 Returns: 

47 AESCCM cipher instance configured for EAD (4-byte tag) 

48 

49 Raises: 

50 ValueError: If session_key is not 16 bytes 

51 """ 

52 if len(session_key) != EAD_KEY_SIZE: 

53 msg = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}" 

54 raise ValueError(msg) 

55 

56 return AESCCM(session_key, tag_length=EAD_MIC_SIZE) 

57 

58 

59def build_ead_nonce( 

60 randomizer: bytes, 

61 device_address: bytes, 

62) -> bytes: 

63 """Build the 13-byte nonce for EAD decryption. 

64 

65 The nonce is constructed as: 

66 Randomizer[5] + DeviceAddress[6] + Padding[2] 

67 

68 Args: 

69 randomizer: 5-byte randomizer from EAD advertisement 

70 device_address: 6-byte BLE device address (little-endian) 

71 

72 Returns: 

73 13-byte nonce for AES-CCM decryption 

74 

75 Raises: 

76 ValueError: If randomizer is not 5 bytes or address is not 6 bytes 

77 

78 Example:: 

79 >>> randomizer = bytes.fromhex("0102030405") 

80 >>> address = bytes.fromhex("aabbccddeeff") 

81 >>> nonce = build_ead_nonce(randomizer, address) 

82 >>> len(nonce) 

83 13 

84 """ 

85 if len(randomizer) != EAD_RANDOMIZER_SIZE: 

86 msg = f"Randomizer must be {EAD_RANDOMIZER_SIZE} bytes, got {len(randomizer)}" 

87 raise ValueError(msg) 

88 

89 if len(device_address) != EAD_ADDRESS_SIZE: 

90 msg = f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}" 

91 raise ValueError(msg) 

92 

93 # Nonce structure: Randomizer(5 bytes) + Address(6 bytes) + Padding(2 bytes) 

94 padding = b"\x00\x00" 

95 return randomizer + device_address + padding 

96 

97 

98def decrypt_ead( # pylint: disable=too-many-return-statements 

99 encrypted_data: EncryptedAdvertisingData, 

100 session_key: bytes, 

101 device_address: bytes, 

102 associated_data: bytes | None = None, 

103) -> EADDecryptResult: 

104 """Decrypt BLE Encrypted Advertising Data. 

105 

106 Performs AES-CCM decryption per Bluetooth Core Spec Supplement Section 1.23. 

107 This function never raises exceptions on decryption failure; instead, 

108 it returns a result with appropriate error information. 

109 

110 Args: 

111 encrypted_data: Parsed EAD structure containing randomizer, 

112 encrypted payload, and MIC 

113 session_key: 16-byte AES-128 session key 

114 device_address: 6-byte BLE device address (little-endian bytes) 

115 associated_data: Optional additional authenticated data (AAD). 

116 Must match the AAD used during encryption. 

117 

118 Returns: 

119 EADDecryptResult with success status and plaintext or error details 

120 

121 Example:: 

122 >>> ead = EncryptedAdvertisingData.from_bytes(raw_advertisement) 

123 >>> result = decrypt_ead(ead, session_key, device_address) 

124 >>> if result.success: 

125 ... process_sensor_data(result.plaintext) 

126 ... elif result.error_type == EADError.INVALID_KEY: 

127 ... logger.warning("Incorrect encryption key for device") 

128 """ 

129 # Validate session key 

130 if len(session_key) != EAD_KEY_SIZE: 

131 return EADDecryptResult( 

132 success=False, 

133 error=f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}", 

134 error_type=EADError.CORRUPTED_DATA, 

135 ) 

136 

137 # Validate device address 

138 if len(device_address) != EAD_ADDRESS_SIZE: 

139 return EADDecryptResult( 

140 success=False, 

141 error=f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}", 

142 error_type=EADError.CORRUPTED_DATA, 

143 ) 

144 

145 try: 

146 # Build nonce and cipher 

147 nonce = build_ead_nonce(encrypted_data.randomizer, device_address) 

148 cipher = _get_aesccm_cipher(session_key) 

149 

150 # AES-CCM expects ciphertext with MIC appended 

151 ciphertext_with_mic = encrypted_data.encrypted_payload + encrypted_data.mic 

152 

153 # Decrypt and verify 

154 plaintext = cipher.decrypt(nonce, ciphertext_with_mic, associated_data) 

155 

156 return EADDecryptResult(success=True, plaintext=plaintext) 

157 

158 except InvalidTag: 

159 # MIC verification failed - wrong key or corrupted data 

160 logger.debug("EAD decryption failed: MIC verification failed") 

161 return EADDecryptResult( 

162 success=False, 

163 error="MIC verification failed - incorrect key or corrupted data", 

164 error_type=EADError.INVALID_KEY, 

165 ) 

166 except ValueError as err: 

167 # Invalid parameters 

168 logger.debug("EAD decryption failed: %s", err) 

169 return EADDecryptResult( 

170 success=False, 

171 error=str(err), 

172 error_type=EADError.CORRUPTED_DATA, 

173 ) 

174 

175 

176def decrypt_ead_from_raw( 

177 raw_ead_data: bytes, 

178 session_key: bytes, 

179 mac_address: str, 

180 associated_data: bytes | None = None, 

181) -> EADDecryptResult: 

182 """Convenience function to decrypt raw EAD bytes. 

183 

184 Combines parsing and decryption in a single call for simpler usage. 

185 

186 Args: 

187 raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload) 

188 session_key: 16-byte AES-128 session key 

189 mac_address: Device MAC address string (e.g., "AA:BB:CC:DD:EE:FF") 

190 associated_data: Optional additional authenticated data 

191 

192 Returns: 

193 EADDecryptResult with success status and plaintext or error details 

194 

195 Example:: 

196 >>> result = decrypt_ead_from_raw( 

197 ... raw_ead_data=advertisement_payload, 

198 ... session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"), 

199 ... mac_address="AA:BB:CC:DD:EE:FF", 

200 ... ) 

201 >>> if result.success: 

202 ... print(f"Decrypted: {result.plaintext.hex()}") 

203 """ 

204 # Parse raw bytes 

205 if len(raw_ead_data) < EAD_MIN_SIZE: 

206 return EADDecryptResult( 

207 success=False, 

208 error=(f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required"), 

209 error_type=EADError.INSUFFICIENT_DATA, 

210 ) 

211 

212 try: 

213 encrypted_data = EncryptedAdvertisingData.from_bytes(raw_ead_data) 

214 device_address = mac_address_to_bytes(mac_address) 

215 except ValueError as err: 

216 return EADDecryptResult( 

217 success=False, 

218 error=str(err), 

219 error_type=EADError.CORRUPTED_DATA, 

220 ) 

221 

222 return decrypt_ead(encrypted_data, session_key, device_address, associated_data) 

223 

224 

225class EADDecryptor: 

226 """Stateful EAD decryptor with cipher caching and key provider support. 

227 

228 For one-off decryption, use the module-level `decrypt_ead()` function. 

229 For repeated decryption with the same key, this class caches the AESCCM 

230 cipher instance for better performance. 

231 

232 This class also integrates with `EADKeyProvider` for automatic key lookup 

233 by MAC address. 

234 

235 Use the factory methods `from_key()` or `from_provider()` to create instances. 

236 

237 Attributes: 

238 _key_provider: Optional key provider for MAC-based key lookup 

239 _static_key: Static session key (used if no provider) 

240 _cipher_cache: Cached cipher instances keyed by session key 

241 

242 Example with static key: 

243 >>> from bluetooth_sig.advertising import EADDecryptor 

244 >>> 

245 >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef")) 

246 >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF") 

247 >>> if result.success: 

248 ... print(result.plaintext) 

249 

250 Example with key provider: 

251 >>> from bluetooth_sig.advertising import EADDecryptor, DictKeyProvider 

252 >>> from bluetooth_sig.types.ead import EADKeyMaterial 

253 >>> 

254 >>> provider = DictKeyProvider() 

255 >>> provider.set_ead_key( 

256 ... "AA:BB:CC:DD:EE:FF", 

257 ... EADKeyMaterial( 

258 ... session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"), 

259 ... iv=bytes.fromhex("0102030405060708"), 

260 ... ), 

261 ... ) 

262 >>> decryptor = EADDecryptor.from_provider(provider) 

263 >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF") 

264 """ 

265 

266 def __init__( 

267 self, 

268 *, 

269 _key_provider: EADKeyProvider | None = None, 

270 _static_key: bytes | None = None, 

271 ) -> None: 

272 """Private constructor. Use `from_key()` or `from_provider()` instead.""" 

273 self._key_provider = _key_provider 

274 self._static_key = _static_key 

275 self._cipher_cache: dict[bytes, AESCCM] = {} 

276 

277 @classmethod 

278 def from_key(cls, session_key: bytes) -> EADDecryptor: 

279 """Create decryptor with a static session key. 

280 

281 Args: 

282 session_key: 16-byte AES-128 session key 

283 

284 Returns: 

285 Configured EADDecryptor instance 

286 

287 Raises: 

288 ValueError: If session_key is not 16 bytes 

289 

290 Example:: 

291 >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef")) 

292 """ 

293 if len(session_key) != EAD_KEY_SIZE: 

294 msg = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}" 

295 raise ValueError(msg) 

296 return cls(_static_key=session_key) 

297 

298 @classmethod 

299 def from_provider(cls, key_provider: EADKeyProvider) -> EADDecryptor: 

300 """Create decryptor with a key provider for MAC-based lookup. 

301 

302 Args: 

303 key_provider: Provider that looks up keys by MAC address 

304 

305 Returns: 

306 Configured EADDecryptor instance 

307 

308 Example:: 

309 >>> provider = DictKeyProvider() 

310 >>> provider.set_ead_key("AA:BB:CC:DD:EE:FF", key_material) 

311 >>> decryptor = EADDecryptor.from_provider(provider) 

312 """ 

313 return cls(_key_provider=key_provider) 

314 

315 def _get_cached_cipher(self, session_key: bytes) -> AESCCM: 

316 """Get or create a cached cipher for the session key. 

317 

318 Args: 

319 session_key: 16-byte AES session key 

320 

321 Returns: 

322 AESCCM cipher instance 

323 """ 

324 if session_key not in self._cipher_cache: 

325 self._cipher_cache[session_key] = _get_aesccm_cipher(session_key) 

326 return self._cipher_cache[session_key] 

327 

328 def decrypt( 

329 self, 

330 raw_ead_data: bytes, 

331 mac_address: str, 

332 associated_data: bytes | None = None, 

333 ) -> EADDecryptResult: 

334 """Decrypt EAD data using cached cipher. 

335 

336 Looks up the session key from the key provider (if configured) 

337 or uses the static key. Caches the cipher instance for reuse. 

338 

339 Args: 

340 raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload) 

341 mac_address: Device MAC address (e.g., "AA:BB:CC:DD:EE:FF") 

342 associated_data: Optional additional authenticated data (AAD) 

343 

344 Returns: 

345 EADDecryptResult with success status and plaintext or error details 

346 """ 

347 # Determine session key 

348 if self._key_provider is not None: 

349 key_material = self._key_provider.get_ead_key(mac_address) 

350 if key_material is None: 

351 return EADDecryptResult( 

352 success=False, 

353 error=f"No EAD key available for {mac_address}", 

354 error_type=EADError.NO_KEY_AVAILABLE, 

355 ) 

356 session_key = key_material.session_key 

357 else: 

358 # Static key guaranteed non-None by __init__ validation 

359 assert self._static_key is not None, "EADDecryptor requires either a key provider or static key" 

360 session_key = self._static_key 

361 

362 # Parse raw data 

363 if len(raw_ead_data) < EAD_MIN_SIZE: 

364 return EADDecryptResult( 

365 success=False, 

366 error=f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required", 

367 error_type=EADError.INSUFFICIENT_DATA, 

368 ) 

369 

370 try: 

371 encrypted_data = EncryptedAdvertisingData.from_bytes(raw_ead_data) 

372 device_address = mac_address_to_bytes(mac_address) 

373 except ValueError as err: 

374 return EADDecryptResult( 

375 success=False, 

376 error=str(err), 

377 error_type=EADError.CORRUPTED_DATA, 

378 ) 

379 

380 try: 

381 # Get cached cipher 

382 cipher = self._get_cached_cipher(session_key) 

383 

384 # Build nonce and decrypt 

385 nonce = build_ead_nonce(encrypted_data.randomizer, device_address) 

386 ciphertext_with_mic = encrypted_data.encrypted_payload + encrypted_data.mic 

387 plaintext = cipher.decrypt(nonce, ciphertext_with_mic, associated_data) 

388 

389 return EADDecryptResult(success=True, plaintext=plaintext) 

390 

391 except InvalidTag: 

392 logger.debug("EAD decryption failed: MIC verification failed") 

393 return EADDecryptResult( 

394 success=False, 

395 error="MIC verification failed - incorrect key or corrupted data", 

396 error_type=EADError.INVALID_KEY, 

397 ) 

398 except ValueError as err: 

399 logger.debug("EAD decryption failed: %s", err) 

400 return EADDecryptResult( 

401 success=False, 

402 error=str(err), 

403 error_type=EADError.CORRUPTED_DATA, 

404 ) 

405 

406 def clear_cache(self) -> None: 

407 """Clear the cipher cache. 

408 

409 Call this if you need to free memory or if keys have been rotated. 

410 """ 

411 self._cipher_cache.clear()