lib.dhcp

  1import re
  2from dataclasses import dataclass, field
  3from ipaddress import IPv4Address, IPv4Network
  4from typing import Any
  5
  6from SRE.lib_sre import Grade0, NetScheme0
  7
  8
  9@dataclass
 10class DhcpSubnet:
 11    """Configuration for a single DHCP subnet declaration."""
 12    subnet: IPv4Network
 13    range_start: IPv4Address
 14    range_end: IPv4Address
 15    routers: list[IPv4Address]            = field(default_factory=list)
 16    dns_servers: list[IPv4Address]        = field(default_factory=list)
 17    domain_name: str | None               = None
 18    broadcast_address: IPv4Address | None = None
 19    default_lease_time: int | None        = None  # subnet-level override; None = use global
 20    max_lease_time: int | None            = None  # subnet-level override; None = use global
 21    fixed_addresses: dict[str, str]       = field(default_factory=dict)  # MAC → IP string
 22
 23
 24@dataclass
 25class DhcpParameters:
 26    """All parameters required to configure an ISC DHCP server on a Debian machine.
 27
 28    Covers both /etc/default/isc-dhcp-server (``interfaces_v4``, ``interfaces_v6``) and
 29    /etc/dhcp/dhcpd.conf (everything else).
 30    """
 31    interfaces_v4: list[str]            # written to INTERFACESv4 in /etc/default/isc-dhcp-server
 32    interfaces_v6: list[str]            = field(default_factory=list)  # written to INTERFACESv6
 33    subnets: list[DhcpSubnet]           = field(default_factory=list)
 34    authoritative: bool                 = True
 35    default_lease_time: int | None      = None  # omitted from global section when None
 36    max_lease_time: int | None          = None  # omitted from global section when None
 37    ddns_update_style: str              = 'none'
 38
 39
 40def set_dhcp_server(net_scheme: NetScheme0, machine: str,
 41                    dhcp_params: DhcpParameters, step: int = 1) -> None:
 42    """Write DHCP server configuration files and (re)start the service on *machine_name*.
 43
 44    Writes:
 45    - /etc/default/isc-dhcp-server  (INTERFACESv4)
 46    - /etc/dhcp/dhcpd.conf          (generated from dhcp_params)
 47
 48    Then runs ``systemctl enable`` and ``systemctl restart`` for isc-dhcp-server.
 49    """
 50    # /etc/default/isc-dhcp-server
 51    ifaces_v4 = ' '.join(dhcp_params.interfaces_v4)
 52    ifaces_v6 = ' '.join(dhcp_params.interfaces_v6)
 53    default_content = f'INTERFACESv4="{ifaces_v4}"\nINTERFACESv6="{ifaces_v6}"\n'
 54    net_scheme.file(machine, '/etc/default/isc-dhcp-server', default_content, step=step)
 55
 56    # /etc/dhcp/dhcpd.conf
 57    lines = []
 58    auth = 'authoritative' if dhcp_params.authoritative else 'not authoritative'
 59    lines += [
 60        f'ddns-update-style {dhcp_params.ddns_update_style};',
 61        f'{auth};',
 62    ]
 63    if dhcp_params.default_lease_time is not None:
 64        lines.append(f'default-lease-time {dhcp_params.default_lease_time};')
 65    if dhcp_params.max_lease_time is not None:
 66        lines.append(f'max-lease-time {dhcp_params.max_lease_time};')
 67    for s in dhcp_params.subnets:
 68        lines.append('')
 69        lines.append(f'subnet {s.subnet.network_address} netmask {s.subnet.netmask} {{')
 70        lines.append(f'    range {s.range_start} {s.range_end};')
 71        if s.routers:
 72            lines.append(f'    option routers {", ".join(str(r) for r in s.routers)};')
 73        if s.dns_servers:
 74            lines.append(f'    option domain-name-servers {", ".join(str(d) for d in s.dns_servers)};')
 75        if s.domain_name is not None:
 76            lines.append(f'    option domain-name "{s.domain_name}";')
 77        if s.broadcast_address is not None:
 78            lines.append(f'    option broadcast-address {s.broadcast_address};')
 79        if s.default_lease_time is not None:
 80            lines.append(f'    default-lease-time {s.default_lease_time};')
 81        if s.max_lease_time is not None:
 82            lines.append(f'    max-lease-time {s.max_lease_time};')
 83        for mac, ip in s.fixed_addresses.items():
 84            safe = mac.replace(':', '-')
 85            lines += [
 86                f'    host {safe} {{',
 87                f'        hardware ethernet {mac};',
 88                f'        fixed-address {ip};',
 89                f'    }}',
 90            ]
 91        lines.append('}')
 92    net_scheme.file(machine, '/etc/dhcp/dhcpd.conf', '\n'.join(lines) + '\n', step=step)
 93
 94    net_scheme.cmd(machine, 'systemctl enable isc-dhcp-server', step=step)
 95    net_scheme.cmd(machine, 'systemctl restart isc-dhcp-server', step=step)
 96
 97
 98def get_dhcp_server(grade: Grade0, machine: str,
 99                    step: int = 1) -> tuple[DhcpParameters | None, int]:
