Skip to content

Commit

Permalink
[serial] Provide a generic version of the command prompt
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaut committed Nov 29, 2024
1 parent 677e7ef commit a507e74
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 43 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.7.0

- Generalize the `Nsh` command prompt into `Cmd`.

## 1.6.0

- Add inline function analyzer.
Expand Down
2 changes: 1 addition & 1 deletion src/emdbg/serial/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
.. include:: README.md
"""

from .protocol import nsh
from .protocol import cmd, nsh
87 changes: 45 additions & 42 deletions src/emdbg/serial/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from pathlib import Path
from serial import Serial
from serial.threaded import ReaderThread, Protocol
from .utils import find_serial_port
from .utils import find_serial_port, ansi_escape
from ..utils import add_datetime as add_dt

_LOGGER = logging.getLogger("serial:nsh")


class _NshReader(Protocol):
class _CmdReader(Protocol):
def __init__(self, device: Serial):
super().__init__()
self.stream = ""
Expand Down Expand Up @@ -45,34 +45,31 @@ def clear(self):
self.clear_input()


# -----------------------------------------------------------------------------
class Nsh:
class CommandPrompt:
"""
Manages the NSH protocol, in particular, receiving data in the background
Manages a command prompt, in particular, receiving data in the background
and logging it out to the INFO logger.
Several convenience methods allow you to send a command and receive its
response, or wait for a certain pattern to arrive in the stream.
Transmitting a command is intentionally slow to not overflow the NSH receive
buffers while it is being debugged. Note that the PX4 target needs to be
running for the NSH to be functional.
"""
_ANSI_ESCAPE = re.compile(r"\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_NEWLINE = "\r\n"
_PROMPT = "nsh> "
_TIMEOUT = 3

def __init__(self, reader_thread: ReaderThread, protocol: _NshReader):
def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
prompt: str = None, newline: str = None):
"""
Use the `nsh` context manager to build this class correctly.
:param reader_thread: The background reader thread.
:param protocol: The NSH protocol.
:param protocol: The command prompt protocol.
:param prompt: Optional prefix of the command prompt (default empty string).
:param newline: The newline characters used in the prompt (default `\\r\\n`).
"""
self._serial = protocol
self._reader_thread = reader_thread
self._serial._data_received = self._print
self._print_data = ""
self._prompt = "" if prompt is None else prompt
self._newline = "\r\n" if newline is None else newline
self.filter_ansi_escapes = True
"""Filter ANSI escape codes from the output."""
self._logfile = None
Expand All @@ -83,16 +80,16 @@ def _write_line(self, line):

def _print(self, data: str):
self._print_data += data
if Nsh._NEWLINE in self._print_data:
*lines, self._print_data = self._print_data.split(Nsh._NEWLINE)
if self._newline in self._print_data:
*lines, self._print_data = self._print_data.split(self._newline)
for line in self._filter(lines):
_LOGGER.debug(line)
if self._logfile is not None:
self._logfile.write(line + "\n")

def _filter(self, lines):
if self.filter_ansi_escapes:
lines = [self._ANSI_ESCAPE.sub('', line) for line in lines]
lines = list(map(ansi_escape, lines))
return lines

def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
Expand All @@ -106,8 +103,7 @@ def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
return []

def _join(self, lines: list[str]) -> str | None:
return Nsh._NEWLINE.join(lines) if lines else None

return self._newline.join(lines) if lines else None

def clear(self):
"""Clear the receive and transmit buffers."""
Expand Down Expand Up @@ -136,7 +132,6 @@ def log_to_file(self, filename: Path | str, add_datetime: bool = False) -> Path:
self._logfile = filename.open("wt")
return filename


def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
"""
Return any lines received within `timeout`.
Expand All @@ -147,10 +142,9 @@ def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
:return: received lines or None on timeout
"""
lines = self._filter(self._read_packets(Nsh._NEWLINE, timeout))
lines = self._filter(self._read_packets(self._newline, timeout))
return self._join(lines)


def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
"""
Waits for a regex pattern to appear in a line in the stream.
Expand Down Expand Up @@ -180,29 +174,28 @@ def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:

