Source code for mwings.common

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# MWings common defs

from __future__ import annotations

from struct import unpack
from enum import IntEnum, StrEnum, auto
from datetime import datetime, timezone, tzinfo
from collections import OrderedDict
from collections.abc import Sequence, MutableSequence, Iterable
from json import dumps
from abc import ABC, abstractmethod
from importlib import metadata
import platform
from typing import (
    Any,
    Callable,
    TypeVar,
    final,
    overload,
    TYPE_CHECKING,
)

from overrides import override
from pydantic_core import CoreSchema, core_schema
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    GetCoreSchemaHandler,
    computed_field,
    field_serializer,
    field_validator,
)
from pydantic.types import AwareDatetime

from . import utils


# pandas is an optional dependency. PANDAS_AVAILABLE records whether the
# import succeeded so that pandas-backed exports (see to_df()) can refuse to
# run instead of making the whole module fail to import.
try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    if TYPE_CHECKING:
        # typing.TYPE_CHECKING is False at runtime, so this branch is only
        # "taken" from the point of view of a static type checker.
        raise ImportError("pandas required for dev")
    else:
        # Runtime path when pandas is not installed.
        PANDAS_AVAILABLE = False


# Module-wide timezone. ParsedPacketBase.datetime_must_be_clear compares a
# datetime's tzinfo against this object by identity (`is not`), so timestamps
# must carry this exact tzinfo object, not merely an equal one.
Timezone: tzinfo = timezone.utc
"""Global tzinfo"""


# Bound to "any callable" so that a decorator annotated with it returns the
# same callable type it received.
SomeCallable = TypeVar("SomeCallable", bound=Callable[..., Any])
"""TypeVar for handlers"""


class DtypedDecimal(ABC):
    """Base for numeric wrapper types that can name their pandas dtype

    Subclasses are expected to also derive from ``int`` or ``float`` and to
    override :meth:`get_dtype` with their own dtype identifier.
    """

    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            dtype identifier; ``"int64"`` unless a subclass overrides it
        """

        return "int64"

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Tell pydantic how to treat this type

        The JSON side uses the decimal schema; the Python side uses the
        "any" schema, so instances are passed through as they are.

        Parameters
        ----------
        source_type : Any
            The class a schema is being generated for.
        handler : GetCoreSchemaHandler
            Entry point into pydantic's own schema generation (unused).

        Returns
        -------
        CoreSchema
            A `pydantic-core` `CoreSchema`
        """

        json_side = core_schema.decimal_schema()
        python_side = core_schema.any_schema()
        return core_schema.json_or_python_schema(
            json_schema=json_side,
            python_schema=python_side,
        )
class UInt8(int, DtypedDecimal):
    """Immutable unsigned 8-bit integer"""

    def __new__(cls, value: int | None = None) -> UInt8:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        UInt8
            The wrapped number

        Raises
        ------
        ValueError
            The number is below 0 or not below 2**8
        """

        number = 0 if value is None else value
        fits = 0 <= number < (1 << 8)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"uint8"``
        """

        return "uint8"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex digits, zero-padded to at least 2 characters
        """

        return format(self, "02x")
class Int8(int, DtypedDecimal):
    """Immutable signed 8-bit integer"""

    def __new__(cls, value: int | None = None) -> Int8:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        Int8
            The wrapped number

        Raises
        ------
        ValueError
            The number is below -(2**7) or not below 2**7
        """

        number = 0 if value is None else value
        fits = -(1 << 7) <= number < (1 << 7)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"int8"``
        """

        return "int8"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex, zero-padded to at least 2 characters

        Notes
        -----
        A negative value is written with a leading minus sign (the padding
        width includes the sign); it is not a two's complement dump.
        """

        return format(self, "02x")