100    """Read and parse the DHCP server configuration on *machine_name*.
101
102    Returns ``(params, errors)`` where *params* is a :class:`DhcpParameters`
103    instance (or ``None`` if /etc/default/isc-dhcp-server is absent/unreadable)
104    and *errors* is the number of parse errors found in dhcpd.conf.
105    """
106    interfaces_v4 = parse_ipv4_interfaces_in_default_dhcp_server_file(grade, machine, step)
107    if interfaces_v4 is None:
108        return None, 1
109    interfaces_v6 = parse_ipv6_interfaces_in_default_dhcp_server_file(grade, machine, step) or []
110
111    parsed = _parse_dhcpd_config(grade, machine, step)
112    errors: int = parsed['errors']
113    gp = parsed['global_parameters']
114
115    authoritative     = bool(gp.get('authoritative', False))
116    _dlt = gp.get('default-lease-time', None)
117    _mlt = gp.get('max-lease-time', None)
118    default_lease_time: int | None = int(_dlt) if _dlt is not None else None
119    max_lease_time: int | None     = int(_mlt) if _mlt is not None else None
120    ddns_update_style = str(gp.get('ddns-update-style', 'none'))
121
122    subnets: list[DhcpSubnet] = []
123    for net_addr, sp in parsed['subnets'].items():
124        # Reconstruct IPv4Network
125        try:
126            subnet_net = IPv4Network(f'{net_addr}/{sp["netmask"]}', strict=False)
127        except (KeyError, ValueError):
128            errors += 1
129            continue
130
131        # Parse pool range — stored as "start end" or "dynamic-bootp start end"
132        range_val = sp.get('range', '')
133        range_parts = range_val.split()
134        if range_parts and range_parts[0].lower() == 'dynamic-bootp':
135            range_parts = range_parts[1:]
136        try:
137            range_start = IPv4Address(range_parts[0])
138            range_end   = IPv4Address(range_parts[1] if len(range_parts) > 1 else range_parts[0])
139        except (IndexError, ValueError):
140            errors += 1
141            continue
142
143        def _parse_addr_list(val: str) -> list[IPv4Address]:
144            result = []
145            for part in re.split(r'[,\s]+', val.strip()):
146                if part:
147                    try:
148                        result.append(IPv4Address(part))
149                    except ValueError:
150                        pass
151            return result
152
153        routers = _parse_addr_list(sp.get('option routers', ''))
154        dns_servers = _parse_addr_list(sp.get('option domain-name-servers', ''))
155
156        domain_name = sp.get('option domain-name', None)
157        if domain_name is not None:
158            domain_name = domain_name.strip('"')
159
160        bcast_str = sp.get('option broadcast-address', None)
161        try:
162            broadcast_address = IPv4Address(bcast_str) if bcast_str else None
163        except ValueError:
164            broadcast_address = None
165
166        # Only set per-subnet overrides when they differ from the global values
167        sub_dlt = sp.get('default-lease-time', None)
168        sub_mlt = sp.get('max-lease-time', None)
169        try:
170            sub_dlt = int(sub_dlt) if sub_dlt is not None else None
171            sub_mlt = int(sub_mlt) if sub_mlt is not None else None
172        except ValueError:
173            sub_dlt = sub_mlt = None
174        subnet_default_lease = sub_dlt if sub_dlt != default_lease_time else None
175        subnet_max_lease     = sub_mlt if sub_mlt != max_lease_time else None
176
177        subnets.append(DhcpSubnet(
178            subnet=subnet_net,
179            range_start=range_start,
180            range_end=range_end,
181            routers=routers,
182            dns_servers=dns_servers,
183            domain_name=domain_name,
184            broadcast_address=broadcast_address,
185            default_lease_time=subnet_default_lease,
186            max_lease_time=subnet_max_lease,
187            fixed_addresses={str(k): str(v) for k, v in sp.get('fixed-addresses', {}).items()},
188        ))
189
190    params = DhcpParameters(
191        interfaces_v4=interfaces_v4,
192        interfaces_v6=interfaces_v6,
193        subnets=subnets,
194        authoritative=authoritative,
195        default_lease_time=default_lease_time,
196        max_lease_time=max_lease_time,
197        ddns_update_style=ddns_update_style,
198    )
199    return params, errors
200
201
202def _parse_dhcpd_interfaces(cmdline_output: str) -> list[str]:
203    """Extract interface names from dhcpd cmdline tokens (one token per line).
204
205    Returns the list of interface names passed to dhcpd, or ``["*"]`` if none
206    were specified (meaning dhcpd listens on all interfaces).
207    """
208    # Flags that consume the next token as their value argument.
209    VALUE_FLAGS = {
210        '-cf', '-lf', '-pf', '-tf', '-sf', '-hpf',
211        '-user', '-group', '-chroot', '-port', '-relay',
212    }
213    tokens = [t for t in cmdline_output.splitlines() if t.strip()]
214    interfaces = []
215    skip_next = False
216    for i, tok in enumerate(tokens):
217        if i == 0:
218            continue  # executable path (e.g. /usr/sbin/dhcpd)
219        if skip_next:
220            skip_next = False
221            continue
222        if tok in VALUE_FLAGS:
223            skip_next = True
224            continue
225        if tok.startswith('-'):
226            continue
227        interfaces.append(tok)
228    return interfaces if interfaces else ['*']
229
230
231def check_running_dhcp_server(grade: Grade0, machine: str) -> tuple[bool, list[str]]:
232    """Check whether ISC DHCP server is running on *machine*.
233
234    Returns a tuple ``(running, interfaces)`` where:
235
236    - *running*: ``True`` if ``isc-dhcp-server`` is currently active.
237    - *interfaces*: list of interface names dhcpd is bound to (e.g.
238      ``["eth0", "eth1"]``), or ``["*"]`` if it listens on all interfaces.
239      Empty list when *running* is ``False``.
240
241    Listening interfaces are read from the running process command line so
242    they reflect the actual state, not just the configuration file.
243    """
244    _, code = grade.test(machine, 'systemctl is-active isc-dhcp-server',
245                         allow_error=True)
246    if code != 0:
247        return False, []
248
249    cmdline_out, _ = grade.test(
250        machine,
251        r"tr '\000' '\n' < /proc/$(pidof -s dhcpd)/cmdline 2>/dev/null",
252        allow_error=True,
253    )
254    return True, _parse_dhcpd_interfaces(cmdline_out)
255
256
257def parse_ipv4_interfaces_in_default_dhcp_server_file(
258        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
259    """Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.
260
261    Returns None if the file is absent or unreadable.
262    Returns an empty list if the file exists but INTERFACESv4 is not set.
263    """
264    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
265                              step=step, allow_error=True)
266    if code != 0:
267        return None
268    for line in output.splitlines():
269        line = line.strip()
270        if line.startswith('#'):
271            continue
272        m = re.match(r'^INTERFACESv4\s*=\s*"([^"]*)"', line)
273        if m:
274            return m.group(1).split()
275    return []
276
277
278def parse_ipv6_interfaces_in_default_dhcp_server_file(
279        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
280    """Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.
281
282    Returns None if the file is absent or unreadable.
283    Returns an empty list if the file exists but INTERFACESv6 is not set.
284    """
285    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
286                              step=step, allow_error=True)
287    if code != 0:
288        return None
289    for line in output.splitlines():
290        line = line.strip()
291        if line.startswith('#'):
292            continue
293        m = re.match(r'^INTERFACESv6\s*=\s*"([^"]*)"', line)
294        if m:
295            return m.group(1).split()
296    return []
297
298
299def _parse_dhcpd_config(grade: Grade0, machine: str, step: int = 1) -> dict:
300    """Parse /etc/dhcp/dhcpd.conf on *machine* via grade.test() and return a
301    structured dict describing the ISC DHCP server configuration.
302
303    Returns a dict with three keys:
304
305    ``errors``
306        Number of syntactically incorrect or unrecognised statements.
307
308    ``global_parameters``
309        Dict of top-level parameters (name → value).  ``option`` parameters are
310        stored with their full two-word key, e.g. ``"option domain-name-servers"``.
311        ``authoritative`` maps to ``True``/``False``.  All other parameters map
312        to their value as a string (or space-joined string for multi-token values).
313
314    ``subnets``
315        Dict keyed by the network address string (e.g. ``"10.152.187.0"``).
316        Each value is a dict of *effective* parameters: global parameters are
317        copied first, then subnet-level declarations override them.  A special
318        key ``"fixed-addresses"`` holds a ``{MAC: IP-or-hostname}`` mapping
319        assembled from all ``host`` blocks whose ``fixed-address`` falls inside
320        that subnet (or that were declared directly inside that subnet block).
321    """
322    output, code = grade.test(machine, 'cat /etc/dhcp/dhcpd.conf', step=step)
323
324    empty: dict[str, Any] = {'errors': 0, 'global_parameters': {}, 'subnets': {}}
325    if code != 0 or not output:
326        return empty
327
328    # ------------------------------------------------------------------ #
329    # Strip comments: /* … */,  // … \n,  # … \n                         #
330    # ------------------------------------------------------------------ #
331    text = re.sub(r'/\*.*?\*/', '', output, flags=re.DOTALL)
332    text = re.sub(r'(?:#|//).*', '', text)
333
334    # ------------------------------------------------------------------ #
335    # Tokenise: {  }  ;  are single-character tokens; quoted strings are  #
336    # kept intact; everything else is split on whitespace.                #
337    # ------------------------------------------------------------------ #
338    token_re = re.compile(r'[{};]|"[^"]*"|[^\s{};]+')
339    tokens = token_re.findall(text)
340
341    # ------------------------------------------------------------------ #
342    # Known single-value global / subnet keywords                         #
343    # ------------------------------------------------------------------ #
344    _KNOWN_PARAMS = {
345        'default-lease-time', 'max-lease-time', 'min-lease-time',
346        'ddns-update-style', 'log-facility', 'server-identifier',
347        'filename', 'next-server', 'server-name',
348        'use-host-decl-names', 'get-lease-hostnames', 'ping-check',
349        'one-lease-per-client', 'dynamic-bootp-lease-length',
350        'lease-file-name', 'pid-file-name', 'omapi-port',
351        'update-conflict-detection', 'update-optimization',
352        'stash-agent-options', 'local-port', 'remote-port',
353        'db-time-format', 'bootp-lease-length', 'min-secs',
354        'always-reply-rfc1048', 'server-name',
355    }
356
357    _KNOWN_HOST_PARAMS = {
358        'hardware', 'fixed-address', 'filename', 'next-server',
359        'server-name', 'client-identifier', 'ddns-hostname',
360        'ddns-domainname', 'option', 'supersede', 'prepend', 'append',
361        'default', 'deny', 'allow', 'ignore',
362    }
363
364    # ------------------------------------------------------------------ #
365    # Parser state                                                         #
366    # ------------------------------------------------------------------ #
367    errors: int = 0
368
369    # Collected raw data
370    global_params: dict[str, Any] = {}
371    # net_addr -> {'netmask': str, 'fixed-addresses': {}, param: val, ...}
372    subnets: dict[str, dict[str, Any]] = {}
373
374    # Hosts collected during parsing: (parent_subnet_net or None, mac, ip)
375    all_hosts: list[tuple[str | None, str, str]] = []
376
377    # Context stack: list of (kind, data)
378    #   kind = 'global' | 'subnet' | 'host' | 'other'
379    #   data = net_addr str for 'subnet', host-info dict for 'host', None otherwise
380    context_stack: list[tuple[str, Any]] = [('global', None)]
381
382    pending: list[str] = []   # words accumulated before the next ; or {
383
384    def _find_parent_subnet() -> str | None:
385        for frame in reversed(context_stack):
386            if frame[0] == 'subnet':
387                return frame[1]
388        return None
389
390    def _parse_param(words: list[str]) -> tuple[str, Any] | None:
391        """Return (key, value) for a recognised parameter, or None on error."""
392        if not words:
393            return None
394        kw = words[0].lower()
395        rest = words[1:]
396
397        if kw == 'authoritative':
398            return 'authoritative', True
399
400        if kw == 'not' and rest and rest[0].lower() == 'authoritative':
401            return 'authoritative', False
402
403        if kw == 'option':
404            if not rest:
405                return None
406            opt_name = rest[0].lower()
407            opt_val = ' '.join(v.strip('"') for v in rest[1:]) if len(rest) > 1 else ''
408            return f'option {opt_name}', opt_val
409
410        if kw in ('allow', 'deny', 'ignore'):
411            if not rest:
412                return None
413            return f'{kw} {" ".join(rest)}', True
414
415        if kw == 'range':
416            # range [dynamic-bootp] start [end]
417            return 'range', ' '.join(rest)
418
419        if kw == 'include':
420            # include "file"; — silently skip
421            return 'include', ' '.join(v.strip('"') for v in rest)
422
423        if kw in _KNOWN_PARAMS:
424            val = ' '.join(v.strip('"') for v in rest) if rest else ''
425            return kw, val
426
427        return None  # unrecognised keyword
428
429    # ------------------------------------------------------------------ #
430    # Main token loop                                                      #
431    # ------------------------------------------------------------------ #
432    for tok in tokens:
433
434        if tok == ';':
435            if not pending:
436                continue  # empty statement is fine
437
438            kw = pending[0].lower()
439            kind, data = context_stack[-1]
440
441            if kind == 'host':
442                if kw == 'hardware' and len(pending) >= 3 and pending[1].lower() == 'ethernet':
443                    data['mac'] = pending[2].lower()
444                elif kw == 'fixed-address' and len(pending) >= 2:
445                    data['ip'] = pending[1]
446                elif kw in _KNOWN_HOST_PARAMS:
447                    pass  # valid host option, not needed for our output
448                else:
449                    errors += 1
450
451            elif kind in ('global', 'subnet'):
452                parsed = _parse_param(pending)
453                if parsed is not None:
454                    target = global_params if kind == 'global' else subnets[data]
455                    key, val = parsed
456                    if key != 'include':   # do not store include directives
457                        target[key] = val
458                else:
459                    errors += 1
460
461            # else: inside 'other' (pool, group, shared-network, …) — ignore content
462
463            pending = []
464
465        elif tok == '{':
466            kw = pending[0].lower() if pending else ''
467
468            if kw == 'subnet':
469                if len(pending) >= 4 and pending[2].lower() == 'netmask':
470                    net_addr = pending[1]
471                    netmask = pending[3]
472                    subnets[net_addr] = {'netmask': netmask, 'fixed-addresses': {}}
473                    context_stack.append(('subnet', net_addr))
474                else:
475                    errors += 1
476                    context_stack.append(('other', None))
477
478            elif kw == 'host':
479                if len(pending) >= 2:
480                    host_info: dict[str, Any] = {
481                        'mac': None,
482                        'ip': None,
483                        'parent_subnet': _find_parent_subnet(),
484                    }
485                    context_stack.append(('host', host_info))
486                else:
487                    errors += 1
488                    context_stack.append(('other', None))
489
490            elif kw in ('shared-network', 'group', 'class', 'subclass',
491                        'pool', 'failover', 'peer', 'key', 'zone',
492                        'on', 'if', 'elsif', 'else'):
493                context_stack.append(('other', None))
494
495            elif not pending:
496                # Bare { with no preceding keyword
497                errors += 1
498                context_stack.append(('other', None))
499
500            else:
501                errors += 1
502                context_stack.append(('other', None))
503
504            pending = []
505
506        elif tok == '}':
507            if len(context_stack) <= 1:
508                errors += 1   # unmatched closing brace
509                pending = []
510                continue
511
512            kind, data = context_stack.pop()
513
514            if kind == 'host' and data is not None:
515                mac = data.get('mac')
516                ip = data.get('ip')
517                parent = data.get('parent_subnet')
518                if mac and ip:
519                    all_hosts.append((parent, mac, ip))
520
521            pending = []
522
523        else:
524            pending.append(tok)
525
526    # Unfinished statement at end of file (missing semicolon)
527    if pending:
528        errors += 1
529
530    # Unclosed blocks (missing closing braces)
531    errors += max(0, len(context_stack) - 1)
532
533    # ------------------------------------------------------------------ #
534    # Assign collected hosts to their subnets                             #
535    # ------------------------------------------------------------------ #
536    # Build (IPv4Network, net_addr_str) pairs for membership tests
537    subnet_nets: list[tuple[IPv4Network, str]] = []
538    for net_addr, subnet_data in subnets.items():
539        netmask = subnet_data.get('netmask', '')
540        try:
541            subnet_nets.append((IPv4Network(f'{net_addr}/{netmask}', strict=False), net_addr))
542        except ValueError:
543            pass
544
545    for parent, mac, ip in all_hosts:
546        # If the host was declared directly inside a subnet block, use that subnet.
547        if parent is not None and parent in subnets:
548            subnets[parent]['fixed-addresses'][mac] = ip
549            continue
550        # Otherwise resolve by checking which subnet the fixed-address belongs to.
551        try:
552            host_ip = IPv4Address(ip)
553        except ValueError:
554            continue  # ip is a hostname — can't resolve to a subnet here
555        for net, net_addr in subnet_nets:
556            if host_ip in net:
557                subnets[net_addr]['fixed-addresses'][mac] = ip
558                break
559
560    # ------------------------------------------------------------------ #
561    # Build effective per-subnet parameters (global ← subnet overrides)  #
562    # ------------------------------------------------------------------ #
563    effective_subnets: dict[str, dict[str, Any]] = {}
564    for net_addr, subnet_data in subnets.items():
565        fixed = subnet_data.pop('fixed-addresses')
566        effective: dict[str, Any] = {**global_params, **subnet_data}
567        effective['fixed-addresses'] = fixed
568        effective_subnets[net_addr] = effective
569
570    return {
571        'errors': errors,
572        'global_parameters': global_params,
573        'subnets': effective_subnets,
574    }
@dataclass
class DhcpSubnet:
10@dataclass
11class DhcpSubnet:
12    """Configuration for a single DHCP subnet declaration."""
13    subnet: IPv4Network
14    range_start: IPv4Address
15    range_end: IPv4Address
16    routers: list[IPv4Address]            = field(default_factory=list)
17    dns_servers: list[IPv4Address]        = field(default_factory=list)
18    domain_name: str | None               = None
19    broadcast_address: IPv4Address | None = None
20    default_lease_time: int | None        = None  # subnet-level override; None = use global
21    max_lease_time: int | None            = None  # subnet-level override; None = use global
22    fixed_addresses: dict[str, str]       = field(default_factory=dict)  # MAC → IP string

