diff --git a/nixos/doc/manual/development/writing-nixos-tests.xml b/nixos/doc/manual/development/writing-nixos-tests.xml index 150bea8c2d8..ff37b3b2f6f 100644 --- a/nixos/doc/manual/development/writing-nixos-tests.xml +++ b/nixos/doc/manual/development/writing-nixos-tests.xml @@ -360,6 +360,18 @@ start_all() + + + wait_for_console_text + + + + Wait until the supplied regular expressions match a line of the serial + console output. This method is useful when OCR is not possibile or + accurate enough. + + + wait_for_window diff --git a/nixos/lib/test-driver/test-driver.py b/nixos/lib/test-driver/test-driver.py index e7b05968b07..0c1946387ae 100644 --- a/nixos/lib/test-driver/test-driver.py +++ b/nixos/lib/test-driver/test-driver.py @@ -3,6 +3,8 @@ from contextlib import contextmanager, _GeneratorContextManager from queue import Queue, Empty from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List from xml.sax.saxutils import XMLGenerator +import queue +import io import _thread import atexit import base64 @@ -671,6 +673,22 @@ class Machine: with self.nested("waiting for {} to appear on screen".format(regex)): retry(screen_matches) + def wait_for_console_text(self, regex: str) -> None: + self.log("waiting for {} to appear on console".format(regex)) + # Buffer the console output, this is needed + # to match multiline regexes. + console = io.StringIO() + while True: + try: + console.write(self.last_lines.get()) + except queue.Empty: + self.sleep(1) + continue + console.seek(0) + matches = re.search(regex, console.read()) + if matches is not None: + return + def send_key(self, key: str) -> None: key = CHAR_TO_KEY.get(key, key) self.send_monitor_command("sendkey {}".format(key)) @@ -734,11 +752,16 @@ class Machine: self.monitor, _ = self.monitor_socket.accept() self.shell, _ = self.shell_socket.accept() + # Store last serial console lines for use + # of wait_for_console_text + self.last_lines: Queue = Queue() + def process_serial_output() -> None: assert self.process.stdout is not None for _line in self.process.stdout: # Ignore undecodable bytes that may occur in boot menus line = _line.decode(errors="ignore").replace("\r", "").rstrip() + self.last_lines.put(line) eprint("{} # {}".format(self.name, line)) self.logger.enqueue({"msg": line, "machine": self.name})