class UInt16(int, DtypedDecimal):
    """Immutable unsigned 16-bit integer"""

    def __new__(cls, value: int | None = None) -> UInt16:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        UInt16
            The wrapped number

        Raises
        ------
        ValueError
            The number is below 0 or not below 2**16
        """

        number = 0 if value is None else value
        fits = 0 <= number < (1 << 16)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"uint16"``
        """

        return "uint16"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex digits, zero-padded to at least 4 characters
        """

        return format(self, "04x")
class Int16(int, DtypedDecimal):
    """Immutable signed 16-bit integer"""

    def __new__(cls, value: int | None = None) -> Int16:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        Int16
            The wrapped number

        Raises
        ------
        ValueError
            The number is below -(2**15) or not below 2**15
        """

        number = 0 if value is None else value
        fits = -(1 << 15) <= number < (1 << 15)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"int16"``
        """

        return "int16"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex, zero-padded to at least 4 characters

        Notes
        -----
        A negative value is written with a leading minus sign (the padding
        width includes the sign); it is not a two's complement dump.
        """

        return format(self, "04x")
class UInt32(int, DtypedDecimal):
    """Immutable unsigned 32-bit integer"""

    def __new__(cls, value: int | None = None) -> UInt32:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        UInt32
            The wrapped number

        Raises
        ------
        ValueError
            The number is below 0 or not below 2**32
        """

        number = 0 if value is None else value
        fits = 0 <= number < (1 << 32)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"uint32"``
        """

        return "uint32"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex digits, zero-padded to at least 8 characters
        """

        return format(self, "08x")
class Int32(int, DtypedDecimal):
    """Immutable signed 32-bit integer"""

    def __new__(cls, value: int | None = None) -> Int32:
        """Build a range-checked instance

        Parameters
        ----------
        value : int, optional
            Number to wrap; zero is used when omitted

        Returns
        -------
        Int32
            The wrapped number

        Raises
        ------
        ValueError
            The number is below -(2**31) or not below 2**31
        """

        number = 0 if value is None else value
        fits = -(1 << 31) <= number < (1 << 31)
        if not fits:
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, number)

    @override
    def get_dtype(self) -> str:
        """Name the pandas dtype that fits this value

        Returns
        -------
        str
            Always ``"int32"``
        """

        return "int32"

    def hex(self) -> str:
        """Format the value as hexadecimal text

        Returns
        -------
        str
            Lower-case hex, zero-padded to at least 8 characters

        Notes
        -----
        A negative value is written with a leading minus sign (the padding
        width includes the sign); it is not a two's complement dump.
        """

        return format(self, "08x")
class Float32(float, DtypedDecimal):
    """Immutable float restricted to the finite single-precision range"""

    def __new__(cls, value: float | None = None) -> Float32:
        """Create immutable instance

        Parameters
        ----------
        value : float, optional
            Value to set; 0.0 is used when omitted

        Returns
        -------
        Float32
            Wrapped value

        Raises
        ------
        ValueError
            Magnitude exceeds the largest finite float32, or the value is
            NaN / infinite

        Notes
        -----
        The range is symmetric around zero. The previous lower bound was the
        smallest positive normal float32, which rejected 0.0 (the default),
        every negative number and subnormals.
        """

        if value is None:
            value = 0.0
        # Largest finite single-precision magnitude. NaN fails both
        # comparisons and +/-inf fails one, so they are still rejected.
        limit = 3.40282347e38
        if not (-limit <= value <= limit):
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, value)

    @override
    def get_dtype(self) -> str:
        """Provide dtype info for pandas

        Returns
        -------
        str
            dtype identifier
        """

        return "float32"
class Float64(float, DtypedDecimal):
    """Immutable float restricted to the finite double-precision range"""

    def __new__(cls, value: float | None = None) -> Float64:
        """Create immutable instance

        Parameters
        ----------
        value : float, optional
            Value to set; 0.0 is used when omitted

        Returns
        -------
        Float64
            Wrapped value

        Raises
        ------
        ValueError
            The value is NaN or infinite

        Notes
        -----
        The previous check had its bounds swapped (max <= value <= min), so
        it could never be satisfied and every construction raised.
        """

        if value is None:
            value = 0.0
        # Largest finite double-precision magnitude. Every finite Python
        # float passes; NaN and +/-inf are rejected.
        limit = 1.7976931348623157e308
        if not (-limit <= value <= limit):
            raise ValueError("The given value is out of the range")
        return super().__new__(cls, value)

    @override
    def get_dtype(self) -> str:
        """Provide dtype info for pandas

        Returns
        -------
        str
            dtype identifier
        """

        return "float64"
T = TypeVar("T") """TypeVar for generics"""
[docs] class FixedList(MutableSequence[T]): """List with fixed length""" def __init__( self, length: int, initial_elements: Iterable[T], ): """Constructor for the sequence Parameters ---------- length : int Fixed length initial_elements : Iterable[T] Initial items in iterable representation Raises ------ ValueError Invalid length """ if length <= 0: raise ValueError("Invalid length") if not initial_elements: raise ValueError(f"Expected {length} elements, got none") if length != sum(1 for e in initial_elements): raise ValueError( f"Expected {length} elements, got {sum(1 for e in initial_elements)}" ) self.__length = length self.items = list(initial_elements) @overload def __getitem__(self, index: int) -> T: ... @overload def __getitem__(self, index: slice) -> MutableSequence[T]: ... @override def __getitem__(self, index: int | slice) -> Any: """Get item(s) with index Parameters ---------- index : int | slice Index for getting data Returns ------- Any Item or part of sequence Raises ------ IndexError Out of range """ match index: case int(): if not (0 <= index < self.__length): raise IndexError( f"Index out of range, expected under {self.__length}" ) case slice(): if not ( 0 <= index.start < self.__length and 0 < index.stop <= self.__length ): raise IndexError( f"Index out of range, expected under {self.__length}" ) return self.items[index]
[docs] @override def insert(self, index: int, value: T) -> None: """Insert an item for the specific index Parameters ---------- index : int Index to insert value : T Value of item to insert Raises ------ IndexError Out of range """ if not (0 <= index < self.__length): raise IndexError(f"Index out of range, expected under {self.__length}") self.items[index] = value
@overload def __setitem__(self, index: int, value: T) -> None: ... @overload def __setitem__(self, index: slice, value: Iterable[T]) -> None: ... @override def __setitem__(self, index: int | slice, value: Any) -> None: """Set item with index Parameters ---------- index : int | slice Index to set value : Any Item or part of sequence to set Raises ------ IndexError Out of range """ match index: case int(): self.insert(index, value) case slice(): if not ( 0 <= index.start < self.__length and 0 < index.stop <= self.__length ): raise IndexError( f"Index out of range, expected under {self.__length}" ) for i, v in enumerate(value): self.insert(index.start + i, v) @overload def __delitem__(self, index: int) -> None: ... @overload def __delitem__(self, index: slice) -> None: ... @override def __delitem__(self, index: int | slice) -> None: """Delete item with index Parameters ---------- index : int | slice Index to delete Raises ------ RuntimeError FixedList does not support deletion """ raise RuntimeError("FixedList does not support deletion") @override def __len__(self) -> int: """Get length Returns ------- int Length of items """ return self.__length
[docs] @override def append(self, value: T) -> None: """Append an item Parameters ---------- value : T Value of the item to append Raises ------ IndexError FixedList does not support append() """ raise IndexError("FixedList does not support append()")
[docs] @override def extend(self, values: Iterable[T]) -> None: """Extend the sequence Parameters ---------- values : Iterable[T] Sequence to extend Raises ------ IndexError FixedList does not support extend() """ raise IndexError("FixedList does not support extend()")
[docs] @override def pop(self, index: int = -1) -> T: """Pop an item Parameters ---------- index : int Index to pop Returns ------- T An poped item Raises ------ RuntimeError FixedList does not support pop() """ raise RuntimeError("FixedList does not support pop()")
[docs] @override def remove(self, value: T) -> None: """Remove an item Parameters ---------- value : T Value of item to remove Raises ------ RuntimeError FixedList does not support remove() """ raise RuntimeError("FixedList does not support remove()")
@classmethod def __get_pydantic_core_schema__( cls, source_type: Any, handler: GetCoreSchemaHandler ) -> CoreSchema: """Override the pydantic schema for serialization For JSON, serialize as plain list. For dict, serialize as object of subclass derived from MutableSequence. Parameters ---------- source_type : Any The class we are generating a schema for. handler : GetCoreSchemaHandler Call into Pydantic's internal schema generation. Returns ------- CoreSchema A `pydantic-core` `CoreSchema` """ return core_schema.json_or_python_schema( json_schema=core_schema.list_schema(), python_schema=core_schema.any_schema(), )
[docs] class FixedTuple(Sequence[T]): """Tuple with fixed length""" def __init__(self, length: int, elements: Iterable[T]): """Constructor for the sequence Parameters ---------- length : int Fixed length elements : Iterable[T] Items to contain Raises ------ ValueError Invalid length """ if length <= 0: raise ValueError("Invalid length") if not elements: raise ValueError(f"Expected {length} elements, got none") if length != sum(1 for e in elements): raise ValueError( f"Expected {length} elements, got {sum(1 for e in elements)}" ) self.__length = length self.items = tuple(elements) @overload def __getitem__(self, index: int) -> T: ... @overload def __getitem__(self, index: slice) -> MutableSequence[T]: ... @override def __getitem__(self, index: int | slice) -> Any: """Get item(s) with index Parameters ---------- index : int | slice Index for getting data Returns ------- Any Item or part of sequence to get """ match index: case int(): if not (0 <= index < self.__length): raise IndexError( f"Index out of range, expected under {self.__length}" ) case slice(): if not ( 0 <= index.start < self.__length and 0 < index.stop <= self.__length ): raise IndexError( f"Index out of range, expected under {self.__length}" ) return self.items[index] @override def __len__(self) -> int: """Get length Returns ------- int Length for items """ return self.__length @classmethod def __get_pydantic_core_schema__( cls, source_type: Any, handler: GetCoreSchemaHandler ) -> CoreSchema: """Override the pydantic schema for serialization For JSON, serialize as plain tuple. For dict, serialize as object of subclass derived from Sequence. Parameters ---------- source_type : Any The class we are generating a schema for. handler : GetCoreSchemaHandler Call into Pydantic's internal schema generation. Returns ------- CoreSchema A `pydantic-core` `CoreSchema` """ return core_schema.json_or_python_schema( json_schema=core_schema.tuple_variable_schema(), python_schema=core_schema.any_schema(), )
class CrossSectional(FixedTuple[T]):
    """Fixed-length tuple of values taken at one moment (e.g. ADC voltages)

    Adds no behavior of its own; the distinct type lets exporters recognize
    it, e.g. to spread the items into separate numbered properties.
    """
