Coverage for src / bluetooth_sig / gatt / uuid_registry.py: 87%

305 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""UUID registry loading from Bluetooth SIG YAML files.""" 

2 

3from __future__ import annotations 

4 

5from bluetooth_sig.types.base_types import SIGInfo 

6 

7__all__ = [ 

8 "UuidRegistry", 

9 "uuid_registry", 

10] 

11 

12import threading 

13 

14from bluetooth_sig.registry.gss import GssRegistry 

15from bluetooth_sig.registry.uuids.units import UnitsRegistry 

16from bluetooth_sig.types import CharacteristicInfo, ServiceInfo 

17from bluetooth_sig.types.gatt_enums import ValueType 

18from bluetooth_sig.types.registry.descriptor_types import DescriptorInfo 

19from bluetooth_sig.types.registry.gss_characteristic import GssCharacteristicSpec 

20from bluetooth_sig.types.uuid import BluetoothUUID 

21 

22from ..registry.utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string 

23from ..types.registry import CharacteristicSpec, FieldInfo, UnitMetadata 

24 

25 

26class UuidRegistry: # pylint: disable=too-many-instance-attributes 

27 """Registry for Bluetooth SIG UUIDs with canonical storage + alias indices. 

28 

29 This registry stores a number of internal caches and mappings which 

30 legitimately exceed the default pylint instance attribute limit. The 

31 complexity is intentional and centralised; an inline disable is used to 

32 avoid noisy global configuration changes. 

