lib.net_config

  1import re
  2from collections import deque
  3from typing import List, Tuple, Dict, Any, Literal, TypeAlias
  4from ipaddress import IPv4Address, IPv4Interface, IPv4Network
  5
  6from SRE.lib_sre import NetScheme0, Grade0
  7
  8NetConfigInterface: TypeAlias = (
  9        Tuple[
 10            List[IPv4Interface],
 11            List[Tuple[IPv4Network, IPv4Address | IPv4Interface]]
 12        ]
 13        | Literal['dhcp']
 14        | None
 15)
 16
 17NetConfigEntry: TypeAlias = List[NetConfigInterface]
 18
 19NetConfig: TypeAlias = Dict[str, NetConfigInterface]
 20
 21SysctlConfig = Dict[str, Any]
 22
 23
 24def get_ip_addresses(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, List[Tuple[str, int]]]:
 25    """Run 'ip a' on machine_name and parse the output.
 26
 27    Returns a dict mapping interface name -> list of (address, prefix_len).
 28
 29    Addresses for each interface are sorted by:
 30      1. prefix length descending
 31      2. IP address lexicographically ascending
 32
 33    Edge cases:
 34      - virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0'
 35      - non-zero exit code or empty output  -> {}
 36    """
 37    output, code = grade.test(machine_name, 'ip a', step=step)
 38    if code != 0 or not output:
 39        return {}
 40    result: Dict[str, List[Tuple[str, int]]] = {}
 41    current_iface = None
 42    for line in output.splitlines():
 43        m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line)
 44        if m:
 45            current_iface = m.group(1)
 46            result.setdefault(current_iface, [])
 47            continue
 48        m = re.match(r'^\s+inet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', line)
 49        if m and current_iface is not None:
 50            result[current_iface].append((m.group(1), int(m.group(2))))
 51    for iface in result:
 52        result[iface].sort(key=lambda x: (-x[1], x[0]))
 53    return result
 54
 55
 56def get_routes(grade: Grade0, machine_name: str, step: int = 1) -> Dict[Tuple[str, int], Tuple[str, str, int]]:
 57    """Run 'ip route' on machine_name and parse the output.
 58
 59    Returns a dict mapping (network, mask) -> (via, dev, metric).
 60
 61    Edge cases:
 62      - 'default' route              -> key ('0.0.0.0', 0)
 63      - host route without mask      -> mask 32
 64      - direct route (no via)        -> via ''
 65      - no explicit metric           -> metric 0
 66      - non-zero exit code or empty output -> {}
 67    """
 68    output, code = grade.test(machine_name, 'ip route', step=step)
 69    if code != 0 or not output:
 70        return {}
 71    result: Dict[Tuple[str, int], Tuple[str, str, int]] = {}
 72    for line in output.splitlines():
 73        parts = line.split()
 74        if not parts:
 75            continue
 76        dest = parts[0]
 77        if dest == 'default':
 78            net, mask = '0.0.0.0', 0
 79        elif '/' in dest:
 80            net, mask_str = dest.split('/')
 81            mask = int(mask_str)
 82        else:
 83            net, mask = dest, 32
 84        via, dev, metric = '', '', 0
 85        i = 1
 86        while i < len(parts):
 87            if parts[i] == 'via' and i + 1 < len(parts):
 88                via = parts[i + 1]
 89                i += 2
 90            elif parts[i] == 'dev' and i + 1 < len(parts):
 91                dev = parts[i + 1]
 92                i += 2
 93            elif parts[i] == 'metric' and i + 1 < len(parts):
 94                metric = int(parts[i + 1])
 95                i += 2
 96            else:
 97                i += 1
 98        result[(net, mask)] = (via, dev, metric)
 99    return result