class TimeSeries(FixedTuple[T]):
    """Fixed-length tuple of successive samples (e.g. acceleration samples)

    Adds no behavior of its own; the distinct type lets exporters recognize
    it, e.g. to expand the samples into one DataFrame row each.
    """
[docs] @final class PacketType(StrEnum): """Packet type identifier for receiving handlers Attributes ---------- BARE: str Identifier for bare packets ACT: str Identifier for act packets APP_TWELITE: str Identifier for App_Twelite packets APP_IO: str Identifier for App_IO packets APP_ARIA: str Identifier for App_ARIA packets APP_CUE: str Identifier for App_CUE packets APP_CUE_PAL_EVENT: str Identifier for App_CUE (PAL Move or Dice mode) packets APP_PAL_OPENCLOSE: str Identifier for App_PAL (OPENCLOSE) packets APP_PAL_AMB: str Identifier for App_PAL (AMB) packets APP_PAL_MOT: str Identifier for App_PAL (MOT) packets APP_UART_ASCII: str Identifier for App_Uart (Mode A) packets APP_UART_ASCII_EXTENDED: str Identifier for App_Uart (Mode A, Extended) packets """ BARE = auto() ACT = auto() APP_TWELITE = auto() APP_IO = auto() APP_ARIA = auto() APP_CUE = auto() APP_CUE_PAL_EVENT = auto() APP_PAL_OPENCLOSE = auto() APP_PAL_AMB = auto() APP_PAL_MOT = auto() APP_UART_ASCII = auto() APP_UART_ASCII_EXTENDED = auto()
@final
class MagnetState(IntEnum):
    """State reported by the magnet sensor

    For App_ARIA, App_CUE and App_PAL(OPENCLOSE)

    Attributes
    ----------
    NOT_DETECTED: int
        No magnet nearby
    N_POLE_IS_CLOSE: int
        The N pole of a magnet is close
    S_POLE_IS_CLOSE: int
        The S pole of a magnet is close
    """

    NOT_DETECTED = 0
    N_POLE_IS_CLOSE = 1
    S_POLE_IS_CLOSE = 2