33 """ 

34 

35 def __init__(self) -> None: 

36 """Initialize the UUID registry.""" 

37 self._lock = threading.RLock() 

38 

39 # Canonical storage: normalized_uuid -> domain types (single source of truth) 

40 self._services: dict[str, ServiceInfo] = {} 

41 self._characteristics: dict[str, CharacteristicInfo] = {} 

42 self._descriptors: dict[str, DescriptorInfo] = {} 

43 

44 # Lightweight alias indices: alias -> normalized_uuid 

45 self._service_aliases: dict[str, str] = {} 

46 self._characteristic_aliases: dict[str, str] = {} 

47 self._descriptor_aliases: dict[str, str] = {} 

48 

49 # Preserve SIG entries overridden at runtime so we can restore them 

50 self._service_overrides: dict[str, ServiceInfo] = {} 

51 self._characteristic_overrides: dict[str, CharacteristicInfo] = {} 

52 self._descriptor_overrides: dict[str, DescriptorInfo] = {} 

53 

54 # Track runtime-registered UUIDs (replaces origin field checks) 

55 self._runtime_uuids: set[str] = set() 

56 

57 self._gss_registry: GssRegistry | None = None 

58 

59 try: 

60 self._load_uuids() 

61 except (FileNotFoundError, Exception): # pylint: disable=broad-exception-caught 

62 # If YAML loading fails, continue with empty registry 

63 pass 

64 

65 def _store_service(self, info: ServiceInfo) -> None: 

66 """Store service info with canonical storage + aliases.""" 

67 canonical_key = info.uuid.normalized 

68 

69 # Store once in canonical location 

70 self._services[canonical_key] = info 

71 

72 # Create lightweight alias mappings (normalized to lowercase) 

73 aliases = self._generate_aliases(info) 

74 for alias in aliases: 

75 self._service_aliases[alias.lower()] = canonical_key 

76 

77 def _store_characteristic(self, info: CharacteristicInfo) -> None: 

78 """Store characteristic info with canonical storage + aliases.""" 

79 canonical_key = info.uuid.normalized 

80 

81 # Store once in canonical location 

82 self._characteristics[canonical_key] = info 

83 

84 # Create lightweight alias mappings (normalized to lowercase) 

85 aliases = self._generate_aliases(info) 

86 for alias in aliases: 

87 self._characteristic_aliases[alias.lower()] = canonical_key 

88 

89 def _store_descriptor(self, info: DescriptorInfo) -> None: 

90 """Store descriptor info with canonical storage + aliases.""" 

91 canonical_key = info.uuid.normalized 

92 

93 # Store once in canonical location 

94 self._descriptors[canonical_key] = info 

95 

96 # Create lightweight alias mappings (normalized to lowercase) 

97 aliases = self._generate_aliases(info) 

98 for alias in aliases: 

99 self._descriptor_aliases[alias.lower()] = canonical_key 

100 

101 def _generate_aliases(self, info: SIGInfo) -> set[str]: 

102 """Generate name/ID-based alias keys for domain info types (UUID variations handled by BluetoothUUID).""" 

103 aliases: set[str] = { 

104 info.name.lower(), 

105 } 

106 

107 if info.id: 

108 aliases.add(info.id) 

109 

110 if info.id and "service" in info.id: 

111 service_name = info.id.replace("org.bluetooth.service.", "") 

112 if service_name.endswith("_service"): 

113 service_name = service_name[:-8] # Remove _service 

114 service_name = service_name.replace("_", " ").title() 

115 aliases.add(service_name) 

116 # Also add "Service" suffix if not present 

117 if not service_name.endswith(" Service"): 

118 aliases.add(service_name + " Service") 

119 elif info.id and "characteristic" in info.id: 

120 char_name = info.id.replace("org.bluetooth.characteristic.", "") 

121 char_name = char_name.replace("_", " ").title() 

122 aliases.add(char_name) 

123 

124 # Add space-separated words from name 

125 name_words = info.name.replace("_", " ").replace("-", " ") 

126 if " " in name_words: 

127 aliases.add(name_words.title()) 

128 aliases.add(name_words.lower()) 

129 

130 # Remove empty strings, None values, and the canonical key itself 

131 canonical_key = info.uuid.normalized 

132 return {alias for alias in aliases if alias and alias.strip() and alias != canonical_key} 

133 

134 def _load_uuids(self) -> None: # pylint: disable=too-many-branches 

135 """Load all UUIDs from YAML files.""" 

136 base_path = find_bluetooth_sig_path() 

137 if not base_path: 

138 return 

139 

140 # Load service UUIDs 

141 service_yaml = base_path / "service_uuids.yaml" 

142 if service_yaml.exists(): 

143 for uuid_info in load_yaml_uuids(service_yaml): 

144 uuid = normalize_uuid_string(uuid_info["uuid"]) 

145 

146 bt_uuid = BluetoothUUID(uuid) 

147 info = ServiceInfo( 

148 uuid=bt_uuid, 

149 name=uuid_info["name"], 

150 id=uuid_info.get("id", ""), 

151 ) 

152 self._store_service(info) 

153 

154 # Load characteristic UUIDs 

155 characteristic_yaml = base_path / "characteristic_uuids.yaml" 

156 if characteristic_yaml.exists(): 

157 for uuid_info in load_yaml_uuids(characteristic_yaml): 

158 uuid = normalize_uuid_string(uuid_info["uuid"]) 

159 

160 bt_uuid = BluetoothUUID(uuid) 

161 char_info = CharacteristicInfo( 

162 uuid=bt_uuid, 

163 name=uuid_info["name"], 

164 id=uuid_info.get("id", ""), 

165 unit="", # Will be set from unit mappings if available 

166 value_type=ValueType.UNKNOWN, 

167 ) 

168 self._store_characteristic(char_info) 

169 

170 # Load descriptor UUIDs 

171 descriptor_yaml = base_path / "descriptors.yaml" 

172 if descriptor_yaml.exists(): 

173 for uuid_info in load_yaml_uuids(descriptor_yaml): 

174 uuid = normalize_uuid_string(uuid_info["uuid"]) 

175 

176 bt_uuid = BluetoothUUID(uuid) 

177 desc_info = DescriptorInfo( 

178 uuid=bt_uuid, 

179 name=uuid_info["name"], 

180 id=uuid_info.get("id", ""), 

181 ) 

182 self._store_descriptor(desc_info) 

183 

184 # Load GSS specifications 

185 self._gss_registry = GssRegistry.get_instance() 

186 self._load_gss_characteristic_info() 

187 

188 def _load_gss_characteristic_info(self) -> None: 

189 """Load GSS specs and update characteristics with extracted info.""" 

190 if self._gss_registry is None: 

191 return 

192 

193 all_specs = self._gss_registry.get_all_specs() 

194 

195 # Group by identifier to avoid duplicate processing 

196 processed_ids: set[str] = set() 

197 for spec in all_specs.values(): 

198 if spec.identifier in processed_ids: 

199 continue 

200 processed_ids.add(spec.identifier) 

201 

202 # Extract unit and value_type from structure 

203 char_data = { 

204 "structure": [ 

205 { 

206 "field": f.field, 

207 "type": f.type, 

208 "size": f.size, 

209 "description": f.description, 

210 } 

211 for f in spec.structure 

212 ] 

213 } 

214 unit, value_type = self._gss_registry.extract_info_from_gss(char_data) 

215 

216 if unit or value_type: 

217 self._update_characteristic_with_gss_info(spec.name, spec.identifier, unit, value_type) 

218 

219 def _update_characteristic_with_gss_info( 

220 self, char_name: str, char_id: str, unit: str | None, value_type: str | None 

221 ) -> None: 

222 """Update existing characteristic with GSS info.""" 

223 with self._lock: 

224 # Find the canonical entry by checking aliases (normalized to lowercase) 

225 canonical_uuid = None 

226 for search_key in [char_name, char_id]: 

227 canonical_uuid = self._characteristic_aliases.get(search_key.lower()) 

228 if canonical_uuid: 

229 break 

230 

231 if not canonical_uuid or canonical_uuid not in self._characteristics: 

232 return 

233 

234 # Get existing info and create updated version 

235 existing_info = self._characteristics[canonical_uuid] 

236 

237 # Convert value_type string to ValueType enum if provided 

238 new_value_type = existing_info.value_type 

239 if value_type: 

240 try: 

241 new_value_type = ValueType(value_type) 

242 except (ValueError, KeyError): 

243 new_value_type = existing_info.value_type 

244 

245 # Create updated CharacteristicInfo (immutable, so create new instance) 

246 updated_info = CharacteristicInfo( 

247 uuid=existing_info.uuid, 

248 name=existing_info.name, 

249 id=existing_info.id, 

250 unit=unit or existing_info.unit, 

251 value_type=new_value_type, 

252 ) 

253 

254 # Update canonical store (aliases remain the same since UUID/name/id unchanged) 

255 self._characteristics[canonical_uuid] = updated_info 

256 

257 def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str: 

258 """Convert Bluetooth SIG unit specification to human-readable symbol. 

