nixos/lib/test-driver: add wait_for_console_text

This method is similar to wait_for_text but is based on matching
serial console lines instead of the VGA output.
This commit is contained in:
rnhmjoj 2020-06-13 12:04:05 +02:00
parent 37ec7c488a
commit b520055df6
No known key found for this signature in database
GPG key ID: BFBAF4C975F76450
2 changed files with 35 additions and 0 deletions

View file

@ -360,6 +360,18 @@ start_all()
</note> </note>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term>
<methodname>wait_for_console_text</methodname>
</term>
<listitem>
<para>
Wait until the supplied regular expressions match a line of the serial
console output. This method is useful when OCR is not possibile or
accurate enough.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term> <term>
<methodname>wait_for_window</methodname> <methodname>wait_for_window</methodname>

View file

@ -3,6 +3,8 @@ from contextlib import contextmanager, _GeneratorContextManager
from queue import Queue, Empty from queue import Queue, Empty
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List
from xml.sax.saxutils import XMLGenerator from xml.sax.saxutils import XMLGenerator
import queue
import io
import _thread import _thread
import atexit import atexit
import base64 import base64
@ -671,6 +673,22 @@ class Machine:
with self.nested("waiting for {} to appear on screen".format(regex)): with self.nested("waiting for {} to appear on screen".format(regex)):
retry(screen_matches) retry(screen_matches)
def wait_for_console_text(self, regex: str) -> None:
self.log("waiting for {} to appear on console".format(regex))
# Buffer the console output, this is needed
# to match multiline regexes.
console = io.StringIO()
while True:
try:
console.write(self.last_lines.get())
except queue.Empty:
self.sleep(1)
continue
console.seek(0)
matches = re.search(regex, console.read())
if matches is not None:
return
def send_key(self, key: str) -> None: def send_key(self, key: str) -> None:
key = CHAR_TO_KEY.get(key, key) key = CHAR_TO_KEY.get(key, key)
self.send_monitor_command("sendkey {}".format(key)) self.send_monitor_command("sendkey {}".format(key))
@ -734,11 +752,16 @@ class Machine:
self.monitor, _ = self.monitor_socket.accept() self.monitor, _ = self.monitor_socket.accept()
self.shell, _ = self.shell_socket.accept() self.shell, _ = self.shell_socket.accept()
# Store last serial console lines for use
# of wait_for_console_text
self.last_lines: Queue = Queue()
def process_serial_output() -> None: def process_serial_output() -> None:
assert self.process.stdout is not None assert self.process.stdout is not None
for _line in self.process.stdout: for _line in self.process.stdout:
# Ignore undecodable bytes that may occur in boot menus # Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip() line = _line.decode(errors="ignore").replace("\r", "").rstrip()
self.last_lines.put(line)
eprint("{} # {}".format(self.name, line)) eprint("{} # {}".format(self.name, line))
self.logger.enqueue({"msg": line, "machine": self.name}) self.logger.enqueue({"msg": line, "machine": self.name})