lib.grade_helpers
"""Helper routines for grading scripts built on ``Grade0``."""
import re
from ipaddress import IPv4Address, IPv4Interface
from typing import Union

from SRE.lib_sre import Grade0


def test_dig(grade: Grade0, machine_name: str,
             server_ip: Union[str, IPv4Address, IPv4Interface], *,
             proto: str = "udp", port: int = 53, request: str,
             timeout: int = 2, step: int = 1) -> tuple[str, int]:
    """Run a ``dig +short`` query from a container and return ``(stdout, exit_code)``.

    The command issued is::

        dig +time={max(1, timeout-1)} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}

    Args:
        grade: the Grade0 instance — required so the command is
            registered in the first pass and re-fetched in the second.
        machine_name: container the dig command is executed from.
        server_ip: target DNS server. Accepts a string, an ``IPv4Address``
            or an ``IPv4Interface``; a ``/prefix`` suffix is stripped in
            every case. Substituted as ``@server_ip``.
        proto: ``"udp"`` (default) or ``"tcp"`` (case-insensitive); TCP adds
            ``+tcp``. Any other value is treated as UDP.
        port: server port (default 53).
        request: dig query body — everything that follows ``@server``,
            e.g. ``"www.example.com A"`` or ``"example.com SOA"``.
        timeout: total timeout in seconds passed to ``grade.test()`` (default 2).
            ``dig`` itself is given ``+time={timeout-1}`` so it gives up
            one second before the outer test; the value is never lower
            than 1, so with ``timeout <= 1`` there is no such headroom.
        step: step number passed to ``grade.test()`` (default 1).

    Returns:
        ``(stdout, exit_code)`` — stdout is stripped (``""`` when the test
        produced no output). The exit code from ``grade.test()`` is passed
        through unchanged. The call is made with ``allow_error=True``.
    """
    if isinstance(server_ip, IPv4Interface):
        server_ip_str = str(server_ip.ip)
    else:
        # str() covers IPv4Address; a textual "a.b.c.d/len" loses its prefix
        # too, so strings behave like IPv4Interface objects.
        server_ip_str = str(server_ip).partition("/")[0]
    proto_flag = " +tcp" if proto.lower() == "tcp" else ""
    # Never hand dig a zero or negative +time value.
    dig_time = max(1, timeout - 1)
    cmd = (f"dig +time={dig_time} +tries=1 +short{proto_flag} "
           f"-p {port} @{server_ip_str} {request}")
    out, code = grade.test(machine_name, cmd,
                           step=step, timeout=timeout, allow_error=True)
    return (out or "").strip(), code


# The ``test_`` prefix would make pytest collect this helper as a test case
# whenever it is imported into a test module; opt out explicitly.
test_dig.__test__ = False


def eval_tcp_server(grade: Grade0, machine_name: str, server_name: str,
                    step: int = 1) -> list[int] | None:
    """Return the TCP ports a process matching ``server_name`` listens on.

    Two commands are run on ``machine_name``: ``pgrep -f {server_name}`` to
    collect the PIDs and ``ss -tlnp`` to list listening TCP sockets. A socket
    is attributed to the server when its ``pid=NNN`` owner field names one of
    the PIDs exactly.

    Args:
        grade: the Grade0 instance.
        machine_name: name of the virtual machine to inspect.
        server_name: pattern handed to ``pgrep -f`` (matched against the
            full command line of running processes).
        step: step number passed to grade.test() (default: 1).

    Returns:
        A sorted list of distinct TCP port numbers in LISTEN state owned by
        the process (empty when it is running but listens on nothing), or
        None if pgrep fails or reports no PID.
    """
    # All grade.test() calls must be made unconditionally so they are registered
    # in the first (registration) pass and carry real results in the second pass.
    # NOTE(review): server_name is interpolated unquoted; confirm how
    # grade.test() executes the command before passing names containing
    # spaces or shell metacharacters.
    pids_out, pids_code = grade.test(
        machine_name=machine_name,
        command=f"pgrep -f {server_name}",
        step=step,
    )
    ss_out, _ = grade.test(
        machine_name=machine_name,
        command="ss -tlnp",
        step=step,
    )

    if pids_code != 0:
        return None

    pids = set((pids_out or "").split())
    if not pids:
        return None

    ports: set[int] = set()
    for line in (ss_out or "").splitlines():
        fields = line.split()
        if "LISTEN" not in fields:
            continue
        # Compare whole PID tokens: a substring test would let PID 12 match
        # "pid=123". Rows lacking process information are skipped.
        if pids.isdisjoint(re.findall(r"pid=(\d+)", line)):
            continue
        # Columns after the state are Recv-Q, Send-Q, Local Address:Port.
        local_idx = fields.index("LISTEN") + 3
        if local_idx >= len(fields):
            continue
        # The port follows the last colon, which also copes with "*:80",
        # "[::]:80" and "127.0.0.53%lo:53".
        port = fields[local_idx].rpartition(":")[2]
        if port.isdecimal():
            ports.add(int(port))

    return sorted(ports)
