Coverage for src/bluetooth_sig/gatt/characteristics/blood_pressure_common.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Shared constants and types for blood pressure characteristics.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntFlag 

7from typing import Protocol 

8 

9import msgspec 

10 

11from bluetooth_sig.types.units import PressureUnit 

12 

13from .base import BaseCharacteristic 

14from .blood_pressure_feature import BloodPressureFeatureCharacteristic 

15from .utils import DataParser, IEEE11073Parser 

16 

17# Bluetooth SIG Blood Pressure Service specification constants 

18BLOOD_PRESSURE_MAX_MMHG = 300 # Maximum blood pressure in mmHg 

19BLOOD_PRESSURE_MAX_KPA = 40 # Maximum blood pressure in kPa 

20 

21 

22class BloodPressureFlags(IntFlag): 

23 """Blood Pressure flags as per Bluetooth SIG specification.""" 

24 

25 UNITS_KPA = 0x01 

26 TIMESTAMP_PRESENT = 0x02 

27 PULSE_RATE_PRESENT = 0x04 

28 USER_ID_PRESENT = 0x08 

29 MEASUREMENT_STATUS_PRESENT = 0x10 

30 

31 

32class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True): 

33 """Optional fields common to blood pressure characteristics.""" 

34 

35 timestamp: datetime | None = None 

36 pulse_rate: float | None = None 

37 user_id: int | None = None 

38 measurement_status: int | None = None 

39 

40 

41class BloodPressureDataProtocol(Protocol): 

42 """Protocol for blood pressure data structs with unit field.""" 

43 

44 @property 

45 def unit(self) -> PressureUnit: 

46 """Pressure unit for blood pressure measurement.""" 

47 ... 

48 

49 

50class BaseBloodPressureCharacteristic(BaseCharacteristic): 

51 """Base class for blood pressure characteristics with common parsing logic.""" 

52 

53 _is_base_class = True # Exclude from characteristic discovery 

54 

55 _manual_value_type = "string" # Override since decode_value returns dataclass 

56 

57 # Declare optional dependency on Blood Pressure Feature for status interpretation 

58 _optional_dependencies = [BloodPressureFeatureCharacteristic] 

59 

60 min_length = 7 # Flags(1) + Pressure values minimum 

61 max_length = 19 # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum 

62 allow_variable_length: bool = True # Variable optional fields 

63 

64 @staticmethod 

65 def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags: 

66 """Parse blood pressure flags from data.""" 

67 return BloodPressureFlags(data[0]) 

68 

69 @staticmethod 

70 def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit: 

71 """Parse pressure unit from flags.""" 

72 return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG 

73 

74 @staticmethod 

75 def _parse_optional_fields( 

76 data: bytearray, flags: BloodPressureFlags, start_offset: int = 7 

77 ) -> tuple[datetime | None, float | None, int | None, int | None]: 

78 """Parse optional fields from blood pressure data. 

79 

80 Returns: 

81 Tuple of (timestamp, pulse_rate, user_id, measurement_status) 

82 """ 

83 timestamp: datetime | None = None 

84 pulse_rate: float | None = None 

85 user_id: int | None = None 

86 measurement_status: int | None = None 

87 offset = start_offset 

88 

89 if (flags & BloodPressureFlags.TIMESTAMP_PRESENT) and len(data) >= offset + 7: 

90 timestamp = IEEE11073Parser.parse_timestamp(data, offset) 

91 offset += 7 

92 

93 if (flags & BloodPressureFlags.PULSE_RATE_PRESENT) and len(data) >= offset + 2: 

94 pulse_rate = IEEE11073Parser.parse_sfloat(data, offset) 

95 offset += 2 

96 

97 if (flags & BloodPressureFlags.USER_ID_PRESENT) and len(data) >= offset + 1: 

98 user_id = data[offset] 

99 offset += 1 

100 

101 if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2: 

102 measurement_status = DataParser.parse_int16(data, offset, signed=False) 

103 

104 return timestamp, pulse_rate, user_id, measurement_status 

105 

106 @staticmethod 

107 def _encode_blood_pressure_flags( 

108 data: BloodPressureDataProtocol, 

109 optional_fields: BloodPressureOptionalFields, 

110 ) -> int: 

111 """Encode flags from blood pressure data struct and optional fields.""" 

112 flags = 0 

113 if data.unit == PressureUnit.KPA: 

114 flags |= BloodPressureFlags.UNITS_KPA 

115 if optional_fields.timestamp is not None: 

116 flags |= BloodPressureFlags.TIMESTAMP_PRESENT 

117 if optional_fields.pulse_rate is not None: 

118 flags |= BloodPressureFlags.PULSE_RATE_PRESENT 

119 if optional_fields.user_id is not None: 

120 flags |= BloodPressureFlags.USER_ID_PRESENT 

121 if optional_fields.measurement_status is not None: 

122 flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT 

123 return flags 

124 

125 @staticmethod 

126 def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None: 

127 """Encode optional fields to result bytearray.""" 

128 if optional_fields.timestamp is not None: 

129 result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp)) 

130 

131 if optional_fields.pulse_rate is not None: 

132 result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate)) 

133 

134 if optional_fields.user_id is not None: 

135 result.append(optional_fields.user_id) 

136 

137 if optional_fields.measurement_status is not None: 

138 result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False))