Coverage for src / bluetooth_sig / advertising / ead_decryptor.py: 92%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Pure decryption functions for BLE Encrypted Advertising Data (EAD). 

2 

3This module provides framework-agnostic decryption functions for EAD 

4per Bluetooth Core Spec Supplement Section 1.23. 

5 

6The decryption uses AES-CCM with: 

7- 128-bit (16-byte) session key 

8- 13-byte nonce (5-byte randomizer + 6-byte device address + 2-byte padding) 

9- 4-byte MIC (authentication tag) 

10 

11Requires the 'cryptography' package: pip install bluetooth-sig[ead] 

12""" 

13 

14from __future__ import annotations 

15 

16import logging 

17 

18from cryptography.exceptions import InvalidTag 

19from cryptography.hazmat.primitives.ciphers.aead import AESCCM 

20 

21from bluetooth_sig.advertising.encryption import EADKeyProvider 

22from bluetooth_sig.types.address import mac_address_to_bytes 

23from bluetooth_sig.types.ead import ( 

24 EAD_MIN_SIZE, 

25 EAD_RANDOMIZER_SIZE, 

26 EADDecryptResult, 

27 EADError, 

28 EncryptedAdvertisingData, 

29) 

30 

# Module-level logger; decryption failures in this module are logged at DEBUG level.
logger = logging.getLogger(__name__)

# EAD cryptographic constants per Bluetooth Core Spec Supplement.
# All sizes are expressed in bytes.
EAD_KEY_SIZE: int = 16  # 128-bit AES key
EAD_NONCE_SIZE: int = 13  # Randomizer(5) + Address(6) + Padding(2)
EAD_MIC_SIZE: int = 4  # 4-byte authentication tag
EAD_ADDRESS_SIZE: int = 6  # BLE device address size

38 

39 

def _get_aesccm_cipher(session_key: bytes) -> AESCCM:
    """Build the AES-CCM cipher object used for EAD.

    The cipher is configured with a tag length of ``EAD_MIC_SIZE`` bytes.

    Args:
        session_key: AES-128 session key; must be exactly ``EAD_KEY_SIZE`` bytes

    Returns:
        AESCCM instance bound to ``session_key``

    Raises:
        ValueError: If ``session_key`` does not have the expected length
    """
    if len(session_key) == EAD_KEY_SIZE:
        return AESCCM(session_key, tag_length=EAD_MIC_SIZE)

    # Reject the key ourselves so the error names the EAD key size.
    problem = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}"
    raise ValueError(problem)

57 

58 

def build_ead_nonce(
    randomizer: bytes,
    device_address: bytes,
) -> bytes:
    """Assemble the AES-CCM nonce used for EAD decryption.

    The nonce is the concatenation, in this order, of:
        Randomizer[5] + DeviceAddress[6] + Padding[2]
    where the padding is two zero bytes.

    Args:
        randomizer: Randomizer taken from the EAD advertisement
            (``EAD_RANDOMIZER_SIZE`` bytes)
        device_address: BLE device address (``EAD_ADDRESS_SIZE`` bytes,
            little-endian)

    Returns:
        The nonce: randomizer, then address, then two zero bytes

    Raises:
        ValueError: If either input does not have the required length

    Example:
        >>> randomizer = bytes.fromhex("0102030405")
        >>> address = bytes.fromhex("aabbccddeeff")
        >>> nonce = build_ead_nonce(randomizer, address)
        >>> len(nonce)
        13
    """
    # NOTE(review): the Core Specification Supplement appears to define the
    # EAD nonce from the Randomizer and the key material's IV rather than the
    # device address - confirm this layout against the spec and peer devices.
    if len(randomizer) != EAD_RANDOMIZER_SIZE:
        problem = f"Randomizer must be {EAD_RANDOMIZER_SIZE} bytes, got {len(randomizer)}"
        raise ValueError(problem)

    if len(device_address) != EAD_ADDRESS_SIZE:
        problem = f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}"
        raise ValueError(problem)

    # Two trailing zero bytes pad randomizer + address up to the nonce length.
    nonce = randomizer + device_address + b"\x00\x00"
    return nonce

96 

97 

def decrypt_ead(  # pylint: disable=too-many-return-statements
    encrypted_data: EncryptedAdvertisingData,
    session_key: bytes,
    device_address: bytes,
    associated_data: bytes | None = None,
) -> EADDecryptResult:
    """Decrypt a parsed BLE Encrypted Advertising Data structure.

    Runs AES-CCM decryption per Bluetooth Core Spec Supplement Section 1.23.
    Expected failures are reported through the returned result rather than
    raised:

    - wrong-length session key or device address -> ``EADError.CORRUPTED_DATA``
    - MIC verification failure (``InvalidTag``) -> ``EADError.INVALID_KEY``
    - any ``ValueError`` while building the nonce/cipher or decrypting
      -> ``EADError.CORRUPTED_DATA``

    Args:
        encrypted_data: Parsed EAD structure providing ``randomizer``,
            ``encrypted_payload`` and ``mic``
        session_key: 16-byte AES-128 session key
        device_address: 6-byte BLE device address (little-endian bytes)
        associated_data: Optional additional authenticated data (AAD).
            Must match the AAD used during encryption.

    Returns:
        EADDecryptResult with success status and plaintext or error details

    Example:
        >>> ead = EncryptedAdvertisingData.from_bytes(raw_advertisement)
        >>> result = decrypt_ead(ead, session_key, device_address)
        >>> if result.success:
        ...     process_sensor_data(result.plaintext)
        ... elif result.error_type == EADError.INVALID_KEY:
        ...     logger.warning("Incorrect encryption key for device")
    """

    def _failed(reason: str, kind: EADError) -> EADDecryptResult:
        # Single place that shapes every unsuccessful result.
        return EADDecryptResult(success=False, error=reason, error_type=kind)

    # Reject malformed inputs up front so the messages name the EAD sizes.
    if len(session_key) != EAD_KEY_SIZE:
        return _failed(
            f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}",
            EADError.CORRUPTED_DATA,
        )

    if len(device_address) != EAD_ADDRESS_SIZE:
        return _failed(
            f"Device address must be {EAD_ADDRESS_SIZE} bytes, got {len(device_address)}",
            EADError.CORRUPTED_DATA,
        )

    try:
        ccm_nonce = build_ead_nonce(encrypted_data.randomizer, device_address)
        ccm = _get_aesccm_cipher(session_key)

        # The AESCCM API takes the tag appended to the ciphertext.
        sealed = encrypted_data.encrypted_payload + encrypted_data.mic

        # NOTE(review): associated_data defaults to None; the EAD spec is
        # believed to mandate a fixed AAD octet - confirm callers supply it.
        return EADDecryptResult(
            success=True,
            plaintext=ccm.decrypt(ccm_nonce, sealed, associated_data),
        )
    except InvalidTag:
        # Authentication failed: wrong key or tampered/corrupted data.
        logger.debug("EAD decryption failed: MIC verification failed")
        return _failed(
            "MIC verification failed - incorrect key or corrupted data",
            EADError.INVALID_KEY,
        )
    except ValueError as exc:
        # Parameters rejected by the nonce builder or the cipher.
        logger.debug("EAD decryption failed: %s", exc)
        return _failed(str(exc), EADError.CORRUPTED_DATA)

174 

175 

def decrypt_ead_from_raw(
    raw_ead_data: bytes,
    session_key: bytes,
    mac_address: str,
    associated_data: bytes | None = None,
) -> EADDecryptResult:
    """Parse raw EAD bytes and decrypt them in a single call.

    The raw payload is length-checked, parsed into an
    ``EncryptedAdvertisingData``, the MAC string is converted to bytes, and
    the result is handed to ``decrypt_ead()``.

    Args:
        raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload)
        session_key: 16-byte AES-128 session key
        mac_address: Device MAC address string (e.g., "AA:BB:CC:DD:EE:FF")
        associated_data: Optional additional authenticated data

    Returns:
        EADDecryptResult with success status and plaintext or error details.
        Payloads shorter than ``EAD_MIN_SIZE`` yield
        ``EADError.INSUFFICIENT_DATA``; a ``ValueError`` from parsing or MAC
        conversion yields ``EADError.CORRUPTED_DATA``.

    Example:
        >>> result = decrypt_ead_from_raw(
        ...     raw_ead_data=advertisement_payload,
        ...     session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"),
        ...     mac_address="AA:BB:CC:DD:EE:FF",
        ... )
        >>> if result.success:
        ...     print(f"Decrypted: {result.plaintext.hex()}")
    """
    if len(raw_ead_data) >= EAD_MIN_SIZE:
        try:
            parsed = EncryptedAdvertisingData.from_bytes(raw_ead_data)
            address_bytes = mac_address_to_bytes(mac_address)
        except ValueError as exc:
            return EADDecryptResult(
                success=False,
                error=str(exc),
                error_type=EADError.CORRUPTED_DATA,
            )

        return decrypt_ead(parsed, session_key, address_bytes, associated_data)

    # Too few bytes to parse at all.
    return EADDecryptResult(
        success=False,
        error=(f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required"),
        error_type=EADError.INSUFFICIENT_DATA,
    )

223 

224 

class EADDecryptor:
    """Stateful EAD decryptor with cipher caching and key provider support.

    For one-off decryption, use the module-level `decrypt_ead()` function.
    For repeated decryption with the same key, this class caches the AESCCM
    cipher instance for better performance.

    This class also integrates with `EADKeyProvider` for automatic key lookup
    by MAC address.

    Use the factory methods `from_key()` or `from_provider()` to create instances.

    Attributes:
        _key_provider: Optional key provider for MAC-based key lookup
        _static_key: Static session key (used if no provider)
        _cipher_cache: Cached cipher instances keyed by session key

    Example with static key:
        >>> from bluetooth_sig.advertising import EADDecryptor
        >>>
        >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef"))
        >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF")
        >>> if result.success:
        ...     print(result.plaintext)

    Example with key provider:
        >>> from bluetooth_sig.advertising import EADDecryptor, DictKeyProvider
        >>> from bluetooth_sig.types.ead import EADKeyMaterial
        >>>
        >>> provider = DictKeyProvider()
        >>> provider.set_ead_key(
        ...     "AA:BB:CC:DD:EE:FF",
        ...     EADKeyMaterial(
        ...         session_key=bytes.fromhex("0123456789abcdef0123456789abcdef"),
        ...         iv=bytes.fromhex("0102030405060708"),
        ...     ),
        ... )
        >>> decryptor = EADDecryptor.from_provider(provider)
        >>> result = decryptor.decrypt(raw_ead_data, "AA:BB:CC:DD:EE:FF")
    """

    def __init__(
        self,
        *,
        _key_provider: EADKeyProvider | None = None,
        _static_key: bytes | None = None,
    ) -> None:
        """Private constructor. Use `from_key()` or `from_provider()` instead.

        No validation is performed here: `from_key()` checks the key length,
        and `decrypt()` reports a missing key as an error result.
        """
        self._key_provider = _key_provider
        self._static_key = _static_key
        self._cipher_cache: dict[bytes, AESCCM] = {}

    @classmethod
    def from_key(cls, session_key: bytes) -> EADDecryptor:
        """Create decryptor with a static session key.

        Args:
            session_key: 16-byte AES-128 session key

        Returns:
            Configured EADDecryptor instance

        Raises:
            ValueError: If session_key is not 16 bytes

        Example:
            >>> decryptor = EADDecryptor.from_key(bytes.fromhex("0123456789abcdef0123456789abcdef"))
        """
        if len(session_key) != EAD_KEY_SIZE:
            msg = f"Session key must be {EAD_KEY_SIZE} bytes, got {len(session_key)}"
            raise ValueError(msg)
        return cls(_static_key=session_key)

    @classmethod
    def from_provider(cls, key_provider: EADKeyProvider) -> EADDecryptor:
        """Create decryptor with a key provider for MAC-based lookup.

        Args:
            key_provider: Provider that looks up keys by MAC address

        Returns:
            Configured EADDecryptor instance

        Example:
            >>> provider = DictKeyProvider()
            >>> provider.set_ead_key("AA:BB:CC:DD:EE:FF", key_material)
            >>> decryptor = EADDecryptor.from_provider(provider)
        """
        return cls(_key_provider=key_provider)

    def _get_cached_cipher(self, session_key: bytes) -> AESCCM:
        """Get or create a cached cipher for the session key.

        Args:
            session_key: 16-byte AES session key

        Returns:
            AESCCM cipher instance

        Raises:
            ValueError: If a new cipher must be created and session_key is
                not 16 bytes
        """
        cipher = self._cipher_cache.get(session_key)
        if cipher is None:
            # Only successfully created ciphers are stored in the cache.
            cipher = _get_aesccm_cipher(session_key)
            self._cipher_cache[session_key] = cipher
        return cipher

    def decrypt(
        self,
        raw_ead_data: bytes,
        mac_address: str,
        associated_data: bytes | None = None,
    ) -> EADDecryptResult:
        """Decrypt EAD data using cached cipher.

        Looks up the session key from the key provider (if configured)
        or uses the static key. Caches the cipher instance for reuse.

        Args:
            raw_ead_data: Raw EAD advertisement bytes (AD Type 0x31 payload)
            mac_address: Device MAC address (e.g., "AA:BB:CC:DD:EE:FF")
            associated_data: Optional additional authenticated data (AAD)

        Returns:
            EADDecryptResult with success status and plaintext or error details.
            A missing key (provider returned None, or the instance was built
            without a key or provider) yields ``EADError.NO_KEY_AVAILABLE``.
        """
        # Determine session key
        if self._key_provider is not None:
            key_material = self._key_provider.get_ead_key(mac_address)
            if key_material is None:
                return EADDecryptResult(
                    success=False,
                    error=f"No EAD key available for {mac_address}",
                    error_type=EADError.NO_KEY_AVAILABLE,
                )
            # NOTE(review): only session_key is read from the key material;
            # any IV it carries is not used for the nonce - confirm intended.
            session_key = key_material.session_key
        elif self._static_key is not None:
            session_key = self._static_key
        else:
            # __init__ does not validate, so an instance built directly with
            # neither argument ends up here; report it instead of crashing.
            return EADDecryptResult(
                success=False,
                error="No session key configured; use from_key() or from_provider()",
                error_type=EADError.NO_KEY_AVAILABLE,
            )

        # Parse raw data
        if len(raw_ead_data) < EAD_MIN_SIZE:
            return EADDecryptResult(
                success=False,
                error=f"EAD data too short: {len(raw_ead_data)} bytes, minimum {EAD_MIN_SIZE} required",
                error_type=EADError.INSUFFICIENT_DATA,
            )

        try:
            encrypted_data = EncryptedAdvertisingData.from_bytes(raw_ead_data)
            device_address = mac_address_to_bytes(mac_address)
        except ValueError as err:
            return EADDecryptResult(
                success=False,
                error=str(err),
                error_type=EADError.CORRUPTED_DATA,
            )

        try:
            # Get cached cipher (ValueError here means a wrong-length key
            # came from the provider)
            cipher = self._get_cached_cipher(session_key)

            # Build nonce and decrypt; AES-CCM expects the MIC appended
            nonce = build_ead_nonce(encrypted_data.randomizer, device_address)
            ciphertext_with_mic = encrypted_data.encrypted_payload + encrypted_data.mic
            plaintext = cipher.decrypt(nonce, ciphertext_with_mic, associated_data)

            return EADDecryptResult(success=True, plaintext=plaintext)

        except InvalidTag:
            logger.debug("EAD decryption failed: MIC verification failed")
            return EADDecryptResult(
                success=False,
                error="MIC verification failed - incorrect key or corrupted data",
                error_type=EADError.INVALID_KEY,
            )
        except ValueError as err:
            logger.debug("EAD decryption failed: %s", err)
            return EADDecryptResult(
                success=False,
                error=str(err),
                error_type=EADError.CORRUPTED_DATA,
            )

    def clear_cache(self) -> None:
        """Clear the cipher cache.

        Call this if you need to free memory or if keys have been rotated.
        """
        self._cipher_cache.clear()