259 

260 Args: 

261 unit_spec: Unit specification (e.g., "thermodynamic_temperature.degree_celsius") 

262 

263 Returns: 

264 Human-readable symbol (e.g., "°C"), or unit_spec if no mapping found 

265 """ 

266 unit_spec = unit_spec.rstrip(".").lower() 

267 unit_id = f"org.bluetooth.unit.{unit_spec}" 

268 

269 units_registry = UnitsRegistry.get_instance() 

270 unit_info = units_registry.get_info(unit_id) 

271 if unit_info and unit_info.symbol: 

272 return unit_info.symbol 

273 

274 return unit_spec 

275 

276 def register_characteristic( # pylint: disable=too-many-arguments,too-many-positional-arguments 

277 self, 

278 uuid: BluetoothUUID, 

279 name: str, 

280 identifier: str | None = None, 

281 unit: str | None = None, 

282 value_type: ValueType | None = None, 

283 override: bool = False, 

284 ) -> None: 

285 """Register a custom characteristic at runtime. 

286 

287 Args: 

288 uuid: The Bluetooth UUID for the characteristic 

289 name: Human-readable name 

290 identifier: Optional identifier (auto-generated if not provided) 

291 unit: Optional unit of measurement 

292 value_type: Optional value type 

293 override: If True, allow overriding existing entries 

