Coverage for src / bluetooth_sig / device / characteristic_io.py: 70%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""Characteristic I/O operations for BLE devices. 

2 

3Encapsulates read, write, and notification operations for GATT characteristics, 

4including type-safe overloads for class-based and string/enum-based access. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections.abc import Callable 

11from typing import Any, TypeVar, cast, overload 

12 

13from ..gatt.characteristics.base import BaseCharacteristic 

14from ..gatt.characteristics.registry import CharacteristicName, CharacteristicRegistry 

15from ..gatt.context import CharacteristicContext, DeviceInfo 

16from ..types.uuid import BluetoothUUID 

17from .client import ClientManagerProtocol 

18from .dependency_resolver import DependencyResolutionMode, DependencyResolver 

19from .protocols import SIGTranslatorProtocol 

20 

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Type variable for generic characteristic return types
T = TypeVar("T")

25 

26 

class CharacteristicIO:
    """Read, write, and notification operations for GATT characteristics.

    Encapsulates the I/O logic extracted from Device, handling both type-safe
    (class-based) and dynamic (string/enum-based) characteristic access patterns.

    Uses ``DependencyResolver`` for automatic dependency resolution before reads,
    and a ``device_info_factory`` callable to get current ``DeviceInfo`` without
    a back-reference to the owning Device.
    """

    def __init__(
        self,
        connection_manager: ClientManagerProtocol,
        translator: SIGTranslatorProtocol,
        dep_resolver: DependencyResolver,
        device_info_factory: Callable[[], DeviceInfo],
    ) -> None:
        """Initialise with connection manager, translator, resolver, and info factory.

        Args:
            connection_manager: Connection manager for BLE I/O
            translator: Translator for parsing/encoding characteristics
            dep_resolver: Resolver for characteristic dependencies
            device_info_factory: Callable returning current DeviceInfo

        """
        self._connection_manager = connection_manager
        self._translator = translator
        self._dep_resolver = dep_resolver
        self._device_info_factory = device_info_factory

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    @overload
    async def read(
        self,
        char: type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = ...,
    ) -> T | None: ...

    @overload
    async def read(
        self,
        char: str | CharacteristicName,
        resolution_mode: DependencyResolutionMode = ...,
    ) -> Any | None: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    async def read(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
    ) -> T | Any | None:  # Runtime UUID dispatch cannot be type-safe
        """Read a characteristic value from the device.

        Args:
            char: Name, enum, or characteristic class to read.
                Passing the class enables type-safe return values.
            resolution_mode: How to handle automatic dependency resolution:
                - NORMAL: Auto-resolve dependencies, use cache when available (default)
                - SKIP_DEPENDENCIES: Skip dependency resolution and validation
                - FORCE_REFRESH: Re-read dependencies from device, ignoring cache

        Returns:
            Parsed characteristic value or None if read fails.
            Return type is inferred from characteristic class when provided.

        Raises:
            RuntimeError: If no connection manager is attached
            ValueError: If required dependencies cannot be resolved, or if a
                string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_class: type[BaseCharacteristic[Any]] = char
            char_instance = char_class()
            resolved_uuid = char_instance.uuid

            ctx: CharacteristicContext | None = None
            if resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
                device_info = self._device_info_factory()
                ctx = await self._dep_resolver.resolve(char_class, resolution_mode, device_info)

            raw = await self._connection_manager.read_gatt_char(resolved_uuid)
            return char_instance.parse_value(raw, ctx=ctx)

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)

        char_class_lookup = CharacteristicRegistry.get_characteristic_class_by_uuid(resolved_uuid)

        # Resolve dependencies only if the characteristic class is known to the registry
        ctx = None
        if char_class_lookup and resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
            device_info = self._device_info_factory()
            ctx = await self._dep_resolver.resolve(char_class_lookup, resolution_mode, device_info)

        # Read the characteristic and let the translator parse it by UUID
        raw = await self._connection_manager.read_gatt_char(resolved_uuid)
        return self._translator.parse_characteristic(str(resolved_uuid), raw, ctx=ctx)

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    @overload
    async def write(
        self,
        char: type[BaseCharacteristic[T]],
        data: T,
        response: bool = ...,
    ) -> None: ...

    @overload
    async def write(
        self,
        char: str | CharacteristicName,
        data: bytes,
        response: bool = ...,
    ) -> None: ...

    async def write(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        data: bytes | T,
        response: bool = True,
    ) -> None:
        r"""Write data to a characteristic on the device.

        Args:
            char: Name, enum, or characteristic class to write to.
                Passing the class enables type-safe value encoding.
            data: Raw bytes (for string/enum) or typed value (for characteristic class).
                When using characteristic class, the value is encoded using build_value().
            response: If True, use write-with-response (wait for acknowledgment).
                If False, use write-without-response (faster but no confirmation).
                Default is True for reliability.

        Raises:
            RuntimeError: If no connection manager is attached
            CharacteristicEncodeError: If encoding fails (when using characteristic class)
            TypeError: If ``char`` is a string/enum and ``data`` is not bytes/bytearray
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            resolved_uuid = char_instance.uuid
            # data is typed value T, encode it
            encoded = char_instance.build_value(data)  # type: ignore[arg-type]  # T is erased at runtime; overload ensures type safety at call site
            await self._connection_manager.write_gatt_char(resolved_uuid, bytes(encoded), response=response)
            return

        # Handle string/enum input (not type-safe path)
        # data must be bytes in this path
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}")

        resolved_uuid = self._resolve_characteristic_name(char)
        # cast is safe: isinstance check above ensures data is bytes/bytearray
        await self._connection_manager.write_gatt_char(resolved_uuid, cast("bytes", data), response=response)

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    @overload
    async def start_notify(
        self,
        char: type[BaseCharacteristic[T]],
        callback: Callable[[T], None],
    ) -> None: ...

    @overload
    async def start_notify(
        self,
        char: str | CharacteristicName,
        callback: Callable[[Any], None],
    ) -> None: ...

    async def start_notify(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        callback: Callable[[T], None] | Callable[[Any], None],
    ) -> None:
        """Start notifications for a characteristic.

        The user callback receives the parsed value. Exceptions raised by the
        user callback are logged and suppressed; exceptions raised while parsing
        the notification payload are not caught here.

        Args:
            char: Name, enum, or characteristic class to monitor.
                Passing the class enables type-safe callbacks.
            callback: Function to call when notifications are received.
                Callback parameter type is inferred from characteristic class.

        Raises:
            RuntimeError: If no connection manager is attached
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            resolved_uuid = char_instance.uuid

            def _typed_cb(sender: str, data: bytes) -> None:
                del sender  # Required by callback interface
                parsed = char_instance.parse_value(data)
                try:
                    callback(parsed)
                except Exception:  # pylint: disable=broad-exception-caught
                    logger.exception("Notification callback raised an exception")

            await self._connection_manager.start_notify(resolved_uuid, _typed_cb)
            return

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)
        translator = self._translator

        def _internal_cb(sender: str, data: bytes) -> None:
            # NOTE(review): parsing is keyed by ``sender`` here, whereas read() uses the
            # resolved UUID; confirm the connection manager passes the characteristic
            # UUID as ``sender``.
            parsed = translator.parse_characteristic(sender, data)
            try:
                callback(parsed)
            except Exception:  # pylint: disable=broad-exception-caught
                logger.exception("Notification callback raised an exception")

        await self._connection_manager.start_notify(resolved_uuid, _internal_cb)

    async def stop_notify(self, char_name: str | CharacteristicName | type[BaseCharacteristic[Any]]) -> None:
        """Stop notifications for a characteristic.

        Accepts the same identifier forms as ``start_notify`` so that a
        subscription started with a characteristic class can be stopped with it.

        Args:
            char_name: Characteristic name, enum, UUID string, or characteristic class

        Raises:
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Characteristic class input: take the UUID from an instance, as start_notify does
        if isinstance(char_name, type) and issubclass(char_name, BaseCharacteristic):
            await self._connection_manager.stop_notify(char_name().uuid)
            return

        resolved_uuid = self._resolve_characteristic_name(char_name)
        await self._connection_manager.stop_notify(resolved_uuid)

    # ------------------------------------------------------------------
    # Batch operations
    # ------------------------------------------------------------------

    async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
        """Read multiple characteristics in batch.

        Each read is best-effort: a failure (including an identifier that cannot
        be resolved) is logged and recorded as ``None`` without aborting the
        remaining reads.

        Args:
            char_names: List of characteristic names or enums to read

        Returns:
            Dictionary mapping characteristic UUIDs to parsed values. An
            identifier that cannot be resolved to a UUID is keyed by its own
            text instead.

        """
        results: dict[str, Any | None] = {}
        for char_name in char_names:
            # Compute the key before the read so the failure path never has to
            # resolve the name again (which would raise out of the handler).
            key = self._batch_result_key(char_name)
            try:
                results[key] = await self.read(char_name)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = None
                logger.warning("Failed to read characteristic %s: %s", char_name, exc)

        return results

    async def write_multiple(
        self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
    ) -> dict[str, bool]:
        """Write to multiple characteristics in batch.

        Each write is best-effort: a failure (including an identifier that cannot
        be resolved) is logged and recorded as ``False`` without aborting the
        remaining writes.

        Args:
            data_map: Dictionary mapping characteristic names/enums to data bytes
            response: If True, use write-with-response for all writes.
                If False, use write-without-response for all writes.

        Returns:
            Dictionary mapping characteristic UUIDs to success status. An
            identifier that cannot be resolved to a UUID is keyed by its own
            text instead.

        """
        results: dict[str, bool] = {}
        for char_name, data in data_map.items():
            # Compute the key before the write so the failure path never has to
            # resolve the name again (which would raise out of the handler).
            key = self._batch_result_key(char_name)
            try:
                await self.write(char_name, data, response=response)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = False
                logger.warning("Failed to write characteristic %s: %s", char_name, exc)
            else:
                results[key] = True

        return results

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _batch_result_key(self, identifier: str | CharacteristicName) -> str:
        """Return the result-dictionary key for a batch operation without raising.

        Args:
            identifier: Characteristic name string or enum

        Returns:
            The resolved UUID as a string, or the identifier's own text
            (the enum value for ``CharacteristicName``) when resolution fails.

        """
        try:
            return str(self._resolve_characteristic_name(identifier))
        except Exception:  # pylint: disable=broad-exception-caught
            # Batch operations are best-effort; the caller's read/write attempt
            # will hit and log the same failure.
            if isinstance(identifier, CharacteristicName):
                return str(identifier.value)
            return str(identifier)

    def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
        """Resolve a characteristic name or enum to its UUID.

        Enum inputs are looked up through the translator first. If that yields
        nothing, or the input is a plain string, the text is accepted only when
        it is a hexadecimal UUID of 4, 8, or 32 digits (dashes ignored).

        Args:
            identifier: Characteristic enum, or a UUID given as a string

        Returns:
            The characteristic UUID as a ``BluetoothUUID``

        Raises:
            ValueError: If the characteristic name cannot be resolved

        """
        if isinstance(identifier, CharacteristicName):
            # For enum inputs, ask the translator for the UUID
            uuid = self._translator.get_characteristic_uuid_by_name(identifier)
            if uuid:
                return uuid
            norm = identifier.value.strip()
        else:
            # Strip surrounding whitespace, matching the enum branch above
            norm = identifier.strip()
        stripped = norm.replace("-", "")
        if len(stripped) in (4, 8, 32) and all(c in "0123456789abcdefABCDEF" for c in stripped):
            return BluetoothUUID(norm)

        raise ValueError(f"Unknown characteristic name: '{identifier}'")