100
101
102def get_sysctl_conf(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, str]:
103    """Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters.
104
105    Returns a dict mapping parameter name -> value (both as strings).
106
107    Parsing rules:
108      - lines starting with '#' or ';' are comments and ignored
109      - blank lines are ignored
110      - accepted formats: 'key = value' and 'key=value'
111      - later definitions override earlier ones (sysctl.d files override sysctl.conf)
112
113    Edge cases:
114      - non-zero exit code from any grade.test() -> {} immediately
115      - first-pass empty output                  -> {} and continues to next call
116    """
117    result: Dict[str, str] = {}
118
119    def _parse(text: str) -> None:
120        for line in text.splitlines():
121            line = line.strip()
122            if not line or line[0] in ('#', ';'):
123                continue
124            if '=' in line:
125                key, _, value = line.partition('=')
126                result[key.strip()] = value.strip()
127
128    output, code = grade.test(machine_name, 'cat /etc/sysctl.conf', step=step)
129    if code != 0:
130        return {}
131    _parse(output)
132
133    output2, code = grade.test(machine_name, 'cat /etc/sysctl.d/*', step=step)
134    if code != 0:
135        return {}
136    _parse(output2)
137
138    return result
139
140
141def get_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> NetConfigEntry:
142    """Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig.
143
144    Each entry in the returned list corresponds to one interface (eth0, eth1, …)
145    in index order.
146
147    Entry types:
148      - Tuple (addresses, routes): statically configured interface.
149      - 'dhcp': interface whose address(es) were all assigned dynamically (DHCP).
150      - None: interface present in the kernel but carrying no IP address.
151
152    The list covers eth0 … ethN where N is the highest eth index reported by
153    'ip link show'.  Interfaces beyond the highest present index are not included.
154
155    Edge cases:
156      - non-zero exit code or empty output from any command -> []
157      - interfaces whose name does not match eth\\d+  -> ignored
158      - routes with an unknown gateway -> attached to the first interface, or
159        dropped if there are no interfaces
160    """
161    raw_addrs = get_ip_addresses(grade, machine_name, step=step)
162    raw_routes = get_routes(grade, machine_name, step=step)
163
164    # Re-use the cached 'ip a' output to detect dynamic (DHCP) addresses.
165    ip_a_out, _ = grade.test(machine_name, 'ip a', step=step)
166
167    # Parse which interfaces have *all* their inet addresses marked dynamic.
168    dynamic_ifaces: set[str] = set()
169    current_iface: str | None = None
170    iface_addr_flags: dict[str, list[bool]] = {}  # iface -> [is_dynamic, ...]
171    for line in (ip_a_out or '').splitlines():
172        m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line)
173        if m:
174            current_iface = m.group(1)
175            iface_addr_flags.setdefault(current_iface, [])
176            continue
177        m = re.match(r'^\s+inet\s+\S+', line)
178        if m and current_iface is not None:
179            iface_addr_flags.setdefault(current_iface, []).append('dynamic' in line)
180    for iface, flags in iface_addr_flags.items():
181        if flags and all(flags):
182            dynamic_ifaces.add(iface)
183
184    # Discover all present eth<N> interfaces via 'ip link show'.
185    ip_link_out, link_code = grade.test(machine_name, 'ip link show', step=step)
186    if link_code != 0 or not ip_link_out:
187        return []
188    present_eths: set[str] = set(
189        re.findall(r'^\d+:\s+(eth\d+)(?:@\S+)?:', ip_link_out, re.MULTILINE)
190    )
191    if not present_eths:
192        return []
193
194    max_n = max(int(name[3:]) for name in present_eths)
195
196    # Build interface → addresses lookup from raw_addrs (only non-empty entries).
197    eth_addrs: dict[str, list[tuple[str, int]]] = {
198        name: addrs
199        for name, addrs in raw_addrs.items()
200        if name.startswith('eth') and name[3:].isdigit() and addrs
201    }
202
203    # Collect static eth interfaces in order (for route distribution).
204    static_eth_ifaces: list[tuple[str, list[IPv4Interface]]] = []
205    for n in range(max_n + 1):
206        name = f'eth{n}'
207        if name not in present_eths:
208            continue
209        if name in eth_addrs and name not in dynamic_ifaces:
210            ifaces = [IPv4Interface(f"{addr}/{plen}") for addr, plen in eth_addrs[name]]
211            if ifaces:
212                static_eth_ifaces.append((name, ifaces))
213
214    # Distribute routes among static interfaces (existing logic).
215    iface_networks: list[list[IPv4Network]] = [
216        [iface.network for iface in ifaces]
217        for _, ifaces in static_eth_ifaces
218    ]
219    routes_per_iface: list[list[tuple[IPv4Network, IPv4Address]]] = [
220        [] for _ in static_eth_ifaces
221    ]
222    for (net_str, mask), (via, dev, _metric) in raw_routes.items():
223        if not via:
224            continue
225        dest = IPv4Network(f"{net_str}/{mask}")
226        gw = IPv4Address(via)
227        matched = None
228        for idx, nets in enumerate(iface_networks):
229            if any(gw in net for net in nets):
230                matched = idx
231                break
232        if matched is None:
233            matched = 0
234        if routes_per_iface:
235            routes_per_iface[matched].append((dest, gw))
236
237    static_map: dict[str, tuple[list[IPv4Interface], list[tuple[IPv4Network, IPv4Address]]]] = {
238        name: (ifaces, routes_per_iface[idx])
239        for idx, (name, ifaces) in enumerate(static_eth_ifaces)
240    }
241
242    # Assemble final NetConfig list.
243    result: NetConfigEntry = []
244    for n in range(max_n + 1):
245        name = f'eth{n}'
246        if name not in present_eths:
247            result.append(None)
248            continue
249        if name in dynamic_ifaces:
250            result.append('dhcp')
251        elif name in static_map:
252            result.append(static_map[name])
253        else:
254            result.append(None)
255
256    return result
257
258
259def get_persistent_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> tuple[NetConfigEntry, int]:
260    """Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count).
261
262    Reconstructs the network configuration that would result from
263    'systemctl networking start' (i.e. ifup -a).
264
265    Entry types in the returned NetConfig:
266      - Tuple (addresses, routes): inet-static stanza for eth<N>.
267      - 'dhcp': inet-dhcp stanza for eth<N>.
268      - None: eth<N> mentioned in an auto/allow-hotplug line but with no stanza.
269
270    Only eth<N> interfaces are included, in index order.
271
272    error_count counts lines that are syntactically wrong or have invalid
273    values (bad IP/network, unknown stanza keywords, malformed iface lines…).
274
275    Parsing rules:
276      - 'address <ip>[/<prefix>]' — primary address; prefix may be absent
277      - 'netmask <mask>'          — applied to primary address when it has no prefix
278      - 'gateway <ip>'            — inserted as a 0.0.0.0/0 route
279      - 'post-up / up ip addr add <addr> dev <iface>' — extra addresses
280      - 'post-up / up ip route add <net> via <gw>'    — static routes
281      - Other post-up/up/down commands are silently ignored
282    """
283    out1, code1 = grade.test(machine_name, 'cat /etc/network/interfaces', step=step, allow_error=True)
284    out2, code2 = grade.test(machine_name, 'cat /etc/network/interfaces.d/*', step=step, allow_error=True)
285
286    lines: list[str] = []
287    include_interfaces_d = False
288    if code1 == 0 and out1:
289        lines.extend(out1.splitlines())
290        for raw_line in out1.splitlines():
291            stripped = raw_line.strip()
292            if not stripped or stripped.startswith('#'):
293                continue
294            parts = stripped.split()
295            if (parts[0] in ('source', 'source-directory')
296                    and len(parts) >= 2
297                    and '/etc/network/interfaces.d' in parts[1]):
298                include_interfaces_d = True
299                break
300    if include_interfaces_d and code2 == 0 and out2:
301        lines.extend(out2.splitlines())
302
303    errors = 0
304
305    # Known stanza-level options that ifup accepts — won't be counted as errors.
306    _KNOWN_OPTS = {
307        'address', 'netmask', 'broadcast', 'network', 'gateway',
308        'metric', 'hwaddress', 'mtu', 'scope',
309        'pre-up', 'up', 'post-up', 'down', 'pre-down', 'post-down',
310        'dns-nameservers', 'dns-search', 'dns-domain',
311        'vlan-raw-device', 'bridge_ports', 'bridge_stp', 'bridge_fd',
312        'bond-master', 'bond-slaves', 'bond-mode', 'bond-miimon',
313        'wpa-ssid', 'wpa-psk', 'wpa-conf',
314    }
315
316    # Accumulate per-interface data keyed by interface name.
317    iface_data: dict[str, dict] = {}
318    dhcp_ifaces: set[str] = set()  # interfaces with 'inet dhcp' stanza
319    auto_ifaces: set[str] = set()  # interfaces seen in auto/allow-hotplug lines
320    current_iface: str | None = None  # name of the active inet-static stanza
321    in_ignored_stanza: bool = False  # True inside loopback / dhcp / manual stanza
322
323    for raw_line in lines:
324        stripped = raw_line.strip()
325        if not stripped or stripped.startswith('#'):
326            continue
327
328        parts = stripped.split()
329        kw = parts[0]
330
331        if kw == 'iface':
332            if len(parts) < 4:
333                errors += 1
334                current_iface = None
335                in_ignored_stanza = False
336                continue
337            iface_name, family, method = parts[1], parts[2], parts[3]
338            if family == 'inet' and method == 'static':
339                current_iface = iface_name
340                in_ignored_stanza = False
341                if iface_name not in iface_data:
342                    iface_data[iface_name] = {
343                        'primary': None, 'has_prefix': False,
344                        'netmask': None, 'extras': [],
345                        'routes': [], 'gateway': None,
346                    }
347            elif family == 'inet' and method == 'dhcp':
348                dhcp_ifaces.add(iface_name)
349                current_iface = None
350                in_ignored_stanza = True
351            else:
352                current_iface = None  # loopback / manual / other — irrelevant
353                in_ignored_stanza = True
354
355        elif kw in ('auto', 'allow-hotplug', 'allow-auto'):
356            for iface_name in parts[1:]:
357                auto_ifaces.add(iface_name)
358
359        elif kw in ('source', 'source-directory', 'mapping',
360                    'no-auto-down', 'no-scripts'):
361            pass  # Valid top-level directives, nothing to record
362
363        else:
364            # Option line within a stanza (indentation not required by ifupdown).
365            # If outside any stanza entirely, count as an error.
366            directive = kw
367
368            if current_iface is None:
369                if not in_ignored_stanza:
370                    errors += 1
371                continue
372
373            data = iface_data[current_iface]
374
375            if directive == 'address':
376                if len(parts) < 2:
377                    errors += 1
378                else:
379                    try:
380                        iface = IPv4Interface(parts[1])
381                        data['primary'] = iface
382                        data['has_prefix'] = '/' in parts[1]
383                    except ValueError:
384                        errors += 1
385
386            elif directive == 'netmask':
387                if len(parts) < 2:
388                    errors += 1
389                else:
390                    data['netmask'] = parts[1]
391
392            elif directive == 'gateway':
393                if len(parts) < 2:
394                    errors += 1
395                else:
396                    try:
397                        data['gateway'] = IPv4Address(parts[1])
398                    except ValueError:
399                        errors += 1
400
401            elif directive in ('post-up', 'up'):
402                cmd = stripped[len(directive):].strip()
403                m = re.match(r'ip\s+route\s+add\s+(\S+)\s+via\s+(\S+)', cmd)
404                if m:
405                    try:
406                        net = IPv4Network(m.group(1), strict=False)
407                        gw = IPv4Address(m.group(2))
408                        data['routes'].append((net, gw))
409                    except ValueError:
410                        errors += 1
411                    continue
412                m = re.match(r'ip\s+addr\s+add\s+(\S+)\s+dev\s+\S+', cmd)
413                if m:
414                    try:
415                        data['extras'].append(IPv4Interface(m.group(1)))
416                    except ValueError:
417                        errors += 1
418                # Other post-up / up commands (iptables, arp…) are not errors
419
420            elif directive not in _KNOWN_OPTS:
421                errors += 1
422
423    # Collect all eth<N> interface names we know about, in index order.
424    all_known = (
425            set(iface_data.keys()) | dhcp_ifaces |
426            {n for n in auto_ifaces if n.startswith('eth') and n[3:].isdigit()}
427    )
428    eth_names = sorted(
429        [n for n in all_known if n.startswith('eth') and n[3:].isdigit()],
430        key=lambda n: int(n[3:])
431    )
432
433    result: NetConfigEntry = []
434    for name in eth_names:
435        if name in dhcp_ifaces:
436            result.append('dhcp')
437            continue
438
439        if name not in iface_data:
440            # Mentioned in auto but no inet stanza — unconfigured.
441            result.append(None)
442            continue
443
444        data = iface_data[name]
445        primary = data['primary']
446        if primary is None:
447            result.append(None)
448            continue
449
450        # Combine bare IP with netmask when no CIDR prefix was given.
451        if not data['has_prefix'] and data['netmask']:
452            try:
453                prefix = IPv4Network(f'0.0.0.0/{data["netmask"]}').prefixlen
454                primary = IPv4Interface(f'{primary.ip}/{prefix}')
455            except ValueError:
456                errors += 1
457
458        addresses = [primary] + data['extras']
459
460        routes = list(data['routes'])
461        if data['gateway']:
462            routes.insert(0, (IPv4Network('0.0.0.0/0'), data['gateway']))
463
464        result.append((addresses, routes))
465
466    return result, errors
467
468
469class _AttrDict(dict):
470    __getattr__ = dict.__getitem__
471
472
473def eval_net_config(grade: Grade0, expected: NetConfigEntry, machine_name: str = None,
474                    current: NetConfigEntry | None = None, step: int = 1) -> _AttrDict:
475    """Compare current and expected NetConfig; return a dict with 10 keys:
476
477    Existing keys (unchanged semantics — 'dhcp' and None entries are skipped):
478    - "ips"                   : number of IP addresses that are correct
479    - "default_route"         : 1 if the default route matches the expected one, 0 otherwise
480    - "other_routes"          : number of correct non-default static routes
481    - "wrong_routes"          : number of non-default static routes present in current but not in expected
482    - "ips_expected"          : number of IP addresses in expected
483    - "default_route_expected": 1 if expected has a default route, 0 otherwise
484    - "other_routes_expected" : number of non-default static routes in expected
485
486    New keys for DHCP/None tracking:
487    - "dhcp_interfaces"         : positions where both expected and current are 'dhcp'
488    - "dhcp_interfaces_expected": count of 'dhcp' entries in expected
489    - "none_interfaces_expected": count of None entries in expected
490
491    If *current* is None, get_net_config() is called first.
492    IP addresses are compared as IPv4Interface strings (address + prefix).
493    Routes are compared as (network, gateway) pairs.
494    """
495    if current is None:
496        if machine_name is None:
497            raise ValueError("eval_net_config: current and machine_name can't be both None")
498        current = get_net_config_entry(grade, machine_name, step=step)
499
500    def _collect_ips(nc: NetConfigEntry) -> set[str]:
501        result = set()
502        for entry in nc:
503            if not isinstance(entry, tuple):
504                continue
505            ifaces, _ = entry
506            for iface in ifaces:
507                result.add(str(iface))
508        return result
509
510    def _collect_routes(nc: NetConfigEntry) -> set[tuple[str, str]]:
511        result = set()
512        for entry in nc:
513            if not isinstance(entry, tuple):
514                continue
515            _, routes = entry
516            for net, gw in routes:
517                if gw is None:
518                    gw_str = ''
519                elif isinstance(gw, IPv4Interface):
520                    gw_str = str(gw.ip)
521                else:
522                    gw_str = str(gw)
523                result.add((str(net), gw_str))
524        return result
525
526    exp_ips = _collect_ips(expected)
527    cur_ips = _collect_ips(current)
528
529    exp_routes = _collect_routes(expected)
530    cur_routes = _collect_routes(current)
531
532    default_key = '0.0.0.0/0'
533    exp_default = {r for r in exp_routes if r[0] == default_key}
534    cur_default = {r for r in cur_routes if r[0] == default_key}
535    exp_non_default = {r for r in exp_routes if r[0] != default_key}
536    cur_non_default = {r for r in cur_routes if r[0] != default_key}
537
538    dhcp_match = sum(
539        1 for e, c in zip(expected, current)
540        if e == 'dhcp' and c == 'dhcp'
541    )
542
543    return _AttrDict(
544        ips=len(exp_ips & cur_ips),
545        default_route=1 if exp_default == cur_default else 0,
546        other_routes=len(exp_non_default & cur_non_default),
547        wrong_routes=len(cur_non_default - exp_non_default),
548        ips_expected=len(exp_ips),
549        default_route_expected=1 if exp_default else 0,
550        other_routes_expected=len(exp_non_default),
551        dhcp_interfaces=dhcp_match,
552        dhcp_interfaces_expected=sum(1 for e in expected if e == 'dhcp'),
553        none_interfaces_expected=sum(1 for e in expected if e is None),
554    )
555
556
557def set_persistent_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry):
558    lines = ['auto lo', 'iface lo inet loopback', '']
559    for i, entry in enumerate(nc_entry):
560        interface = f'eth{i}'
561        if entry is None:
562            continue
563        if entry == 'dhcp':
564            lines += [
565                f'auto {interface}',
566                f'iface {interface} inet dhcp',
567                '',
568            ]
569            continue
570        iface_list, routes = entry
571        lines += [
572            f'auto {interface}',
573            f'iface {interface} inet static',
574            f'    address {iface_list[0]}',
575        ]
576        for extra in iface_list[1:]:
577            lines.append(f'    post-up ip addr add {extra} dev {interface}')
578        for net, via in routes:
579            via_addr = via.ip if isinstance(via, IPv4Interface) else via
580            if net == IPv4Network('0.0.0.0/0'):
581                lines.append(f'    gateway {via_addr}')
582        for net, via in routes:
583            via_addr = via.ip if isinstance(via, IPv4Interface) else via
584            if net != IPv4Network('0.0.0.0/0'):
585                lines.append(f'    post-up ip route add {net.compressed} via {via_addr}')
586        lines.append('')
587    net_scheme.file(machine_name, '/etc/network/interfaces', '\n'.join(lines))
588
589
590def set_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry):
591    for i, entry in enumerate(nc_entry):
592        interface = f'eth{i}'
593        if entry is None:
594            continue
595        if entry == 'dhcp':
596            net_scheme.cmd(machine_name, f'ip link set {interface} up')
597            net_scheme.cmd(machine_name, f'dhclient {interface}')
598            continue
599        iface_list, routes = entry
600        net_scheme.cmd(machine_name, f'ip link set {interface} up')
601        for iface in iface_list:
602            net_scheme.cmd(machine_name, f'ip addr add {iface} dev {interface}')
603        for net, via in routes:
604            via_addr = via.ip if isinstance(via, IPv4Interface) else via
605            net_scheme.cmd(machine_name, f'ip route add {net.compressed} via {str(via_addr)}')
606
607
608def set_persistent_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig):
609    for name, value in sysctl_config.items():
610        net_scheme.cmd(machine_name, f'echo "{name}={value}" >> /etc/sysctl.conf')
611    if sysctl_config:
612        remount_proc_sys(net_scheme, machine_name)
613        net_scheme.cmd(machine_name, 'sysctl -p')
614
615def remount_proc_sys(net_scheme: NetScheme0, machine_name: str):
616    if not hasattr(net_scheme, "remount_proc_sys_done"):
617        net_scheme.remount_proc_sys_done = {}
618    if net_scheme.remount_proc_sys_done.get(machine_name):
619        return
620    net_scheme.cmd(machine_name, "mount -o rw,remount /proc/sys")
621    net_scheme.remount_proc_sys_done[machine_name] = True
622
623
624def set_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig):
625    if len(sysctl_config) > 0:
626        remount_proc_sys(net_scheme, machine_name)
627        for name, value in sysctl_config.items():
628            net_scheme.cmd(machine_name, f'sysctl -w {name}={value}')
629
630
631def set_ip_forward(net_scheme: NetScheme0, machine_name: str, ip_forward: bool, step: int = 1):
632    value = 1 if ip_forward else 0
633    remount_proc_sys(net_scheme, machine_name)
634    net_scheme.cmd(machine_name, f'sysctl -w net.ipv4.ip_forward={value}', step=step)
635
636
637def get_ip_forward(grade: Grade0, machine_name: str, step: int = 1) -> bool:
638    output, code = grade.test(machine_name, 'cat /proc/sys/net/ipv4/ip_forward', step=step)
639    if code != 0:
640        return False
641    return output.strip() == '1'
642
643
644def get_sys_parameter_bool(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> bool | None:
645    """Read a boolean kernel parameter via sysctl and return its value.
646
647    Returns True for '1', False for '0', None if the parameter does not exist or the command fails.
648    """
649    value = get_sys_parameter(grade, machine_name, parameter, step=step)
650    if value is None:
651        return None
652    if value == '1':
653        return True
654    if value == '0':
655        return False
656    return None
657
658
659def get_net_config_from_topology(
660        net_scheme: NetScheme0,
661        topology=None,
662        gateway: str = None,
663        default_route: IPv4Address | IPv4Interface = IPv4Interface("172.17.0.1/24"),
664) -> NetConfig:
665    """Build a NetConfig for every machine in the topology so they can all reach each other.
666
667    Returns {machine_name: NetConfig}.
668
669    Each machine gets:
670    - One interface entry per network it belongs to, with the IP from net_scheme.data.ips.
671    - Static routes to every non-directly-connected network (found via BFS through
672      multi-homed router machines).
673
674    If *gateway* is not None:
675    - Every machine except the gateway gets a default route (0.0.0.0/0) toward the
676      gateway machine via the appropriate next-hop IP.
677    - The gateway itself gets a default route via *default_route* (typically the
678      Docker host bridge, e.g. 172.17.0.1).
679
680    The interface indices (eth0, eth1, …) are computed with the same counter logic as
681    NetScheme0.__init__, so they match the actual Kathara deployment.
682
683    IP names follow the convention set by random_ips_from_topology():
684    - data.ips.m       when machine m is in exactly one network
685    - data.ips.m_netX  when machine m is in multiple networks
686    """
687    if topology is None:
688        topology = net_scheme.get_topology()
689    data = net_scheme.data
690
691    # --- 1. Replicate NetScheme0 eth-index assignment ---
692    # machine_iface: {machine: {net_name: eth_idx}}
693    machine_iface: dict[str, dict[str, int]] = {}
694    _ctr: dict[str, int] = {}
695    for net_name, machines in topology.items():
696        items = list(machines.items()) if isinstance(machines, dict) else [(m, None) for m in machines]
697        for mname, iface_spec in items:
698            iface = iface_spec[0] if isinstance(iface_spec, tuple) else iface_spec
699            if iface is None:
700                iface = _ctr.get(mname, 0)
701            _ctr[mname] = max(_ctr.get(mname, 0), iface) + 1
702            machine_iface.setdefault(mname, {})[net_name] = iface
703
704    machine_nets: dict[str, list[str]] = {m: list(d.keys()) for m, d in machine_iface.items()}
705
706    # --- 2. IP lookup (matches random_ips_from_topology naming) ---
707    def get_ip(machine: str, net_name: str) -> IPv4Interface:
708        if len(machine_nets[machine]) == 1:
709            return getattr(data.ips, machine)
710        return getattr(data.ips, f"{machine}_{net_name}")
711
712    # --- 3. Network routing graph ---
713    # Multi-homed machines act as routers; each pair of their networks is an edge.
714    net_graph: dict[str, list[tuple[str, str]]] = {n: [] for n in topology}
715    for mname, nets in machine_nets.items():
716        for i, n1 in enumerate(nets):
717            for n2 in nets[i + 1:]:
718                net_graph[n1].append((n2, mname))
719                net_graph[n2].append((n1, mname))
720
721    # --- 4. BFS: find (via_start_net, first_router) from start_nets to a target ---
722    def _bfs(start_nets: set[str], is_target) -> tuple[str, str] | None:
723        visited = set(start_nets)
724        q: deque = deque()
725        for n in start_nets:
726            for adj, router in net_graph.get(n, []):
727                if adj not in visited:
728                    visited.add(adj)
729                    q.append((adj, n, router))
730        while q:
731            cur, via, router = q.popleft()
732            if is_target(cur):
733                return (via, router)
734            for adj, r in net_graph.get(cur, []):
735                if adj not in visited:
736                    visited.add(adj)
737                    q.append((adj, via, router))
738        return None
739
740    def first_hop_to_net(start_nets: set[str], target_net: str) -> tuple[str, str] | None:
741        """Return (via_net, router) for the first hop to reach target_net, or None if direct."""
742        if target_net in start_nets:
743            return None
744        return _bfs(start_nets, lambda n: n == target_net)
745
746    def first_hop_to_machine(start_nets: set[str], target: str) -> tuple[str, str] | None:
747        """Return (via_net, router) for the first hop toward target machine.
748        When directly connected, router is the target machine itself."""
749        target_nets = set(machine_nets.get(target, []))
750        shared = start_nets & target_nets
751        if shared:
752            return (next(iter(shared)), target)
753        return _bfs(start_nets, lambda n: n in target_nets)
754
755    # --- 5. default_route as bare IPv4Address ---
756    dr_ip: IPv4Address = default_route.ip if isinstance(default_route, IPv4Interface) else default_route
757
758    # --- 6. Assemble NetConfig per machine ---
759    result: dict[str, NetConfigEntry] = {}
760
761    for machine, net_to_eth in machine_iface.items():
762        start_nets = set(machine_nets[machine])
763
764        # Collect all candidate routes as (net, eth_idx, next_hop).
765        route_triples: list[tuple[IPv4Network, int, IPv4Address]] = []
766
767        # Specific routes to every non-directly-connected network
768        for target_net in topology:
769            if target_net in start_nets:
770                continue
771            hop = first_hop_to_net(start_nets, target_net)
772            if hop is None:
773                continue
774            via_net, router = hop
775            route_triples.append((
776                getattr(data.nets, target_net),
777                net_to_eth[via_net],
778                get_ip(router, via_net).ip,
779            ))
780
781        # Default route
782        if gateway is not None:
783            default_net = IPv4Network('0.0.0.0/0')
784            if machine == gateway:
785                # External default route on the lowest-index interface
786                route_triples.append((default_net, min(net_to_eth.values()), dr_ip))
787            else:
788                hop = first_hop_to_machine(start_nets, gateway)
789                if hop is not None:
790                    via_net, router = hop
791                    route_triples.append((
792                        default_net,
793                        net_to_eth[via_net],
794                        get_ip(router, via_net).ip,
795                    ))
796
797        # Drop redundant routes: a route (net, hop) is redundant when another route
798        # (net2, hop2) exists with the same next-hop and net2 is a strict supernet of net
799        # (i.e. net2 covers net entirely — the less-specific route already handles it).
800        def _is_redundant(net: IPv4Network, hop: IPv4Address) -> bool:
801            return any(
802                net2 != net and net.subnet_of(net2) and hop == hop2
803                for net2, _, hop2 in route_triples
804            )
805
806        eth_routes: dict[int, list] = {eth: [] for eth in net_to_eth.values()}
807        for net, eth, hop in route_triples:
808            if not _is_redundant(net, hop):
809                eth_routes[eth].append((net, hop))
810
811        # Build the NetConfig list indexed by eth number
812        max_eth = max(net_to_eth.values())
813        nc: NetConfigEntry = [None] * (max_eth + 1)
814        for net_name, eth_idx in net_to_eth.items():
815            nc[eth_idx] = ([get_ip(machine, net_name)], eth_routes[eth_idx])
816        result[machine] = nc
817
818    return result
819
820
821def get_sys_parameter(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> str | None:
822    """Read a kernel parameter via sysctl and return its value as a string.
823
824    Returns None if the parameter does not exist or the command fails.
825    """
826    output, code = grade.test(machine_name, f'sysctl -n {parameter}', step=step)
827    if code != 0:
828        return None
829    return output.strip() or None
NetConfigInterface: TypeAlias = Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]
NetConfigEntry: TypeAlias = List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]
NetConfig: TypeAlias = Dict[str, Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]
SysctlConfig = typing.Dict[str, typing.Any]
def get_ip_addresses( grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> Dict[str, List[Tuple[str, int]]]:
25def get_ip_addresses(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, List[Tuple[str, int]]]:
26    """Run 'ip a' on machine_name and parse the output.
27
28    Returns a dict mapping interface name -> list of (address, prefix_len).
29
30    Addresses for each interface are sorted by:
31      1. prefix length descending
32      2. IP address lexicographically ascending
33
34    Edge cases:
35      - virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0'
36      - non-zero exit code or empty output  -> {}
37    """
38    output, code = grade.test(machine_name, 'ip a', step=step)
39    if code != 0 or not output:
40        return {}
41    result: Dict[str, List[Tuple[str, int]]] = {}
42    current_iface = None
43    for line in output.splitlines():
44        m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line)
45        if m:
46            current_iface = m.group(1)
47            result.setdefault(current_iface, [])
48            continue
49        m = re.match(r'^\s+inet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', line)
50        if m and current_iface is not None:
51            result[current_iface].append((m.group(1), int(m.group(2))))
52    for iface in result:
53        result[iface].sort(key=lambda x: (-x[1], x[0]))
54    return result

