lib.dhcp
import re
from dataclasses import dataclass, field
from ipaddress import IPv4Address, IPv4Network
from typing import Any

from SRE.lib_sre import Grade0, NetScheme0


@dataclass
class DhcpSubnet:
    """Configuration for a single DHCP subnet declaration."""
    subnet: IPv4Network
    range_start: IPv4Address
    range_end: IPv4Address
    routers: list[IPv4Address] = field(default_factory=list)
    dns_servers: list[IPv4Address] = field(default_factory=list)
    domain_name: str | None = None
    broadcast_address: IPv4Address | None = None
    default_lease_time: int | None = None  # subnet-level override; None = use global
    max_lease_time: int | None = None  # subnet-level override; None = use global
    fixed_addresses: dict[str, str] = field(default_factory=dict)  # MAC → IP string


@dataclass
class DhcpParameters:
    """All parameters required to configure an ISC DHCP server on a Debian machine.

    Covers both /etc/default/isc-dhcp-server (``interfaces_v4``, ``interfaces_v6``) and
    /etc/dhcp/dhcpd.conf (everything else).
    """
    interfaces_v4: list[str]  # written to INTERFACESv4 in /etc/default/isc-dhcp-server
    interfaces_v6: list[str] = field(default_factory=list)  # written to INTERFACESv6
    subnets: list[DhcpSubnet] = field(default_factory=list)
    authoritative: bool = True
    default_lease_time: int | None = None  # omitted from global section when None
    max_lease_time: int | None = None  # omitted from global section when None
    ddns_update_style: str = 'none'


def set_dhcp_server(net_scheme: NetScheme0, machine: str,
                    dhcp_params: DhcpParameters, step: int = 1) -> None:
    """Write DHCP server configuration files and (re)start the service on *machine*.

    Writes (through ``net_scheme.file``):
      - /etc/default/isc-dhcp-server  (INTERFACESv4 and INTERFACESv6)
      - /etc/dhcp/dhcpd.conf          (generated from dhcp_params)

    Then issues ``systemctl enable`` and ``systemctl restart`` for
    isc-dhcp-server through ``net_scheme.cmd``.  *step* is forwarded unchanged
    to every ``net_scheme`` call.
    """
    # /etc/default/isc-dhcp-server
    ifaces_v4 = ' '.join(dhcp_params.interfaces_v4)
    ifaces_v6 = ' '.join(dhcp_params.interfaces_v6)
    default_content = f'INTERFACESv4="{ifaces_v4}"\nINTERFACESv6="{ifaces_v6}"\n'
    net_scheme.file(machine, '/etc/default/isc-dhcp-server', default_content, step=step)

    # /etc/dhcp/dhcpd.conf — global section first, then one block per subnet
    lines = []
    auth = 'authoritative' if dhcp_params.authoritative else 'not authoritative'
    lines += [
        f'ddns-update-style {dhcp_params.ddns_update_style};',
        f'{auth};',
    ]
    if dhcp_params.default_lease_time is not None:
        lines.append(f'default-lease-time {dhcp_params.default_lease_time};')
    if dhcp_params.max_lease_time is not None:
        lines.append(f'max-lease-time {dhcp_params.max_lease_time};')
    for s in dhcp_params.subnets:
        lines.append('')
        lines.append(f'subnet {s.subnet.network_address} netmask {s.subnet.netmask} {{')
        lines.append(f' range {s.range_start} {s.range_end};')
        if s.routers:
            lines.append(f' option routers {", ".join(str(r) for r in s.routers)};')
        if s.dns_servers:
            lines.append(f' option domain-name-servers {", ".join(str(d) for d in s.dns_servers)};')
        if s.domain_name is not None:
            lines.append(f' option domain-name "{s.domain_name}";')
        if s.broadcast_address is not None:
            lines.append(f' option broadcast-address {s.broadcast_address};')
        if s.default_lease_time is not None:
            lines.append(f' default-lease-time {s.default_lease_time};')
        if s.max_lease_time is not None:
            lines.append(f' max-lease-time {s.max_lease_time};')
        for mac, ip in s.fixed_addresses.items():
            # The host declaration name is derived from the MAC (':' → '-').
            safe = mac.replace(':', '-')
            lines += [
                f' host {safe} {{',
                f' hardware ethernet {mac};',
                f' fixed-address {ip};',
                f' }}',
            ]
        lines.append('}')
    net_scheme.file(machine, '/etc/dhcp/dhcpd.conf', '\n'.join(lines) + '\n', step=step)

    net_scheme.cmd(machine, 'systemctl enable isc-dhcp-server', step=step)
    net_scheme.cmd(machine, 'systemctl restart isc-dhcp-server', step=step)


