Source code for mwings.utils

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# Utils for MWings

import sys
import os
import subprocess
import time
import math
import re
from pathlib import Path
from typing import Callable

import serial  # type: ignore
from serial.tools import list_ports  # type: ignore


[docs] def open_on_system(path: Path) -> None: """Open the file in the given path on system using default application Parameters ---------- path : Path pathlib.Path for file """ if sys.platform == "win32": os.startfile(path) else: opener = "open" if sys.platform == "darwin" else "xdg-open" subprocess.call([opener, path])
[docs] def ask_user( prompt: str, regex: str, on_error: str, ex_verifier: Callable[[str], bool] | None = None, max_attempts: int | None = None, ) -> str: """Ask user for something in text Parameters ---------- prompt : str Prompt message regex : str Regular expression for validate user input on_error : str Message for invalid input ex_verifier : Callable[[str], bool] | None Extra verifier in addition to regex max_attempts : int | None Max count for attempts. None to infinite Returns ------- str Valid user input """ attempt = 0 while True: if max_attempts and attempt >= max_attempts: return "" try: user_input = input(prompt) if (re.search(regex, user_input) is None) or ( ex_verifier is not None and not ex_verifier(user_input) ): raise ValueError(on_error) break except ValueError as e: print(e) attempt = attempt + 1 return user_input
[docs] def get_ports() -> list[str]: """Get port informations Returns ------- list[str] List of port names """ comports: list[list_ports.ListPortInfo] = list_ports.comports() return [port.device for port in comports]
[docs] def is_there_some_ports() -> bool: """Check if there is some ports exists Returns ------- bool True if exists """ return bool(len(get_ports()))
[docs] def ask_user_for_port() -> str: """Ask the user for the port to use If there's only one port, auto selects without asking. Returns ------- str Port name (COM or fd) Notes ----- If the console is not available, raise EnvironmentError. If there's no ports, raise IOError. """ if not sys.stdin.isatty(): raise EnvironmentError("There's no console.") ports: list[list_ports.ListPortInfo] = list_ports.comports() if ports == []: raise IOError("There's no serial port.") if len(ports) == 1: print(f"Auto selected: {ports[0].device}") return str(ports[0].device) print("Multiple ports detected.") for i, port in enumerate(ports): if port.manufacturer == "MONOWIRELESS": # Print in magenta print(f"[{i+1}] {port.device} \033[35m{port.description} (Genuine)\033[00m") elif port.manufacturer == "TOCOS": # Print in blue print(f"[{i+1}] {port.device} \033[34m{port.description} (Legacy)\033[00m") else: print(f"[{i+1}] {port.device} {port.description}") user_input = ask_user( f"Select [1-{len(ports)}]: ", regex=r"^[0-9]+$", ex_verifier=lambda s: int(s) <= len(ports), on_error=f"Please answer 1-{len(ports)}.", ) print(f"Selected: {ports[int(user_input) - 1].device}") return str(ports[int(user_input) - 1].device)
[docs] def millis() -> int: """Get current time in milliseconds Returns ------- int Current epoch in milliseconds """ return round(time.time_ns() / 1000000)
[docs] def lrc8(data: bytes) -> int: """Calculate 8-bit LRC for given data Parameters ---------- data : bytes Bytes to calculate Returns ------- int LRC checksum """ return int(((sum(data) ^ 0xFF) + 1) & 0xFF)
[docs] def hex_from(character: int) -> int: """Convert to hex from character Parameters ---------- character : int Integer value of an ASCII character ('0'-'F') Returns ------- int Binary integer value (0x0-0xF) """ return character - ord("0") if character < ord("A") else character - ord("A") + 0xA
[docs] def character_from(hexvalue: int) -> int: """Convert to character from hex Parameters ---------- hexvalue : int Binary integer value (0x0-0xF) Returns ------- int Integer value of an ASCII character ('0'-'F') """ return ord("0") + hexvalue if hexvalue < 0xA else ord("A") + hexvalue - 0xA
[docs] def byte_count_from(character_count: int) -> int: """Convert to byte count from character count Parameters ---------- character_count : int bytes count in ascii format Returns ------- int bytes count in binary format """ return math.floor(character_count / 2)
[docs] def is_initialized(port: serial.Serial) -> bool: """Check if the serial port is initialized Parameters ---------- port : serial.Serial pyserial instance Returns ------- bool initialized if true """ return bool(port.readable() and port.writable())
[docs] def is_readable(port: serial.Serial) -> bool: """Check if the serial port is readable Parameters ---------- port : serial.Serial pyserial instance Returns ------- bool readable if true """ return bool(port.readable())
[docs] def is_writable(port: serial.Serial) -> bool: """Check if the serial port is writable Parameters ---------- port : serial.Serial pyserial instance Returns ------- bool writable if true """ return bool(port.writable())
[docs] def write_binary(port: serial.Serial, data: int | bytes) -> None: """Write binary integer value to the serial port Parameters ---------- port : serial.Serial pyserial instance data : int | bytes Binary integer value or bytes """ match data: case int(): if not is_writable(port): return None if not data >= 0: return None elif data <= 0xFF: port.write(data.to_bytes(1)) elif data <= 0xFFFF: port.write(data.to_bytes(2)) elif data <= 0xFFFFFFFF: port.write(data.to_bytes(4)) case bytes(): for byte in data: write_binary(port, byte)
[docs] def write_in_ascii(port: serial.Serial, data: int | bytes) -> None: """Write binary in ASCII format to the serial port Parameters ---------- port : serial.Serial pyserial instance data : int Binary integer value or bytes """ match data: case int(): if not is_writable(port): return None if not data >= 0: return None elif data <= 0xFF: port.write( bytes( [character_from((data >> i) & 0x0F) for i in range(4, -1, -4)] ) ) elif data <= 0xFFFF: port.write( bytes( [character_from((data >> i) & 0x0F) for i in range(12, -1, -4)] ) ) elif data <= 0xFFFFFFFF: port.write( bytes( [character_from((data >> i) & 0x0F) for i in range(28, -1, -4)] ) ) case bytes(): for byte in data: write_in_ascii(port, byte)
[docs] def flush_rx_buffer(port: serial.Serial) -> None: """Flush serial rx buffer Parameters ---------- port : serial.Serial pyserial instance """ port.reset_input_buffer()
[docs] def flush_tx_buffer(port: serial.Serial) -> None: """Flush serial tx buffer Parameters ---------- port : serial.Serial pyserial instance """ port.reset_output_buffer()
[docs] def find_binary( port: serial.Serial, data: bytes, timeout: int, with_terminal: bool = False, terminal: int = 0, debugging: bool = False, ) -> bool: """Find binary bytes in serial rx data Parameters ---------- port : serial.Serial pyserial instance data : bytes Binary data bytes to find timeout : int Timeout in seconds with_terminal : bool Use terminal byte for data input or not terminal : int Terminal byte for data input debugging : bool Print debug info if true Returns ------- bool Found if true """ if not is_initialized(port): return False if not len(data) > 0: return False timestamp: int = millis() while True: if millis() - timestamp > timeout * 1000: return False if port.in_waiting: read_byte = port.read() if debugging: print(read_byte.decode("utf-8")) if int.from_bytes(read_byte) == data[0]: # compare as int break for datum in data[1:]: # omit the first one if with_terminal and terminal == datum: break while True: if millis() - timestamp > timeout * 1000: return False if port.in_waiting: read_byte = port.read() if debugging: print(read_byte.decode("utf-8")) if int.from_bytes(read_byte) == datum: break else: # If data is invalid, retry recursively. return find_binary( port, data, timeout * 1000 - millis() + timestamp, with_terminal, terminal, debugging, ) return True
[docs] def find_ascii( port: serial.Serial, data: str, timeout: int, debugging: bool = False ) -> bool: """Find ASCII-formatted bytes in serial rx data Parameters ---------- port : serial.Serial pyserial instance data : str ASCII-formatted bytes to find timeout : int Timeout in seconds debugging : bool Print debug info if true Returns ------- bool Found if true """ return find_binary(port, data.encode("ascii"), timeout, False, 0, debugging)