def wait_for_prompt(self, timeout: float = _TIMEOUT) -> list[str]:
"""
Waits to the `nsh> ` prompt to arrive in the stream.
Waits to the prompt to arrive in the stream.
Note that any ANSI escape codes (for color or cursor position) are
filtered out.
:param timeout: seconds to wait until the prompt arrives.
:return: all lines until the prompt arrives.
"""
if prompts := self._read_packets(Nsh._NEWLINE + Nsh._PROMPT, timeout):
prompt = Nsh._PROMPT + Nsh._PROMPT.join(prompts)
return self._join(self._filter(prompt.split(Nsh._NEWLINE)))
_LOGGER.warning(f"Waiting for 'nsh> ' prompt timed out after {timeout:.1f}s!")
if prompts := self._read_packets(self._newline + self._prompt, timeout):
prompt = self._prompt + self._prompt.join(prompts)
return self._join(self._filter(prompt.split(self._newline)))
_LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
return None


def command(self, command: str, timeout: float = _TIMEOUT) -> str | None:
"""
Send a command and return all lines until the next prompt.
If the command is asynchronous, you need to poll for new lines separately.
Note that any ANSI escape codes (for color or cursor position) are
filtered out.
:param command: command string to send to the NSH.
:param command: command string to send to the command prompt.
:param timeout: seconds to wait until the prompt arrives.
:return: all lines from the command issue until the next prompt arrives.
Expand All @@ -214,13 +207,12 @@ def command(self, command: str, timeout: float = _TIMEOUT) -> str | None:

def command_nowait(self, command: str):
"""
Send a command to the NSH without waiting for a response.
Send a command to the command prompt without waiting for a response.
:param command: command string to send to the NSH.
:param command: command string to send to the command prompt.
"""
self.command(command, None)


def reboot(self, timeout: int = 15) -> str | None:
"""
Send the reboot command and wait for the reboot to be completed.
Expand All @@ -233,12 +225,12 @@ def reboot(self, timeout: int = 15) -> str | None:

def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
"""
Check if the NSH is responding to newline inputs with a `nsh> ` prompt.
Check if the command prompt is responding to newline inputs with a prompt.
The total timeout is `attempts * timeout`!
:param timeout: seconds to wait until the prompt arrives.
:param attempts: number of times to send a newline and wait.
:return: `True` is NSH responds, `False` otherwise
:return: `True` is command prompt responds, `False` otherwise
"""
self._serial.clear()
attempt = 0
Expand All @@ -253,36 +245,47 @@ def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:

# -----------------------------------------------------------------------------
@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
def cmd(serial_or_port: str, baudrate: int = 115200, prompt: str = None, newline: str = None):
"""
Opens a serial port with the `serial` number or filepath and closes it again.
:param serial_or_port: the serial number or the filepath of the port to
connect to.
:param baudrate: the baudrate to use.
:param prompt: Optional prefix of the command prompt.
:param newline: The newline characters used in the prompt.
:raises `SerialException`: if serial port is not found.
:return: yields an initialized `Nsh` object.
:return: yields an initialized `CommandPrompt` object.
"""
nsh = None
cmd = None
if "/" in serial_or_port:
ttyDevice = serial_or_port
else:
ttyDevice = find_serial_port(serial_or_port).device
try:
_LOGGER.info(f"Starting on port '{serial_or_port}'..."
if serial_or_port else "Starting...")
if serial_or_port else "Starting...")
device = Serial(ttyDevice, baudrate=baudrate)
reader_thread = ReaderThread(device, lambda: _NshReader(device))
reader_thread = ReaderThread(device, lambda: _CmdReader(device))
with reader_thread as reader:
nsh = Nsh(reader_thread, reader)
yield nsh
cmd = CommandPrompt(reader_thread, reader, prompt, newline)
yield cmd
finally:
if nsh is not None: nsh.log_to_file(None)
if cmd is not None: cmd.log_to_file(None)
_LOGGER.debug("Stopping.")


@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
"""
Same as `cmd()` but with a `nsh> ` prompt for use with PX4.
"""
with cmd(serial_or_port, baudrate, "nsh> ") as nsh:
yield nsh


# -----------------------------------------------------------------------------
# We need to monkey patch the ReaderThread.run() function to prevent a
# "device not ready" error to abort the reader thread.
Expand Down
8 changes: 8 additions & 0 deletions src/emdbg/serial/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations
from serial.tools import list_ports
import re
import logging
_LOGGER = logging.getLogger("serial")

Expand Down Expand Up @@ -42,3 +43,10 @@ def find_serial_port(identifier=None):
raise SerialException(msg)

return serials


_ANSI_ESCAPE = re.compile(r"\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

def ansi_escape(line: str) -> str:
"""Removes ANSI escape sequences from a string"""
return _ANSI_ESCAPE.sub("", line)

0 comments on commit a507e74

Please sign in to comment.