Run 'ip a' on machine_name and parse the output.

Returns a dict mapping interface name -> list of (address, prefix_len).

Addresses for each interface are sorted by:
  1. prefix length descending
  2. IP address lexicographically ascending
Edge cases:
  • virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0'
  • non-zero exit code or empty output -> {}
def get_routes( grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> Dict[Tuple[str, int], Tuple[str, str, int]]:
 57def get_routes(grade: Grade0, machine_name: str, step: int = 1) -> Dict[Tuple[str, int], Tuple[str, str, int]]:
 58    """Run 'ip route' on machine_name and parse the output.
 59
 60    Returns a dict mapping (network, mask) -> (via, dev, metric).
 61
 62    Edge cases:
 63      - 'default' route              -> key ('0.0.0.0', 0)
 64      - host route without mask      -> mask 32
 65      - direct route (no via)        -> via ''
 66      - no explicit metric           -> metric 0
 67      - non-zero exit code or empty output -> {}
 68    """
 69    output, code = grade.test(machine_name, 'ip route', step=step)
 70    if code != 0 or not output:
 71        return {}
 72    result: Dict[Tuple[str, int], Tuple[str, str, int]] = {}
 73    for line in output.splitlines():
 74        parts = line.split()
 75        if not parts:
 76            continue
 77        dest = parts[0]
 78        if dest == 'default':
 79            net, mask = '0.0.0.0', 0
 80        elif '/' in dest:
 81            net, mask_str = dest.split('/')
 82            mask = int(mask_str)
 83        else:
 84            net, mask = dest, 32
 85        via, dev, metric = '', '', 0
 86        i = 1
 87        while i < len(parts):
 88            if parts[i] == 'via' and i + 1 < len(parts):
 89                via = parts[i + 1]
 90                i += 2
 91            elif parts[i] == 'dev' and i + 1 < len(parts):
 92                dev = parts[i + 1]
 93                i += 2
 94            elif parts[i] == 'metric' and i + 1 < len(parts):
 95                metric = int(parts[i + 1])
 96                i += 2
 97            else:
 98                i += 1
 99        result[(net, mask)] = (via, dev, metric)
100    return result

Run 'ip route' on machine_name and parse the output.

Returns a dict mapping (network, mask) -> (via, dev, metric).

Edge cases:
  • 'default' route -> key ('0.0.0.0', 0)
  • host route without mask -> mask 32
  • direct route (no via) -> via ''
  • no explicit metric -> metric 0
  • non-zero exit code or empty output -> {}
def get_sysctl_conf( grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> Dict[str, str]:
103def get_sysctl_conf(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, str]:
104    """Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters.
105
106    Returns a dict mapping parameter name -> value (both as strings).
107
108    Parsing rules:
109      - lines starting with '#' or ';' are comments and ignored
110      - blank lines are ignored
111      - accepted formats: 'key = value' and 'key=value'
112      - later definitions override earlier ones (sysctl.d files override sysctl.conf)
113
114    Edge cases:
115      - non-zero exit code from any grade.test() -> {} immediately
116      - first-pass empty output                  -> {} and continues to next call
117    """
118    result: Dict[str, str] = {}
119
120    def _parse(text: str) -> None:
121        for line in text.splitlines():
122            line = line.strip()
123            if not line or line[0] in ('#', ';'):
124                continue
125            if '=' in line:
126                key, _, value = line.partition('=')
127                result[key.strip()] = value.strip()
128
129    output, code = grade.test(machine_name, 'cat /etc/sysctl.conf', step=step)
130    if code != 0:
131        return {}
132    _parse(output)
133
134    output2, code = grade.test(machine_name, 'cat /etc/sysctl.d/*', step=step)
135    if code != 0:
136        return {}
137    _parse(output2)
138
139    return result

Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters.

Returns a dict mapping parameter name -> value (both as strings).

Parsing rules:
  • lines starting with '#' or ';' are comments and ignored
  • blank lines are ignored
  • accepted formats: 'key = value' and 'key=value'
  • later definitions override earlier ones (sysctl.d files override sysctl.conf)
Edge cases:
  • non-zero exit code from any grade.test() -> {} immediately
  • first-pass empty output -> {} and continues to next call
def get_net_config_entry( grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]:
142def get_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> NetConfigEntry:
143    """Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig.
144
145    Each entry in the returned list corresponds to one interface (eth0, eth1, …)
146    in index order.
147
148    Entry types:
149      - Tuple (addresses, routes): statically configured interface.
150      - 'dhcp': interface whose address(es) were all assigned dynamically (DHCP).
151      - None: interface present in the kernel but carrying no IP address.
152
153    The list covers eth0 … ethN where N is the highest eth index reported by
154    'ip link show'.  Interfaces beyond the highest present index are not included.
155
156    Edge cases:
157      - non-zero exit code or empty output from any command -> []
158      - interfaces whose name does not match eth\\d+  -> ignored
159      - routes with an unknown gateway -> attached to the first interface, or
160        dropped if there are no interfaces
161    """
162    raw_addrs = get_ip_addresses(grade, machine_name, step=step)
163    raw_routes = get_routes(grade, machine_name, step=step)
164
165    # Re-use the cached 'ip a' output to detect dynamic (DHCP) addresses.
166    ip_a_out, _ = grade.test(machine_name, 'ip a', step=step)
167
168    # Parse which interfaces have *all* their inet addresses marked dynamic.
169    dynamic_ifaces: set[str] = set()
170    current_iface: str | None = None
171    iface_addr_flags: dict[str, list[bool]] = {}  # iface -> [is_dynamic, ...]
172    for line in (ip_a_out or '').splitlines():
173        m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line)
174        if m:
175            current_iface = m.group(1)
176            iface_addr_flags.setdefault(current_iface, [])
177            continue
178        m = re.match(r'^\s+inet\s+\S+', line)
179        if m and current_iface is not None:
180            iface_addr_flags.setdefault(current_iface, []).append('dynamic' in line)
181    for iface, flags in iface_addr_flags.items():
182        if flags and all(flags):
183            dynamic_ifaces.add(iface)
184
185    # Discover all present eth<N> interfaces via 'ip link show'.
186    ip_link_out, link_code = grade.test(machine_name, 'ip link show', step=step)
187    if link_code != 0 or not ip_link_out:
188        return []
189    present_eths: set[str] = set(
190        re.findall(r'^\d+:\s+(eth\d+)(?:@\S+)?:', ip_link_out, re.MULTILINE)
191    )
192    if not present_eths:
193        return []
194
195    max_n = max(int(name[3:]) for name in present_eths)
196
197    # Build interface → addresses lookup from raw_addrs (only non-empty entries).
198    eth_addrs: dict[str, list[tuple[str, int]]] = {
199        name: addrs
200        for name, addrs in raw_addrs.items()
201        if name.startswith('eth') and name[3:].isdigit() and addrs
202    }
203
204    # Collect static eth interfaces in order (for route distribution).
205    static_eth_ifaces: list[tuple[str, list[IPv4Interface]]] = []
206    for n in range(max_n + 1):
207        name = f'eth{n}'
208        if name not in present_eths:
209            continue
210        if name in eth_addrs and name not in dynamic_ifaces:
211            ifaces = [IPv4Interface(f"{addr}/{plen}") for addr, plen in eth_addrs[name]]
212            if ifaces:
213                static_eth_ifaces.append((name, ifaces))
214
215    # Distribute routes among static interfaces (existing logic).
216    iface_networks: list[list[IPv4Network]] = [
217        [iface.network for iface in ifaces]
218        for _, ifaces in static_eth_ifaces
219    ]
220    routes_per_iface: list[list[tuple[IPv4Network, IPv4Address]]] = [
221        [] for _ in static_eth_ifaces
222    ]
223    for (net_str, mask), (via, dev, _metric) in raw_routes.items():
224        if not via:
225            continue
226        dest = IPv4Network(f"{net_str}/{mask}")
227        gw = IPv4Address(via)
228        matched = None
229        for idx, nets in enumerate(iface_networks):
230            if any(gw in net for net in nets):
231                matched = idx
232                break
233        if matched is None:
234            matched = 0
235        if routes_per_iface:
236            routes_per_iface[matched].append((dest, gw))
237
238    static_map: dict[str, tuple[list[IPv4Interface], list[tuple[IPv4Network, IPv4Address]]]] = {
239        name: (ifaces, routes_per_iface[idx])
240        for idx, (name, ifaces) in enumerate(static_eth_ifaces)
241    }
242
243    # Assemble final NetConfig list.
244    result: NetConfigEntry = []
245    for n in range(max_n + 1):
246        name = f'eth{n}'
247        if name not in present_eths:
248            result.append(None)
249            continue
250        if name in dynamic_ifaces:
251            result.append('dhcp')
252        elif name in static_map:
253            result.append(static_map[name])
254        else:
255            result.append(None)
256
257    return result

Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig.

Each entry in the returned list corresponds to one interface (eth0, eth1, …) in index order.

Entry types:
  • Tuple (addresses, routes): statically configured interface.
  • 'dhcp': interface whose address(es) were all assigned dynamically (DHCP).
  • None: interface present in the kernel but carrying no IP address.

The list covers eth0 … ethN where N is the highest eth index reported by 'ip link show'. Interfaces beyond the highest present index are not included.

Edge cases:
  • non-zero exit code or empty output from any command -> []
  • interfaces whose name does not match eth\d+ -> ignored
  • routes with an unknown gateway -> attached to the first interface, or dropped if there are no interfaces
def get_persistent_net_config_entry( grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> tuple[typing.List[typing.Union[typing.Tuple[typing.List[ipaddress.IPv4Interface], typing.List[typing.Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], typing.Literal['dhcp'], NoneType]], int]:
260def get_persistent_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> tuple[NetConfigEntry, int]:
261    """Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count).
262
263    Reconstructs the network configuration that would result from
264    'systemctl networking start' (i.e. ifup -a).
265
266    Entry types in the returned NetConfig:
267      - Tuple (addresses, routes): inet-static stanza for eth<N>.
268      - 'dhcp': inet-dhcp stanza for eth<N>.
269      - None: eth<N> mentioned in an auto/allow-hotplug line but with no stanza.
270
271    Only eth<N> interfaces are included, in index order.
272
273    error_count counts lines that are syntactically wrong or have invalid
274    values (bad IP/network, unknown stanza keywords, malformed iface lines…).
275
276    Parsing rules:
277      - 'address <ip>[/<prefix>]' — primary address; prefix may be absent
278      - 'netmask <mask>'          — applied to primary address when it has no prefix
279      - 'gateway <ip>'            — inserted as a 0.0.0.0/0 route
280      - 'post-up / up ip addr add <addr> dev <iface>' — extra addresses
281      - 'post-up / up ip route add <net> via <gw>'    — static routes
282      - Other post-up/up/down commands are silently ignored
283    """
284    out1, code1 = grade.test(machine_name, 'cat /etc/network/interfaces', step=step, allow_error=True)
285    out2, code2 = grade.test(machine_name, 'cat /etc/network/interfaces.d/*', step=step, allow_error=True)
286
287    lines: list[str] = []
288    include_interfaces_d = False
289    if code1 == 0 and out1:
290        lines.extend(out1.splitlines())
291        for raw_line in out1.splitlines():
292            stripped = raw_line.strip()
293            if not stripped or stripped.startswith('#'):
294                continue
295            parts = stripped.split()
296            if (parts[0] in ('source', 'source-directory')
297                    and len(parts) >= 2
298                    and '/etc/network/interfaces.d' in parts[1]):
299                include_interfaces_d = True
300                break
301    if include_interfaces_d and code2 == 0 and out2:
302        lines.extend(out2.splitlines())
303
304    errors = 0
305
306    # Known stanza-level options that ifup accepts — won't be counted as errors.
307    _KNOWN_OPTS = {
308        'address', 'netmask', 'broadcast', 'network', 'gateway',
309        'metric', 'hwaddress', 'mtu', 'scope',
310        'pre-up', 'up', 'post-up', 'down', 'pre-down', 'post-down',
311        'dns-nameservers', 'dns-search', 'dns-domain',
312        'vlan-raw-device', 'bridge_ports', 'bridge_stp', 'bridge_fd',
313        'bond-master', 'bond-slaves', 'bond-mode', 'bond-miimon',
314        'wpa-ssid', 'wpa-psk', 'wpa-conf',
315    }
316
317    # Accumulate per-interface data keyed by interface name.
318    iface_data: dict[str, dict] = {}
319    dhcp_ifaces: set[str] = set()  # interfaces with 'inet dhcp' stanza
320    auto_ifaces: set[str] = set()  # interfaces seen in auto/allow-hotplug lines
321    current_iface: str | None = None  # name of the active inet-static stanza
322    in_ignored_stanza: bool = False  # True inside loopback / dhcp / manual stanza
323
324    for raw_line in lines:
325        stripped = raw_line.strip()
326        if not stripped or stripped.startswith('#'):
327            continue
328
329        parts = stripped.split()
330        kw = parts[0]
331
332        if kw == 'iface':
333            if len(parts) < 4:
334                errors += 1
335                current_iface = None
336                in_ignored_stanza = False
337                continue
338            iface_name, family, method = parts[1], parts[2], parts[3]
339            if family == 'inet' and method == 'static':
340                current_iface = iface_name
341                in_ignored_stanza = False
342                if iface_name not in iface_data:
343                    iface_data[iface_name] = {
344                        'primary': None, 'has_prefix': False,
345                        'netmask': None, 'extras': [],
346                        'routes': [], 'gateway': None,
347                    }
348            elif family == 'inet' and method == 'dhcp':
349                dhcp_ifaces.add(iface_name)
350                current_iface = None
351                in_ignored_stanza = True
352            else:
353                current_iface = None  # loopback / manual / other — irrelevant
354                in_ignored_stanza = True
355
356        elif kw in ('auto', 'allow-hotplug', 'allow-auto'):
357            for iface_name in parts[1:]:
358                auto_ifaces.add(iface_name)
359
360        elif kw in ('source', 'source-directory', 'mapping',
361                    'no-auto-down', 'no-scripts'):
362            pass  # Valid top-level directives, nothing to record
363
364        else:
365            # Option line within a stanza (indentation not required by ifupdown).
366            # If outside any stanza entirely, count as an error.
367            directive = kw
368
369            if current_iface is None:
370                if not in_ignored_stanza:
371                    errors += 1
372                continue
373
374            data = iface_data[current_iface]
375
376            if directive == 'address':
377                if len(parts) < 2:
378                    errors += 1
379                else:
380                    try:
381                        iface = IPv4Interface(parts[1])
382                        data['primary'] = iface
383                        data['has_prefix'] = '/' in parts[1]
384                    except ValueError:
385                        errors += 1
386
387            elif directive == 'netmask':
388                if len(parts) < 2:
389                    errors += 1
390                else:
391                    data['netmask'] = parts[1]
392
393            elif directive == 'gateway':
394                if len(parts) < 2:
395                    errors += 1
396                else:
397                    try:
398                        data['gateway'] = IPv4Address(parts[1])
399                    except ValueError:
400                        errors += 1
401
402            elif directive in ('post-up', 'up'):
403                cmd = stripped[len(directive):].strip()
404                m = re.match(r'ip\s+route\s+add\s+(\S+)\s+via\s+(\S+)', cmd)
405                if m:
406                    try:
407                        net = IPv4Network(m.group(1), strict=False)
408                        gw = IPv4Address(m.group(2))
409                        data['routes'].append((net, gw))
410                    except ValueError:
411                        errors += 1
412                    continue
413                m = re.match(r'ip\s+addr\s+add\s+(\S+)\s+dev\s+\S+', cmd)
414                if m:
415                    try:
416                        data['extras'].append(IPv4Interface(m.group(1)))
417                    except ValueError:
418                        errors += 1
419                # Other post-up / up commands (iptables, arp…) are not errors
420
421            elif directive not in _KNOWN_OPTS:
422                errors += 1
423
424    # Collect all eth<N> interface names we know about, in index order.
425    all_known = (
426            set(iface_data.keys()) | dhcp_ifaces |
427            {n for n in auto_ifaces if n.startswith('eth') and n[3:].isdigit()}
428    )
429    eth_names = sorted(
430        [n for n in all_known if n.startswith('eth') and n[3:].isdigit()],
431        key=lambda n: int(n[3:])
432    )
433
434    result: NetConfigEntry = []
435    for name in eth_names:
436        if name in dhcp_ifaces:
437            result.append('dhcp')
438            continue
439
440        if name not in iface_data:
441            # Mentioned in auto but no inet stanza — unconfigured.
442            result.append(None)
443            continue
444
445        data = iface_data[name]
446        primary = data['primary']
447        if primary is None:
448            result.append(None)
449            continue
450
451        # Combine bare IP with netmask when no CIDR prefix was given.
452        if not data['has_prefix'] and data['netmask']:
453            try:
454                prefix = IPv4Network(f'0.0.0.0/{data["netmask"]}').prefixlen
455                primary = IPv4Interface(f'{primary.ip}/{prefix}')
456            except ValueError:
457                errors += 1
458
459        addresses = [primary] + data['extras']
460
461        routes = list(data['routes'])
462        if data['gateway']:
463            routes.insert(0, (IPv4Network('0.0.0.0/0'), data['gateway']))
464
465        result.append((addresses, routes))
466
467    return result, errors

Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count).

Reconstructs the network configuration that would result from 'systemctl networking start' (i.e. ifup -a).

Entry types in the returned NetConfig:
  • Tuple (addresses, routes): inet-static stanza for eth.
  • 'dhcp': inet-dhcp stanza for eth.
  • None: eth mentioned in an auto/allow-hotplug line but with no stanza.

Only eth interfaces are included, in index order.

error_count counts lines that are syntactically wrong or have invalid values (bad IP/network, unknown stanza keywords, malformed iface lines…).

Parsing rules:
  • 'address [/]' — primary address; prefix may be absent
  • 'netmask ' — applied to primary address when it has no prefix
  • 'gateway ' — inserted as a 0.0.0.0/0 route
  • 'post-up / up ip addr add dev ' — extra addresses
  • 'post-up / up ip route add via ' — static routes
  • Other post-up/up/down commands are silently ignored
def eval_net_config( grade: SRE.lib_sre.Grade0, expected: List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]], machine_name: str = None, current: Optional[List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]] = None, step: int = 1) -> lib.net_config._AttrDict:
474def eval_net_config(grade: Grade0, expected: NetConfigEntry, machine_name: str = None,
475                    current: NetConfigEntry | None = None, step: int = 1) -> _AttrDict:
476    """Compare current and expected NetConfig; return a dict with 10 keys:
477
478    Existing keys (unchanged semantics — 'dhcp' and None entries are skipped):
479    - "ips"                   : number of IP addresses that are correct
480    - "default_route"         : 1 if the default route matches the expected one, 0 otherwise
481    - "other_routes"          : number of correct non-default static routes
482    - "wrong_routes"          : number of non-default static routes present in current but not in expected
483    - "ips_expected"          : number of IP addresses in expected
484    - "default_route_expected": 1 if expected has a default route, 0 otherwise
485    - "other_routes_expected" : number of non-default static routes in expected
486
487    New keys for DHCP/None tracking:
488    - "dhcp_interfaces"         : positions where both expected and current are 'dhcp'
489    - "dhcp_interfaces_expected": count of 'dhcp' entries in expected
490    - "none_interfaces_expected": count of None entries in expected
491
492    If *current* is None, get_net_config() is called first.
493    IP addresses are compared as IPv4Interface strings (address + prefix).
494    Routes are compared as (network, gateway) pairs.
495    """
496    if current is None:
497        if machine_name is None:
498            raise ValueError("eval_net_config: current and machine_name can't be both None")
499        current = get_net_config_entry(grade, machine_name, step=step)
500
501    def _collect_ips(nc: NetConfigEntry) -> set[str]:
502        result = set()
503        for entry in nc:
504            if not isinstance(entry, tuple):
505                continue
506            ifaces, _ = entry
507            for iface in ifaces:
508                result.add(str(iface))
509        return result
510
511    def _collect_routes(nc: NetConfigEntry) -> set[tuple[str, str]]:
512        result = set()
513        for entry in nc:
514            if not isinstance(entry, tuple):
515                continue
516            _, routes = entry
517            for net, gw in routes:
518                if gw is None:
519                    gw_str = ''
520                elif isinstance(gw, IPv4Interface):
521                    gw_str = str(gw.ip)
522                else:
523                    gw_str = str(gw)
524                result.add((str(net), gw_str))
525        return result
526
527    exp_ips = _collect_ips(expected)
528    cur_ips = _collect_ips(current)
529
530    exp_routes = _collect_routes(expected)
531    cur_routes = _collect_routes(current)
532
533    default_key = '0.0.0.0/0'
534    exp_default = {r for r in exp_routes if r[0] == default_key}
535    cur_default = {r for r in cur_routes if r[0] == default_key}
536    exp_non_default = {r for r in exp_routes if r[0] != default_key}
537    cur_non_default = {r for r in cur_routes if r[0] != default_key}
538
539    dhcp_match = sum(
540        1 for e, c in zip(expected, current)
541        if e == 'dhcp' and c == 'dhcp'
542    )
543
544    return _AttrDict(
545        ips=len(exp_ips & cur_ips),
546        default_route=1 if exp_default == cur_default else 0,
547        other_routes=len(exp_non_default & cur_non_default),
548        wrong_routes=len(cur_non_default - exp_non_default),
549        ips_expected=len(exp_ips),
550        default_route_expected=1 if exp_default else 0,
551        other_routes_expected=len(exp_non_default),
552        dhcp_interfaces=dhcp_match,
553        dhcp_interfaces_expected=sum(1 for e in expected if e == 'dhcp'),
554        none_interfaces_expected=sum(1 for e in expected if e is None),
555    )