294 """ 

295 with self._lock: 

296 canonical_key = uuid.normalized 

297 

298 # Check for conflicts with existing entries 

299 if canonical_key in self._characteristics: 

300 # Check if it's a SIG characteristic (not in runtime set) 

301 if canonical_key not in self._runtime_uuids: 

302 if not override: 

303 raise ValueError( 

304 f"UUID {uuid} conflicts with existing SIG " 

305 "characteristic entry. Use override=True to replace." 

306 ) 

307 # Preserve original SIG entry for restoration 

308 self._characteristic_overrides.setdefault(canonical_key, self._characteristics[canonical_key]) 

309 elif not override: 

310 # Runtime entry already exists 

311 raise ValueError( 

312 f"UUID {uuid} already registered as runtime characteristic. Use override=True to replace." 

313 ) 

314 

315 info = CharacteristicInfo( 

316 uuid=uuid, 

317 name=name, 

318 id=identifier or f"runtime.characteristic.{name.lower().replace(' ', '_')}", 

319 unit=unit or "", 

320 value_type=value_type or ValueType.UNKNOWN, 

321 ) 

322 

323 # Track as runtime-registered UUID 

324 self._runtime_uuids.add(canonical_key) 

325 

326 self._store_characteristic(info) 

327 

328 def register_service( 

329 self, 

330 uuid: BluetoothUUID, 

331 name: str, 

332 identifier: str | None = None, 

333 override: bool = False, 

334 ) -> None: 

335 """Register a custom service at runtime. 

336 

337 Args: 

338 uuid: The Bluetooth UUID for the service 

339 name: Human-readable name 

340 identifier: Optional identifier (auto-generated if not provided) 

341 override: If True, allow overriding existing entries 

342 """ 

343 with self._lock: 

344 canonical_key = uuid.normalized 

345 

346 # Check for conflicts with existing entries 

347 if canonical_key in self._services: 

348 # Check if it's a SIG service (not in runtime set) 

349 if canonical_key not in self._runtime_uuids: 

350 if not override: 

351 raise ValueError( 

352 f"UUID {uuid} conflicts with existing SIG service entry. Use override=True to replace." 

353 ) 

354 # Preserve original SIG entry for restoration 

355 self._service_overrides.setdefault(canonical_key, self._services[canonical_key]) 

356 elif not override: 

357 # Runtime entry already exists 

358 raise ValueError( 

359 f"UUID {uuid} already registered as runtime service. Use override=True to replace." 

360 ) 

361 

362 info = ServiceInfo( 

363 uuid=uuid, 

364 name=name, 

365 id=identifier or f"runtime.service.{name.lower().replace(' ', '_')}", 

366 ) 

367 

368 # Track as runtime-registered UUID 

369 self._runtime_uuids.add(canonical_key) 

370 

371 self._store_service(info) 

372 

373 def get_service_info(self, key: str | BluetoothUUID) -> ServiceInfo | None: 

374 """Get information about a service by UUID, name, or ID.""" 

375 with self._lock: 

376 # Convert BluetoothUUID to canonical key 

377 if isinstance(key, BluetoothUUID): 

378 canonical_key = key.normalized 

379 # Direct canonical lookup 

380 if canonical_key in self._services: 

381 return self._services[canonical_key] 

382 else: 

383 search_key = str(key).strip() 

384 

385 # Try UUID normalization first 

386 try: 

387 bt_uuid = BluetoothUUID(search_key) 

388 canonical_key = bt_uuid.normalized 

389 if canonical_key in self._services: 

390 return self._services[canonical_key] 

391 except ValueError: 

392 pass 

393 

394 # Check alias index (normalized to lowercase) 

395 alias_key = self._service_aliases.get(search_key.lower()) 

396 if alias_key and alias_key in self._services: 

397 return self._services[alias_key] 

398 

399 return None 

400 

401 def get_characteristic_info(self, identifier: str | BluetoothUUID) -> CharacteristicInfo | None: 

402 """Get information about a characteristic by UUID, name, or ID.""" 

403 with self._lock: 

404 # Convert BluetoothUUID to canonical key 

405 if isinstance(identifier, BluetoothUUID): 

406 canonical_key = identifier.normalized 

407 # Direct canonical lookup 

408 if canonical_key in self._characteristics: 

409 return self._characteristics[canonical_key] 

410 else: 

411 search_key = str(identifier).strip() 

412 

413 # Try UUID normalization first 

414 try: 

415 bt_uuid = BluetoothUUID(search_key) 

416 canonical_key = bt_uuid.normalized 

417 if canonical_key in self._characteristics: 

418 return self._characteristics[canonical_key] 

419 except ValueError: 

420 pass 

421 

422 # Check alias index (normalized to lowercase) 

423 alias_key = self._characteristic_aliases.get(search_key.lower()) 

424 if alias_key and alias_key in self._characteristics: 

425 return self._characteristics[alias_key] 

426 

427 return None 

428 

429 def get_descriptor_info(self, identifier: str | BluetoothUUID) -> DescriptorInfo | None: 

430 """Get information about a descriptor by UUID, name, or ID.""" 

431 with self._lock: 

432 # Convert BluetoothUUID to canonical key 

433 if isinstance(identifier, BluetoothUUID): 

434 canonical_key = identifier.normalized 

435 # Direct canonical lookup 

436 if canonical_key in self._descriptors: 

437 return self._descriptors[canonical_key] 

438 else: 

439 search_key = str(identifier).strip() 

440 

441 # Try UUID normalization first 

442 try: 

443 bt_uuid = BluetoothUUID(search_key) 

444 canonical_key = bt_uuid.normalized 

445 if canonical_key in self._descriptors: 

446 return self._descriptors[canonical_key] 

447 except ValueError: 

448 pass 

449 

450 # Check alias index (normalized to lowercase) 

451 alias_key = self._descriptor_aliases.get(search_key.lower()) 

452 if alias_key and alias_key in self._descriptors: 

453 return self._descriptors[alias_key] 

454 

455 return None 

456 

457 def get_gss_spec(self, identifier: str | BluetoothUUID) -> GssCharacteristicSpec | None: 

458 """Get the full GSS characteristic specification with all field metadata. 

