Source code for mwings

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# MWings

from enum import IntEnum, auto
from threading import Thread, Event
from datetime import timezone, tzinfo
from typing import Any, Callable, overload, final
from warnings import warn

import serial  # type: ignore
from pyee.base import EventEmitter
from overrides import override

from . import common
from . import utils
from . import parsers
from . import serializers


[docs] @final class Twelite(Thread): """MWings main class""" # Private inner enum classes @final class __State(IntEnum): """Parser state Attributes ---------- WAITING_FOR_HEADER: int Waiting for the header character ':' RETRIEVING_PAYLOAD: int Retrieving packet payload data WAITING_FOR_FOOTER: int Waiting for the footer character <LF> COMPLETED: int Completed to receive a packet CHECKSUM_ERROR: int Error in checksum UNKNOWN_ERROR: int Unknown error """ WAITING_FOR_HEADER = auto() RETRIEVING_PAYLOAD = auto() WAITING_FOR_FOOTER = auto() COMPLETED = auto() UNKNOWN_ERROR = auto() CHECKSUM_ERROR = auto() TIMEOUT_ERROR = auto() @final class __Command(IntEnum): """Revised 0xDB commands Attributes ---------- ACK: int Revised 0xDB command: Ack MODULE_ADDRESS: int Revised 0xDB command: Module address SET_PARAMETER: int Revised 0xDB command: Set parameter GET_PARAMETER: int Revised 0xDB command: Get parameter CONTROL: int Revised 0xDB command: Control module DISABLE_SILENT_MODE: int Revised 0xDB command: Disable silent mode CLEAR: int Revised 0xDB command: Clear settings SAVE: int Revised 0xDB command: Save settings RESET: int Revised 0xDB command: Reset module """ ACK = 0xD0 MODULE_ADDRESS = 0xD1 SET_PARAMETER = 0xD2 GET_PARAMETER = 0xD3 CONTROL = 0xD8 DISABLE_SILENT_MODE = 0xD9 CLEAR = 0xDD SAVE = 0xDE RESET = 0xDF @final class __Parameter(IntEnum): """Parameter for commands: set/get parameter Attributes --------- APP_ID: int Application ID CH_MASK: int Channels bit mask RETRY_TX: int Retry count / Tx power ROUTING_LAYER: int Routing layer AP_ADDRESS: int Access point address UART_BAUDRATE: int Uart baudrate ENCRYPTION: int Encryption settings OPTION_BITS: int Option bits """ APP_ID = 0x01 CH_MASK = 0x02 RETRY_TX = 0x03 ROUTING_LAYER = 0x04 AP_ADDRESS = 0x05 UART_BAUDRATE = 0x06 ENCRYPTION = 0x07 OPTION_BITS = 0x08 @final class __SavingStatus(IntEnum): """Return status during saving parameters Attributes ---------- SUCCEEDED: int Succeeded FAILED: int Failed SUCCEEDED_NO_MODIFICATIONS: int Succeeded, but no modifications FAILED_NO_MODIFICATIONS: int Failed, but no modifications """ SUCCEEDED = 0x01 FAILED = 0x00 SUCCEEDED_NO_MODIFICATIONS = 0x81 FAILED_NO_MODIFICATIONS = 0x80 # Private varibles # pyserial instance __serial: Any = None # Rx binary data buffer __buffer: bytearray # Max size of the rx buffer __rx_buffer_size: int # Number of received ASCII characters __character_count: int # LRC checksum for the latest received packet __checksum: int # Timeout for each packet in milliseconds __timeout: int # Timestamp of the last time the header was received __latest_timestamp: int # Print debug info if True __debugging: bool # Parser state __state: __State # Event emitter for receiving events __event_emitter: EventEmitter # Status for the receiver thread __running: bool # Set to ensure the receiver thread has stopped __ensure_stopped: Event # Public methods def __init__( self, port: str | None = None, rx_buffer_size: int = 1024, timeout: int = 100, tz: tzinfo | None = None, debugging: bool = False, ): """Constructor Parameters ---------- port : str | None Name for the serial port to use / set None to disable serial rx_buffer_size : int Receive buffer size timeout : int Timeout for each packet in milliseconds tz : tzinfo Timezone for datetime data. Default is UTC (Aware). Use ZoneInfo() for others. debugging : bool Print debug info if true """ super().__init__() self.daemon = False if port is not None: try: self.__serial = serial.Serial(port, 115200, timeout=1) except serial.serialutil.SerialException: raise IOError("Specified port is busy or not available") self.__buffer = bytearray() self.__rx_buffer_size = rx_buffer_size self.__character_count = 0 self.__checksum = 0 self.__timeout = timeout self.__latest_timestamp = -1 self.__debugging = debugging self.__state = self.__State.WAITING_FOR_HEADER self.__event_emitter = EventEmitter() self.__running = False self.__ensure_stopped = Event() common.Timezone = tz if tz is not None else timezone.utc def __del__(self) -> None: """Destructor""" if self.__serial is not None: if self.__running: self.stop() if self.__serial.is_open: self.__serial.close()
[docs] @staticmethod def set_timezone(tz: tzinfo | None) -> None: """Set timezone for received data Parameters ---------- tz : tzinfo tzinfo object. Typically Zoneinfo("IANA/City"). None for UTC. """ common.Timezone = tz if tz is not None else timezone.utc
@property def timezone(self) -> tzinfo: """Get timezone set Returns ------- tzinfo Timezone set for mwings """ return common.Timezone @overload def add_listener( self, event: common.PacketType, handler: Callable[[common.BarePacket], None] ) -> None: """Register a handler for receiving packets Parameters ---------- event : common.PacketType Identifier for packets to receive handler : Callable[[common.BarePacket], None] Handler to handle bare packets """ ... @overload def add_listener( self, event: common.PacketType, handler: Callable[[common.SomeParsedPacket], None], ) -> None: """Register a handler for receiving packets Parameters ---------- event : common.PacketType Identifier for packets to receive handler : Callable[[common.SomeParsedPacket], None] Handler to handle some parsed packets """ ...
[docs] def add_listener( self, event: common.PacketType, handler: common.SomeCallable ) -> None: """Register a handler for receiving packets Parameters ---------- event : common.PacketType Identifier for packets to receive handler : common.SomeCallable Handler to handle packets """ self.__event_emitter.add_listener(event, handler)
[docs] def on( self, event: common.PacketType ) -> Callable[[common.SomeCallable], common.SomeCallable]: """Generate a decorator to register a handler for receiving packets Parameters ---------- event : common.PacketType Identifier for packets to receive Returns ------- Callable[[common.SomeCallable], common.SomeCallable] Decorator to register a handler for receiving packets """ def decorator(handler: common.SomeCallable) -> common.SomeCallable: """Decorator to register a handler for receiving packets Parameters ---------- handler : common.SomeCallable Original handler for receiving packets Returns ------- common.SomeCallable Decorated handler for receiving packets (Actually, it's same) """ self.add_listener(event, handler) return handler return decorator
@overload def send(self, data: common.BarePacket) -> bool: """Send data to the device with ModBus format Parameters ---------- data : common.BarePacket Payload and checksum data to send Returns ------- bool True if succeeded """ ... @overload def send(self, data: common.SomeCommand) -> bool: """Send data to the device with ModBus format Parameters ---------- data : common.SomeCommand Some command to serialize and send Returns ------- bool True if succeeded """ ...
[docs] def send(self, data: Any) -> bool: """Send data to the device with ModBus format Parameters ---------- data : Any Data to send Returns ------- bool True if succeeded """ if self.__serial is None: raise RuntimeError("send() can only be used when the port is initialized") if not utils.is_writable(self.__serial): return False match data: case common.BarePacket(): utils.write_binary(self.__serial, ord(":")) utils.write_in_ascii(self.__serial, data.payload) utils.write_in_ascii(self.__serial, data.checksum) utils.write_binary(self.__serial, ord("\r")) utils.write_binary(self.__serial, ord("\n")) if self.__debugging: print(f"Sent ascii: {data.payload.hex()}") case serializers.app_twelite.Command(): serialized_data = serializers.app_twelite.CommandSerializer.serialize( data ) if serialized_data is None: return False else: self.send(serialized_data) case serializers.app_io.Command(): serialized_data = serializers.app_io.CommandSerializer.serialize(data) if serialized_data is None: return False else: self.send(serialized_data) case serializers.app_pal_notice.Command(): serialized_data = ( serializers.app_pal_notice.CommandSerializer.serialize(data) ) if serialized_data is None: return False else: self.send(serialized_data) case serializers.app_pal_notice_detailed.Command(): serialized_data = ( serializers.app_pal_notice_detailed.CommandSerializer.serialize( data ) ) if serialized_data is None: return False else: self.send(serialized_data) case serializers.app_pal_notice_event.Command(): serialized_data = ( serializers.app_pal_notice_event.CommandSerializer.serialize(data) ) if serialized_data is None: return False else: self.send(serialized_data) case serializers.app_uart_ascii.Command(): serialized_data = ( serializers.app_uart_ascii.CommandSerializer.serialize(data) ) if serialized_data is None: return False else: self.send(serialized_data) return True
[docs] @override def start(self) -> None: """Start the thread to receive continuously Notes ----- Overrides threading.Thread.start() """ if self.__serial is None: raise RuntimeError("start() can only be used when the port is initialized") self.__running = True super().start()
[docs] @override def run(self) -> None: """Run the thread to receive continuously Call this function via Twelite.start() Notes ----- Overrides threading.Thread.run() """ while self.__running: self.update() self.__ensure_stopped.set()
[docs] def stop(self) -> None: """Stop the thread to receive continuously""" if self.__serial is None: raise RuntimeError("stop() can only be used when the port is initialized") self.__ensure_stopped.clear() self.__running = False self.__ensure_stopped.wait()
[docs] def receive(self) -> common.PacketType: """Wait for parsing of a single packet Returns ------- common.PacketType Identifier for packet received Notes ----- This function blocks current thread """ if self.__serial is None: raise RuntimeError( "receive() can only be used when the port is initialized" ) while True: if (packet_type := self.update()) is not None: return packet_type
[docs] def update(self) -> common.PacketType | None: """Update parsing state with serial data Returns ------- common.PacketType | None Returns packet type identifier if available else None """ if self.__serial is None: raise RuntimeError("update() can only be used when the port is initialized") # Abort if the serial is not initialized if not utils.is_initialized(self.__serial): return None # Process all byte in the buffer while utils.is_readable(self.__serial): # Parse a read character result = self.__parse(int.from_bytes(self.__serial.read())) if result is not None: # When parsed a complete packet return result return None
[docs] def parse_line(self, line: str, use_lf: bool = True) -> common.PacketType | None: """Parse a single string Useful when use local log file instead of serial port rx Parameters ---------- line : str String line use_lf : bool Use LF instead of CRLF (For f.readline, set True as default) Returns ------- common.PacketType | None Latest packet type identifier if available else None Raises ------ RuntimeError Serial IS initialized """ if self.__serial is not None: raise RuntimeError( "parse_line() can only be used when the port is NOT initialized" ) result: common.PacketType | None = None for character in line.lstrip(): # Parse a read character latest_result = self.__parse(character, use_lf) if latest_result is not None: # When parsed a complete packet result = latest_result return result
@overload def parse(self, character: str, use_lf: bool = False) -> common.PacketType | None: ... @overload def parse(self, character: bytes, use_lf: bool = False) -> common.PacketType | None: ... @overload def parse(self, character: int, use_lf: bool = False) -> common.PacketType | None: ...
[docs] def parse(self, character: Any, use_lf: bool = False) -> common.PacketType | None: """Parse a single character Parameters ---------- character : Any character to parse use_lf : bool If use LF instead of CRLF, set True Returns ------- common.PacketType | None If completed, returns packet type identifier Raises ------ RuntimeError Serial IS initialized ValueError Unsupported character """ if self.__serial is not None: raise RuntimeError( "parse() can only be used when the port is NOT initialized" ) return self.__parse(character, use_lf)
# Private method(s) @overload def __parse(self, character: str, use_lf: bool = False) -> common.PacketType | None: ... @overload def __parse( self, character: bytes, use_lf: bool = False ) -> common.PacketType | None: ... @overload def __parse(self, character: int, use_lf: bool = False) -> common.PacketType | None: ... def __parse(self, character: Any, use_lf: bool = False) -> common.PacketType | None: """Parse a single character Parameters ---------- character : Any character to parse use_lf : bool If use LF instead of CRLF, set True Returns ------- common.PacketType | None If completed, returns packet type identifier Raises ------ ValueError Unsupported character """ match character: case str(): if len(character) != 1: raise ValueError("character must be single length") character_bytes: bytes try: character_bytes = character.encode("ascii") except UnicodeEncodeError: raise ValueError("character must bpe ASCII") return self.__parse(character_bytes, use_lf) case bytes(): if len(character) != 1: raise ValueError("character must be single length") return self.__parse(character[0], use_lf) case int(): # Abort if the read byte is invalid if not (0 <= character <= 0xFF): return None if self.__debugging: pass # print(repr(chr(character))) # Process packet contents upon parsing completion if (bare_packet := self.__process_ascii(character, use_lf)) is not None: # Emit events here # Bare packet handler self.__event_emitter.emit(common.PacketType.BARE, bare_packet) # Act packet handler if parsers.act.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.ACT, parsers.act.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.ACT}" ) return common.PacketType.ACT # App_Twelite packet handler if parsers.app_twelite.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_TWELITE, parsers.app_twelite.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_TWELITE}" ) return common.PacketType.APP_TWELITE # App_Io packet handler if parsers.app_io.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_IO, parsers.app_io.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_IO}" ) return common.PacketType.APP_IO # App_ARIA packet handler if parsers.app_aria.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_ARIA, parsers.app_aria.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_ARIA}" ) return common.PacketType.APP_ARIA # App_CUE packet handler if parsers.app_cue.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_CUE, parsers.app_cue.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_CUE}" ) return common.PacketType.APP_CUE # App_CUE (PAL Move or Dice mode) packet handler if parsers.app_cue_pal_event.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_CUE_PAL_EVENT, parsers.app_cue_pal_event.PacketParser.parse( bare_packet ), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_CUE_PAL_EVENT}" ) return common.PacketType.APP_CUE_PAL_EVENT # App_PAL (OPENCLOSE) packet handler if parsers.app_pal_openclose.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_PAL_OPENCLOSE, parsers.app_pal_openclose.PacketParser.parse( bare_packet ), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_PAL_OPENCLOSE}" ) return common.PacketType.APP_PAL_OPENCLOSE # App_PAL (AMB) packet handler if parsers.app_pal_amb.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_PAL_AMB, parsers.app_pal_amb.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_PAL_AMB}" ) return common.PacketType.APP_PAL_AMB # App_PAL (MOT) packet handler if parsers.app_pal_mot.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_PAL_MOT, parsers.app_pal_mot.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_PAL_MOT}" ) return common.PacketType.APP_PAL_MOT # App_Uart (Mode A) packet handler if parsers.app_uart_ascii.PacketParser.is_valid(bare_packet): if ( self.__event_emitter.emit( common.PacketType.APP_UART_ASCII, parsers.app_uart_ascii.PacketParser.parse(bare_packet), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_UART_ASCII}" ) return common.PacketType.APP_UART_ASCII # App_Uart (Mode A, Extended) packet handler if parsers.app_uart_ascii_extended.PacketParser.is_valid( bare_packet ): if ( self.__event_emitter.emit( common.PacketType.APP_UART_ASCII_EXTENDED, parsers.app_uart_ascii_extended.PacketParser.parse( bare_packet ), ) is not True and self.__debugging ): warn( f"No handler(s) registered for {common.PacketType.APP_UART_ASCII_EXTENDED}" ) return common.PacketType.APP_UART_ASCII_EXTENDED return common.PacketType.BARE return None def __process_ascii( self, character: int, use_lf: bool = False ) -> common.BarePacket | None: """Process an ascii character Parameters ---------- character : int ASCII code for the received character use_lf : bool If use LF instead of CRLF, set True Returns ------- common.BarePacket | None Complete bare packet if received the whole packet else None """ # Reset if the state is error or completed if ( self.__state == self.__State.COMPLETED or self.__state == self.__State.UNKNOWN_ERROR or self.__state == self.__State.CHECKSUM_ERROR or self.__state == self.__State.TIMEOUT_ERROR ): self.__state = self.__State.WAITING_FOR_HEADER # Reset on timeout if ( self.__timeout > 0 and self.__state != self.__State.WAITING_FOR_HEADER and utils.millis() - self.__latest_timestamp > self.__timeout ): self.__state = self.__State.TIMEOUT_ERROR if self.__debugging: print("TIMEOUT_ERROR\n") # Run state machine match self.__state: case self.__State.WAITING_FOR_HEADER: # If the character is colon, start to read if character == ord(":"): self.__state = self.__State.RETRIEVING_PAYLOAD self.__latest_timestamp = utils.millis() self.__character_count = 0 self.__checksum = 0 self.__buffer = bytearray() case self.__State.RETRIEVING_PAYLOAD: if (character >= ord("0") and character <= ord("9")) or ( character >= ord("A") and character <= ord("F") ): # Valid hex character # Abort if the buffer is overflowing if ( utils.byte_count_from(self.__character_count) >= self.__rx_buffer_size ): self.__state = self.__State.UNKNOWN_ERROR if self.__debugging: print("OVERFLOW ERROR\n") # Convert character to hex hex_value: int = utils.hex_from(character) # Get an index for the new byte newByteIndex: int = ( utils.byte_count_from(self.__character_count) + 1 - 1 ) # next position, but zero origin # Add byte if self.__character_count & 1: # Odd: set 0-3 bit of the new byte self.__buffer[newByteIndex] = ( self.__buffer[newByteIndex] & 0xF0 ) | hex_value self.__checksum += self.__buffer[newByteIndex] else: # Even: set 7-4 bit of the byte self.__buffer.append(hex_value << 4) self.__character_count += 1 elif (not use_lf and character == ord("\r")) or ( use_lf and character == ord("\n") ): # Abort if received data are not valid if ( not self.__character_count >= 4 and (self.__character_count & 1) == 0 ): self.__state = self.__State.UNKNOWN_ERROR if self.__debugging: print("LENGTH ERROR") # Mask checksum self.__checksum = self.__checksum & 0xFF # Abort if the checksum is not valid if not self.__checksum == 0: self.__state = self.__State.CHECKSUM_ERROR if self.__debugging: print("CHECKSUM_ERROR") if not use_lf: self.__state = self.__State.WAITING_FOR_FOOTER else: self.__state = self.__State.COMPLETED else: # Invalid characters self.__state = self.__State.UNKNOWN_ERROR if self.__debugging: print(f"INVALID CHAR ERROR: {repr(chr(character))}") case self.__State.WAITING_FOR_FOOTER: if character == ord("\n"): # Completed self.__state = self.__State.COMPLETED else: # CR only self.__state = self.__State.UNKNOWN_ERROR if self.__debugging: print("NO LF ERROR") case _: self.__state = self.__State.UNKNOWN_ERROR if self.__debugging: print("UNKNOWN ERROR") # Make bare packet available when parsing was completed if self.__state == self.__State.COMPLETED: bare_packet_data: dict[str, Any] = { "payload": bytes(self.__buffer[:-1]), # -1 for checksum "checksum": self.__buffer[ utils.byte_count_from(self.__character_count) - 1 ], } return common.BarePacket(**bare_packet_data) return None