@final
class AccelEvent(IntEnum):
    """Event ID for acceleration events (dice face, shake, move)

    Attributes
    ----------
    DICE_1: int
        Dice roll: 1
    DICE_2: int
        Dice roll: 2
    DICE_3: int
        Dice roll: 3
    DICE_4: int
        Dice roll: 4
    DICE_5: int
        Dice roll: 5
    DICE_6: int
        Dice roll: 6
    SHAKE: int
        Shake
    MOVE: int
        Move
    NONE: int
        No events
    """

    # Dice faces
    DICE_1 = 1
    DICE_2 = 2
    DICE_3 = 3
    DICE_4 = 4
    DICE_5 = 5
    DICE_6 = 6
    # Motion events
    SHAKE = 8
    MOVE = 16
    # Nothing happened
    NONE = 255
@final
class AppPalNoticeColor(IntEnum):
    """Preset color IDs for App_PAL (NOTICE)

    Attributes
    ----------
    RED : int
        Red
    GREEN : int
        Green
    BLUE : int
        Blue
    YELLOW : int
        Yellow
    PURPLE : int
        Purple
    LIGHT_BLUE : int
        Light blue
    WHITE : int
        White
    WARM_WHITE : int
        Warm white
    """

    RED = 0x0
    GREEN = 0x1
    BLUE = 0x2
    YELLOW = 0x3
    PURPLE = 0x4
    LIGHT_BLUE = 0x5
    WHITE = 0x6
    WARM_WHITE = 0x7
