lib.ips

  1import json
  2import random
  3from ipaddress import IPv4Address, IPv4Interface, IPv4Network
  4
  5import msgpack
  6from netaddr import EUI
  7
  8
  9class IPv4Addresses:
 10    """Dynamic container for named IPv4Address attributes, with no boilerplate.
 11
 12    Usage:
 13        ips = IPv4Addresses()
 14        ips.ip1 = IPv4Address("192.168.1.1")
 15        ips.ip2 = IPv4Address("192.168.1.2")
 16    """
 17
 18    def __setattr__(self, name, value):
 19        if not isinstance(value, IPv4Interface):
 20            raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}")
 21        super().__setattr__(name, value)
 22
 23    # ---------------- dict ----------------
 24
 25    def to_dict(self):
 26        return {k: str(v) for k, v in self.__dict__.items()}
 27
 28    @classmethod
 29    def from_dict(cls, d):
 30        obj = cls()
 31        for k, v in d.items():
 32            super(IPv4Addresses, obj).__setattr__(k, IPv4Interface(v))
 33        return obj
 34
 35    # ---------------- JSON ----------------
 36
 37    def to_json(self):
 38        return json.dumps(self.to_dict())
 39
 40    @classmethod
 41    def from_json(cls, s):
 42        return cls.from_dict(json.loads(s))
 43
 44    # ---------------- msgpack ----------------
 45
 46    def pack(self):
 47        return msgpack.packb(self.to_dict(), use_bin_type=True)
 48
 49    @classmethod
 50    def unpack(cls, blob):
 51        return cls.from_dict(msgpack.unpackb(blob, raw=False))
 52
 53
 54class IPv4Networks:
 55    """Dynamic container for named IPv4Network attributes, with no boilerplate.
 56
 57    Usage:
 58        nets = IPv4Networks()
 59        nets.lan = IPv4Network("192.168.1.0/24")
 60        nets.mgmt = IPv4Network("10.0.0.0/8")
 61    """
 62
 63    def __setattr__(self, name, value):
 64        if not isinstance(value, IPv4Network):
 65            raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}")
 66        super().__setattr__(name, value)
 67
 68    # ---------------- dict ----------------
 69
 70    def to_dict(self):
 71        return {k: str(v) for k, v in self.__dict__.items()}
 72
 73    @classmethod
 74    def from_dict(cls, d):
 75        obj = cls()
 76        for k, v in d.items():
 77            super(IPv4Networks, obj).__setattr__(k, IPv4Network(v))
 78        return obj
 79
 80    # ---------------- JSON ----------------
 81
 82    def to_json(self):
 83        return json.dumps(self.to_dict())
 84
 85    @classmethod
 86    def from_json(cls, s):
 87        return cls.from_dict(json.loads(s))
 88
 89    # ---------------- msgpack ----------------
 90
 91    def pack(self):
 92        return msgpack.packb(self.to_dict(), use_bin_type=True)
 93
 94    @classmethod
 95    def unpack(cls, blob):
 96        return cls.from_dict(msgpack.unpackb(blob, raw=False))
 97
 98
 99_PRIVATE_NETWORKS = [
100    IPv4Network("10.0.0.0/8"),
101    IPv4Network("172.16.0.0/12"),
102    IPv4Network("192.168.0.0/16"),
103]
104
105
106def _pick_one(mask, from_network, exclude, from_private_network):
107    """Pick a single random /mask network. Internal helper for random_ipv4networks."""
108    block_size = 2 ** (32 - mask)
109
110    def make(abs_idx):
111        return IPv4Network((abs_idx * block_size, mask))
112
113    def available(net):
114        return not any(net.overlaps(ex) for ex in exclude)
115
116    def indices_in(a, b):
117        first = (a + block_size - 1) // block_size
118        last = (b + 1) // block_size - 1
119        return range(first, last + 1) if first <= last else range(0, 0)
120
121    flo = int(from_network.network_address)
122    fhi = int(from_network.broadcast_address)
123    if from_private_network:
124        boxes = []
125        for p in _PRIVATE_NETWORKS:
126            lo = max(flo, int(p.network_address))
127            hi = min(fhi, int(p.broadcast_address))
128            if lo <= hi:
129                boxes.append((lo, hi))
130    else:
131        boxes = [(flo, fhi)]
132
133    spans = [(r.start, len(r)) for lo, hi in boxes for r in [indices_in(lo, hi)] if r]
134    total = sum(count for _, count in spans)
135
136    if total == 0:
137        raise ValueError(f"No /{mask} network fits in the search space")
138
139    def pick_abs_idx():
140        if from_private_network and len(spans) > 1:
141            # Pick uniformly across private ranges (not weighted by range size)
142            start, count = random.choice(spans)
143            return start + random.randrange(count)
144        n = random.randrange(total)
145        for start, count in spans:
146            if n < count:
147                return start + n
148            n -= count
149        return spans[-1][0]  # unreachable: n < total guarantees a match above
150
151    if total <= 65536:
152        candidates = [make(i) for start, count in spans
153                      for i in range(start, start + count)
154                      if available(make(i))]
155        if not candidates:
156            raise ValueError(f"No available /{mask} network with the given constraints")
157        return random.choice(candidates)
158
159    for _ in range(1000):
160        candidate = make(pick_abs_idx())
161        if available(candidate):
162            return candidate
163
164    raise ValueError(f"No available /{mask} network found after 1000 attempts")
165
166
167def random_ipv4networks(
168    masks,
169    from_network=IPv4Network("0.0.0.0/0"),
170    exclude=None,
171    from_private_network=False,
172):
173    """Return a list of disjoint random IPv4Networks.
174
175    `masks` is an int or a list of ints; one network is returned per mask,
176    all mutually disjoint and not overlapping any network in `exclude`.
177    Optionally restricted to RFC-1918 private ranges with `from_private_network=True`.
178
179    `from` being a reserved keyword, the parameter is named `from_network`.
180
181    Raises ValueError if any network cannot be allocated.
182    """
183    if isinstance(masks, int):
184        masks = [masks]
185
186    working_exclude = list(exclude) if exclude else []
187    result = []
188    for mask in masks:
189        net = _pick_one(mask, from_network, working_exclude, from_private_network)
190        result.append(net)
191        working_exclude.append(net)
192    return result
193
194
195def random_ipv4s(network, n=1, exclude_ips=None, exclude_nets=None):
196    """Return a list of n distinct random IPv4Interface within `network`,
197    excluding any address in `exclude_ips` or covered by any network in `exclude_nets`.
198
199    Raises ValueError if fewer than n addresses are available.
200    """
201    ex_ips = set(exclude_ips) if exclude_ips else set()
202    ex_nets = list(exclude_nets) if exclude_nets else []
203
204    base = int(network.network_address)
205    total = network.num_addresses
206
207    def make(i):
208        return IPv4Interface(f"{IPv4Address(base + i)}/{network.prefixlen}")
209
210    def available(ip):
211        return ip not in ex_ips and not any(ip in net for net in ex_nets)
212
213    # Exclude network address (offset 0) and broadcast (offset total-1) for prefix <= 30.
214    # /31 (point-to-point, RFC 3021) and /32 (host route) have no reserved boundary addresses.
215    if network.prefixlen <= 30:
216        host_range = range(1, total - 1)
217    else:
218        host_range = range(total)
219
220    # Small space: enumerate all candidates, then sample.
221    if total <= 65536:
222        candidates = [make(i) for i in host_range if available(make(i))]
223        if len(candidates) < n:
224            raise ValueError(
225                f"Not enough available addresses: need {n}, found {len(candidates)}"
226            )
227        return random.sample(candidates, n)
228
229    # Large space: pick one at a time, accumulating into a working exclude set.
230    working_ex = set(ex_ips)
231    result = []
232    for _ in range(n):
233        for _ in range(1000):
234            candidate = make(random.randrange(host_range.start, host_range.stop))
235            if candidate not in working_ex and not any(candidate in net for net in ex_nets):
236                result.append(candidate)
237                working_ex.add(candidate)
238                break
239        else:
240            raise ValueError(
241                f"Could not find enough available addresses (got {len(result)}/{n})"
242            )
243    return result
244
245
246def random_ipv4s_with_range(network, gap, n=1, exclude_ips=None, exclude_nets=None):
247    """Return a list of n + 2*k distinct random IPv4Interface within `network`.
248
249    `gap` is either an int or a list of ints. ``k = 1`` when gap is an int,
250    ``k = len(gap)`` otherwise.
251
252    The returned list has the form:
253        [ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn]
254
255    Guarantees:
256    - int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i]  (or gap when gap is an int)
257    - ip_max_i < ip_min_{i+1}  (ranges are strictly ordered, non-overlapping)
258    - ip1 .. ipn are outside every range [ip_min_i, ip_max_i]
259
260    Raises ValueError if the constraints cannot be satisfied.
261    """
262    gaps = [gap] if isinstance(gap, int) else list(gap)
263    k = len(gaps)
264
265    ex_ips = set(exclude_ips) if exclude_ips else set()
266    ex_nets = list(exclude_nets) if exclude_nets else []
267
268    base = int(network.network_address)
269    total = network.num_addresses
270    prefixlen = network.prefixlen
271
272    def make(i):
273        return IPv4Interface(f"{IPv4Address(base + i)}/{prefixlen}")
274
275    def is_excluded(ip):
276        return ip in ex_ips or any(ip in net for net in ex_nets)
277
278    # Minimum space: k ranges placed back-to-back with 1-address gaps between them.
279    # Minimum last offset: sum(gaps) + k - 1; need total >= sum(gaps) + k.
280    min_needed = sum(gaps) + k
281    if total < min_needed:
282        raise ValueError(
283            f"Network {network} is too small to fit {k} range(s) with gaps {gaps}"
284        )
285
286    # suffix_sums[i] = sum(gaps[i:])
287    suffix_sums = [0] * (k + 1)
288    for i in range(k - 1, -1, -1):
289        suffix_sums[i] = suffix_sums[i + 1] + gaps[i]
290
291    def hi_for(i):
292        """Max start offset for range i that still leaves room for ranges i+1..k-1."""
293        return total - 1 - suffix_sums[i] - (k - 1 - i)
294
295    SMALL = 65536
296
297    def try_place_ranges():
298        """Try one left-to-right placement. Returns [(start, end), ...] or None."""
299        range_offsets = []
300        lo = 0
301        for i in range(k):
302            h = hi_for(i)
303            if h < lo:
304                return None
305            g = gaps[i]
306            space = h - lo + 1
307            if space <= SMALL:
308                candidates = [
309                    j
310                    for j in range(lo, h + 1)
311                    if not is_excluded(make(j)) and not is_excluded(make(j + g))
312                ]
313                if not candidates:
314                    return None
315                s = random.choice(candidates)
316            else:
317                s = None
318                for _ in range(1000):
319                    c = random.randint(lo, h)
320                    if not is_excluded(make(c)) and not is_excluded(make(c + g)):
321                        s = c
322                        break
323                if s is None:
324                    return None
325            range_offsets.append((s, s + g))
326            lo = s + g + 1
327        return range_offsets
328
329    range_offsets = None
330    for _ in range(1000):
331        range_offsets = try_place_ranges()
332        if range_offsets is not None:
333            break
334    if range_offsets is None:
335        raise ValueError(
336            f"Could not place {k} range(s) with gaps {gaps} with the given exclusions"
337        )
338
339    def in_any_range(offset):
340        return any(s <= offset <= e for s, e in range_offsets)
341
342    def extra_available(offset):
343        ip = make(offset)
344        return (
345            ip not in ex_ips
346            and not in_any_range(offset)
347            and not any(ip in net for net in ex_nets)
348        )
349
350    if total <= SMALL:
351        extra_candidates = [make(i) for i in range(total) if extra_available(i)]
352        if len(extra_candidates) < n:
353            raise ValueError(
354                f"Not enough addresses outside the range(s) for {n} extra IPs: "
355                f"found {len(extra_candidates)}"
356            )
357        extras = random.sample(extra_candidates, n)
358    else:
359        chosen = set()
360        extras = []
361        for _ in range(n):
362            for _ in range(1000):
363                offset = random.randrange(total)
364                candidate = make(offset)
365                if extra_available(offset) and candidate not in chosen:
366                    extras.append(candidate)
367                    chosen.add(candidate)
368                    break
369            else:
370                raise ValueError(
371                    f"Could not find enough extra addresses (got {len(extras)}/{n})"
372                )
373
374    result = []
375    for s, e in range_offsets:
376        result.append(make(s))
377        result.append(make(e))
378    result.extend(extras)
379    return result
380
381
382def random_ips_from_topology(data, topology):
383    """Assign random IPs to data.ips from data.nets based on a NetScheme0 topology.
384
385    topology: {net_name: [machine, ...] or {machine: iface_spec, ...}}
386    (the _topology class attribute format of NetScheme0)
387
388    For each machine m:
389    - belongs to exactly one network netX → data.ips.m  (in data.nets.netX)
390    - belongs to multiple networks       → data.ips.m_netX for each netX
391    """
392    # Build inverse map: machine -> [net_name, ...]  (preserve insertion order)
393    machine_nets = {}
394    for net_name, machines in topology.items():
395        names = machines.keys() if isinstance(machines, dict) else machines
396        for m in names:
397            machine_nets.setdefault(m, []).append(net_name)
398
399    # Assign IPs, tracking used addresses per network to avoid duplicates
400    assigned = {}  # {net_name: list[IPv4Interface]}
401    for machine, nets in machine_nets.items():
402        for net_name in nets:
403            network = getattr(data.nets, net_name)
404            ip = random_ipv4s(network, 1, exclude_ips=assigned.get(net_name))[0]
405            assigned.setdefault(net_name, []).append(ip)
406            attr = machine if len(nets) == 1 else f"{machine}_{net_name}"
407            setattr(data.ips, attr, ip)
408
409
410def random_mac_address(prefix=None, n=1):
411    """Return a list of n distinct random EUI MAC addresses.
412
413    `prefix` is an optional colon- or dash-separated hex string specifying the
414    leading bytes (e.g. ``"00:1A:2B"`` for a 3-byte OUI prefix).
415    Remaining bytes are chosen at random.
416
417    Raises ValueError if n distinct addresses cannot be generated.
418    """
419    if prefix is not None:
420        sep = "-" if "-" in prefix else ":"
421        prefix_bytes = bytes(int(x, 16) for x in prefix.split(sep))
422    else:
423        prefix_bytes = b""
424
425    suffix_len = 6 - len(prefix_bytes)
426    if suffix_len < 0:
427        raise ValueError(
428            f"Prefix too long: {prefix!r} ({len(prefix_bytes)} bytes, max 6)"
429        )
430
431    total = 256**suffix_len
432    if n > total:
433        raise ValueError(
434            f"Cannot generate {n} distinct MACs with {suffix_len} random bytes (max {total})"
435        )
436
437    def make(suffix_bytes):
438        all_bytes = prefix_bytes + suffix_bytes
439        if prefix is None:
440            all_bytes = bytes([all_bytes[0] & 0xFE]) + all_bytes[1:]
441        return EUI(":".join(f"{b:02x}" for b in all_bytes))
442
443    if total <= 65536:
444        candidates = [make(i.to_bytes(suffix_len, "big")) for i in range(total)]
445        return random.sample(candidates, n)
446
447    seen = set()
448    result = []
449    for _ in range(n):
450        for _ in range(1000):
451            suffix = random.randbytes(suffix_len)
452            if suffix not in seen:
453                seen.add(suffix)
454                result.append(make(suffix))
455                break
456        else:
457            raise ValueError(
458                f"Could not generate {n} distinct MACs after 1000 attempts"
459            )
460    return result
class IPv4Addresses:
10class IPv4Addresses:
11    """Dynamic container for named IPv4Address attributes, with no boilerplate.
12
13    Usage:
14        ips = IPv4Addresses()
15        ips.ip1 = IPv4Address("192.168.1.1")
16        ips.ip2 = IPv4Address("192.168.1.2")
17    """
18
19    def __setattr__(self, name, value):
20        if not isinstance(value, IPv4Interface):
21            raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}")
22        super().__setattr__(name, value)
23
24    # ---------------- dict ----------------
25
26    def to_dict(self):
27        return {k: str(v) for k, v in self.__dict__.items()}
28
29    @classmethod
30    def from_dict(cls, d):
31        obj = cls()
32        for k, v in d.items():
33            super(IPv4Addresses, obj).__setattr__(k, IPv4Interface(v))
34        return obj
35
36    # ---------------- JSON ----------------
37
38    def to_json(self):
39        return json.dumps(self.to_dict())
40
41    @classmethod
42    def from_json(cls, s):
43        return cls.from_dict(json.loads(s))
44
45    # ---------------- msgpack ----------------
46
47    def pack(self):
48        return msgpack.packb(self.to_dict(), use_bin_type=True)
49
50    @classmethod
51    def unpack(cls, blob):
52        return cls.from_dict(msgpack.unpackb(blob, raw=False))

