Coverage for src/bluetooth_sig/gatt/uuid_registry.py: 88%

419 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""UUID registry loading from Bluetooth SIG YAML files.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from enum import Enum 

8from pathlib import Path 

9from typing import Any, cast 

10 

11import msgspec 

12 

13from bluetooth_sig.types.gatt_enums import DataType 

14from bluetooth_sig.types.uuid import BluetoothUUID 

15 

16from ..registry.utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string 

17 

18 

class FieldInfo(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable, keyword-only record of field metadata read from YAML.

    Both attributes default to ``None`` so an empty ``FieldInfo()`` can be
    used as a placeholder when no field details are available.
    """

    # Raw "type" value of the described field, if any.
    data_type: str | None = None
    # Raw "size" value of the described field, if any.
    field_size: str | None = None

24 

25 

class UnitInfo(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable, keyword-only record of unit metadata read from YAML.

    Every attribute is optional; ``UnitInfo()`` represents "no unit details".
    """

    # Unit identifier text as found in the specification.
    unit_id: str | None = None
    # Short display symbol for the unit (for example "%").
    unit_symbol: str | None = None
    # Base unit text as found in the specification.
    base_unit: str | None = None
    # Free-form text describing the value resolution.
    resolution_text: str | None = None

33 

34 

class CharacteristicSpec(msgspec.Struct, kw_only=True):
    """Resolved characteristic description assembled from several YAML sources.

    The nested ``field_info`` / ``unit_info`` structs hold the detailed
    metadata; the read-only properties below expose the same values as flat
    attributes and return ``None`` when the nested struct is falsy.
    """

    uuid: BluetoothUUID
    name: str
    field_info: FieldInfo = msgspec.field(default_factory=FieldInfo)
    unit_info: UnitInfo = msgspec.field(default_factory=UnitInfo)
    description: str | None = None

    # Flat accessors kept for backward compatibility with older callers.
    @property
    def data_type(self) -> str | None:
        """Return ``field_info.data_type``, or ``None`` without field info."""
        details = self.field_info
        if not details:
            return None
        return details.data_type

    @property
    def field_size(self) -> str | None:
        """Return ``field_info.field_size``, or ``None`` without field info."""
        details = self.field_info
        if not details:
            return None
        return details.field_size

    @property
    def unit_id(self) -> str | None:
        """Return ``unit_info.unit_id``, or ``None`` without unit info."""
        details = self.unit_info
        if not details:
            return None
        return details.unit_id

    @property
    def unit_symbol(self) -> str | None:
        """Return ``unit_info.unit_symbol``, or ``None`` without unit info."""
        details = self.unit_info
        if not details:
            return None
        return details.unit_symbol

    @property
    def base_unit(self) -> str | None:
        """Return ``unit_info.base_unit``, or ``None`` without unit info."""
        details = self.unit_info
        if not details:
            return None
        return details.base_unit

    @property
    def resolution_text(self) -> str | None:
        """Return ``unit_info.resolution_text``, or ``None`` without unit info."""
        details = self.unit_info
        if not details:
            return None
        return details.resolution_text

74 

75 

class UuidOrigin(Enum):
    """Where a registry entry came from."""

    # Entry loaded from the Bluetooth SIG YAML data files.
    BLUETOOTH_SIG = "bluetooth_sig"
    # Entry registered programmatically while the process is running.
    RUNTIME = "runtime"

81 

82 

class UuidInfo(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable registry record describing one UUID.

    Instances are frozen, so "updating" an entry means building a new
    ``UuidInfo`` and replacing the stored one.
    """

    uuid: BluetoothUUID
    # Human-readable name, e.g. as listed in the SIG YAML files.
    name: str
    # Identifier string (SIG entries use dotted "org.bluetooth...." IDs).
    id: str
    summary: str = ""
    # Unit for the value, when one is known.
    unit: str | None = None
    # Value type string, when one is known.
    value_type: str | None = None
    # Entries are assumed to come from the SIG data unless stated otherwise.
    origin: UuidOrigin = UuidOrigin.BLUETOOTH_SIG

93 

94 

class CustomUuidEntry(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable description of a UUID supplied by the caller at runtime.

    Only ``uuid`` and ``name`` are required; the remaining attributes are
    optional and default to ``None``.
    """

    uuid: BluetoothUUID
    name: str
    # Optional identifier; the registry derives one from ``name`` if omitted.
    id: str | None = None
    summary: str | None = None
    unit: str | None = None
    value_type: str | None = None

104 

105 

class UuidRegistry:  # pylint: disable=too-many-instance-attributes
    """Registry for Bluetooth SIG UUIDs with canonical storage + alias indices.

    Each category (services, characteristics, descriptors) keeps:

    * a canonical store mapping ``normalized_uuid -> UuidInfo``;
    * an alias index mapping ``lowercased alias -> normalized_uuid``;
    * an override store holding SIG entries replaced at runtime, so they can
      be restored by :meth:`clear_custom_registrations`.

    The number of internal caches legitimately exceeds the default pylint
    instance attribute limit; an inline disable is used to avoid noisy global
    configuration changes.
    """

    # Named logger: avoids the implicit root-logger configuration that the
    # module-level ``logging.warning(...)`` helpers can trigger.
    _logger = logging.getLogger(__name__)

    # Prefix carried by SIG unit identifiers in units.yaml / GSS descriptions.
    _UNIT_ID_PREFIX = "org.bluetooth.unit."

    # Whole unit names that map directly to a symbol.
    _NAMED_UNIT_SYMBOLS = {
        "percentage": "%",
        "per mille": "‰",
        "unitless": "",
    }

    # Unit names found inside parentheses, e.g. "pressure (pascal)".
    _PARENTHESISED_UNIT_SYMBOLS = {
        "degree celsius": "°C",
        "degree fahrenheit": "°F",
        "kelvin": "K",
        "pascal": "Pa",
        "bar": "bar",
        "millimetre of mercury": "mmHg",
        "ampere": "A",
        "volt": "V",
        "joule": "J",
        "watt": "W",
        "hertz": "Hz",
        "metre": "m",
        "kilogram": "kg",
        "second": "s",
        "metre per second": "m/s",
        "metre per second squared": "m/s²",
        "radian per second": "rad/s",
        "candela": "cd",
        "lux": "lux",
        "newton": "N",
        "coulomb": "C",
        "farad": "F",
        "ohm": "Ω",
        "siemens": "S",
        "weber": "Wb",
        "tesla": "T",
        "henry": "H",
        "lumen": "lm",
        "becquerel": "Bq",
        "gray": "Gy",
        "sievert": "Sv",
        "katal": "kat",
        "degree": "°",
        "radian": "rad",
        "steradian": "sr",
    }

    # Quantity-name prefixes used when a unit name has no parentheses.
    # Insertion order is the match order.
    _QUANTITY_UNIT_SYMBOLS = {
        "frequency": "Hz",
        "force": "N",
        "pressure": "Pa",
        "energy": "J",
        "power": "W",
        "mass": "kg",
        "length": "m",
        "time": "s",
    }

    def __init__(self) -> None:
        """Initialize the UUID registry and load the SIG data (best effort)."""
        self._lock = threading.RLock()

        # Canonical storage: normalized_uuid -> UuidInfo (single source of truth)
        self._services: dict[str, UuidInfo] = {}
        self._characteristics: dict[str, UuidInfo] = {}
        self._descriptors: dict[str, UuidInfo] = {}

        # Lightweight alias indices: alias -> normalized_uuid
        self._service_aliases: dict[str, str] = {}
        self._characteristic_aliases: dict[str, str] = {}
        self._descriptor_aliases: dict[str, str] = {}

        # Preserve SIG entries overridden at runtime so we can restore them
        self._service_overrides: dict[str, UuidInfo] = {}
        self._characteristic_overrides: dict[str, UuidInfo] = {}
        self._descriptor_overrides: dict[str, UuidInfo] = {}

        # Unit mappings: unit key (prefix stripped, lowercase) -> symbol
        self._unit_mappings: dict[str, str] = {}

        # GSS specifications storage (for resolve_characteristic_spec)
        self._gss_specs: dict[str, dict[str, Any]] = {}

        try:
            self._load_uuids()
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # Loading is deliberately best-effort: keep whatever was loaded
            # before the failure, but report it instead of hiding it.
            self._logger.warning(
                "Failed to load Bluetooth SIG UUID data; registry may be incomplete: %s",
                exc,
            )

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------

    def _store_entry(self, info: UuidInfo, store: dict[str, UuidInfo], aliases: dict[str, str]) -> None:
        """Write ``info`` into ``store`` and index its aliases in ``aliases``."""
        canonical_key = info.uuid.normalized

        # Store once in the canonical location.
        store[canonical_key] = info

        # Alias keys are lowercased so lookups can be case-insensitive.
        for alias in self._generate_aliases(info):
            aliases[alias.lower()] = canonical_key

    def _store_service(self, info: UuidInfo) -> None:
        """Store service info with canonical storage + aliases."""
        self._store_entry(info, self._services, self._service_aliases)

    def _store_characteristic(self, info: UuidInfo) -> None:
        """Store characteristic info with canonical storage + aliases."""
        self._store_entry(info, self._characteristics, self._characteristic_aliases)

    def _store_descriptor(self, info: UuidInfo) -> None:
        """Store descriptor info with canonical storage + aliases."""
        self._store_entry(info, self._descriptors, self._descriptor_aliases)

    def _generate_aliases(self, info: UuidInfo) -> set[str]:
        """Generate name/ID-based alias keys for a UuidInfo.

        UUID variations are not generated here; callers resolve those through
        ``BluetoothUUID`` before consulting the alias index.

        Args:
            info: Entry to derive aliases for (uses ``name``, ``id`` and
                ``uuid.normalized``).

        Returns:
            Set of non-blank aliases, excluding the canonical key itself.

        """
        characteristic_prefix = "org.bluetooth.characteristic."
        aliases: set[str] = {
            info.name.lower(),  # Lowercase name
            info.id,  # Full ID
        }

        # The characteristic prefix is checked first: characteristic IDs such
        # as "org.bluetooth.characteristic.service_required" contain the word
        # "service" and were previously misclassified as services.
        if info.id.startswith(characteristic_prefix):
            aliases.add(info.id[len(characteristic_prefix):].replace("_", " ").title())
        elif "service" in info.id:
            service_name = info.id.replace("org.bluetooth.service.", "")
            if service_name.endswith("_service"):
                service_name = service_name[: -len("_service")]
            service_name = service_name.replace("_", " ").title()
            aliases.add(service_name)
            # Also add "Service" suffix if not present
            if not service_name.endswith(" Service"):
                aliases.add(service_name + " Service")
        elif "characteristic" in info.id:
            aliases.add(info.id.replace(characteristic_prefix, "").replace("_", " ").title())

        # Add space-separated variants of multi-word names.
        name_words = info.name.replace("_", " ").replace("-", " ")
        if " " in name_words:
            aliases.add(name_words.title())
            aliases.add(name_words.lower())

        # Remove empty/blank aliases and the canonical key itself.
        canonical_key = info.uuid.normalized
        return {alias for alias in aliases if alias and alias.strip() and alias != canonical_key}

    # ------------------------------------------------------------------
    # YAML loading
    # ------------------------------------------------------------------

    def _load_uuids(self) -> None:
        """Load all UUIDs, unit mappings and GSS specs from the YAML files."""
        base_path = find_bluetooth_sig_path()
        if not base_path:
            return

        # (file name, store function) per category, loaded in this order.
        sources = (
            ("service_uuids.yaml", self._store_service),
            ("characteristic_uuids.yaml", self._store_characteristic),
            ("descriptors.yaml", self._store_descriptor),
        )
        for filename, store in sources:
            yaml_path = base_path / filename
            if not yaml_path.exists():
                continue
            for uuid_info in load_yaml_uuids(yaml_path):
                bt_uuid = BluetoothUUID(normalize_uuid_string(uuid_info["uuid"]))
                store(
                    UuidInfo(
                        uuid=bt_uuid,
                        name=uuid_info["name"],
                        id=uuid_info["id"],
                        origin=UuidOrigin.BLUETOOTH_SIG,
                    )
                )

        # Unit mappings must be loaded before the GSS specs, which use them
        # to turn unit identifiers into readable symbols.
        self._load_unit_mappings(base_path)
        self._load_gss_specifications()

    def _load_unit_mappings(self, base_path: Path) -> None:
        """Load unit symbol mappings from ``units.yaml`` under ``base_path``."""
        units_yaml = base_path / "units.yaml"
        if not units_yaml.exists():
            return

        try:
            for unit_info in load_yaml_uuids(units_yaml):
                unit_id = unit_info.get("id", "")
                unit_name = unit_info.get("name", "")
                if not unit_id or not unit_name:
                    continue

                unit_symbol = self._extract_unit_symbol_from_name(unit_name)
                if unit_symbol:
                    unit_key = unit_id.replace(self._UNIT_ID_PREFIX, "").lower()
                    self._unit_mappings[unit_key] = unit_symbol
        except (msgspec.DecodeError, OSError, KeyError) as exc:
            # Best effort: units are optional metadata, so report and move on.
            self._logger.warning("Failed to load unit mappings from %s: %s", units_yaml, exc)

    def _extract_unit_symbol_from_name(self, unit_name: str) -> str:
        """Extract unit symbol from unit name.

        Args:
            unit_name: The unit name from units.yaml (e.g., "pressure (pascal)")

        Returns:
            Unit symbol string (e.g., "Pa"), or empty string if no symbol can be extracted

        """
        lowered = unit_name.lower()

        # Whole-name matches take priority.
        if lowered in self._NAMED_UNIT_SYMBOLS:
            return self._NAMED_UNIT_SYMBOLS[lowered]

        # Prefer the text inside the first "(...)" pair, mapped to a symbol
        # when known and returned verbatim otherwise.
        if "(" in unit_name and ")" in unit_name:
            start = unit_name.find("(") + 1
            end = unit_name.find(")", start)
            if 0 < start < end:
                candidate = unit_name[start:end].strip()
                return self._PARENTHESISED_UNIT_SYMBOLS.get(candidate.lower(), candidate)

        # No usable parentheses: fall back to the quantity the name starts with.
        for quantity, symbol in self._QUANTITY_UNIT_SYMBOLS.items():
            if lowered.startswith(quantity):
                return symbol

        # Temperature names are matched anywhere in the string.
        if "celsius temperature" in lowered:
            return "°C"
        if "fahrenheit temperature" in lowered:
            return "°F"

        # Return empty string if no symbol can be extracted (API compatibility)
        return ""

    def _load_gss_specifications(self) -> None:
        """Load detailed specifications from GSS YAML files."""
        gss_path = self._find_gss_path()
        if not gss_path:
            return

        for yaml_file in gss_path.glob("org.bluetooth.characteristic.*.yaml"):
            self._process_gss_file(yaml_file)

    def _find_gss_path(self) -> Path | None:
        """Return the first existing GSS directory candidate, or ``None``.

        Two locations relative to this file are tried, in order: four levels
        up and two levels up, each followed by ``bluetooth_sig/gss``.
        """
        here = Path(__file__)
        candidates = (
            here.parent.parent.parent.parent / "bluetooth_sig" / "gss",
            here.parent.parent / "bluetooth_sig" / "gss",
        )
        for candidate in candidates:
            if candidate.exists():
                return candidate
        return None

    def _process_gss_file(self, yaml_file: Path) -> None:
        """Parse one GSS YAML file and merge its data into the registry.

        The characteristic mapping is stored under both its identifier and its
        name, and any unit/value type found is copied onto the matching
        characteristic entry. Parse and I/O errors are logged, not raised.
        """
        try:
            data = msgspec.yaml.decode(yaml_file.read_text(encoding="utf-8"))

            if not isinstance(data, dict) or "characteristic" not in data:
                return

            char_data = data["characteristic"]
            if not isinstance(char_data, dict):
                return

            char_name = char_data.get("name")
            char_id = char_data.get("identifier")
            if not char_name or not char_id:
                return

            # Store by both ID and name for lookup flexibility.
            self._gss_specs[char_id] = char_data
            self._gss_specs[char_name] = char_data

            unit, value_type = self._extract_info_from_gss(char_data)
            if unit or value_type:
                self._update_characteristic_with_gss_info(char_name, char_id, unit, value_type)

        except (msgspec.DecodeError, OSError, KeyError) as e:
            self._logger.warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)

    def _update_characteristic_with_gss_info(
        self, char_name: str, char_id: str, unit: str | None, value_type: str | None
    ) -> None:
        """Copy GSS unit/value type onto an already-registered characteristic.

        The entry is located through the alias index using the name first and
        the identifier second; nothing happens if neither resolves.
        """
        with self._lock:
            canonical_uuid = None
            for search_key in (char_name, char_id):
                canonical_uuid = self._characteristic_aliases.get(search_key.lower())
                if canonical_uuid:
                    break

            if not canonical_uuid or canonical_uuid not in self._characteristics:
                return

            # UuidInfo is frozen, so build a replacement record.
            existing_info = self._characteristics[canonical_uuid]
            updated_info = UuidInfo(
                uuid=existing_info.uuid,
                name=existing_info.name,
                id=existing_info.id,
                summary=existing_info.summary,
                unit=unit or existing_info.unit,
                value_type=value_type or existing_info.value_type,
                origin=existing_info.origin,
            )

            # Aliases remain valid since UUID/name/id are unchanged.
            self._characteristics[canonical_uuid] = updated_info

    def _extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, str | None]:
        """Extract unit and value_type from GSS characteristic structure.

        Args:
            char_data: The "characteristic" mapping of a GSS file.

        Returns:
            ``(unit, value_type)``; either element is ``None`` when not found.
            The value type comes from the first field with a string "type";
            the unit comes from the first field whose description has a
            "Base Unit:" line containing an ``org.bluetooth.unit.`` identifier.

        """
        structure = char_data.get("structure", [])
        if not isinstance(structure, list) or not structure:
            return None, None

        unit = None
        value_type = None

        for raw_field in structure:
            if not isinstance(raw_field, dict):
                continue
            field = cast(dict[str, Any], raw_field)

            if not value_type and isinstance(field.get("type"), str):
                value_type = self._convert_yaml_type_to_python_type(cast(str, field["type"]))

            description = field.get("description", "")
            if isinstance(description, str) and not unit and "Base Unit:" in description:
                # Only the first "Base Unit:" line of the description is considered.
                unit_line = next(
                    (line.strip() for line in description.split("\n") if "Base Unit:" in line),
                    None,
                )
                if unit_line and self._UNIT_ID_PREFIX in unit_line:
                    unit_spec = unit_line.split(self._UNIT_ID_PREFIX)[1].strip()
                    unit = self._convert_bluetooth_unit_to_readable(unit_spec)

            if unit and value_type:
                break  # Both values found; later fields cannot change them.

        return unit, value_type

    def _convert_yaml_type_to_python_type(self, yaml_type: str) -> str:
        """Convert YAML type to Python type string."""
        return DataType.from_string(yaml_type).to_python_type()

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable format.

        Returns the mapped symbol, or the normalised (lowercase, no trailing
        dots) ``unit_spec`` itself when no mapping exists.
        """
        unit_spec = unit_spec.rstrip(".").lower()
        return self._unit_mappings.get(unit_spec, unit_spec)

    def _lookup_unit_symbol(self, unit_id: str) -> str:
        """Return the symbol for ``unit_id``, or ``""`` when it is unknown.

        The exact text is tried first. On a miss the key is normalised the
        same way the mapping keys are built (``org.bluetooth.unit.`` prefix
        removed, trailing dots stripped, lowercased) and tried again.
        """
        symbol = self._unit_mappings.get(unit_id)
        if symbol is not None:
            return symbol
        normalized = unit_id.strip().replace(self._UNIT_ID_PREFIX, "").rstrip(".").lower()
        return self._unit_mappings.get(normalized, "")

    # ------------------------------------------------------------------
    # Runtime registration
    # ------------------------------------------------------------------

    def _check_registration_conflict(
        self,
        entry: CustomUuidEntry,
        override: bool,
        kind: str,
        store: dict[str, UuidInfo],
        overrides: dict[str, UuidInfo],
    ) -> None:
        """Validate a runtime registration against the existing ``store``.

        Raises:
            ValueError: If the UUID is already registered and ``override`` is
                false.

        When a SIG entry is being overridden, the original is remembered in
        ``overrides`` (first one wins) so it can be restored later.
        """
        canonical_key = entry.uuid.normalized
        existing = store.get(canonical_key)
        if existing is None:
            return

        if existing.origin == UuidOrigin.BLUETOOTH_SIG:
            if not override:
                raise ValueError(
                    f"UUID {entry.uuid} conflicts with existing SIG {kind} entry. Use override=True to replace."
                )
            # Preserve original SIG entry for restoration
            overrides.setdefault(canonical_key, existing)
        elif existing.origin == UuidOrigin.RUNTIME and not override:
            raise ValueError(
                f"UUID {entry.uuid} already registered as runtime {kind}. Use override=True to replace."
            )

    def register_characteristic(
        self,
        entry: CustomUuidEntry,
        override: bool = False,
    ) -> None:
        """Register a custom characteristic at runtime.

        Args:
            entry: Description of the characteristic to add.
            override: Allow replacing an existing entry with the same UUID.

        Raises:
            ValueError: If the UUID already exists and ``override`` is false.

        """
        with self._lock:
            self._check_registration_conflict(
                entry, override, "characteristic", self._characteristics, self._characteristic_overrides
            )

            info = UuidInfo(
                uuid=entry.uuid,
                name=entry.name,
                id=entry.id or f"runtime.characteristic.{entry.name.lower().replace(' ', '_')}",
                summary=entry.summary or "",
                unit=entry.unit,
                value_type=entry.value_type,
                origin=UuidOrigin.RUNTIME,
            )

            self._store_characteristic(info)

    def register_service(self, entry: CustomUuidEntry, override: bool = False) -> None:
        """Register a custom service at runtime.

        Only ``uuid``, ``name``, ``id`` and ``summary`` of ``entry`` are used.

        Args:
            entry: Description of the service to add.
            override: Allow replacing an existing entry with the same UUID.

        Raises:
            ValueError: If the UUID already exists and ``override`` is false.

        """
        with self._lock:
            self._check_registration_conflict(entry, override, "service", self._services, self._service_overrides)

            info = UuidInfo(
                uuid=entry.uuid,
                name=entry.name,
                id=entry.id or f"runtime.service.{entry.name.lower().replace(' ', '_')}",
                summary=entry.summary or "",
                origin=UuidOrigin.RUNTIME,
            )

            self._store_service(info)

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    @staticmethod
    def _lookup(
        key: str | BluetoothUUID,
        store: dict[str, UuidInfo],
        aliases: dict[str, str],
    ) -> UuidInfo | None:
        """Resolve ``key`` against ``store``, falling back to ``aliases``.

        A ``BluetoothUUID`` is only looked up by its normalized form. Any
        other key is converted to a stripped string, tried as a UUID first and
        then as a case-insensitive alias.
        """
        if isinstance(key, BluetoothUUID):
            return store.get(key.normalized)

        search_key = str(key).strip()

        # Try UUID normalization first; non-UUID strings raise ValueError.
        try:
            canonical_key = BluetoothUUID(search_key).normalized
        except ValueError:
            canonical_key = None
        if canonical_key is not None and canonical_key in store:
            return store[canonical_key]

        # Check alias index (normalized to lowercase)
        alias_target = aliases.get(search_key.lower())
        if alias_target and alias_target in store:
            return store[alias_target]

        return None

    def get_service_info(self, key: str | BluetoothUUID) -> UuidInfo | None:
        """Get information about a service by UUID, name, or ID."""
        with self._lock:
            return self._lookup(key, self._services, self._service_aliases)

    def get_characteristic_info(self, identifier: str | BluetoothUUID) -> UuidInfo | None:
        """Get information about a characteristic by UUID, name, or ID."""
        with self._lock:
            return self._lookup(identifier, self._characteristics, self._characteristic_aliases)

    def get_descriptor_info(self, identifier: str | BluetoothUUID) -> UuidInfo | None:
        """Get information about a descriptor by UUID, name, or ID."""
        with self._lock:
            return self._lookup(identifier, self._descriptors, self._descriptor_aliases)

    def resolve_characteristic_spec(self, characteristic_name: str) -> CharacteristicSpec | None:  # pylint: disable=too-many-locals
        """Resolve characteristic specification with rich YAML metadata.

        This method provides detailed characteristic information including data types,
        field sizes, units, and descriptions by cross-referencing multiple YAML sources.
        Only the first field of the GSS structure is inspected.

        Args:
            characteristic_name: Name of the characteristic (e.g., "Temperature", "Battery Level")

        Returns:
            CharacteristicSpec with full metadata, or None if not found

        Example:
            spec = uuid_registry.resolve_characteristic_spec("Temperature")
            if spec:
                print(f"UUID: {spec.uuid}, Unit: {spec.unit_symbol}, Type: {spec.data_type}")

        """
        with self._lock:
            # 1. Get UUID from characteristic registry
            char_info = self.get_characteristic_info(characteristic_name)
            if not char_info:
                return None

            # 2. Get GSS specification if available (by given name, then by ID)
            gss_spec = self._gss_specs.get(characteristic_name) or self._gss_specs.get(char_info.id)

            # 3. Extract metadata from GSS specification
            data_type = None
            field_size = None
            unit_id = None
            unit_symbol = None
            base_unit = None
            resolution_text = None
            description = None

            if gss_spec:
                description = gss_spec.get("description", "")
                structure = gss_spec.get("structure", [])
                first_field = structure[0] if isinstance(structure, list) and structure else None

                if isinstance(first_field, dict):
                    data_type = first_field.get("type")
                    field_size = first_field.get("size")
                    field_description = first_field.get("description", "")
                    if not isinstance(field_description, str):
                        field_description = ""

                    # Extract base unit from description
                    if "Base Unit:" in field_description:
                        base_unit = field_description.split("Base Unit:")[1].split("\n")[0].strip()
                        unit_id = base_unit
                        # Cross-reference with units.yaml. The mapping keys
                        # carry no "org.bluetooth.unit." prefix, so the lookup
                        # normalises the identifier when the raw text misses.
                        unit_symbol = self._lookup_unit_symbol(unit_id)

                    # Extract resolution information
                    if "resolution of" in field_description.lower():
                        resolution_text = field_description

            # 4. Use existing unit from UuidInfo if GSS didn't provide a symbol
            if not unit_symbol and char_info.unit:
                unit_symbol = char_info.unit

            return CharacteristicSpec(
                uuid=char_info.uuid,
                name=char_info.name,
                field_info=FieldInfo(data_type=data_type, field_size=field_size),
                unit_info=UnitInfo(
                    unit_id=unit_id,
                    unit_symbol=unit_symbol,
                    base_unit=base_unit,
                    resolution_text=resolution_text,
                ),
                description=description,
            )

    def get_signed_from_data_type(self, data_type: str | None) -> bool:
        """Determine if data type is signed from GSS data type.

        Args:
            data_type: GSS data type string (e.g., "sint16", "float32", "uint8")

        Returns:
            True if the type starts with "sint" or is one of the listed
            floating-point types, False otherwise (including for ``None``/"").

        """
        if not data_type:
            return False
        signed_types = {"float32", "float64", "medfloat16", "medfloat32"}
        return data_type.startswith("sint") or data_type in signed_types

    @staticmethod
    def get_byte_order_hint() -> str:
        """Get byte order hint for Bluetooth SIG specifications.

        Returns:
            "little" - Bluetooth SIG uses little-endian by convention

        """
        return "little"

    # ------------------------------------------------------------------
    # Test support
    # ------------------------------------------------------------------

    @staticmethod
    def _purge_runtime_entries(store: dict[str, UuidInfo], aliases: dict[str, str]) -> list[str]:
        """Delete runtime entries and their aliases; return the removed keys."""
        removed_keys = [key for key, info in store.items() if info.origin == UuidOrigin.RUNTIME]
        for key in removed_keys:
            del store[key]

        # Set membership keeps alias cleanup linear in the number of aliases.
        removed = set(removed_keys)
        stale_aliases = [alias for alias, canonical in aliases.items() if canonical in removed]
        for alias in stale_aliases:
            del aliases[alias]

        return removed_keys

    def clear_custom_registrations(self) -> None:
        """Clear all custom registrations (for testing).

        Runtime entries and every alias pointing at them are removed, then any
        SIG entry that had been overridden is stored again (which also
        regenerates its aliases).
        """
        with self._lock:
            categories = (
                (self._services, self._service_aliases, self._service_overrides, self._store_service),
                (
                    self._characteristics,
                    self._characteristic_aliases,
                    self._characteristic_overrides,
                    self._store_characteristic,
                ),
                (self._descriptors, self._descriptor_aliases, self._descriptor_overrides, self._store_descriptor),
            )
            for store, aliases, overrides, restore in categories:
                for key in self._purge_runtime_entries(store, aliases):
                    original = overrides.pop(key, None)
                    if original is not None:
                        restore(original)

817 

818 

# Global instance shared by importers of this module. Constructing it runs
# UuidRegistry.__init__, which attempts to load the SIG YAML data immediately.
uuid_registry = UuidRegistry()