def get_dhcp_server(grade: Grade0, machine: str,
                    step: int = 1) -> tuple[DhcpParameters | None, int]:
    """Read and parse the DHCP server configuration on *machine*.

    Returns ``(params, errors)`` where *params* is a :class:`DhcpParameters`
    instance (or ``None`` if /etc/default/isc-dhcp-server is absent/unreadable,
    in which case *errors* is 1) and *errors* is the number of problems found
    in dhcpd.conf: the parser's own error count, plus one per subnet that had
    to be skipped (bad netmask or missing/invalid ``range``), plus one per
    lease-time value that is not an integer.
    """
    def _parse_addr_list(val: str) -> list[IPv4Address]:
        """Parse a comma/whitespace separated list, dropping invalid entries."""
        result: list[IPv4Address] = []
        for part in re.split(r'[,\s]+', val.strip()):
            if not part:
                continue
            try:
                result.append(IPv4Address(part))
            except ValueError:
                # Best effort: host names and malformed entries are left out.
                continue
        return result

    def _parse_seconds(raw: str | None) -> tuple[int | None, bool]:
        """Return ``(value, ok)``; ``ok`` is False when *raw* is not an integer."""
        if raw is None:
            return None, True
        try:
            return int(raw), True
        except (TypeError, ValueError):
            return None, False

    interfaces_v4 = parse_ipv4_interfaces_in_default_dhcp_server_file(grade, machine, step)
    if interfaces_v4 is None:
        return None, 1
    interfaces_v6 = parse_ipv6_interfaces_in_default_dhcp_server_file(grade, machine, step) or []

    parsed = _parse_dhcpd_config(grade, machine, step)
    errors: int = parsed['errors']
    gp = parsed['global_parameters']

    authoritative = bool(gp.get('authoritative', False))
    # A malformed global lease time is reported as an error, not raised.
    default_lease_time, dlt_ok = _parse_seconds(gp.get('default-lease-time'))
    max_lease_time, mlt_ok = _parse_seconds(gp.get('max-lease-time'))
    if not dlt_ok:
        errors += 1
    if not mlt_ok:
        errors += 1
    ddns_update_style = str(gp.get('ddns-update-style', 'none'))

    subnets: list[DhcpSubnet] = []
    for net_addr, sp in parsed['subnets'].items():
        # Reconstruct IPv4Network
        try:
            subnet_net = IPv4Network(f'{net_addr}/{sp["netmask"]}', strict=False)
        except (KeyError, ValueError):
            errors += 1
            continue

        # Parse pool range — stored as "start end" or "dynamic-bootp start end"
        range_parts = sp.get('range', '').split()
        if range_parts and range_parts[0].lower() == 'dynamic-bootp':
            range_parts = range_parts[1:]
        try:
            range_start = IPv4Address(range_parts[0])
            # A single-address range uses the same address for both ends.
            range_end = IPv4Address(range_parts[1] if len(range_parts) > 1 else range_parts[0])
        except (IndexError, ValueError):
            errors += 1
            continue

        routers = _parse_addr_list(sp.get('option routers', ''))
        dns_servers = _parse_addr_list(sp.get('option domain-name-servers', ''))

        domain_name = sp.get('option domain-name', None)
        if domain_name is not None:
            domain_name = domain_name.strip('"')

        bcast_str = sp.get('option broadcast-address', None)
        try:
            broadcast_address = IPv4Address(bcast_str) if bcast_str else None
        except ValueError:
            broadcast_address = None

        # Only set per-subnet overrides when they differ from the global values.
        # The subnet dict already contains inherited globals, so an invalid raw
        # value identical to the global one was counted above and is not
        # counted a second time here.
        overrides: dict[str, int | None] = {}
        for key, global_value in (('default-lease-time', default_lease_time),
                                  ('max-lease-time', max_lease_time)):
            raw = sp.get(key)
            value, ok = _parse_seconds(raw)
            if not ok and raw != gp.get(key):
                errors += 1
            overrides[key] = value if value != global_value else None

        subnets.append(DhcpSubnet(
            subnet=subnet_net,
            range_start=range_start,
            range_end=range_end,
            routers=routers,
            dns_servers=dns_servers,
            domain_name=domain_name,
            broadcast_address=broadcast_address,
            default_lease_time=overrides['default-lease-time'],
            max_lease_time=overrides['max-lease-time'],
            fixed_addresses={str(k): str(v) for k, v in sp.get('fixed-addresses', {}).items()},
        ))

    params = DhcpParameters(
        interfaces_v4=interfaces_v4,
        interfaces_v6=interfaces_v6,
        subnets=subnets,
        authoritative=authoritative,
        default_lease_time=default_lease_time,
        max_lease_time=max_lease_time,
        ddns_update_style=ddns_update_style,
    )
    return params, errors


