lib.net_config
1import re 2from collections import deque 3from typing import List, Tuple, Dict, Any, Literal, TypeAlias 4from ipaddress import IPv4Address, IPv4Interface, IPv4Network 5 6from SRE.lib_sre import NetScheme0, Grade0 7 8NetConfigInterface: TypeAlias = ( 9 Tuple[ 10 List[IPv4Interface], 11 List[Tuple[IPv4Network, IPv4Address | IPv4Interface]] 12 ] 13 | Literal['dhcp'] 14 | None 15) 16 17NetConfigEntry: TypeAlias = List[NetConfigInterface] 18 19NetConfig: TypeAlias = Dict[str, NetConfigInterface] 20 21SysctlConfig = Dict[str, Any] 22 23 24def get_ip_addresses(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, List[Tuple[str, int]]]: 25 """Run 'ip a' on machine_name and parse the output. 26 27 Returns a dict mapping interface name -> list of (address, prefix_len). 28 29 Addresses for each interface are sorted by: 30 1. prefix length descending 31 2. IP address lexicographically ascending 32 33 Edge cases: 34 - virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0' 35 - non-zero exit code or empty output -> {} 36 """ 37 output, code = grade.test(machine_name, 'ip a', step=step) 38 if code != 0 or not output: 39 return {} 40 result: Dict[str, List[Tuple[str, int]]] = {} 41 current_iface = None 42 for line in output.splitlines(): 43 m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line) 44 if m: 45 current_iface = m.group(1) 46 result.setdefault(current_iface, []) 47 continue 48 m = re.match(r'^\s+inet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', line) 49 if m and current_iface is not None: 50 result[current_iface].append((m.group(1), int(m.group(2)))) 51 for iface in result: 52 result[iface].sort(key=lambda x: (-x[1], x[0])) 53 return result 54 55 56def get_routes(grade: Grade0, machine_name: str, step: int = 1) -> Dict[Tuple[str, int], Tuple[str, str, int]]: 57 """Run 'ip route' on machine_name and parse the output. 58 59 Returns a dict mapping (network, mask) -> (via, dev, metric). 60 61 Edge cases: 62 - 'default' route -> key ('0.0.0.0', 0) 63 - host route without mask -> mask 32 64 - direct route (no via) -> via '' 65 - no explicit metric -> metric 0 66 - non-zero exit code or empty output -> {} 67 """ 68 output, code = grade.test(machine_name, 'ip route', step=step) 69 if code != 0 or not output: 70 return {} 71 result: Dict[Tuple[str, int], Tuple[str, str, int]] = {} 72 for line in output.splitlines(): 73 parts = line.split() 74 if not parts: 75 continue 76 dest = parts[0] 77 if dest == 'default': 78 net, mask = '0.0.0.0', 0 79 elif '/' in dest: 80 net, mask_str = dest.split('/') 81 mask = int(mask_str) 82 else: 83 net, mask = dest, 32 84 via, dev, metric = '', '', 0 85 i = 1 86 while i < len(parts): 87 if parts[i] == 'via' and i + 1 < len(parts): 88 via = parts[i + 1] 89 i += 2 90 elif parts[i] == 'dev' and i + 1 < len(parts): 91 dev = parts[i + 1] 92 i += 2 93 elif parts[i] == 'metric' and i + 1 < len(parts): 94 metric = int(parts[i + 1]) 95 i += 2 96 else: 97 i += 1 98 result[(net, mask)] = (via, dev, metric) 99 return result 100 101 102def get_sysctl_conf(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, str]: 103 """Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters. 104 105 Returns a dict mapping parameter name -> value (both as strings). 106 107 Parsing rules: 108 - lines starting with '#' or ';' are comments and ignored 109 - blank lines are ignored 110 - accepted formats: 'key = value' and 'key=value' 111 - later definitions override earlier ones (sysctl.d files override sysctl.conf) 112 113 Edge cases: 114 - non-zero exit code from any grade.test() -> {} immediately 115 - first-pass empty output -> {} and continues to next call 116 """ 117 result: Dict[str, str] = {} 118 119 def _parse(text: str) -> None: 120 for line in text.splitlines(): 121 line = line.strip() 122 if not line or line[0] in ('#', ';'): 123 continue 124 if '=' in line: 125 key, _, value = line.partition('=') 126 result[key.strip()] = value.strip() 127 128 output, code = grade.test(machine_name, 'cat /etc/sysctl.conf', step=step) 129 if code != 0: 130 return {} 131 _parse(output) 132 133 output2, code = grade.test(machine_name, 'cat /etc/sysctl.d/*', step=step) 134 if code != 0: 135 return {} 136 _parse(output2) 137 138 return result 139 140 141def get_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> NetConfigEntry: 142 """Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig. 143 144 Each entry in the returned list corresponds to one interface (eth0, eth1, …) 145 in index order. 146 147 Entry types: 148 - Tuple (addresses, routes): statically configured interface. 149 - 'dhcp': interface whose address(es) were all assigned dynamically (DHCP). 150 - None: interface present in the kernel but carrying no IP address. 151 152 The list covers eth0 … ethN where N is the highest eth index reported by 153 'ip link show'. Interfaces beyond the highest present index are not included. 154 155 Edge cases: 156 - non-zero exit code or empty output from any command -> [] 157 - interfaces whose name does not match eth\\d+ -> ignored 158 - routes with an unknown gateway -> attached to the first interface, or 159 dropped if there are no interfaces 160 """ 161 raw_addrs = get_ip_addresses(grade, machine_name, step=step) 162 raw_routes = get_routes(grade, machine_name, step=step) 163 164 # Re-use the cached 'ip a' output to detect dynamic (DHCP) addresses. 165 ip_a_out, _ = grade.test(machine_name, 'ip a', step=step) 166 167 # Parse which interfaces have *all* their inet addresses marked dynamic. 168 dynamic_ifaces: set[str] = set() 169 current_iface: str | None = None 170 iface_addr_flags: dict[str, list[bool]] = {} # iface -> [is_dynamic, ...] 171 for line in (ip_a_out or '').splitlines(): 172 m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line) 173 if m: 174 current_iface = m.group(1) 175 iface_addr_flags.setdefault(current_iface, []) 176 continue 177 m = re.match(r'^\s+inet\s+\S+', line) 178 if m and current_iface is not None: 179 iface_addr_flags.setdefault(current_iface, []).append('dynamic' in line) 180 for iface, flags in iface_addr_flags.items(): 181 if flags and all(flags): 182 dynamic_ifaces.add(iface) 183 184 # Discover all present eth<N> interfaces via 'ip link show'. 185 ip_link_out, link_code = grade.test(machine_name, 'ip link show', step=step) 186 if link_code != 0 or not ip_link_out: 187 return [] 188 present_eths: set[str] = set( 189 re.findall(r'^\d+:\s+(eth\d+)(?:@\S+)?:', ip_link_out, re.MULTILINE) 190 ) 191 if not present_eths: 192 return [] 193 194 max_n = max(int(name[3:]) for name in present_eths) 195 196 # Build interface → addresses lookup from raw_addrs (only non-empty entries). 197 eth_addrs: dict[str, list[tuple[str, int]]] = { 198 name: addrs 199 for name, addrs in raw_addrs.items() 200 if name.startswith('eth') and name[3:].isdigit() and addrs 201 } 202 203 # Collect static eth interfaces in order (for route distribution). 204 static_eth_ifaces: list[tuple[str, list[IPv4Interface]]] = [] 205 for n in range(max_n + 1): 206 name = f'eth{n}' 207 if name not in present_eths: 208 continue 209 if name in eth_addrs and name not in dynamic_ifaces: 210 ifaces = [IPv4Interface(f"{addr}/{plen}") for addr, plen in eth_addrs[name]] 211 if ifaces: 212 static_eth_ifaces.append((name, ifaces)) 213 214 # Distribute routes among static interfaces (existing logic). 215 iface_networks: list[list[IPv4Network]] = [ 216 [iface.network for iface in ifaces] 217 for _, ifaces in static_eth_ifaces 218 ] 219 routes_per_iface: list[list[tuple[IPv4Network, IPv4Address]]] = [ 220 [] for _ in static_eth_ifaces 221 ] 222 for (net_str, mask), (via, dev, _metric) in raw_routes.items(): 223 if not via: 224 continue 225 dest = IPv4Network(f"{net_str}/{mask}") 226 gw = IPv4Address(via) 227 matched = None 228 for idx, nets in enumerate(iface_networks): 229 if any(gw in net for net in nets): 230 matched = idx 231 break 232 if matched is None: 233 matched = 0 234 if routes_per_iface: 235 routes_per_iface[matched].append((dest, gw)) 236 237 static_map: dict[str, tuple[list[IPv4Interface], list[tuple[IPv4Network, IPv4Address]]]] = { 238 name: (ifaces, routes_per_iface[idx]) 239 for idx, (name, ifaces) in enumerate(static_eth_ifaces) 240 } 241 242 # Assemble final NetConfig list. 243 result: NetConfigEntry = [] 244 for n in range(max_n + 1): 245 name = f'eth{n}' 246 if name not in present_eths: 247 result.append(None) 248 continue 249 if name in dynamic_ifaces: 250 result.append('dhcp') 251 elif name in static_map: 252 result.append(static_map[name]) 253 else: 254 result.append(None) 255 256 return result 257 258 259def get_persistent_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> tuple[NetConfigEntry, int]: 260 """Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count). 261 262 Reconstructs the network configuration that would result from 263 'systemctl networking start' (i.e. ifup -a). 264 265 Entry types in the returned NetConfig: 266 - Tuple (addresses, routes): inet-static stanza for eth<N>. 267 - 'dhcp': inet-dhcp stanza for eth<N>. 268 - None: eth<N> mentioned in an auto/allow-hotplug line but with no stanza. 269 270 Only eth<N> interfaces are included, in index order. 271 272 error_count counts lines that are syntactically wrong or have invalid 273 values (bad IP/network, unknown stanza keywords, malformed iface lines…). 274 275 Parsing rules: 276 - 'address <ip>[/<prefix>]' — primary address; prefix may be absent 277 - 'netmask <mask>' — applied to primary address when it has no prefix 278 - 'gateway <ip>' — inserted as a 0.0.0.0/0 route 279 - 'post-up / up ip addr add <addr> dev <iface>' — extra addresses 280 - 'post-up / up ip route add <net> via <gw>' — static routes 281 - Other post-up/up/down commands are silently ignored 282 """ 283 out1, code1 = grade.test(machine_name, 'cat /etc/network/interfaces', step=step, allow_error=True) 284 out2, code2 = grade.test(machine_name, 'cat /etc/network/interfaces.d/*', step=step, allow_error=True) 285 286 lines: list[str] = [] 287 include_interfaces_d = False 288 if code1 == 0 and out1: 289 lines.extend(out1.splitlines()) 290 for raw_line in out1.splitlines(): 291 stripped = raw_line.strip() 292 if not stripped or stripped.startswith('#'): 293 continue 294 parts = stripped.split() 295 if (parts[0] in ('source', 'source-directory') 296 and len(parts) >= 2 297 and '/etc/network/interfaces.d' in parts[1]): 298 include_interfaces_d = True 299 break 300 if include_interfaces_d and code2 == 0 and out2: 301 lines.extend(out2.splitlines()) 302 303 errors = 0 304 305 # Known stanza-level options that ifup accepts — won't be counted as errors. 306 _KNOWN_OPTS = { 307 'address', 'netmask', 'broadcast', 'network', 'gateway', 308 'metric', 'hwaddress', 'mtu', 'scope', 309 'pre-up', 'up', 'post-up', 'down', 'pre-down', 'post-down', 310 'dns-nameservers', 'dns-search', 'dns-domain', 311 'vlan-raw-device', 'bridge_ports', 'bridge_stp', 'bridge_fd', 312 'bond-master', 'bond-slaves', 'bond-mode', 'bond-miimon', 313 'wpa-ssid', 'wpa-psk', 'wpa-conf', 314 } 315 316 # Accumulate per-interface data keyed by interface name. 317 iface_data: dict[str, dict] = {} 318 dhcp_ifaces: set[str] = set() # interfaces with 'inet dhcp' stanza 319 auto_ifaces: set[str] = set() # interfaces seen in auto/allow-hotplug lines 320 current_iface: str | None = None # name of the active inet-static stanza 321 in_ignored_stanza: bool = False # True inside loopback / dhcp / manual stanza 322 323 for raw_line in lines: 324 stripped = raw_line.strip() 325 if not stripped or stripped.startswith('#'): 326 continue 327 328 parts = stripped.split() 329 kw = parts[0] 330 331 if kw == 'iface': 332 if len(parts) < 4: 333 errors += 1 334 current_iface = None 335 in_ignored_stanza = False 336 continue 337 iface_name, family, method = parts[1], parts[2], parts[3] 338 if family == 'inet' and method == 'static': 339 current_iface = iface_name 340 in_ignored_stanza = False 341 if iface_name not in iface_data: 342 iface_data[iface_name] = { 343 'primary': None, 'has_prefix': False, 344 'netmask': None, 'extras': [], 345 'routes': [], 'gateway': None, 346 } 347 elif family == 'inet' and method == 'dhcp': 348 dhcp_ifaces.add(iface_name) 349 current_iface = None 350 in_ignored_stanza = True 351 else: 352 current_iface = None # loopback / manual / other — irrelevant 353 in_ignored_stanza = True 354 355 elif kw in ('auto', 'allow-hotplug', 'allow-auto'): 356 for iface_name in parts[1:]: 357 auto_ifaces.add(iface_name) 358 359 elif kw in ('source', 'source-directory', 'mapping', 360 'no-auto-down', 'no-scripts'): 361 pass # Valid top-level directives, nothing to record 362 363 else: 364 # Option line within a stanza (indentation not required by ifupdown). 365 # If outside any stanza entirely, count as an error. 366 directive = kw 367 368 if current_iface is None: 369 if not in_ignored_stanza: 370 errors += 1 371 continue 372 373 data = iface_data[current_iface] 374 375 if directive == 'address': 376 if len(parts) < 2: 377 errors += 1 378 else: 379 try: 380 iface = IPv4Interface(parts[1]) 381 data['primary'] = iface 382 data['has_prefix'] = '/' in parts[1] 383 except ValueError: 384 errors += 1 385 386 elif directive == 'netmask': 387 if len(parts) < 2: 388 errors += 1 389 else: 390 data['netmask'] = parts[1] 391 392 elif directive == 'gateway': 393 if len(parts) < 2: 394 errors += 1 395 else: 396 try: 397 data['gateway'] = IPv4Address(parts[1]) 398 except ValueError: 399 errors += 1 400 401 elif directive in ('post-up', 'up'): 402 cmd = stripped[len(directive):].strip() 403 m = re.match(r'ip\s+route\s+add\s+(\S+)\s+via\s+(\S+)', cmd) 404 if m: 405 try: 406 net = IPv4Network(m.group(1), strict=False) 407 gw = IPv4Address(m.group(2)) 408 data['routes'].append((net, gw)) 409 except ValueError: 410 errors += 1 411 continue 412 m = re.match(r'ip\s+addr\s+add\s+(\S+)\s+dev\s+\S+', cmd) 413 if m: 414 try: 415 data['extras'].append(IPv4Interface(m.group(1))) 416 except ValueError: 417 errors += 1 418 # Other post-up / up commands (iptables, arp…) are not errors 419 420 elif directive not in _KNOWN_OPTS: 421 errors += 1 422 423 # Collect all eth<N> interface names we know about, in index order. 424 all_known = ( 425 set(iface_data.keys()) | dhcp_ifaces | 426 {n for n in auto_ifaces if n.startswith('eth') and n[3:].isdigit()} 427 ) 428 eth_names = sorted( 429 [n for n in all_known if n.startswith('eth') and n[3:].isdigit()], 430 key=lambda n: int(n[3:]) 431 ) 432 433 result: NetConfigEntry = [] 434 for name in eth_names: 435 if name in dhcp_ifaces: 436 result.append('dhcp') 437 continue 438 439 if name not in iface_data: 440 # Mentioned in auto but no inet stanza — unconfigured. 441 result.append(None) 442 continue 443 444 data = iface_data[name] 445 primary = data['primary'] 446 if primary is None: 447 result.append(None) 448 continue 449 450 # Combine bare IP with netmask when no CIDR prefix was given. 451 if not data['has_prefix'] and data['netmask']: 452 try: 453 prefix = IPv4Network(f'0.0.0.0/{data["netmask"]}').prefixlen 454 primary = IPv4Interface(f'{primary.ip}/{prefix}') 455 except ValueError: 456 errors += 1 457 458 addresses = [primary] + data['extras'] 459 460 routes = list(data['routes']) 461 if data['gateway']: 462 routes.insert(0, (IPv4Network('0.0.0.0/0'), data['gateway'])) 463 464 result.append((addresses, routes)) 465 466 return result, errors 467 468 469class _AttrDict(dict): 470 __getattr__ = dict.__getitem__ 471 472 473def eval_net_config(grade: Grade0, expected: NetConfigEntry, machine_name: str = None, 474 current: NetConfigEntry | None = None, step: int = 1) -> _AttrDict: 475 """Compare current and expected NetConfig; return a dict with 10 keys: 476 477 Existing keys (unchanged semantics — 'dhcp' and None entries are skipped): 478 - "ips" : number of IP addresses that are correct 479 - "default_route" : 1 if the default route matches the expected one, 0 otherwise 480 - "other_routes" : number of correct non-default static routes 481 - "wrong_routes" : number of non-default static routes present in current but not in expected 482 - "ips_expected" : number of IP addresses in expected 483 - "default_route_expected": 1 if expected has a default route, 0 otherwise 484 - "other_routes_expected" : number of non-default static routes in expected 485 486 New keys for DHCP/None tracking: 487 - "dhcp_interfaces" : positions where both expected and current are 'dhcp' 488 - "dhcp_interfaces_expected": count of 'dhcp' entries in expected 489 - "none_interfaces_expected": count of None entries in expected 490 491 If *current* is None, get_net_config() is called first. 492 IP addresses are compared as IPv4Interface strings (address + prefix). 493 Routes are compared as (network, gateway) pairs. 494 """ 495 if current is None: 496 if machine_name is None: 497 raise ValueError("eval_net_config: current and machine_name can't be both None") 498 current = get_net_config_entry(grade, machine_name, step=step) 499 500 def _collect_ips(nc: NetConfigEntry) -> set[str]: 501 result = set() 502 for entry in nc: 503 if not isinstance(entry, tuple): 504 continue 505 ifaces, _ = entry 506 for iface in ifaces: 507 result.add(str(iface)) 508 return result 509 510 def _collect_routes(nc: NetConfigEntry) -> set[tuple[str, str]]: 511 result = set() 512 for entry in nc: 513 if not isinstance(entry, tuple): 514 continue 515 _, routes = entry 516 for net, gw in routes: 517 if gw is None: 518 gw_str = '' 519 elif isinstance(gw, IPv4Interface): 520 gw_str = str(gw.ip) 521 else: 522 gw_str = str(gw) 523 result.add((str(net), gw_str)) 524 return result 525 526 exp_ips = _collect_ips(expected) 527 cur_ips = _collect_ips(current) 528 529 exp_routes = _collect_routes(expected) 530 cur_routes = _collect_routes(current) 531 532 default_key = '0.0.0.0/0' 533 exp_default = {r for r in exp_routes if r[0] == default_key} 534 cur_default = {r for r in cur_routes if r[0] == default_key} 535 exp_non_default = {r for r in exp_routes if r[0] != default_key} 536 cur_non_default = {r for r in cur_routes if r[0] != default_key} 537 538 dhcp_match = sum( 539 1 for e, c in zip(expected, current) 540 if e == 'dhcp' and c == 'dhcp' 541 ) 542 543 return _AttrDict( 544 ips=len(exp_ips & cur_ips), 545 default_route=1 if exp_default == cur_default else 0, 546 other_routes=len(exp_non_default & cur_non_default), 547 wrong_routes=len(cur_non_default - exp_non_default), 548 ips_expected=len(exp_ips), 549 default_route_expected=1 if exp_default else 0, 550 other_routes_expected=len(exp_non_default), 551 dhcp_interfaces=dhcp_match, 552 dhcp_interfaces_expected=sum(1 for e in expected if e == 'dhcp'), 553 none_interfaces_expected=sum(1 for e in expected if e is None), 554 ) 555 556 557def set_persistent_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry): 558 lines = ['auto lo', 'iface lo inet loopback', ''] 559 for i, entry in enumerate(nc_entry): 560 interface = f'eth{i}' 561 if entry is None: 562 continue 563 if entry == 'dhcp': 564 lines += [ 565 f'auto {interface}', 566 f'iface {interface} inet dhcp', 567 '', 568 ] 569 continue 570 iface_list, routes = entry 571 lines += [ 572 f'auto {interface}', 573 f'iface {interface} inet static', 574 f' address {iface_list[0]}', 575 ] 576 for extra in iface_list[1:]: 577 lines.append(f' post-up ip addr add {extra} dev {interface}') 578 for net, via in routes: 579 via_addr = via.ip if isinstance(via, IPv4Interface) else via 580 if net == IPv4Network('0.0.0.0/0'): 581 lines.append(f' gateway {via_addr}') 582 for net, via in routes: 583 via_addr = via.ip if isinstance(via, IPv4Interface) else via 584 if net != IPv4Network('0.0.0.0/0'): 585 lines.append(f' post-up ip route add {net.compressed} via {via_addr}') 586 lines.append('') 587 net_scheme.file(machine_name, '/etc/network/interfaces', '\n'.join(lines)) 588 589 590def set_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry): 591 for i, entry in enumerate(nc_entry): 592 interface = f'eth{i}' 593 if entry is None: 594 continue 595 if entry == 'dhcp': 596 net_scheme.cmd(machine_name, f'ip link set {interface} up') 597 net_scheme.cmd(machine_name, f'dhclient {interface}') 598 continue 599 iface_list, routes = entry 600 net_scheme.cmd(machine_name, f'ip link set {interface} up') 601 for iface in iface_list: 602 net_scheme.cmd(machine_name, f'ip addr add {iface} dev {interface}') 603 for net, via in routes: 604 via_addr = via.ip if isinstance(via, IPv4Interface) else via 605 net_scheme.cmd(machine_name, f'ip route add {net.compressed} via {str(via_addr)}') 606 607 608def set_persistent_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig): 609 for name, value in sysctl_config.items(): 610 net_scheme.cmd(machine_name, f'echo "{name}={value}" >> /etc/sysctl.conf') 611 if sysctl_config: 612 remount_proc_sys(net_scheme, machine_name) 613 net_scheme.cmd(machine_name, 'sysctl -p') 614 615def remount_proc_sys(net_scheme: NetScheme0, machine_name: str): 616 if not hasattr(net_scheme, "remount_proc_sys_done"): 617 net_scheme.remount_proc_sys_done = {} 618 if net_scheme.remount_proc_sys_done.get(machine_name): 619 return 620 net_scheme.cmd(machine_name, "mount -o rw,remount /proc/sys") 621 net_scheme.remount_proc_sys_done[machine_name] = True 622 623 624def set_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig): 625 if len(sysctl_config) > 0: 626 remount_proc_sys(net_scheme, machine_name) 627 for name, value in sysctl_config.items(): 628 net_scheme.cmd(machine_name, f'sysctl -w {name}={value}') 629 630 631def set_ip_forward(net_scheme: NetScheme0, machine_name: str, ip_forward: bool, step: int = 1): 632 value = 1 if ip_forward else 0 633 remount_proc_sys(net_scheme, machine_name) 634 net_scheme.cmd(machine_name, f'sysctl -w net.ipv4.ip_forward={value}', step=step) 635 636 637def get_ip_forward(grade: Grade0, machine_name: str, step: int = 1) -> bool: 638 output, code = grade.test(machine_name, 'cat /proc/sys/net/ipv4/ip_forward', step=step) 639 if code != 0: 640 return False 641 return output.strip() == '1' 642 643 644def get_sys_parameter_bool(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> bool | None: 645 """Read a boolean kernel parameter via sysctl and return its value. 646 647 Returns True for '1', False for '0', None if the parameter does not exist or the command fails. 648 """ 649 value = get_sys_parameter(grade, machine_name, parameter, step=step) 650 if value is None: 651 return None 652 if value == '1': 653 return True 654 if value == '0': 655 return False 656 return None 657 658 659def get_net_config_from_topology( 660 net_scheme: NetScheme0, 661 topology=None, 662 gateway: str = None, 663 default_route: IPv4Address | IPv4Interface = IPv4Interface("172.17.0.1/24"), 664) -> NetConfig: 665 """Build a NetConfig for every machine in the topology so they can all reach each other. 666 667 Returns {machine_name: NetConfig}. 668 669 Each machine gets: 670 - One interface entry per network it belongs to, with the IP from net_scheme.data.ips. 671 - Static routes to every non-directly-connected network (found via BFS through 672 multi-homed router machines). 673 674 If *gateway* is not None: 675 - Every machine except the gateway gets a default route (0.0.0.0/0) toward the 676 gateway machine via the appropriate next-hop IP. 677 - The gateway itself gets a default route via *default_route* (typically the 678 Docker host bridge, e.g. 172.17.0.1). 679 680 The interface indices (eth0, eth1, …) are computed with the same counter logic as 681 NetScheme0.__init__, so they match the actual Kathara deployment. 682 683 IP names follow the convention set by random_ips_from_topology(): 684 - data.ips.m when machine m is in exactly one network 685 - data.ips.m_netX when machine m is in multiple networks 686 """ 687 if topology is None: 688 topology = net_scheme.get_topology() 689 data = net_scheme.data 690 691 # --- 1. Replicate NetScheme0 eth-index assignment --- 692 # machine_iface: {machine: {net_name: eth_idx}} 693 machine_iface: dict[str, dict[str, int]] = {} 694 _ctr: dict[str, int] = {} 695 for net_name, machines in topology.items(): 696 items = list(machines.items()) if isinstance(machines, dict) else [(m, None) for m in machines] 697 for mname, iface_spec in items: 698 iface = iface_spec[0] if isinstance(iface_spec, tuple) else iface_spec 699 if iface is None: 700 iface = _ctr.get(mname, 0) 701 _ctr[mname] = max(_ctr.get(mname, 0), iface) + 1 702 machine_iface.setdefault(mname, {})[net_name] = iface 703 704 machine_nets: dict[str, list[str]] = {m: list(d.keys()) for m, d in machine_iface.items()} 705 706 # --- 2. IP lookup (matches random_ips_from_topology naming) --- 707 def get_ip(machine: str, net_name: str) -> IPv4Interface: 708 if len(machine_nets[machine]) == 1: 709 return getattr(data.ips, machine) 710 return getattr(data.ips, f"{machine}_{net_name}") 711 712 # --- 3. Network routing graph --- 713 # Multi-homed machines act as routers; each pair of their networks is an edge. 714 net_graph: dict[str, list[tuple[str, str]]] = {n: [] for n in topology} 715 for mname, nets in machine_nets.items(): 716 for i, n1 in enumerate(nets): 717 for n2 in nets[i + 1:]: 718 net_graph[n1].append((n2, mname)) 719 net_graph[n2].append((n1, mname)) 720 721 # --- 4. BFS: find (via_start_net, first_router) from start_nets to a target --- 722 def _bfs(start_nets: set[str], is_target) -> tuple[str, str] | None: 723 visited = set(start_nets) 724 q: deque = deque() 725 for n in start_nets: 726 for adj, router in net_graph.get(n, []): 727 if adj not in visited: 728 visited.add(adj) 729 q.append((adj, n, router)) 730 while q: 731 cur, via, router = q.popleft() 732 if is_target(cur): 733 return (via, router) 734 for adj, r in net_graph.get(cur, []): 735 if adj not in visited: 736 visited.add(adj) 737 q.append((adj, via, router)) 738 return None 739 740 def first_hop_to_net(start_nets: set[str], target_net: str) -> tuple[str, str] | None: 741 """Return (via_net, router) for the first hop to reach target_net, or None if direct.""" 742 if target_net in start_nets: 743 return None 744 return _bfs(start_nets, lambda n: n == target_net) 745 746 def first_hop_to_machine(start_nets: set[str], target: str) -> tuple[str, str] | None: 747 """Return (via_net, router) for the first hop toward target machine. 748 When directly connected, router is the target machine itself.""" 749 target_nets = set(machine_nets.get(target, [])) 750 shared = start_nets & target_nets 751 if shared: 752 return (next(iter(shared)), target) 753 return _bfs(start_nets, lambda n: n in target_nets) 754 755 # --- 5. default_route as bare IPv4Address --- 756 dr_ip: IPv4Address = default_route.ip if isinstance(default_route, IPv4Interface) else default_route 757 758 # --- 6. Assemble NetConfig per machine --- 759 result: dict[str, NetConfigEntry] = {} 760 761 for machine, net_to_eth in machine_iface.items(): 762 start_nets = set(machine_nets[machine]) 763 764 # Collect all candidate routes as (net, eth_idx, next_hop). 765 route_triples: list[tuple[IPv4Network, int, IPv4Address]] = [] 766 767 # Specific routes to every non-directly-connected network 768 for target_net in topology: 769 if target_net in start_nets: 770 continue 771 hop = first_hop_to_net(start_nets, target_net) 772 if hop is None: 773 continue 774 via_net, router = hop 775 route_triples.append(( 776 getattr(data.nets, target_net), 777 net_to_eth[via_net], 778 get_ip(router, via_net).ip, 779 )) 780 781 # Default route 782 if gateway is not None: 783 default_net = IPv4Network('0.0.0.0/0') 784 if machine == gateway: 785 # External default route on the lowest-index interface 786 route_triples.append((default_net, min(net_to_eth.values()), dr_ip)) 787 else: 788 hop = first_hop_to_machine(start_nets, gateway) 789 if hop is not None: 790 via_net, router = hop 791 route_triples.append(( 792 default_net, 793 net_to_eth[via_net], 794 get_ip(router, via_net).ip, 795 )) 796 797 # Drop redundant routes: a route (net, hop) is redundant when another route 798 # (net2, hop2) exists with the same next-hop and net2 is a strict supernet of net 799 # (i.e. net2 covers net entirely — the less-specific route already handles it). 800 def _is_redundant(net: IPv4Network, hop: IPv4Address) -> bool: 801 return any( 802 net2 != net and net.subnet_of(net2) and hop == hop2 803 for net2, _, hop2 in route_triples 804 ) 805 806 eth_routes: dict[int, list] = {eth: [] for eth in net_to_eth.values()} 807 for net, eth, hop in route_triples: 808 if not _is_redundant(net, hop): 809 eth_routes[eth].append((net, hop)) 810 811 # Build the NetConfig list indexed by eth number 812 max_eth = max(net_to_eth.values()) 813 nc: NetConfigEntry = [None] * (max_eth + 1) 814 for net_name, eth_idx in net_to_eth.items(): 815 nc[eth_idx] = ([get_ip(machine, net_name)], eth_routes[eth_idx]) 816 result[machine] = nc 817 818 return result 819 820 821def get_sys_parameter(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> str | None: 822 """Read a kernel parameter via sysctl and return its value as a string. 823 824 Returns None if the parameter does not exist or the command fails. 825 """ 826 output, code = grade.test(machine_name, f'sysctl -n {parameter}', step=step) 827 if code != 0: 828 return None 829 return output.strip() or None
25def get_ip_addresses(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, List[Tuple[str, int]]]: 26 """Run 'ip a' on machine_name and parse the output. 27 28 Returns a dict mapping interface name -> list of (address, prefix_len). 29 30 Addresses for each interface are sorted by: 31 1. prefix length descending 32 2. IP address lexicographically ascending 33 34 Edge cases: 35 - virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0' 36 - non-zero exit code or empty output -> {} 37 """ 38 output, code = grade.test(machine_name, 'ip a', step=step) 39 if code != 0 or not output: 40 return {} 41 result: Dict[str, List[Tuple[str, int]]] = {} 42 current_iface = None 43 for line in output.splitlines(): 44 m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line) 45 if m: 46 current_iface = m.group(1) 47 result.setdefault(current_iface, []) 48 continue 49 m = re.match(r'^\s+inet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)', line) 50 if m and current_iface is not None: 51 result[current_iface].append((m.group(1), int(m.group(2)))) 52 for iface in result: 53 result[iface].sort(key=lambda x: (-x[1], x[0])) 54 return result
Run 'ip a' on machine_name and parse the output.
Returns a dict mapping interface name -> list of (address, prefix_len).
Addresses for each interface are sorted by:
- prefix length descending
- IP address lexicographically ascending
Edge cases:
- virtual interfaces (e.g. eth0@if5) -> name stripped to 'eth0'
- non-zero exit code or empty output -> {}
57def get_routes(grade: Grade0, machine_name: str, step: int = 1) -> Dict[Tuple[str, int], Tuple[str, str, int]]: 58 """Run 'ip route' on machine_name and parse the output. 59 60 Returns a dict mapping (network, mask) -> (via, dev, metric). 61 62 Edge cases: 63 - 'default' route -> key ('0.0.0.0', 0) 64 - host route without mask -> mask 32 65 - direct route (no via) -> via '' 66 - no explicit metric -> metric 0 67 - non-zero exit code or empty output -> {} 68 """ 69 output, code = grade.test(machine_name, 'ip route', step=step) 70 if code != 0 or not output: 71 return {} 72 result: Dict[Tuple[str, int], Tuple[str, str, int]] = {} 73 for line in output.splitlines(): 74 parts = line.split() 75 if not parts: 76 continue 77 dest = parts[0] 78 if dest == 'default': 79 net, mask = '0.0.0.0', 0 80 elif '/' in dest: 81 net, mask_str = dest.split('/') 82 mask = int(mask_str) 83 else: 84 net, mask = dest, 32 85 via, dev, metric = '', '', 0 86 i = 1 87 while i < len(parts): 88 if parts[i] == 'via' and i + 1 < len(parts): 89 via = parts[i + 1] 90 i += 2 91 elif parts[i] == 'dev' and i + 1 < len(parts): 92 dev = parts[i + 1] 93 i += 2 94 elif parts[i] == 'metric' and i + 1 < len(parts): 95 metric = int(parts[i + 1]) 96 i += 2 97 else: 98 i += 1 99 result[(net, mask)] = (via, dev, metric) 100 return result
Run 'ip route' on machine_name and parse the output.
Returns a dict mapping (network, mask) -> (via, dev, metric).
Edge cases:
- 'default' route -> key ('0.0.0.0', 0)
- host route without mask -> mask 32
- direct route (no via) -> via ''
- no explicit metric -> metric 0
- non-zero exit code or empty output -> {}
103def get_sysctl_conf(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, str]: 104 """Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters. 105 106 Returns a dict mapping parameter name -> value (both as strings). 107 108 Parsing rules: 109 - lines starting with '#' or ';' are comments and ignored 110 - blank lines are ignored 111 - accepted formats: 'key = value' and 'key=value' 112 - later definitions override earlier ones (sysctl.d files override sysctl.conf) 113 114 Edge cases: 115 - non-zero exit code from any grade.test() -> {} immediately 116 - first-pass empty output -> {} and continues to next call 117 """ 118 result: Dict[str, str] = {} 119 120 def _parse(text: str) -> None: 121 for line in text.splitlines(): 122 line = line.strip() 123 if not line or line[0] in ('#', ';'): 124 continue 125 if '=' in line: 126 key, _, value = line.partition('=') 127 result[key.strip()] = value.strip() 128 129 output, code = grade.test(machine_name, 'cat /etc/sysctl.conf', step=step) 130 if code != 0: 131 return {} 132 _parse(output) 133 134 output2, code = grade.test(machine_name, 'cat /etc/sysctl.d/*', step=step) 135 if code != 0: 136 return {} 137 _parse(output2) 138 139 return result
Read /etc/sysctl.conf and all files under /etc/sysctl.d/ and parse kernel parameters.
Returns a dict mapping parameter name -> value (both as strings).
Parsing rules:
- lines starting with '#' or ';' are comments and ignored
- blank lines are ignored
- accepted formats: 'key = value' and 'key=value'
- later definitions override earlier ones (sysctl.d files override sysctl.conf)
Edge cases:
- non-zero exit code from any grade.test() -> {} immediately
- first-pass empty output -> {} and continues to next call
142def get_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> NetConfigEntry: 143 """Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig. 144 145 Each entry in the returned list corresponds to one interface (eth0, eth1, …) 146 in index order. 147 148 Entry types: 149 - Tuple (addresses, routes): statically configured interface. 150 - 'dhcp': interface whose address(es) were all assigned dynamically (DHCP). 151 - None: interface present in the kernel but carrying no IP address. 152 153 The list covers eth0 … ethN where N is the highest eth index reported by 154 'ip link show'. Interfaces beyond the highest present index are not included. 155 156 Edge cases: 157 - non-zero exit code or empty output from any command -> [] 158 - interfaces whose name does not match eth\\d+ -> ignored 159 - routes with an unknown gateway -> attached to the first interface, or 160 dropped if there are no interfaces 161 """ 162 raw_addrs = get_ip_addresses(grade, machine_name, step=step) 163 raw_routes = get_routes(grade, machine_name, step=step) 164 165 # Re-use the cached 'ip a' output to detect dynamic (DHCP) addresses. 166 ip_a_out, _ = grade.test(machine_name, 'ip a', step=step) 167 168 # Parse which interfaces have *all* their inet addresses marked dynamic. 169 dynamic_ifaces: set[str] = set() 170 current_iface: str | None = None 171 iface_addr_flags: dict[str, list[bool]] = {} # iface -> [is_dynamic, ...] 172 for line in (ip_a_out or '').splitlines(): 173 m = re.match(r'^\d+:\s+(\S+?)(?:@\S+)?:', line) 174 if m: 175 current_iface = m.group(1) 176 iface_addr_flags.setdefault(current_iface, []) 177 continue 178 m = re.match(r'^\s+inet\s+\S+', line) 179 if m and current_iface is not None: 180 iface_addr_flags.setdefault(current_iface, []).append('dynamic' in line) 181 for iface, flags in iface_addr_flags.items(): 182 if flags and all(flags): 183 dynamic_ifaces.add(iface) 184 185 # Discover all present eth<N> interfaces via 'ip link show'. 186 ip_link_out, link_code = grade.test(machine_name, 'ip link show', step=step) 187 if link_code != 0 or not ip_link_out: 188 return [] 189 present_eths: set[str] = set( 190 re.findall(r'^\d+:\s+(eth\d+)(?:@\S+)?:', ip_link_out, re.MULTILINE) 191 ) 192 if not present_eths: 193 return [] 194 195 max_n = max(int(name[3:]) for name in present_eths) 196 197 # Build interface → addresses lookup from raw_addrs (only non-empty entries). 198 eth_addrs: dict[str, list[tuple[str, int]]] = { 199 name: addrs 200 for name, addrs in raw_addrs.items() 201 if name.startswith('eth') and name[3:].isdigit() and addrs 202 } 203 204 # Collect static eth interfaces in order (for route distribution). 205 static_eth_ifaces: list[tuple[str, list[IPv4Interface]]] = [] 206 for n in range(max_n + 1): 207 name = f'eth{n}' 208 if name not in present_eths: 209 continue 210 if name in eth_addrs and name not in dynamic_ifaces: 211 ifaces = [IPv4Interface(f"{addr}/{plen}") for addr, plen in eth_addrs[name]] 212 if ifaces: 213 static_eth_ifaces.append((name, ifaces)) 214 215 # Distribute routes among static interfaces (existing logic). 216 iface_networks: list[list[IPv4Network]] = [ 217 [iface.network for iface in ifaces] 218 for _, ifaces in static_eth_ifaces 219 ] 220 routes_per_iface: list[list[tuple[IPv4Network, IPv4Address]]] = [ 221 [] for _ in static_eth_ifaces 222 ] 223 for (net_str, mask), (via, dev, _metric) in raw_routes.items(): 224 if not via: 225 continue 226 dest = IPv4Network(f"{net_str}/{mask}") 227 gw = IPv4Address(via) 228 matched = None 229 for idx, nets in enumerate(iface_networks): 230 if any(gw in net for net in nets): 231 matched = idx 232 break 233 if matched is None: 234 matched = 0 235 if routes_per_iface: 236 routes_per_iface[matched].append((dest, gw)) 237 238 static_map: dict[str, tuple[list[IPv4Interface], list[tuple[IPv4Network, IPv4Address]]]] = { 239 name: (ifaces, routes_per_iface[idx]) 240 for idx, (name, ifaces) in enumerate(static_eth_ifaces) 241 } 242 243 # Assemble final NetConfig list. 244 result: NetConfigEntry = [] 245 for n in range(max_n + 1): 246 name = f'eth{n}' 247 if name not in present_eths: 248 result.append(None) 249 continue 250 if name in dynamic_ifaces: 251 result.append('dhcp') 252 elif name in static_map: 253 result.append(static_map[name]) 254 else: 255 result.append(None) 256 257 return result
Run 'ip a', 'ip route', and 'ip link show' on machine_name and return a NetConfig.
Each entry in the returned list corresponds to one interface (eth0, eth1, …) in index order.
Entry types:
- Tuple (addresses, routes): statically configured interface.
- 'dhcp': interface whose address(es) were all assigned dynamically (DHCP).
- None: interface present in the kernel but carrying no IP address.
The list covers eth0 … ethN where N is the highest eth index reported by 'ip link show'. Interfaces beyond the highest present index are not included.
Edge cases:
- non-zero exit code or empty output from any command -> []
- interfaces whose name does not match eth\d+ -> ignored
- routes with an unknown gateway -> attached to the first interface, or dropped if there are no interfaces
260def get_persistent_net_config_entry(grade: Grade0, machine_name: str, step: int = 1) -> tuple[NetConfigEntry, int]: 261 """Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count). 262 263 Reconstructs the network configuration that would result from 264 'systemctl networking start' (i.e. ifup -a). 265 266 Entry types in the returned NetConfig: 267 - Tuple (addresses, routes): inet-static stanza for eth<N>. 268 - 'dhcp': inet-dhcp stanza for eth<N>. 269 - None: eth<N> mentioned in an auto/allow-hotplug line but with no stanza. 270 271 Only eth<N> interfaces are included, in index order. 272 273 error_count counts lines that are syntactically wrong or have invalid 274 values (bad IP/network, unknown stanza keywords, malformed iface lines…). 275 276 Parsing rules: 277 - 'address <ip>[/<prefix>]' — primary address; prefix may be absent 278 - 'netmask <mask>' — applied to primary address when it has no prefix 279 - 'gateway <ip>' — inserted as a 0.0.0.0/0 route 280 - 'post-up / up ip addr add <addr> dev <iface>' — extra addresses 281 - 'post-up / up ip route add <net> via <gw>' — static routes 282 - Other post-up/up/down commands are silently ignored 283 """ 284 out1, code1 = grade.test(machine_name, 'cat /etc/network/interfaces', step=step, allow_error=True) 285 out2, code2 = grade.test(machine_name, 'cat /etc/network/interfaces.d/*', step=step, allow_error=True) 286 287 lines: list[str] = [] 288 include_interfaces_d = False 289 if code1 == 0 and out1: 290 lines.extend(out1.splitlines()) 291 for raw_line in out1.splitlines(): 292 stripped = raw_line.strip() 293 if not stripped or stripped.startswith('#'): 294 continue 295 parts = stripped.split() 296 if (parts[0] in ('source', 'source-directory') 297 and len(parts) >= 2 298 and '/etc/network/interfaces.d' in parts[1]): 299 include_interfaces_d = True 300 break 301 if include_interfaces_d and code2 == 0 and out2: 302 lines.extend(out2.splitlines()) 303 304 errors = 0 305 306 # Known stanza-level options that ifup accepts — won't be counted as errors. 307 _KNOWN_OPTS = { 308 'address', 'netmask', 'broadcast', 'network', 'gateway', 309 'metric', 'hwaddress', 'mtu', 'scope', 310 'pre-up', 'up', 'post-up', 'down', 'pre-down', 'post-down', 311 'dns-nameservers', 'dns-search', 'dns-domain', 312 'vlan-raw-device', 'bridge_ports', 'bridge_stp', 'bridge_fd', 313 'bond-master', 'bond-slaves', 'bond-mode', 'bond-miimon', 314 'wpa-ssid', 'wpa-psk', 'wpa-conf', 315 } 316 317 # Accumulate per-interface data keyed by interface name. 318 iface_data: dict[str, dict] = {} 319 dhcp_ifaces: set[str] = set() # interfaces with 'inet dhcp' stanza 320 auto_ifaces: set[str] = set() # interfaces seen in auto/allow-hotplug lines 321 current_iface: str | None = None # name of the active inet-static stanza 322 in_ignored_stanza: bool = False # True inside loopback / dhcp / manual stanza 323 324 for raw_line in lines: 325 stripped = raw_line.strip() 326 if not stripped or stripped.startswith('#'): 327 continue 328 329 parts = stripped.split() 330 kw = parts[0] 331 332 if kw == 'iface': 333 if len(parts) < 4: 334 errors += 1 335 current_iface = None 336 in_ignored_stanza = False 337 continue 338 iface_name, family, method = parts[1], parts[2], parts[3] 339 if family == 'inet' and method == 'static': 340 current_iface = iface_name 341 in_ignored_stanza = False 342 if iface_name not in iface_data: 343 iface_data[iface_name] = { 344 'primary': None, 'has_prefix': False, 345 'netmask': None, 'extras': [], 346 'routes': [], 'gateway': None, 347 } 348 elif family == 'inet' and method == 'dhcp': 349 dhcp_ifaces.add(iface_name) 350 current_iface = None 351 in_ignored_stanza = True 352 else: 353 current_iface = None # loopback / manual / other — irrelevant 354 in_ignored_stanza = True 355 356 elif kw in ('auto', 'allow-hotplug', 'allow-auto'): 357 for iface_name in parts[1:]: 358 auto_ifaces.add(iface_name) 359 360 elif kw in ('source', 'source-directory', 'mapping', 361 'no-auto-down', 'no-scripts'): 362 pass # Valid top-level directives, nothing to record 363 364 else: 365 # Option line within a stanza (indentation not required by ifupdown). 366 # If outside any stanza entirely, count as an error. 367 directive = kw 368 369 if current_iface is None: 370 if not in_ignored_stanza: 371 errors += 1 372 continue 373 374 data = iface_data[current_iface] 375 376 if directive == 'address': 377 if len(parts) < 2: 378 errors += 1 379 else: 380 try: 381 iface = IPv4Interface(parts[1]) 382 data['primary'] = iface 383 data['has_prefix'] = '/' in parts[1] 384 except ValueError: 385 errors += 1 386 387 elif directive == 'netmask': 388 if len(parts) < 2: 389 errors += 1 390 else: 391 data['netmask'] = parts[1] 392 393 elif directive == 'gateway': 394 if len(parts) < 2: 395 errors += 1 396 else: 397 try: 398 data['gateway'] = IPv4Address(parts[1]) 399 except ValueError: 400 errors += 1 401 402 elif directive in ('post-up', 'up'): 403 cmd = stripped[len(directive):].strip() 404 m = re.match(r'ip\s+route\s+add\s+(\S+)\s+via\s+(\S+)', cmd) 405 if m: 406 try: 407 net = IPv4Network(m.group(1), strict=False) 408 gw = IPv4Address(m.group(2)) 409 data['routes'].append((net, gw)) 410 except ValueError: 411 errors += 1 412 continue 413 m = re.match(r'ip\s+addr\s+add\s+(\S+)\s+dev\s+\S+', cmd) 414 if m: 415 try: 416 data['extras'].append(IPv4Interface(m.group(1))) 417 except ValueError: 418 errors += 1 419 # Other post-up / up commands (iptables, arp…) are not errors 420 421 elif directive not in _KNOWN_OPTS: 422 errors += 1 423 424 # Collect all eth<N> interface names we know about, in index order. 425 all_known = ( 426 set(iface_data.keys()) | dhcp_ifaces | 427 {n for n in auto_ifaces if n.startswith('eth') and n[3:].isdigit()} 428 ) 429 eth_names = sorted( 430 [n for n in all_known if n.startswith('eth') and n[3:].isdigit()], 431 key=lambda n: int(n[3:]) 432 ) 433 434 result: NetConfigEntry = [] 435 for name in eth_names: 436 if name in dhcp_ifaces: 437 result.append('dhcp') 438 continue 439 440 if name not in iface_data: 441 # Mentioned in auto but no inet stanza — unconfigured. 442 result.append(None) 443 continue 444 445 data = iface_data[name] 446 primary = data['primary'] 447 if primary is None: 448 result.append(None) 449 continue 450 451 # Combine bare IP with netmask when no CIDR prefix was given. 452 if not data['has_prefix'] and data['netmask']: 453 try: 454 prefix = IPv4Network(f'0.0.0.0/{data["netmask"]}').prefixlen 455 primary = IPv4Interface(f'{primary.ip}/{prefix}') 456 except ValueError: 457 errors += 1 458 459 addresses = [primary] + data['extras'] 460 461 routes = list(data['routes']) 462 if data['gateway']: 463 routes.insert(0, (IPv4Network('0.0.0.0/0'), data['gateway'])) 464 465 result.append((addresses, routes)) 466 467 return result, errors
Parse /etc/network/interfaces (+ interfaces.d/) and return (NetConfig, error_count).
Reconstructs the network configuration that would result from 'systemctl networking start' (i.e. ifup -a).
Entry types in the returned NetConfig:
- Tuple (addresses, routes): inet-static stanza for eth
. - 'dhcp': inet-dhcp stanza for eth
. - None: eth
mentioned in an auto/allow-hotplug line but with no stanza.
Only eth
error_count counts lines that are syntactically wrong or have invalid values (bad IP/network, unknown stanza keywords, malformed iface lines…).
Parsing rules:
- 'address
[/ ]' — primary address; prefix may be absent - 'netmask
' — applied to primary address when it has no prefix - 'gateway
' — inserted as a 0.0.0.0/0 route - 'post-up / up ip addr add
dev ' — extra addresses - 'post-up / up ip route add
via ' — static routes - Other post-up/up/down commands are silently ignored
474def eval_net_config(grade: Grade0, expected: NetConfigEntry, machine_name: str = None, 475 current: NetConfigEntry | None = None, step: int = 1) -> _AttrDict: 476 """Compare current and expected NetConfig; return a dict with 10 keys: 477 478 Existing keys (unchanged semantics — 'dhcp' and None entries are skipped): 479 - "ips" : number of IP addresses that are correct 480 - "default_route" : 1 if the default route matches the expected one, 0 otherwise 481 - "other_routes" : number of correct non-default static routes 482 - "wrong_routes" : number of non-default static routes present in current but not in expected 483 - "ips_expected" : number of IP addresses in expected 484 - "default_route_expected": 1 if expected has a default route, 0 otherwise 485 - "other_routes_expected" : number of non-default static routes in expected 486 487 New keys for DHCP/None tracking: 488 - "dhcp_interfaces" : positions where both expected and current are 'dhcp' 489 - "dhcp_interfaces_expected": count of 'dhcp' entries in expected 490 - "none_interfaces_expected": count of None entries in expected 491 492 If *current* is None, get_net_config() is called first. 493 IP addresses are compared as IPv4Interface strings (address + prefix). 494 Routes are compared as (network, gateway) pairs. 495 """ 496 if current is None: 497 if machine_name is None: 498 raise ValueError("eval_net_config: current and machine_name can't be both None") 499 current = get_net_config_entry(grade, machine_name, step=step) 500 501 def _collect_ips(nc: NetConfigEntry) -> set[str]: 502 result = set() 503 for entry in nc: 504 if not isinstance(entry, tuple): 505 continue 506 ifaces, _ = entry 507 for iface in ifaces: 508 result.add(str(iface)) 509 return result 510 511 def _collect_routes(nc: NetConfigEntry) -> set[tuple[str, str]]: 512 result = set() 513 for entry in nc: 514 if not isinstance(entry, tuple): 515 continue 516 _, routes = entry 517 for net, gw in routes: 518 if gw is None: 519 gw_str = '' 520 elif isinstance(gw, IPv4Interface): 521 gw_str = str(gw.ip) 522 else: 523 gw_str = str(gw) 524 result.add((str(net), gw_str)) 525 return result 526 527 exp_ips = _collect_ips(expected) 528 cur_ips = _collect_ips(current) 529 530 exp_routes = _collect_routes(expected) 531 cur_routes = _collect_routes(current) 532 533 default_key = '0.0.0.0/0' 534 exp_default = {r for r in exp_routes if r[0] == default_key} 535 cur_default = {r for r in cur_routes if r[0] == default_key} 536 exp_non_default = {r for r in exp_routes if r[0] != default_key} 537 cur_non_default = {r for r in cur_routes if r[0] != default_key} 538 539 dhcp_match = sum( 540 1 for e, c in zip(expected, current) 541 if e == 'dhcp' and c == 'dhcp' 542 ) 543 544 return _AttrDict( 545 ips=len(exp_ips & cur_ips), 546 default_route=1 if exp_default == cur_default else 0, 547 other_routes=len(exp_non_default & cur_non_default), 548 wrong_routes=len(cur_non_default - exp_non_default), 549 ips_expected=len(exp_ips), 550 default_route_expected=1 if exp_default else 0, 551 other_routes_expected=len(exp_non_default), 552 dhcp_interfaces=dhcp_match, 553 dhcp_interfaces_expected=sum(1 for e in expected if e == 'dhcp'), 554 none_interfaces_expected=sum(1 for e in expected if e is None), 555 )
Compare current and expected NetConfig; return a dict with 10 keys:
Existing keys (unchanged semantics — 'dhcp' and None entries are skipped):
- "ips" : number of IP addresses that are correct
- "default_route" : 1 if the default route matches the expected one, 0 otherwise
- "other_routes" : number of correct non-default static routes
- "wrong_routes" : number of non-default static routes present in current but not in expected
- "ips_expected" : number of IP addresses in expected
- "default_route_expected": 1 if expected has a default route, 0 otherwise
- "other_routes_expected" : number of non-default static routes in expected
New keys for DHCP/None tracking:
- "dhcp_interfaces" : positions where both expected and current are 'dhcp'
- "dhcp_interfaces_expected": count of 'dhcp' entries in expected
- "none_interfaces_expected": count of None entries in expected
If current is None, get_net_config() is called first. IP addresses are compared as IPv4Interface strings (address + prefix). Routes are compared as (network, gateway) pairs.
558def set_persistent_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry): 559 lines = ['auto lo', 'iface lo inet loopback', ''] 560 for i, entry in enumerate(nc_entry): 561 interface = f'eth{i}' 562 if entry is None: 563 continue 564 if entry == 'dhcp': 565 lines += [ 566 f'auto {interface}', 567 f'iface {interface} inet dhcp', 568 '', 569 ] 570 continue 571 iface_list, routes = entry 572 lines += [ 573 f'auto {interface}', 574 f'iface {interface} inet static', 575 f' address {iface_list[0]}', 576 ] 577 for extra in iface_list[1:]: 578 lines.append(f' post-up ip addr add {extra} dev {interface}') 579 for net, via in routes: 580 via_addr = via.ip if isinstance(via, IPv4Interface) else via 581 if net == IPv4Network('0.0.0.0/0'): 582 lines.append(f' gateway {via_addr}') 583 for net, via in routes: 584 via_addr = via.ip if isinstance(via, IPv4Interface) else via 585 if net != IPv4Network('0.0.0.0/0'): 586 lines.append(f' post-up ip route add {net.compressed} via {via_addr}') 587 lines.append('') 588 net_scheme.file(machine_name, '/etc/network/interfaces', '\n'.join(lines))
591def set_net_config_entry(net_scheme: NetScheme0, machine_name: str, nc_entry: NetConfigEntry): 592 for i, entry in enumerate(nc_entry): 593 interface = f'eth{i}' 594 if entry is None: 595 continue 596 if entry == 'dhcp': 597 net_scheme.cmd(machine_name, f'ip link set {interface} up') 598 net_scheme.cmd(machine_name, f'dhclient {interface}') 599 continue 600 iface_list, routes = entry 601 net_scheme.cmd(machine_name, f'ip link set {interface} up') 602 for iface in iface_list: 603 net_scheme.cmd(machine_name, f'ip addr add {iface} dev {interface}') 604 for net, via in routes: 605 via_addr = via.ip if isinstance(via, IPv4Interface) else via 606 net_scheme.cmd(machine_name, f'ip route add {net.compressed} via {str(via_addr)}')
609def set_persistent_sysctl(net_scheme: NetScheme0, machine_name: str, sysctl_config: SysctlConfig): 610 for name, value in sysctl_config.items(): 611 net_scheme.cmd(machine_name, f'echo "{name}={value}" >> /etc/sysctl.conf') 612 if sysctl_config: 613 remount_proc_sys(net_scheme, machine_name) 614 net_scheme.cmd(machine_name, 'sysctl -p')
616def remount_proc_sys(net_scheme: NetScheme0, machine_name: str): 617 if not hasattr(net_scheme, "remount_proc_sys_done"): 618 net_scheme.remount_proc_sys_done = {} 619 if net_scheme.remount_proc_sys_done.get(machine_name): 620 return 621 net_scheme.cmd(machine_name, "mount -o rw,remount /proc/sys") 622 net_scheme.remount_proc_sys_done[machine_name] = True
645def get_sys_parameter_bool(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> bool | None: 646 """Read a boolean kernel parameter via sysctl and return its value. 647 648 Returns True for '1', False for '0', None if the parameter does not exist or the command fails. 649 """ 650 value = get_sys_parameter(grade, machine_name, parameter, step=step) 651 if value is None: 652 return None 653 if value == '1': 654 return True 655 if value == '0': 656 return False 657 return None
Read a boolean kernel parameter via sysctl and return its value.
Returns True for '1', False for '0', None if the parameter does not exist or the command fails.
660def get_net_config_from_topology( 661 net_scheme: NetScheme0, 662 topology=None, 663 gateway: str = None, 664 default_route: IPv4Address | IPv4Interface = IPv4Interface("172.17.0.1/24"), 665) -> NetConfig: 666 """Build a NetConfig for every machine in the topology so they can all reach each other. 667 668 Returns {machine_name: NetConfig}. 669 670 Each machine gets: 671 - One interface entry per network it belongs to, with the IP from net_scheme.data.ips. 672 - Static routes to every non-directly-connected network (found via BFS through 673 multi-homed router machines). 674 675 If *gateway* is not None: 676 - Every machine except the gateway gets a default route (0.0.0.0/0) toward the 677 gateway machine via the appropriate next-hop IP. 678 - The gateway itself gets a default route via *default_route* (typically the 679 Docker host bridge, e.g. 172.17.0.1). 680 681 The interface indices (eth0, eth1, …) are computed with the same counter logic as 682 NetScheme0.__init__, so they match the actual Kathara deployment. 683 684 IP names follow the convention set by random_ips_from_topology(): 685 - data.ips.m when machine m is in exactly one network 686 - data.ips.m_netX when machine m is in multiple networks 687 """ 688 if topology is None: 689 topology = net_scheme.get_topology() 690 data = net_scheme.data 691 692 # --- 1. Replicate NetScheme0 eth-index assignment --- 693 # machine_iface: {machine: {net_name: eth_idx}} 694 machine_iface: dict[str, dict[str, int]] = {} 695 _ctr: dict[str, int] = {} 696 for net_name, machines in topology.items(): 697 items = list(machines.items()) if isinstance(machines, dict) else [(m, None) for m in machines] 698 for mname, iface_spec in items: 699 iface = iface_spec[0] if isinstance(iface_spec, tuple) else iface_spec 700 if iface is None: 701 iface = _ctr.get(mname, 0) 702 _ctr[mname] = max(_ctr.get(mname, 0), iface) + 1 703 machine_iface.setdefault(mname, {})[net_name] = iface 704 705 machine_nets: dict[str, list[str]] = {m: list(d.keys()) for m, d in machine_iface.items()} 706 707 # --- 2. IP lookup (matches random_ips_from_topology naming) --- 708 def get_ip(machine: str, net_name: str) -> IPv4Interface: 709 if len(machine_nets[machine]) == 1: 710 return getattr(data.ips, machine) 711 return getattr(data.ips, f"{machine}_{net_name}") 712 713 # --- 3. Network routing graph --- 714 # Multi-homed machines act as routers; each pair of their networks is an edge. 715 net_graph: dict[str, list[tuple[str, str]]] = {n: [] for n in topology} 716 for mname, nets in machine_nets.items(): 717 for i, n1 in enumerate(nets): 718 for n2 in nets[i + 1:]: 719 net_graph[n1].append((n2, mname)) 720 net_graph[n2].append((n1, mname)) 721 722 # --- 4. BFS: find (via_start_net, first_router) from start_nets to a target --- 723 def _bfs(start_nets: set[str], is_target) -> tuple[str, str] | None: 724 visited = set(start_nets) 725 q: deque = deque() 726 for n in start_nets: 727 for adj, router in net_graph.get(n, []): 728 if adj not in visited: 729 visited.add(adj) 730 q.append((adj, n, router)) 731 while q: 732 cur, via, router = q.popleft() 733 if is_target(cur): 734 return (via, router) 735 for adj, r in net_graph.get(cur, []): 736 if adj not in visited: 737 visited.add(adj) 738 q.append((adj, via, router)) 739 return None 740 741 def first_hop_to_net(start_nets: set[str], target_net: str) -> tuple[str, str] | None: 742 """Return (via_net, router) for the first hop to reach target_net, or None if direct.""" 743 if target_net in start_nets: 744 return None 745 return _bfs(start_nets, lambda n: n == target_net) 746 747 def first_hop_to_machine(start_nets: set[str], target: str) -> tuple[str, str] | None: 748 """Return (via_net, router) for the first hop toward target machine. 749 When directly connected, router is the target machine itself.""" 750 target_nets = set(machine_nets.get(target, [])) 751 shared = start_nets & target_nets 752 if shared: 753 return (next(iter(shared)), target) 754 return _bfs(start_nets, lambda n: n in target_nets) 755 756 # --- 5. default_route as bare IPv4Address --- 757 dr_ip: IPv4Address = default_route.ip if isinstance(default_route, IPv4Interface) else default_route 758 759 # --- 6. Assemble NetConfig per machine --- 760 result: dict[str, NetConfigEntry] = {} 761 762 for machine, net_to_eth in machine_iface.items(): 763 start_nets = set(machine_nets[machine]) 764 765 # Collect all candidate routes as (net, eth_idx, next_hop). 766 route_triples: list[tuple[IPv4Network, int, IPv4Address]] = [] 767 768 # Specific routes to every non-directly-connected network 769 for target_net in topology: 770 if target_net in start_nets: 771 continue 772 hop = first_hop_to_net(start_nets, target_net) 773 if hop is None: 774 continue 775 via_net, router = hop 776 route_triples.append(( 777 getattr(data.nets, target_net), 778 net_to_eth[via_net], 779 get_ip(router, via_net).ip, 780 )) 781 782 # Default route 783 if gateway is not None: 784 default_net = IPv4Network('0.0.0.0/0') 785 if machine == gateway: 786 # External default route on the lowest-index interface 787 route_triples.append((default_net, min(net_to_eth.values()), dr_ip)) 788 else: 789 hop = first_hop_to_machine(start_nets, gateway) 790 if hop is not None: 791 via_net, router = hop 792 route_triples.append(( 793 default_net, 794 net_to_eth[via_net], 795 get_ip(router, via_net).ip, 796 )) 797 798 # Drop redundant routes: a route (net, hop) is redundant when another route 799 # (net2, hop2) exists with the same next-hop and net2 is a strict supernet of net 800 # (i.e. net2 covers net entirely — the less-specific route already handles it). 801 def _is_redundant(net: IPv4Network, hop: IPv4Address) -> bool: 802 return any( 803 net2 != net and net.subnet_of(net2) and hop == hop2 804 for net2, _, hop2 in route_triples 805 ) 806 807 eth_routes: dict[int, list] = {eth: [] for eth in net_to_eth.values()} 808 for net, eth, hop in route_triples: 809 if not _is_redundant(net, hop): 810 eth_routes[eth].append((net, hop)) 811 812 # Build the NetConfig list indexed by eth number 813 max_eth = max(net_to_eth.values()) 814 nc: NetConfigEntry = [None] * (max_eth + 1) 815 for net_name, eth_idx in net_to_eth.items(): 816 nc[eth_idx] = ([get_ip(machine, net_name)], eth_routes[eth_idx]) 817 result[machine] = nc 818 819 return result
Build a NetConfig for every machine in the topology so they can all reach each other.
Returns {machine_name: NetConfig}.
Each machine gets:
- One interface entry per network it belongs to, with the IP from net_scheme.data.ips.
- Static routes to every non-directly-connected network (found via BFS through multi-homed router machines).
If gateway is not None:
- Every machine except the gateway gets a default route (0.0.0.0/0) toward the gateway machine via the appropriate next-hop IP.
- The gateway itself gets a default route via default_route (typically the Docker host bridge, e.g. 172.17.0.1).
The interface indices (eth0, eth1, …) are computed with the same counter logic as NetScheme0.__init__, so they match the actual Kathara deployment.
IP names follow the convention set by random_ips_from_topology():
- data.ips.m when machine m is in exactly one network
- data.ips.m_netX when machine m is in multiple networks
822def get_sys_parameter(grade: Grade0, machine_name: str, parameter: str, step: int = 1) -> str | None: 823 """Read a kernel parameter via sysctl and return its value as a string. 824 825 Returns None if the parameter does not exist or the command fails. 826 """ 827 output, code = grade.test(machine_name, f'sysctl -n {parameter}', step=step) 828 if code != 0: 829 return None 830 return output.strip() or None
Read a kernel parameter via sysctl and return its value as a string.
Returns None if the parameter does not exist or the command fails.