Dynamic container for named IPv4Address attributes, with no boilerplate.

Usage:

ips = IPv4Addresses() ips.ip1 = IPv4Address("192.168.1.1") ips.ip2 = IPv4Address("192.168.1.2")

def to_dict(self):
26    def to_dict(self):
27        return {k: str(v) for k, v in self.__dict__.items()}
@classmethod
def from_dict(cls, d):
29    @classmethod
30    def from_dict(cls, d):
31        obj = cls()
32        for k, v in d.items():
33            super(IPv4Addresses, obj).__setattr__(k, IPv4Interface(v))
34        return obj
def to_json(self):
38    def to_json(self):
39        return json.dumps(self.to_dict())
@classmethod
def from_json(cls, s):
41    @classmethod
42    def from_json(cls, s):
43        return cls.from_dict(json.loads(s))
def pack(self):
47    def pack(self):
48        return msgpack.packb(self.to_dict(), use_bin_type=True)
@classmethod
def unpack(cls, blob):
50    @classmethod
51    def unpack(cls, blob):
52        return cls.from_dict(msgpack.unpackb(blob, raw=False))
class IPv4Networks:
55class IPv4Networks:
56    """Dynamic container for named IPv4Network attributes, with no boilerplate.
57
58    Usage:
59        nets = IPv4Networks()
60        nets.lan = IPv4Network("192.168.1.0/24")
61        nets.mgmt = IPv4Network("10.0.0.0/8")
62    """
63
64    def __setattr__(self, name, value):
65        if not isinstance(value, IPv4Network):
66            raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}")
67        super().__setattr__(name, value)
68
69    # ---------------- dict ----------------
70
71    def to_dict(self):
72        return {k: str(v) for k, v in self.__dict__.items()}
73
74    @classmethod
75    def from_dict(cls, d):
76        obj = cls()
77        for k, v in d.items():
78            super(IPv4Networks, obj).__setattr__(k, IPv4Network(v))
79        return obj
80
81    # ---------------- JSON ----------------
82
83    def to_json(self):
84        return json.dumps(self.to_dict())
85
86    @classmethod
87    def from_json(cls, s):
88        return cls.from_dict(json.loads(s))
89
90    # ---------------- msgpack ----------------
91
92    def pack(self):
93        return msgpack.packb(self.to_dict(), use_bin_type=True)
94
95    @classmethod
96    def unpack(cls, blob):
97        return cls.from_dict(msgpack.unpackb(blob, raw=False))

