Coverage for src/bluetooth_sig/gatt/characteristics/registry.py: 92%
163 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Bluetooth SIG GATT characteristic registry.
3This module contains the characteristic registry implementation and
4class mappings. CharacteristicName enum is now centralized in
5types.gatt_enums to avoid circular imports.
6"""
8from __future__ import annotations
10import re
11import threading
12from functools import lru_cache
14from typing_extensions import TypeGuard
16from ...types.gatt_enums import CharacteristicName
17from ...types.uuid import BluetoothUUID
18from ..registry_utils import ModuleDiscovery, TypeValidator
19from ..resolver import NameVariantGenerator
20from ..uuid_registry import uuid_registry
21from .base import BaseCharacteristic
23# Export for other modules to import
24__all__ = ["CharacteristicName", "CharacteristicRegistry"]
class _CharacteristicClassValidator:  # pylint: disable=too-few-public-methods
    """Type-narrowing helper for characteristic class candidates.

    Note: Single-purpose validator class - pylint disable justified.
    """

    @staticmethod
    def is_characteristic_subclass(candidate: object) -> TypeGuard[type[BaseCharacteristic]]:
        """Report whether ``candidate`` passes the BaseCharacteristic subclass check.

        The decision is delegated entirely to ``TypeValidator.is_subclass_of``;
        the TypeGuard return annotation lets type checkers narrow ``candidate``
        to ``type[BaseCharacteristic]`` when the result is true.
        """
        is_match = TypeValidator.is_subclass_of(candidate, BaseCharacteristic)
        return is_match
class _RegistryKeyBuilder:
    """Produces the lookup keys that tie CharacteristicName members to UUID registry entries."""

    _NON_ALPHANUMERIC_RE = re.compile(r"[^a-z0-9]+")

    # Registry (YAML) names that cannot be derived from the enum display name.
    # NOTE: the official Bluetooth SIG spec writes CO2 with LaTeX markup:
    # "CO\textsubscript{2} Concentration".
    _SPECIAL_INFO_NAME_TO_ENUM = {
        "CO\\textsubscript{2} Concentration": CharacteristicName.CO2_CONCENTRATION,
    }

    @classmethod
    def slugify_characteristic_identifier(cls, value: str) -> str:
        """Turn a characteristic display name into an org.bluetooth identifier slug.

        The value is lower-cased, every run of characters outside ``a-z0-9`` is
        collapsed to a single underscore, and leading/trailing underscores are
        removed.
        """
        lowered = value.lower()
        collapsed = cls._NON_ALPHANUMERIC_RE.sub("_", lowered)
        return collapsed.strip("_")

    @classmethod
    def generate_candidate_keys(cls, enum_member: CharacteristicName) -> list[str]:
        """List the registry lookup keys to try for ``enum_member``, in priority order.

        The order is: the variants produced by ``NameVariantGenerator``, then the
        enum member name with underscores replaced by spaces, then the
        ``org.bluetooth.characteristic.<slug>`` identifier.
        """
        display_name = enum_member.value
        class_name = display_name.replace(" ", "") + "Characteristic"
        keys = list(NameVariantGenerator.generate_characteristic_variants(class_name, display_name))
        keys.append(enum_member.name.replace("_", " "))
        keys.append("org.bluetooth.characteristic." + cls.slugify_characteristic_identifier(display_name))
        return keys

    @classmethod
    def build_uuid_to_enum_map(cls) -> dict[str, CharacteristicName]:
        """Map each normalized UUID string to the CharacteristicName it belongs to."""
        uuid_to_enum: dict[str, CharacteristicName] = {}

        for enum_member in CharacteristicName:
            # Lazily probe the candidate keys; the first key the registry knows wins
            # and later candidates are never looked up.
            lookups = map(uuid_registry.get_characteristic_info, cls.generate_candidate_keys(enum_member))
            first_hit = next((found for found in lookups if found is not None), None)
            if first_hit is not None:
                uuid_to_enum[first_hit.uuid.normalized] = enum_member

        # Special-case names only fill gaps; they never replace an entry found above.
        for info_name, enum_member in cls._SPECIAL_INFO_NAME_TO_ENUM.items():
            special_info = uuid_registry.get_characteristic_info(info_name)
            if special_info is not None:
                uuid_to_enum.setdefault(special_info.uuid.normalized, enum_member)

        return uuid_to_enum
class _CharacteristicClassDiscovery:
    """Locates characteristic modules in this package and the classes they define."""

    # Module names passed to ModuleDiscovery as exclusions.
    _MODULE_EXCLUSIONS = {"__main__", "__init__", "base", "registry", "templates"}

    @classmethod
    def iter_module_names(cls) -> list[str]:
        """Return the characteristic module names found by ``ModuleDiscovery``.

        The package searched is ``__package__``, falling back to
        ``"bluetooth_sig.gatt.characteristics"`` when ``__package__`` is empty.
        The names are reported by the existing documentation as sorted and
        discovered via pkgutil.walk_packages; that behaviour lives in
        ``ModuleDiscovery.iter_module_names``.

        References:
            Python standard library documentation, pkgutil.walk_packages,
            https://docs.python.org/3/library/pkgutil.html#pkgutil.walk_packages

        """
        if __package__:
            package_name = __package__
        else:
            package_name = "bluetooth_sig.gatt.characteristics"
        return ModuleDiscovery.iter_module_names(package_name, cls._MODULE_EXCLUSIONS)

    @classmethod
    def discover_classes(cls) -> list[type[BaseCharacteristic]]:
        """Collect the characteristic classes defined in the discovered modules.

        Each candidate is checked against ``BaseCharacteristic`` using
        ``_CharacteristicClassValidator.is_characteristic_subclass``.
        """
        return ModuleDiscovery.discover_classes(
            cls.iter_module_names(),
            BaseCharacteristic,
            _CharacteristicClassValidator.is_characteristic_subclass,
        )
class _CharacteristicMapBuilder:
    """Creates the enum-to-class map and memoizes the result."""

    @staticmethod
    def build_map() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Assemble the CharacteristicName -> class mapping from runtime discovery.

        Classes without a class UUID, or whose UUID has no matching enum member,
        are skipped.

        Raises:
            RuntimeError: If two different classes resolve to the same enum member.

        """
        mapping: dict[CharacteristicName, type[BaseCharacteristic]] = {}
        uuid_to_enum = _RegistryKeyBuilder.build_uuid_to_enum_map()

        for char_cls in _CharacteristicClassDiscovery.discover_classes():
            class_uuid = char_cls.get_class_uuid()
            enum_member = None if class_uuid is None else uuid_to_enum.get(class_uuid.normalized)
            if enum_member is None:
                continue

            # setdefault stores char_cls on first sight and otherwise hands back
            # the class that already owns this enum member.
            previous = mapping.setdefault(enum_member, char_cls)
            if previous is not char_cls:
                raise RuntimeError(
                    f"Multiple characteristic classes resolved for {enum_member.name}:"
                    f" {previous.__name__} and {char_cls.__name__}"
                )

        return mapping

    @staticmethod
    @lru_cache(maxsize=1)
    def get_cached_map() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Return the memoized map, building it on the first call."""
        return _CharacteristicMapBuilder.build_map()

    @staticmethod
    def clear_cache() -> None:
        """Drop the memoized map so the next ``get_cached_map`` call rebuilds it."""
        _CharacteristicMapBuilder.get_cached_map.cache_clear()
# Public API - enum-keyed map.
# Evaluated when this module is imported; the object bound here is the same
# dict that _CharacteristicMapBuilder.get_cached_map() returns at that moment.
CHARACTERISTIC_CLASS_MAP: dict[CharacteristicName, type[BaseCharacteristic]] = (
    _CharacteristicMapBuilder.get_cached_map()
)
class CharacteristicRegistry:
    """Encapsulates all GATT characteristic registry operations.

    Built-in (SIG) classes come from the cached map produced by
    ``_CharacteristicMapBuilder``. Custom classes registered at runtime are
    kept in a class-level dict guarded by ``_lock`` and are consulted first by
    the UUID-based lookups.
    """

    _lock = threading.RLock()
    _custom_characteristic_classes: dict[BluetoothUUID, type[BaseCharacteristic]] = {}

    @staticmethod
    def _find_sig_class_for_uuid(target: BluetoothUUID) -> type[BaseCharacteristic] | None:
        """Return the built-in characteristic class whose class UUID equals ``target``.

        Single shared implementation of the scan over the cached SIG map, used
        by registration and by both UUID lookups.

        Args:
            target: The UUID to look for.

        Returns:
            The matching class, or None when no built-in class has that UUID.

        """
        for candidate in _CharacteristicMapBuilder.get_cached_map().values():
            resolved = candidate.get_class_uuid()
            if resolved and resolved == target:
                return candidate
        return None

    @classmethod
    def register_characteristic_class(cls, uuid: str | BluetoothUUID, char_cls: object, override: bool = False) -> None:
        """Register a custom characteristic class at runtime.

        Args:
            uuid: The characteristic UUID (string or BluetoothUUID)
            char_cls: The characteristic class to register
            override: Whether to override existing registrations

        Raises:
            TypeError: If char_cls does not inherit from BaseCharacteristic
            ValueError: If UUID conflicts with existing registration and override=False,
                or if a SIG characteristic would be replaced by a class that does
                not opt in via ``get_allows_sig_override()``

        """
        # Runtime safety check retained in case of dynamic caller misuse despite type hints.
        if not _CharacteristicClassValidator.is_characteristic_subclass(char_cls):
            raise TypeError(f"Class {char_cls!r} must inherit from BaseCharacteristic")

        characteristic_cls: type[BaseCharacteristic] = char_cls

        # Always normalize UUID to BluetoothUUID
        bt_uuid = uuid if isinstance(uuid, BluetoothUUID) else BluetoothUUID(uuid)

        # Determine if this UUID is already represented by a SIG (built-in) characteristic
        sig_cls = cls._find_sig_class_for_uuid(bt_uuid)

        with cls._lock:
            # Prevent duplicate custom registration unless override explicitly requested
            if not override and bt_uuid in cls._custom_characteristic_classes:
                raise ValueError(f"UUID {bt_uuid} already registered. Use override=True to replace.")

            # If collides with a SIG characteristic, enforce explicit override + permission flag
            if sig_cls is not None:
                if not override:
                    raise ValueError(
                        f"UUID {bt_uuid} conflicts with existing SIG characteristic {sig_cls.__name__}. "
                        "Use override=True to replace."
                    )
                # Require an explicit opt-in marker on the custom class
                if not characteristic_cls.get_allows_sig_override():
                    raise ValueError(
                        "Override of SIG characteristic "
                        f"{sig_cls.__name__} requires _allows_sig_override=True on {characteristic_cls.__name__}."
                    )

            cls._custom_characteristic_classes[bt_uuid] = characteristic_cls

    @classmethod
    def unregister_characteristic_class(cls, uuid: str | BluetoothUUID) -> None:
        """Unregister a custom characteristic class.

        Unknown UUIDs are ignored.

        Args:
            uuid: The characteristic UUID to unregister (string or BluetoothUUID)

        """
        bt_uuid = uuid if isinstance(uuid, BluetoothUUID) else BluetoothUUID(uuid)
        with cls._lock:
            cls._custom_characteristic_classes.pop(bt_uuid, None)

    @staticmethod
    def get_characteristic_class(
        name: CharacteristicName,
    ) -> type[BaseCharacteristic] | None:
        """Get the characteristic class for a given CharacteristicName enum.

        This API is enum-only. Callers must pass a `CharacteristicName`.

        Returns:
            The class mapped to ``name``, or None when the map has no entry for it.

        """
        return _CharacteristicMapBuilder.get_cached_map().get(name)

    @staticmethod
    def list_all_characteristic_names() -> list[str]:
        """List all supported characteristic names as strings.

        Returns:
            List of all characteristic names.

        """
        return [member.value for member in CharacteristicName]

    @staticmethod
    def list_all_characteristic_enums() -> list[CharacteristicName]:
        """List all supported characteristic names as enum values.

        Returns:
            List of all characteristic enum values.

        """
        return list(CharacteristicName)

    @classmethod
    def create_characteristic(
        cls,
        uuid: str | BluetoothUUID,
    ) -> BaseCharacteristic | None:
        """Create a characteristic instance from a UUID.

        Resolution is delegated to ``get_characteristic_class_by_uuid`` so both
        entry points share one lookup path (custom registrations first, then
        built-in classes). The class is instantiated after the registry lock
        has been released.

        Args:
            uuid: The characteristic UUID (string or BluetoothUUID).

        Returns:
            Characteristic instance if found, None otherwise (including when
            the UUID string is rejected with ValueError).

        """
        char_cls = cls.get_characteristic_class_by_uuid(uuid)
        return char_cls() if char_cls is not None else None

    @classmethod
    def get_characteristic_class_by_uuid(cls, uuid: str | BluetoothUUID) -> type[BaseCharacteristic] | None:
        """Get the characteristic class for a given UUID.

        Args:
            uuid: The characteristic UUID (with or without dashes).

        Returns:
            The characteristic class if found, None otherwise.

        """
        # Always normalize UUID to BluetoothUUID; an unparsable value means "not found".
        try:
            bt_uuid = uuid if isinstance(uuid, BluetoothUUID) else BluetoothUUID(uuid)
        except ValueError:
            return None

        # Check custom registry first
        with cls._lock:
            custom_cls = cls._custom_characteristic_classes.get(bt_uuid)
        if custom_cls is not None:
            return custom_cls

        # Look up by UUID at class level (no instantiation needed)
        return cls._find_sig_class_for_uuid(bt_uuid)

    @staticmethod
    def get_all_characteristics() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Get all registered characteristic classes.

        Returns:
            A new dictionary mapping characteristic names to classes; mutating
            it does not affect the cached map.

        """
        return dict(_CharacteristicMapBuilder.get_cached_map())

    @classmethod
    def clear_custom_registrations(cls) -> None:
        """Clear all custom characteristic registrations (for testing)."""
        with cls._lock:
            cls._custom_characteristic_classes.clear()

    @staticmethod
    def clear_cache() -> None:
        """Clear the characteristic class map cache (for testing).

        This forces the registry to be rebuilt on next access.
        Use sparingly - primarily for testing scenarios where
        characteristic classes are modified at runtime.
        """
        _CharacteristicMapBuilder.clear_cache()