Configuration for a single DHCP subnet declaration.

DhcpSubnet( subnet: ipaddress.IPv4Network, range_start: ipaddress.IPv4Address, range_end: ipaddress.IPv4Address, routers: list[ipaddress.IPv4Address] = <factory>, dns_servers: list[ipaddress.IPv4Address] = <factory>, domain_name: str | None = None, broadcast_address: ipaddress.IPv4Address | None = None, default_lease_time: int | None = None, max_lease_time: int | None = None, fixed_addresses: dict[str, str] = <factory>)
subnet: ipaddress.IPv4Network
range_start: ipaddress.IPv4Address
range_end: ipaddress.IPv4Address
routers: list[ipaddress.IPv4Address]
dns_servers: list[ipaddress.IPv4Address]
domain_name: str | None = None
broadcast_address: ipaddress.IPv4Address | None = None
default_lease_time: int | None = None
max_lease_time: int | None = None
fixed_addresses: dict[str, str]
@dataclass
class DhcpParameters:
25@dataclass
26class DhcpParameters:
27    """All parameters required to configure an ISC DHCP server on a Debian machine.
28
29    Covers both /etc/default/isc-dhcp-server (``interfaces_v4``, ``interfaces_v6``) and
30    /etc/dhcp/dhcpd.conf (everything else).
31    """
32    interfaces_v4: list[str]            # written to INTERFACESv4 in /etc/default/isc-dhcp-server
33    interfaces_v6: list[str]            = field(default_factory=list)  # written to INTERFACESv6
34    subnets: list[DhcpSubnet]           = field(default_factory=list)
35    authoritative: bool                 = True
36    default_lease_time: int | None      = None  # omitted from global section when None
37    max_lease_time: int | None          = None  # omitted from global section when None
38    ddns_update_style: str              = 'none'