@final
class AppPalNoticeBlinkSpeed(IntEnum):
    """Blinking speed IDs for App_PAL (NOTICE)

    Attributes
    ----------
    ALWAYS_ON : int
        Lit continuously, no blinking
    SLOW : int
        Slow blinking
    MEDIUM : int
        Medium blinking
    FAST : int
        Fast blinking
    """

    ALWAYS_ON = 0x0
    SLOW = 0x1
    MEDIUM = 0x2
    FAST = 0x3
@final
class AppPalNoticeRGBWColor(BaseModel):
    """RGBW color for App_PAL (NOTICE), one 4-bit level per channel

    Attributes
    ----------
    red : UInt8
        Red level, 0-0xF (default 0)
    green : UInt8
        Green level, 0-0xF (default 0)
    blue : UInt8
        Blue level, 0-0xF (default 0)
    white : UInt8
        White level, 0-0xF (default 0xF)
    """

    red: UInt8 = Field(default=0, ge=0, le=0xF)
    green: UInt8 = Field(default=0, ge=0, le=0xF)
    blue: UInt8 = Field(default=0, ge=0, le=0xF)
    white: UInt8 = Field(default=0xF, ge=0, le=0xF)

    def u16(self) -> UInt16:
        """Pack the four channels into one 16-bit value

        Returns
        -------
        UInt16
            Nibbles from most to least significant: white, blue, green, red
        """

        # Least significant nibble first
        channels = (self.red, self.green, self.blue, self.white)
        packed = 0
        for position, level in enumerate(channels):
            packed |= (level & 0xF) << (4 * position)
        return UInt16(packed)
