Coverage for src/bluetooth_sig/gatt/characteristics/blood_pressure_measurement.py: 96%

50 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Blood Pressure Measurement characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from enum import IntFlag 

6 

7import msgspec 

8 

9from bluetooth_sig.types.units import PressureUnit 

10 

11from ..context import CharacteristicContext 

12from .blood_pressure_common import ( 

13 BLOOD_PRESSURE_MAX_KPA, 

14 BLOOD_PRESSURE_MAX_MMHG, 

15 BaseBloodPressureCharacteristic, 

16 BloodPressureFlags, 

17 BloodPressureOptionalFields, 

18) 

19from .utils import IEEE11073Parser 

20 

21 

class BloodPressureMeasurementStatus(IntFlag):
    """Measurement Status bit flags for Blood Pressure Measurement (Bluetooth SIG).

    Every member occupies a single bit, so several conditions can be
    combined into one value with ``|`` and tested with ``&``.
    """

    BODY_MOVEMENT_DETECTED = 1 << 0
    CUFF_TOO_LOOSE = 1 << 1
    IRREGULAR_PULSE_DETECTED = 1 << 2
    PULSE_RATE_OUT_OF_RANGE = 1 << 3
    IMPROPER_MEASUREMENT_POSITION = 1 << 4

30 

31 

class BloodPressureData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable result of decoding a Blood Pressure Measurement characteristic.

    The three pressure readings are expressed in ``unit``. Construction fails
    with ``ValueError`` when the unit is neither MMHG nor KPA, or when any
    reading lies outside the permitted range for that unit.
    """

    systolic: float
    diastolic: float
    mean_arterial_pressure: float
    unit: PressureUnit
    optional_fields: BloodPressureOptionalFields = BloodPressureOptionalFields()
    flags: BloodPressureFlags = BloodPressureFlags(0)

    def __post_init__(self) -> None:
        """Reject an unsupported unit or an out-of-range pressure reading.

        Raises:
            ValueError: If ``unit`` is not MMHG or KPA, or if any of the three
                readings falls outside ``0`` .. the maximum for that unit.
        """
        unit = self.unit
        supported_units = (PressureUnit.MMHG, PressureUnit.KPA)
        if unit not in supported_units:
            raise ValueError(f"Blood pressure unit must be MMHG or KPA, got {unit}")

        # The upper bound depends on the unit; the lower bound is always zero.
        lower = 0
        upper = BLOOD_PRESSURE_MAX_MMHG if unit == PressureUnit.MMHG else BLOOD_PRESSURE_MAX_KPA

        readings = {
            "systolic": self.systolic,
            "diastolic": self.diastolic,
            "mean_arterial_pressure": self.mean_arterial_pressure,
        }
        for name, value in readings.items():
            # Chained comparison on purpose: a NaN reading fails it and is rejected.
            if lower <= value <= upper:
                continue
            raise ValueError(
                f"{name} pressure {value} {unit.value} is outside valid range "
                f"({lower}-{upper} {unit.value})"
            )

62 

63 

class BloodPressureMeasurementCharacteristic(BaseBloodPressureCharacteristic):
    """Blood Pressure Measurement characteristic (0x2A35).

    Carries a systolic, diastolic and mean arterial pressure reading,
    optionally followed by further fields.

    SIG Specification Pattern:
        The Blood Pressure Feature characteristic (0x2A49) can be used to
        interpret which status flags the device supports.
    """

    _is_base_class = False  # Concrete characteristic rather than a shared base

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> BloodPressureData:  # pylint: disable=too-many-locals
        """Decode a raw Blood Pressure Measurement payload.

        Layout: Flags(1) + Systolic(2) + Diastolic(2) + MAP(2) + [Timestamp(7)] +
        [Pulse Rate(2)] + [User ID(1)] + [Measurement Status(2)].
        The three pressure values are IEEE-11073 16-bit SFLOATs.

        Args:
            data: Raw bytes read from the BLE characteristic.
            ctx: Optional context that could give access to the Blood Pressure
                Feature characteristic; it is accepted for interface
                compatibility and is not consulted by this method.

        Returns:
            BloodPressureData holding the readings, unit, optional fields and flags.

        Raises:
            ValueError: If ``data`` is shorter than 7 bytes.

        """
        if len(data) < 7:
            raise ValueError("Blood Pressure Measurement data must be at least 7 bytes")

        parsed_flags = self._parse_blood_pressure_flags(data)

        # Mandatory readings sit at fixed offsets right after the flags byte.
        systolic, diastolic, mean_arterial = (
            IEEE11073Parser.parse_sfloat(data, offset) for offset in (1, 3, 5)
        )
        pressure_unit = self._parse_blood_pressure_unit(parsed_flags)

        # Which optional fields are present is decided by the base-class helper.
        timestamp, pulse_rate, user_id, status = self._parse_optional_fields(data, parsed_flags)
        extras = BloodPressureOptionalFields(
            timestamp=timestamp,
            pulse_rate=pulse_rate,
            user_id=user_id,
            measurement_status=status,
        )

        return BloodPressureData(
            systolic=systolic,
            diastolic=diastolic,
            mean_arterial_pressure=mean_arterial,
            unit=pressure_unit,
            optional_fields=extras,
            flags=parsed_flags,
        )

    def encode_value(self, data: BloodPressureData) -> bytearray:
        """Serialise a BloodPressureData instance to its byte representation.

        Args:
            data: Measurement to encode.

        Returns:
            The flags byte, the three SFLOAT pressures, then whatever the
            base-class helper appends for the optional fields.

        """
        extras = data.optional_fields

        payload = bytearray()
        payload.append(self._encode_blood_pressure_flags(data, extras))

        # Pressures are written in the same order decode_value reads them.
        for pressure in (data.systolic, data.diastolic, data.mean_arterial_pressure):
            payload.extend(IEEE11073Parser.encode_sfloat(pressure))

        self._encode_optional_fields(payload, extras)
        return payload