Coverage for src / bluetooth_sig / gatt / characteristics / blood_pressure_common.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Shared constants and types for blood pressure characteristics.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntFlag 

7from typing import Any, Protocol 

8 

9import msgspec 

10 

11from bluetooth_sig.types.units import PressureUnit 

12 

13from .base import BaseCharacteristic 

14from .blood_pressure_feature import BloodPressureFeatureCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

17# Bluetooth SIG Blood Pressure Service specification constants 

18BLOOD_PRESSURE_MAX_MMHG = 300 # Maximum blood pressure in mmHg 

19BLOOD_PRESSURE_MAX_KPA = 40 # Maximum blood pressure in kPa 

20 

21 

22class BloodPressureFlags(IntFlag): 

23 """Blood Pressure flags as per Bluetooth SIG specification.""" 

24 

25 UNITS_KPA = 0x01 

26 TIMESTAMP_PRESENT = 0x02 

27 PULSE_RATE_PRESENT = 0x04 

28 USER_ID_PRESENT = 0x08 

29 MEASUREMENT_STATUS_PRESENT = 0x10 

30 

31 

32class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True): 

33 """Optional fields common to blood pressure characteristics.""" 

34 

35 timestamp: datetime | None = None 

36 pulse_rate: float | None = None 

37 user_id: int | None = None 

38 measurement_status: int | None = None 

39 

40 

41class BloodPressureDataProtocol(Protocol): 

42 """Protocol for blood pressure data structs with unit field.""" 

43 

44 @property 

45 def unit(self) -> PressureUnit: 

46 """Pressure unit for blood pressure measurement.""" 

47 

48 

49class BaseBloodPressureCharacteristic(BaseCharacteristic[Any]): 

50 """Base class for blood pressure characteristics with common parsing logic.""" 

51 

52 _is_base_class = True # Exclude from characteristic discovery 

53 

54 _manual_value_type = "string" # Override since decode_value returns dataclass 

55 

56 # Declare optional dependency on Blood Pressure Feature for status interpretation 

57 _optional_dependencies = [BloodPressureFeatureCharacteristic] 

58 

59 min_length = 7 # Flags(1) + Pressure values minimum 

60 max_length = 19 # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum 

61 allow_variable_length: bool = True # Variable optional fields 

62 

63 @staticmethod 

64 def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags: 

65 """Parse blood pressure flags from data.""" 

66 return BloodPressureFlags(data[0]) 

67 

68 @staticmethod 

69 def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit: 

70 """Parse pressure unit from flags.""" 

71 return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG 

72 

73 @staticmethod 

74 def _parse_optional_fields( 

75 data: bytearray, flags: BloodPressureFlags, start_offset: int = 7 

76 ) -> tuple[datetime | None, float | None, int | None, int | None]: 

77 """Parse optional fields from blood pressure data. 

78 

79 Returns: 

80 Tuple of (timestamp, pulse_rate, user_id, measurement_status) 

81 """ 

82 timestamp: datetime | None = None 

83 pulse_rate: float | None = None 

84 user_id: int | None = None 

85 measurement_status: int | None = None 

86 offset = start_offset 

87 

88 if (flags & BloodPressureFlags.TIMESTAMP_PRESENT) and len(data) >= offset + 7: 

89 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

90 offset += 7 

91 

92 if (flags & BloodPressureFlags.PULSE_RATE_PRESENT) and len(data) >= offset + 2: 

93 pulse_rate = IEEE11073Parser.parse_sfloat(data, offset) 

94 offset += 2 

95 

96 if (flags & BloodPressureFlags.USER_ID_PRESENT) and len(data) >= offset + 1: 

97 user_id = data[offset] 

98 offset += 1 

99 

100 if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2: 

101 measurement_status = DataParser.parse_int16(data, offset, signed=False) 

102 

103 return timestamp, pulse_rate, user_id, measurement_status 

104 

105 @staticmethod 

106 def _encode_blood_pressure_flags( 

107 data: BloodPressureDataProtocol, 

108 optional_fields: BloodPressureOptionalFields, 

109 ) -> int: 

110 """Encode flags from blood pressure data struct and optional fields.""" 

111 flags = 0 

112 if data.unit == PressureUnit.KPA: 

113 flags |= BloodPressureFlags.UNITS_KPA 

114 if optional_fields.timestamp is not None: 

115 flags |= BloodPressureFlags.TIMESTAMP_PRESENT 

116 if optional_fields.pulse_rate is not None: 

117 flags |= BloodPressureFlags.PULSE_RATE_PRESENT 

118 if optional_fields.user_id is not None: 

119 flags |= BloodPressureFlags.USER_ID_PRESENT 

120 if optional_fields.measurement_status is not None: 

121 flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT 

122 return flags 

123 

124 @staticmethod 

125 def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None: 

126 """Encode optional fields to result bytearray.""" 

127 if optional_fields.timestamp is not None: 

128 result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp)) 

129 

130 if optional_fields.pulse_rate is not None: 

131 result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate)) 

132 

133 if optional_fields.user_id is not None: 

134 result.append(optional_fields.user_id) 

135 

136 if optional_fields.measurement_status is not None: 

137 result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False)) 

138 

139 def _encode_blood_pressure_base( 

140 self, 

141 data: BloodPressureDataProtocol, 

142 optional_fields: BloodPressureOptionalFields, 

143 pressure_values: list[float], 

144 ) -> bytearray: 

145 """Common encoding logic for blood pressure characteristics. 

146 

147 Args: 

148 data: Blood pressure data with unit field 

149 optional_fields: Optional fields to encode 

150 pressure_values: List of pressure values to encode (1-3 SFLOAT values) 

151 

152 Returns: 

153 Encoded bytearray 

154 """ 

155 result = bytearray() 

156 

157 flags = self._encode_blood_pressure_flags(data, optional_fields) 

158 result.append(flags) 

159 

160 for value in pressure_values: 

161 result.extend(IEEE11073Parser.encode_sfloat(value)) 

162 

163 self._encode_optional_fields(result, optional_fields) 

164 

165 return result