Dynamic container for named IPv4Network attributes, with no boilerplate.

Usage:

nets = IPv4Networks() nets.lan = IPv4Network("192.168.1.0/24") nets.mgmt = IPv4Network("10.0.0.0/8")

def to_dict(self):
71    def to_dict(self):
72        return {k: str(v) for k, v in self.__dict__.items()}
@classmethod
def from_dict(cls, d):
74    @classmethod
75    def from_dict(cls, d):
76        obj = cls()
77        for k, v in d.items():
78            super(IPv4Networks, obj).__setattr__(k, IPv4Network(v))
79        return obj
def to_json(self):
83    def to_json(self):
84        return json.dumps(self.to_dict())
@classmethod
def from_json(cls, s):
86    @classmethod
87    def from_json(cls, s):
88        return cls.from_dict(json.loads(s))
def pack(self):
92    def pack(self):
93        return msgpack.packb(self.to_dict(), use_bin_type=True)
@classmethod
def unpack(cls, blob):
95    @classmethod
96    def unpack(cls, blob):
97        return cls.from_dict(msgpack.unpackb(blob, raw=False))
def random_ipv4networks( masks, from_network=IPv4Network('0.0.0.0/0'), exclude=None, from_private_network=False):
168def random_ipv4networks(
169    masks,
170    from_network=IPv4Network("0.0.0.0/0"),
171    exclude=None,
172    from_private_network=False,
173):
174    """Return a list of disjoint random IPv4Networks.
175
176    `masks` is an int or a list of ints; one network is returned per mask,
177    all mutually disjoint and not overlapping any network in `exclude`.
178    Optionally restricted to RFC-1918 private ranges with `from_private_network=True`.
179
180    `from` being a reserved keyword, the parameter is named `from_network`.
181
182    Raises ValueError if any network cannot be allocated.
183    """
184    if isinstance(masks, int):
185        masks = [masks]
186
187    working_exclude = list(exclude) if exclude else []
188    result = []
189    for mask in masks:
190        net = _pick_one(mask, from_network, working_exclude, from_private_network)
191        result.append(net)
192        working_exclude.append(net)
193    return result

