Coverage for src/bluetooth_sig/gatt/services/registry.py: 87%

126 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Bluetooth SIG GATT service registry. 

2 

3This module contains the service registry implementation, including the 

4ServiceName enum, service class mappings, and the GattServiceRegistry 

5class. This was moved from __init__.py to follow Python best practices 

6of keeping __init__.py files lightweight. 

7""" 

8 

9from __future__ import annotations 

10 

11import threading 

12from functools import lru_cache 

13 

14from typing_extensions import TypeGuard 

15 

16from ...types.gatt_enums import ServiceName 

17from ...types.gatt_services import ServiceDiscoveryData 

18from ...types.uuid import BluetoothUUID 

19from ..registry_utils import ModuleDiscovery, TypeValidator 

20from ..uuid_registry import uuid_registry 

21from .base import BaseGattService 

22 

23__all__ = [ 

24 "ServiceName", 

25 "SERVICE_CLASS_MAP", 

26 "GattServiceRegistry", 

27] 

28 

29 

30class _ServiceClassValidator: # pylint: disable=too-few-public-methods 

31 """Utility class for validating service classes. 

32 

33 Note: Single-purpose validator class - pylint disable justified. 

34 """ 

35 

36 @staticmethod 

37 def is_service_subclass(candidate: object) -> TypeGuard[type[BaseGattService]]: 

38 """Return True when candidate is a BaseGattService subclass.""" 

39 return TypeValidator.is_subclass_of(candidate, BaseGattService) 

40 

41 

42class _ServiceClassDiscovery: 

43 """Handles discovery and validation of service classes in the package.""" 

44 

45 _MODULE_EXCLUSIONS = {"__main__", "__init__", "base", "registry"} 

46 

47 @classmethod 

48 def iter_module_names(cls) -> list[str]: 

49 """Return sorted service module names discovered via pkgutil.walk_packages.""" 

50 package_name = __package__ or "bluetooth_sig.gatt.services" 

51 return ModuleDiscovery.iter_module_names(package_name, cls._MODULE_EXCLUSIONS) 

52 

53 @classmethod 

54 def discover_classes(cls) -> list[type[BaseGattService]]: 

55 """Discover all concrete service classes defined in the package. 

56 

57 Validates that discovered classes have required methods for proper operation. 

58 """ 

59 module_names = cls.iter_module_names() 

60 return ModuleDiscovery.discover_classes( 

61 module_names, 

62 BaseGattService, 

63 _ServiceClassValidator.is_service_subclass, 

64 ) 

65 

66 

67class _ServiceClassMapBuilder: 

68 """Builds and caches the service class map using dynamic discovery.""" 

69 

70 @staticmethod 

71 def build_enum_map() -> dict[ServiceName, type[BaseGattService]]: 

72 """Build the service class mapping using runtime discovery.""" 

73 mapping: dict[ServiceName, type[BaseGattService]] = {} 

74 

75 for service_cls in _ServiceClassDiscovery.discover_classes(): 

76 # Get UUID from class 

77 try: 

78 uuid_obj = service_cls.get_class_uuid() 

79 except (AttributeError, ValueError): 

80 continue 

81 

82 # Find the corresponding enum member by UUID 

83 enum_member = None 

84 for candidate_enum in ServiceName: 

85 candidate_info = uuid_registry.get_service_info(candidate_enum.value) 

86 if candidate_info and candidate_info.uuid == uuid_obj: 

87 enum_member = candidate_enum 

88 break 

89 

90 if enum_member is None: 

91 continue 

92 

93 existing = mapping.get(enum_member) 

94 if existing is not None and existing is not service_cls: 

95 raise RuntimeError( 

96 f"Multiple service classes resolved for {enum_member.name}:" 

97 f" {existing.__name__} and {service_cls.__name__}" 

98 ) 

99 mapping[enum_member] = service_cls 

100 

101 return mapping 

102 

103 @staticmethod 

104 @lru_cache(maxsize=1) 

105 def get_cached_enum_map() -> dict[ServiceName, type[BaseGattService]]: 

106 """Return the cached enum-keyed service class map.""" 

107 return _ServiceClassMapBuilder.build_enum_map() 

108 

109 @staticmethod 

110 def clear_cache() -> None: 

111 """Clear the service class map cache.""" 

112 _ServiceClassMapBuilder.get_cached_enum_map.cache_clear() 

113 

114 

115# Public API functions for backward compatibility 

116def get_service_class_map() -> dict[ServiceName, type[BaseGattService]]: 

117 """Get the service class map, building it if necessary.""" 

118 return _ServiceClassMapBuilder.get_cached_enum_map() 

119 

120 

121# Public API - backward compatibility globals 

122SERVICE_CLASS_MAP = _ServiceClassMapBuilder.get_cached_enum_map() 

123 

124 

125class GattServiceRegistry: 

126 """Registry for all supported GATT services.""" 

127 

128 _lock = threading.RLock() 

129 _custom_service_classes: dict[BluetoothUUID, type[BaseGattService]] = {} 

130 

131 @classmethod 

132 def register_service_class(cls, uuid: str | BluetoothUUID, service_cls: object, override: bool = False) -> None: 

133 """Register a custom service class at runtime. 