All parameters required to configure an ISC DHCP server on a Debian machine.

Covers both /etc/default/isc-dhcp-server (interfaces_v4, interfaces_v6) and /etc/dhcp/dhcpd.conf (everything else).

DhcpParameters( interfaces_v4: list[str], interfaces_v6: list[str] = <factory>, subnets: list[DhcpSubnet] = <factory>, authoritative: bool = True, default_lease_time: int | None = None, max_lease_time: int | None = None, ddns_update_style: str = 'none')
interfaces_v4: list[str]
interfaces_v6: list[str]
subnets: list[DhcpSubnet]
authoritative: bool = True
default_lease_time: int | None = None
max_lease_time: int | None = None
ddns_update_style: str = 'none'
def set_dhcp_server( net_scheme: SRE.lib_sre.NetScheme0, machine: str, dhcp_params: DhcpParameters, step: int = 1) -> None:
41def set_dhcp_server(net_scheme: NetScheme0, machine: str,
42                    dhcp_params: DhcpParameters, step: int = 1) -> None:
43    """Write DHCP server configuration files and (re)start the service on *machine_name*.
44
45    Writes:
46    - /etc/default/isc-dhcp-server  (INTERFACESv4)
47    - /etc/dhcp/dhcpd.conf          (generated from dhcp_params)
48
49    Then runs ``systemctl enable`` and ``systemctl restart`` for isc-dhcp-server.
50    """
51    # /etc/default/isc-dhcp-server
52    ifaces_v4 = ' '.join(dhcp_params.interfaces_v4)
53    ifaces_v6 = ' '.join(dhcp_params.interfaces_v6)
54    default_content = f'INTERFACESv4="{ifaces_v4}"\nINTERFACESv6="{ifaces_v6}"\n'
55    net_scheme.file(machine, '/etc/default/isc-dhcp-server', default_content, step=step)
56
57    # /etc/dhcp/dhcpd.conf
58    lines = []
59    auth = 'authoritative' if dhcp_params.authoritative else 'not authoritative'
60    lines += [
61        f'ddns-update-style {dhcp_params.ddns_update_style};',
62        f'{auth};',
63    ]
64    if dhcp_params.default_lease_time is not None:
65        lines.append(f'default-lease-time {dhcp_params.default_lease_time};')
66    if dhcp_params.max_lease_time is not None:
67        lines.append(f'max-lease-time {dhcp_params.max_lease_time};')
68    for s in dhcp_params.subnets:
69        lines.append('')
70        lines.append(f'subnet {s.subnet.network_address} netmask {s.subnet.netmask} {{')
71        lines.append(f'    range {s.range_start} {s.range_end};')
72        if s.routers:
73            lines.append(f'    option routers {", ".join(str(r) for r in s.routers)};')
74        if s.dns_servers:
75            lines.append(f'    option domain-name-servers {", ".join(str(d) for d in s.dns_servers)};')
76        if s.domain_name is not None:
77            lines.append(f'    option domain-name "{s.domain_name}";')
78        if s.broadcast_address is not None:
79            lines.append(f'    option broadcast-address {s.broadcast_address};')
80        if s.default_lease_time is not None:
81            lines.append(f'    default-lease-time {s.default_lease_time};')
82        if s.max_lease_time is not None:
83            lines.append(f'    max-lease-time {s.max_lease_time};')
84        for mac, ip in s.fixed_addresses.items():
85            safe = mac.replace(':', '-')
86            lines += [
87                f'    host {safe} {{',
88                f'        hardware ethernet {mac};',
89                f'        fixed-address {ip};',
90                f'    }}',
91            ]
92        lines.append('}')
93    net_scheme.file(machine, '/etc/dhcp/dhcpd.conf', '\n'.join(lines) + '\n', step=step)
94
95    net_scheme.cmd(machine, 'systemctl enable isc-dhcp-server', step=step)
96    net_scheme.cmd(machine, 'systemctl restart isc-dhcp-server', step=step)