Return a list of disjoint random IPv4Networks.

masks is an int or a list of ints; one network is returned per mask, all mutually disjoint and not overlapping any network in exclude. Optionally restricted to RFC-1918 private ranges with from_private_network=True.

from being a reserved keyword, the parameter is named from_network.

Raises ValueError if any network cannot be allocated.

def random_ipv4s(network, n=1, exclude_ips=None, exclude_nets=None):
196def random_ipv4s(network, n=1, exclude_ips=None, exclude_nets=None):
197    """Return a list of n distinct random IPv4Interface within `network`,
198    excluding any address in `exclude_ips` or covered by any network in `exclude_nets`.
199
200    Raises ValueError if fewer than n addresses are available.
201    """
202    ex_ips = set(exclude_ips) if exclude_ips else set()
203    ex_nets = list(exclude_nets) if exclude_nets else []
204
205    base = int(network.network_address)
206    total = network.num_addresses
207
208    def make(i):
209        return IPv4Interface(f"{IPv4Address(base + i)}/{network.prefixlen}")
210
211    def available(ip):
212        return ip not in ex_ips and not any(ip in net for net in ex_nets)
213
214    # Exclude network address (offset 0) and broadcast (offset total-1) for prefix <= 30.
215    # /31 (point-to-point, RFC 3021) and /32 (host route) have no reserved boundary addresses.
216    if network.prefixlen <= 30:
217        host_range = range(1, total - 1)
218    else:
219        host_range = range(total)
220
221    # Small space: enumerate all candidates, then sample.
222    if total <= 65536:
223        candidates = [make(i) for i in host_range if available(make(i))]
224        if len(candidates) < n:
225            raise ValueError(
226                f"Not enough available addresses: need {n}, found {len(candidates)}"
227            )
228        return random.sample(candidates, n)
229
230    # Large space: pick one at a time, accumulating into a working exclude set.
231    working_ex = set(ex_ips)
232    result = []
233    for _ in range(n):
234        for _ in range(1000):
235            candidate = make(random.randrange(host_range.start, host_range.stop))
236            if candidate not in working_ex and not any(candidate in net for net in ex_nets):
237                result.append(candidate)
238                working_ex.add(candidate)
239                break
240        else:
241            raise ValueError(
242                f"Could not find enough available addresses (got {len(result)}/{n})"
243            )
244    return result

