lib.ips
1import json 2import random 3from ipaddress import IPv4Address, IPv4Interface, IPv4Network 4 5import msgpack 6from netaddr import EUI 7 8 9class IPv4Addresses: 10 """Dynamic container for named IPv4Address attributes, with no boilerplate. 11 12 Usage: 13 ips = IPv4Addresses() 14 ips.ip1 = IPv4Address("192.168.1.1") 15 ips.ip2 = IPv4Address("192.168.1.2") 16 """ 17 18 def __setattr__(self, name, value): 19 if not isinstance(value, IPv4Interface): 20 raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}") 21 super().__setattr__(name, value) 22 23 # ---------------- dict ---------------- 24 25 def to_dict(self): 26 return {k: str(v) for k, v in self.__dict__.items()} 27 28 @classmethod 29 def from_dict(cls, d): 30 obj = cls() 31 for k, v in d.items(): 32 super(IPv4Addresses, obj).__setattr__(k, IPv4Interface(v)) 33 return obj 34 35 # ---------------- JSON ---------------- 36 37 def to_json(self): 38 return json.dumps(self.to_dict()) 39 40 @classmethod 41 def from_json(cls, s): 42 return cls.from_dict(json.loads(s)) 43 44 # ---------------- msgpack ---------------- 45 46 def pack(self): 47 return msgpack.packb(self.to_dict(), use_bin_type=True) 48 49 @classmethod 50 def unpack(cls, blob): 51 return cls.from_dict(msgpack.unpackb(blob, raw=False)) 52 53 54class IPv4Networks: 55 """Dynamic container for named IPv4Network attributes, with no boilerplate. 56 57 Usage: 58 nets = IPv4Networks() 59 nets.lan = IPv4Network("192.168.1.0/24") 60 nets.mgmt = IPv4Network("10.0.0.0/8") 61 """ 62 63 def __setattr__(self, name, value): 64 if not isinstance(value, IPv4Network): 65 raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}") 66 super().__setattr__(name, value) 67 68 # ---------------- dict ---------------- 69 70 def to_dict(self): 71 return {k: str(v) for k, v in self.__dict__.items()} 72 73 @classmethod 74 def from_dict(cls, d): 75 obj = cls() 76 for k, v in d.items(): 77 super(IPv4Networks, obj).__setattr__(k, IPv4Network(v)) 78 return obj 79 80 # ---------------- JSON ---------------- 81 82 def to_json(self): 83 return json.dumps(self.to_dict()) 84 85 @classmethod 86 def from_json(cls, s): 87 return cls.from_dict(json.loads(s)) 88 89 # ---------------- msgpack ---------------- 90 91 def pack(self): 92 return msgpack.packb(self.to_dict(), use_bin_type=True) 93 94 @classmethod 95 def unpack(cls, blob): 96 return cls.from_dict(msgpack.unpackb(blob, raw=False)) 97 98 99_PRIVATE_NETWORKS = [ 100 IPv4Network("10.0.0.0/8"), 101 IPv4Network("172.16.0.0/12"), 102 IPv4Network("192.168.0.0/16"), 103] 104 105 106def _pick_one(mask, from_network, exclude, from_private_network): 107 """Pick a single random /mask network. Internal helper for random_ipv4networks.""" 108 block_size = 2 ** (32 - mask) 109 110 def make(abs_idx): 111 return IPv4Network((abs_idx * block_size, mask)) 112 113 def available(net): 114 return not any(net.overlaps(ex) for ex in exclude) 115 116 def indices_in(a, b): 117 first = (a + block_size - 1) // block_size 118 last = (b + 1) // block_size - 1 119 return range(first, last + 1) if first <= last else range(0, 0) 120 121 flo = int(from_network.network_address) 122 fhi = int(from_network.broadcast_address) 123 if from_private_network: 124 boxes = [] 125 for p in _PRIVATE_NETWORKS: 126 lo = max(flo, int(p.network_address)) 127 hi = min(fhi, int(p.broadcast_address)) 128 if lo <= hi: 129 boxes.append((lo, hi)) 130 else: 131 boxes = [(flo, fhi)] 132 133 spans = [(r.start, len(r)) for lo, hi in boxes for r in [indices_in(lo, hi)] if r] 134 total = sum(count for _, count in spans) 135 136 if total == 0: 137 raise ValueError(f"No /{mask} network fits in the search space") 138 139 def pick_abs_idx(): 140 if from_private_network and len(spans) > 1: 141 # Pick uniformly across private ranges (not weighted by range size) 142 start, count = random.choice(spans) 143 return start + random.randrange(count) 144 n = random.randrange(total) 145 for start, count in spans: 146 if n < count: 147 return start + n 148 n -= count 149 return spans[-1][0] # unreachable: n < total guarantees a match above 150 151 if total <= 65536: 152 candidates = [make(i) for start, count in spans 153 for i in range(start, start + count) 154 if available(make(i))] 155 if not candidates: 156 raise ValueError(f"No available /{mask} network with the given constraints") 157 return random.choice(candidates) 158 159 for _ in range(1000): 160 candidate = make(pick_abs_idx()) 161 if available(candidate): 162 return candidate 163 164 raise ValueError(f"No available /{mask} network found after 1000 attempts") 165 166 167def random_ipv4networks( 168 masks, 169 from_network=IPv4Network("0.0.0.0/0"), 170 exclude=None, 171 from_private_network=False, 172): 173 """Return a list of disjoint random IPv4Networks. 174 175 `masks` is an int or a list of ints; one network is returned per mask, 176 all mutually disjoint and not overlapping any network in `exclude`. 177 Optionally restricted to RFC-1918 private ranges with `from_private_network=True`. 178 179 `from` being a reserved keyword, the parameter is named `from_network`. 180 181 Raises ValueError if any network cannot be allocated. 182 """ 183 if isinstance(masks, int): 184 masks = [masks] 185 186 working_exclude = list(exclude) if exclude else [] 187 result = [] 188 for mask in masks: 189 net = _pick_one(mask, from_network, working_exclude, from_private_network) 190 result.append(net) 191 working_exclude.append(net) 192 return result 193 194 195def random_ipv4s(network, n=1, exclude_ips=None, exclude_nets=None): 196 """Return a list of n distinct random IPv4Interface within `network`, 197 excluding any address in `exclude_ips` or covered by any network in `exclude_nets`. 198 199 Raises ValueError if fewer than n addresses are available. 200 """ 201 ex_ips = set(exclude_ips) if exclude_ips else set() 202 ex_nets = list(exclude_nets) if exclude_nets else [] 203 204 base = int(network.network_address) 205 total = network.num_addresses 206 207 def make(i): 208 return IPv4Interface(f"{IPv4Address(base + i)}/{network.prefixlen}") 209 210 def available(ip): 211 return ip not in ex_ips and not any(ip in net for net in ex_nets) 212 213 # Exclude network address (offset 0) and broadcast (offset total-1) for prefix <= 30. 214 # /31 (point-to-point, RFC 3021) and /32 (host route) have no reserved boundary addresses. 215 if network.prefixlen <= 30: 216 host_range = range(1, total - 1) 217 else: 218 host_range = range(total) 219 220 # Small space: enumerate all candidates, then sample. 221 if total <= 65536: 222 candidates = [make(i) for i in host_range if available(make(i))] 223 if len(candidates) < n: 224 raise ValueError( 225 f"Not enough available addresses: need {n}, found {len(candidates)}" 226 ) 227 return random.sample(candidates, n) 228 229 # Large space: pick one at a time, accumulating into a working exclude set. 230 working_ex = set(ex_ips) 231 result = [] 232 for _ in range(n): 233 for _ in range(1000): 234 candidate = make(random.randrange(host_range.start, host_range.stop)) 235 if candidate not in working_ex and not any(candidate in net for net in ex_nets): 236 result.append(candidate) 237 working_ex.add(candidate) 238 break 239 else: 240 raise ValueError( 241 f"Could not find enough available addresses (got {len(result)}/{n})" 242 ) 243 return result 244 245 246def random_ipv4s_with_range(network, gap, n=1, exclude_ips=None, exclude_nets=None): 247 """Return a list of n + 2*k distinct random IPv4Interface within `network`. 248 249 `gap` is either an int or a list of ints. ``k = 1`` when gap is an int, 250 ``k = len(gap)`` otherwise. 251 252 The returned list has the form: 253 [ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn] 254 255 Guarantees: 256 - int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i] (or gap when gap is an int) 257 - ip_max_i < ip_min_{i+1} (ranges are strictly ordered, non-overlapping) 258 - ip1 .. ipn are outside every range [ip_min_i, ip_max_i] 259 260 Raises ValueError if the constraints cannot be satisfied. 261 """ 262 gaps = [gap] if isinstance(gap, int) else list(gap) 263 k = len(gaps) 264 265 ex_ips = set(exclude_ips) if exclude_ips else set() 266 ex_nets = list(exclude_nets) if exclude_nets else [] 267 268 base = int(network.network_address) 269 total = network.num_addresses 270 prefixlen = network.prefixlen 271 272 def make(i): 273 return IPv4Interface(f"{IPv4Address(base + i)}/{prefixlen}") 274 275 def is_excluded(ip): 276 return ip in ex_ips or any(ip in net for net in ex_nets) 277 278 # Minimum space: k ranges placed back-to-back with 1-address gaps between them. 279 # Minimum last offset: sum(gaps) + k - 1; need total >= sum(gaps) + k. 280 min_needed = sum(gaps) + k 281 if total < min_needed: 282 raise ValueError( 283 f"Network {network} is too small to fit {k} range(s) with gaps {gaps}" 284 ) 285 286 # suffix_sums[i] = sum(gaps[i:]) 287 suffix_sums = [0] * (k + 1) 288 for i in range(k - 1, -1, -1): 289 suffix_sums[i] = suffix_sums[i + 1] + gaps[i] 290 291 def hi_for(i): 292 """Max start offset for range i that still leaves room for ranges i+1..k-1.""" 293 return total - 1 - suffix_sums[i] - (k - 1 - i) 294 295 SMALL = 65536 296 297 def try_place_ranges(): 298 """Try one left-to-right placement. Returns [(start, end), ...] or None.""" 299 range_offsets = [] 300 lo = 0 301 for i in range(k): 302 h = hi_for(i) 303 if h < lo: 304 return None 305 g = gaps[i] 306 space = h - lo + 1 307 if space <= SMALL: 308 candidates = [ 309 j 310 for j in range(lo, h + 1) 311 if not is_excluded(make(j)) and not is_excluded(make(j + g)) 312 ] 313 if not candidates: 314 return None 315 s = random.choice(candidates) 316 else: 317 s = None 318 for _ in range(1000): 319 c = random.randint(lo, h) 320 if not is_excluded(make(c)) and not is_excluded(make(c + g)): 321 s = c 322 break 323 if s is None: 324 return None 325 range_offsets.append((s, s + g)) 326 lo = s + g + 1 327 return range_offsets 328 329 range_offsets = None 330 for _ in range(1000): 331 range_offsets = try_place_ranges() 332 if range_offsets is not None: 333 break 334 if range_offsets is None: 335 raise ValueError( 336 f"Could not place {k} range(s) with gaps {gaps} with the given exclusions" 337 ) 338 339 def in_any_range(offset): 340 return any(s <= offset <= e for s, e in range_offsets) 341 342 def extra_available(offset): 343 ip = make(offset) 344 return ( 345 ip not in ex_ips 346 and not in_any_range(offset) 347 and not any(ip in net for net in ex_nets) 348 ) 349 350 if total <= SMALL: 351 extra_candidates = [make(i) for i in range(total) if extra_available(i)] 352 if len(extra_candidates) < n: 353 raise ValueError( 354 f"Not enough addresses outside the range(s) for {n} extra IPs: " 355 f"found {len(extra_candidates)}" 356 ) 357 extras = random.sample(extra_candidates, n) 358 else: 359 chosen = set() 360 extras = [] 361 for _ in range(n): 362 for _ in range(1000): 363 offset = random.randrange(total) 364 candidate = make(offset) 365 if extra_available(offset) and candidate not in chosen: 366 extras.append(candidate) 367 chosen.add(candidate) 368 break 369 else: 370 raise ValueError( 371 f"Could not find enough extra addresses (got {len(extras)}/{n})" 372 ) 373 374 result = [] 375 for s, e in range_offsets: 376 result.append(make(s)) 377 result.append(make(e)) 378 result.extend(extras) 379 return result 380 381 382def random_ips_from_topology(data, topology): 383 """Assign random IPs to data.ips from data.nets based on a NetScheme0 topology. 384 385 topology: {net_name: [machine, ...] or {machine: iface_spec, ...}} 386 (the _topology class attribute format of NetScheme0) 387 388 For each machine m: 389 - belongs to exactly one network netX → data.ips.m (in data.nets.netX) 390 - belongs to multiple networks → data.ips.m_netX for each netX 391 """ 392 # Build inverse map: machine -> [net_name, ...] (preserve insertion order) 393 machine_nets = {} 394 for net_name, machines in topology.items(): 395 names = machines.keys() if isinstance(machines, dict) else machines 396 for m in names: 397 machine_nets.setdefault(m, []).append(net_name) 398 399 # Assign IPs, tracking used addresses per network to avoid duplicates 400 assigned = {} # {net_name: list[IPv4Interface]} 401 for machine, nets in machine_nets.items(): 402 for net_name in nets: 403 network = getattr(data.nets, net_name) 404 ip = random_ipv4s(network, 1, exclude_ips=assigned.get(net_name))[0] 405 assigned.setdefault(net_name, []).append(ip) 406 attr = machine if len(nets) == 1 else f"{machine}_{net_name}" 407 setattr(data.ips, attr, ip) 408 409 410def random_mac_address(prefix=None, n=1): 411 """Return a list of n distinct random EUI MAC addresses. 412 413 `prefix` is an optional colon- or dash-separated hex string specifying the 414 leading bytes (e.g. ``"00:1A:2B"`` for a 3-byte OUI prefix). 415 Remaining bytes are chosen at random. 416 417 Raises ValueError if n distinct addresses cannot be generated. 418 """ 419 if prefix is not None: 420 sep = "-" if "-" in prefix else ":" 421 prefix_bytes = bytes(int(x, 16) for x in prefix.split(sep)) 422 else: 423 prefix_bytes = b"" 424 425 suffix_len = 6 - len(prefix_bytes) 426 if suffix_len < 0: 427 raise ValueError( 428 f"Prefix too long: {prefix!r} ({len(prefix_bytes)} bytes, max 6)" 429 ) 430 431 total = 256**suffix_len 432 if n > total: 433 raise ValueError( 434 f"Cannot generate {n} distinct MACs with {suffix_len} random bytes (max {total})" 435 ) 436 437 def make(suffix_bytes): 438 all_bytes = prefix_bytes + suffix_bytes 439 if prefix is None: 440 all_bytes = bytes([all_bytes[0] & 0xFE]) + all_bytes[1:] 441 return EUI(":".join(f"{b:02x}" for b in all_bytes)) 442 443 if total <= 65536: 444 candidates = [make(i.to_bytes(suffix_len, "big")) for i in range(total)] 445 return random.sample(candidates, n) 446 447 seen = set() 448 result = [] 449 for _ in range(n): 450 for _ in range(1000): 451 suffix = random.randbytes(suffix_len) 452 if suffix not in seen: 453 seen.add(suffix) 454 result.append(make(suffix)) 455 break 456 else: 457 raise ValueError( 458 f"Could not generate {n} distinct MACs after 1000 attempts" 459 ) 460 return result
10class IPv4Addresses: 11 """Dynamic container for named IPv4Address attributes, with no boilerplate. 12 13 Usage: 14 ips = IPv4Addresses() 15 ips.ip1 = IPv4Address("192.168.1.1") 16 ips.ip2 = IPv4Address("192.168.1.2") 17 """ 18 19 def __setattr__(self, name, value): 20 if not isinstance(value, IPv4Interface): 21 raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}") 22 super().__setattr__(name, value) 23 24 # ---------------- dict ---------------- 25 26 def to_dict(self): 27 return {k: str(v) for k, v in self.__dict__.items()} 28 29 @classmethod 30 def from_dict(cls, d): 31 obj = cls() 32 for k, v in d.items(): 33 super(IPv4Addresses, obj).__setattr__(k, IPv4Interface(v)) 34 return obj 35 36 # ---------------- JSON ---------------- 37 38 def to_json(self): 39 return json.dumps(self.to_dict()) 40 41 @classmethod 42 def from_json(cls, s): 43 return cls.from_dict(json.loads(s)) 44 45 # ---------------- msgpack ---------------- 46 47 def pack(self): 48 return msgpack.packb(self.to_dict(), use_bin_type=True) 49 50 @classmethod 51 def unpack(cls, blob): 52 return cls.from_dict(msgpack.unpackb(blob, raw=False))
Dynamic container for named IPv4Address attributes, with no boilerplate.
Usage:
ips = IPv4Addresses() ips.ip1 = IPv4Address("192.168.1.1") ips.ip2 = IPv4Address("192.168.1.2")
55class IPv4Networks: 56 """Dynamic container for named IPv4Network attributes, with no boilerplate. 57 58 Usage: 59 nets = IPv4Networks() 60 nets.lan = IPv4Network("192.168.1.0/24") 61 nets.mgmt = IPv4Network("10.0.0.0/8") 62 """ 63 64 def __setattr__(self, name, value): 65 if not isinstance(value, IPv4Network): 66 raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}") 67 super().__setattr__(name, value) 68 69 # ---------------- dict ---------------- 70 71 def to_dict(self): 72 return {k: str(v) for k, v in self.__dict__.items()} 73 74 @classmethod 75 def from_dict(cls, d): 76 obj = cls() 77 for k, v in d.items(): 78 super(IPv4Networks, obj).__setattr__(k, IPv4Network(v)) 79 return obj 80 81 # ---------------- JSON ---------------- 82 83 def to_json(self): 84 return json.dumps(self.to_dict()) 85 86 @classmethod 87 def from_json(cls, s): 88 return cls.from_dict(json.loads(s)) 89 90 # ---------------- msgpack ---------------- 91 92 def pack(self): 93 return msgpack.packb(self.to_dict(), use_bin_type=True) 94 95 @classmethod 96 def unpack(cls, blob): 97 return cls.from_dict(msgpack.unpackb(blob, raw=False))
Dynamic container for named IPv4Network attributes, with no boilerplate.
Usage:
nets = IPv4Networks() nets.lan = IPv4Network("192.168.1.0/24") nets.mgmt = IPv4Network("10.0.0.0/8")
168def random_ipv4networks( 169 masks, 170 from_network=IPv4Network("0.0.0.0/0"), 171 exclude=None, 172 from_private_network=False, 173): 174 """Return a list of disjoint random IPv4Networks. 175 176 `masks` is an int or a list of ints; one network is returned per mask, 177 all mutually disjoint and not overlapping any network in `exclude`. 178 Optionally restricted to RFC-1918 private ranges with `from_private_network=True`. 179 180 `from` being a reserved keyword, the parameter is named `from_network`. 181 182 Raises ValueError if any network cannot be allocated. 183 """ 184 if isinstance(masks, int): 185 masks = [masks] 186 187 working_exclude = list(exclude) if exclude else [] 188 result = [] 189 for mask in masks: 190 net = _pick_one(mask, from_network, working_exclude, from_private_network) 191 result.append(net) 192 working_exclude.append(net) 193 return result
Return a list of disjoint random IPv4Networks.
masks is an int or a list of ints; one network is returned per mask,
all mutually disjoint and not overlapping any network in exclude.
Optionally restricted to RFC-1918 private ranges with from_private_network=True.
from being a reserved keyword, the parameter is named from_network.
Raises ValueError if any network cannot be allocated.
196def random_ipv4s(network, n=1, exclude_ips=None, exclude_nets=None): 197 """Return a list of n distinct random IPv4Interface within `network`, 198 excluding any address in `exclude_ips` or covered by any network in `exclude_nets`. 199 200 Raises ValueError if fewer than n addresses are available. 201 """ 202 ex_ips = set(exclude_ips) if exclude_ips else set() 203 ex_nets = list(exclude_nets) if exclude_nets else [] 204 205 base = int(network.network_address) 206 total = network.num_addresses 207 208 def make(i): 209 return IPv4Interface(f"{IPv4Address(base + i)}/{network.prefixlen}") 210 211 def available(ip): 212 return ip not in ex_ips and not any(ip in net for net in ex_nets) 213 214 # Exclude network address (offset 0) and broadcast (offset total-1) for prefix <= 30. 215 # /31 (point-to-point, RFC 3021) and /32 (host route) have no reserved boundary addresses. 216 if network.prefixlen <= 30: 217 host_range = range(1, total - 1) 218 else: 219 host_range = range(total) 220 221 # Small space: enumerate all candidates, then sample. 222 if total <= 65536: 223 candidates = [make(i) for i in host_range if available(make(i))] 224 if len(candidates) < n: 225 raise ValueError( 226 f"Not enough available addresses: need {n}, found {len(candidates)}" 227 ) 228 return random.sample(candidates, n) 229 230 # Large space: pick one at a time, accumulating into a working exclude set. 231 working_ex = set(ex_ips) 232 result = [] 233 for _ in range(n): 234 for _ in range(1000): 235 candidate = make(random.randrange(host_range.start, host_range.stop)) 236 if candidate not in working_ex and not any(candidate in net for net in ex_nets): 237 result.append(candidate) 238 working_ex.add(candidate) 239 break 240 else: 241 raise ValueError( 242 f"Could not find enough available addresses (got {len(result)}/{n})" 243 ) 244 return result
Return a list of n distinct random IPv4Interface within network,
excluding any address in exclude_ips or covered by any network in exclude_nets.
Raises ValueError if fewer than n addresses are available.
247def random_ipv4s_with_range(network, gap, n=1, exclude_ips=None, exclude_nets=None): 248 """Return a list of n + 2*k distinct random IPv4Interface within `network`. 249 250 `gap` is either an int or a list of ints. ``k = 1`` when gap is an int, 251 ``k = len(gap)`` otherwise. 252 253 The returned list has the form: 254 [ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn] 255 256 Guarantees: 257 - int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i] (or gap when gap is an int) 258 - ip_max_i < ip_min_{i+1} (ranges are strictly ordered, non-overlapping) 259 - ip1 .. ipn are outside every range [ip_min_i, ip_max_i] 260 261 Raises ValueError if the constraints cannot be satisfied. 262 """ 263 gaps = [gap] if isinstance(gap, int) else list(gap) 264 k = len(gaps) 265 266 ex_ips = set(exclude_ips) if exclude_ips else set() 267 ex_nets = list(exclude_nets) if exclude_nets else [] 268 269 base = int(network.network_address) 270 total = network.num_addresses 271 prefixlen = network.prefixlen 272 273 def make(i): 274 return IPv4Interface(f"{IPv4Address(base + i)}/{prefixlen}") 275 276 def is_excluded(ip): 277 return ip in ex_ips or any(ip in net for net in ex_nets) 278 279 # Minimum space: k ranges placed back-to-back with 1-address gaps between them. 280 # Minimum last offset: sum(gaps) + k - 1; need total >= sum(gaps) + k. 281 min_needed = sum(gaps) + k 282 if total < min_needed: 283 raise ValueError( 284 f"Network {network} is too small to fit {k} range(s) with gaps {gaps}" 285 ) 286 287 # suffix_sums[i] = sum(gaps[i:]) 288 suffix_sums = [0] * (k + 1) 289 for i in range(k - 1, -1, -1): 290 suffix_sums[i] = suffix_sums[i + 1] + gaps[i] 291 292 def hi_for(i): 293 """Max start offset for range i that still leaves room for ranges i+1..k-1.""" 294 return total - 1 - suffix_sums[i] - (k - 1 - i) 295 296 SMALL = 65536 297 298 def try_place_ranges(): 299 """Try one left-to-right placement. Returns [(start, end), ...] or None.""" 300 range_offsets = [] 301 lo = 0 302 for i in range(k): 303 h = hi_for(i) 304 if h < lo: 305 return None 306 g = gaps[i] 307 space = h - lo + 1 308 if space <= SMALL: 309 candidates = [ 310 j 311 for j in range(lo, h + 1) 312 if not is_excluded(make(j)) and not is_excluded(make(j + g)) 313 ] 314 if not candidates: 315 return None 316 s = random.choice(candidates) 317 else: 318 s = None 319 for _ in range(1000): 320 c = random.randint(lo, h) 321 if not is_excluded(make(c)) and not is_excluded(make(c + g)): 322 s = c 323 break 324 if s is None: 325 return None 326 range_offsets.append((s, s + g)) 327 lo = s + g + 1 328 return range_offsets 329 330 range_offsets = None 331 for _ in range(1000): 332 range_offsets = try_place_ranges() 333 if range_offsets is not None: 334 break 335 if range_offsets is None: 336 raise ValueError( 337 f"Could not place {k} range(s) with gaps {gaps} with the given exclusions" 338 ) 339 340 def in_any_range(offset): 341 return any(s <= offset <= e for s, e in range_offsets) 342 343 def extra_available(offset): 344 ip = make(offset) 345 return ( 346 ip not in ex_ips 347 and not in_any_range(offset) 348 and not any(ip in net for net in ex_nets) 349 ) 350 351 if total <= SMALL: 352 extra_candidates = [make(i) for i in range(total) if extra_available(i)] 353 if len(extra_candidates) < n: 354 raise ValueError( 355 f"Not enough addresses outside the range(s) for {n} extra IPs: " 356 f"found {len(extra_candidates)}" 357 ) 358 extras = random.sample(extra_candidates, n) 359 else: 360 chosen = set() 361 extras = [] 362 for _ in range(n): 363 for _ in range(1000): 364 offset = random.randrange(total) 365 candidate = make(offset) 366 if extra_available(offset) and candidate not in chosen: 367 extras.append(candidate) 368 chosen.add(candidate) 369 break 370 else: 371 raise ValueError( 372 f"Could not find enough extra addresses (got {len(extras)}/{n})" 373 ) 374 375 result = [] 376 for s, e in range_offsets: 377 result.append(make(s)) 378 result.append(make(e)) 379 result.extend(extras) 380 return result
Return a list of n + 2*k distinct random IPv4Interface within network.
gap is either an int or a list of ints. k = 1 when gap is an int,
k = len(gap) otherwise.
The returned list has the form:
[ip_min1, ip_max1, ip_min2, ip_max2, ..., ip_mink, ip_maxk, ip1, ..., ipn]
Guarantees:
- int(ip_max_i.ip) - int(ip_min_i.ip) == gap[i] (or gap when gap is an int)
- ip_max_i < ip_min_{i+1} (ranges are strictly ordered, non-overlapping)
- ip1 .. ipn are outside every range [ip_min_i, ip_max_i]
Raises ValueError if the constraints cannot be satisfied.
383def random_ips_from_topology(data, topology): 384 """Assign random IPs to data.ips from data.nets based on a NetScheme0 topology. 385 386 topology: {net_name: [machine, ...] or {machine: iface_spec, ...}} 387 (the _topology class attribute format of NetScheme0) 388 389 For each machine m: 390 - belongs to exactly one network netX → data.ips.m (in data.nets.netX) 391 - belongs to multiple networks → data.ips.m_netX for each netX 392 """ 393 # Build inverse map: machine -> [net_name, ...] (preserve insertion order) 394 machine_nets = {} 395 for net_name, machines in topology.items(): 396 names = machines.keys() if isinstance(machines, dict) else machines 397 for m in names: 398 machine_nets.setdefault(m, []).append(net_name) 399 400 # Assign IPs, tracking used addresses per network to avoid duplicates 401 assigned = {} # {net_name: list[IPv4Interface]} 402 for machine, nets in machine_nets.items(): 403 for net_name in nets: 404 network = getattr(data.nets, net_name) 405 ip = random_ipv4s(network, 1, exclude_ips=assigned.get(net_name))[0] 406 assigned.setdefault(net_name, []).append(ip) 407 attr = machine if len(nets) == 1 else f"{machine}_{net_name}" 408 setattr(data.ips, attr, ip)
Assign random IPs to data.ips from data.nets based on a NetScheme0 topology.
topology: {net_name: [machine, ...] or {machine: iface_spec, ...}} (the _topology class attribute format of NetScheme0)
For each machine m:
- belongs to exactly one network netX → data.ips.m (in data.nets.netX)
- belongs to multiple networks → data.ips.m_netX for each netX
411def random_mac_address(prefix=None, n=1): 412 """Return a list of n distinct random EUI MAC addresses. 413 414 `prefix` is an optional colon- or dash-separated hex string specifying the 415 leading bytes (e.g. ``"00:1A:2B"`` for a 3-byte OUI prefix). 416 Remaining bytes are chosen at random. 417 418 Raises ValueError if n distinct addresses cannot be generated. 419 """ 420 if prefix is not None: 421 sep = "-" if "-" in prefix else ":" 422 prefix_bytes = bytes(int(x, 16) for x in prefix.split(sep)) 423 else: 424 prefix_bytes = b"" 425 426 suffix_len = 6 - len(prefix_bytes) 427 if suffix_len < 0: 428 raise ValueError( 429 f"Prefix too long: {prefix!r} ({len(prefix_bytes)} bytes, max 6)" 430 ) 431 432 total = 256**suffix_len 433 if n > total: 434 raise ValueError( 435 f"Cannot generate {n} distinct MACs with {suffix_len} random bytes (max {total})" 436 ) 437 438 def make(suffix_bytes): 439 all_bytes = prefix_bytes + suffix_bytes 440 if prefix is None: 441 all_bytes = bytes([all_bytes[0] & 0xFE]) + all_bytes[1:] 442 return EUI(":".join(f"{b:02x}" for b in all_bytes)) 443 444 if total <= 65536: 445 candidates = [make(i.to_bytes(suffix_len, "big")) for i in range(total)] 446 return random.sample(candidates, n) 447 448 seen = set() 449 result = [] 450 for _ in range(n): 451 for _ in range(1000): 452 suffix = random.randbytes(suffix_len) 453 if suffix not in seen: 454 seen.add(suffix) 455 result.append(make(suffix)) 456 break 457 else: 458 raise ValueError( 459 f"Could not generate {n} distinct MACs after 1000 attempts" 460 ) 461 return result
Return a list of n distinct random EUI MAC addresses.
prefix is an optional colon- or dash-separated hex string specifying the
leading bytes (e.g. "00:1A:2B" for a 3-byte OUI prefix).
Remaining bytes are chosen at random.
Raises ValueError if n distinct addresses cannot be generated.