Coverage for src / bluetooth_sig / gatt / characteristics / blood_pressure_common.py: 100%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Shared constants and types for blood pressure characteristics."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntFlag
7from typing import Any, ClassVar, Protocol
9import msgspec
11from bluetooth_sig.types.units import PressureUnit
13from .base import BaseCharacteristic
14from .blood_pressure_feature import BloodPressureFeatureCharacteristic
15from .utils import DataParser, IEEE11073Parser
# Bluetooth SIG Blood Pressure Service specification constants.
# Upper bounds for a blood pressure reading, one per supported pressure unit.
BLOOD_PRESSURE_MAX_MMHG = 300  # Maximum blood pressure in mmHg
BLOOD_PRESSURE_MAX_KPA = 40  # Maximum blood pressure in kPa
class BloodPressureFlags(IntFlag):
    """Blood Pressure flags as per Bluetooth SIG specification.

    Bit mask carried in the first byte of a blood pressure payload. Each bit
    either selects the pressure unit or announces that an optional field
    follows the mandatory pressure values.
    """

    # Set: pressure values are in kPa; clear: values are in mmHg.
    UNITS_KPA = 0x01
    # Set: a timestamp field is present.
    TIMESTAMP_PRESENT = 0x02
    # Set: a pulse rate field is present.
    PULSE_RATE_PRESENT = 0x04
    # Set: a user ID field is present.
    USER_ID_PRESENT = 0x08
    # Set: a measurement status field is present.
    MEASUREMENT_STATUS_PRESENT = 0x10
class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True):
    """Optional fields common to blood pressure characteristics.

    Immutable, keyword-only container. Every field defaults to ``None``,
    which means "not present in the payload".
    """

    # Time of the measurement, when the payload carries one.
    timestamp: datetime | None = None
    # Pulse rate value, when the payload carries one.
    pulse_rate: float | None = None
    # User identifier, when the payload carries one.
    user_id: int | None = None
    # Raw measurement status bits, when the payload carries them.
    measurement_status: int | None = None
class BloodPressureDataProtocol(Protocol):
    """Protocol for blood pressure data structs with unit field.

    Structural type: any object exposing a readable ``unit`` attribute of
    type ``PressureUnit`` satisfies it, without inheriting from this class.
    """

    @property
    def unit(self) -> PressureUnit:
        """Pressure unit for blood pressure measurement."""
        ...
