# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black
# Packet parser for App_PAL (MOT, accel)
from datetime import datetime
from typing import Any, final
from overrides import override
from pydantic import Field, field_validator
from .. import common
[docs]
@final
class ParsedPacket(common.ParsedPacketBase):
"""Dataclass for parsed packets from App_PAL (MOT)
Attributes
----------
ai1_voltage: common.UInt16
Voltage for AI1 port in mV
sample_count: common.UInt8
Number of accel samples
samples_x: common.TimeSeries[common.Int16]
Accel samples for x axis
samples_y: common.TimeSeries[common.Int16]
Accel samples for y axis
samples_z: common.TimeSeries[common.Int16]
Accel samples for z axis
sampling_frequency: UInt16
Sampling frequency in Hz
"""
ai1_voltage: common.UInt16 = Field(default=0, ge=0, le=3700)
sample_count: common.UInt8 = Field(default=16, ge=16, le=16)
samples_x: common.TimeSeries[common.Int16] = Field(
default=common.TimeSeries[common.Int16](
16, [common.Int16(0) for _ in range(16)]
)
)
samples_y: common.TimeSeries[common.Int16] = Field(
default=common.TimeSeries[common.Int16](
16, [common.Int16(0) for _ in range(16)]
)
)
samples_z: common.TimeSeries[common.Int16] = Field(
default=common.TimeSeries[common.Int16](
16, [common.Int16(0) for _ in range(16)]
)
)
sampling_frequency: common.UInt16 = Field(default=25)
[docs]
@field_validator("sampling_frequency")
@classmethod
def check_sampling_frequency(cls, freq: int) -> int:
"""Check for sampling_frequency
Parameters
----------
freq : int
Input
Returns
-------
int
Valid input
Raises
------
ValueError
Raise if the specified frequency is not supported
"""
if freq not in set(
[
25,
50,
100,
190,
380,
750,
1100,
1300,
]
):
raise ValueError("not supported")
return freq
[docs]
@final
class PacketParser(common.PacketParserBase):
"""Packet parser for App_PAL (MOT)"""
[docs]
@staticmethod
@override
def is_valid(bare_packet: common.BarePacket) -> bool:
"""Check the given bare packet is valid or not
Parameters
----------
bare_packet : common.BarePacket
Bare packet content
Returns
-------
bool
True if valid
Notes
-----
Static overridden method
"""
if (
(bare_packet.u8_at(0) & 0x80) == 0x80
and (bare_packet.u8_at(7) & 0x80) == 0x80
and bare_packet.u8_at(12) == 0x80
and bare_packet.u8_at(13) == 0x83
and len(bare_packet.payload) == 188
):
return True
return False
[docs]
@staticmethod
@override
def parse(bare_packet: common.BarePacket) -> ParsedPacket | None:
"""Try to parse the given bare packet
Parameters
----------
bare_packet : common.BarePacket
Bare packet content
Returns
-------
ParsedPacket | None
Parsed packet data if valid else None
Notes
-----
Static overridden method
"""
if not PacketParser.is_valid(bare_packet):
return None
frequencies = common.FixedTuple[common.UInt16](
8,
[
common.UInt16(25),
common.UInt16(50),
common.UInt16(100),
common.UInt16(190),
common.UInt16(380),
common.UInt16(750),
common.UInt16(1100),
common.UInt16(1300),
],
)
parsed_packet_data: dict[str, Any] = {
"time_parsed": datetime.now(common.Timezone),
"packet_type": common.PacketType.APP_PAL_MOT,
"sequence_number": bare_packet.u16_at(5),
"source_serial_id": bare_packet.u32_at(7),
"source_logical_id": bare_packet.u8_at(11),
"lqi": bare_packet.u8_at(4),
"supply_voltage": bare_packet.u16_at(19),
"ai1_voltage": bare_packet.u16_at(25),
"sample_count": 16,
"samples_x": common.TimeSeries[common.Int16](
16,
[bare_packet.i16_at(31 + (10 * index) + 0) for index in range(16)],
),
"samples_y": common.TimeSeries[common.Int16](
16,
[bare_packet.i16_at(31 + (10 * index) + 2) for index in range(16)],
),
"samples_z": common.TimeSeries[common.Int16](
16,
[bare_packet.i16_at(31 + (10 * index) + 4) for index in range(16)],
),
"sampling_frequency": frequencies[bare_packet.u8_at(29) >> 5],
}
return ParsedPacket(**parsed_packet_data)