lib.grade_helpers

  1import re
  2from ipaddress import IPv4Address, IPv4Interface
  3from typing import Union
  4
  5from SRE.lib_sre import Grade0
  6
  7
  8def test_dig(grade: Grade0, machine_name: str,
  9             server_ip: Union[str, IPv4Address, IPv4Interface], *,
 10             proto: str = "udp", port: int = 53, request: str,
 11             timeout: int = 2, step: int = 1) -> tuple[str, int]:
 12    """Run a ``dig +short`` query from a container and return ``(stdout, exit_code)``.
 13
 14    The command issued is::
 15
 16        dig +time={timeout-1} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}
 17
 18    Args:
 19        grade:        the Grade0 instance — required so the command is
 20                      registered in the first pass and re-fetched in the second.
 21        machine_name: container the dig command is executed from.
 22        server_ip:    target DNS server. Accepts a dotted-quad string, an
 23                      ``IPv4Address`` or an ``IPv4Interface`` (the prefix is
 24                      stripped). Substituted as ``@server_ip``.
 25        proto:        ``"udp"`` (default) or ``"tcp"``; TCP adds ``+tcp``.
 26        port:         server port (default 53).
 27        request:      dig query body — everything that follows ``@server``,
 28                      e.g. ``"www.example.com A"`` or ``"example.com SOA"``.
 29        timeout:      total timeout in seconds passed to ``grade.test()`` (default 2).
 30                      ``dig`` itself is given ``+time={timeout-1}`` so it gives up
 31                      one second before the outer test, leaving room for clean exit.
 32        step:         step number passed to ``grade.test()`` (default 1).
 33
 34    Returns:
 35        ``(stdout, exit_code)`` — stdout is stripped. On time-outs or unreachable
 36        servers the output may be empty; the exit code from ``grade.test()`` is
 37        passed through unchanged. Errors are not recorded (``allow_error=True``).
 38    """
 39    if isinstance(server_ip, IPv4Interface):
 40        server_ip_str = str(server_ip.ip)
 41    elif isinstance(server_ip, IPv4Address):
 42        server_ip_str = str(server_ip)
 43    else:
 44        server_ip_str = server_ip
 45    proto_flag = " +tcp" if proto.lower() == "tcp" else ""
 46    cmd = (f"dig +time={timeout - 1} +tries=1 +short{proto_flag} "
 47           f"-p {port} @{server_ip_str} {request}")
 48    out, code = grade.test(machine_name, cmd,
 49                           step=step, timeout=timeout, allow_error=True)
 50    return (out or "").strip(), code
 51
 52
 53def eval_tcp_server(grade: Grade0, machine_name: str, server_name: str,
 54                    step: int = 1) -> list[int] | None:
 55    """Check that a process matching server_name is running on machine_name and
 56    return the TCP ports it listens on.
 57
 58    Args:
 59        grade:        the Grade0 instance.
 60        machine_name: name of the virtual machine to inspect.
 61        server_name:  substring to match against running process names (ps output).
 62        step:         step number passed to grade.test() (default: 1).
 63
 64    Returns:
 65        A list of TCP port numbers in LISTEN state used by the process,
 66        or None if no matching process is found.
 67    """
 68    # All grade.test() calls must be made unconditionally so they are registered
 69    # in the first (registration) pass and carry real results in the second pass.
 70    pids_out, pids_code = grade.test(
 71        machine_name=machine_name,
 72        command=f"pgrep -f {server_name}",
 73        step=step,
 74    )
 75    ss_out, _ = grade.test(
 76        machine_name=machine_name,
 77        command=f"ss -tlnp",
 78        step=step,
 79    )
 80
 81    if pids_code != 0:
 82        return None
 83
 84    pids = set(pids_out.split())
 85    if not pids:
 86        return None
 87
 88    ports = []
 89    for line in ss_out.splitlines():
 90        # ss -tlnp output has a "users:(("name",pid=NNN,...))" field
 91        if "LISTEN" not in line:
 92            continue
 93        if not any(f"pid={pid}" in line for pid in pids):
 94            continue
 95        # Extract port from the local address column (e.g. 0.0.0.0:443 or *:80)
 96        addr_m = re.search(r'\s+\*?[\d.:]+:(\d+)\s+', line)
 97        if addr_m:
 98            ports.append(int(addr_m.group(1)))
 99