class BaseBloodPressureCharacteristic(BaseCharacteristic[Any]):
    """Base class for blood pressure characteristics with common parsing logic.

    Provides the shared helpers for the flags byte, the pressure unit and
    the optional trailing fields (timestamp, pulse rate, user ID, measurement
    status), for both decoding and encoding.
    """

    _is_base_class = True  # Exclude from characteristic discovery

    # Declare optional dependency on Blood Pressure Feature for status interpretation
    _optional_dependencies: ClassVar[list[type[BaseCharacteristic[Any]]]] = [BloodPressureFeatureCharacteristic]

    min_length = 7  # Flags(1) + Pressure values minimum
    max_length = 19  # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum
    allow_variable_length: bool = True  # Variable optional fields

    @staticmethod
    def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags:
        """Parse blood pressure flags from data.

        Args:
            data: Raw characteristic payload; the flags are its first byte.

        Returns:
            The first byte interpreted as ``BloodPressureFlags``.
        """
        return BloodPressureFlags(data[0])

    @staticmethod
    def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit:
        """Parse pressure unit from flags.

        Args:
            flags: Parsed flags byte.

        Returns:
            ``PressureUnit.KPA`` when ``UNITS_KPA`` is set, otherwise
            ``PressureUnit.MMHG``.
        """
        return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG

    @staticmethod
    def _parse_optional_fields(
        data: bytearray, flags: BloodPressureFlags, start_offset: int = 7
    ) -> tuple[datetime | None, float | None, int | None, int | None]:
        """Parse optional fields from blood pressure data.

        Fields are read in a fixed order: timestamp (7 bytes), pulse rate
        (2 bytes), user ID (1 byte), measurement status (2 bytes). A field is
        read only when its flag is set AND enough bytes remain. A flagged
        field that does not fit is left as ``None`` and the offset is not
        advanced for it.

        Args:
            data: Raw characteristic payload.
            flags: Parsed flags byte selecting which fields are present.
            start_offset: Index of the first optional-field byte.

        Returns:
            Tuple of (timestamp, pulse_rate, user_id, measurement_status)
        """
        timestamp: datetime | None = None
        pulse_rate: float | None = None
        user_id: int | None = None
        measurement_status: int | None = None
        offset = start_offset

        if (flags & BloodPressureFlags.TIMESTAMP_PRESENT) and len(data) >= offset + 7:
            timestamp = IEEE11073Parser.parse_timestamp(data, offset)
            offset += 7

        if (flags & BloodPressureFlags.PULSE_RATE_PRESENT) and len(data) >= offset + 2:
            pulse_rate = IEEE11073Parser.parse_sfloat(data, offset)
            offset += 2

        if (flags & BloodPressureFlags.USER_ID_PRESENT) and len(data) >= offset + 1:
            user_id = data[offset]
            offset += 1

        # Last field: no need to advance the offset afterwards.
        if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2:
            measurement_status = DataParser.parse_int16(data, offset, signed=False)

        return timestamp, pulse_rate, user_id, measurement_status

    @staticmethod
    def _encode_blood_pressure_flags(
        data: BloodPressureDataProtocol,
        optional_fields: BloodPressureOptionalFields,
    ) -> int:
        """Encode flags from blood pressure data struct and optional fields.

        Args:
            data: Blood pressure data; only its ``unit`` is consulted.
            optional_fields: Optional fields; each non-``None`` field sets
                the matching "present" bit.

        Returns:
            The flags value to write as the first payload byte.
        """
        flags = 0
        if data.unit == PressureUnit.KPA:
            flags |= BloodPressureFlags.UNITS_KPA
        if optional_fields.timestamp is not None:
            flags |= BloodPressureFlags.TIMESTAMP_PRESENT
        if optional_fields.pulse_rate is not None:
            flags |= BloodPressureFlags.PULSE_RATE_PRESENT
        if optional_fields.user_id is not None:
            flags |= BloodPressureFlags.USER_ID_PRESENT
        if optional_fields.measurement_status is not None:
            flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT
        return flags

    @staticmethod
    def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None:
        """Encode optional fields to result bytearray.

        Appends each non-``None`` field to ``result`` in place, in the same
        order the parser reads them: timestamp, pulse rate, user ID,
        measurement status.

        Args:
            result: Output buffer, mutated in place.
            optional_fields: Optional fields to append.
        """
        if optional_fields.timestamp is not None:
            result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp))

        if optional_fields.pulse_rate is not None:
            result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate))

        if optional_fields.user_id is not None:
            # Single byte: bytearray.append rejects values outside 0-255.
            result.append(optional_fields.user_id)

        if optional_fields.measurement_status is not None:
            result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False))

    def _encode_blood_pressure_base(
        self,
        data: BloodPressureDataProtocol,
        optional_fields: BloodPressureOptionalFields,
        pressure_values: list[float],
    ) -> bytearray:
        """Common encoding logic for blood pressure characteristics.

        Layout produced: flags byte, then each pressure value as an SFLOAT,
        then the optional fields that are present.

        Args:
            data: Blood pressure data with unit field
            optional_fields: Optional fields to encode
            pressure_values: List of pressure values to encode (1-3 SFLOAT values)

        Returns:
            Encoded bytearray
        """
        result = bytearray()

        flags = self._encode_blood_pressure_flags(data, optional_fields)
        result.append(flags)

        for value in pressure_values:
            result.extend(IEEE11073Parser.encode_sfloat(value))

        self._encode_optional_fields(result, optional_fields)

        return result