Coverage for src/bluetooth_sig/utils/profiling.py: 99%

77 statements  

coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Profiling and performance measurement utilities for Bluetooth SIG library.""" 

2 

3from __future__ import annotations 

4 

5import time 

6from collections.abc import Generator 

7from contextlib import contextmanager 

8from typing import Any, Callable, TypeVar 

9 

10import msgspec 

11 

12T = TypeVar("T") 

13 

14 

15class TimingResult(msgspec.Struct, kw_only=True): 

16 """Result of a timing measurement.""" 

17 

18 operation: str 

19 iterations: int 

20 total_time: float 

21 avg_time: float 

22 min_time: float 

23 max_time: float 

24 per_second: float 

25 

26 def __str__(self) -> str: 

27 """Format timing result as human-readable string.""" 

28 return ( 

29 f"{self.operation}:\n" 

30 f" Iterations: {self.iterations}\n" 

31 f" Total time: {self.total_time:.4f}s\n" 

32 f" Average: {self.avg_time * 1000:.4f}ms per operation\n" 

33 f" Min: {self.min_time * 1000:.4f}ms\n" 

34 f" Max: {self.max_time * 1000:.4f}ms\n" 

35 f" Throughput: {self.per_second:.0f} ops/sec" 

36 ) 

37 

38 

39class ProfilingSession(msgspec.Struct, kw_only=True): 

40 """Track multiple profiling results in a session.""" 

41 

42 name: str 

43 results: list[TimingResult] = msgspec.field(default_factory=list) 

44 

45 def add_result(self, result: TimingResult) -> None: 

46 """Add a timing result to the session.""" 

47 self.results.append(result) 

48 

49 def __str__(self) -> str: 

50 """Format session results as human-readable string.""" 

51 lines = [f"=== {self.name} ===", ""] 

52 for result in self.results: 

53 lines.append(str(result)) 

54 lines.append("") 

55 return "\n".join(lines) 

56 

57 


@contextmanager
def timer(_operation: str = "operation") -> Generator[dict[str, float], None, None]:
    """Context manager for timing a single operation.

    Args:
        _operation: Name of the operation being timed (currently unused, reserved for future use)

    Yields:
        Dictionary that will contain an 'elapsed' key with the elapsed time in seconds

    Example:
        >>> with timer("parse") as t:
        ...     parse_characteristic(data)
        >>> print(f"Elapsed: {t['elapsed']:.4f}s")

    """
    timing: dict[str, float] = {}
    start = time.perf_counter()
    try:
        yield timing
    finally:
        timing["elapsed"] = time.perf_counter() - start


def benchmark_function(
    func: Callable[[], T],
    iterations: int = 1000,
    operation: str = "function",
) -> TimingResult:
    r"""Benchmark a function by running it multiple times.

    Args:
        func: Function to benchmark (should take no arguments)
        iterations: Number of times to run the function
        operation: Name of the operation for reporting

    Returns:
        TimingResult with detailed performance metrics

    Example:
        >>> result = benchmark_function(
        ...     lambda: translator.parse_characteristic("2A19", b"\\x64"),
        ...     iterations=10000,
        ...     operation="Battery Level parsing",
        ... )
        >>> print(result)

    Note:
        Uses time.perf_counter() for high-resolution timing. The function
        includes a warmup run to avoid JIT compilation overhead in the
        measurements. Individual timings are collected to compute min/max
        statistics.

    """
    times: list[float] = []

    # Warmup run to avoid JIT compilation overhead
    func()

    # Collect individual timings with minimal overhead
    # We measure both individual times (for min/max) and total time
    perf_counter = time.perf_counter  # Cache function lookup
    start_total = perf_counter()
    for _ in range(iterations):
        start = perf_counter()
        func()
        times.append(perf_counter() - start)
    total_time = perf_counter() - start_total

    # Calculate statistics
    avg_time = total_time / iterations
    min_time = min(times)
    max_time = max(times)
    per_second = iterations / total_time if total_time > 0 else 0

    return TimingResult(
        operation=operation,
        iterations=iterations,
        total_time=total_time,
        avg_time=avg_time,
        min_time=min_time,
        max_time=max_time,
        per_second=per_second,
    )


def compare_implementations(
    implementations: dict[str, Callable[[], Any]],
    iterations: int = 1000,
) -> dict[str, TimingResult]:
    """Compare performance of multiple implementations.

    Args:
        implementations: Dict mapping implementation name to callable
        iterations: Number of times to run each implementation

    Returns:
        Dictionary mapping implementation names to their TimingResults

    Example:
        >>> results = compare_implementations(
        ...     {
        ...         "manual": lambda: manual_parse(data),
        ...         "sig_lib": lambda: translator.parse_characteristic("2A19", data),
        ...     },
        ...     iterations=10000,
        ... )
        >>> for name, result in results.items():
        ...     print(f"{name}: {result.avg_time * 1000:.4f}ms")

    """
    results: dict[str, TimingResult] = {}
    for name, func in implementations.items():
        results[name] = benchmark_function(func, iterations, name)
    return results


def format_comparison(results: dict[str, TimingResult], baseline: str | None = None) -> str:
    """Format comparison results as a human-readable table.

    Args:
        results: Dictionary of timing results
        baseline: Optional name of baseline implementation for comparison

    Returns:
        Formatted string with comparison table

    """
    if not results:
        return "No results to display"

    lines = ["Performance Comparison:", "=" * 80]

    # Header
    lines.append(f"{'Implementation':<30} {'Avg Time':<15} {'Throughput':<20} {'vs Baseline'}")
    lines.append("-" * 80)

    baseline_time = None
    if baseline and baseline in results:
        baseline_time = results[baseline].avg_time

    for name, result in results.items():
        avg_str = f"{result.avg_time * 1000:.4f}ms"
        throughput_str = f"{result.per_second:.0f} ops/sec"

        if baseline_time and name != baseline:
            ratio = result.avg_time / baseline_time
            if ratio < 1:
                comparison = f"{1 / ratio:.2f}x faster"
            else:
                comparison = f"{ratio:.2f}x slower"
        elif name == baseline:
            comparison = "(baseline)"
        else:
            comparison = "-"

        lines.append(f"{name:<30} {avg_str:<15} {throughput_str:<20} {comparison}")

    return "\n".join(lines)
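
Example usage (a minimal, illustrative sketch, not part of the measured module above; it assumes the package is importable as bluetooth_sig, and parse_manual / parse_alternate are hypothetical workloads standing in for real parsing calls):

from bluetooth_sig.utils.profiling import (
    ProfilingSession,
    benchmark_function,
    compare_implementations,
    format_comparison,
    timer,
)


def parse_manual() -> int:
    # Hypothetical workload standing in for a real call such as
    # translator.parse_characteristic("2A19", b"\x64").
    return int.from_bytes(b"\x64", "little")


def parse_alternate() -> int:
    # Hypothetical alternative implementation to compare against.
    return b"\x64"[0]


# Time a single call.
with timer("parse") as t:
    parse_manual()
print(f"Elapsed: {t['elapsed']:.6f}s")

# Benchmark one callable and collect the result in a session.
session = ProfilingSession(name="Battery Level parsing")
session.add_result(benchmark_function(parse_manual, iterations=10_000, operation="manual"))
print(session)

# Compare two implementations and render the table with "manual" as the baseline.
results = compare_implementations(
    {"manual": parse_manual, "alternate": parse_alternate},
    iterations=10_000,
)
print(format_comparison(results, baseline="manual"))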