Write DHCP server configuration files and (re)start the service on machine_name.

Writes:

  • /etc/default/isc-dhcp-server (INTERFACESv4)
  • /etc/dhcp/dhcpd.conf (generated from dhcp_params)

Then runs systemctl enable and systemctl restart for isc-dhcp-server.

def get_dhcp_server( grade: SRE.lib_sre.Grade0, machine: str, step: int = 1) -> tuple[DhcpParameters | None, int]:
 99def get_dhcp_server(grade: Grade0, machine: str,
100                    step: int = 1) -> tuple[DhcpParameters | None, int]:
101    """Read and parse the DHCP server configuration on *machine_name*.
102
103    Returns ``(params, errors)`` where *params* is a :class:`DhcpParameters`
104    instance (or ``None`` if /etc/default/isc-dhcp-server is absent/unreadable)
105    and *errors* is the number of parse errors found in dhcpd.conf.
106    """
107    interfaces_v4 = parse_ipv4_interfaces_in_default_dhcp_server_file(grade, machine, step)
108    if interfaces_v4 is None:
109        return None, 1
110    interfaces_v6 = parse_ipv6_interfaces_in_default_dhcp_server_file(grade, machine, step) or []
111
112    parsed = _parse_dhcpd_config(grade, machine, step)
113    errors: int = parsed['errors']
114    gp = parsed['global_parameters']
115
116    authoritative     = bool(gp.get('authoritative', False))
117    _dlt = gp.get('default-lease-time', None)
118    _mlt = gp.get('max-lease-time', None)
119    default_lease_time: int | None = int(_dlt) if _dlt is not None else None
120    max_lease_time: int | None     = int(_mlt) if _mlt is not None else None
121    ddns_update_style = str(gp.get('ddns-update-style', 'none'))
122
123    subnets: list[DhcpSubnet] = []
124    for net_addr, sp in parsed['subnets'].items():
125        # Reconstruct IPv4Network
126        try:
127            subnet_net = IPv4Network(f'{net_addr}/{sp["netmask"]}', strict=False)
128        except (KeyError, ValueError):
129            errors += 1
130            continue
131
132        # Parse pool range — stored as "start end" or "dynamic-bootp start end"
133        range_val = sp.get('range', '')
134        range_parts = range_val.split()
135        if range_parts and range_parts[0].lower() == 'dynamic-bootp':
136            range_parts = range_parts[1:]
137        try:
138            range_start = IPv4Address(range_parts[0])
139            range_end   = IPv4Address(range_parts[1] if len(range_parts) > 1 else range_parts[0])
140        except (IndexError, ValueError):
141            errors += 1
142            continue
143
144        def _parse_addr_list(val: str) -> list[IPv4Address]:
145            result = []
146            for part in re.split(r'[,\s]+', val.strip()):
147                if part:
148                    try:
149                        result.append(IPv4Address(part))
150                    except ValueError:
151                        pass
152            return result
153
154        routers = _parse_addr_list(sp.get('option routers', ''))
155        dns_servers = _parse_addr_list(sp.get('option domain-name-servers', ''))
156
157        domain_name = sp.get('option domain-name', None)
158        if domain_name is not None:
159            domain_name = domain_name.strip('"')
160
161        bcast_str = sp.get('option broadcast-address', None)
162        try:
163            broadcast_address = IPv4Address(bcast_str) if bcast_str else None
164        except ValueError:
165            broadcast_address = None
166
167        # Only set per-subnet overrides when they differ from the global values
168        sub_dlt = sp.get('default-lease-time', None)
169        sub_mlt = sp.get('max-lease-time', None)
170        try:
171            sub_dlt = int(sub_dlt) if sub_dlt is not None else None
172            sub_mlt = int(sub_mlt) if sub_mlt is not None else None
173        except ValueError:
174            sub_dlt = sub_mlt = None
175        subnet_default_lease = sub_dlt if sub_dlt != default_lease_time else None
176        subnet_max_lease     = sub_mlt if sub_mlt != max_lease_time else None
177
178        subnets.append(DhcpSubnet(
179            subnet=subnet_net,
180            range_start=range_start,
181            range_end=range_end,
182            routers=routers,
183            dns_servers=dns_servers,
184            domain_name=domain_name,
185            broadcast_address=broadcast_address,
186            default_lease_time=subnet_default_lease,
187            max_lease_time=subnet_max_lease,
188            fixed_addresses={str(k): str(v) for k, v in sp.get('fixed-addresses', {}).items()},
189        ))
190
191    params = DhcpParameters(
192        interfaces_v4=interfaces_v4,
193        interfaces_v6=interfaces_v6,
194        subnets=subnets,
195        authoritative=authoritative,
196        default_lease_time=default_lease_time,
197        max_lease_time=max_lease_time,
198        ddns_update_style=ddns_update_style,
199    )
200    return params, errors

