Coverage for src / bluetooth_sig / gatt / characteristics / electric_current_statistics.py: 92%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 16:41 +0000

1"""Electric Current Statistics characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5import math 

6 

7import msgspec 

8 

9from ..constants import UINT8_MAX, UINT16_MAX 

10from ..context import CharacteristicContext 

11from .base import BaseCharacteristic 

12from .utils import DataParser 

13 

# Each raw unit of a current field represents 0.01 A.
_CURRENT_RESOLUTION = 0.01
# Largest current a uint16 field can carry at that resolution (655.35 A).
_MAX_CURRENT = _CURRENT_RESOLUTION * UINT16_MAX
# Time Exponential 8 parameters: seconds = base ** (raw - offset).
_TIME_EXP_BASE = 1.1
_TIME_EXP_OFFSET = 64

18 

19 

def _decode_time_exponential(raw: int) -> float:
    """Convert a Time Exponential 8 raw value into seconds.

    Args:
        raw: Raw 8-bit field value.

    Returns:
        ``0.0`` when ``raw`` is zero, otherwise
        ``_TIME_EXP_BASE ** (raw - _TIME_EXP_OFFSET)``.
    """
    # NOTE(review): raw 0xFE / 0xFF go through the same formula here; the
    # spec may reserve them as special values - confirm against the GSS.
    # Zero is a dedicated code for "no time" and bypasses the formula.
    return _TIME_EXP_BASE ** (raw - _TIME_EXP_OFFSET) if raw else 0.0

25 

26 

def _encode_time_exponential(seconds: float) -> int:
    """Encode seconds to a Time Exponential 8 raw value.

    Args:
        seconds: Duration in seconds.

    Returns:
        ``0`` for zero or negative durations; otherwise the nearest
        exponent code, clamped to the range 1..0xFD. Durations too large
        to represent (including positive infinity) saturate at 0xFD.

    Raises:
        ValueError: If ``seconds`` is NaN.
    """
    if math.isnan(seconds):
        # Fail with an explicit message instead of the opaque error that
        # round(nan) would raise further down.
        raise ValueError("Cannot encode NaN seconds as Time Exponential 8")
    if seconds <= 0.0:
        return 0
    if math.isinf(seconds):
        # round(inf) raises OverflowError; saturate exactly like any other
        # duration that exceeds the representable range.
        return 0xFD
    n = round(math.log(seconds) / math.log(_TIME_EXP_BASE) + _TIME_EXP_OFFSET)
    # Code 0 is reserved for "zero seconds", so positive values start at 1.
    return max(1, min(n, 0xFD))

33 

34 

class ElectricCurrentStatisticsData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods
    """Data class for electric current statistics.

    Five fields per GSS YAML: Average (uint16, 0.01 A), Standard Deviation
    (uint16, 0.01 A), Minimum (uint16, 0.01 A), Maximum (uint16, 0.01 A),
    and Sensing Duration (uint8, Time Exponential 8).

    Values are validated on construction; see ``__post_init__``.
    """

    average: float  # Average current in Amperes
    standard_deviation: float  # Standard deviation in Amperes
    minimum: float  # Minimum current in Amperes
    maximum: float  # Maximum current in Amperes
    sensing_duration: float  # Sensing duration in seconds (exponential encoding)

    def __post_init__(self) -> None:
        """Validate current statistics data.

        Raises:
            ValueError: If ``minimum`` exceeds ``maximum``, if any current
                field lies outside ``0.0 .. _MAX_CURRENT``, or if
                ``sensing_duration`` is NaN or negative.
        """
        if self.minimum > self.maximum:
            raise ValueError(f"Minimum current {self.minimum} A cannot be greater than maximum {self.maximum} A")

        for name, current in [
            ("average", self.average),
            ("standard_deviation", self.standard_deviation),
            ("minimum", self.minimum),
            ("maximum", self.maximum),
        ]:
            # The chained comparison is False for NaN, so NaN currents are
            # rejected here as well.
            if not 0.0 <= current <= _MAX_CURRENT:
                raise ValueError(
                    f"{name.capitalize()} current {current} A is outside valid range (0.0 to {_MAX_CURRENT} A)"
                )
        # NaN compares False against everything, so the "< 0.0" test below
        # would silently accept it; reject it explicitly.
        if math.isnan(self.sensing_duration):
            raise ValueError("Sensing duration must be a number, not NaN")
        if self.sensing_duration < 0.0:
            raise ValueError(f"Sensing duration {self.sensing_duration} s cannot be negative")

66 

67 

class ElectricCurrentStatisticsCharacteristic(BaseCharacteristic[ElectricCurrentStatisticsData]):
    """Electric Current Statistics characteristic (0x2AF1).

    org.bluetooth.characteristic.electric_current_statistics

    Payload layout (9 bytes): average, standard deviation, minimum and
    maximum current as four consecutive uint16 fields in 0.01 A units,
    followed by one uint8 sensing duration in Time Exponential 8 form.
    """

    expected_length: int = 9  # 4x uint16 + uint8
    min_length: int = 9

    def _decode_value(
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> ElectricCurrentStatisticsData:
        """Decode the 9-byte payload into ``ElectricCurrentStatisticsData``.

        Args:
            data: Raw characteristic bytes.
            ctx: Accepted for interface compatibility; not consulted here.
            validate: Accepted for interface compatibility; not consulted here.

        Returns:
            The statistics with currents in Amperes and duration in seconds.
        """
        # The four uint16 current fields sit back to back at even offsets.
        average, deviation, lowest, highest = (
            DataParser.parse_int16(data, offset, signed=False) * _CURRENT_RESOLUTION
            for offset in (0, 2, 4, 6)
        )
        duration_code = DataParser.parse_int8(data, 8, signed=False)

        return ElectricCurrentStatisticsData(
            average=average,
            standard_deviation=deviation,
            minimum=lowest,
            maximum=highest,
            sensing_duration=_decode_time_exponential(duration_code),
        )

    def _encode_value(self, data: ElectricCurrentStatisticsData) -> bytearray:
        """Serialise ``data`` into the 9-byte characteristic payload.

        Args:
            data: Statistics to encode.

        Returns:
            Four encoded uint16 current fields followed by the uint8 duration.

        Raises:
            ValueError: If a scaled current does not fit in uint16 or the
                duration code does not fit in uint8.
        """
        # Insertion order doubles as the wire order of the current fields.
        raw_currents = {
            "average": round(data.average * 100),
            "standard_deviation": round(data.standard_deviation * 100),
            "minimum": round(data.minimum * 100),
            "maximum": round(data.maximum * 100),
        }
        duration_code = _encode_time_exponential(data.sensing_duration)

        for name, value in raw_currents.items():
            if value < 0 or value > UINT16_MAX:
                raise ValueError(f"Current {name} value {value} exceeds uint16 range")
        if duration_code < 0 or duration_code > UINT8_MAX:
            raise ValueError(f"Duration raw {duration_code} exceeds uint8 range")

        payload = bytearray()
        for value in raw_currents.values():
            payload.extend(DataParser.encode_int16(value, signed=False))
        payload.extend(DataParser.encode_int8(duration_code, signed=False))

        return payload