Compare current and expected NetConfig; return a dict with 10 keys:

Existing keys (unchanged semantics — 'dhcp' and None entries are skipped):

  • "ips" : number of IP addresses that are correct
  • "default_route" : 1 if the default route matches the expected one, 0 otherwise
  • "other_routes" : number of correct non-default static routes
  • "wrong_routes" : number of non-default static routes present in current but not in expected
  • "ips_expected" : number of IP addresses in expected
  • "default_route_expected": 1 if expected has a default route, 0 otherwise
  • "other_routes_expected" : number of non-default static routes in expected

New keys for DHCP/None tracking:

  • "dhcp_interfaces" : positions where both expected and current are 'dhcp'
  • "dhcp_interfaces_expected": count of 'dhcp' entries in expected
  • "none_interfaces_expected": count of None entries in expected

If current is None, get_net_config() is called first. IP addresses are compared as IPv4Interface strings (address + prefix). Routes are compared as (network, gateway) pairs.

def set_persistent_net_config_entry( net_scheme: SRE.lib_sre.NetScheme0, machine_name: str, nc_entry: List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]):
558def set_persistent_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry):
559    lines = ['auto lo', 'iface lo inet loopback', '']
560    for i, entry in enumerate(nc_entry):
561        interface = f'eth{i}'
562        if entry is None:
563            continue
564        if entry == 'dhcp':
565            lines += [
566                f'auto {interface}',
567                f'iface {interface} inet dhcp',
568                '',
569            ]
570            continue
571        iface_list, routes = entry
572        lines += [
573            f'auto {interface}',
574            f'iface {interface} inet static',
575            f'    address {iface_list[0]}',
576        ]
577        for extra in iface_list[1:]:
578            lines.append(f'    post-up ip addr add {extra} dev {interface}')
579        for net, via in routes:
580            via_addr = via.ip if isinstance(via, IPv4Interface) else via
581            if net == IPv4Network('0.0.0.0/0'):
582                lines.append(f'    gateway {via_addr}')
583        for net, via in routes:
584            via_addr = via.ip if isinstance(via, IPv4Interface) else via
585            if net != IPv4Network('0.0.0.0/0'):
586                lines.append(f'    post-up ip route add {net.compressed} via {via_addr}')
587        lines.append('')
588    net_scheme.file(machine_name, '/etc/network/interfaces', '\n'.join(lines))
def set_net_config_entry( net_scheme: SRE.lib_sre.NetScheme0, machine_name: str, nc_entry: List[Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]):
591def set_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry):
592    for i, entry in enumerate(nc_entry):
593        interface = f'eth{i}'
594        if entry is None:
595            continue
596        if entry == 'dhcp':
597            net_scheme.cmd(machine_name, f'ip link set {interface} up')
598            net_scheme.cmd(machine_name, f'dhclient {interface}')
599            continue
600        iface_list, routes = entry
601        net_scheme.cmd(machine_name, f'ip link set {interface} up')
602        for iface in iface_list:
603            net_scheme.cmd(machine_name, f'ip addr add {iface} dev {interface}')
604        for net, via in routes:
605            via_addr = via.ip if isinstance(via, IPv4Interface) else via
606            net_scheme.cmd(machine_name, f'ip route add {net.compressed} via {str(via_addr)}')
def set_persistent_sysctl( net_scheme: SRE.lib_sre.NetScheme0, machine_name: str, sysctl_config: Dict[str, Any]):
609def set_persistent_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig):
610    for name, value in sysctl_config.items():
611        net_scheme.cmd(machine_name, f'echo "{name}={value}" >> /etc/sysctl.conf')
612    if sysctl_config:
613        remount_proc_sys(net_scheme, machine_name)
614        net_scheme.cmd(machine_name, 'sysctl -p')
def remount_proc_sys(net_scheme: SRE.lib_sre.NetScheme0, machine_name: str):
616def remount_proc_sys(net_scheme: NetScheme0, machine_name: str):
617    if not hasattr(net_scheme, "remount_proc_sys_done"):
618        net_scheme.remount_proc_sys_done = {}
619    if net_scheme.remount_proc_sys_done.get(machine_name):
620        return
621    net_scheme.cmd(machine_name, "mount -o rw,remount /proc/sys")
622    net_scheme.remount_proc_sys_done[machine_name] = True
def set_sysctl( net_scheme: SRE.lib_sre.NetScheme0, machine_name: str, sysctl_config: Dict[str, Any]):
625def set_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig):
626    if len(sysctl_config) > 0:
627        remount_proc_sys(net_scheme, machine_name)
628        for name, value in sysctl_config.items():
629            net_scheme.cmd(machine_name, f'sysctl -w {name}={value}')
def set_ip_forward( net_scheme: SRE.lib_sre.NetScheme0, machine_name: str, ip_forward: bool, step: int = 1):
632def set_ip_forward(net_scheme: NetScheme0, machine_name: str, ip_forward: bool, step: int = 1):
633    value = 1 if ip_forward else 0
634    remount_proc_sys(net_scheme, machine_name)
635    net_scheme.cmd(machine_name, f'sysctl -w net.ipv4.ip_forward={value}', step=step)
def get_ip_forward(grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> bool:
638def get_ip_forward(grade: Grade0, machine_name: str, step: int = 1) -> bool:
639    output, code = grade.test(machine_name, 'cat /proc/sys/net/ipv4/ip_forward', step=step)
640    if code != 0:
641        return False
642    return output.strip() == '1'
def get_sys_parameter_bool( grade: SRE.lib_sre.Grade0, machine_name: str, parameter: str, step: int = 1) -> bool | None:
645def get_sys_parameter_bool(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> bool | None:
646    """Read a boolean kernel parameter via sysctl and return its value.
647
648    Returns True for '1', False for '0', None if the parameter does not exist or the command fails.
649    """
650    value = get_sys_parameter(grade, machine_name, parameter, step=step)
651    if value is None:
652        return None
653    if value == '1':
654        return True
655    if value == '0':
656        return False
657    return None

Read a boolean kernel parameter via sysctl and return its value.

Returns True for '1', False for '0', None if the parameter does not exist or the command fails.

def get_net_config_from_topology( net_scheme: SRE.lib_sre.NetScheme0, topology=None, gateway: str = None, default_route: ipaddress.IPv4Address | ipaddress.IPv4Interface = IPv4Interface('172.17.0.1/24')) -> Dict[str, Union[Tuple[List[ipaddress.IPv4Interface], List[Tuple[ipaddress.IPv4Network, ipaddress.IPv4Address | ipaddress.IPv4Interface]]], Literal['dhcp'], NoneType]]:
660def get_net_config_from_topology(
661        net_scheme: NetScheme0,
662        topology=None,
663        gateway: str = None,
664        default_route: IPv4Address | IPv4Interface = IPv4Interface("172.17.0.1/24"),
665) -> NetConfig:
666    """Build a NetConfig for every machine in the topology so they can all reach each other.
667
668    Returns {machine_name: NetConfig}.
669
670    Each machine gets:
671    - One interface entry per network it belongs to, with the IP from net_scheme.data.ips.
672    - Static routes to every non-directly-connected network (found via BFS through
673      multi-homed router machines).
674
675    If *gateway* is not None:
676    - Every machine except the gateway gets a default route (0.0.0.0/0) toward the
677      gateway machine via the appropriate next-hop IP.
678    - The gateway itself gets a default route via *default_route* (typically the
679      Docker host bridge, e.g. 172.17.0.1).
680
681    The interface indices (eth0, eth1, …) are computed with the same counter logic as
682    NetScheme0.__init__, so they match the actual Kathara deployment.
683
684    IP names follow the convention set by random_ips_from_topology():
685    - data.ips.m       when machine m is in exactly one network
686    - data.ips.m_netX  when machine m is in multiple networks
687    """
688    if topology is None:
689        topology = net_scheme.get_topology()
690    data = net_scheme.data
691
692    # --- 1. Replicate NetScheme0 eth-index assignment ---
693    # machine_iface: {machine: {net_name: eth_idx}}
694    machine_iface: dict[str, dict[str, int]] = {}
695    _ctr: dict[str, int] = {}
696    for net_name, machines in topology.items():
697        items = list(machines.items()) if isinstance(machines, dict) else [(m, None) for m in machines]
698        for mname, iface_spec in items:
699            iface = iface_spec[0] if isinstance(iface_spec, tuple) else iface_spec
700            if iface is None:
701                iface = _ctr.get(mname, 0)
702            _ctr[mname] = max(_ctr.get(mname, 0), iface) + 1
703            machine_iface.setdefault(mname, {})[net_name] = iface
704
705    machine_nets: dict[str, list[str]] = {m: list(d.keys()) for m, d in machine_iface.items()}
706
707    # --- 2. IP lookup (matches random_ips_from_topology naming) ---
708    def get_ip(machine: str, net_name: str) -> IPv4Interface:
709        if len(machine_nets[machine]) == 1:
710            return getattr(data.ips, machine)
711        return getattr(data.ips, f"{machine}_{net_name}")
712
713    # --- 3. Network routing graph ---
714    # Multi-homed machines act as routers; each pair of their networks is an edge.
715    net_graph: dict[str, list[tuple[str, str]]] = {n: [] for n in topology}
716    for mname, nets in machine_nets.items():
717        for i, n1 in enumerate(nets):
718            for n2 in nets[i + 1:]:
719                net_graph[n1].append((n2, mname))
720                net_graph[n2].append((n1, mname))
721
722    # --- 4. BFS: find (via_start_net, first_router) from start_nets to a target ---
723    def _bfs(start_nets: set[str], is_target) -> tuple[str, str] | None:
724        visited = set(start_nets)
725        q: deque = deque()
726        for n in start_nets:
727            for adj, router in net_graph.get(n, []):
728                if adj not in visited:
729                    visited.add(adj)
730                    q.append((adj, n, router))
731        while q:
732            cur, via, router = q.popleft()
733            if is_target(cur):
734                return (via, router)
735            for adj, r in net_graph.get(cur, []):
736                if adj not in visited:
737                    visited.add(adj)
738                    q.append((adj, via, router))
739        return None
740
741    def first_hop_to_net(start_nets: set[str], target_net: str) -> tuple[str, str] | None:
742        """Return (via_net, router) for the first hop to reach target_net, or None if direct."""
743        if target_net in start_nets:
744            return None
745        return _bfs(start_nets, lambda n: n == target_net)
746
747    def first_hop_to_machine(start_nets: set[str], target: str) -> tuple[str, str] | None:
748        """Return (via_net, router) for the first hop toward target machine.
749        When directly connected, router is the target machine itself."""
750        target_nets = set(machine_nets.get(target, []))
751        shared = start_nets & target_nets
752        if shared:
753            return (next(iter(shared)), target)
754        return _bfs(start_nets, lambda n: n in target_nets)
755
756    # --- 5. default_route as bare IPv4Address ---
757    dr_ip: IPv4Address = default_route.ip if isinstance(default_route, IPv4Interface) else default_route
758
759    # --- 6. Assemble NetConfig per machine ---
760    result: dict[str, NetConfigEntry] = {}
761
762    for machine, net_to_eth in machine_iface.items():
763        start_nets = set(machine_nets[machine])
764
765        # Collect all candidate routes as (net, eth_idx, next_hop).
766        route_triples: list[tuple[IPv4Network, int, IPv4Address]] = []
767
768        # Specific routes to every non-directly-connected network
769        for target_net in topology:
770            if target_net in start_nets:
771                continue
772            hop = first_hop_to_net(start_nets, target_net)
773            if hop is None:
774                continue
775            via_net, router = hop
776            route_triples.append((
777                getattr(data.nets, target_net),
778                net_to_eth[via_net],
779                get_ip(router, via_net).ip,
780            ))
781
782        # Default route
783        if gateway is not None:
784            default_net = IPv4Network('0.0.0.0/0')
785            if machine == gateway:
786                # External default route on the lowest-index interface
787                route_triples.append((default_net, min(net_to_eth.values()), dr_ip))
788            else:
789                hop = first_hop_to_machine(start_nets, gateway)
790                if hop is not None:
791                    via_net, router = hop
792                    route_triples.append((
793                        default_net,
794                        net_to_eth[via_net],
795                        get_ip(router, via_net).ip,
796                    ))
797
798        # Drop redundant routes: a route (net, hop) is redundant when another route
799        # (net2, hop2) exists with the same next-hop and net2 is a strict supernet of net
800        # (i.e. net2 covers net entirely — the less-specific route already handles it).
801        def _is_redundant(net: IPv4Network, hop: IPv4Address) -> bool:
802            return any(
803                net2 != net and net.subnet_of(net2) and hop == hop2
804                for net2, _, hop2 in route_triples
805            )
806
807        eth_routes: dict[int, list] = {eth: [] for eth in net_to_eth.values()}
808        for net, eth, hop in route_triples:
809            if not _is_redundant(net, hop):
810                eth_routes[eth].append((net, hop))
811
812        # Build the NetConfig list indexed by eth number
813        max_eth = max(net_to_eth.values())
814        nc: NetConfigEntry = [None] * (max_eth + 1)
815        for net_name, eth_idx in net_to_eth.items():
816            nc[eth_idx] = ([get_ip(machine, net_name)], eth_routes[eth_idx])
817        result[machine] = nc
818
819    return result

Build a NetConfig for every machine in the topology so they can all reach each other.

Returns {machine_name: NetConfig}.

Each machine gets:

  • One interface entry per network it belongs to, with the IP from net_scheme.data.ips.
  • Static routes to every non-directly-connected network (found via BFS through multi-homed router machines).

If gateway is not None:

  • Every machine except the gateway gets a default route (0.0.0.0/0) toward the gateway machine via the appropriate next-hop IP.
  • The gateway itself gets a default route via default_route (typically the Docker host bridge, e.g. 172.17.0.1).

The interface indices (eth0, eth1, …) are computed with the same counter logic as NetScheme0.__init__, so they match the actual Kathara deployment.

IP names follow the convention set by random_ips_from_topology():

  • data.ips.m when machine m is in exactly one network
  • data.ips.m_netX when machine m is in multiple networks
def get_sys_parameter( grade: SRE.lib_sre.Grade0, machine_name: str, parameter: str, step: int = 1) -> str | None:
822def get_sys_parameter(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> str | None:
823    """Read a kernel parameter via sysctl and return its value as a string.
824
825    Returns None if the parameter does not exist or the command fails.
826    """
827    output, code = grade.test(machine_name, f'sysctl -n {parameter}', step=step)
828    if code != 0:
829        return None
830    return output.strip() or None

Read a kernel parameter via sysctl and return its value as a string.

Returns None if the parameter does not exist or the command fails.