Coverage for src / bluetooth_sig / gatt / characteristics / blood_pressure_common.py: 100%
89 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Shared constants and types for blood pressure characteristics."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntFlag
7from typing import Any, Protocol
9import msgspec
11from bluetooth_sig.types.units import PressureUnit
13from .base import BaseCharacteristic
14from .blood_pressure_feature import BloodPressureFeatureCharacteristic
15from .utils import DataParser, IEEE11073Parser
17# Bluetooth SIG Blood Pressure Service specification constants
18BLOOD_PRESSURE_MAX_MMHG = 300 # Maximum blood pressure in mmHg
19BLOOD_PRESSURE_MAX_KPA = 40 # Maximum blood pressure in kPa
22class BloodPressureFlags(IntFlag):
23 """Blood Pressure flags as per Bluetooth SIG specification."""
25 UNITS_KPA = 0x01
26 TIMESTAMP_PRESENT = 0x02
27 PULSE_RATE_PRESENT = 0x04
28 USER_ID_PRESENT = 0x08
29 MEASUREMENT_STATUS_PRESENT = 0x10
32class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True):
33 """Optional fields common to blood pressure characteristics."""
35 timestamp: datetime | None = None
36 pulse_rate: float | None = None
37 user_id: int | None = None
38 measurement_status: int | None = None
41class BloodPressureDataProtocol(Protocol):
42 """Protocol for blood pressure data structs with unit field."""
44 @property
45 def unit(self) -> PressureUnit:
46 """Pressure unit for blood pressure measurement."""
49class BaseBloodPressureCharacteristic(BaseCharacteristic[Any]):
50 """Base class for blood pressure characteristics with common parsing logic."""
52 _is_base_class = True # Exclude from characteristic discovery
54 _manual_value_type = "string" # Override since decode_value returns dataclass
56 # Declare optional dependency on Blood Pressure Feature for status interpretation
57 _optional_dependencies = [BloodPressureFeatureCharacteristic]
59 min_length = 7 # Flags(1) + Pressure values minimum
60 max_length = 19 # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum
61 allow_variable_length: bool = True # Variable optional fields
63 @staticmethod
64 def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags:
65 """Parse blood pressure flags from data."""
66 return BloodPressureFlags(data[0])
68 @staticmethod
69 def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit:
70 """Parse pressure unit from flags."""
71 return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG
73 @staticmethod
74 def _parse_optional_fields(
75 data: bytearray, flags: BloodPressureFlags, start_offset: int = 7
76 ) -> tuple[datetime | None, float | None, int | None, int | None]:
77 """Parse optional fields from blood pressure data.
79 Returns:
80 Tuple of (timestamp, pulse_rate, user_id, measurement_status)
81 """
82 timestamp: datetime | None = None
83 pulse_rate: float | None = None
84 user_id: int | None = None
85 measurement_status: int | None = None
86 offset = start_offset
88 if (flags & BloodPressureFlags.TIMESTAMP_PRESENT) and len(data) >= offset + 7:
89 timestamp = IEEE11073Parser.parse_timestamp(data, offset)
90 offset += 7
92 if (flags & BloodPressureFlags.PULSE_RATE_PRESENT) and len(data) >= offset + 2:
93 pulse_rate = IEEE11073Parser.parse_sfloat(data, offset)
94 offset += 2
96 if (flags & BloodPressureFlags.USER_ID_PRESENT) and len(data) >= offset + 1:
97 user_id = data[offset]
98 offset += 1
100 if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2:
101 measurement_status = DataParser.parse_int16(data, offset, signed=False)
103 return timestamp, pulse_rate, user_id, measurement_status
105 @staticmethod
106 def _encode_blood_pressure_flags(
107 data: BloodPressureDataProtocol,
108 optional_fields: BloodPressureOptionalFields,
109 ) -> int:
110 """Encode flags from blood pressure data struct and optional fields."""
111 flags = 0
112 if data.unit == PressureUnit.KPA:
113 flags |= BloodPressureFlags.UNITS_KPA
114 if optional_fields.timestamp is not None:
115 flags |= BloodPressureFlags.TIMESTAMP_PRESENT
116 if optional_fields.pulse_rate is not None:
117 flags |= BloodPressureFlags.PULSE_RATE_PRESENT
118 if optional_fields.user_id is not None:
119 flags |= BloodPressureFlags.USER_ID_PRESENT
120 if optional_fields.measurement_status is not None:
121 flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT
122 return flags
124 @staticmethod
125 def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None:
126 """Encode optional fields to result bytearray."""
127 if optional_fields.timestamp is not None:
128 result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp))
130 if optional_fields.pulse_rate is not None:
131 result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate))
133 if optional_fields.user_id is not None:
134 result.append(optional_fields.user_id)
136 if optional_fields.measurement_status is not None:
137 result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False))
139 def _encode_blood_pressure_base(
140 self,
141 data: BloodPressureDataProtocol,
142 optional_fields: BloodPressureOptionalFields,
143 pressure_values: list[float],
144 ) -> bytearray:
145 """Common encoding logic for blood pressure characteristics.
147 Args:
148 data: Blood pressure data with unit field
149 optional_fields: Optional fields to encode
150 pressure_values: List of pressure values to encode (1-3 SFLOAT values)
152 Returns:
153 Encoded bytearray
154 """
155 result = bytearray()
157 flags = self._encode_blood_pressure_flags(data, optional_fields)
158 result.append(flags)
160 for value in pressure_values:
161 result.extend(IEEE11073Parser.encode_sfloat(value))
163 self._encode_optional_fields(result, optional_fields)
165 return result