Coverage for src / bluetooth_sig / gatt / uuid_registry.py: 88%
312 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""UUID registry loading from Bluetooth SIG YAML files."""
3from __future__ import annotations
5import contextlib
6import logging
7import threading
9from bluetooth_sig.registry.gss import GssRegistry
10from bluetooth_sig.registry.uuids.units import UnitsRegistry
11from bluetooth_sig.types import CharacteristicInfo, ServiceInfo
12from bluetooth_sig.types.base_types import SIGInfo
13from bluetooth_sig.types.registry.descriptor_types import DescriptorInfo
14from bluetooth_sig.types.registry.gss_characteristic import GssCharacteristicSpec
15from bluetooth_sig.types.uuid import BluetoothUUID
17from ..registry.utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string
18from ..types.registry import CharacteristicSpec, FieldInfo, UnitMetadata
# Names exported by ``from ... import *``: the registry class and the shared
# module-level instance.
__all__ = [
    "UuidRegistry",
    "uuid_registry",
]

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class UuidRegistry:  # pylint: disable=too-many-instance-attributes
    """Registry for Bluetooth SIG UUIDs with canonical storage + alias indices.

    Every entry is stored exactly once, keyed by its normalized UUID. Names,
    identifiers and derived spellings are kept in separate lowercase alias
    indices that map back to that canonical key.

    This registry stores a number of internal caches and mappings which
    legitimately exceed the default pylint instance attribute limit. The
    complexity is intentional and centralised; an inline disable is used to
    avoid noisy global configuration changes.
    """

    # Identifier prefixes used to derive human-readable aliases.
    _SERVICE_ID_PREFIX = "org.bluetooth.service."
    _CHARACTERISTIC_ID_PREFIX = "org.bluetooth.characteristic."

    # GSS data types that are signed but do not start with "sint".
    _SIGNED_NON_INT_TYPES = frozenset({"float32", "float64", "medfloat16", "medfloat32"})

    def __init__(self) -> None:
        """Initialize the UUID registry and attempt to load the SIG YAML data.

        Loading is best-effort: a failure never propagates to the caller, but
        it is logged so that an empty or partial registry can be diagnosed.
        """
        self._lock = threading.RLock()

        # Canonical storage: normalized_uuid -> domain types (single source of truth)
        self._services: dict[str, ServiceInfo] = {}
        self._characteristics: dict[str, CharacteristicInfo] = {}
        self._descriptors: dict[str, DescriptorInfo] = {}

        # Lightweight alias indices: alias -> normalized_uuid
        self._service_aliases: dict[str, str] = {}
        self._characteristic_aliases: dict[str, str] = {}
        self._descriptor_aliases: dict[str, str] = {}

        # Preserve SIG entries overridden at runtime so we can restore them
        self._service_overrides: dict[str, ServiceInfo] = {}
        self._characteristic_overrides: dict[str, CharacteristicInfo] = {}
        self._descriptor_overrides: dict[str, DescriptorInfo] = {}

        # Track runtime-registered UUIDs (replaces origin field checks)
        self._runtime_uuids: set[str] = set()

        self._gss_registry: GssRegistry | None = None

        # If loading fails, continue with whatever was loaded so far, but
        # leave a trace instead of swallowing the error silently.
        try:
            self._load_uuids()
        except Exception:  # pylint: disable=broad-exception-caught
            logging.getLogger(__name__).warning(
                "Failed to load Bluetooth SIG UUID definitions; continuing with a partial or empty registry",
                exc_info=True,
            )

    def _store_service(self, info: ServiceInfo) -> None:
        """Store service info with canonical storage + aliases."""
        canonical_key = info.uuid.normalized

        # Store once in canonical location
        self._services[canonical_key] = info

        # Alias keys are lowercased so lookups are case-insensitive
        for alias in self._generate_aliases(info):
            self._service_aliases[alias.lower()] = canonical_key

    def _store_characteristic(self, info: CharacteristicInfo) -> None:
        """Store characteristic info with canonical storage + aliases."""
        canonical_key = info.uuid.normalized

        # Store once in canonical location
        self._characteristics[canonical_key] = info

        # Alias keys are lowercased so lookups are case-insensitive
        for alias in self._generate_aliases(info):
            self._characteristic_aliases[alias.lower()] = canonical_key

    def _store_descriptor(self, info: DescriptorInfo) -> None:
        """Store descriptor info with canonical storage + aliases."""
        canonical_key = info.uuid.normalized

        # Store once in canonical location
        self._descriptors[canonical_key] = info

        # Alias keys are lowercased so lookups are case-insensitive
        for alias in self._generate_aliases(info):
            self._descriptor_aliases[alias.lower()] = canonical_key

    def _generate_aliases(self, info: SIGInfo) -> set[str]:
        """Generate name/ID-based alias keys for domain info types.

        UUID variations are handled by BluetoothUUID and are not produced here.

        Args:
            info: Entry providing ``name``, ``id`` and ``uuid.normalized``

        Returns:
            Non-blank aliases, excluding the canonical (normalized UUID) key
        """
        aliases: set[str] = {info.name.lower()}

        if info.id:
            aliases.add(info.id)

            # Match on the identifier prefix, not on a substring: an id such
            # as "org.bluetooth.characteristic.service_changed" contains the
            # word "service" but is not a service identifier.
            if info.id.startswith(self._SERVICE_ID_PREFIX):
                service_name = info.id[len(self._SERVICE_ID_PREFIX) :]
                if service_name.endswith("_service"):
                    service_name = service_name[: -len("_service")]
                service_name = service_name.replace("_", " ").title()
                aliases.add(service_name)
                # Also add "Service" suffix if not present
                if not service_name.endswith(" Service"):
                    aliases.add(service_name + " Service")
            elif info.id.startswith(self._CHARACTERISTIC_ID_PREFIX):
                char_name = info.id[len(self._CHARACTERISTIC_ID_PREFIX) :]
                aliases.add(char_name.replace("_", " ").title())

        # Add space-separated words from name
        name_words = info.name.replace("_", " ").replace("-", " ")
        if " " in name_words:
            aliases.add(name_words.title())
            aliases.add(name_words.lower())

        # Remove empty/blank strings and the canonical key itself
        canonical_key = info.uuid.normalized
        return {alias for alias in aliases if alias and alias.strip() and alias != canonical_key}

    def _load_uuids(self) -> None:  # pylint: disable=too-many-branches
        """Load all UUIDs from YAML files.

        Returns without loading anything when the Bluetooth SIG data path
        cannot be located. YAML files that do not exist are skipped.
        """
        base_path = find_bluetooth_sig_path()
        if not base_path:
            return

        # Load service UUIDs
        service_yaml = base_path / "service_uuids.yaml"
        if service_yaml.exists():
            for entry in load_yaml_uuids(service_yaml):
                self._store_service(
                    ServiceInfo(
                        uuid=BluetoothUUID(normalize_uuid_string(entry["uuid"])),
                        name=entry["name"],
                        id=entry.get("id", ""),
                    )
                )

        # Load characteristic UUIDs
        characteristic_yaml = base_path / "characteristic_uuids.yaml"
        if characteristic_yaml.exists():
            for entry in load_yaml_uuids(characteristic_yaml):
                self._store_characteristic(
                    CharacteristicInfo(
                        uuid=BluetoothUUID(normalize_uuid_string(entry["uuid"])),
                        name=entry["name"],
                        id=entry.get("id", ""),
                        unit="",  # Will be set from unit mappings if available
                    )
                )

        # Load descriptor UUIDs
        descriptor_yaml = base_path / "descriptors.yaml"
        if descriptor_yaml.exists():
            for entry in load_yaml_uuids(descriptor_yaml):
                self._store_descriptor(
                    DescriptorInfo(
                        uuid=BluetoothUUID(normalize_uuid_string(entry["uuid"])),
                        name=entry["name"],
                        id=entry.get("id", ""),
                    )
                )

        # Load GSS specifications
        self._gss_registry = GssRegistry.get_instance()
        self._load_gss_characteristic_info()

        # TODO: Remove when bluetooth_sig submodule includes these UUIDs.
        # Analog (0x2A58) and Digital (0x2A56) are present in service specs
        # (Automation IO) but absent from the assigned-number YAML files.
        _yaml_absent: list[tuple[BluetoothUUID, str, str]] = [
            (BluetoothUUID(0x2A56), "Digital", "org.bluetooth.characteristic.digital"),
            (BluetoothUUID(0x2A58), "Analog", "org.bluetooth.characteristic.analog"),
        ]
        for _bt_uuid, _name, _identifier in _yaml_absent:
            if _bt_uuid.normalized not in self._characteristics:
                self._store_characteristic(
                    CharacteristicInfo(
                        uuid=_bt_uuid,
                        name=_name,
                        id=_identifier,
                        unit="",
                        python_type=None,
                    )
                )

    def _load_gss_characteristic_info(self) -> None:
        """Load GSS specs and update characteristics with extracted info."""
        if self._gss_registry is None:
            return

        all_specs = self._gss_registry.get_all_specs()

        # Each identifier is processed once, even if several keys map to it
        processed_ids: set[str] = set()
        for spec in all_specs.values():
            if spec.identifier in processed_ids:
                continue
            processed_ids.add(spec.identifier)

            # Extract unit and value_type from structure
            char_data = {
                "structure": [
                    {
                        "field": f.field,
                        "type": f.type,
                        "size": f.size,
                        "description": f.description,
                    }
                    for f in spec.structure
                ]
            }
            unit, value_type = self._gss_registry.extract_info_from_gss(char_data)

            # Multi-field structs have per-field units; no single representative
            # unit, and the first field's scalar wire type (e.g. int) is not
            # representative of the struct-valued characteristic.
            if len(spec.structure) > 1:
                unit = None
                value_type = None

            if unit or value_type:
                self._update_characteristic_with_gss_info(spec.name, spec.identifier, unit, value_type)

    def _update_characteristic_with_gss_info(
        self, char_name: str, char_id: str, unit: str | None, python_type: type | None
    ) -> None:
        """Update existing characteristic with GSS info.

        Does nothing when neither the name nor the id resolves to a stored
        characteristic.

        Args:
            char_name: Characteristic name used as the first alias lookup
            char_id: Characteristic identifier used as the second alias lookup
            unit: Unit to set; a falsy value keeps the existing unit
            python_type: Type to set; ``None`` keeps the existing type
        """
        with self._lock:
            # Find the canonical entry by checking aliases (normalized to lowercase)
            canonical_uuid = None
            for search_key in (char_name, char_id):
                canonical_uuid = self._characteristic_aliases.get(search_key.lower())
                if canonical_uuid:
                    break

            if not canonical_uuid or canonical_uuid not in self._characteristics:
                return

            existing_info = self._characteristics[canonical_uuid]

            # Use provided python_type or keep existing
            new_python_type = python_type if python_type is not None else existing_info.python_type

            # A new instance is built instead of mutating the stored one.
            # Aliases remain valid since UUID/name/id are unchanged.
            self._characteristics[canonical_uuid] = CharacteristicInfo(
                uuid=existing_info.uuid,
                name=existing_info.name,
                id=existing_info.id,
                unit=unit or existing_info.unit,
                python_type=new_python_type,
            )

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable symbol.

        Args:
            unit_spec: Unit specification (e.g., "thermodynamic_temperature.degree_celsius")

        Returns:
            Human-readable symbol (e.g., "°C"), or the normalized (lowercased,
            trailing dots stripped) unit_spec if no mapping is found
        """
        unit_spec = unit_spec.rstrip(".").lower()
        unit_info = UnitsRegistry.get_instance().get_info(f"org.bluetooth.unit.{unit_spec}")
        if unit_info and unit_info.symbol:
            return unit_info.symbol

        return unit_spec

    def _overrides_sig_entry(self, uuid: BluetoothUUID, kind: str, override: bool) -> bool:
        """Validate a runtime registration over an already stored UUID.

        Args:
            uuid: UUID being registered (already present in the target store)
            kind: Entry kind used in error messages ("service" or "characteristic")
            override: Whether the caller allowed replacing existing entries

        Returns:
            True if the stored entry is a SIG entry (it should be preserved for
            later restoration), False if it is a runtime entry

        Raises:
            ValueError: If an entry exists and ``override`` is False
        """
        if uuid.normalized not in self._runtime_uuids:
            if not override:
                raise ValueError(f"UUID {uuid} conflicts with existing SIG {kind} entry. Use override=True to replace.")
            return True

        if not override:
            raise ValueError(f"UUID {uuid} already registered as runtime {kind}. Use override=True to replace.")
        return False

    def register_characteristic(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        uuid: BluetoothUUID,
        name: str,
        identifier: str | None = None,
        unit: str | None = None,
        python_type: type | str | None = None,
        override: bool = False,
    ) -> None:
        """Register a custom characteristic at runtime.

        Args:
            uuid: The Bluetooth UUID for the characteristic
            name: Human-readable name
            identifier: Optional identifier (auto-generated if not provided)
            unit: Optional unit of measurement
            python_type: Optional Python type for the value
            override: If True, allow overriding existing entries

        Raises:
            ValueError: If the UUID is already registered and ``override`` is False
        """
        with self._lock:
            canonical_key = uuid.normalized

            if canonical_key in self._characteristics and self._overrides_sig_entry(
                uuid, "characteristic", override
            ):
                # Preserve original SIG entry for restoration (first override wins)
                self._characteristic_overrides.setdefault(canonical_key, self._characteristics[canonical_key])

            info = CharacteristicInfo(
                uuid=uuid,
                name=name,
                id=identifier or f"runtime.characteristic.{name.lower().replace(' ', '_')}",
                unit=unit or "",
                python_type=python_type,
            )

            # Track as runtime-registered UUID
            self._runtime_uuids.add(canonical_key)

            self._store_characteristic(info)

    def register_service(
        self,
        uuid: BluetoothUUID,
        name: str,
        identifier: str | None = None,
        override: bool = False,
    ) -> None:
        """Register a custom service at runtime.

        Args:
            uuid: The Bluetooth UUID for the service
            name: Human-readable name
            identifier: Optional identifier (auto-generated if not provided)
            override: If True, allow overriding existing entries

        Raises:
            ValueError: If the UUID is already registered and ``override`` is False
        """
        with self._lock:
            canonical_key = uuid.normalized

            if canonical_key in self._services and self._overrides_sig_entry(uuid, "service", override):
                # Preserve original SIG entry for restoration (first override wins)
                self._service_overrides.setdefault(canonical_key, self._services[canonical_key])

            info = ServiceInfo(
                uuid=uuid,
                name=name,
                id=identifier or f"runtime.service.{name.lower().replace(' ', '_')}",
            )

            # Track as runtime-registered UUID
            self._runtime_uuids.add(canonical_key)

            self._store_service(info)

    @staticmethod
    def _candidate_keys(key: str | BluetoothUUID, alias_index: dict[str, str]) -> list[str]:
        """Return canonical keys to try for a lookup key, in priority order.

        A BluetoothUUID yields only its normalized form. Any other key is
        stringified and stripped, then tried first as a UUID and second as a
        lowercase alias.
        """
        if isinstance(key, BluetoothUUID):
            return [key.normalized]

        search_key = str(key).strip()
        candidates: list[str] = []

        # Not every string is a UUID; a failed parse just means "use aliases"
        with contextlib.suppress(ValueError):
            candidates.append(BluetoothUUID(search_key).normalized)

        alias_target = alias_index.get(search_key.lower())
        if alias_target:
            candidates.append(alias_target)
        return candidates

    def get_service_info(self, key: str | BluetoothUUID) -> ServiceInfo | None:
        """Get information about a service by UUID, name, or ID.

        Returns:
            The matching ServiceInfo, or None if nothing matches
        """
        with self._lock:
            for candidate in self._candidate_keys(key, self._service_aliases):
                if candidate in self._services:
                    return self._services[candidate]
            return None

    def get_characteristic_info(self, identifier: str | BluetoothUUID) -> CharacteristicInfo | None:
        """Get information about a characteristic by UUID, name, or ID.

        Returns:
            The matching CharacteristicInfo, or None if nothing matches
        """
        with self._lock:
            for candidate in self._candidate_keys(identifier, self._characteristic_aliases):
                if candidate in self._characteristics:
                    return self._characteristics[candidate]
            return None

    def get_descriptor_info(self, identifier: str | BluetoothUUID) -> DescriptorInfo | None:
        """Get information about a descriptor by UUID, name, or ID.

        Returns:
            The matching DescriptorInfo, or None if nothing matches
        """
        with self._lock:
            for candidate in self._candidate_keys(identifier, self._descriptor_aliases):
                if candidate in self._descriptors:
                    return self._descriptors[candidate]
            return None

    def get_gss_spec(self, identifier: str | BluetoothUUID) -> GssCharacteristicSpec | None:
        """Get the full GSS characteristic specification with all field metadata.

        This provides access to the complete YAML structure including all fields,
        their units, resolutions, ranges, and presence conditions.

        Args:
            identifier: Characteristic name, ID, or UUID

        Returns:
            GssCharacteristicSpec with full field structure, or None if not found
            (including when no GSS registry has been loaded)

        Example::
            gss = uuid_registry.get_gss_spec("Location and Speed")
            if gss:
                for field in gss.structure:
                    print(f"{field.python_name}: unit={field.unit_id}, resolution={field.resolution}")

        """
        gss_registry = self._gss_registry
        if gss_registry is None:
            return None

        with self._lock:
            fallback_keys: list[str] = []
            if isinstance(identifier, str):
                # Try direct lookup by name or ID
                spec = gss_registry.get_spec(identifier)
                if spec:
                    return spec

                # Otherwise resolve the characteristic and retry with its ID
                char_info = self.get_characteristic_info(identifier)
                if char_info:
                    fallback_keys = [char_info.id]
            elif isinstance(identifier, BluetoothUUID):
                # Look up by UUID, then try the resolved name followed by the ID
                char_info = self.get_characteristic_info(identifier)
                if char_info:
                    fallback_keys = [char_info.name, char_info.id]

            for fallback_key in fallback_keys:
                spec = gss_registry.get_spec(fallback_key)
                if spec:
                    return spec

        return None

    def resolve_characteristic_spec(self, characteristic_name: str) -> CharacteristicSpec | None:  # pylint: disable=too-many-locals
        """Resolve characteristic specification with rich YAML metadata.

        This method provides detailed characteristic information including data types,
        field sizes, units, and descriptions by cross-referencing multiple YAML sources.

        Args:
            characteristic_name: Name of the characteristic (e.g., "Temperature", "Battery Level")

        Returns:
            CharacteristicSpec with full metadata, or None if not found

        Example::
            spec = uuid_registry.resolve_characteristic_spec("Temperature")
            if spec:
                print(f"UUID: {spec.uuid}, Unit: {spec.unit_symbol}, Type: {spec.data_type}")

        """
        with self._lock:
            # 1. Get UUID from characteristic registry
            char_info = self.get_characteristic_info(characteristic_name)
            if not char_info:
                return None

            # 2. Get typed GSS specification if available
            gss_spec = self.get_gss_spec(characteristic_name)

            # 3. Extract metadata from GSS specification
            data_type = None
            field_size = None
            unit_id = None
            unit_symbol = None
            unit_readable_name = None
            base_unit = None
            resolution_text = None
            description = None

            if gss_spec:
                description = gss_spec.description

                # Only set data_type for single-field characteristics
                # Multi-field characteristics have complex structures and no single data type
                primary = gss_spec.primary_field if len(gss_spec.structure) == 1 else None
                if primary:
                    data_type = primary.type
                    field_size = str(primary.fixed_size) if primary.fixed_size else primary.size

                    # Use FieldSpec's unit_id property (auto-parsed from description)
                    if primary.unit_id:
                        unit_id = f"org.bluetooth.unit.{primary.unit_id}"
                        unit_symbol = self._convert_bluetooth_unit_to_readable(primary.unit_id)
                        # Preserve the human-readable long-form name
                        unit_info_obj = UnitsRegistry.get_instance().get_info(unit_id)
                        if unit_info_obj:
                            unit_readable_name = unit_info_obj.readable_name
                        base_unit = unit_id

                    # Get resolution from FieldSpec
                    if primary.resolution is not None:
                        resolution_text = f"Resolution: {primary.resolution}"

            # 4. Use existing unit from CharacteristicInfo if GSS didn't provide one.
            #    Multi-field structs have per-field units; don't promote one to top-level.
            is_multi_field = gss_spec is not None and len(gss_spec.structure) > 1
            if not unit_symbol and char_info.unit and not is_multi_field:
                unit_symbol = char_info.unit

            return CharacteristicSpec(
                uuid=char_info.uuid,
                name=char_info.name,
                field_info=FieldInfo(data_type=data_type, field_size=field_size),
                unit_info=UnitMetadata(
                    unit_id=unit_id,
                    unit_symbol=unit_symbol,
                    unit_name=unit_readable_name,
                    base_unit=base_unit,
                    resolution_text=resolution_text,
                ),
                description=description,
                structure=gss_spec.structure if gss_spec else [],
            )

    def get_signed_from_data_type(self, data_type: str | None) -> bool:
        """Determine if data type is signed from GSS data type.

        Args:
            data_type: GSS data type string (e.g., "sint16", "float32", "uint8")

        Returns:
            True if the type starts with "sint" or is one of the float /
            medfloat types, False otherwise (including for None or "")

        """
        if not data_type:
            return False
        return data_type.startswith("sint") or data_type in self._SIGNED_NON_INT_TYPES

    @staticmethod
    def get_byte_order_hint() -> str:
        """Get byte order hint for Bluetooth SIG specifications.

        Returns:
            "little" - Bluetooth SIG uses little-endian by convention

        """
        return "little"

    def clear_custom_registrations(self) -> None:
        """Clear all custom registrations (for testing).

        Removes every runtime-registered entry and its aliases, then restores
        any SIG entries that had been overridden at runtime.
        """
        with self._lock:
            # Snapshot for iteration; the set itself gives O(1) membership
            # tests and is only cleared at the very end.
            runtime_keys = list(self._runtime_uuids)

            # Remove runtime entries from canonical stores
            for key in runtime_keys:
                self._services.pop(key, None)
                self._characteristics.pop(key, None)
                self._descriptors.pop(key, None)

            # Remove aliases whose canonical target is a runtime UUID
            for alias_index in (
                self._service_aliases,
                self._characteristic_aliases,
                self._descriptor_aliases,
            ):
                stale_aliases = [
                    alias for alias, canonical in alias_index.items() if canonical in self._runtime_uuids
                ]
                for alias in stale_aliases:
                    del alias_index[alias]

            # Restore any preserved SIG entries that were overridden
            for key in runtime_keys:
                original_service = self._service_overrides.pop(key, None)
                if original_service is not None:
                    self._store_service(original_service)
                original_characteristic = self._characteristic_overrides.pop(key, None)
                if original_characteristic is not None:
                    self._store_characteristic(original_characteristic)
                original_descriptor = self._descriptor_overrides.pop(key, None)
                if original_descriptor is not None:
                    self._store_descriptor(original_descriptor)

            # Clear the runtime tracking set
            self._runtime_uuids.clear()
# Global instance, created when this module is imported. Construction
# attempts to load the Bluetooth SIG YAML files and does not raise if that
# loading fails.
uuid_registry = UuidRegistry()