Coverage for src / bluetooth_sig / gatt / characteristics / blood_pressure_common.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Shared constants and types for blood pressure characteristics.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntFlag 

7from typing import Any, ClassVar, Protocol 

8 

9import msgspec 

10 

11from bluetooth_sig.types.units import PressureUnit 

12 

13from .base import BaseCharacteristic 

14from .blood_pressure_feature import BloodPressureFeatureCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

# Upper pressure limits from the Bluetooth SIG Blood Pressure Service
# specification, one per supported unit.
BLOOD_PRESSURE_MAX_MMHG = 300  # Maximum blood pressure in mmHg
BLOOD_PRESSURE_MAX_KPA = 40  # Maximum blood pressure in kPa

20 

21 

class BloodPressureFlags(IntFlag):
    """Blood Pressure flag bits as per Bluetooth SIG specification.

    Each member occupies one bit; the ``*_PRESENT`` members mark which
    optional fields are included in the payload.
    """

    UNITS_KPA = 1 << 0  # set: pressures are in kPa; clear: mmHg
    TIMESTAMP_PRESENT = 1 << 1
    PULSE_RATE_PRESENT = 1 << 2
    USER_ID_PRESENT = 1 << 3
    MEASUREMENT_STATUS_PRESENT = 1 << 4

30 

31 

class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True):
    """Optional fields common to blood pressure characteristics.

    The struct is frozen and keyword-only. Every field defaults to ``None``,
    which the encoding helpers treat as "field not present".
    """

    # Measurement time, when the timestamp field is present.
    timestamp: datetime | None = None
    # Pulse rate, when the pulse-rate field is present.
    pulse_rate: float | None = None
    # User ID; encoded as a single byte.
    user_id: int | None = None
    # Measurement status; encoded as an unsigned 16-bit value.
    measurement_status: int | None = None

39 

40 

class BloodPressureDataProtocol(Protocol):
    """Protocol for blood pressure data structs with unit field.

    Any object exposing a readable ``unit`` attribute satisfies this
    protocol structurally; no inheritance is required.
    """

    @property
    def unit(self) -> PressureUnit:
        """Pressure unit for blood pressure measurement."""
        ...

47 

48 

