Coverage for src/bluetooth_sig/gatt/characteristics/registry.py: 92%

163 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Bluetooth SIG GATT characteristic registry. 

2 

3This module contains the characteristic registry implementation and 

4class mappings. CharacteristicName enum is now centralized in 

5types.gatt_enums to avoid circular imports. 

6""" 

7 

8from __future__ import annotations 

9 

10import re 

11import threading 

12from functools import lru_cache 

13 

14from typing_extensions import TypeGuard 

15 

16from ...types.gatt_enums import CharacteristicName 

17from ...types.uuid import BluetoothUUID 

18from ..registry_utils import ModuleDiscovery, TypeValidator 

19from ..resolver import NameVariantGenerator 

20from ..uuid_registry import uuid_registry 

21from .base import BaseCharacteristic 

22 

23# Export for other modules to import 

24__all__ = ["CharacteristicName", "CharacteristicRegistry"] 

25 

26 

class _CharacteristicClassValidator:  # pylint: disable=too-few-public-methods
    """Type-guard helper for characteristic classes.

    Note: intentionally a single-method class, which is why the pylint
    ``too-few-public-methods`` check is disabled here.
    """

    @staticmethod
    def is_characteristic_subclass(candidate: object) -> TypeGuard[type[BaseCharacteristic]]:
        """Report whether ``candidate`` passes the BaseCharacteristic subclass check.

        The decision itself is delegated to ``TypeValidator.is_subclass_of``.
        """
        passes_check = TypeValidator.is_subclass_of(candidate, BaseCharacteristic)
        return passes_check

37 

38 

class _RegistryKeyBuilder:
    """Derives the lookup keys used to match enum members against the UUID registry."""

    _NON_ALPHANUMERIC_RE = re.compile(r"[^a-z0-9]+")

    # Registry names that cannot be derived from the enum display name.
    # NOTE: the official Bluetooth SIG spec writes CO2 with LaTeX markup: "CO\textsubscript{2} Concentration"
    _SPECIAL_INFO_NAME_TO_ENUM = {
        "CO\\textsubscript{2} Concentration": CharacteristicName.CO2_CONCENTRATION,
    }

    @classmethod
    def slugify_characteristic_identifier(cls, value: str) -> str:
        """Turn a display name into the slug part of an org.bluetooth identifier.

        The value is lower-cased, every run of characters outside ``a-z0-9``
        becomes a single underscore, and leading/trailing underscores are removed.
        """
        lowered = value.lower()
        underscored = cls._NON_ALPHANUMERIC_RE.sub("_", lowered)
        return underscored.strip("_")

    @classmethod
    def generate_candidate_keys(cls, enum_member: CharacteristicName) -> list[str]:
        """List the registry keys to try for ``enum_member``, in lookup order.

        Order: the variants produced by ``NameVariantGenerator``, then the enum
        member name with underscores turned into spaces, then the
        ``org.bluetooth.characteristic.<slug>`` identifier.
        """
        display_name = enum_member.value
        class_name = display_name.replace(" ", "") + "Characteristic"

        keys = list(NameVariantGenerator.generate_characteristic_variants(class_name, display_name))
        slug = cls.slugify_characteristic_identifier(display_name)
        keys.append(enum_member.name.replace("_", " "))
        keys.append("org.bluetooth.characteristic." + slug)
        return keys

    @classmethod
    def build_uuid_to_enum_map(cls) -> dict[str, CharacteristicName]:
        """Map each normalized UUID string to its CharacteristicName member."""
        resolved: dict[str, CharacteristicName] = {}

        for member in CharacteristicName:
            # Lazy generator: registry lookups stop at the first candidate that resolves.
            lookups = (
                uuid_registry.get_characteristic_info(key) for key in cls.generate_candidate_keys(member)
            )
            first_hit = next((hit for hit in lookups if hit is not None), None)
            if first_hit is not None:
                resolved[first_hit.uuid.normalized] = member

        # Special-case names only fill gaps; they never replace an entry found above.
        for info_name, member in cls._SPECIAL_INFO_NAME_TO_ENUM.items():
            special_info = uuid_registry.get_characteristic_info(info_name)
            if special_info is not None:
                resolved.setdefault(special_info.uuid.normalized, member)

        return resolved

84 

85 

class _CharacteristicClassDiscovery:
    """Finds the characteristic classes shipped in this package."""

    # Module names handed to ModuleDiscovery as exclusions.
    _MODULE_EXCLUSIONS = {"__main__", "__init__", "base", "registry", "templates"}

    @classmethod
    def iter_module_names(cls) -> list[str]:
        """Return the characteristic module names reported by ``ModuleDiscovery``.

        Falls back to ``"bluetooth_sig.gatt.characteristics"`` when
        ``__package__`` is empty or unset.

        NOTE(review): the previous documentation described the result as sorted
        and discovered via pkgutil.walk_packages; that lives inside
        ``ModuleDiscovery.iter_module_names`` and should be confirmed there.

        References:
            Python standard library documentation, pkgutil.walk_packages,
            https://docs.python.org/3/library/pkgutil.html#pkgutil.walk_packages

        """
        if __package__:
            package_name = __package__
        else:
            package_name = "bluetooth_sig.gatt.characteristics"
        return ModuleDiscovery.iter_module_names(package_name, cls._MODULE_EXCLUSIONS)

    @classmethod
    def discover_classes(cls) -> list[type[BaseCharacteristic]]:
        """Collect the characteristic classes found in the discovered modules.

        Delegates to ``ModuleDiscovery.discover_classes``, passing
        ``BaseCharacteristic`` and the subclass validator for this package.
        """
        validator = _CharacteristicClassValidator.is_characteristic_subclass
        return ModuleDiscovery.discover_classes(cls.iter_module_names(), BaseCharacteristic, validator)

115 

116 

class _CharacteristicMapBuilder:
    """Creates the enum-to-class map and keeps a single cached copy of it."""

    @staticmethod
    def build_map() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Build the enum-to-class map from the classes discovered at runtime.

        Classes with no class UUID, or whose UUID matches no enum member, are
        left out.

        Raises:
            RuntimeError: If two different classes resolve to the same enum member.

        """
        class_by_enum: dict[CharacteristicName, type[BaseCharacteristic]] = {}
        enum_by_uuid = _RegistryKeyBuilder.build_uuid_to_enum_map()

        for discovered in _CharacteristicClassDiscovery.discover_classes():
            class_uuid = discovered.get_class_uuid()
            member = None if class_uuid is None else enum_by_uuid.get(class_uuid.normalized)
            if member is None:
                continue

            # setdefault stores the class on first sight and otherwise returns the earlier one.
            earlier = class_by_enum.setdefault(member, discovered)
            if earlier is not discovered:
                raise RuntimeError(
                    f"Multiple characteristic classes resolved for {member.name}:"
                    f" {earlier.__name__} and {discovered.__name__}"
                )

        return class_by_enum

    @staticmethod
    @lru_cache(maxsize=1)
    def get_cached_map() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Return the map, building it on the first call and reusing it afterwards."""
        return _CharacteristicMapBuilder.build_map()

    @staticmethod
    def clear_cache() -> None:
        """Drop the cached map so the next ``get_cached_map`` call rebuilds it."""
        cached_getter = _CharacteristicMapBuilder.get_cached_map
        cached_getter.cache_clear()

153 

154 

# Public API - enum-keyed map.
# NOTE: this name is bound once, when the module is imported, to the dict returned by
# _CharacteristicMapBuilder.get_cached_map(). Clearing that cache later makes the builder
# return a freshly built dict on its next call, but does not rebind this name.
CHARACTERISTIC_CLASS_MAP = _CharacteristicMapBuilder.get_cached_map()

157 

158 

class CharacteristicRegistry:
    """Encapsulates all GATT characteristic registry operations.

    Two sources are consulted: custom classes registered at runtime (kept in
    ``_custom_characteristic_classes`` and guarded by ``_lock``) and the
    built-in classes from ``_CharacteristicMapBuilder.get_cached_map()``.
    UUID lookups check the custom registrations first.
    """

    _lock = threading.RLock()
    _custom_characteristic_classes: dict[BluetoothUUID, type[BaseCharacteristic]] = {}

    @staticmethod
    def _normalize_uuid(uuid: str | BluetoothUUID) -> BluetoothUUID:
        """Return ``uuid`` as a BluetoothUUID, converting when it is not one already.

        Any exception raised by the ``BluetoothUUID`` constructor propagates to the caller.
        """
        return uuid if isinstance(uuid, BluetoothUUID) else BluetoothUUID(uuid)

    @staticmethod
    def _find_sig_class_for_uuid(target: BluetoothUUID) -> type[BaseCharacteristic] | None:
        """Return the built-in class whose class UUID equals ``target``, or None.

        Single shared implementation of the scan over the cached class map used
        by registration and by the UUID lookups.
        """
        for candidate in _CharacteristicMapBuilder.get_cached_map().values():
            candidate_uuid = candidate.get_class_uuid()
            if candidate_uuid and candidate_uuid == target:
                return candidate
        return None

    @classmethod
    def register_characteristic_class(cls, uuid: str | BluetoothUUID, char_cls: object, override: bool = False) -> None:
        """Register a custom characteristic class at runtime.

        Args:
            uuid: The characteristic UUID (string or BluetoothUUID)
            char_cls: The characteristic class to register
            override: Whether to override existing registrations

        Raises:
            TypeError: If char_cls does not inherit from BaseCharacteristic
            ValueError: If UUID conflicts with existing registration and override=False,
                or if it replaces a SIG characteristic and the class does not opt in
                via ``get_allows_sig_override()``

        """
        # Runtime safety check retained in case of dynamic caller misuse despite type hints.
        if not _CharacteristicClassValidator.is_characteristic_subclass(char_cls):
            raise TypeError(f"Class {char_cls!r} must inherit from BaseCharacteristic")

        characteristic_cls: type[BaseCharacteristic] = char_cls

        # Always normalize UUID to BluetoothUUID
        bt_uuid = cls._normalize_uuid(uuid)

        # Determine if this UUID is already represented by a SIG (built-in) characteristic
        sig_cls = cls._find_sig_class_for_uuid(bt_uuid)

        with cls._lock:
            # Prevent duplicate custom registration unless override explicitly requested
            if not override and bt_uuid in cls._custom_characteristic_classes:
                raise ValueError(f"UUID {bt_uuid} already registered. Use override=True to replace.")

            # If collides with a SIG characteristic, enforce explicit override + permission flag
            if sig_cls is not None:
                if not override:
                    raise ValueError(
                        f"UUID {bt_uuid} conflicts with existing SIG characteristic {sig_cls.__name__}. "
                        "Use override=True to replace."
                    )
                # Require an explicit opt-in marker on the custom class
                if not characteristic_cls.get_allows_sig_override():
                    raise ValueError(
                        "Override of SIG characteristic "
                        f"{sig_cls.__name__} requires _allows_sig_override=True on {characteristic_cls.__name__}."
                    )

            cls._custom_characteristic_classes[bt_uuid] = characteristic_cls

    @classmethod
    def unregister_characteristic_class(cls, uuid: str | BluetoothUUID) -> None:
        """Unregister a custom characteristic class.

        Removing a UUID that was never registered is a no-op.

        Args:
            uuid: The characteristic UUID to unregister (string or BluetoothUUID)

        """
        bt_uuid = cls._normalize_uuid(uuid)
        with cls._lock:
            cls._custom_characteristic_classes.pop(bt_uuid, None)

    @staticmethod
    def get_characteristic_class(
        name: CharacteristicName,
    ) -> type[BaseCharacteristic] | None:
        """Get the characteristic class for a given CharacteristicName enum.

        This API is enum-only. Callers must pass a `CharacteristicName`.
        Only the built-in class map is consulted; custom registrations are
        keyed by UUID and are not searched here.

        Returns:
            The mapped class, or None when the enum member has no class.

        """
        return _CharacteristicMapBuilder.get_cached_map().get(name)

    @staticmethod
    def list_all_characteristic_names() -> list[str]:
        """List all supported characteristic names as strings.

        Returns:
            The ``value`` of every CharacteristicName member.

        """
        return [member.value for member in CharacteristicName]

    @staticmethod
    def list_all_characteristic_enums() -> list[CharacteristicName]:
        """List all supported characteristic names as enum values.

        Returns:
            List of all characteristic enum values.

        """
        return list(CharacteristicName)

    @classmethod
    def create_characteristic(
        cls,
        uuid: str | BluetoothUUID,
    ) -> BaseCharacteristic | None:
        """Create a characteristic instance from a UUID.

        Args:
            uuid: The characteristic UUID (string or BluetoothUUID).

        Returns:
            Characteristic instance if found, None otherwise (including when
            the UUID string is rejected with ValueError).

        """
        # Resolve the class first; instantiation happens outside the registry lock
        # so a slow or re-entrant constructor cannot hold up other registry users.
        char_cls = cls.get_characteristic_class_by_uuid(uuid)
        if char_cls is None:
            return None
        return char_cls()

    @classmethod
    def get_characteristic_class_by_uuid(cls, uuid: str | BluetoothUUID) -> type[BaseCharacteristic] | None:
        """Get the characteristic class for a given UUID.

        Args:
            uuid: The characteristic UUID (with or without dashes).

        Returns:
            The characteristic class if found, None otherwise (including when
            the UUID string is rejected with ValueError).

        """
        # Always normalize UUID to BluetoothUUID
        try:
            bt_uuid = cls._normalize_uuid(uuid)
        except ValueError:
            return None

        # Check custom registry first; the lock is held only for the dict read.
        with cls._lock:
            custom_cls = cls._custom_characteristic_classes.get(bt_uuid)
        if custom_cls is not None:
            return custom_cls

        # Look up by UUID at class level (no instantiation needed)
        return cls._find_sig_class_for_uuid(bt_uuid)

    @staticmethod
    def get_all_characteristics() -> dict[CharacteristicName, type[BaseCharacteristic]]:
        """Get all registered built-in characteristic classes.

        Returns:
            A new dictionary mapping characteristic names to classes; changing
            it does not affect the cached map.

        """
        return dict(_CharacteristicMapBuilder.get_cached_map())

    @classmethod
    def clear_custom_registrations(cls) -> None:
        """Clear all custom characteristic registrations (for testing)."""
        with cls._lock:
            cls._custom_characteristic_classes.clear()

    @staticmethod
    def clear_cache() -> None:
        """Clear the characteristic class map cache (for testing).

        This forces the registry to be rebuilt on next access.
        Use sparingly - primarily for testing scenarios where
        characteristic classes are modified at runtime.
        """
        _CharacteristicMapBuilder.clear_cache()