134 

135 Args: 

136 uuid: The service UUID 

137 service_cls: The service class to register 

138 override: Whether to override existing registrations 

139 

140 Raises: 

141 TypeError: If service_cls does not inherit from BaseGattService 

142 ValueError: If UUID conflicts with existing registration and override=False 

143 

144 """ 

145 # Runtime safety check retained in case of dynamic caller misuse despite type hints. 

146 if not _ServiceClassValidator.is_service_subclass(service_cls): 

147 raise TypeError(f"Class {service_cls!r} must inherit from BaseGattService") 

148 

149 # Always normalize UUID to BluetoothUUID 

150 bt_uuid = BluetoothUUID(uuid) if not isinstance(uuid, BluetoothUUID) else uuid 

151 

152 with cls._lock: 

153 # Check for conflicts 

154 if not override: 

155 if bt_uuid in cls._custom_service_classes: 

156 raise ValueError(f"UUID {bt_uuid} already registered. Use override=True to replace.") 

157 

158 cls._custom_service_classes[bt_uuid] = service_cls 

159 

160 @classmethod 

161 def unregister_service_class(cls, uuid: str | BluetoothUUID) -> None: 

162 """Unregister a custom service class. 

163 

164 Args: 

165 uuid: The service UUID to unregister 

166 

167 """ 

168 bt_uuid = BluetoothUUID(uuid) if not isinstance(uuid, BluetoothUUID) else uuid 

169 with cls._lock: 

170 cls._custom_service_classes.pop(bt_uuid, None) 

171 

172 @classmethod 

173 def _get_services(cls) -> list[type[BaseGattService]]: 

174 """Get the list of service classes.""" 

175 return list(get_service_class_map().values()) 

176 

177 @classmethod 

178 def get_service_class(cls, uuid: str | BluetoothUUID) -> type[BaseGattService] | None: 

179 """Get the service class for a given UUID.""" 

180 try: 

181 if isinstance(uuid, str): 

182 bt_uuid = BluetoothUUID(uuid) 

183 else: 

184 bt_uuid = uuid 

185 except ValueError: 

186 return None 

187 # Check custom registry first 

188 with cls._lock: 

189 if custom_cls := cls._custom_service_classes.get(bt_uuid): 

190 return custom_cls 

191 

192 for service_class in cls._get_services(): 

193 if service_class.matches_uuid(bt_uuid): 

194 return service_class 

195 return None 

196 

197 @classmethod 

198 def get_service_class_by_name(cls, name: str | ServiceName) -> type[BaseGattService] | None: 

199 """Get the service class for a given name or enum.""" 

200 if isinstance(name, str): 

201 # For string names, find the matching enum member 

202 for enum_member in ServiceName: 

203 if enum_member.value == name: 

204 name = enum_member 

205 break 

206 else: 

207 return None # No matching enum found 

208 return get_service_class_map().get(name) 

209 

210 @classmethod 

211 def get_service_class_by_uuid(cls, uuid: BluetoothUUID) -> type[BaseGattService] | None: 

212 """Get the service class for a given UUID (alias for get_service_class).""" 

213 return cls.get_service_class(uuid) 

214 

215 @classmethod 

216 def create_service(cls, uuid: BluetoothUUID, characteristics: ServiceDiscoveryData) -> BaseGattService | None: 

217 """Create a service instance for the given UUID and characteristics. 

218 

219 Args: 

220 uuid: Service UUID 

221 characteristics: Dict mapping characteristic UUIDs to CharacteristicInfo 

222 

223 Returns: 

224 Service instance if found, None otherwise 

225 

226 """ 

227 service_class = cls.get_service_class(uuid) 

228 if not service_class: 

229 return None 

230 

231 service = service_class() 

232 service.process_characteristics(characteristics) 

233 return service 

234 

235 @classmethod 

236 def get_all_services(cls) -> list[type[BaseGattService]]: 

237 """Get all registered service classes. 

238 

239 Returns: 

240 List of all registered service classes 

241 

242 """ 

243 return cls._get_services().copy() 

244 

245 @classmethod 

246 def supported_services(cls) -> list[str]: 

247 """Get a list of supported service UUIDs.""" 

248 return [str(service().uuid) for service in cls._get_services()] 

249 

250 @classmethod 

251 def supported_service_names(cls) -> list[str]: 

252 """Get a list of supported service names.""" 

253 return [e.value for e in ServiceName] 

254 

255 @classmethod 

256 def clear_custom_registrations(cls) -> None: 

257 """Clear all custom service registrations (for testing).""" 

258 with cls._lock: 

259 cls._custom_service_classes.clear()