Read and parse the DHCP server configuration on machine_name.

Returns (params, errors) where params is a DhcpParameters instance (or None if /etc/default/isc-dhcp-server is absent/unreadable) and errors is the number of parse errors found in dhcpd.conf.

def check_running_dhcp_server(grade: SRE.lib_sre.Grade0, machine: str) -> tuple[bool, list[str]]:
232def check_running_dhcp_server(grade: Grade0, machine: str) -> tuple[bool, list[str]]:
233    """Check whether ISC DHCP server is running on *machine*.
234
235    Returns a tuple ``(running, interfaces)`` where:
236
237    - *running*: ``True`` if ``isc-dhcp-server`` is currently active.
238    - *interfaces*: list of interface names dhcpd is bound to (e.g.
239      ``["eth0", "eth1"]``), or ``["*"]`` if it listens on all interfaces.
240      Empty list when *running* is ``False``.
241
242    Listening interfaces are read from the running process command line so
243    they reflect the actual state, not just the configuration file.
244    """
245    _, code = grade.test(machine, 'systemctl is-active isc-dhcp-server',
246                         allow_error=True)
247    if code != 0:
248        return False, []
249
250    cmdline_out, _ = grade.test(
251        machine,
252        r"tr '\000' '\n' < /proc/$(pidof -s dhcpd)/cmdline 2>/dev/null",
253        allow_error=True,
254    )
255    return True, _parse_dhcpd_interfaces(cmdline_out)