Return a list of n distinct random IPv4Interface within network, excluding any address in exclude_ips or covered by any network in exclude_nets.

Raises ValueError if fewer than n addresses are available.

def random_ipv4s_with_range(network, gap, n=1, exclude_ips=None, exclude_nets=None):
247def random_ipv4s_with_range(network, gap, n=1, exclude_ips=None, exclude_nets=None):
248    """Return a list of n + 2*k distinct random IPv4Interface within `network`.
249
250    `gap` is either an int or a list of ints. ``k = 1`` when gap is an int,
251    ``k = len(gap)`` otherwise.
252
253    The returned list has the form:
254        [ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn]
255
256    Guarantees:
257    - int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i]  (or gap when gap is an int)
258    - ip_max_i < ip_min_{i+1}  (ranges are strictly ordered, non-overlapping)
259    - ip1 .. ipn are outside every range [ip_min_i, ip_max_i]
260
261    Raises ValueError if the constraints cannot be satisfied.
262    """
263    gaps = [gap] if isinstance(gap, int) else list(gap)
264    k = len(gaps)
265
266    ex_ips = set(exclude_ips) if exclude_ips else set()
267    ex_nets = list(exclude_nets) if exclude_nets else []
268
269    base = int(network.network_address)
270    total = network.num_addresses
271    prefixlen = network.prefixlen
272
273    def make(i):
274        return IPv4Interface(f"{IPv4Address(base + i)}/{prefixlen}")
275
276    def is_excluded(ip):
277        return ip in ex_ips or any(ip in net for net in ex_nets)
278
279    # Minimum space: k ranges placed back-to-back with 1-address gaps between them.
280    # Minimum last offset: sum(gaps) + k - 1; need total >= sum(gaps) + k.
281    min_needed = sum(gaps) + k
282    if total < min_needed:
283        raise ValueError(
284            f"Network {network} is too small to fit {k} range(s) with gaps {gaps}"
285        )
286
287    # suffix_sums[i] = sum(gaps[i:])
288    suffix_sums = [0] * (k + 1)
289    for i in range(k - 1, -1, -1):
290        suffix_sums[i] = suffix_sums[i + 1] + gaps[i]
291
292    def hi_for(i):
293        """Max start offset for range i that still leaves room for ranges i+1..k-1."""
294        return total - 1 - suffix_sums[i] - (k - 1 - i)
295
296    SMALL = 65536
297
298    def try_place_ranges():
299        """Try one left-to-right placement. Returns [(start, end), ...] or None."""
300        range_offsets = []
301        lo = 0
302        for i in range(k):
303            h = hi_for(i)
304            if h < lo:
305                return None
306            g = gaps[i]
307            space = h - lo + 1
308            if space <= SMALL:
309                candidates = [
310                    j
311                    for j in range(lo, h + 1)
312                    if not is_excluded(make(j)) and not is_excluded(make(j + g))
313                ]
314                if not candidates:
315                    return None
316                s = random.choice(candidates)
317            else:
318                s = None
319                for _ in range(1000):
320                    c = random.randint(lo, h)
321                    if not is_excluded(make(c)) and not is_excluded(make(c + g)):
322                        s = c
323                        break
324                if s is None:
325                    return None
326            range_offsets.append((s, s + g))
327            lo = s + g + 1
328        return range_offsets
329
330    range_offsets = None
331    for _ in range(1000):
332        range_offsets = try_place_ranges()
333        if range_offsets is not None:
334            break
335    if range_offsets is None:
336        raise ValueError(
337            f"Could not place {k} range(s) with gaps {gaps} with the given exclusions"
338        )
339
340    def in_any_range(offset):
341        return any(s <= offset <= e for s, e in range_offsets)
342
343    def extra_available(offset):
344        ip = make(offset)
345        return (
346            ip not in ex_ips
347            and not in_any_range(offset)
348            and not any(ip in net for net in ex_nets)
349        )
350
351    if total <= SMALL:
352        extra_candidates = [make(i) for i in range(total) if extra_available(i)]
353        if len(extra_candidates) < n:
354            raise ValueError(
355                f"Not enough addresses outside the range(s) for {n} extra IPs: "
356                f"found {len(extra_candidates)}"
357            )
358        extras = random.sample(extra_candidates, n)
359    else:
360        chosen = set()
361        extras = []
362        for _ in range(n):
363            for _ in range(1000):
364                offset = random.randrange(total)
365                candidate = make(offset)
366                if extra_available(offset) and candidate not in chosen:
367                    extras.append(candidate)
368                    chosen.add(candidate)
369                    break
370            else:
371                raise ValueError(
372                    f"Could not find enough extra addresses (got {len(extras)}/{n})"
373                )
374
375    result = []
376    for s, e in range_offsets:
377        result.append(make(s))
378        result.append(make(e))
379    result.extend(extras)
380    return result

