Source code for mwings.parsers.app_twelite

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# Packet parser for App_Twelite

from datetime import datetime
from typing import Any, final

from overrides import override
from pydantic import Field, field_validator

from .. import common


@final
class ParsedPacket(common.ParsedPacketBase):
    """Parsed representation of a packet sent by App_Twelite

    Attributes
    ----------
    destination_logical_id: common.UInt8
        Logical ID of the destination (parent) device
    relay_count: common.UInt8
        Number of relay stations the packet went through
    periodic: bool
        True when the packet is a periodic transmit packet
    di_changed: common.CrossSectional[bool]
        Per digital interface flag; True if the input has changed
    di_state: common.CrossSectional[bool]
        Per digital interface input status
    ai_voltage: common.CrossSectional[common.UInt16]
        Per analog interface voltage in mV
    """

    destination_logical_id: common.UInt8 = Field(default=0x78)
    relay_count: common.UInt8 = Field(default=0, ge=0, le=3)
    periodic: bool = Field(default=False)
    di_changed: common.CrossSectional[bool] = Field(
        default=common.CrossSectional[bool](4, [False] * 4)
    )
    di_state: common.CrossSectional[bool] = Field(
        default=common.CrossSectional[bool](4, [False] * 4)
    )
    ai_voltage: common.CrossSectional[common.UInt16] = Field(
        default=common.CrossSectional[common.UInt16](
            4, [common.UInt16(0) for _ in range(4)]
        )
    )

    @field_validator("destination_logical_id")
    @classmethod
    def check_destination_logical_id(cls, lid: int) -> int:
        """Validate the destination logical ID

        Accepted values are 0 to 100 and 120 to 127, except 122
        (Router), which is rejected.

        Parameters
        ----------
        lid : int
            Value to validate

        Returns
        -------
        int
            The value itself when it is acceptable

        Raises
        ------
        ValueError
            The value lies outside the accepted ranges
        """

        in_lower_range = 0 <= lid <= 100
        in_upper_range = lid in range(120, 128) and lid != 122
        if not (in_lower_range or in_upper_range):
            raise ValueError("must be in range (0-100) or (120-121/123-127)")
        return lid

    @field_validator("ai_voltage")
    @classmethod
    def check_ai_voltage(
        cls, aiv: common.CrossSectional[int]
    ) -> common.CrossSectional[int]:
        """Validate the voltages of the analog interfaces

        Every value must lie between 0 and 3700
        (VCCmax3600+margin100).

        Parameters
        ----------
        aiv : common.CrossSectional
            Values to validate

        Returns
        -------
        common.CrossSectional
            The values themselves when all of them are acceptable

        Raises
        ------
        ValueError
            At least one value is out of range
        """

        if any(millivolts < 0 or millivolts > 3700 for millivolts in aiv):
            raise ValueError("Out of range")
        return aiv
@final
class PacketParser(common.PacketParserBase):
    """Packet parser for App_Twelite"""

    @staticmethod
    @override
    def is_valid(bare_packet: common.BarePacket) -> bool:
        """Check the given bare packet is valid or not

        A packet is accepted when its payload is exactly 23 bytes long,
        byte 1 equals 0x81, byte 3 equals 0x01 and the most significant
        bit of byte 5 is set.

        Parameters
        ----------
        bare_packet : common.BarePacket
            Bare packet content

        Returns
        -------
        bool
            True if valid

        Notes
        -----
        Static overridden method
        """

        # The length is tested before any indexed read so that the byte
        # accesses below are only evaluated on a payload of the expected
        # size; a truncated payload is simply reported as invalid.
        if len(bare_packet.payload) != 23:
            return False

        # NOTE(review): 0x81 / 0x01 look like the command and protocol
        # version identifiers of the App_Twelite format -- confirm against
        # the format specification. Byte 5 is where parse() starts reading
        # the source serial ID.
        return (
            bare_packet.u8_at(1) == 0x81
            and bare_packet.u8_at(3) == 0x01
            and (bare_packet.u8_at(5) & 0x80) == 0x80
        )

    @staticmethod
    @override
    def parse(bare_packet: common.BarePacket) -> ParsedPacket | None:
        """Try to parse the given bare packet

        Parameters
        ----------
        bare_packet : common.BarePacket
            Bare packet content

        Returns
        -------
        ParsedPacket | None
            Parsed packet data if valid else None

        Notes
        -----
        Static overridden method
        """

        if not PacketParser.is_valid(bare_packet):
            return None

        # Byte 16 carries the periodic flag in its top bit and the digital
        # input states in its low four bits; byte 17 carries the change
        # flags in its low four bits.
        di_state_byte = bare_packet.u8_at(16)
        di_changed_byte = bare_packet.u8_at(17)

        # Bytes 18..21 hold one value per analog port and byte 22 holds
        # two extra low-order bits per port. The combined value is
        # coarse * 16 + fine * 4 and is clamped to 2000.
        ai_fine_bits = bare_packet.u8_at(22)
        ai_voltages = [
            common.UInt16(
                min(
                    bare_packet.u8_at(18 + port) * 16
                    + ((ai_fine_bits >> (port * 2)) & 0x03) * 4,
                    2000,
                )
            )
            for port in range(4)
        ]

        parsed_packet_data: dict[str, Any] = {
            "time_parsed": datetime.now(common.Timezone),
            "packet_type": common.PacketType.APP_TWELITE,
            # For App_Twelite this field carries the timestamp
            "sequence_number": bare_packet.u16_at(10),
            "source_serial_id": bare_packet.u32_at(5),
            "source_logical_id": bare_packet.u8_at(0),
            "lqi": bare_packet.u8_at(4),
            "supply_voltage": bare_packet.u16_at(13),
            "destination_logical_id": bare_packet.u8_at(9),
            "relay_count": bare_packet.u8_at(12),
            "periodic": ((di_state_byte & 0x80) == 0x80),
            "di_state": common.CrossSectional[bool](
                4,
                [bool(di_state_byte & (1 << port)) for port in range(4)],
            ),
            "di_changed": common.CrossSectional[bool](
                4,
                [bool(di_changed_byte & (1 << port)) for port in range(4)],
            ),
            "ai_voltage": common.CrossSectional[common.UInt16](4, ai_voltages),
        }

        return ParsedPacket(**parsed_packet_data)