def _parse_dhcpd_interfaces(cmdline_output: str) -> list[str]:
    """Extract interface names from dhcpd cmdline tokens (one token per line).

    The first token is the executable path and is ignored.  Flags that take a
    value have that value skipped; every remaining token that does not start
    with ``-`` is treated as an interface name.

    Returns the list of interface names passed to dhcpd, or ``["*"]`` if none
    were specified (meaning dhcpd listens on all interfaces).
    """
    # Flags that consume the next token as their value argument.  '-p', '-s',
    # '-4o6' and '-play' are listed so that their values (a port, an address,
    # a file) are not mistaken for interface names.
    value_flags = {
        '-cf', '-lf', '-pf', '-tf', '-sf', '-hpf',
        '-user', '-group', '-chroot', '-port', '-relay',
        '-p', '-s', '-4o6', '-play',
    }
    tokens = [t.strip() for t in cmdline_output.splitlines() if t.strip()]
    interfaces: list[str] = []
    skip_next = False
    for tok in tokens[1:]:  # tokens[0] is the executable path (e.g. /usr/sbin/dhcpd)
        if skip_next:
            skip_next = False
            continue
        if tok in value_flags:
            skip_next = True
            continue
        if tok.startswith('-'):
            continue
        interfaces.append(tok)
    return interfaces if interfaces else ['*']


def check_running_dhcp_server(grade: Grade0, machine: str) -> tuple[bool, list[str]]:
    """Check whether ISC DHCP server is running on *machine*.

    Returns a tuple ``(running, interfaces)`` where:

    - *running*: ``True`` if ``systemctl is-active isc-dhcp-server`` exits 0.
    - *interfaces*: list of interface names dhcpd is bound to (e.g.
      ``["eth0", "eth1"]``), or ``["*"]`` if it listens on all interfaces.
      Empty list when *running* is ``False``.

    Listening interfaces are read from the running process command line so
    they reflect the actual state, not just the configuration file.
    """
    _, code = grade.test(machine, 'systemctl is-active isc-dhcp-server',
                         allow_error=True)
    if code != 0:
        return False, []

    # /proc/<pid>/cmdline is NUL-separated; tr turns it into one token per line.
    # The exit status of this command is not inspected: whatever text comes
    # back is handed to the parser, which yields ["*"] when it finds no
    # interface token.
    cmdline_out, _ = grade.test(
        machine,
        r"tr '\000' '\n' < /proc/$(pidof -s dhcpd)/cmdline 2>/dev/null",
        allow_error=True,
    )
    return True, _parse_dhcpd_interfaces(cmdline_out)


def _read_default_interfaces(grade: Grade0, machine: str, step: int,
                             variable: str) -> list[str] | None:
    """Return the interface list assigned to *variable* in /etc/default/isc-dhcp-server.

    Returns None when ``cat`` exits non-zero, and an empty list when the file
    is readable but never assigns *variable*.  The value may be double-quoted,
    single-quoted or unquoted.  When the variable is assigned several times
    the last assignment is used (assumes the file is sourced by a shell —
    TODO confirm against the init script in use).
    """
    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
                              step=step, allow_error=True)
    if code != 0:
        return None
    pattern = re.compile(
        rf"""^{re.escape(variable)}\s*=\s*(?:"([^"]*)"|'([^']*)'|([^\s#"']*))"""
    )
    interfaces: list[str] = []
    for line in output.splitlines():
        # Commented-out lines start with '#' and therefore never match.
        m = pattern.match(line.strip())
        if m:
            value = next((g for g in m.groups() if g is not None), '')
            interfaces = value.split()
    return interfaces