Check whether ISC DHCP server is running on machine.

Returns a tuple (running, interfaces) where:

  • running: True if isc-dhcp-server is currently active.
  • interfaces: list of interface names dhcpd is bound to (e.g. ["eth0", "eth1"]), or ["*"] if it listens on all interfaces. Empty list when running is False.

Listening interfaces are read from the running process command line so they reflect the actual state, not just the configuration file.

def parse_ipv4_interfaces_in_default_dhcp_server_file( grade: SRE.lib_sre.Grade0, machine: str, step: int = 1) -> list[str] | None:
258def parse_ipv4_interfaces_in_default_dhcp_server_file(
259        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
260    """Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.
261
262    Returns None if the file is absent or unreadable.
263    Returns an empty list if the file exists but INTERFACESv4 is not set.
264    """
265    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
266                              step=step, allow_error=True)
267    if code != 0:
268        return None
269    for line in output.splitlines():
270        line = line.strip()
271        if line.startswith('#'):
272            continue
273        m = re.match(r'^INTERFACESv4\s*=\s*"([^"]*)"', line)
274        if m:
275            return m.group(1).split()
276    return []

Return the list of interfaces from INTERFACESv4 in /etc/default/isc-dhcp-server.

Returns None if the file is absent or unreadable. Returns an empty list if the file exists but INTERFACESv4 is not set.

def parse_ipv6_interfaces_in_default_dhcp_server_file( grade: SRE.lib_sre.Grade0, machine: str, step: int = 1) -> list[str] | None:
279def parse_ipv6_interfaces_in_default_dhcp_server_file(
280        grade: Grade0, machine: str, step: int = 1) -> list[str] | None:
281    """Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.
282
283    Returns None if the file is absent or unreadable.
284    Returns an empty list if the file exists but INTERFACESv6 is not set.
285    """
286    output, code = grade.test(machine, 'cat /etc/default/isc-dhcp-server',
287                              step=step, allow_error=True)
288    if code != 0:
289        return None
290    for line in output.splitlines():
291        line = line.strip()
292        if line.startswith('#'):
293            continue
294        m = re.match(r'^INTERFACESv6\s*=\s*"([^"]*)"', line)
295        if m:
296            return m.group(1).split()
297    return []

Return the list of interfaces from INTERFACESv6 in /etc/default/isc-dhcp-server.

Returns None if the file is absent or unreadable. Returns an empty list if the file exists but INTERFACESv6 is not set.