Return a list of n + 2*k distinct random IPv4Interface within network.

gap is either an int or a list of ints. k = 1 when gap is an int, k = len(gap) otherwise.

The returned list has the form:

[ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn]

Guarantees:

  • int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i] (or gap when gap is an int)
  • ip_max_i < ip_min_{i+1} (ranges are strictly ordered, non-overlapping)
  • ip1 .. ipn are outside every range [ip_min_i, ip_max_i]

Raises ValueError if the constraints cannot be satisfied.

def random_ips_from_topology(data, topology):
383def random_ips_from_topology(data, topology):
384    """Assign random IPs to data.ips from data.nets based on a NetScheme0 topology.
385
386    topology: {net_name: [machine, ...] or {machine: iface_spec, ...}}
387    (the _topology class attribute format of NetScheme0)
388
389    For each machine m:
390    - belongs to exactly one network netX → data.ips.m  (in data.nets.netX)
391    - belongs to multiple networks       → data.ips.m_netX for each netX
392    """
393    # Build inverse map: machine -> [net_name, ...]  (preserve insertion order)
394    machine_nets = {}
395    for net_name, machines in topology.items():
396        names = machines.keys() if isinstance(machines, dict) else machines
397        for m in names:
398            machine_nets.setdefault(m, []).append(net_name)
399
400    # Assign IPs, tracking used addresses per network to avoid duplicates
401    assigned = {}  # {net_name: list[IPv4Interface]}
402    for machine, nets in machine_nets.items():
403        for net_name in nets:
404            network = getattr(data.nets, net_name)
405            ip = random_ipv4s(network, 1, exclude_ips=assigned.get(net_name))[0]
406            assigned.setdefault(net_name, []).append(ip)
407            attr = machine if len(nets) == 1 else f"{machine}_{net_name}"
408            setattr(data.ips, attr, ip)