@final
class BarePacket(BaseModel):
    """Raw packet: payload bytes plus their checksum

    Attributes
    ----------
    payload: bytes
        Data payload
    checksum: UInt8
        LRC checksum for data payload

    Notes
    -----
    The ``*_at`` readers decode big-endian values and return zero when the
    requested bytes lie beyond the end of the payload.
    NOTE(review): negative indices are not rejected by the bounds checks;
    confirm whether callers ever pass them.
    """

    payload: bytes
    checksum: UInt8

    def __init__(
        self,
        payload: bytes,
        checksum: UInt8 | None = None,
        logical_and_command_id: tuple[UInt8, UInt8] | None = None,
    ):
        """Build a packet, computing the checksum when it is not given

        Parameters
        ----------
        payload : bytes
            Payload data
        checksum : UInt8, optional
            LRC8 checksum; calculated from the final payload if omitted
        logical_and_command_id : tuple[UInt8, UInt8], optional
            Logical ID and Command ID. When given, both bytes are put in
            front of ``payload``, so ``payload`` must not contain them.
        """

        if logical_and_command_id is None:
            body = payload
        else:
            prefixed = bytearray(payload)
            prefixed[0:0] = bytes(logical_and_command_id)
            body = bytes(prefixed)
        if checksum is None:
            checksum = utils.lrc8(body)
        super().__init__(payload=body, checksum=checksum)

    def u8_from(self, index: int) -> bytes | None:
        """Get the rest of the payload starting at a position

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        bytes | None
            Bytes from the position to the end, or None if the position is
            at or past the end
        """

        if index >= len(self.payload):
            return None
        return bytes(self.payload[index:])

    def u8_at(self, index: int) -> UInt8:
        """Read 1 byte as an unsigned integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        UInt8
            The value, or zero if the position is at or past the end
        """

        if index >= len(self.payload):
            return UInt8(0)
        return UInt8(self.payload[index])

    def i8_at(self, index: int) -> Int8:
        """Read 1 byte as a signed integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        Int8
            The value, or zero if the position is at or past the end
        """

        if index >= len(self.payload):
            return Int8(0)
        (number,) = unpack(">b", self.payload[index : index + 1])
        return Int8(int(number))

    def u16_at(self, index: int) -> UInt16:
        """Read 2 bytes as a big-endian unsigned integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        UInt16
            The value, or zero if the 2 bytes do not fit in the payload
        """

        if index + 1 >= len(self.payload):
            return UInt16(0)
        (number,) = unpack(">H", self.payload[index : index + 2])
        return UInt16(int(number))

    def i16_at(self, index: int) -> Int16:
        """Read 2 bytes as a big-endian signed integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        Int16
            The value, or zero if the 2 bytes do not fit in the payload
        """

        if index + 1 >= len(self.payload):
            return Int16(0)
        (number,) = unpack(">h", self.payload[index : index + 2])
        return Int16(int(number))

    def u32_at(self, index: int) -> UInt32:
        """Read 4 bytes as a big-endian unsigned integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        UInt32
            The value, or zero if the 4 bytes do not fit in the payload
        """

        if index + 3 >= len(self.payload):
            return UInt32(0)
        (number,) = unpack(">I", self.payload[index : index + 4])
        return UInt32(int(number))

    def i32_at(self, index: int) -> Int32:
        """Read 4 bytes as a big-endian signed integer

        Parameters
        ----------
        index : int
            Position index

        Returns
        -------
        Int32
            The value, or zero if the 4 bytes do not fit in the payload
        """

        if index + 3 >= len(self.payload):
            return Int32(0)
        (number,) = unpack(">i", self.payload[index : index + 4])
        return Int32(int(number))