def
test_dig( grade: SRE.lib_sre.Grade0, machine_name: str, server_ip: Union[str, ipaddress.IPv4Address, ipaddress.IPv4Interface], *, proto: str = 'udp', port: int = 53, request: str, timeout: int = 2, step: int = 1) -> tuple[str, int]:
def test_dig(grade: Grade0, machine_name: str,
             server_ip: Union[str, IPv4Address, IPv4Interface], *,
             proto: str = "udp", port: int = 53, request: str,
             timeout: int = 2, step: int = 1) -> tuple[str, int]:
    """Run a ``dig +short`` query from a container and return ``(stdout, exit_code)``.

    The command issued is::

        dig +time={max(1, timeout-1)} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}

    Args:
        grade: the Grade0 instance — required so the command is
            registered in the first pass and re-fetched in the second.
        machine_name: container the dig command is executed from.
        server_ip: target DNS server. Accepts a string, an ``IPv4Address``
            or an ``IPv4Interface``; a ``/prefix`` suffix is stripped in
            every case. Substituted as ``@server_ip``.
        proto: ``"udp"`` (default) or ``"tcp"`` (case-insensitive); TCP adds
            ``+tcp``. Any other value is treated as UDP.
        port: server port (default 53).
        request: dig query body — everything that follows ``@server``,
            e.g. ``"www.example.com A"`` or ``"example.com SOA"``.
        timeout: total timeout in seconds passed to ``grade.test()`` (default 2).
            ``dig`` itself is given ``+time={timeout-1}`` so it gives up
            one second before the outer test; the value is never lower
            than 1, so with ``timeout <= 1`` there is no such headroom.
        step: step number passed to ``grade.test()`` (default 1).

    Returns:
        ``(stdout, exit_code)`` — stdout is stripped (``""`` when the test
        produced no output). The exit code from ``grade.test()`` is passed
        through unchanged. The call is made with ``allow_error=True``.
    """
    if isinstance(server_ip, IPv4Interface):
        server_ip_str = str(server_ip.ip)
    else:
        # str() covers IPv4Address; a textual "a.b.c.d/len" loses its prefix
        # too, so strings behave like IPv4Interface objects.
        server_ip_str = str(server_ip).partition("/")[0]
    proto_flag = " +tcp" if proto.lower() == "tcp" else ""
    # Never hand dig a zero or negative +time value.
    dig_time = max(1, timeout - 1)
    cmd = (f"dig +time={dig_time} +tries=1 +short{proto_flag} "
           f"-p {port} @{server_ip_str} {request}")
    out, code = grade.test(machine_name, cmd,
                           step=step, timeout=timeout, allow_error=True)
    return (out or "").strip(), code


# The ``test_`` prefix would make pytest collect this helper as a test case
# whenever it is imported into a test module; opt out explicitly.
test_dig.__test__ = False
Run a dig +short query from a container and return (stdout, exit_code).
The command issued is::
dig +time={timeout-1} +tries=1 +short [+tcp] -p {port} @{server_ip} {request}
Arguments:
- grade: the Grade0 instance — required so the command is registered in the first pass and re-fetched in the second.
- machine_name: container the dig command is executed from.
- server_ip: target DNS server. Accepts a dotted-quad string, an `IPv4Address` or an `IPv4Interface` (the prefix is stripped). Substituted as `@server_ip`.
- proto: `"udp"` (default) or `"tcp"`; TCP adds `+tcp`.
- port: server port (default 53).
- request: dig query body — everything that follows `@server`, e.g. `"www.example.com A"` or `"example.com SOA"`.
- timeout: total timeout in seconds passed to `grade.test()` (default 2). `dig` itself is given `+time={timeout-1}` so it gives up one second before the outer test, leaving room for clean exit.
- step: step number passed to `grade.test()` (default 1).
Returns:
`(stdout, exit_code)` — stdout is stripped. On time-outs or unreachable servers the output may be empty; the exit code from `grade.test()` is passed through unchanged. Errors are not recorded (`allow_error=True`).
def
eval_tcp_server( grade: SRE.lib_sre.Grade0, machine_name: str, server_name: str, step: int = 1) -> list[int] | None:
54def eval_tcp_server(grade: Grade0, machine_name: str, server_name: str, 55 step: int = 1) -> list[int] | None: 56 """Check that a process matching server_name is running on machine_name and 57 return the TCP ports it listens on. 58 59 Args: 60 grade: the Grade0 instance. 61 machine_name: name of the virtual machine to inspect. 62 server_name: substring to match against running process names (ps output). 63 step: step number passed to grade.test() (default: 1). 64 65 Returns: 66 A list of TCP port numbers in LISTEN state used by the process, 67 or None if no matching process is found. 68 """ 69 # All grade.test() calls must be made unconditionally so they are registered 70 # in the first (registration) pass and carry real results in the second pass. 71 pids_out, pids_code = grade.test( 72 machine_name=machine_name, 73 command=f"pgrep -f {server_name}", 74 step=step, 75 ) 76 ss_out, _ = grade.test( 77 machine_name=machine_name, 78 command=f"ss -tlnp", 79 step=step, 80 ) 81 82 if pids_code != 0: 83 return None 84 85 pids = set(pids_out.split()) 86 if not pids: 87 return None 88 89 ports = [] 90 for line in ss_out.splitlines(): 91 # ss -tlnp output has a "users:(("name",pid=NNN,...))" field 92 if "LISTEN" not in line: 93 continue 94 if not any(f"pid={pid}" in line for pid in pids): 95 continue 96 # Extract port from the local address column (e.g. 0.0.0.0:443 or *:80) 97 addr_m = re.search(r'\s+\*?[\d.:]+:(\d+)\s+', line) 98 if addr_m: 99 ports.append(int(addr_m.group(1))) 100 101 return sorted(set(ports))
Check that a process matching server_name is running on machine_name and return the TCP ports it listens on.
Arguments:
- grade: the Grade0 instance.
- machine_name: name of the virtual machine to inspect.
- server_name: substring to match against running process names (ps output).
- step: step number passed to grade.test() (default: 1).
Returns:
A list of TCP port numbers in LISTEN state used by the process, or None if no matching process is found.