Assign random IPs to data.ips from data.nets based on a NetScheme0 topology.

topology: {net_name: [machine, ...] or {machine: iface_spec, ...}} (the _topology class attribute format of NetScheme0)

For each machine m:

  • belongs to exactly one network netX → data.ips.m (in data.nets.netX)
  • belongs to multiple networks → data.ips.m_netX for each netX
def random_mac_address(prefix=None, n=1):
411def random_mac_address(prefix=None, n=1):
412    """Return a list of n distinct random EUI MAC addresses.
413
414    `prefix` is an optional colon- or dash-separated hex string specifying the
415    leading bytes (e.g. ``"00:1A:2B"`` for a 3-byte OUI prefix).
416    Remaining bytes are chosen at random.
417
418    Raises ValueError if n distinct addresses cannot be generated.
419    """
420    if prefix is not None:
421        sep = "-" if "-" in prefix else ":"
422        prefix_bytes = bytes(int(x, 16) for x in prefix.split(sep))
423    else:
424        prefix_bytes = b""
425
426    suffix_len = 6 - len(prefix_bytes)
427    if suffix_len < 0:
428        raise ValueError(
429            f"Prefix too long: {prefix!r} ({len(prefix_bytes)} bytes, max 6)"
430        )
431
432    total = 256**suffix_len
433    if n > total:
434        raise ValueError(
435            f"Cannot generate {n} distinct MACs with {suffix_len} random bytes (max {total})"
436        )
437
438    def make(suffix_bytes):
439        all_bytes = prefix_bytes + suffix_bytes
440        if prefix is None:
441            all_bytes = bytes([all_bytes[0] & 0xFE]) + all_bytes[1:]
442        return EUI(":".join(f"{b:02x}" for b in all_bytes))
443
444    if total <= 65536:
445        candidates = [make(i.to_bytes(suffix_len, "big")) for i in range(total)]
446        return random.sample(candidates, n)
447
448    seen = set()
449    result = []
450    for _ in range(n):
451        for _ in range(1000):
452            suffix = random.randbytes(suffix_len)
453            if suffix not in seen:
454                seen.add(suffix)
455                result.append(make(suffix))
456                break
457        else:
458            raise ValueError(
459                f"Could not generate {n} distinct MACs after 1000 attempts"
460            )
461    return result

Return a list of n distinct random EUI MAC addresses.

prefix is an optional colon- or dash-separated hex string specifying the leading bytes (e.g. "00:1A:2B" for a 3-byte OUI prefix). Remaining bytes are chosen at random.

Raises ValueError if n distinct addresses cannot be generated.