100    return sorted(set(ports))
def test_dig( grade: SRE.lib_sre.Grade0, machine_name: str, server_ip: Union[str, ipaddress.IPv4Address, ipaddress.IPv4Interface], *, proto: str = 'udp', port: int = 53, request: str, timeout: int = 2, step: int = 1) -> tuple[str, int]:
 9def test_dig(grade: Grade0, machine_name: str,
10             server_ip: Union[str, IPv4Address, IPv4Interface], *,
11             proto: str = "udp", port: int = 53, request: str,
12             timeout: int = 2, step: int = 1) -> tuple[str, int]:
13    """Run a ``dig +short`` query from a container and return ``(stdout, exit_code)``.
14
15    The command issued is::
16
17        dig +time={timeout-1} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}
18
19    Args:
20        grade:        the Grade0 instance — required so the command is
21                      registered in the first pass and re-fetched in the second.
22        machine_name: container the dig command is executed from.
23        server_ip:    target DNS server. Accepts a dotted-quad string, an
24                      ``IPv4Address`` or an ``IPv4Interface`` (the prefix is
25                      stripped). Substituted as ``@server_ip``.
26        proto:        ``"udp"`` (default) or ``"tcp"``; TCP adds ``+tcp``.
27        port:         server port (default 53).
28        request:      dig query body — everything that follows ``@server``,
29                      e.g. ``"www.example.com A"`` or ``"example.com SOA"``.
30        timeout:      total timeout in seconds passed to ``grade.test()`` (default 2).
31                      ``dig`` itself is given ``+time={timeout-1}`` so it gives up
32                      one second before the outer test, leaving room for clean exit.
33        step:         step number passed to ``grade.test()`` (default 1).
34
35    Returns:
36        ``(stdout, exit_code)`` — stdout is stripped. On time-outs or unreachable
37        servers the output may be empty; the exit code from ``grade.test()`` is
38        passed through unchanged. Errors are not recorded (``allow_error=True``).
39    """
40    if isinstance(server_ip, IPv4Interface):
41        server_ip_str = str(server_ip.ip)
42    elif isinstance(server_ip, IPv4Address):
43        server_ip_str = str(server_ip)
44    else:
45        server_ip_str = server_ip
46    proto_flag = " +tcp" if proto.lower() == "tcp" else ""
47    cmd = (f"dig +time={timeout - 1} +tries=1 +short{proto_flag} "
48           f"-p {port} @{server_ip_str} {request}")
49    out, code = grade.test(machine_name, cmd,
50                           step=step, timeout=timeout, allow_error=True)
51    return (out or "").strip(), code

Run a dig +short query from a container and return (stdout, exit_code).

The command issued is::

dig +time={timeout-1} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}
Arguments:
  • grade: the Grade0 instance — required so the command is registered in the first pass and re-fetched in the second.
  • machine_name: container the dig command is executed from.
  • server_ip: target DNS server. Accepts a dotted-quad string, an IPv4Address or an IPv4Interface (the prefix is stripped). Substituted as @server_ip.
  • proto: "udp" (default) or "tcp"; TCP adds +tcp.
  • port: server port (default 53).
  • request: dig query body — everything that follows @server, e.g. "www.example.com A" or "example.com SOA".
  • timeout: total timeout in seconds passed to grade.test() (default 2). dig itself is given +time={timeout-1} so it gives up one second before the outer test, leaving room for clean exit.
  • step: step number passed to grade.test() (default 1).
Returns:

(stdout, exit_code) — stdout is stripped. On time-outs or unreachable servers the output may be empty; the exit code from grade.test() is passed through unchanged. Errors are not recorded (allow_error=True).

def eval_tcp_server( grade: SRE.lib_sre.Grade0, machine_name: str, server_name: str, step: int = 1) -> list[int] | None:
 54def eval_tcp_server(grade: Grade0, machine_name: str, server_name: str,
 55                    step: int = 1) -> list[int] | None:
 56    """Check that a process matching server_name is running on machine_name and
 57    return the TCP ports it listens on.
 58
 59    Args:
 60        grade:        the Grade0 instance.
 61        machine_name: name of the virtual machine to inspect.
 62        server_name:  substring to match against running process names (ps output).
 63        step:         step number passed to grade.test() (default: 1).
 64
 65    Returns:
 66        A list of TCP port numbers in LISTEN state used by the process,
 67        or None if no matching process is found.
 68    """
 69    # All grade.test() calls must be made unconditionally so they are registered
 70    # in the first (registration) pass and carry real results in the second pass.
 71    pids_out, pids_code = grade.test(
 72        machine_name=machine_name,
 73        command=f"pgrep -f {server_name}",
 74        step=step,
 75    )
 76    ss_out, _ = grade.test(
 77        machine_name=machine_name,
 78        command=f"ss -tlnp",
 79        step=step,
 80    )
 81
 82    if pids_code != 0:
 83        return None
 84
 85    pids = set(pids_out.split())
 86    if not pids:
 87        return None
 88
 89    ports = []
 90    for line in ss_out.splitlines():
 91        # ss -tlnp output has a "users:(("name",pid=NNN,...))" field
 92        if "LISTEN" not in line:
 93            continue
 94        if not any(f"pid={pid}" in line for pid in pids):
 95            continue
 96        # Extract port from the local address column (e.g. 0.0.0.0:443 or *:80)
 97        addr_m = re.search(r'\s+\*?[\d.:]+:(\d+)\s+', line)
 98        if addr_m:
 99            ports.append(int(addr_m.group(1)))
100
101    return sorted(set(ports))

Check that a process matching server_name is running on machine_name and return the TCP ports it listens on.

Arguments:
  • grade: the Grade0 instance.
  • machine_name: name of the virtual machine to inspect.
  • server_name: pattern passed to pgrep -f, i.e. matched against the full command line of running processes.
  • step: step number passed to grade.test() (default: 1).
Returns:

A list of TCP port numbers in LISTEN state used by the process, or None if no matching process is found.