Coverage for src/bluetooth_sig/gatt/characteristics/blood_pressure_common.py: 100%
81 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Shared constants and types for blood pressure characteristics."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntFlag
7from typing import Protocol
9import msgspec
11from bluetooth_sig.types.units import PressureUnit
13from .base import BaseCharacteristic
14from .blood_pressure_feature import BloodPressureFeatureCharacteristic
15from .utils import DataParser, IEEE11073Parser
17# Bluetooth SIG Blood Pressure Service specification constants
18BLOOD_PRESSURE_MAX_MMHG = 300 # Maximum blood pressure in mmHg
19BLOOD_PRESSURE_MAX_KPA = 40 # Maximum blood pressure in kPa
22class BloodPressureFlags(IntFlag):
23 """Blood Pressure flags as per Bluetooth SIG specification."""
25 UNITS_KPA = 0x01
26 TIMESTAMP_PRESENT = 0x02
27 PULSE_RATE_PRESENT = 0x04
28 USER_ID_PRESENT = 0x08
29 MEASUREMENT_STATUS_PRESENT = 0x10
32class BloodPressureOptionalFields(msgspec.Struct, frozen=True, kw_only=True):
33 """Optional fields common to blood pressure characteristics."""
35 timestamp: datetime | None = None
36 pulse_rate: float | None = None
37 user_id: int | None = None
38 measurement_status: int | None = None
41class BloodPressureDataProtocol(Protocol):
42 """Protocol for blood pressure data structs with unit field."""
44 @property
45 def unit(self) -> PressureUnit:
46 """Pressure unit for blood pressure measurement."""
47 ...
50class BaseBloodPressureCharacteristic(BaseCharacteristic):
51 """Base class for blood pressure characteristics with common parsing logic."""
53 _is_base_class = True # Exclude from characteristic discovery
55 _manual_value_type = "string" # Override since decode_value returns dataclass
57 # Declare optional dependency on Blood Pressure Feature for status interpretation
58 _optional_dependencies = [BloodPressureFeatureCharacteristic]
60 min_length = 7 # Flags(1) + Pressure values minimum
61 max_length = 19 # + Timestamp(7) + PulseRate(2) + UserID(1) + MeasurementStatus(2) maximum
62 allow_variable_length: bool = True # Variable optional fields
64 @staticmethod
65 def _parse_blood_pressure_flags(data: bytearray) -> BloodPressureFlags:
66 """Parse blood pressure flags from data."""
67 return BloodPressureFlags(data[0])
69 @staticmethod
70 def _parse_blood_pressure_unit(flags: BloodPressureFlags) -> PressureUnit:
71 """Parse pressure unit from flags."""
72 return PressureUnit.KPA if flags & BloodPressureFlags.UNITS_KPA else PressureUnit.MMHG
74 @staticmethod
75 def _parse_optional_fields(
76 data: bytearray, flags: BloodPressureFlags, start_offset: int = 7
77 ) -> tuple[datetime | None, float | None, int | None, int | None]:
78 """Parse optional fields from blood pressure data.
80 Returns:
81 Tuple of (timestamp, pulse_rate, user_id, measurement_status)
82 """
83 timestamp: datetime | None = None
84 pulse_rate: float | None = None
85 user_id: int | None = None
86 measurement_status: int | None = None
87 offset = start_offset
89 if (flags & BloodPressureFlags.TIMESTAMP_PRESENT) and len(data) >= offset + 7:
90 timestamp = IEEE11073Parser.parse_timestamp(data, offset)
91 offset += 7
93 if (flags & BloodPressureFlags.PULSE_RATE_PRESENT) and len(data) >= offset + 2:
94 pulse_rate = IEEE11073Parser.parse_sfloat(data, offset)
95 offset += 2
97 if (flags & BloodPressureFlags.USER_ID_PRESENT) and len(data) >= offset + 1:
98 user_id = data[offset]
99 offset += 1
101 if (flags & BloodPressureFlags.MEASUREMENT_STATUS_PRESENT) and len(data) >= offset + 2:
102 measurement_status = DataParser.parse_int16(data, offset, signed=False)
104 return timestamp, pulse_rate, user_id, measurement_status
106 @staticmethod
107 def _encode_blood_pressure_flags(
108 data: BloodPressureDataProtocol,
109 optional_fields: BloodPressureOptionalFields,
110 ) -> int:
111 """Encode flags from blood pressure data struct and optional fields."""
112 flags = 0
113 if data.unit == PressureUnit.KPA:
114 flags |= BloodPressureFlags.UNITS_KPA
115 if optional_fields.timestamp is not None:
116 flags |= BloodPressureFlags.TIMESTAMP_PRESENT
117 if optional_fields.pulse_rate is not None:
118 flags |= BloodPressureFlags.PULSE_RATE_PRESENT
119 if optional_fields.user_id is not None:
120 flags |= BloodPressureFlags.USER_ID_PRESENT
121 if optional_fields.measurement_status is not None:
122 flags |= BloodPressureFlags.MEASUREMENT_STATUS_PRESENT
123 return flags
125 @staticmethod
126 def _encode_optional_fields(result: bytearray, optional_fields: BloodPressureOptionalFields) -> None:
127 """Encode optional fields to result bytearray."""
128 if optional_fields.timestamp is not None:
129 result.extend(IEEE11073Parser.encode_timestamp(optional_fields.timestamp))
131 if optional_fields.pulse_rate is not None:
132 result.extend(IEEE11073Parser.encode_sfloat(optional_fields.pulse_rate))
134 if optional_fields.user_id is not None:
135 result.append(optional_fields.user_id)
137 if optional_fields.measurement_status is not None:
138 result.extend(DataParser.encode_int16(optional_fields.measurement_status, signed=False))