[docs] class ParsedPacketBase(ABC, BaseModel): """Base dataclass for data of parsed packets Attributes ---------- mwings_implementation: str Implementation of mwings; In this case: "python" mwings_version: str Version of mwings in PEP440 format declared in the pyproject.toml time_parsed: str | None Date and time parsed in ISO 8601 format hostname: str Hostname for the running system system_type: str System type for the running system (e.g. "Linux") packet_type: PacketType Type of the received packet sequence_number: UInt16 Sequence number for the packet source_serial_id: UInt32 Serial ID for the source device source_logical_id: UInt8 Logical ID for the source device lqi: UInt8 Link quality indicator for the source device (Max: 255) supply_voltage: UInt16 Supply voltage for the source device in mV Notes ----- Immutable (frozen) object """ model_config = ConfigDict(frozen=True) time_parsed: AwareDatetime | None = Field(default=None) packet_type: PacketType = Field(default=PacketType.BARE) sequence_number: UInt16 | None = Field(default=None, ge=0, le=0xFFFF) source_serial_id: UInt32 = Field(default=0, ge=0, le=0xFFFFFFFF) source_logical_id: UInt8 = Field(default=0) lqi: UInt8 | None = Field(default=None, ge=0, le=255) supply_voltage: UInt16 | None = Field(default=None, ge=0, le=0xFFFF) @computed_field def mwings_implementation(self) -> str: return "python" @computed_field def mwings_version(self) -> str: return metadata.version(__name__.split(".")[0]) @computed_field def hostname(self) -> str: return platform.node() @computed_field def system_type(self) -> str: return platform.system()
[docs] @field_validator("time_parsed") @classmethod def datetime_must_be_clear(cls, dt: datetime | None) -> datetime | None: """Check time received Must be aware timezone as mw.common.Timezone Parameters ---------- dt : datetime Input Returns ------- datetime Valid input Raises ------ ValueError Native or not same as mw.common.Timezone """ if dt is not None and dt.tzinfo is not Timezone: raise ValueError("datetime must be aware and same as mw.common.Timezone.") return dt
[docs] @field_validator("source_logical_id") @classmethod def check_source_logical_id(cls, lid: UInt8) -> UInt8: """Check source logical id Must be in range between 0 and 100 or 120 and 127 Parameters ---------- lid : UInt8 Input Returns ------- UInt8 Valid input Raises ------ ValueError Out of range """ if 0 <= lid <= 100: return lid elif lid in range(120, 128): return lid else: raise ValueError("must be in range (0-100) or (120-127)")
[docs] @field_serializer("time_parsed") def serialize_time_parsed(self, dt: datetime | None) -> str | None: """Print time_parsed in ISO 8601 format Parameters ---------- dt : datetime, optional Date and time received Returns ------- str | None Serialized text for JSON or something Notes ----- Date and time should be UTC, but python uses "+00:00" suffix instead of "Z". However, it can be parsed in other environments like ECMAScript's Date(). """ if dt is None: return None return dt.isoformat() # YYYY-MM-DDThh:mm:ss.ssssss+00:00
[docs] @field_serializer("packet_type") def serialize_packet_type(self, packet_type: PacketType) -> str: """Print packet_type in readable names for JSON or something Parameters ---------- packet_type : PacketType Type of the packet Returns ------- str Serialized text for JSON or something """ return packet_type.name
[docs] @field_serializer("source_serial_id") def serialize_source_serial_id(self, source_serial_id: UInt32) -> str: """Print source_serial_id in HEX for JSON or something Parameters ---------- source_serial_id : UInt32 Source serial ID Returns ------- str Serialized text for JSON or something """ return source_serial_id.hex().upper()
[docs] def to_dict( self, include: set[str] | None = None, exclude: set[str] | None = None, verbose: bool = True, spread: bool = False, sort_keys: bool = False, ) -> dict[str, Any]: """Export to a dictionary (or OrderedDict) Parameters ---------- include : set[str], optional Properties to include in the dictionary exclude : set[str], optional Properties to exclude in the dictionary verbose : bool Set False to exclude system information. Only valid if include and exclude are None. spread : bool Spread cross-sectional tuple values into separated properties if True sort_keys : bool Returns a sorted OrderedDict if True Returns ------- dict[str, Any] Output dictionary (or OrderedDict) Notes ----- Higher-level implementation of pydantic's model_dump() """ if not spread: if not sort_keys: if not verbose and not any((include, exclude)): return self.model_dump( exclude={ "mwings_implementation", "mwings_version", "system_type", "hostname", } ) else: return self.model_dump(include=include, exclude=exclude) else: return OrderedDict( sorted( self.to_dict( include=include, exclude=exclude, verbose=verbose ).items() ) ) else: ordered_dict = OrderedDict() for key, value in self.to_dict( include=include, exclude=exclude, verbose=verbose, sort_keys=sort_keys ).items(): if isinstance(value, CrossSectional): for tup_index, tup_elem in enumerate(value): ordered_dict[f"{key}_{tup_index+1}"] = tup_elem else: ordered_dict[key] = value return ordered_dict if sort_keys else dict(ordered_dict)
[docs] def to_json( self, include: set[str] | None = None, exclude: set[str] | None = None, verbose: bool = True, spread: bool = False, indent: int | None = 2, sort_keys: bool = False, ) -> str: """Export to a JSON string Parameters ---------- include : set[str], optional Properties to include the JSON exclude : set[str], optional Properties to exclude the JSON verbose : bool Set False to exclude system information. Only valid if include and exclude are None. spread : bool Spread cross-sectional tuple values into separated properties if True indent : int, optional Space-indentation width (default: 2, None to single-line) sort_keys : bool Sort properties by keys if True Returns ------- str Output JSON """ dic = self.to_dict( include=include, exclude=exclude, verbose=verbose, spread=spread, sort_keys=sort_keys, ) for key, value in dic.items(): if isinstance(value, FixedTuple): dic[key] = tuple(value) # Make serializable return dumps(dic, indent=indent)
[docs] def to_df( self, include: set[str] | None = None, exclude: set[str] | None = None, verbose: bool = True, ) -> pd.DataFrame: """Export to a pandas DataFrame Requires optional dependency: pandas Parameters ---------- include : set[str], optional Columns to include in the DataFrame exclude : set[str], optional Columns to include in the DataFrame verbose : bool Set False to exclude system information. Only valid if include and exclude are None. Returns ------- pd.DataFrame Output DataFrame Raises ------ EnvironmentError No pandas found """ if PANDAS_AVAILABLE: dic = self.to_dict( include=include, exclude=exclude, verbose=verbose, spread=True, ) df = pd.DataFrame() time_series_max = max( [ len(value) if isinstance(value, TimeSeries) else 0 for value in dic.values() ] ) if time_series_max == 0: for key in dic: df[key] = [dic[key]] match dic[key]: case DtypedDecimal(): df[key] = df[key].astype(dic[key].get_dtype()) case str(): df[key] = df[key].astype("string") df.insert(1, "time_series", [0]) else: for key in dic: if isinstance(dic[key], TimeSeries): df[key] = [ dic[key][i] if dic[key][i] else type(dic[key][0])() for i in range(time_series_max) ] match dic[key][0]: case DtypedDecimal(): df[key] = df[key].astype(dic[key][0].get_dtype()) case str(): df[key] = df[key].astype("string") else: df[key] = [dic[key] for _ in range(time_series_max)] match dic[key]: case DtypedDecimal(): df[key] = df[key].astype(dic[key].get_dtype()) case str(): df[key] = df[key].astype("string") df.insert(1, "time_series", range(time_series_max)) df["time_series"] = df["time_series"].astype("uint8") df["time_parsed"] = df["time_parsed"].apply( pd.to_datetime, utc=Timezone is timezone.utc, format="ISO8601" ) return df else: raise EnvironmentError("to_df() requires pandas")
# The bound restricts this TypeVar to ParsedPacketBase and its subclasses.
SomeParsedPacket = TypeVar("SomeParsedPacket", bound=ParsedPacketBase)
"""TypeVar for all classes derived from ParsedPacketBase"""
class CommandBase(ABC, BaseModel):
    """Common base model for commands sent to a device

    Attributes
    ----------
    destination_logical_id: UInt8
        Logical ID for the destination device (default 0x78)
    """

    model_config = ConfigDict()

    destination_logical_id: UInt8 = Field(default=0x78)

    @abstractmethod
    def is_valid(self) -> bool:
        """Tell whether the command content is valid

        Returns
        -------
        bool
            True if valid

        Notes
        -----
        Abstract; every concrete command has to implement it
        """