459 

460 This provides access to the complete YAML structure including all fields, 

461 their units, resolutions, ranges, and presence conditions. 

462 

463 Args: 

464 identifier: Characteristic name, ID, or UUID 

465 

466 Returns: 

467 GssCharacteristicSpec with full field structure, or None if not found 

468 

469 Example: 

470 gss = uuid_registry.get_gss_spec("Location and Speed") 

471 if gss: 

472 for field in gss.structure: 

473 print(f"{field.python_name}: unit={field.unit_id}, resolution={field.resolution}") 

474 

475 """ 

476 if self._gss_registry is None: 

477 return None 

478 

479 with self._lock: 

480 # Try direct lookup by name or ID 

481 if isinstance(identifier, str): 

482 spec = self._gss_registry.get_spec(identifier) 

483 if spec: 

484 return spec 

485 

486 # Try to get CharacteristicInfo to find the ID 

487 char_info = self.get_characteristic_info(identifier) 

488 if char_info: 

489 spec = self._gss_registry.get_spec(char_info.id) 

490 if spec: 

491 return spec 

492 elif isinstance(identifier, BluetoothUUID): 

493 # Look up by UUID 

494 char_info = self.get_characteristic_info(identifier) 

495 if char_info: 

496 spec = self._gss_registry.get_spec(char_info.name) 

497 if spec: 

498 return spec 

499 spec = self._gss_registry.get_spec(char_info.id) 

500 if spec: 

501 return spec 

502 

503 return None 

504 

505 def resolve_characteristic_spec(self, characteristic_name: str) -> CharacteristicSpec | None: # pylint: disable=too-many-locals 

506 """Resolve characteristic specification with rich YAML metadata. 

507 

508 This method provides detailed characteristic information including data types, 

509 field sizes, units, and descriptions by cross-referencing multiple YAML sources. 

510 

511 Args: 

512 characteristic_name: Name of the characteristic (e.g., "Temperature", "Battery Level") 

513 

514 Returns: 

515 CharacteristicSpec with full metadata, or None if not found 

516 

517 Example: 

518 spec = uuid_registry.resolve_characteristic_spec("Temperature") 

519 if spec: 

520 print(f"UUID: {spec.uuid}, Unit: {spec.unit_symbol}, Type: {spec.data_type}") 

521 