class BaseBloodPressureCharacteristic(BaseCharacteristic[Any]):
    """Base class for blood pressure characteristics with common parsing logic.

    Bundles the flag handling, optional-field parsing and encoding helpers
    shared by the blood pressure characteristics. Pressure values themselves
    are supplied by the caller of ``_encode_blood_pressure_base``.
    """

    _is_base_class = True  # Exclude from characteristic discovery

    # Declare optional dependency on Blood Pressure Feature for status interpretation
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [BloodPressureFeatureCharacteristic]

    min_length = 7  # Flags(1) + Pressure values minimum
    max_length = 19  # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum
    allow_variable_length: bool = True  # Variable optional fields

    @staticmethod
    def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags:
        """Parse blood pressure flags from data.

        Args:
            data: Raw characteristic payload; the flags are read from byte 0.

        Returns:
            The flags byte wrapped as ``BloodPressureFlags``.
        """
        return BloodPressureFlags(data[0])

    @staticmethod
    def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit:
        """Parse pressure unit from flags.

        Args:
            flags: Flags parsed from the payload.

        Returns:
            ``PressureUnit.KPA`` when ``UNITS_KPA`` is set, otherwise
            ``PressureUnit.MMHG``.
        """
        return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG

    @staticmethod
    def _parse_optional_fields(
        data: bytearray, flags: BloodPressureFlags, start_offset: int = 7
    ) -> tuple[datetime | None, float | None, int | None, int | None]:
        """Parse optional fields from blood pressure data.

        The optional fields are packed back to back in a fixed order
        (timestamp, pulse rate, user ID, measurement status), so the position
        of each field depends on every flagged field before it. If a flagged
        field does not fit in ``data``, the payload is truncated and the
        positions of all later fields are unknown; parsing stops there and the
        remaining fields are reported as ``None`` rather than being decoded
        from the wrong bytes.

        Args:
            data: Raw characteristic payload.
            flags: Flags parsed from the payload; select which fields to read.
            start_offset: Byte offset of the first optional field.

        Returns:
            Tuple of (timestamp, pulse_rate, user_id, measurement_status)
        """
        timestamp: datetime | None = None
        pulse_rate: float | None = None
        user_id: int | None = None
        measurement_status: int | None = None
        offset = start_offset

        if flags & BloodPressureFlags.TIMESTAMP_PRESENT:
            if len(data) < offset + 7:
                # Truncated inside the timestamp: nothing after it can be located.
                return timestamp, pulse_rate, user_id, measurement_status
            timestamp = IEEE11073Parser.parse_timestamp(data, offset)
            offset += 7

        if flags & BloodPressureFlags.PULSE_RATE_PRESENT:
            if len(data) < offset + 2:
                return timestamp, pulse_rate, user_id, measurement_status
            pulse_rate = IEEE11073Parser.parse_sfloat(data, offset)
            offset += 2

        if flags & BloodPressureFlags.USER_ID_PRESENT:
            if len(data) < offset + 1:
                return timestamp, pulse_rate, user_id, measurement_status
            user_id = data[offset]
            offset += 1

        # Last field: nothing follows it, so a short payload simply leaves it unset.
        if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2:
            measurement_status = DataParser.parse_int16(data, offset, signed=False)

        return timestamp, pulse_rate, user_id, measurement_status

    @staticmethod
    def _encode_blood_pressure_flags(
        data: BloodPressureDataProtocol,
        optional_fields: BloodPressureOptionalFields,
    ) -> int:
        """Encode flags from blood pressure data struct and optional fields.

        Args:
            data: Blood pressure data; only its ``unit`` is consulted.
            optional_fields: Optional fields; each non-``None`` field sets the
                matching ``*_PRESENT`` bit.

        Returns:
            The combined flags value.
        """
        flags = 0
        if data.unit == PressureUnit.KPA:
            flags |= BloodPressureFlags.UNITS_KPA
        if optional_fields.timestamp is not None:
            flags |= BloodPressureFlags.TIMESTAMP_PRESENT
        if optional_fields.pulse_rate is not None:
            flags |= BloodPressureFlags.PULSE_RATE_PRESENT
        if optional_fields.user_id is not None:
            flags |= BloodPressureFlags.USER_ID_PRESENT
        if optional_fields.measurement_status is not None:
            flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT
        return flags

    @staticmethod
    def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None:
        """Encode optional fields to result bytearray.

        Fields are appended in the same order ``_parse_optional_fields`` reads
        them; ``None`` fields are skipped.

        Args:
            result: Output buffer, extended in place.
            optional_fields: Optional fields to append.
        """
        if optional_fields.timestamp is not None:
            result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp))

        if optional_fields.pulse_rate is not None:
            result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate))

        if optional_fields.user_id is not None:
            # Single byte: bytearray.append raises ValueError outside 0..255.
            result.append(optional_fields.user_id)

        if optional_fields.measurement_status is not None:
            result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False))

    def _encode_blood_pressure_base(
        self,
        data: BloodPressureDataProtocol,
        optional_fields: BloodPressureOptionalFields,
        pressure_values: list[float],
    ) -> bytearray:
        """Common encoding logic for blood pressure characteristics.

        Layout: flags byte, then each pressure value as SFLOAT, then the
        optional fields that are present.

        Args:
            data: Blood pressure data with unit field
            optional_fields: Optional fields to encode
            pressure_values: List of pressure values to encode (1-3 SFLOAT values)

        Returns:
            Encoded bytearray
        """
        result = bytearray()

        flags = self._encode_blood_pressure_flags(data, optional_fields)
        result.append(flags)

        for value in pressure_values:
            result.extend(IEEE11073Parser.encode_sfloat(value))

        self._encode_optional_fields(result, optional_fields)

        return result