Coverage for src / bluetooth_sig / advertising / registry.py: 52%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Registry for advertising data interpreter routing.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Any 

7 

8from bluetooth_sig.advertising.base import AdvertisingDataInterpreter, DataSource 

9from bluetooth_sig.types.uuid import BluetoothUUID 

10 

# Module-level logger named after this module, used for registration diagnostics.
logger = logging.getLogger(__name__)

12 

13 

class AdvertisingInterpreterRegistry:
    """Routes advertisements to interpreter classes.

    Does NOT manage interpreter instances - Device owns those.
    Only handles class registration and lookup.

    Classes are bucketed by the ``_info`` they declare: by manufacturer
    company ID, by service UUID (keyed on the upper-cased string form), or
    in a fallback list that is consulted for every advertisement.
    """

    def __init__(self) -> None:
        """Initialise empty registry."""
        # company ID -> interpreter classes, in registration order
        self._manufacturer_interpreters: dict[int, list[type[AdvertisingDataInterpreter[Any]]]] = {}
        # upper-cased UUID string -> interpreter classes, in registration order
        self._service_interpreters: dict[str, list[type[AdvertisingDataInterpreter[Any]]]] = {}
        # classes that are tried for every advertisement, after the keyed ones
        self._fallback_interpreters: list[type[AdvertisingDataInterpreter[Any]]] = []

    @staticmethod
    def _uuid_key(service_uuid: object) -> str:
        """Return the case-normalised dictionary key for a service UUID."""
        return str(service_uuid).upper()

    def _candidates(
        self,
        manufacturer_data: dict[int, bytes],
        service_data: dict[BluetoothUUID, bytes],
    ) -> list[type[AdvertisingDataInterpreter[Any]]]:
        """Collect classes keyed on this advertisement's data, then fallbacks.

        Order is: manufacturer matches (in ``manufacturer_data`` order),
        service matches (in ``service_data`` order), then fallbacks. A class
        that appears more than once is kept only at its first position, so
        callers never evaluate or return the same class twice.
        """
        ordered: list[type[AdvertisingDataInterpreter[Any]]] = []

        for company_id in manufacturer_data:
            ordered.extend(self._manufacturer_interpreters.get(company_id, ()))

        for service_uuid in service_data:
            ordered.extend(self._service_interpreters.get(self._uuid_key(service_uuid), ()))

        ordered.extend(self._fallback_interpreters)

        # dict preserves insertion order, giving an order-stable de-duplication.
        return list(dict.fromkeys(ordered))

    def register(self, interpreter_class: type[AdvertisingDataInterpreter[Any]]) -> None:
        """Register an interpreter class (called by AdvertisingDataInterpreter.__init_subclass__).

        A class without an ``_info`` attribute is skipped with a warning.
        Otherwise it is stored under its company ID (manufacturer source),
        under its upper-cased service UUID (service source), or in the
        fallback list when neither key is available.
        """
        if not hasattr(interpreter_class, "_info"):
            logger.warning("Interpreter %s has no _info, skipping", interpreter_class.__name__)
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            self._manufacturer_interpreters.setdefault(info.company_id, []).append(interpreter_class)
            logger.debug("Registered %s for company 0x%04X", interpreter_class.__name__, info.company_id)

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            uuid_key = self._uuid_key(info.service_uuid)
            self._service_interpreters.setdefault(uuid_key, []).append(interpreter_class)
            logger.debug("Registered %s for UUID %s", interpreter_class.__name__, uuid_key)

        else:
            self._fallback_interpreters.append(interpreter_class)
            logger.debug("Registered fallback interpreter %s", interpreter_class.__name__)

    def unregister(self, interpreter_class: type[AdvertisingDataInterpreter[Any]]) -> None:
        """Unregister an interpreter class.

        Removes every occurrence of the class (compared by identity) from
        the bucket its ``_info`` points at and from the fallback list.
        Classes without ``_info`` are ignored; unknown classes are a no-op.
        """
        if not hasattr(interpreter_class, "_info"):
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        keyed_bucket: list[type[AdvertisingDataInterpreter[Any]]] | None = None
        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            keyed_bucket = self._manufacturer_interpreters.get(info.company_id)
        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            keyed_bucket = self._service_interpreters.get(self._uuid_key(info.service_uuid))

        # The fallback list is always checked, and all occurrences are dropped
        # so that a doubly-registered class cannot survive an unregister.
        for bucket in (keyed_bucket, self._fallback_interpreters):
            if bucket:
                bucket[:] = [kept for kept in bucket if kept is not interpreter_class]

    def find_interpreter_class(
        self,
        manufacturer_data: dict[int, bytes],
        service_data: dict[BluetoothUUID, bytes],
        local_name: str | None,
    ) -> type[AdvertisingDataInterpreter[Any]] | None:
        """Find first interpreter class that handles this advertisement.

        Candidates are tried in priority order (manufacturer, service,
        fallback); the first whose ``supports(...)`` is truthy is returned.
        Returns ``None`` when no candidate accepts the advertisement.
        """
        return next(
            (
                candidate
                for candidate in self._candidates(manufacturer_data, service_data)
                if candidate.supports(manufacturer_data, service_data, local_name)
            ),
            None,
        )

    def find_all_interpreter_classes(
        self,
        manufacturer_data: dict[int, bytes],
        service_data: dict[BluetoothUUID, bytes],
        local_name: str | None,
    ) -> list[type[AdvertisingDataInterpreter[Any]]]:
        """Find all interpreter classes that handle this advertisement.

        Returns the supporting classes in priority order (manufacturer,
        service, fallback), each at most once.
        """
        return [
            candidate
            for candidate in self._candidates(manufacturer_data, service_data)
            if candidate.supports(manufacturer_data, service_data, local_name)
        ]

    def get_registered_interpreters(self) -> list[type[AdvertisingDataInterpreter[Any]]]:
        """Get all registered interpreter classes.

        Each class appears once. The order is deterministic: manufacturer
        buckets, then service buckets, then fallbacks, each in insertion order.
        """
        all_interpreters: list[type[AdvertisingDataInterpreter[Any]]] = []
        for interpreters in self._manufacturer_interpreters.values():
            all_interpreters.extend(interpreters)
        for interpreters in self._service_interpreters.values():
            all_interpreters.extend(interpreters)
        all_interpreters.extend(self._fallback_interpreters)
        # dict.fromkeys de-duplicates while keeping first-seen order (a set would not).
        return list(dict.fromkeys(all_interpreters))

    def clear(self) -> None:
        """Clear all registered interpreters."""
        self._manufacturer_interpreters.clear()
        self._service_interpreters.clear()
        self._fallback_interpreters.clear()

138 

139 

# Global singleton for class registration: one shared, module-level registry
# instance created at import time.
advertising_interpreter_registry = AdvertisingInterpreterRegistry()