522 """ 

523 with self._lock: 

524 # 1. Get UUID from characteristic registry 

525 char_info = self.get_characteristic_info(characteristic_name) 

526 if not char_info: 

527 return None 

528 

529 # 2. Get typed GSS specification if available 

530 gss_spec = self.get_gss_spec(characteristic_name) 

531 

532 # 3. Extract metadata from GSS specification 

533 data_type = None 

534 field_size = None 

535 unit_id = None 

536 unit_symbol = None 

537 base_unit = None 

538 resolution_text = None 

539 description = None 

540 

541 if gss_spec: 

542 description = gss_spec.description 

543 

544 # Only set data_type for single-field characteristics 

545 # Multi-field characteristics have complex structures and no single data type 

546 if len(gss_spec.structure) == 1: 

547 # Use primary field for metadata extraction 

548 primary = gss_spec.primary_field 

549 if primary: 

550 data_type = primary.type 

551 field_size = str(primary.fixed_size) if primary.fixed_size else primary.size 

552 

553 # Use FieldSpec's unit_id property (auto-parsed from description) 

554 if primary.unit_id: 

555 unit_id = f"org.bluetooth.unit.{primary.unit_id}" 

556 unit_symbol = self._convert_bluetooth_unit_to_readable(primary.unit_id) 

557 base_unit = unit_id 

558 

559 # Get resolution from FieldSpec 

560 if primary.resolution is not None: 

561 resolution_text = f"Resolution: {primary.resolution}" 

562 

563 # 4. Use existing unit/value_type from CharacteristicInfo if GSS didn't provide them 

564 if not unit_symbol and char_info.unit: 

565 unit_symbol = char_info.unit 

566 

567 return CharacteristicSpec( 

568 uuid=char_info.uuid, 

569 name=char_info.name, 

570 field_info=FieldInfo(data_type=data_type, field_size=field_size), 

571 unit_info=UnitMetadata( 

572 unit_id=unit_id, 

573 unit_symbol=unit_symbol, 

574 base_unit=base_unit, 

575 resolution_text=resolution_text, 

576 ), 

577 description=description, 

578 structure=gss_spec.structure if gss_spec else [], 

579 ) 

580 

581 def get_signed_from_data_type(self, data_type: str | None) -> bool: 

582 """Determine if data type is signed from GSS data type. 

583 

584 Args: 

585 data_type: GSS data type string (e.g., "sint16", "float32", "uint8") 

586 

587 Returns: 

588 True if the type represents signed values, False otherwise 

589 

590 """ 

591 if not data_type: 

592 return False 

593 # Comprehensive signed type detection 

594 signed_types = {"float32", "float64", "medfloat16", "medfloat32"} 

595 return data_type.startswith("sint") or data_type in signed_types 

596 

597 @staticmethod 

598 def get_byte_order_hint() -> str: 

599 """Get byte order hint for Bluetooth SIG specifications. 

600 

601 Returns: 

602 "little" - Bluetooth SIG uses little-endian by convention 

603 

604 """ 

605 return "little" 

606 

607 def clear_custom_registrations(self) -> None: 

608 """Clear all custom registrations (for testing).""" 

609 with self._lock: 

610 # Use runtime_uuids set to identify what to remove 

611 runtime_keys = list(self._runtime_uuids) 

612 

613 # Remove runtime entries from canonical stores 

614 for key in runtime_keys: 

615 self._services.pop(key, None) 

616 self._characteristics.pop(key, None) 

617 self._descriptors.pop(key, None) 

618 

619 # Remove corresponding aliases (alias -> canonical_key where canonical_key is runtime) 

620 runtime_service_aliases = [ 

621 alias for alias, canonical in self._service_aliases.items() if canonical in runtime_keys 

622 ] 

623 runtime_char_aliases = [ 

624 alias for alias, canonical in self._characteristic_aliases.items() if canonical in runtime_keys 

625 ] 

626 runtime_desc_aliases = [ 

627 alias for alias, canonical in self._descriptor_aliases.items() if canonical in runtime_keys 

628 ] 

629 

630 for alias in runtime_service_aliases: 

631 del self._service_aliases[alias] 

632 for alias in runtime_char_aliases: 

633 del self._characteristic_aliases[alias] 

634 for alias in runtime_desc_aliases: 

635 del self._descriptor_aliases[alias] 

636 

637 # Restore any preserved SIG entries that were overridden 

638 for key in runtime_keys: 

639 original = self._service_overrides.pop(key, None) 

640 if original is not None: 

641 self._store_service(original) 

642 original = self._characteristic_overrides.pop(key, None) 

643 if original is not None: 

644 self._store_characteristic(original) 

645 original = self._descriptor_overrides.pop(key, None) 

646 if original is not None: 

647 self._store_descriptor(original) 

648 

649 # Clear the runtime tracking set 

650 self._runtime_uuids.clear() 

651 

652 

653# Global instance 

654uuid_registry = UuidRegistry()