def parse_ipv4_interfaces_in_default_dhcp_server_file(
        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
    """Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.

    Returns None if the file is absent or unreadable.
    Returns an empty list if the file exists but INTERFACESv4 is not set.
    """
    return _read_default_interfaces(grade, machine, step, 'INTERFACESv4')


def parse_ipv6_interfaces_in_default_dhcp_server_file(
        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
    """Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.

    Returns None if the file is absent or unreadable.
    Returns an empty list if the file exists but INTERFACESv6 is not set.
    """
    return _read_default_interfaces(grade, machine, step, 'INTERFACESv6')


def _parse_dhcpd_config(grade: Grade0, machine: str, step: int = 1) -> dict:
    """Parse /etc/dhcp/dhcpd.conf on *machine* via grade.test() and return a
    structured dict describing the ISC DHCP server configuration.

    Returns a dict with three keys:

    ``errors``
        Number of syntactically incorrect or unrecognised statements.

    ``global_parameters``
        Dict of top-level parameters (name → value).  ``option`` parameters are
        stored with their full two-word key, e.g. ``"option domain-name-servers"``.
        ``authoritative`` maps to ``True``/``False``.  All other parameters map
        to their value as a string (or space-joined string for multi-token values).

    ``subnets``
        Dict keyed by the network address string (e.g. ``"10.152.187.0"``).
        Each value is a dict of *effective* parameters: global parameters are
        copied first, then subnet-level declarations override them.  A special
        key ``"fixed-addresses"`` holds a ``{MAC: IP-or-hostname}`` mapping
        assembled from all ``host`` blocks whose ``fixed-address`` falls inside
        that subnet (or that were declared directly inside that subnet block).

    When the file cannot be read or is empty, all three entries are empty and
    ``errors`` is 0.
    """
    output, code = grade.test(machine, 'cat /etc/dhcp/dhcpd.conf', step=step)

    empty: dict[str, Any] = {'errors': 0, 'global_parameters': {}, 'subnets': {}}
    if code != 0 or not output:
        return empty

    # ------------------------------------------------------------------ #
    # Strip comments: /* … */,  // … \n,  # … \n                          #
    # Quoted strings are matched first and kept verbatim so that a '#' or #
    # '//' inside quotes (e.g. filename "http://host/file";) is not taken #
    # for the start of a comment.                                         #
    # ------------------------------------------------------------------ #
    comment_re = re.compile(r'"[^"\n]*"|/\*.*?\*/|(?:#|//)[^\n]*', re.DOTALL)
    text = comment_re.sub(
        lambda m: m.group(0) if m.group(0).startswith('"') else '', output)

    # ------------------------------------------------------------------ #
    # Tokenise: { } ; are single-character tokens; quoted strings are     #
    # kept intact; everything else is split on whitespace.                #
    # ------------------------------------------------------------------ #
    token_re = re.compile(r'[{};]|"[^"]*"|[^\s{};]+')
    tokens = token_re.findall(text)

    # ------------------------------------------------------------------ #
    # Known single-value global / subnet keywords                         #
    # ------------------------------------------------------------------ #
    _KNOWN_PARAMS = {
        'default-lease-time', 'max-lease-time', 'min-lease-time',
        'ddns-update-style', 'log-facility', 'server-identifier',
        'filename', 'next-server', 'server-name',
        'use-host-decl-names', 'get-lease-hostnames', 'ping-check',
        'one-lease-per-client', 'dynamic-bootp-lease-length',
        'lease-file-name', 'pid-file-name', 'omapi-port',
        'update-conflict-detection', 'update-optimization',
        'stash-agent-options', 'local-port', 'remote-port',
        'db-time-format', 'bootp-lease-length', 'min-secs',
        'always-reply-rfc1048',
    }

    _KNOWN_HOST_PARAMS = {
        'hardware', 'fixed-address', 'filename', 'next-server',
        'server-name', 'client-identifier', 'ddns-hostname',
        'ddns-domainname', 'option', 'supersede', 'prepend', 'append',
        'default', 'deny', 'allow', 'ignore',
    }

    # ------------------------------------------------------------------ #
    # Parser state                                                        #
    # ------------------------------------------------------------------ #
    errors: int = 0

    # Collected raw data
    global_params: dict[str, Any] = {}
    # net_addr -> {'netmask': str, 'fixed-addresses': {}, param: val, ...}
    subnets: dict[str, dict[str, Any]] = {}

    # Hosts collected during parsing: (parent_subnet_net or None, mac, ip)
    all_hosts: list[tuple[str | None, str, str]] = []

    # Context stack: list of (kind, data)
    #   kind = 'global' | 'subnet' | 'host' | 'other'
    #   data = net_addr str for 'subnet', host-info dict for 'host', None otherwise
    context_stack: list[tuple[str, Any]] = [('global', None)]

    pending: list[str] = []  # words accumulated before the next ; or {

    def _find_parent_subnet() -> str | None:
        """Return the network address of the innermost enclosing subnet, if any."""
        for frame in reversed(context_stack):
            if frame[0] == 'subnet':
                return frame[1]
        return None

    def _parse_param(words: list[str]) -> tuple[str, Any] | None:
        """Return (key, value) for a recognised parameter, or None on error."""
        if not words:
            return None
        kw = words[0].lower()
        rest = words[1:]

        if kw == 'authoritative':
            return 'authoritative', True

        if kw == 'not' and rest and rest[0].lower() == 'authoritative':
            return 'authoritative', False

        if kw == 'option':
            if not rest:
                return None
            opt_name = rest[0].lower()
            opt_val = ' '.join(v.strip('"') for v in rest[1:]) if len(rest) > 1 else ''
            return f'option {opt_name}', opt_val

        if kw in ('allow', 'deny', 'ignore'):
            if not rest:
                return None
            return f'{kw} {" ".join(rest)}', True

        if kw == 'range':
            # range [dynamic-bootp] start [end]
            return 'range', ' '.join(rest)

        if kw == 'include':
            # include "file"; — recognised, but the caller does not store it
            return 'include', ' '.join(v.strip('"') for v in rest)

        if kw in _KNOWN_PARAMS:
            val = ' '.join(v.strip('"') for v in rest) if rest else ''
            return kw, val

        return None  # unrecognised keyword

    # ------------------------------------------------------------------ #
    # Main token loop                                                     #
    # ------------------------------------------------------------------ #
    for tok in tokens:

        if tok == ';':
            if not pending:
                continue  # empty statement is fine

            kw = pending[0].lower()
            kind, data = context_stack[-1]

            if kind == 'host':
                if kw == 'hardware' and len(pending) >= 3 and pending[1].lower() == 'ethernet':
                    data['mac'] = pending[2].lower()
                elif kw == 'fixed-address' and len(pending) >= 2:
                    data['ip'] = pending[1]
                elif kw in _KNOWN_HOST_PARAMS:
                    pass  # valid host option, not needed for our output
                else:
                    errors += 1

            elif kind in ('global', 'subnet'):
                parsed = _parse_param(pending)
                if parsed is not None:
                    target = global_params if kind == 'global' else subnets[data]
                    key, val = parsed
                    if key != 'include':  # do not store include directives
                        target[key] = val
                else:
                    errors += 1

            # else: inside 'other' (pool, group, shared-network, …) — ignore content

            pending = []

        elif tok == '{':
            kw = pending[0].lower() if pending else ''

            if kw == 'subnet':
                if len(pending) >= 4 and pending[2].lower() == 'netmask':
                    net_addr = pending[1]
                    netmask = pending[3]
                    subnets[net_addr] = {'netmask': netmask, 'fixed-addresses': {}}
                    context_stack.append(('subnet', net_addr))
                else:
                    errors += 1
                    context_stack.append(('other', None))

            elif kw == 'host':
                if len(pending) >= 2:
                    host_info: dict[str, Any] = {
                        'mac': None,
                        'ip': None,
                        'parent_subnet': _find_parent_subnet(),
                    }
                    context_stack.append(('host', host_info))
                else:
                    errors += 1
                    context_stack.append(('other', None))

            elif kw in ('shared-network', 'group', 'class', 'subclass',
                        'pool', 'failover', 'peer', 'key', 'zone',
                        'on', 'if', 'elsif', 'else'):
                context_stack.append(('other', None))

            else:
                # Bare { with no preceding keyword, or an unknown block keyword:
                # count it and still push a frame so the matching } balances.
                errors += 1
                context_stack.append(('other', None))

            pending = []

        elif tok == '}':
            if len(context_stack) <= 1:
                errors += 1  # unmatched closing brace
                pending = []
                continue

            kind, data = context_stack.pop()

            if kind == 'host' and data is not None:
                mac = data.get('mac')
                ip = data.get('ip')
                parent = data.get('parent_subnet')
                # Hosts lacking either a MAC or a fixed-address are dropped.
                if mac and ip:
                    all_hosts.append((parent, mac, ip))

            pending = []

        else:
            pending.append(tok)

    # Unfinished statement at end of file (missing semicolon)
    if pending:
        errors += 1

    # Unclosed blocks (missing closing braces)
    errors += max(0, len(context_stack) - 1)

    # ------------------------------------------------------------------ #
    # Assign collected hosts to their subnets                             #
    # ------------------------------------------------------------------ #
    # Build (IPv4Network, net_addr_str) pairs for membership tests
    subnet_nets: list[tuple[IPv4Network, str]] = []
    for net_addr, subnet_data in subnets.items():
        netmask = subnet_data.get('netmask', '')
        try:
            subnet_nets.append((IPv4Network(f'{net_addr}/{netmask}', strict=False), net_addr))
        except ValueError:
            pass  # invalid subnets simply cannot receive top-level hosts

    for parent, mac, ip in all_hosts:
        # If the host was declared directly inside a subnet block, use that subnet.
        if parent is not None and parent in subnets:
            subnets[parent]['fixed-addresses'][mac] = ip
            continue
        # Otherwise resolve by checking which subnet the fixed-address belongs to.
        try:
            host_ip = IPv4Address(ip)
        except ValueError:
            continue  # ip is a hostname — can't resolve to a subnet here
        for net, net_addr in subnet_nets:
            if host_ip in net:
                subnets[net_addr]['fixed-addresses'][mac] = ip
                break

    # ------------------------------------------------------------------ #
    # Build effective per-subnet parameters (global ← subnet overrides)   #
    # ------------------------------------------------------------------ #
    effective_subnets: dict[str, dict[str, Any]] = {}
    for net_addr, subnet_data in subnets.items():
        fixed = subnet_data.pop('fixed-addresses')
        effective: dict[str, Any] = {**global_params, **subnet_data}
        effective['fixed-addresses'] = fixed
        effective_subnets[net_addr] = effective

    return {
        'errors': errors,
        'global_parameters': global_params,
        'subnets': effective_subnets,
    }
10@dataclass 11class DhcpSubnet: 12 """Configuration for a single DHCP subnet declaration.""" 13 subnet: IPv4Network 14 range_start: IPv4Address 15 range_end: IPv4Address 16 routers: list[IPv4Address] = field(default_factory=list) 17 dns_servers: list[IPv4Address] = field(default_factory=list) 18 domain_name: str | None = None 19 broadcast_address: IPv4Address | None = None 20 default_lease_time: int | None = None # subnet-level override; None = use global 21 max_lease_time: int | None = None # subnet-level override; None = use global 22 fixed_addresses: dict[str, str] = field(default_factory=dict) # MAC → IP string
Configuration for a single DHCP subnet declaration.
@dataclass
class DhcpParameters:
    """Complete parameter set for an ISC DHCP server on a Debian machine.

    Two files are described: /etc/default/isc-dhcp-server (``interfaces_v4``
    and ``interfaces_v6``) and /etc/dhcp/dhcpd.conf (all remaining fields).

    Attributes:
        interfaces_v4: names written to INTERFACESv4 in
            /etc/default/isc-dhcp-server.
        interfaces_v6: names written to INTERFACESv6; empty by default.
        subnets: one :class:`DhcpSubnet` per subnet declaration.
        authoritative: ``True`` by default.
        default_lease_time: omitted from the global section when ``None``.
        max_lease_time: omitted from the global section when ``None``.
        ddns_update_style: ``'none'`` by default.
    """

    # The only required field.
    interfaces_v4: list[str]

    # List defaults use a factory so that instances never share a list.
    interfaces_v6: list[str] = field(default_factory=list)
    subnets: list[DhcpSubnet] = field(default_factory=list)

    # Global dhcpd.conf settings.
    authoritative: bool = True
    default_lease_time: int | None = None
    max_lease_time: int | None = None
    ddns_update_style: str = 'none'
All parameters required to configure an ISC DHCP server on a Debian machine.
Covers both /etc/default/isc-dhcp-server (interfaces_v4, interfaces_v6) and
/etc/dhcp/dhcpd.conf (everything else).
def set_dhcp_server(net_scheme: NetScheme0, machine: str,
                    dhcp_params: DhcpParameters, step: int = 1) -> None:
    """Generate both DHCP server files for *machine* and (re)start the service.

    Two files are handed to ``net_scheme.file``:
      - /etc/default/isc-dhcp-server  (INTERFACESv4 and INTERFACESv6)
      - /etc/dhcp/dhcpd.conf          (built from *dhcp_params*)

    Afterwards ``systemctl enable`` and ``systemctl restart`` are issued for
    isc-dhcp-server through ``net_scheme.cmd``.  *step* is passed through
    unchanged to every ``net_scheme`` call.
    """
    # --- /etc/default/isc-dhcp-server ---------------------------------- #
    v4_names = ' '.join(dhcp_params.interfaces_v4)
    v6_names = ' '.join(dhcp_params.interfaces_v6)
    net_scheme.file(
        machine,
        '/etc/default/isc-dhcp-server',
        f'INTERFACESv4="{v4_names}"\nINTERFACESv6="{v6_names}"\n',
        step=step,
    )

    # --- /etc/dhcp/dhcpd.conf: global section --------------------------- #
    conf: list[str] = [f'ddns-update-style {dhcp_params.ddns_update_style};']
    if dhcp_params.authoritative:
        auth = 'authoritative'
    else:
        auth = 'not authoritative'
    conf.append(f'{auth};')
    if dhcp_params.default_lease_time is not None:
        conf.append(f'default-lease-time {dhcp_params.default_lease_time};')
    if dhcp_params.max_lease_time is not None:
        conf.append(f'max-lease-time {dhcp_params.max_lease_time};')

    # --- /etc/dhcp/dhcpd.conf: one block per subnet --------------------- #
    for s in dhcp_params.subnets:
        block = [
            '',
            f'subnet {s.subnet.network_address} netmask {s.subnet.netmask} {{',
            f' range {s.range_start} {s.range_end};',
        ]
        # Optional statements are emitted only when a value is present.
        if s.routers:
            block.append(f' option routers {", ".join(str(r) for r in s.routers)};')
        if s.dns_servers:
            block.append(f' option domain-name-servers {", ".join(str(d) for d in s.dns_servers)};')
        if s.domain_name is not None:
            block.append(f' option domain-name "{s.domain_name}";')
        if s.broadcast_address is not None:
            block.append(f' option broadcast-address {s.broadcast_address};')
        if s.default_lease_time is not None:
            block.append(f' default-lease-time {s.default_lease_time};')
        if s.max_lease_time is not None:
            block.append(f' max-lease-time {s.max_lease_time};')
        for mac, ip in s.fixed_addresses.items():
            # The host declaration is named after the MAC with ':' → '-'.
            safe = mac.replace(':', '-')
            block.extend((
                f' host {safe} {{',
                f' hardware ethernet {mac};',
                f' fixed-address {ip};',
                f' }}',
            ))
        block.append('}')
        conf.extend(block)

    net_scheme.file(machine, '/etc/dhcp/dhcpd.conf', '\n'.join(conf) + '\n', step=step)

    # --- service ---------------------------------------------------------- #
    net_scheme.cmd(machine, 'systemctl enable isc-dhcp-server', step=step)
    net_scheme.cmd(machine, 'systemctl restart isc-dhcp-server', step=step)
Write DHCP server configuration files and (re)start the service on machine.
Writes:
- /etc/default/isc-dhcp-server (INTERFACESv4 and INTERFACESv6)
- /etc/dhcp/dhcpd.conf (generated from dhcp_params)
Then runs systemctl enable and systemctl restart for isc-dhcp-server.
def get_dhcp_server(grade: Grade0, machine: str,
                    step: int = 1) -> tuple[DhcpParameters | None, int]:
    """Read and parse the DHCP server configuration on *machine*.

    Returns ``(params, errors)`` where *params* is a :class:`DhcpParameters`
    instance (or ``None`` if /etc/default/isc-dhcp-server is absent/unreadable,
    in which case *errors* is 1) and *errors* is the number of problems found
    in dhcpd.conf: the parser's own error count, plus one per subnet that had
    to be skipped (bad netmask or missing/invalid ``range``), plus one per
    lease-time value that is not an integer.
    """
    def _parse_addr_list(val: str) -> list[IPv4Address]:
        """Parse a comma/whitespace separated list, dropping invalid entries."""
        result: list[IPv4Address] = []
        for part in re.split(r'[,\s]+', val.strip()):
            if not part:
                continue
            try:
                result.append(IPv4Address(part))
            except ValueError:
                # Best effort: host names and malformed entries are left out.
                continue
        return result

    def _parse_seconds(raw: str | None) -> tuple[int | None, bool]:
        """Return ``(value, ok)``; ``ok`` is False when *raw* is not an integer."""
        if raw is None:
            return None, True
        try:
            return int(raw), True
        except (TypeError, ValueError):
            return None, False

    interfaces_v4 = parse_ipv4_interfaces_in_default_dhcp_server_file(grade, machine, step)
    if interfaces_v4 is None:
        return None, 1
    interfaces_v6 = parse_ipv6_interfaces_in_default_dhcp_server_file(grade, machine, step) or []

    parsed = _parse_dhcpd_config(grade, machine, step)
    errors: int = parsed['errors']
    gp = parsed['global_parameters']

    authoritative = bool(gp.get('authoritative', False))
    # A malformed global lease time is reported as an error, not raised.
    default_lease_time, dlt_ok = _parse_seconds(gp.get('default-lease-time'))
    max_lease_time, mlt_ok = _parse_seconds(gp.get('max-lease-time'))
    if not dlt_ok:
        errors += 1
    if not mlt_ok:
        errors += 1
    ddns_update_style = str(gp.get('ddns-update-style', 'none'))

    subnets: list[DhcpSubnet] = []
    for net_addr, sp in parsed['subnets'].items():
        # Reconstruct IPv4Network
        try:
            subnet_net = IPv4Network(f'{net_addr}/{sp["netmask"]}', strict=False)
        except (KeyError, ValueError):
            errors += 1
            continue

        # Parse pool range — stored as "start end" or "dynamic-bootp start end"
        range_parts = sp.get('range', '').split()
        if range_parts and range_parts[0].lower() == 'dynamic-bootp':
            range_parts = range_parts[1:]
        try:
            range_start = IPv4Address(range_parts[0])
            # A single-address range uses the same address for both ends.
            range_end = IPv4Address(range_parts[1] if len(range_parts) > 1 else range_parts[0])
        except (IndexError, ValueError):
            errors += 1
            continue

        routers = _parse_addr_list(sp.get('option routers', ''))
        dns_servers = _parse_addr_list(sp.get('option domain-name-servers', ''))

        domain_name = sp.get('option domain-name', None)
        if domain_name is not None:
            domain_name = domain_name.strip('"')

        bcast_str = sp.get('option broadcast-address', None)
        try:
            broadcast_address = IPv4Address(bcast_str) if bcast_str else None
        except ValueError:
            broadcast_address = None

        # Only set per-subnet overrides when they differ from the global values.
        # An invalid raw value identical to the global one was counted above
        # and is not counted a second time here.
        overrides: dict[str, int | None] = {}
        for key, global_value in (('default-lease-time', default_lease_time),
                                  ('max-lease-time', max_lease_time)):
            raw = sp.get(key)
            value, ok = _parse_seconds(raw)
            if not ok and raw != gp.get(key):
                errors += 1
            overrides[key] = value if value != global_value else None

        subnets.append(DhcpSubnet(
            subnet=subnet_net,
            range_start=range_start,
            range_end=range_end,
            routers=routers,
            dns_servers=dns_servers,
            domain_name=domain_name,
            broadcast_address=broadcast_address,
            default_lease_time=overrides['default-lease-time'],
            max_lease_time=overrides['max-lease-time'],
            fixed_addresses={str(k): str(v) for k, v in sp.get('fixed-addresses', {}).items()},
        ))

    params = DhcpParameters(
        interfaces_v4=interfaces_v4,
        interfaces_v6=interfaces_v6,
        subnets=subnets,
        authoritative=authoritative,
        default_lease_time=default_lease_time,
        max_lease_time=max_lease_time,
        ddns_update_style=ddns_update_style,
    )
    return params, errors
Read and parse the DHCP server configuration on machine.
Returns (params, errors) where params is a DhcpParameters
instance (or None if /etc/default/isc-dhcp-server is absent/unreadable)
and errors is the number of parse errors found in dhcpd.conf.
def check_running_dhcp_server(grade: Grade0, machine: str) -> tuple[bool, list[str]]:
    """Report whether ISC DHCP server is active on *machine* and where it listens.

    The result is a pair ``(running, interfaces)``:

    - *running* is ``True`` when ``systemctl is-active isc-dhcp-server``
      exits with status 0.
    - *interfaces* holds the interface names found on the dhcpd command line
      (e.g. ``["eth0", "eth1"]``), or ``["*"]`` when none are given, meaning
      dhcpd listens on all interfaces.  It is empty when *running* is
      ``False``.

    The interfaces come from the command line of the running process, so
    they describe the actual state rather than the configuration file.
    """
    _, active_status = grade.test(machine, 'systemctl is-active isc-dhcp-server',
                                  allow_error=True)
    service_is_active = active_status == 0
    if not service_is_active:
        return False, []

    # /proc/<pid>/cmdline is NUL-separated; tr rewrites it as one token per
    # line.  The exit status of this command is not inspected.
    read_cmdline = r"tr '\000' '\n' < /proc/$(pidof -s dhcpd)/cmdline 2>/dev/null"
    cmdline_text, _ = grade.test(machine, read_cmdline, allow_error=True)
    bound_interfaces = _parse_dhcpd_interfaces(cmdline_text)
    return True, bound_interfaces
Check whether ISC DHCP server is running on machine.
Returns a tuple (running, interfaces) where:
- running: True if isc-dhcp-server is currently active.
- interfaces: list of interface names dhcpd is bound to (e.g. ["eth0", "eth1"]), or ["*"] if it listens on all interfaces. Empty list when running is False.
Listening interfaces are read from the running process command line so they reflect the actual state, not just the configuration file.
def parse_ipv4_interfaces_in_default_dhcp_server_file(
        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
    """Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.

    Returns None if the file is absent or unreadable.
    Returns an empty list if the file exists but INTERFACESv4 is not set.

    The value may be double-quoted, single-quoted or unquoted.  When the
    variable is assigned several times the last assignment is used (assumes
    the file is sourced by a shell — TODO confirm against the init script in
    use).
    """
    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
                              step=step, allow_error=True)
    if code != 0:
        return None
    # Exactly one of the three groups matches: "…", '…' or a bare word.
    pattern = re.compile(
        r"""^INTERFACESv4\s*=\s*(?:"([^"]*)"|'([^']*)'|([^\s#"']*))"""
    )
    interfaces: list[str] = []
    for line in output.splitlines():
        # Commented-out lines start with '#' and therefore never match.
        m = pattern.match(line.strip())
        if m:
            value = next((g for g in m.groups() if g is not None), '')
            interfaces = value.split()
    return interfaces
Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.
Returns None if the file is absent or unreadable. Returns an empty list if the file exists but INTERFACESv4 is not set.
def parse_ipv6_interfaces_in_default_dhcp_server_file(
        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
    """Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.

    Returns None if the file is absent or unreadable.
    Returns an empty list if the file exists but INTERFACESv6 is not set.

    The value may be double-quoted, single-quoted or unquoted.  When the
    variable is assigned several times the last assignment is used (assumes
    the file is sourced by a shell — TODO confirm against the init script in
    use).
    """
    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
                              step=step, allow_error=True)
    if code != 0:
        return None
    # Exactly one of the three groups matches: "…", '…' or a bare word.
    pattern = re.compile(
        r"""^INTERFACESv6\s*=\s*(?:"([^"]*)"|'([^']*)'|([^\s#"']*))"""
    )
    interfaces: list[str] = []
    for line in output.splitlines():
        # Commented-out lines start with '#' and therefore never match.
        m = pattern.match(line.strip())
        if m:
            value = next((g for g in m.groups() if g is not None), '')
            interfaces = value.split()
    return interfaces
Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.
Returns None if the file is absent or unreadable. Returns an empty list if the file exists but INTERFACESv6 is not set.