class PacketParserBase(ABC):
    """Interface every packet parser has to implement"""

    @staticmethod
    @abstractmethod
    def is_valid(bare_packet: BarePacket) -> bool:
        """Tell whether this parser accepts the given bare packet

        Parameters
        ----------
        bare_packet : BarePacket
            Bare packet content

        Returns
        -------
        bool
            True if valid

        Notes
        -----
        Abstract; implemented by each concrete parser
        """

    @staticmethod
    @abstractmethod
    def parse(bare_packet: BarePacket) -> ParsedPacketBase | None:
        """Turn the given bare packet into parsed data

        Parameters
        ----------
        bare_packet : BarePacket
            Bare packet content

        Returns
        -------
        ParsedPacketBase | None
            Parsed packet content if valid otherwise None

        Notes
        -----
        Abstract; implemented by each concrete parser
        """
# The bound restricts this TypeVar to CommandBase and its subclasses.
SomeCommand = TypeVar("SomeCommand", bound=CommandBase)
"""TypeVar for all classes derived from CommandBase"""
class CommandSerializerBase(ABC):
    """Interface every command serializer has to implement"""

    @staticmethod
    @abstractmethod
    def serialize(command: SomeCommand) -> BarePacket | None:
        """Turn the given command into a bare packet

        Parameters
        ----------
        command : SomeCommand
            Some command to serialize

        Returns
        -------
        BarePacket | None
            Serialized bytes and its LRC checksum (8bit) if valid

        Notes
        -----
        Abstract; implemented by each concrete serializer
        """