SRE.lib_sre
1import datetime 2import enum 3import fnmatch 4import re as _re 5import os 6import shlex 7import socket 8import subprocess 9import sys 10import tempfile 11import math 12 13from dataclasses import asdict 14 15from concurrent.futures import ThreadPoolExecutor, as_completed 16from typing import Tuple 17 18import zstandard as zstd 19 20from pathlib import Path 21 22import msgpack 23import json 24import random 25from netaddr import EUI 26from dataclasses import dataclass, fields 27from ipaddress import ( 28 ip_address, 29 ip_network, 30 IPv4Address, IPv6Address, 31 IPv4Interface, 32 IPv4Network, IPv6Network 33) 34 35from Kathara.manager.Kathara import Kathara 36from Kathara.model.Lab import Lab 37 38from . import params 39from .common import (QuestionText, QuestionDummy, GradeElement, GradePart, InfoMachine, InfoLab, InfoInterface, 40 TranslatedText, _tt_hash_str) 41from .utils import log_error, error_quit, log_debug 42from .params import SRE 43 44 45class ErrorCategory(enum.Enum): 46 ERROR = "ERROR" 47 WARNING = "WARNING" 48 49 50def write_error(string): 51 print(string, file=sys.stderr) 52 53 54def _lookup_translations(caller_globals: dict, text: str, inline: dict) -> dict: 55 """Merge ``_TRANSLATIONS`` from a module's globals with inline kwargs. 56 57 Inline kwargs take priority. ``None`` values in ``_TRANSLATIONS`` are 58 skipped (they mark strings not yet translated). 59 """ 60 result = {} 61 for lang, strings in caller_globals.get('_TRANSLATIONS', {}).items(): 62 val = strings.get(text) 63 if val is not None and lang not in inline: 64 result[lang] = val 65 result.update(inline) 66 return result 67 68 69def tr(text: str, **langs) -> TranslatedText: 70 """Build a TranslatedText with 'en' as the default language key. 71 72 Looks up the caller module's ``_TRANSLATIONS`` dict for any language not 73 already supplied as a keyword argument. Inline kwargs take priority. 74 ``None`` values in ``_TRANSLATIONS`` are skipped (placeholder for 75 untranslated strings). 76 77 ``_TRANSLATIONS`` must be defined **before** the first ``tr()`` call in 78 the file, because module-level expressions are evaluated at import time. 79 80 Usage in lab files:: 81 82 title = tr("My lab") # translation from _TRANSLATIONS 83 title = tr("My lab", fr="Mon TP") # inline, retro-compatible 84 85 For labs whose primary language is not English, use :func:`make_tr` instead. 86 """ 87 merged = _lookup_translations(sys._getframe(1).f_globals, text, langs) 88 return TranslatedText({'en': text, **merged}) 89 90 91def make_tr(default_lang: str, translations: dict | None = None): 92 """Return a tr() function bound to *default_lang* as the first-positional language. 93 94 If *translations* is supplied explicitly it is used directly (no frame 95 inspection). Otherwise the caller module's globals are inspected for a 96 ``_TRANSLATIONS`` dict at each call. 97 98 In both cases ``_TRANSLATIONS`` (or the dict passed as *translations*) must 99 be defined **before** the first ``tr()`` call in the file, because 100 module-level expressions are evaluated at import time. 101 102 Usage with explicit dict (no frame inspection):: 103 104 _TRANSLATIONS = {'fr': {"Mon TP": "My lab"}} 105 tr = make_tr('fr', translations=_TRANSLATIONS) 106 title = tr("Mon TP") # 'en' from _TRANSLATIONS 107 108 Usage with frame inspection:: 109 110 tr = make_tr('fr') 111 _TRANSLATIONS = {'en': {"Mon TP": "My lab"}} # must come before tr() calls 112 title = tr("Mon TP") 113 """ 114 if translations is not None: 115 _wrapped = {'_TRANSLATIONS': translations} 116 117 def _tr(text: str, **langs) -> TranslatedText: 118 merged = _lookup_translations(_wrapped, text, langs) 119 return TranslatedText({default_lang: text, **merged}) 120 else: 121 caller_globals = sys._getframe(1).f_globals 122 123 def _tr(text: str, **langs) -> TranslatedText: 124 merged = _lookup_translations(caller_globals, text, langs) 125 return TranslatedText({default_lang: text, **merged}) 126 127 return _tr 128 129 130def no_tr(text: str) -> str: 131 """Marker for prepare-sre-translations: leave this string untranslated. 132 133 Identity passthrough at runtime (returns *text* unchanged). It is purely a 134 hint to the translation tooling so the wrapped string is never wrapped in 135 tr() nor registered in _TRANSLATIONS — use it for short internal labels 136 (e.g. grade-element titles) that are not natural-language prose. 137 """ 138 return text 139 140 141_PORT_WILDCARD_RE = _re.compile(r'^(\d*)(X+):(\d+)(?:/(\w+))?$', _re.IGNORECASE) 142 143 144def _is_port_free(port: int, proto: str) -> bool: 145 sock_type = socket.SOCK_DGRAM if proto == 'udp' else socket.SOCK_STREAM 146 with socket.socket(socket.AF_INET, sock_type) as s: 147 try: 148 s.bind(('', port)) 149 return True 150 except OSError: 151 return False 152 153 154def _resolve_port(spec: str, used: set) -> str: 155 """Resolve a port spec, replacing trailing X wildcards with a free host port. 156 157 E.g. '80XX:80/tcp' -> '8042:80/tcp' (first free port in 8000-8099). 158 Specs without wildcards are returned unchanged. 159 `used` is updated in-place to track ports already allocated in this call. 160 """ 161 m = _PORT_WILDCARD_RE.match(spec) 162 if not m: 163 return spec 164 prefix_str, x_str, container_port, proto = m.group(1), m.group(2), m.group(3), (m.group(4) or 'tcp').lower() 165 range_size = 10 ** len(x_str) 166 base = (int(prefix_str) if prefix_str else 0) * range_size 167 for offset in range(range_size): 168 host_port = base + offset 169 key = (host_port, proto) 170 if key not in used and _is_port_free(host_port, proto): 171 used.add(key) 172 return f"{host_port}:{container_port}/{proto}" 173 error_quit(f"no free {proto} port in range {base}-{base + range_size - 1}") 174 175 176def _resolve_xauth_cookie(): 177 """Return the validated hex X11 magic cookie from $SRE_XAUTH_COOKIE, or None. 178 179 The env var is set by sre-wrapper (from `xauth list`) and preserved across 180 sudo via env_keep. When `sre start --xauth-file <file>` is used, 181 action_start parses that file early (with privileges) and overrides this 182 env var with the file's cookie before NetScheme is built. 183 """ 184 cookie = os.environ.get(params.sre_xauth_cookie_env_variable) 185 return cookie if cookie and _re.fullmatch(r'[0-9A-Fa-f]+', cookie) else None 186 187 188class _AppendOp: 189 """A file-append operation registered via NetScheme0.append_to_file().""" 190 __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime') 191 192 def __init__(self, filename: str, content: bytes, 193 permissions: int | None, owner: str | None, mtime: float | None): 194 self.filename = filename 195 self.content = content 196 self.permissions = permissions 197 self.owner = owner 198 self.mtime = mtime 199 200 201class _IdempotentAppendOp: 202 """An idempotent file-append operation registered via NetScheme0.idempotent_append_to_file(). 203 204 Appends content only if the file does not already end with it. 205 Optionally sets permissions, ownership, and mtime regardless of whether content was appended. 206 """ 207 __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime') 208 209 def __init__(self, filename: str, content: bytes, 210 permissions: int | None, owner: str | None, mtime: float | None): 211 self.filename = filename 212 self.content = content 213 self.permissions = permissions 214 self.owner = owner 215 self.mtime = mtime 216 217 218class _FileOp: 219 """A file-write operation registered via NetScheme0.file().""" 220 __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime') 221 222 def __init__(self, filename: str, content: bytes, permissions: int, owner: str, mtime: float): 223 self.filename = filename 224 self.content = content 225 self.permissions = permissions 226 self.owner = owner 227 self.mtime = mtime 228 229 230class _HostCmdOp: 231 """A host-side command registered via NetScheme0.host_cmd().""" 232 __slots__ = ('command',) 233 234 def __init__(self, command: str): 235 self.command = command 236 237 238class _CpFromHostOp: 239 """A deferred file-copy-from-host operation; file is read at execution time.""" 240 __slots__ = ('src_path', 'dest', 'permissions', 'owner', 'mtime') 241 242 def __init__(self, src_path, dest: str, permissions, owner: str, mtime): 243 self.src_path = src_path 244 self.dest = dest 245 self.permissions = permissions 246 self.owner = owner 247 self.mtime = mtime 248 249 250class _CpToHostOp: 251 """Copy a file from a container to the host files_dir.""" 252 __slots__ = ('src_path', 'dest_path', 'permissions') 253 254 def __init__(self, src_path: str, dest_path: str, permissions: int = None): 255 self.src_path = src_path # absolute path inside the container 256 self.dest_path = dest_path # resolved absolute path on the host 257 self.permissions = permissions # mode to chmod on the host (None = leave as-is) 258 259 260class _HostCallbackOp: 261 """A host-side Python callback registered via NetScheme0.host_callback().""" 262 __slots__ = ('callback',) 263 264 def __init__(self, callback): 265 self.callback = callback 266 267 268class _IPv4InterfaceContainer: 269 """Holds named IPv4Interface attributes. Automatically available as Data0.ips.""" 270 271 def __setattr__(self, name, value): 272 if not isinstance(value, IPv4Interface): 273 raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}") 274 super().__setattr__(name, value) 275 276 def __getitem__(self, name): 277 return getattr(self, name) 278 279 def to_dict(self): 280 return {k: str(v) for k, v in self.__dict__.items()} 281 282 283class _IPv4NetContainer: 284 """Holds named IPv4Network attributes. Automatically available as Data0.nets.""" 285 286 def __setattr__(self, name, value): 287 if not isinstance(value, IPv4Network): 288 raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}") 289 super().__setattr__(name, value) 290 291 def __getitem__(self, name): 292 return getattr(self, name) 293 294 def to_dict(self): 295 return {k: str(v) for k, v in self.__dict__.items()} 296 297 298class _MacContainer: 299 """Holds named EUI MAC address attributes. Automatically available as Data0.macs.""" 300 301 def __setattr__(self, name, value): 302 if not isinstance(value, EUI): 303 raise TypeError(f"{name}: expected EUI, got {type(value).__name__}") 304 super().__setattr__(name, value) 305 306 def __getitem__(self, name): 307 return getattr(self, name) 308 309 def to_dict(self): 310 return {k: str(v) for k, v in self.__dict__.items()} 311 312 313@dataclass 314class Flavor0: 315 """Base class for optional lab-parameterisation dataclasses. 316 317 Subclass with ``@dataclass(slots=True)`` and declare your fields. If the 318 module defines ``flavor_form_at_startup = True``, the GUI renders the 319 ``flavor_form`` string (containing ``@@{field:regex}@@`` markers) as a form 320 before starting the lab. 321 322 Named presets can be attached as class attributes, e.g.:: 323 324 Flavor.easy = Flavor(nb=1) 325 326 and referenced on the CLI with ``--set-flavor-name easy``. 327 328 Override :meth:`allowed_by_user` to restrict which values students may choose. 329 """ 330 331 _registry = {} 332 333 def __init_subclass__(cls, **kwargs): 334 super().__init_subclass__(**kwargs) 335 key = f"{cls.__module__}.{cls.__qualname__}" 336 cls._type_key = key 337 Flavor0._registry[key] = cls 338 339 def to_dict(self): 340 """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`.""" 341 return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)} 342 343 @classmethod 344 def from_dict(cls, d): 345 """Reconstruct a ``Flavor0`` instance from a dict produced by :meth:`to_dict`.""" 346 decoded = {k: Data0._decode_value(v) for k, v in d.items()} 347 return cls(**decoded) # type: ignore[call-arg] 348 349 @classmethod 350 def from_form_dict(cls, d: dict): 351 """Build a Flavor from form field values (strings from text inputs, bools from 352 submit buttons). Declared dataclass fields are coerced to their declared type. 353 Extra keys in d (form fields not declared in the dataclass) are set as plain 354 attributes on the instance so that allowed_by_user() can read them. 355 """ 356 from typing import get_type_hints 357 hints = get_type_hints(cls) 358 declared = {f.name for f in fields(cls)} 359 coerced = {} 360 for f in fields(cls): 361 v = d.get(f.name) 362 if v is None: 363 continue 364 t = hints.get(f.name, str) 365 if not isinstance(t, type) or isinstance(v, t): 366 coerced[f.name] = v 367 elif t == bool: 368 coerced[f.name] = str(v).lower() in ('true', '1', 'yes') 369 elif t == int: 370 coerced[f.name] = int(v) 371 elif t == float: 372 coerced[f.name] = float(v) 373 else: 374 coerced[f.name] = str(v) 375 obj = cls(**coerced) 376 for k, v in d.items(): 377 if k not in declared: 378 setattr(obj, k, v) 379 return obj 380 381 def allowed_by_user(self) -> Tuple[bool, str]: 382 """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise. 383 384 Override in subclasses to restrict which values students can choose. 385 """ 386 return True, "" 387 388 389class Data0: 390 """Base class for lab-specific parameter dataclasses. 391 392 Subclass with ``@dataclass(slots=True)`` and declare your fields normally. 393 Three dynamic containers are injected automatically into every instance by 394 ``__post_init__``: 395 396 * ``self.ips`` — :class:`_IPv4InterfaceContainer`: named ``IPv4Interface`` values 397 * ``self.nets`` — :class:`_IPv4NetContainer`: named ``IPv4Network`` values 398 * ``self.macs`` — :class:`_MacContainer`: named ``EUI`` MAC-address values 399 400 The class-level ``_registry`` maps ``"module.ClassName"`` keys to concrete 401 subclasses so that ``from_dict``/``unpack``/``from_json`` can reconstruct the 402 correct type from serialised data without knowing it in advance. 403 404 Override ``generate(flavor=None)`` as a ``@classmethod`` to produce a fresh 405 randomised instance. The result is serialised to ``data.json`` at lab start 406 and reloaded for each evaluation. 407 """ 408 409 _registry = {} 410 _rng = random.Random() 411 412 def __init_subclass__(cls, **kwargs): 413 super().__init_subclass__(**kwargs) 414 key = f"{cls.__module__}.{cls.__qualname__}" 415 cls._type_key = key 416 Data0._registry[key] = cls 417 418 def __post_init__(self): 419 """Inject ``ips``, ``nets``, ``macs``, and ``flavor`` into every instance. 420 421 Called automatically by the dataclass ``__init__``. Uses 422 ``object.__setattr__`` so the injection works even when the subclass 423 declares ``slots=True``. 424 """ 425 # Inject ips/nets/flavor into __dict__ (available even with slots=True subclasses 426 # because Data0 itself has no __slots__, so __dict__ is always inherited). 427 object.__setattr__(self, 'ips', _IPv4InterfaceContainer()) 428 object.__setattr__(self, 'nets', _IPv4NetContainer()) 429 object.__setattr__(self, 'macs', _MacContainer()) 430 object.__setattr__(self, 'flavor', None) 431 object.__setattr__(self, '__flavor_name', None) 432 object.__setattr__(self, '__current_srelab_file', None) 433 434 # ---------------- lifecycle hooks ---------------- 435 @classmethod 436 def compute_pre_generate(cls, flavor=None): 437 """Hook called just before generate() and after JSON/msgpack deserialization. 438 439 Override in a subclass to set class-level derived state from *flavor*. 440 The default implementation is a no-op (fully retro-compatible). 441 """ 442 pass 443 444 def compute_post_generate(self): 445 """Hook called after generate() and after JSON/msgpack deserialization. 446 447 Override in a subclass to set instance-level derived state from data fields. 448 The default implementation is a no-op (fully retro-compatible). 449 """ 450 pass 451 452 # ---------------- dict conversion ---------------- 453 def to_dict(self): 454 """Serialise to a plain dict (JSON-safe). 455 456 Dataclass fields are encoded via :meth:`_encode_value`. ``ips``, ``nets``, 457 and ``macs`` are stored as nested dicts of strings. An attached ``Flavor`` 458 is stored under the ``"flavor"`` key with its type key. 459 """ 460 result = {} 461 for f in fields(self): 462 v = getattr(self, f.name) 463 result[f.name] = self._encode_value(v) 464 result['ips'] = self.ips.to_dict() 465 result['nets'] = self.nets.to_dict() 466 result['macs'] = self.macs.to_dict() 467 if self.flavor is not None: 468 result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()} 469 flavor_name = getattr(self, '__flavor_name', None) 470 if flavor_name is not None: 471 result['__flavor_name'] = flavor_name 472 current_srelab_file = getattr(self, '__current_srelab_file', None) 473 if current_srelab_file is not None: 474 result['__current_srelab_file'] = current_srelab_file 475 return result 476 477 @classmethod 478 def from_dict(cls, d): 479 """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`. 480 481 The concrete subclass is resolved from ``d["__type__"]`` when *cls* is 482 ``Data0`` itself; otherwise the calling class is used directly. 483 Raises ``ValueError`` for unknown type keys. 484 """ 485 d = dict(d) 486 ips_data = d.pop('ips', {}) 487 nets_data = d.pop('nets', {}) 488 macs_data = d.pop('macs', {}) 489 flavor_data = d.pop('flavor', None) 490 flavor_name = d.pop('__flavor_name', None) 491 current_srelab_file = d.pop('__current_srelab_file', None) 492 decoded = {k: cls._decode_value(v) for k, v in d.items()} 493 obj = cls(**decoded) 494 for k, v in ips_data.items(): 495 setattr(obj.ips, k, IPv4Interface(v)) 496 for k, v in nets_data.items(): 497 setattr(obj.nets, k, IPv4Network(v)) 498 for k, v in macs_data.items(): 499 setattr(obj.macs, k, EUI(v)) 500 if flavor_data is not None: 501 flavor_key = flavor_data.get("__flavor_type__") 502 if flavor_key not in Flavor0._registry: 503 raise ValueError(f"unknown Flavor0 type: {flavor_key!r}") 504 flavor_cls = Flavor0._registry[flavor_key] 505 object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"])) 506 object.__setattr__(obj, '__flavor_name', flavor_name) 507 object.__setattr__(obj, '__current_srelab_file', current_srelab_file) 508 return obj 509 510 # ---------------- value encoding ---------------- 511 @staticmethod 512 def _encode_value(v): 513 """Encode a field value for JSON/msgpack storage. 514 515 * ``IPv4Address`` / ``IPv6Address`` → ``{"__ip__": "..."}`` 516 * ``IPv4Network`` / ``IPv6Network`` → ``{"__net__": "..."}`` 517 * nested ``Data0`` → ``{"__type__": "...", "data": {...}}`` 518 * all other values are returned unchanged. 519 """ 520 if isinstance(v, (IPv4Address, IPv6Address)): 521 return {"__ip__": str(v)} 522 if isinstance(v, (IPv4Network, IPv6Network)): 523 return {"__net__": str(v)} 524 525 if isinstance(v, Data0): 526 return { 527 "__type__": v._type_key, 528 "data": v.to_dict() 529 } 530 return v 531 532 @staticmethod 533 def _decode_value(v): 534 """Decode a value produced by :meth:`_encode_value` back to its Python type.""" 535 if isinstance(v, dict): 536 if "__ip__" in v: 537 return ip_address(v["__ip__"]) 538 if "__net__" in v: 539 return ip_network(v["__net__"]) 540 if "__type__" in v: 541 type_key = v["__type__"] 542 if type_key not in Data0._registry: 543 raise ValueError(f"unknown Data0 type: {type_key!r}") 544 return Data0._registry[type_key].from_dict(v["data"]) 545 return v 546 547 # ---------------- msgpack ---------------- 548 def pack(self): 549 """Serialise to a msgpack binary blob (includes type key for polymorphic reload).""" 550 return msgpack.packb( 551 { 552 "__type__": self._type_key, 553 "data": self.to_dict() 554 }, 555 use_bin_type=True 556 ) 557 558 @classmethod 559 def unpack(cls, blob): 560 """Deserialise a msgpack blob produced by :meth:`pack`. Raises ``ValueError`` for unknown types.""" 561 obj = msgpack.unpackb(blob, raw=False) 562 type_key = obj.get("__type__") 563 if type_key not in Data0._registry: 564 raise ValueError(f"unknown Data0 type: {type_key!r}") 565 result = Data0._registry[type_key].from_dict(obj["data"]) 566 type(result).compute_pre_generate(result.flavor) 567 result.compute_post_generate() 568 return result 569 570 # ---------------- JSON ---------------- 571 def to_json(self): 572 """Serialise to a JSON string (includes type key for polymorphic reload).""" 573 return json.dumps({ 574 "__type__": self._type_key, 575 "data": self.to_dict() 576 }) 577 578 @classmethod 579 def from_json(cls, s): 580 """Deserialise a JSON string produced by :meth:`to_json`.""" 581 obj = json.loads(s) 582 concrete = Data0._registry[obj["__type__"]] 583 result = concrete.from_dict(obj["data"]) 584 type(result).compute_pre_generate(result.flavor) 585 result.compute_post_generate() 586 return result 587 588 @classmethod 589 def load_from_json_file(cls, filename): 590 """ 591 Load a Data0-derived object from a JSON file. 592 The concrete class is resolved automatically. 593 """ 594 595 path = Path(filename) 596 with path.open("r", encoding="utf-8") as f: 597 obj = json.load(f) 598 concrete = Data0._registry[obj["__type__"]] 599 result = concrete.from_dict(obj["data"]) 600 type(result).compute_pre_generate(result.flavor) 601 result.compute_post_generate() 602 return result 603 604 def save_to_json_file(self, filename): 605 """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private).""" 606 path = Path(filename) 607 fd = os.open( 608 path, 609 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 610 0o600 611 ) 612 with os.fdopen(fd, "w", encoding="utf-8") as f: 613 json.dump( 614 { 615 "__type__": self._type_key, 616 "data": self.to_dict() 617 }, 618 f, 619 indent=2 # optional but operationally useful 620 ) 621 622 623def sre_state(fn=None, *, user_allowed=False, description=''): 624 def decorator(f): 625 f._is_sre_state = True 626 f._sre_state_user_allowed = user_allowed 627 f._sre_state_description = description 628 return f 629 630 if fn is not None: 631 return decorator(fn) 632 return decorator 633 634 635class NetScheme0: 636 """Base class for lab network topology definitions. 637 638 Subclasses declare the topology as class-level dicts and implement state 639 methods decorated with :func:`sre_state`. 640 641 Class-level attributes: 642 643 * ``_machine_specs`` — ``{name: {Machine kwargs}}`` — one entry per container. 644 * ``_network_specs`` — ``{net_name: {color: ...}}`` — optional display hints. 645 * ``_topology`` — ``{net_name: [machine, ...]}`` or ``{net_name: {machine: iface}}`` 646 — which machines connect to each network and on which interface. 647 648 If any of these three attributes is not defined in the subclass (neither as a class 649 variable nor as a property), the corresponding attribute is read from ``data`` 650 (``data.topology``, ``data.machine_specs``, ``data.network_specs``). 651 Explicit subclass definitions always take precedence. 652 653 Instance attributes set by ``__init__``: 654 655 * ``self.data`` — the ``Data0`` instance for this lab run. 656 * ``self.informations`` — Markdown text shown in the Informations tab (set this in ``__init__``). 657 * Named machine and network objects (e.g. ``self.router``, ``self.lan``). 658 659 The ``initial`` state method is always called at ``sre start``. Additional 660 states are applied with ``sre state <lab> <state_name>``. 661 """ 662 663 _machine_specs = {} 664 _network_specs = {} # {net_name: {'color': ...}} — optional colors for networks 665 _topology = {} # {net_name: [m, ...] or {m: iface, ...}} — mixed allowed 666 667 def _resolve_spec(self, attr: str, data_attr: str): 668 """Return the topology/spec dict for *attr*. 669 670 Resolution order: 671 1. If the concrete subclass (or any class before ``NetScheme0`` in the MRO) 672 defines *attr* as a class variable or property, return that value. 673 2. Otherwise look for *data_attr* on ``self.data`` (checks instance attributes 674 first, then class-level attributes set e.g. by ``compute_pre_generate``). 675 3. If neither source provides the attribute, return the ``NetScheme0`` default 676 (an empty dict), which is equivalent to "no topology/specs defined". 677 """ 678 for cls in type(self).__mro__: 679 if cls is NetScheme0: 680 break 681 if attr in cls.__dict__: 682 return getattr(self, attr) 683 return getattr(self.data, data_attr, getattr(NetScheme0, attr)) 684 685 def __init__(self, data, running_lab_name, lab_hash=None): 686 """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs. 687 688 Args: 689 data: a ``Data0`` instance containing lab-specific parameters. 690 running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``. 691 lab_hash: optional Kathara lab hash (resolved lazily if omitted). 692 """ 693 self.data = data 694 self.running_lab_name = running_lab_name 695 self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name)) 696 self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name) 697 self.lab_hash = lab_hash 698 self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name) 699 self.informations = "" 700 701 _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs') 702 _network_specs = self._resolve_spec('_network_specs', 'network_specs') 703 _topology = self._resolve_spec('_topology', 'topology') 704 705 for name, parameters in _machine_specs.items(): 706 setattr(self, name, Machine(name=name, **parameters)) 707 708 # Create networks from _network_specs (with optional color), then remaining from _topology 709 for name, parameters in _network_specs.items(): 710 setattr(self, name, Network(name=name, **parameters)) 711 712 for net_name in _topology: 713 if not hasattr(self, net_name): 714 setattr(self, net_name, Network(name=net_name)) 715 716 # Create NetAdapters from _topology. 717 # Each value is either a list (auto interface) or a dict {machine: iface_spec}. 718 # iface_spec can be: None (auto), an int, or a (int, mac) tuple. 719 # Interface auto-assignment counts prior connections per machine across all networks. 720 _iface_counter: dict[str, int] = {} 721 for net_name, machines in _topology.items(): 722 net = getattr(self, net_name) 723 if isinstance(machines, dict): 724 items = machines.items() 725 else: 726 items = ((m, None) for m in machines) 727 for mname, iface_spec in items: 728 machine = getattr(self, mname) 729 if isinstance(iface_spec, tuple): 730 iface, mac = iface_spec 731 else: 732 iface, mac = iface_spec, None 733 if iface is None: 734 iface = _iface_counter.get(mname, 0) 735 _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1 736 NetAdapter(network=net, machine=machine, interface=iface, mac=mac) 737 738 self._ops: dict[str, list] = {} # machine → [str | _FileOp, ...] 739 self._host_ops: dict[int, list] = {} 740 741 self.net_config = None 742 743 def host_interfaces_from_topology(self) -> dict: 744 """Return {machine_name: [net_name, ...]} derived from the resolved topology.""" 745 topology = self._resolve_spec('_topology', 'topology') 746 result = {} 747 for net_name, machines in topology.items(): 748 names = machines.keys() if isinstance(machines, dict) else machines 749 for mname in names: 750 result.setdefault(mname, []).append(net_name) 751 return result 752 753 @sre_state(user_allowed=False) 754 def initial(self): 755 pass 756 757 @classmethod 758 def get_state_methods(cls): 759 """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy.""" 760 names = set() 761 for klass in cls.__mro__: 762 for name, fn in klass.__dict__.items(): 763 if callable(fn) and getattr(fn, '_is_sre_state', False): 764 names.add(name) 765 return sorted(names) 766 767 @classmethod 768 def is_state_user_allowed(cls, state): 769 """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``.""" 770 for klass in cls.__mro__: 771 fn = klass.__dict__.get(state) 772 if fn is not None and getattr(fn, '_is_sre_state', False): 773 return getattr(fn, '_sre_state_user_allowed', False) 774 return False 775 776 @classmethod 777 def get_user_allowed_states(cls): 778 """Return ``{state_name: description}`` for all user-allowed states.""" 779 result = {} 780 for state in cls.get_state_methods(): 781 for klass in cls.__mro__: 782 fn = klass.__dict__.get(state) 783 if fn is not None and getattr(fn, '_is_sre_state', False): 784 if getattr(fn, '_sre_state_user_allowed', False): 785 result[state] = getattr(fn, '_sre_state_description', '') 786 break 787 return result 788 789 def get_data(self): 790 return self.data 791 792 def get_machine(self, machine_name): 793 """Return the :class:`Machine` with *machine_name*, or ``None`` if not found.""" 794 if not hasattr(self, machine_name): 795 return None 796 machine = getattr(self, machine_name) 797 if not isinstance(machine, Machine): 798 return None 799 return machine 800 801 def get_network(self, network_name): 802 """Return the :class:`Network` with *network_name*, or ``None`` if not found.""" 803 if not hasattr(self, network_name): 804 return None 805 machine = getattr(self, network_name) 806 if not isinstance(machine, Network): 807 return None 808 return machine 809 810 def get_machines(self): 811 for _, value in self.__dict__.items(): 812 if isinstance(value, Machine): 813 yield value 814 815 def get_machine_names(self): 816 return [m.name for m in self.get_machines()] 817 818 def has_privileged_machines(self): 819 return any(m.privileged for m in self.get_machines()) 820 821 def get_accessible_machine_names(self): 822 return [m.name for m in self.get_machines() 823 if not m.hidden and m.allow_connection] 824 825 def get_visible_machine_names(self): 826 return [m.name for m in self.get_visibles_machines()] 827 828 def get_visibles_machines(self): 829 for _, value in self.__dict__.items(): 830 if isinstance(value, Machine): 831 if not value.hidden: 832 yield value 833 834 def get_networks(self): 835 for _, value in self.__dict__.items(): 836 if isinstance(value, Network): 837 yield value 838 839 def get_topology(self): 840 return self._resolve_spec('_topology', 'topology') 841 842 def get_machine_specs(self): 843 return self._resolve_spec('_machine_specs', 'machine_specs') 844 845 def get_network_specs(self): 846 return self._resolve_spec('_network_specs', 'network_specs') 847 848 def cmd(self, machine, command, step=1): 849 """Register a shell command to execute inside *machine*'s container at *step*.""" 850 if SRE.args.debug: 851 print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr) 852 self._ops.setdefault(step, {}).setdefault(machine, []).append(command) 853 854 def host_cmd(self, command, step=1): 855 """Register a shell command to execute on the **host** (not inside a container) at *step*. 856 857 Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise. 858 """ 859 if params.execute_commands_on_host is False: 860 sys.exit("host_cmd() is disabled by params.execute_commands_on_host") 861 if SRE.args.debug: 862 print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr) 863 self._host_ops.setdefault(step, []).append(_HostCmdOp(command)) 864 865 def host_callback(self, callback, step=1): 866 """Register a Python callable to invoke on the host at *step* (called with no arguments).""" 867 if SRE.args.debug: 868 print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}", 869 file=sys.stderr) 870 self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback)) 871 872 def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None, 873 mtime: float = None, step=1): 874 """Register a file copy from the host into *machine*'s container at *step*. 875 876 *src* may be relative (resolved against ``params.files_dir``). 877 *dest* is the absolute path inside the container. 878 """ 879 orig_path = Path(src) 880 if not orig_path.is_absolute(): 881 orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path 882 if SRE.args.debug: 883 print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr) 884 self._ops.setdefault(step, {}).setdefault(machine, []).append( 885 _CpFromHostOp(orig_path, dest, permissions, owner, mtime) 886 ) 887 888 def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1): 889 """Copy file `path` from `machine` to the host files_dir. 890 891 Args: 892 machine: container name 893 path: absolute path of the file inside the container 894 dest: relative destination path inside params.files_dir(running_lab_name) 895 permissions: file mode to apply on the host copy (default: None, leave as written) 896 step: execution step (default 1) 897 """ 898 files_dir = Path(params.files_dir(self.running_lab_name)).resolve() 899 dest_path = (files_dir / dest).resolve() 900 if not dest_path.is_relative_to(files_dir): 901 error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'") 902 if SRE.args.debug: 903 print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr) 904 self._ops.setdefault(step, {}).setdefault(machine, []).append( 905 _CpToHostOp(path, str(dest_path), permissions) 906 ) 907 908 def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1): 909 """Register a file to create/overwrite on `machine` during the current state. 910 911 Args: 912 machine: machine name (str) 913 filename: absolute path inside the container (e.g. "/etc/myconfig") 914 content: file content (str or bytes) 915 permissions: octal mode (default 0o644) 916 owner: "user:group" string (default "root:root") 917 mtime: modification time as a Unix timestamp (float); defaults to now 918 step: execution step (default 1); higher steps run after lower ones 919 """ 920 import time as _time 921 raw = content.encode() if isinstance(content, str) else content 922 if SRE.args.debug: 923 print( 924 f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)", 925 file=sys.stderr) 926 self._ops.setdefault(step, {}).setdefault(machine, []).append( 927 _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time()) 928 ) 929 930 def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 931 """Append content to a file on `machine`; create the file if it does not exist. 932 933 Args: 934 machine: machine name (str) 935 filename: absolute path inside the container (e.g. "/etc/hosts") 936 content: content to append (str or bytes) 937 permissions: octal mode to set after appending (default: leave unchanged) 938 owner: "user:group" to set after appending (default: leave unchanged) 939 mtime: modification time as a Unix timestamp (default: leave unchanged) 940 step: execution step (default 1); higher steps run after lower ones 941 """ 942 raw = content.encode() if isinstance(content, str) else content 943 if SRE.args.debug: 944 print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr) 945 self._ops.setdefault(step, {}).setdefault(machine, []).append( 946 _AppendOp(filename, raw, permissions, owner, mtime) 947 ) 948 949 def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 950 """Append content to a file on `machine` only if the file does not already end with it. 951 952 Idempotent version of append_to_file: safe to call multiple times — the content 953 is appended at most once per application. If permissions, owner, or mtime are 954 provided they are applied regardless of whether content was appended. 955 956 Args: 957 machine: machine name (str) 958 filename: absolute path inside the container (e.g. "/etc/hosts") 959 content: content to append (str or bytes) 960 permissions: octal mode to set after the check (default: leave unchanged) 961 owner: "user:group" to set after the check (default: leave unchanged) 962 mtime: modification time as a Unix timestamp (default: leave unchanged) 963 step: execution step (default 1); higher steps run after lower ones 964 """ 965 raw = content.encode() if isinstance(content, str) else content 966 if SRE.args.debug: 967 print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)", 968 file=sys.stderr) 969 self._ops.setdefault(step, {}).setdefault(machine, []).append( 970 _IdempotentAppendOp(filename, raw, permissions, owner, mtime) 971 ) 972 973 def compute_state_ops(self, state): 974 self._ops = {} 975 self._host_ops = {} 976 if not hasattr(self, state): 977 error_quit(f"state method {state} does not exist") 978 method = getattr(self, state) 979 if not callable(method): 980 error_quit(f"state method {state} not callable") 981 try: 982 method() 983 except Exception as e: 984 error_quit(f"error during {state} execution: {e}") 985 return self._ops, self._host_ops 986 987 def get_new_lab_from_scheme(self): 988 lab = Lab(name=self.running_lab_name) 989 abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name) 990 _used_ports: set = set() 991 xauth_cookie = _resolve_xauth_cookie() 992 for m in self.get_machines(): 993 m.ports = [_resolve_port(p, _used_ports) for p in m.ports] 994 m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True 995 if m.x11_host: 996 m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True 997 if xauth_cookie: 998 m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True 999 volumes = [] 1000 if len(m.volumes) > 0: 1001 has_private_volume = False 1002 for v in m.volumes: 1003 if len(v) > 2 and v[3] == 'private': 1004 has_private_volume = True 1005 break 1006 1007 if has_private_volume and params.disable_volume_mount_on_root_partition: 1008 private_mount_dir = self.get_private_mount_dir() 1009 p = Path(private_mount_dir) 1010 while not p.exists(): 1011 p = p.parent 1012 if os.stat(p).st_dev == os.stat('/').st_dev: 1013 error_quit( 1014 f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1015 1016 for v in m.volumes: 1017 if len(v) > 2 and v[3] == 'private': 1018 if v[0].startswith('/'): 1019 error_quit(f"volume {v[0]} is private and have an absolute path") 1020 v_host = f"{self.get_private_mount_dir()}/{v[0]}" 1021 os.makedirs(v_host, exist_ok=True) 1022 os.chmod(v_host, 0o777) 1023 else: 1024 if v[0].startswith('/'): 1025 v_host = v[0] 1026 else: 1027 v_host = f"{self.get_user_public_dir()}/{v[0]}" 1028 os.makedirs(v_host, exist_ok=True) 1029 os.chmod(v_host, 0o777) 1030 if v[2] not in ['rw', 'ro']: 1031 error_quit(f"volume mode {v[2]} not supported (only rw and ro)") 1032 1033 volumes.append(f"{v_host}|{v[1]}|{v[2]}") 1034 lab.new_machine(m.name, 1035 exec_commands=m.exec_commands, 1036 sysctls=m.sysctls, 1037 envs=m.envs, 1038 ports=m.ports, 1039 ulimits=m.ulimits, 1040 volumes=volumes, 1041 shell=m.kathara_shell, 1042 image=m.image, 1043 bridged=m.bridged, 1044 privileged=m.privileged, 1045 entrypoint=m.entrypoint, 1046 ) 1047 for net, netAdapter in m.net_adapters.items(): 1048 lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface, 1049 mac_address=str(netAdapter.mac).replace('-', 1050 ':').lower() if netAdapter.mac is not None else None) 1051 self.lab_hash = lab.hash 1052 return lab 1053 1054 def get_lab_hash(self): 1055 if self.lab_hash is None: 1056 lab = Lab(name=self.running_lab_name) 1057 self.lab_hash = lab.hash 1058 return self.lab_hash 1059 1060 def get_lab_from_kathara(self): 1061 lab_hash = self.get_lab_hash() 1062 return Kathara.get_instance().get_lab_from_api(lab_hash=lab_hash) 1063 1064 def get_public_lab_dir(self): 1065 return params.public_lab_dir(self.running_lab_name) 1066 1067 def get_private_lab_dir(self): 1068 return params.private_lab_dir(self.running_lab_name) 1069 1070 def get_files_dir(self): 1071 return params.files_dir(self.running_lab_name) 1072 1073 def get_private_mount_dir(self): 1074 return params.private_mount_dir(self.running_lab_name) 1075 1076 def get_user_public_dir(self): 1077 try: 1078 return Path(params.link_to_user_public_dir(self.running_lab_name)).readlink() 1079 except OSError: 1080 return Path(params.link_to_user_public_dir(self.running_lab_name)) 1081 1082 def get_shared_dir(self): 1083 return f"{self.get_user_public_dir()}/{params.shared_dir_name}" 1084 1085 def answers_dir(self): 1086 return params.answers_dir(self.running_lab_name) 1087 1088 def answers_file(self): 1089 return params.answers_filename(self.running_lab_name) 1090 1091 1092class NetAdapter: 1093 def __init__(self, network, machine, interface, mac=None, addresses=None): 1094 if addresses is None: 1095 addresses = [] 1096 if not isinstance(machine, Machine): 1097 raise ValueError("machine must an object of class Machine") 1098 if not isinstance(network, Network): 1099 raise ValueError("network must be an instance of Network") 1100 if mac is not None: 1101 first_byte = int(str(mac).replace('-', ':').split(':')[0], 16) 1102 if first_byte & 1: 1103 raise ValueError( 1104 f"MAC address {mac} on machine '{machine.name}': " 1105 f"multicast bit (LSB of first byte) is set — use a unicast address " 1106 f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')." 1107 ) 1108 self.network = network 1109 self.machine = machine 1110 self.interface = interface 1111 self.mac = mac 1112 self.addresses = addresses 1113 machine.net_adapters[network] = self 1114 network.net_adapters[machine] = self 1115 1116 1117class Network: 1118 def __init__(self, name, color=None, shape=None): 1119 self.name = name 1120 self.color = color 1121 self.shape = shape 1122 self.net_adapters = {} 1123 1124 def get_machines(self): 1125 for m in self.net_adapters.keys(): 1126 yield m 1127 1128 1129class Machine: 1130 def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="", 1131 cpus=None, ipv6=None, 1132 exec_commands=[], 1133 sysctls={}, 1134 envs={}, 1135 ports=None, ulimits={}, volumes=[], 1136 shell=None, 1137 kathara_shell=None, 1138 privileged=None, entrypoint=None, args=[], 1139 hidden=False, 1140 allow_connection=True, 1141 color=None, 1142 shape=None): 1143 if ports is None: 1144 ports = [] 1145 self.name = name 1146 self.image = image 1147 self.bridged = bridged 1148 self.x11_host = x11_host 1149 self.mem = mem 1150 self.cpus = cpus 1151 self.ipv6 = ipv6 1152 self.exec_commands = exec_commands 1153 self.sysctls = sysctls 1154 # Copy: the constructor's mutable default {} is shared across Machines; 1155 # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP 1156 # for x11_host machines) must not leak into machines that didn't opt in. 1157 self.envs = dict(envs) 1158 self.ports = ports 1159 self.ulimits = ulimits 1160 self.volumes = volumes 1161 self.shell = shell 1162 self.kathara_shell = kathara_shell 1163 self.privileged = privileged 1164 self.entrypoint = entrypoint 1165 self.args = args 1166 self.hidden = hidden 1167 self.allow_connection = allow_connection 1168 self.shape = shape 1169 self.color = color 1170 1171 self.net_adapters = {} 1172 1173 if params.disable_volume_mount_on_root_partition: 1174 for v in self.volumes: 1175 v_host = v[0] 1176 if v_host.startswith('/'): 1177 p = Path(v_host) 1178 while not p.exists(): 1179 p = p.parent 1180 if os.stat(p).st_dev == os.stat('/').st_dev: 1181 error_quit( 1182 f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1183 1184 if self.privileged and not params.allow_privileged_machines: 1185 error_quit( 1186 f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode") 1187 1188 if not self.bridged and len(self.ports) > 0: 1189 error_quit("To add ports in a machine, you need to activate bridged mode") 1190 1191 1192class Grade0: 1193 """Base class for lab evaluation logic. 1194 1195 Subclasses override :meth:`grade` to *register* tests and questions. The 1196 actual execution happens later in :meth:`run_tests`, which drives a 1197 multi-step loop: 1198 1199 1. Call :meth:`grade` to register commands (they return placeholder values). 1200 2. Execute all registered commands in containers (via :mod:`exetests`). 1201 3. Repeat for each additional step (``self.max_step``). 1202 4. Call :meth:`grade` a final time — commands now return real results. 1203 5. Save the archive. 1204 1205 Key instance attributes available inside ``grade()``: 1206 1207 * ``self.data`` — the ``Data0`` instance (via ``net_scheme``). 1208 * ``self.step`` — current step number (1-based). 1209 * ``self.max_step`` — highest step number registered so far. 1210 """ 1211 1212 def __init__(self, net_scheme): 1213 self.net_scheme = net_scheme 1214 self.archive_dirs = [] 1215 self.files_to_save_in_archives = [] 1216 self._tests = None 1217 self._allow_errors_in_tests = None 1218 self.step = 0 1219 self.max_step = 1 1220 self._questions = None 1221 self._questions_order = None 1222 self._questions_current_order = None 1223 self._cheat_answers = {} 1224 self._answers = {} 1225 self._grade_list = [] 1226 self._grades = {} 1227 self._grade_parts: list[GradePart] = [] 1228 self._errors = [] 1229 self._total_grade_self_eval = 0 1230 self._total_max_self_eval = 0 1231 self._total_grade_exo_eval = 0 1232 self._total_max_exo_eval = 0 1233 self._maximum_mark = params.default_maximum_mark 1234 self._use_numerical_marks = params.use_numerical_marks_by_default 1235 self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default 1236 self._mark_self_eval = None 1237 self._mark_exo_eval = None 1238 self._section_counter = [] 1239 self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)] 1240 self._eval_date = None 1241 self._re_eval_date = None 1242 self._default_language = 'en' 1243 self.auto_eval_count = 0 1244 self._exam_json = None 1245 self.full_reset() 1246 1247 def full_reset(self): 1248 """Reset all state including tests and answers (called from ``__init__``).""" 1249 self.step = 0 1250 self.max_step = 1 1251 self._tests = {} # keys : (machine,step)->(cmd, timeout)->(result, code)) 1252 self._allow_errors_in_tests = {} 1253 self._host_tests = {} # step -> {(command, timeout): (result, code)} 1254 self._allow_errors_in_host_tests = {} # (step, command, timeout) -> True 1255 self._answers = {} # hash -> answer 1256 self._eval_date = None 1257 self.reset_before_grade() 1258 1259 def reset_before_grade(self): 1260 """Clear questions, grade list, and section counters before each call to :meth:`grade`.""" 1261 self._questions = dict() # hash->Question 1262 self._questions_order = dict() # order -> [Question1, Question2, ...] 1263 self._questions_current_order = 100 1264 self._cheat_answers = {} # state->(question_hash->answer) 1265 self._grade_list = [] 1266 self._grades = dict() # title -> GradeElement 1267 self._grade_parts = [] 1268 self._section_counter = [] 1269 1270 def grade(self): 1271 """Override to register tests, questions, and grade elements. 1272 1273 Called multiple times during :meth:`run_tests` — once per step plus a 1274 final time when all results are available. Do **not** produce side-effects 1275 here; only call ``self.test()``, ``self.question_*()`` and 1276 ``self.add_grade_element()`` / ``self.set_grade()``. 1277 """ 1278 self._grade_list = [] 1279 self._grade_parts = [] 1280 1281 def _compute_mark(self, total_grade, total_max): 1282 """Compute a mark from a ``(total_grade, total_max)`` pair. 1283 1284 Returns ``None`` if ``total_max == 0``. 1285 Numerical mode (default): rounded to one decimal, scaled to ``_maximum_mark``. 1286 Letter mode: A+/A/B/C/D/F. 1287 """ 1288 if total_max == 0: 1289 return None 1290 if self._use_numerical_marks: 1291 return math.ceil(10 * self._maximum_mark * total_grade / total_max) / 10 1292 else: 1293 ratio = total_grade / total_max 1294 if ratio >= 18 / 20: 1295 return "A+" 1296 elif ratio >= 16 / 20: 1297 return "A" 1298 elif ratio >= 14 / 20: 1299 return "B" 1300 elif ratio >= 12 / 20: 1301 return "C" 1302 elif ratio >= 10 / 20: 1303 return "D" 1304 else: 1305 return "F" 1306 1307 def mark_self_eval(self): 1308 """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE).""" 1309 return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval) 1310 1311 def mark_exo_eval(self): 1312 """Final mark over elements visible in non-auto eval / outline / sheet.""" 1313 return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval) 1314 1315 def get_data(self): 1316 return self.net_scheme.get_data() 1317 1318 def get_errors(self): 1319 return self._errors 1320 1321 def get_grade_list(self): 1322 return self._grade_list 1323 1324 def get_grade_parts(self): 1325 return self._grade_parts 1326 1327 def get_answers(self): 1328 return self._answers 1329 1330 def get_cheat_answers(self, state: str): 1331 return self._cheat_answers.get(state) 1332 1333 def get_tests(self): 1334 return self._tests 1335 1336 def get_exetests_strings(self, step: int): 1337 result = dict() 1338 for (machine, step1) in self._tests.keys(): 1339 if step != step1: 1340 continue 1341 result[machine] = params.exetests_separator.join( 1342 [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()]) 1343 return result 1344 1345 def get_running_lab_name(self): 1346 return self.net_scheme.running_lab_name 1347 1348 def increment_section_counter(self, level: int): 1349 if len(self._section_counter) < level + 1: 1350 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1351 else: 1352 self._section_counter = self._section_counter[:(level + 1)] 1353 self._section_counter[level] += 1 1354 1355 def set_section_counter(self, level: int, value: int): 1356 if len(self._section_counter) < level + 1: 1357 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1358 else: 1359 self._section_counter = self._section_counter[:(level + 1)] 1360 self._section_counter[level] = value 1361 1362 def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None): 1363 self.increment_section_counter(level) 1364 return self.current_section(level, fmt, show, pad) 1365 1366 def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None): 1367 # fmt is a list of (type, depth[, prefix]) tuples, one per level. 1368 # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number 1369 # depth: how many consecutive counters to display, ending at this level 1370 # prefix: optional string prepended to the result (default: '') 1371 # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1." 1372 if fmt is None: 1373 fmt = self.section_fmt 1374 1375 def _to_roman(n: int) -> str: 1376 vals = [ 1377 (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'), 1378 (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'), 1379 (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'), 1380 ] 1381 result = '' 1382 for value, numeral in vals: 1383 while n >= value: 1384 result += numeral 1385 n -= value 1386 return result 1387 1388 def _to_capital_letter(n: int) -> str: 1389 result = '' 1390 while n > 0: 1391 n, r = divmod(n - 1, 26) 1392 result = chr(ord('A') + r) + result 1393 return result 1394 1395 def _to_lowercase_letter(n: int) -> str: 1396 result = '' 1397 while n > 0: 1398 n, r = divmod(n - 1, 26) 1399 result = chr(ord('a') + r) + result 1400 return result 1401 1402 def _convert(n: int, fmt_type: str) -> str: 1403 match fmt_type: 1404 case 'R': 1405 return _to_roman(n) 1406 case 'r': 1407 return _to_roman(n).lower() 1408 case 'L': 1409 return _to_capital_letter(n) 1410 case 'l': 1411 return _to_lowercase_letter(n) 1412 case _: 1413 return str(n) 1414 1415 entry = fmt[level] if level < len(fmt) else ('N', level + 1) 1416 _, depth = entry[:2] 1417 prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '') 1418 if show is not None: 1419 depth = show 1420 start = max(0, level - depth + 1) 1421 parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0, 1422 fmt[j][0] if j < len(fmt) else 'N') 1423 for j in range(start, level + 1)] 1424 return prefix + ".".join(parts) + ". " 1425 1426 def load_answers(self): 1427 """Load student answers from ``answers.json`` into ``self._answers``.""" 1428 self._answers = {} 1429 try: 1430 fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()), 1431 os.O_RDONLY | os.O_NOFOLLOW) 1432 with os.fdopen(fd) as f: 1433 self._answers = json.load(f) 1434 except (OSError, json.JSONDecodeError): 1435 self._answers = {} 1436 1437 def get_questions_ordered(self): 1438 q = [] 1439 for _, v in sorted(self._questions_order.items()): 1440 q += v 1441 return q 1442 1443 def export_questions(self): 1444 pass 1445 1446 def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='', 1447 default_code: int = 0, allow_error: bool = False): 1448 """Register and retrieve the result of a host-side test command. 1449 1450 On the first call (registration pass) returns *default_value* / *default_code*. 1451 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1452 Set *allow_error* to suppress error recording on non-zero exit. 1453 """ 1454 if params.execute_commands_on_host is False: 1455 sys.exit("test_host() is disabled by params.execute_commands_on_host") 1456 if self.max_step < step: 1457 self.max_step = step 1458 if step not in self._host_tests: 1459 self._host_tests[step] = {} 1460 if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests: 1461 self._allow_errors_in_host_tests[(step, command, timeout)] = True 1462 if (command, timeout) not in self._host_tests[step]: 1463 self._host_tests[step][(command, timeout)] = (default_value, default_code) 1464 return default_value, default_code 1465 return self._host_tests[step][(command, timeout)] 1466 1467 def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='', 1468 default_code: int = 0, allow_error: bool = False): 1469 """Register and retrieve the result of a command executed inside *machine_name*. 1470 1471 On the first call (registration pass) returns *default_value* / *default_code*. 1472 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1473 *timeout* is in seconds. Set *allow_error* to suppress error recording. 1474 """ 1475 if self.max_step < step: 1476 self.max_step = step 1477 if (machine_name, step) not in self._tests: 1478 self._tests[(machine_name, step)] = {} 1479 if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests: 1480 self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True 1481 if (command, timeout) not in self._tests[(machine_name, step)]: 1482 self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code) 1483 return default_value, default_code 1484 return self._tests[(machine_name, step)][(command, timeout)] 1485 1486 @staticmethod 1487 def _apply_section(section: str, title) -> TranslatedText: 1488 """Prepend *section* to every language value in *title*.""" 1489 tt = TranslatedText.from_value(title) 1490 if not section: 1491 return tt 1492 return TranslatedText({lang: section + text for lang, text in tt.items()}) 1493 1494 def question_text(self, title, section='', description='', hash=None, order=None, default_answer='', 1495 cheat_answers=None): 1496 """Register a free-text question and return the student's current answer string. 1497 1498 Returns *default_answer* if the student has not answered yet. 1499 *cheat_answers* maps state names to answer strings used in automated testing. 1500 """ 1501 title = self._apply_section(section, title) 1502 if order is None: 1503 order1 = self._questions_current_order 1504 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1505 else: 1506 order1 = order 1507 1508 q = QuestionText(title, description, hash, order1) 1509 if order1 not in self._questions_order: 1510 self._questions_order[order1] = [] 1511 self._questions_order[order1].append(q) 1512 1513 if q.question_hash in self._questions: 1514 write_error(f"Duplicate question hash {q.question_hash}") 1515 self._questions[q.question_hash] = q 1516 1517 if cheat_answers is not None: 1518 for state, answer in cheat_answers.items(): 1519 if state not in self._cheat_answers: 1520 self._cheat_answers[state] = {} 1521 self._cheat_answers[state][q.question_hash] = answer 1522 1523 if q.question_hash in self._answers: 1524 return self._answers[q.question_hash] 1525 return default_answer 1526 1527 _FORM_FIELD_RE = _re.compile(r'@@\{([^:}]+):([^}]*)\}@@') 1528 1529 def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None): 1530 """Register a form question with inline @@{field_name:regex}@@ fields. 1531 1532 Returns the student's answers as a dict {field_name: value}, or {} if none yet. 1533 cheat_answers format: {state_name: {field_name: value, ...}} 1534 """ 1535 title = self._apply_section(section, title) 1536 from .common import QuestionForm 1537 1538 # The @@{field:regex}@@ markers are language-independent, so when the 1539 # description is a TranslatedText (wrapped in tr()) extract fields from 1540 # its resolved string. The full TranslatedText is still stored on the 1541 # question for per-language rendering. 1542 desc_str = (description.resolve('') 1543 if isinstance(description, TranslatedText) else description) 1544 1545 fields = [] 1546 for m in self._FORM_FIELD_RE.finditer(desc_str): 1547 name, spec = m.group(1), m.group(2) 1548 if spec.startswith('>') or '>>>' in spec: 1549 raw = spec[1:] if spec.startswith('>') else spec 1550 fields.append({"name": name, "choices": [ 1551 c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip() 1552 for c in raw.split('|') 1553 ]}) 1554 elif spec.startswith('?'): 1555 default = spec[1:].strip().lower() not in ('', 'false') 1556 fields.append({"name": name, "checkbox": default}) 1557 else: 1558 fields.append({"name": name, "regex": spec}) 1559 1560 if order is None: 1561 order1 = self._questions_current_order 1562 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1563 else: 1564 order1 = order 1565 1566 q = QuestionForm(title, description, hash, order1, fields) 1567 if order1 not in self._questions_order: 1568 self._questions_order[order1] = [] 1569 self._questions_order[order1].append(q) 1570 1571 if q.question_hash in self._questions: 1572 write_error(f"Duplicate question hash {q.question_hash}") 1573 self._questions[q.question_hash] = q 1574 1575 if cheat_answers is not None: 1576 for state, field_answers in cheat_answers.items(): 1577 if state not in self._cheat_answers: 1578 self._cheat_answers[state] = {} 1579 self._cheat_answers[state][q.question_hash] = json.dumps( 1580 field_answers, ensure_ascii=False 1581 ) 1582 1583 if q.question_hash in self._answers: 1584 try: 1585 return json.loads(self._answers[q.question_hash]) 1586 except (json.JSONDecodeError, TypeError): 1587 return {} 1588 return {} 1589 1590 def question_dummy(self, title, section='', description='', hash=None, order=None): 1591 """Register a display-only block (no answer widget shown to the student).""" 1592 title = self._apply_section(section, title) 1593 if order is None: 1594 order1 = self._questions_current_order 1595 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1596 else: 1597 order1 = order 1598 1599 q = QuestionDummy(title, description, hash, order1) 1600 if order1 not in self._questions_order: 1601 self._questions_order[order1] = [] 1602 self._questions_order[order1].append(q) 1603 1604 if q.question_hash in self._questions: 1605 write_error(f"Duplicate question hash {q.question_hash}") 1606 self._questions[q.question_hash] = q 1607 1608 def add_grade_part(self, title, description=''): 1609 """Register a new :class:`GradePart` group and return it. 1610 1611 Pass the returned object to ``add_grade_element(..., grade_part=...)`` 1612 to associate elements with this part. Parts are displayed in 1613 registration order (with a subtotal row per part) in the GUI 1614 evaluation view and in ``sre outline`` PDFs. 1615 """ 1616 if description: 1617 description = TranslatedText.from_value(description, self._default_language) 1618 gp = GradePart(title=title, description=description) 1619 if any(p.title == title for p in self._grade_parts): 1620 write_error(f"Duplicate grade part title = {title}") 1621 self._grade_parts.append(gp) 1622 return gp 1623 1624 def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE, 1625 grade_part=None): 1626 """Add a graded rubric item. Initial *grade* defaults to 0; use :meth:`set_grade` to update it. 1627 1628 ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only, 1629 ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only, 1630 ``BOTH_EVAL_SCOPE`` (3, default) for both audiences. 1631 1632 ``grade_part`` optionally associates this element with a 1633 :class:`GradePart` previously returned by :meth:`add_grade_part`. 1634 """ 1635 if scope not in params.grade_scopes: 1636 raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}") 1637 if description: 1638 description = TranslatedText.from_value(description, self._default_language) 1639 grade_part_title = None 1640 if grade_part is not None: 1641 if not isinstance(grade_part, GradePart): 1642 raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}") 1643 if grade_part not in self._grade_parts: 1644 write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element") 1645 grade_part_title = grade_part.title 1646 g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope, 1647 grade_part=grade_part_title) 1648 self._grade_list.append(g) 1649 key = _tt_hash_str(title) 1650 if key in self._grades: 1651 write_error(f"Duplicate grade title = {title}") 1652 self._grades[key] = g 1653 1654 def set_grade(self, title, grade): 1655 """Set the numeric *grade* for the element previously registered under *title*.""" 1656 self._grades[_tt_hash_str(title)].grade = grade 1657 1658 def save_lab_info(self): 1659 debug_project = os.path.exists( 1660 params.debug_project_marker_filename(self.net_scheme.running_lab_name) 1661 ) 1662 if debug_project: 1663 visible_machines = list(self.net_scheme.get_machines()) 1664 else: 1665 visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden] 1666 lab_hash = self.net_scheme.get_lab_hash() 1667 1668 def _get_stats(m): 1669 s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None) 1670 return m, s 1671 1672 stats_map = {} 1673 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor: 1674 for m, s in executor.map(_get_stats, visible_machines): 1675 stats_map[m.name] = (m, s) 1676 1677 info_machines = [] 1678 for m in visible_machines: 1679 m, s = stats_map[m.name] 1680 interfaces = [] 1681 for net, netAdapter in m.net_adapters.items(): 1682 interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}")) 1683 info_machines.append( 1684 InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden, 1685 bridged=m.bridged, 1686 x11_host=m.x11_host, 1687 ports=m.ports, 1688 allow_connection=m.allow_connection, 1689 color=m.color or "", 1690 shape=m.shape or "", 1691 interfaces=interfaces)) 1692 1693 network_colors = { 1694 net.name: net.color 1695 for m in visible_machines 1696 for net in m.net_adapters 1697 if net.color 1698 } 1699 network_shapes = { 1700 net.name: net.shape 1701 for m in visible_machines 1702 for net in m.net_adapters 1703 if net.shape 1704 } 1705 1706 module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")] 1707 default_language = getattr(module_rvlab, 'default_language', 'en') 1708 self._default_language = default_language 1709 show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network) 1710 nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name) 1711 nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color) 1712 host_network_exploded = getattr(module_rvlab, 'host_network_exploded', 1713 params.default_host_network_exploded) 1714 host_network_edge_relative_length = float(getattr( 1715 module_rvlab, 'host_network_edge_relative_length', 1716 params.default_host_network_edge_relative_length)) 1717 schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines) 1718 schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap) 1719 1720 if self.step == 0: 1721 self.grade() 1722 self.step += 1 1723 1724 questions = self.get_questions_ordered() 1725 1726 if hasattr(module_rvlab, 'title'): 1727 title = module_rvlab.title 1728 else: 1729 lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name()) 1730 title = lab_name.removesuffix('.py') 1731 title = TranslatedText.from_value(title, default_language) 1732 1733 if hasattr(module_rvlab, 'delay_between_self_grade'): 1734 delay_between_self_grade = module_rvlab.delay_between_self_grade 1735 else: 1736 delay_between_self_grade = 0 1737 1738 if hasattr(module_rvlab, 'allow_self_grade'): 1739 allow_self_grade = module_rvlab.allow_self_grade 1740 else: 1741 allow_self_grade = False 1742 1743 if debug_project: 1744 delay_between_self_grade = 0 1745 allow_self_grade = True 1746 if hasattr(module_rvlab, 'export_kathara_project'): 1747 export_kathara_project = module_rvlab.export_kathara_project 1748 else: 1749 export_kathara_project = False 1750 1751 eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode) 1752 eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False) 1753 1754 informations = TranslatedText.from_value(self.net_scheme.informations, default_language) 1755 for q in questions: 1756 q.title = TranslatedText.from_value(q.title, default_language) 1757 q.description = TranslatedText.from_value(q.description, default_language) 1758 1759 net_scheme_cls = type(self.net_scheme) 1760 if debug_project: 1761 user_allowed_states_raw = {} 1762 for state in net_scheme_cls.get_state_methods(): 1763 desc = '' 1764 for klass in net_scheme_cls.__mro__: 1765 fn = klass.__dict__.get(state) 1766 if fn is not None and getattr(fn, '_is_sre_state', False): 1767 desc = getattr(fn, '_sre_state_description', '') 1768 break 1769 user_allowed_states_raw[state] = desc 1770 else: 1771 user_allowed_states_raw = ( 1772 net_scheme_cls.get_user_allowed_states() 1773 if getattr(module_rvlab, 'allow_user_states', False) else {} 1774 ) 1775 user_allowed_states = { 1776 state: TranslatedText.from_value(desc, default_language) if desc else desc 1777 for state, desc in user_allowed_states_raw.items() 1778 } 1779 1780 if debug_project: 1781 module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False) 1782 admin_only_states = [ 1783 state for state in user_allowed_states_raw 1784 if not module_allows_user_states 1785 or not net_scheme_cls.is_state_user_allowed(state) 1786 ] 1787 else: 1788 admin_only_states = [] 1789 1790 info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash, 1791 title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade, 1792 questions=questions, informations=informations, 1793 export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade, 1794 debug_project=debug_project, 1795 eval_interval_without_exam_mode=eval_interval_without_exam_mode, 1796 eval_before_exit=eval_before_exit, 1797 default_language=default_language, 1798 user_allowed_states=user_allowed_states, 1799 admin_only_states=admin_only_states, 1800 network_colors=network_colors, 1801 network_shapes=network_shapes, 1802 show_nat_network=show_nat_network, 1803 nat_network_name=nat_network_name, 1804 nat_network_color=nat_network_color, 1805 host_network_exploded=host_network_exploded, 1806 host_network_edge_relative_length=host_network_edge_relative_length, 1807 schema_splines=schema_splines, 1808 schema_overlap=schema_overlap) 1809 1810 info_json = info.to_json() 1811 info_filename = params.info_filename(self.net_scheme.running_lab_name) 1812 1813 save_info_json = None 1814 try: 1815 with open(info_filename, "r") as f: 1816 save_info_json = f.read() 1817 except FileNotFoundError: 1818 pass 1819 if save_info_json == info_json: 1820 return 1821 1822 temp_file = tempfile.NamedTemporaryFile( 1823 delete=False, 1824 dir=Path(self.net_scheme.get_public_lab_dir()).parent) 1825 with open(temp_file.name, "w") as f: 1826 print(info_json, file=f) 1827 f.flush() 1828 os.fsync(f.fileno()) 1829 os.chmod(temp_file.name, 0o644) 1830 os.replace(temp_file.name, info_filename) 1831 1832 def add_error(self, error, category=ErrorCategory.ERROR, step: int = 1): 1833 if self.step != step: 1834 return 1835 log_error(error) 1836 self._errors.append((category.value, error)) 1837 1838 def add_warning(self, warning, step: int = 1): 1839 self.add_error(warning, category=ErrorCategory.WARNING, step=step) 1840 1841 @staticmethod 1842 def run_tests_on_machine(machine_name, machine, exetests): 1843 environment = {params.exetests_env_name: exetests} 1844 code, output = machine.api_object.exec_run([params.exetests_machines_path], 1845 stdin=False, 1846 stdout=True, 1847 stderr=False, 1848 tty=False, 1849 environment=environment, 1850 ) 1851 return machine_name, code, output 1852 1853 def run_tests(self): 1854 self._tests = {} 1855 self._section_counter = [] 1856 lab = self.net_scheme.get_lab_from_kathara() 1857 self._errors = [] 1858 self.load_answers() 1859 self._eval_date = datetime.datetime.now().isoformat() 1860 1861 while self.step <= self.max_step: 1862 self.reset_before_grade() 1863 self.grade() 1864 self.step += 1 1865 tests = self.get_tests() 1866 exetests_by_machine = self.get_exetests_strings(self.step) 1867 if SRE.args.debug and self.step <= self.max_step: 1868 log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):") 1869 for machine, cmds in exetests_by_machine.items(): 1870 c = cmds.split(params.exetests_separator) 1871 c1 = " - ".join(c) 1872 log_debug(f"{machine}: {c1}") 1873 1874 results = {} 1875 with ThreadPoolExecutor( 1876 max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor: 1877 futures_to_machines = { 1878 executor.submit( 1879 Grade0.run_tests_on_machine, 1880 machine_name, 1881 machine, 1882 exetests_by_machine[machine_name], 1883 ): machine_name 1884 for machine_name, machine in lab.machines.items() 1885 if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0 1886 } 1887 for future in as_completed(futures_to_machines): 1888 machine_name = futures_to_machines[future] 1889 try: 1890 machine_name, code, output = future.result() 1891 results[machine_name] = (code, output) 1892 except Exception as e: 1893 self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step) 1894 continue 1895 1896 for machine_name, (exetests_code, output) in results.items(): 1897 if (machine_name, self.step) not in self._tests: 1898 self._tests[(machine_name, self.step)] = {} 1899 if exetests_code != 0: 1900 self.add_error( 1901 f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}", 1902 step=self.step) 1903 output1 = output.decode("utf-8") 1904 separator, _, rest = output1.partition("\n") 1905 output2 = rest.split(f"\n{separator}\n") 1906 for i in range(0, len(output2), 2): 1907 ligne1 = "" 1908 try: 1909 ligne1, date1, result = output2[i].split("\n", 2) 1910 except ValueError: 1911 result = "" 1912 if not ligne1: 1913 continue 1914 timeout_s, cmd = ligne1.split(":", 1) 1915 timeout = int(timeout_s) 1916 date2, code_s = output2[i + 1].split("\n", 1) 1917 try: 1918 code = int(code_s.strip()) 1919 except ValueError: 1920 code = -2 1921 self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code", 1922 step=self.step) 1923 if code != 0: 1924 if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False): 1925 self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step) 1926 self._tests[(machine_name, self.step)][cmd, timeout] = (result, code) 1927 if SRE.args.debug: 1928 for machine_name in lab.machines: 1929 if (machine_name, self.step) not in self._tests: 1930 continue 1931 for (cmd, timeout) in self._tests[(machine_name, self.step)]: 1932 log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:") 1933 result, code = self._tests[(machine_name, self.step)][cmd, timeout] 1934 log_debug(result) 1935 log_debug(f"-------- exit code {code}\n") 1936 1937 host_step_cmds = self._host_tests.get(self.step, {}) 1938 if host_step_cmds: 1939 def _run_host_cmd(cmd, t): 1940 from .utils_privileges import preexec_drop_to_sre 1941 run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd 1942 use_shell = params.execute_commands_on_host == "shell" 1943 try: 1944 proc = subprocess.run( 1945 run_cmd, shell=use_shell, capture_output=True, text=True, 1946 timeout=t if t > 0 else None, 1947 preexec_fn=preexec_drop_to_sre, 1948 ) 1949 return cmd, t, proc.stdout, proc.returncode 1950 except subprocess.TimeoutExpired: 1951 return cmd, t, '', -1 1952 except Exception: 1953 return cmd, t, '', -2 1954 1955 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, 1956 len(host_step_cmds))) as executor: 1957 futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t) 1958 for (cmd, t) in host_step_cmds} 1959 for future in as_completed(futures): 1960 cmd, t, result, code = future.result() 1961 if code != 0: 1962 if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False): 1963 self.add_error(f"host test error: {cmd} code={code}", step=self.step) 1964 self._host_tests[self.step][(cmd, t)] = (result, code) 1965 if SRE.args.debug: 1966 log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:") 1967 log_debug(result) 1968 log_debug(f"-------- exit code {code}\n") 1969 1970 if SRE.args.debug and len(self._errors) > 0: 1971 log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors)) 1972 self.compute_total() 1973 self._mark_self_eval = self.mark_self_eval() 1974 self._mark_exo_eval = self.mark_exo_eval() 1975 # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries") 1976 # for (machine, step), cmds in self._tests.items(): 1977 # for (cmd, timeout), (result, code) in cmds.items(): 1978 # log_error(f" [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}") 1979 1980 def compute_total(self): 1981 """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both.""" 1982 self._total_grade_self_eval = 0 1983 self._total_max_self_eval = 0 1984 self._total_grade_exo_eval = 0 1985 self._total_max_exo_eval = 0 1986 for g in self._grade_list: 1987 if g.scope & params.SELF_EVAL_SCOPE: 1988 self._total_grade_self_eval += g.grade 1989 self._total_max_self_eval += g.max_grade 1990 if g.scope & params.EXO_EVAL_SCOPE: 1991 self._total_grade_exo_eval += g.grade 1992 self._total_max_exo_eval += g.max_grade 1993 1994 def save_tests(self): 1995 now = datetime.datetime.now() 1996 if self._exam_json is None: 1997 exam_path = Path(params.sre_pub_dir) / params.exam_json_name 1998 try: 1999 self._exam_json = json.loads(exam_path.read_text()) 2000 except FileNotFoundError: 2001 pass 2002 except Exception as e: 2003 log_error(f"can't read {exam_path}: {e}") 2004 for d1 in self.archive_dirs: 2005 d = Path(d1).expanduser().resolve() 2006 try: 2007 if not d.exists(): 2008 os.mkdir(d, 0o700) 2009 else: 2010 os.listdir(d) # trigger mount / catch stale handle early 2011 permissions = os.stat(d).st_mode 2012 if permissions != 0o700: 2013 os.chmod(d, 0o700) 2014 filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now) 2015 self.save_tests_on_file(str(filename)) 2016 except Exception as e: 2017 log_error(f"can't save archive to {d}: {e}") 2018 2019 def save_tests_on_file(self, filename: str): 2020 files_content = {} 2021 if self.files_to_save_in_archives: 2022 fdir = Path(params.files_dir(self.get_running_lab_name())) 2023 if fdir.is_dir(): 2024 for pattern in self.files_to_save_in_archives: 2025 for fpath in fdir.iterdir(): 2026 if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern): 2027 try: 2028 files_content[fpath.name] = fpath.read_bytes() 2029 except Exception: 2030 pass 2031 archive = { 2032 params.running_lab_name_keyword: self.get_running_lab_name(), 2033 params.eval_date_keyword: self._eval_date, 2034 params.re_eval_date_keyword: self._re_eval_date, 2035 'data_json': self.get_data().to_json(), 2036 'tests': self.get_tests(), 2037 'errors': self.get_errors(), 2038 'answers': self.get_answers(), 2039 'grade_list': [asdict(e) for e in self._grade_list], 2040 'grade_parts': [asdict(p) for p in self._grade_parts], 2041 'total_grade_self_eval': self._total_grade_self_eval, 2042 'total_max_self_eval': self._total_max_self_eval, 2043 'mark_self_eval': self._mark_self_eval, 2044 'total_grade_exo_eval': self._total_grade_exo_eval, 2045 'total_max_exo_eval': self._total_max_exo_eval, 2046 'mark_exo_eval': self._mark_exo_eval, 2047 'maximum_mark': self._maximum_mark, 2048 'files': files_content, 2049 } 2050 if self._exam_json is not None: 2051 archive[params.exam_json_keyword] = self._exam_json 2052 cctx = zstd.ZstdCompressor(level=6) 2053 with tempfile.NamedTemporaryFile( 2054 mode="wb", 2055 dir=os.path.dirname(filename), 2056 delete=False 2057 ) as f: 2058 with cctx.stream_writer(f) as compressor: 2059 packed = msgpack.packb(archive, use_bin_type=True) 2060 compressor.write(packed) 2061 # f.flush() 2062 # os.fsync(f.fileno()) 2063 temp_name = f.name 2064 os.replace(temp_name, filename)
70def tr(text: str, **langs) -> TranslatedText: 71 """Build a TranslatedText with 'en' as the default language key. 72 73 Looks up the caller module's ``_TRANSLATIONS`` dict for any language not 74 already supplied as a keyword argument. Inline kwargs take priority. 75 ``None`` values in ``_TRANSLATIONS`` are skipped (placeholder for 76 untranslated strings). 77 78 ``_TRANSLATIONS`` must be defined **before** the first ``tr()`` call in 79 the file, because module-level expressions are evaluated at import time. 80 81 Usage in lab files:: 82 83 title = tr("My lab") # translation from _TRANSLATIONS 84 title = tr("My lab", fr="Mon TP") # inline, retro-compatible 85 86 For labs whose primary language is not English, use :func:`make_tr` instead. 87 """ 88 merged = _lookup_translations(sys._getframe(1).f_globals, text, langs) 89 return TranslatedText({'en': text, **merged})
Build a TranslatedText with 'en' as the default language key.
Looks up the caller module's _TRANSLATIONS dict for any language not
already supplied as a keyword argument. Inline kwargs take priority.
None values in _TRANSLATIONS are skipped (placeholder for
untranslated strings).
_TRANSLATIONS must be defined before the first tr() call in
the file, because module-level expressions are evaluated at import time.
Usage in lab files::
title = tr("My lab") # translation from _TRANSLATIONS
title = tr("My lab", fr="Mon TP") # inline, retro-compatible
For labs whose primary language is not English, use make_tr() instead.
92def make_tr(default_lang: str, translations: dict | None = None): 93 """Return a tr() function bound to *default_lang* as the first-positional language. 94 95 If *translations* is supplied explicitly it is used directly (no frame 96 inspection). Otherwise the caller module's globals are inspected for a 97 ``_TRANSLATIONS`` dict at each call. 98 99 In both cases ``_TRANSLATIONS`` (or the dict passed as *translations*) must 100 be defined **before** the first ``tr()`` call in the file, because 101 module-level expressions are evaluated at import time. 102 103 Usage with explicit dict (no frame inspection):: 104 105 _TRANSLATIONS = {'fr': {"Mon TP": "My lab"}} 106 tr = make_tr('fr', translations=_TRANSLATIONS) 107 title = tr("Mon TP") # 'en' from _TRANSLATIONS 108 109 Usage with frame inspection:: 110 111 tr = make_tr('fr') 112 _TRANSLATIONS = {'en': {"Mon TP": "My lab"}} # must come before tr() calls 113 title = tr("Mon TP") 114 """ 115 if translations is not None: 116 _wrapped = {'_TRANSLATIONS': translations} 117 118 def _tr(text: str, **langs) -> TranslatedText: 119 merged = _lookup_translations(_wrapped, text, langs) 120 return TranslatedText({default_lang: text, **merged}) 121 else: 122 caller_globals = sys._getframe(1).f_globals 123 124 def _tr(text: str, **langs) -> TranslatedText: 125 merged = _lookup_translations(caller_globals, text, langs) 126 return TranslatedText({default_lang: text, **merged}) 127 128 return _tr
Return a tr() function bound to default_lang as the first-positional language.
If translations is supplied explicitly it is used directly (no frame
inspection). Otherwise the caller module's globals are inspected for a
_TRANSLATIONS dict at each call.
In both cases _TRANSLATIONS (or the dict passed as translations) must
be defined before the first tr() call in the file, because
module-level expressions are evaluated at import time.
Usage with explicit dict (no frame inspection)::
_TRANSLATIONS = {'fr': {"Mon TP": "My lab"}}
tr = make_tr('fr', translations=_TRANSLATIONS)
title = tr("Mon TP") # 'en' from _TRANSLATIONS
Usage with frame inspection::
tr = make_tr('fr')
_TRANSLATIONS = {'en': {"Mon TP": "My lab"}} # must come before tr() calls
title = tr("Mon TP")
131def no_tr(text: str) -> str: 132 """Marker for prepare-sre-translations: leave this string untranslated. 133 134 Identity passthrough at runtime (returns *text* unchanged). It is purely a 135 hint to the translation tooling so the wrapped string is never wrapped in 136 tr() nor registered in _TRANSLATIONS — use it for short internal labels 137 (e.g. grade-element titles) that are not natural-language prose. 138 """ 139 return text
Marker for prepare-sre-translations: leave this string untranslated.
Identity passthrough at runtime (returns text unchanged). It is purely a hint to the translation tooling so the wrapped string is never wrapped in tr() nor registered in _TRANSLATIONS — use it for short internal labels (e.g. grade-element titles) that are not natural-language prose.
314@dataclass 315class Flavor0: 316 """Base class for optional lab-parameterisation dataclasses. 317 318 Subclass with ``@dataclass(slots=True)`` and declare your fields. If the 319 module defines ``flavor_form_at_startup = True``, the GUI renders the 320 ``flavor_form`` string (containing ``@@{field:regex}@@`` markers) as a form 321 before starting the lab. 322 323 Named presets can be attached as class attributes, e.g.:: 324 325 Flavor.easy = Flavor(nb=1) 326 327 and referenced on the CLI with ``--set-flavor-name easy``. 328 329 Override :meth:`allowed_by_user` to restrict which values students may choose. 330 """ 331 332 _registry = {} 333 334 def __init_subclass__(cls, **kwargs): 335 super().__init_subclass__(**kwargs) 336 key = f"{cls.__module__}.{cls.__qualname__}" 337 cls._type_key = key 338 Flavor0._registry[key] = cls 339 340 def to_dict(self): 341 """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`.""" 342 return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)} 343 344 @classmethod 345 def from_dict(cls, d): 346 """Reconstruct a ``Flavor0`` instance from a dict produced by :meth:`to_dict`.""" 347 decoded = {k: Data0._decode_value(v) for k, v in d.items()} 348 return cls(**decoded) # type: ignore[call-arg] 349 350 @classmethod 351 def from_form_dict(cls, d: dict): 352 """Build a Flavor from form field values (strings from text inputs, bools from 353 submit buttons). Declared dataclass fields are coerced to their declared type. 354 Extra keys in d (form fields not declared in the dataclass) are set as plain 355 attributes on the instance so that allowed_by_user() can read them. 356 """ 357 from typing import get_type_hints 358 hints = get_type_hints(cls) 359 declared = {f.name for f in fields(cls)} 360 coerced = {} 361 for f in fields(cls): 362 v = d.get(f.name) 363 if v is None: 364 continue 365 t = hints.get(f.name, str) 366 if not isinstance(t, type) or isinstance(v, t): 367 coerced[f.name] = v 368 elif t == bool: 369 coerced[f.name] = str(v).lower() in ('true', '1', 'yes') 370 elif t == int: 371 coerced[f.name] = int(v) 372 elif t == float: 373 coerced[f.name] = float(v) 374 else: 375 coerced[f.name] = str(v) 376 obj = cls(**coerced) 377 for k, v in d.items(): 378 if k not in declared: 379 setattr(obj, k, v) 380 return obj 381 382 def allowed_by_user(self) -> Tuple[bool, str]: 383 """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise. 384 385 Override in subclasses to restrict which values students can choose. 386 """ 387 return True, ""
Base class for optional lab-parameterisation dataclasses.
Subclass with @dataclass(slots=True) and declare your fields. If the
module defines flavor_form_at_startup = True, the GUI renders the
flavor_form string (containing @@{field:regex}@@ markers) as a form
before starting the lab.
Named presets can be attached as class attributes, e.g.::
Flavor.easy = Flavor(nb=1)
and referenced on the CLI with --set-flavor-name easy.
Override allowed_by_user() to restrict which values students may choose.
340 def to_dict(self): 341 """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`.""" 342 return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)}
Serialise dataclass fields to a plain dict using Data0._encode_value().
350 @classmethod 351 def from_form_dict(cls, d: dict): 352 """Build a Flavor from form field values (strings from text inputs, bools from 353 submit buttons). Declared dataclass fields are coerced to their declared type. 354 Extra keys in d (form fields not declared in the dataclass) are set as plain 355 attributes on the instance so that allowed_by_user() can read them. 356 """ 357 from typing import get_type_hints 358 hints = get_type_hints(cls) 359 declared = {f.name for f in fields(cls)} 360 coerced = {} 361 for f in fields(cls): 362 v = d.get(f.name) 363 if v is None: 364 continue 365 t = hints.get(f.name, str) 366 if not isinstance(t, type) or isinstance(v, t): 367 coerced[f.name] = v 368 elif t == bool: 369 coerced[f.name] = str(v).lower() in ('true', '1', 'yes') 370 elif t == int: 371 coerced[f.name] = int(v) 372 elif t == float: 373 coerced[f.name] = float(v) 374 else: 375 coerced[f.name] = str(v) 376 obj = cls(**coerced) 377 for k, v in d.items(): 378 if k not in declared: 379 setattr(obj, k, v) 380 return obj
Build a Flavor from form field values (strings from text inputs, bools from submit buttons). Declared dataclass fields are coerced to their declared type. Extra keys in d (form fields not declared in the dataclass) are set as plain attributes on the instance so that allowed_by_user() can read them.
382 def allowed_by_user(self) -> Tuple[bool, str]: 383 """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise. 384 385 Override in subclasses to restrict which values students can choose. 386 """ 387 return True, ""
Return (True, "") if the student may use this flavor; (False, reason) otherwise.
Override in subclasses to restrict which values students can choose.
390class Data0: 391 """Base class for lab-specific parameter dataclasses. 392 393 Subclass with ``@dataclass(slots=True)`` and declare your fields normally. 394 Three dynamic containers are injected automatically into every instance by 395 ``__post_init__``: 396 397 * ``self.ips`` — :class:`_IPv4InterfaceContainer`: named ``IPv4Interface`` values 398 * ``self.nets`` — :class:`_IPv4NetContainer`: named ``IPv4Network`` values 399 * ``self.macs`` — :class:`_MacContainer`: named ``EUI`` MAC-address values 400 401 The class-level ``_registry`` maps ``"module.ClassName"`` keys to concrete 402 subclasses so that ``from_dict``/``unpack``/``from_json`` can reconstruct the 403 correct type from serialised data without knowing it in advance. 404 405 Override ``generate(flavor=None)`` as a ``@classmethod`` to produce a fresh 406 randomised instance. The result is serialised to ``data.json`` at lab start 407 and reloaded for each evaluation. 408 """ 409 410 _registry = {} 411 _rng = random.Random() 412 413 def __init_subclass__(cls, **kwargs): 414 super().__init_subclass__(**kwargs) 415 key = f"{cls.__module__}.{cls.__qualname__}" 416 cls._type_key = key 417 Data0._registry[key] = cls 418 419 def __post_init__(self): 420 """Inject ``ips``, ``nets``, ``macs``, and ``flavor`` into every instance. 421 422 Called automatically by the dataclass ``__init__``. Uses 423 ``object.__setattr__`` so the injection works even when the subclass 424 declares ``slots=True``. 425 """ 426 # Inject ips/nets/flavor into __dict__ (available even with slots=True subclasses 427 # because Data0 itself has no __slots__, so __dict__ is always inherited). 428 object.__setattr__(self, 'ips', _IPv4InterfaceContainer()) 429 object.__setattr__(self, 'nets', _IPv4NetContainer()) 430 object.__setattr__(self, 'macs', _MacContainer()) 431 object.__setattr__(self, 'flavor', None) 432 object.__setattr__(self, '__flavor_name', None) 433 object.__setattr__(self, '__current_srelab_file', None) 434 435 # ---------------- lifecycle hooks ---------------- 436 @classmethod 437 def compute_pre_generate(cls, flavor=None): 438 """Hook called just before generate() and after JSON/msgpack deserialization. 439 440 Override in a subclass to set class-level derived state from *flavor*. 441 The default implementation is a no-op (fully retro-compatible). 442 """ 443 pass 444 445 def compute_post_generate(self): 446 """Hook called after generate() and after JSON/msgpack deserialization. 447 448 Override in a subclass to set instance-level derived state from data fields. 449 The default implementation is a no-op (fully retro-compatible). 450 """ 451 pass 452 453 # ---------------- dict conversion ---------------- 454 def to_dict(self): 455 """Serialise to a plain dict (JSON-safe). 456 457 Dataclass fields are encoded via :meth:`_encode_value`. ``ips``, ``nets``, 458 and ``macs`` are stored as nested dicts of strings. An attached ``Flavor`` 459 is stored under the ``"flavor"`` key with its type key. 460 """ 461 result = {} 462 for f in fields(self): 463 v = getattr(self, f.name) 464 result[f.name] = self._encode_value(v) 465 result['ips'] = self.ips.to_dict() 466 result['nets'] = self.nets.to_dict() 467 result['macs'] = self.macs.to_dict() 468 if self.flavor is not None: 469 result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()} 470 flavor_name = getattr(self, '__flavor_name', None) 471 if flavor_name is not None: 472 result['__flavor_name'] = flavor_name 473 current_srelab_file = getattr(self, '__current_srelab_file', None) 474 if current_srelab_file is not None: 475 result['__current_srelab_file'] = current_srelab_file 476 return result 477 478 @classmethod 479 def from_dict(cls, d): 480 """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`. 481 482 The concrete subclass is resolved from ``d["__type__"]`` when *cls* is 483 ``Data0`` itself; otherwise the calling class is used directly. 484 Raises ``ValueError`` for unknown type keys. 485 """ 486 d = dict(d) 487 ips_data = d.pop('ips', {}) 488 nets_data = d.pop('nets', {}) 489 macs_data = d.pop('macs', {}) 490 flavor_data = d.pop('flavor', None) 491 flavor_name = d.pop('__flavor_name', None) 492 current_srelab_file = d.pop('__current_srelab_file', None) 493 decoded = {k: cls._decode_value(v) for k, v in d.items()} 494 obj = cls(**decoded) 495 for k, v in ips_data.items(): 496 setattr(obj.ips, k, IPv4Interface(v)) 497 for k, v in nets_data.items(): 498 setattr(obj.nets, k, IPv4Network(v)) 499 for k, v in macs_data.items(): 500 setattr(obj.macs, k, EUI(v)) 501 if flavor_data is not None: 502 flavor_key = flavor_data.get("__flavor_type__") 503 if flavor_key not in Flavor0._registry: 504 raise ValueError(f"unknown Flavor0 type: {flavor_key!r}") 505 flavor_cls = Flavor0._registry[flavor_key] 506 object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"])) 507 object.__setattr__(obj, '__flavor_name', flavor_name) 508 object.__setattr__(obj, '__current_srelab_file', current_srelab_file) 509 return obj 510 511 # ---------------- value encoding ---------------- 512 @staticmethod 513 def _encode_value(v): 514 """Encode a field value for JSON/msgpack storage. 515 516 * ``IPv4Address`` / ``IPv6Address`` → ``{"__ip__": "..."}`` 517 * ``IPv4Network`` / ``IPv6Network`` → ``{"__net__": "..."}`` 518 * nested ``Data0`` → ``{"__type__": "...", "data": {...}}`` 519 * all other values are returned unchanged. 520 """ 521 if isinstance(v, (IPv4Address, IPv6Address)): 522 return {"__ip__": str(v)} 523 if isinstance(v, (IPv4Network, IPv6Network)): 524 return {"__net__": str(v)} 525 526 if isinstance(v, Data0): 527 return { 528 "__type__": v._type_key, 529 "data": v.to_dict() 530 } 531 return v 532 533 @staticmethod 534 def _decode_value(v): 535 """Decode a value produced by :meth:`_encode_value` back to its Python type.""" 536 if isinstance(v, dict): 537 if "__ip__" in v: 538 return ip_address(v["__ip__"]) 539 if "__net__" in v: 540 return ip_network(v["__net__"]) 541 if "__type__" in v: 542 type_key = v["__type__"] 543 if type_key not in Data0._registry: 544 raise ValueError(f"unknown Data0 type: {type_key!r}") 545 return Data0._registry[type_key].from_dict(v["data"]) 546 return v 547 548 # ---------------- msgpack ---------------- 549 def pack(self): 550 """Serialise to a msgpack binary blob (includes type key for polymorphic reload).""" 551 return msgpack.packb( 552 { 553 "__type__": self._type_key, 554 "data": self.to_dict() 555 }, 556 use_bin_type=True 557 ) 558 559 @classmethod 560 def unpack(cls, blob): 561 """Deserialise a msgpack blob produced by :meth:`pack`. Raises ``ValueError`` for unknown types.""" 562 obj = msgpack.unpackb(blob, raw=False) 563 type_key = obj.get("__type__") 564 if type_key not in Data0._registry: 565 raise ValueError(f"unknown Data0 type: {type_key!r}") 566 result = Data0._registry[type_key].from_dict(obj["data"]) 567 type(result).compute_pre_generate(result.flavor) 568 result.compute_post_generate() 569 return result 570 571 # ---------------- JSON ---------------- 572 def to_json(self): 573 """Serialise to a JSON string (includes type key for polymorphic reload).""" 574 return json.dumps({ 575 "__type__": self._type_key, 576 "data": self.to_dict() 577 }) 578 579 @classmethod 580 def from_json(cls, s): 581 """Deserialise a JSON string produced by :meth:`to_json`.""" 582 obj = json.loads(s) 583 concrete = Data0._registry[obj["__type__"]] 584 result = concrete.from_dict(obj["data"]) 585 type(result).compute_pre_generate(result.flavor) 586 result.compute_post_generate() 587 return result 588 589 @classmethod 590 def load_from_json_file(cls, filename): 591 """ 592 Load a Data0-derived object from a JSON file. 593 The concrete class is resolved automatically. 594 """ 595 596 path = Path(filename) 597 with path.open("r", encoding="utf-8") as f: 598 obj = json.load(f) 599 concrete = Data0._registry[obj["__type__"]] 600 result = concrete.from_dict(obj["data"]) 601 type(result).compute_pre_generate(result.flavor) 602 result.compute_post_generate() 603 return result 604 605 def save_to_json_file(self, filename): 606 """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private).""" 607 path = Path(filename) 608 fd = os.open( 609 path, 610 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 611 0o600 612 ) 613 with os.fdopen(fd, "w", encoding="utf-8") as f: 614 json.dump( 615 { 616 "__type__": self._type_key, 617 "data": self.to_dict() 618 }, 619 f, 620 indent=2 # optional but operationally useful 621 )
Base class for lab-specific parameter dataclasses.
Subclass with @dataclass(slots=True) and declare your fields normally.
Three dynamic containers are injected automatically into every instance by
__post_init__:
self.ips—_IPv4InterfaceContainer: namedIPv4Interfacevaluesself.nets—_IPv4NetContainer: namedIPv4Networkvaluesself.macs—_MacContainer: namedEUIMAC-address values
The class-level _registry maps "module.ClassName" keys to concrete
subclasses so that from_dict/unpack/from_json can reconstruct the
correct type from serialised data without knowing it in advance.
Override generate(flavor=None) as a @classmethod to produce a fresh
randomised instance. The result is serialised to data.json at lab start
and reloaded for each evaluation.
436 @classmethod 437 def compute_pre_generate(cls, flavor=None): 438 """Hook called just before generate() and after JSON/msgpack deserialization. 439 440 Override in a subclass to set class-level derived state from *flavor*. 441 The default implementation is a no-op (fully retro-compatible). 442 """ 443 pass
Hook called just before generate() and after JSON/msgpack deserialization.
Override in a subclass to set class-level derived state from flavor. The default implementation is a no-op (fully retro-compatible).
445 def compute_post_generate(self): 446 """Hook called after generate() and after JSON/msgpack deserialization. 447 448 Override in a subclass to set instance-level derived state from data fields. 449 The default implementation is a no-op (fully retro-compatible). 450 """ 451 pass
Hook called after generate() and after JSON/msgpack deserialization.
Override in a subclass to set instance-level derived state from data fields. The default implementation is a no-op (fully retro-compatible).
454 def to_dict(self): 455 """Serialise to a plain dict (JSON-safe). 456 457 Dataclass fields are encoded via :meth:`_encode_value`. ``ips``, ``nets``, 458 and ``macs`` are stored as nested dicts of strings. An attached ``Flavor`` 459 is stored under the ``"flavor"`` key with its type key. 460 """ 461 result = {} 462 for f in fields(self): 463 v = getattr(self, f.name) 464 result[f.name] = self._encode_value(v) 465 result['ips'] = self.ips.to_dict() 466 result['nets'] = self.nets.to_dict() 467 result['macs'] = self.macs.to_dict() 468 if self.flavor is not None: 469 result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()} 470 flavor_name = getattr(self, '__flavor_name', None) 471 if flavor_name is not None: 472 result['__flavor_name'] = flavor_name 473 current_srelab_file = getattr(self, '__current_srelab_file', None) 474 if current_srelab_file is not None: 475 result['__current_srelab_file'] = current_srelab_file 476 return result
Serialise to a plain dict (JSON-safe).
Dataclass fields are encoded via _encode_value(). ips, nets,
and macs are stored as nested dicts of strings. An attached Flavor
is stored under the "flavor" key with its type key.
478 @classmethod 479 def from_dict(cls, d): 480 """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`. 481 482 The concrete subclass is resolved from ``d["__type__"]`` when *cls* is 483 ``Data0`` itself; otherwise the calling class is used directly. 484 Raises ``ValueError`` for unknown type keys. 485 """ 486 d = dict(d) 487 ips_data = d.pop('ips', {}) 488 nets_data = d.pop('nets', {}) 489 macs_data = d.pop('macs', {}) 490 flavor_data = d.pop('flavor', None) 491 flavor_name = d.pop('__flavor_name', None) 492 current_srelab_file = d.pop('__current_srelab_file', None) 493 decoded = {k: cls._decode_value(v) for k, v in d.items()} 494 obj = cls(**decoded) 495 for k, v in ips_data.items(): 496 setattr(obj.ips, k, IPv4Interface(v)) 497 for k, v in nets_data.items(): 498 setattr(obj.nets, k, IPv4Network(v)) 499 for k, v in macs_data.items(): 500 setattr(obj.macs, k, EUI(v)) 501 if flavor_data is not None: 502 flavor_key = flavor_data.get("__flavor_type__") 503 if flavor_key not in Flavor0._registry: 504 raise ValueError(f"unknown Flavor0 type: {flavor_key!r}") 505 flavor_cls = Flavor0._registry[flavor_key] 506 object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"])) 507 object.__setattr__(obj, '__flavor_name', flavor_name) 508 object.__setattr__(obj, '__current_srelab_file', current_srelab_file) 509 return obj
549 def pack(self): 550 """Serialise to a msgpack binary blob (includes type key for polymorphic reload).""" 551 return msgpack.packb( 552 { 553 "__type__": self._type_key, 554 "data": self.to_dict() 555 }, 556 use_bin_type=True 557 )
Serialise to a msgpack binary blob (includes type key for polymorphic reload).
559 @classmethod 560 def unpack(cls, blob): 561 """Deserialise a msgpack blob produced by :meth:`pack`. Raises ``ValueError`` for unknown types.""" 562 obj = msgpack.unpackb(blob, raw=False) 563 type_key = obj.get("__type__") 564 if type_key not in Data0._registry: 565 raise ValueError(f"unknown Data0 type: {type_key!r}") 566 result = Data0._registry[type_key].from_dict(obj["data"]) 567 type(result).compute_pre_generate(result.flavor) 568 result.compute_post_generate() 569 return result
Deserialise a msgpack blob produced by pack(). Raises ValueError for unknown types.
572 def to_json(self): 573 """Serialise to a JSON string (includes type key for polymorphic reload).""" 574 return json.dumps({ 575 "__type__": self._type_key, 576 "data": self.to_dict() 577 })
Serialise to a JSON string (includes type key for polymorphic reload).
579 @classmethod 580 def from_json(cls, s): 581 """Deserialise a JSON string produced by :meth:`to_json`.""" 582 obj = json.loads(s) 583 concrete = Data0._registry[obj["__type__"]] 584 result = concrete.from_dict(obj["data"]) 585 type(result).compute_pre_generate(result.flavor) 586 result.compute_post_generate() 587 return result
Deserialise a JSON string produced by to_json().
589 @classmethod 590 def load_from_json_file(cls, filename): 591 """ 592 Load a Data0-derived object from a JSON file. 593 The concrete class is resolved automatically. 594 """ 595 596 path = Path(filename) 597 with path.open("r", encoding="utf-8") as f: 598 obj = json.load(f) 599 concrete = Data0._registry[obj["__type__"]] 600 result = concrete.from_dict(obj["data"]) 601 type(result).compute_pre_generate(result.flavor) 602 result.compute_post_generate() 603 return result
Load a Data0-derived object from a JSON file. The concrete class is resolved automatically.
605 def save_to_json_file(self, filename): 606 """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private).""" 607 path = Path(filename) 608 fd = os.open( 609 path, 610 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 611 0o600 612 ) 613 with os.fdopen(fd, "w", encoding="utf-8") as f: 614 json.dump( 615 { 616 "__type__": self._type_key, 617 "data": self.to_dict() 618 }, 619 f, 620 indent=2 # optional but operationally useful 621 )
Write the instance to filename as JSON, mode 0o600 (lab secrets stay private).
636class NetScheme0: 637 """Base class for lab network topology definitions. 638 639 Subclasses declare the topology as class-level dicts and implement state 640 methods decorated with :func:`sre_state`. 641 642 Class-level attributes: 643 644 * ``_machine_specs`` — ``{name: {Machine kwargs}}`` — one entry per container. 645 * ``_network_specs`` — ``{net_name: {color: ...}}`` — optional display hints. 646 * ``_topology`` — ``{net_name: [machine, ...]}`` or ``{net_name: {machine: iface}}`` 647 — which machines connect to each network and on which interface. 648 649 If any of these three attributes is not defined in the subclass (neither as a class 650 variable nor as a property), the corresponding attribute is read from ``data`` 651 (``data.topology``, ``data.machine_specs``, ``data.network_specs``). 652 Explicit subclass definitions always take precedence. 653 654 Instance attributes set by ``__init__``: 655 656 * ``self.data`` — the ``Data0`` instance for this lab run. 657 * ``self.informations`` — Markdown text shown in the Informations tab (set this in ``__init__``). 658 * Named machine and network objects (e.g. ``self.router``, ``self.lan``). 659 660 The ``initial`` state method is always called at ``sre start``. Additional 661 states are applied with ``sre state <lab> <state_name>``. 662 """ 663 664 _machine_specs = {} 665 _network_specs = {} # {net_name: {'color': ...}} — optional colors for networks 666 _topology = {} # {net_name: [m, ...] or {m: iface, ...}} — mixed allowed 667 668 def _resolve_spec(self, attr: str, data_attr: str): 669 """Return the topology/spec dict for *attr*. 670 671 Resolution order: 672 1. If the concrete subclass (or any class before ``NetScheme0`` in the MRO) 673 defines *attr* as a class variable or property, return that value. 674 2. Otherwise look for *data_attr* on ``self.data`` (checks instance attributes 675 first, then class-level attributes set e.g. by ``compute_pre_generate``). 676 3. If neither source provides the attribute, return the ``NetScheme0`` default 677 (an empty dict), which is equivalent to "no topology/specs defined". 678 """ 679 for cls in type(self).__mro__: 680 if cls is NetScheme0: 681 break 682 if attr in cls.__dict__: 683 return getattr(self, attr) 684 return getattr(self.data, data_attr, getattr(NetScheme0, attr)) 685 686 def __init__(self, data, running_lab_name, lab_hash=None): 687 """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs. 688 689 Args: 690 data: a ``Data0`` instance containing lab-specific parameters. 691 running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``. 692 lab_hash: optional Kathara lab hash (resolved lazily if omitted). 693 """ 694 self.data = data 695 self.running_lab_name = running_lab_name 696 self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name)) 697 self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name) 698 self.lab_hash = lab_hash 699 self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name) 700 self.informations = "" 701 702 _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs') 703 _network_specs = self._resolve_spec('_network_specs', 'network_specs') 704 _topology = self._resolve_spec('_topology', 'topology') 705 706 for name, parameters in _machine_specs.items(): 707 setattr(self, name, Machine(name=name, **parameters)) 708 709 # Create networks from _network_specs (with optional color), then remaining from _topology 710 for name, parameters in _network_specs.items(): 711 setattr(self, name, Network(name=name, **parameters)) 712 713 for net_name in _topology: 714 if not hasattr(self, net_name): 715 setattr(self, net_name, Network(name=net_name)) 716 717 # Create NetAdapters from _topology. 718 # Each value is either a list (auto interface) or a dict {machine: iface_spec}. 719 # iface_spec can be: None (auto), an int, or a (int, mac) tuple. 720 # Interface auto-assignment counts prior connections per machine across all networks. 721 _iface_counter: dict[str, int] = {} 722 for net_name, machines in _topology.items(): 723 net = getattr(self, net_name) 724 if isinstance(machines, dict): 725 items = machines.items() 726 else: 727 items = ((m, None) for m in machines) 728 for mname, iface_spec in items: 729 machine = getattr(self, mname) 730 if isinstance(iface_spec, tuple): 731 iface, mac = iface_spec 732 else: 733 iface, mac = iface_spec, None 734 if iface is None: 735 iface = _iface_counter.get(mname, 0) 736 _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1 737 NetAdapter(network=net, machine=machine, interface=iface, mac=mac) 738 739 self._ops: dict[str, list] = {} # machine → [str | _FileOp, ...] 740 self._host_ops: dict[int, list] = {} 741 742 self.net_config = None 743 744 def host_interfaces_from_topology(self) -> dict: 745 """Return {machine_name: [net_name, ...]} derived from the resolved topology.""" 746 topology = self._resolve_spec('_topology', 'topology') 747 result = {} 748 for net_name, machines in topology.items(): 749 names = machines.keys() if isinstance(machines, dict) else machines 750 for mname in names: 751 result.setdefault(mname, []).append(net_name) 752 return result 753 754 @sre_state(user_allowed=False) 755 def initial(self): 756 pass 757 758 @classmethod 759 def get_state_methods(cls): 760 """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy.""" 761 names = set() 762 for klass in cls.__mro__: 763 for name, fn in klass.__dict__.items(): 764 if callable(fn) and getattr(fn, '_is_sre_state', False): 765 names.add(name) 766 return sorted(names) 767 768 @classmethod 769 def is_state_user_allowed(cls, state): 770 """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``.""" 771 for klass in cls.__mro__: 772 fn = klass.__dict__.get(state) 773 if fn is not None and getattr(fn, '_is_sre_state', False): 774 return getattr(fn, '_sre_state_user_allowed', False) 775 return False 776 777 @classmethod 778 def get_user_allowed_states(cls): 779 """Return ``{state_name: description}`` for all user-allowed states.""" 780 result = {} 781 for state in cls.get_state_methods(): 782 for klass in cls.__mro__: 783 fn = klass.__dict__.get(state) 784 if fn is not None and getattr(fn, '_is_sre_state', False): 785 if getattr(fn, '_sre_state_user_allowed', False): 786 result[state] = getattr(fn, '_sre_state_description', '') 787 break 788 return result 789 790 def get_data(self): 791 return self.data 792 793 def get_machine(self, machine_name): 794 """Return the :class:`Machine` with *machine_name*, or ``None`` if not found.""" 795 if not hasattr(self, machine_name): 796 return None 797 machine = getattr(self, machine_name) 798 if not isinstance(machine, Machine): 799 return None 800 return machine 801 802 def get_network(self, network_name): 803 """Return the :class:`Network` with *network_name*, or ``None`` if not found.""" 804 if not hasattr(self, network_name): 805 return None 806 machine = getattr(self, network_name) 807 if not isinstance(machine, Network): 808 return None 809 return machine 810 811 def get_machines(self): 812 for _, value in self.__dict__.items(): 813 if isinstance(value, Machine): 814 yield value 815 816 def get_machine_names(self): 817 return [m.name for m in self.get_machines()] 818 819 def has_privileged_machines(self): 820 return any(m.privileged for m in self.get_machines()) 821 822 def get_accessible_machine_names(self): 823 return [m.name for m in self.get_machines() 824 if not m.hidden and m.allow_connection] 825 826 def get_visible_machine_names(self): 827 return [m.name for m in self.get_visibles_machines()] 828 829 def get_visibles_machines(self): 830 for _, value in self.__dict__.items(): 831 if isinstance(value, Machine): 832 if not value.hidden: 833 yield value 834 835 def get_networks(self): 836 for _, value in self.__dict__.items(): 837 if isinstance(value, Network): 838 yield value 839 840 def get_topology(self): 841 return self._resolve_spec('_topology', 'topology') 842 843 def get_machine_specs(self): 844 return self._resolve_spec('_machine_specs', 'machine_specs') 845 846 def get_network_specs(self): 847 return self._resolve_spec('_network_specs', 'network_specs') 848 849 def cmd(self, machine, command, step=1): 850 """Register a shell command to execute inside *machine*'s container at *step*.""" 851 if SRE.args.debug: 852 print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr) 853 self._ops.setdefault(step, {}).setdefault(machine, []).append(command) 854 855 def host_cmd(self, command, step=1): 856 """Register a shell command to execute on the **host** (not inside a container) at *step*. 857 858 Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise. 859 """ 860 if params.execute_commands_on_host is False: 861 sys.exit("host_cmd() is disabled by params.execute_commands_on_host") 862 if SRE.args.debug: 863 print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr) 864 self._host_ops.setdefault(step, []).append(_HostCmdOp(command)) 865 866 def host_callback(self, callback, step=1): 867 """Register a Python callable to invoke on the host at *step* (called with no arguments).""" 868 if SRE.args.debug: 869 print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}", 870 file=sys.stderr) 871 self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback)) 872 873 def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None, 874 mtime: float = None, step=1): 875 """Register a file copy from the host into *machine*'s container at *step*. 876 877 *src* may be relative (resolved against ``params.files_dir``). 878 *dest* is the absolute path inside the container. 879 """ 880 orig_path = Path(src) 881 if not orig_path.is_absolute(): 882 orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path 883 if SRE.args.debug: 884 print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr) 885 self._ops.setdefault(step, {}).setdefault(machine, []).append( 886 _CpFromHostOp(orig_path, dest, permissions, owner, mtime) 887 ) 888 889 def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1): 890 """Copy file `path` from `machine` to the host files_dir. 891 892 Args: 893 machine: container name 894 path: absolute path of the file inside the container 895 dest: relative destination path inside params.files_dir(running_lab_name) 896 permissions: file mode to apply on the host copy (default: None, leave as written) 897 step: execution step (default 1) 898 """ 899 files_dir = Path(params.files_dir(self.running_lab_name)).resolve() 900 dest_path = (files_dir / dest).resolve() 901 if not dest_path.is_relative_to(files_dir): 902 error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'") 903 if SRE.args.debug: 904 print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr) 905 self._ops.setdefault(step, {}).setdefault(machine, []).append( 906 _CpToHostOp(path, str(dest_path), permissions) 907 ) 908 909 def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1): 910 """Register a file to create/overwrite on `machine` during the current state. 911 912 Args: 913 machine: machine name (str) 914 filename: absolute path inside the container (e.g. "/etc/myconfig") 915 content: file content (str or bytes) 916 permissions: octal mode (default 0o644) 917 owner: "user:group" string (default "root:root") 918 mtime: modification time as a Unix timestamp (float); defaults to now 919 step: execution step (default 1); higher steps run after lower ones 920 """ 921 import time as _time 922 raw = content.encode() if isinstance(content, str) else content 923 if SRE.args.debug: 924 print( 925 f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)", 926 file=sys.stderr) 927 self._ops.setdefault(step, {}).setdefault(machine, []).append( 928 _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time()) 929 ) 930 931 def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 932 """Append content to a file on `machine`; create the file if it does not exist. 933 934 Args: 935 machine: machine name (str) 936 filename: absolute path inside the container (e.g. "/etc/hosts") 937 content: content to append (str or bytes) 938 permissions: octal mode to set after appending (default: leave unchanged) 939 owner: "user:group" to set after appending (default: leave unchanged) 940 mtime: modification time as a Unix timestamp (default: leave unchanged) 941 step: execution step (default 1); higher steps run after lower ones 942 """ 943 raw = content.encode() if isinstance(content, str) else content 944 if SRE.args.debug: 945 print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr) 946 self._ops.setdefault(step, {}).setdefault(machine, []).append( 947 _AppendOp(filename, raw, permissions, owner, mtime) 948 ) 949 950 def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 951 """Append content to a file on `machine` only if the file does not already end with it. 952 953 Idempotent version of append_to_file: safe to call multiple times — the content 954 is appended at most once per application. If permissions, owner, or mtime are 955 provided they are applied regardless of whether content was appended. 956 957 Args: 958 machine: machine name (str) 959 filename: absolute path inside the container (e.g. "/etc/hosts") 960 content: content to append (str or bytes) 961 permissions: octal mode to set after the check (default: leave unchanged) 962 owner: "user:group" to set after the check (default: leave unchanged) 963 mtime: modification time as a Unix timestamp (default: leave unchanged) 964 step: execution step (default 1); higher steps run after lower ones 965 """ 966 raw = content.encode() if isinstance(content, str) else content 967 if SRE.args.debug: 968 print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)", 969 file=sys.stderr) 970 self._ops.setdefault(step, {}).setdefault(machine, []).append( 971 _IdempotentAppendOp(filename, raw, permissions, owner, mtime) 972 ) 973 974 def compute_state_ops(self, state): 975 self._ops = {} 976 self._host_ops = {} 977 if not hasattr(self, state): 978 error_quit(f"state method {state} does not exist") 979 method = getattr(self, state) 980 if not callable(method): 981 error_quit(f"state method {state} not callable") 982 try: 983 method() 984 except Exception as e: 985 error_quit(f"error during {state} execution: {e}") 986 return self._ops, self._host_ops 987 988 def get_new_lab_from_scheme(self): 989 lab = Lab(name=self.running_lab_name) 990 abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name) 991 _used_ports: set = set() 992 xauth_cookie = _resolve_xauth_cookie() 993 for m in self.get_machines(): 994 m.ports = [_resolve_port(p, _used_ports) for p in m.ports] 995 m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True 996 if m.x11_host: 997 m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True 998 if xauth_cookie: 999 m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True 1000 volumes = [] 1001 if len(m.volumes) > 0: 1002 has_private_volume = False 1003 for v in m.volumes: 1004 if len(v) > 2 and v[3] == 'private': 1005 has_private_volume = True 1006 break 1007 1008 if has_private_volume and params.disable_volume_mount_on_root_partition: 1009 private_mount_dir = self.get_private_mount_dir() 1010 p = Path(private_mount_dir) 1011 while not p.exists(): 1012 p = p.parent 1013 if os.stat(p).st_dev == os.stat('/').st_dev: 1014 error_quit( 1015 f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1016 1017 for v in m.volumes: 1018 if len(v) > 2 and v[3] == 'private': 1019 if v[0].startswith('/'): 1020 error_quit(f"volume {v[0]} is private and have an absolute path") 1021 v_host = f"{self.get_private_mount_dir()}/{v[0]}" 1022 os.makedirs(v_host, exist_ok=True) 1023 os.chmod(v_host, 0o777) 1024 else: 1025 if v[0].startswith('/'): 1026 v_host = v[0] 1027 else: 1028 v_host = f"{self.get_user_public_dir()}/{v[0]}" 1029 os.makedirs(v_host, exist_ok=True) 1030 os.chmod(v_host, 0o777) 1031 if v[2] not in ['rw', 'ro']: 1032 error_quit(f"volume mode {v[2]} not supported (only rw and ro)") 1033 1034 volumes.append(f"{v_host}|{v[1]}|{v[2]}") 1035 lab.new_machine(m.name, 1036 exec_commands=m.exec_commands, 1037 sysctls=m.sysctls, 1038 envs=m.envs, 1039 ports=m.ports, 1040 ulimits=m.ulimits, 1041 volumes=volumes, 1042 shell=m.kathara_shell, 1043 image=m.image, 1044 bridged=m.bridged, 1045 privileged=m.privileged, 1046 entrypoint=m.entrypoint, 1047 ) 1048 for net, netAdapter in m.net_adapters.items(): 1049 lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface, 1050 mac_address=str(netAdapter.mac).replace('-', 1051 ':').lower() if netAdapter.mac is not None else None) 1052 self.lab_hash = lab.hash 1053 return lab 1054 1055 def get_lab_hash(self): 1056 if self.lab_hash is None: 1057 lab = Lab(name=self.running_lab_name) 1058 self.lab_hash = lab.hash 1059 return self.lab_hash 1060 1061 def get_lab_from_kathara(self): 1062 lab_hash = self.get_lab_hash() 1063 return Kathara.get_instance().get_lab_from_api(lab_hash=lab_hash) 1064 1065 def get_public_lab_dir(self): 1066 return params.public_lab_dir(self.running_lab_name) 1067 1068 def get_private_lab_dir(self): 1069 return params.private_lab_dir(self.running_lab_name) 1070 1071 def get_files_dir(self): 1072 return params.files_dir(self.running_lab_name) 1073 1074 def get_private_mount_dir(self): 1075 return params.private_mount_dir(self.running_lab_name) 1076 1077 def get_user_public_dir(self): 1078 try: 1079 return Path(params.link_to_user_public_dir(self.running_lab_name)).readlink() 1080 except OSError: 1081 return Path(params.link_to_user_public_dir(self.running_lab_name)) 1082 1083 def get_shared_dir(self): 1084 return f"{self.get_user_public_dir()}/{params.shared_dir_name}" 1085 1086 def answers_dir(self): 1087 return params.answers_dir(self.running_lab_name) 1088 1089 def answers_file(self): 1090 return params.answers_filename(self.running_lab_name)
Base class for lab network topology definitions.
Subclasses declare the topology as class-level dicts and implement state
methods decorated with sre_state().
Class-level attributes:
_machine_specs—{name: {Machine kwargs}}— one entry per container._network_specs—{net_name: {color: ...}}— optional display hints._topology—{net_name: [machine, ...]}or{net_name: {machine: iface}}— which machines connect to each network and on which interface.
If any of these three attributes is not defined in the subclass (neither as a class
variable nor as a property), the corresponding attribute is read from data
(data.topology, data.machine_specs, data.network_specs).
Explicit subclass definitions always take precedence.
Instance attributes set by __init__:
self.data— theData0instance for this lab run.self.informations— Markdown text shown in the Informations tab (set this in__init__).- Named machine and network objects (e.g.
self.router,self.lan).
The initial state method is always called at sre start. Additional
states are applied with sre state <lab> <state_name>.
686 def __init__(self, data, running_lab_name, lab_hash=None): 687 """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs. 688 689 Args: 690 data: a ``Data0`` instance containing lab-specific parameters. 691 running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``. 692 lab_hash: optional Kathara lab hash (resolved lazily if omitted). 693 """ 694 self.data = data 695 self.running_lab_name = running_lab_name 696 self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name)) 697 self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name) 698 self.lab_hash = lab_hash 699 self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name) 700 self.informations = "" 701 702 _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs') 703 _network_specs = self._resolve_spec('_network_specs', 'network_specs') 704 _topology = self._resolve_spec('_topology', 'topology') 705 706 for name, parameters in _machine_specs.items(): 707 setattr(self, name, Machine(name=name, **parameters)) 708 709 # Create networks from _network_specs (with optional color), then remaining from _topology 710 for name, parameters in _network_specs.items(): 711 setattr(self, name, Network(name=name, **parameters)) 712 713 for net_name in _topology: 714 if not hasattr(self, net_name): 715 setattr(self, net_name, Network(name=net_name)) 716 717 # Create NetAdapters from _topology. 718 # Each value is either a list (auto interface) or a dict {machine: iface_spec}. 719 # iface_spec can be: None (auto), an int, or a (int, mac) tuple. 720 # Interface auto-assignment counts prior connections per machine across all networks. 721 _iface_counter: dict[str, int] = {} 722 for net_name, machines in _topology.items(): 723 net = getattr(self, net_name) 724 if isinstance(machines, dict): 725 items = machines.items() 726 else: 727 items = ((m, None) for m in machines) 728 for mname, iface_spec in items: 729 machine = getattr(self, mname) 730 if isinstance(iface_spec, tuple): 731 iface, mac = iface_spec 732 else: 733 iface, mac = iface_spec, None 734 if iface is None: 735 iface = _iface_counter.get(mname, 0) 736 _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1 737 NetAdapter(network=net, machine=machine, interface=iface, mac=mac) 738 739 self._ops: dict[str, list] = {} # machine → [str | _FileOp, ...] 740 self._host_ops: dict[int, list] = {} 741 742 self.net_config = None
Build all Machine, Network, and NetAdapter objects from the class-level specs.
Arguments:
- data: a
Data0instance containing lab-specific parameters. - running_lab_name: the runtime identifier
{ts}@@@{lab}@@@{user}. - lab_hash: optional Kathara lab hash (resolved lazily if omitted).
744 def host_interfaces_from_topology(self) -> dict: 745 """Return {machine_name: [net_name, ...]} derived from the resolved topology.""" 746 topology = self._resolve_spec('_topology', 'topology') 747 result = {} 748 for net_name, machines in topology.items(): 749 names = machines.keys() if isinstance(machines, dict) else machines 750 for mname in names: 751 result.setdefault(mname, []).append(net_name) 752 return result
Return {machine_name: [net_name, ...]} derived from the resolved topology.
758 @classmethod 759 def get_state_methods(cls): 760 """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy.""" 761 names = set() 762 for klass in cls.__mro__: 763 for name, fn in klass.__dict__.items(): 764 if callable(fn) and getattr(fn, '_is_sre_state', False): 765 names.add(name) 766 return sorted(names)
Return a sorted list of all @sre_state-decorated method names in this class hierarchy.
768 @classmethod 769 def is_state_user_allowed(cls, state): 770 """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``.""" 771 for klass in cls.__mro__: 772 fn = klass.__dict__.get(state) 773 if fn is not None and getattr(fn, '_is_sre_state', False): 774 return getattr(fn, '_sre_state_user_allowed', False) 775 return False
Return True if state was decorated with @sre_state(user_allowed=True).
777 @classmethod 778 def get_user_allowed_states(cls): 779 """Return ``{state_name: description}`` for all user-allowed states.""" 780 result = {} 781 for state in cls.get_state_methods(): 782 for klass in cls.__mro__: 783 fn = klass.__dict__.get(state) 784 if fn is not None and getattr(fn, '_is_sre_state', False): 785 if getattr(fn, '_sre_state_user_allowed', False): 786 result[state] = getattr(fn, '_sre_state_description', '') 787 break 788 return result
Return {state_name: description} for all user-allowed states.
793 def get_machine(self, machine_name): 794 """Return the :class:`Machine` with *machine_name*, or ``None`` if not found.""" 795 if not hasattr(self, machine_name): 796 return None 797 machine = getattr(self, machine_name) 798 if not isinstance(machine, Machine): 799 return None 800 return machine
Return the Machine with machine_name, or None if not found.
802 def get_network(self, network_name): 803 """Return the :class:`Network` with *network_name*, or ``None`` if not found.""" 804 if not hasattr(self, network_name): 805 return None 806 machine = getattr(self, network_name) 807 if not isinstance(machine, Network): 808 return None 809 return machine
Return the Network with network_name, or None if not found.
849 def cmd(self, machine, command, step=1): 850 """Register a shell command to execute inside *machine*'s container at *step*.""" 851 if SRE.args.debug: 852 print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr) 853 self._ops.setdefault(step, {}).setdefault(machine, []).append(command)
Register a shell command to execute inside machine's container at step.
855 def host_cmd(self, command, step=1): 856 """Register a shell command to execute on the **host** (not inside a container) at *step*. 857 858 Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise. 859 """ 860 if params.execute_commands_on_host is False: 861 sys.exit("host_cmd() is disabled by params.execute_commands_on_host") 862 if SRE.args.debug: 863 print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr) 864 self._host_ops.setdefault(step, []).append(_HostCmdOp(command))
Register a shell command to execute on the host (not inside a container) at step.
Requires params.execute_commands_on_host to be enabled; aborts otherwise.
866 def host_callback(self, callback, step=1): 867 """Register a Python callable to invoke on the host at *step* (called with no arguments).""" 868 if SRE.args.debug: 869 print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}", 870 file=sys.stderr) 871 self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback))
Register a Python callable to invoke on the host at step (called with no arguments).
873 def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None, 874 mtime: float = None, step=1): 875 """Register a file copy from the host into *machine*'s container at *step*. 876 877 *src* may be relative (resolved against ``params.files_dir``). 878 *dest* is the absolute path inside the container. 879 """ 880 orig_path = Path(src) 881 if not orig_path.is_absolute(): 882 orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path 883 if SRE.args.debug: 884 print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr) 885 self._ops.setdefault(step, {}).setdefault(machine, []).append( 886 _CpFromHostOp(orig_path, dest, permissions, owner, mtime) 887 )
Register a file copy from the host into machine's container at step.
src may be relative (resolved against params.files_dir).
dest is the absolute path inside the container.
889 def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1): 890 """Copy file `path` from `machine` to the host files_dir. 891 892 Args: 893 machine: container name 894 path: absolute path of the file inside the container 895 dest: relative destination path inside params.files_dir(running_lab_name) 896 permissions: file mode to apply on the host copy (default: None, leave as written) 897 step: execution step (default 1) 898 """ 899 files_dir = Path(params.files_dir(self.running_lab_name)).resolve() 900 dest_path = (files_dir / dest).resolve() 901 if not dest_path.is_relative_to(files_dir): 902 error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'") 903 if SRE.args.debug: 904 print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr) 905 self._ops.setdefault(step, {}).setdefault(machine, []).append( 906 _CpToHostOp(path, str(dest_path), permissions) 907 )
Copy file path from machine to the host files_dir.
Arguments:
- machine: container name
- path: absolute path of the file inside the container
- dest: relative destination path inside params.files_dir(running_lab_name)
- permissions: file mode to apply on the host copy (default: None, leave as written)
- step: execution step (default 1)
909 def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1): 910 """Register a file to create/overwrite on `machine` during the current state. 911 912 Args: 913 machine: machine name (str) 914 filename: absolute path inside the container (e.g. "/etc/myconfig") 915 content: file content (str or bytes) 916 permissions: octal mode (default 0o644) 917 owner: "user:group" string (default "root:root") 918 mtime: modification time as a Unix timestamp (float); defaults to now 919 step: execution step (default 1); higher steps run after lower ones 920 """ 921 import time as _time 922 raw = content.encode() if isinstance(content, str) else content 923 if SRE.args.debug: 924 print( 925 f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)", 926 file=sys.stderr) 927 self._ops.setdefault(step, {}).setdefault(machine, []).append( 928 _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time()) 929 )
Register a file to create/overwrite on machine during the current state.
Arguments:
- machine: machine name (str)
- filename: absolute path inside the container (e.g. "/etc/myconfig")
- content: file content (str or bytes)
- permissions: octal mode (default 0o644)
- owner: "user:group" string (default "root:root")
- mtime: modification time as a Unix timestamp (float); defaults to now
- step: execution step (default 1); higher steps run after lower ones
931 def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 932 """Append content to a file on `machine`; create the file if it does not exist. 933 934 Args: 935 machine: machine name (str) 936 filename: absolute path inside the container (e.g. "/etc/hosts") 937 content: content to append (str or bytes) 938 permissions: octal mode to set after appending (default: leave unchanged) 939 owner: "user:group" to set after appending (default: leave unchanged) 940 mtime: modification time as a Unix timestamp (default: leave unchanged) 941 step: execution step (default 1); higher steps run after lower ones 942 """ 943 raw = content.encode() if isinstance(content, str) else content 944 if SRE.args.debug: 945 print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr) 946 self._ops.setdefault(step, {}).setdefault(machine, []).append( 947 _AppendOp(filename, raw, permissions, owner, mtime) 948 )
Append content to a file on machine; create the file if it does not exist.
Arguments:
- machine: machine name (str)
- filename: absolute path inside the container (e.g. "/etc/hosts")
- content: content to append (str or bytes)
- permissions: octal mode to set after appending (default: leave unchanged)
- owner: "user:group" to set after appending (default: leave unchanged)
- mtime: modification time as a Unix timestamp (default: leave unchanged)
- step: execution step (default 1); higher steps run after lower ones
950 def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1): 951 """Append content to a file on `machine` only if the file does not already end with it. 952 953 Idempotent version of append_to_file: safe to call multiple times — the content 954 is appended at most once per application. If permissions, owner, or mtime are 955 provided they are applied regardless of whether content was appended. 956 957 Args: 958 machine: machine name (str) 959 filename: absolute path inside the container (e.g. "/etc/hosts") 960 content: content to append (str or bytes) 961 permissions: octal mode to set after the check (default: leave unchanged) 962 owner: "user:group" to set after the check (default: leave unchanged) 963 mtime: modification time as a Unix timestamp (default: leave unchanged) 964 step: execution step (default 1); higher steps run after lower ones 965 """ 966 raw = content.encode() if isinstance(content, str) else content 967 if SRE.args.debug: 968 print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)", 969 file=sys.stderr) 970 self._ops.setdefault(step, {}).setdefault(machine, []).append( 971 _IdempotentAppendOp(filename, raw, permissions, owner, mtime) 972 )
Append content to a file on machine only if the file does not already end with it.
Idempotent version of append_to_file: safe to call multiple times — the content is appended at most once per application. If permissions, owner, or mtime are provided they are applied regardless of whether content was appended.
Arguments:
- machine: machine name (str)
- filename: absolute path inside the container (e.g. "/etc/hosts")
- content: content to append (str or bytes)
- permissions: octal mode to set after the check (default: leave unchanged)
- owner: "user:group" to set after the check (default: leave unchanged)
- mtime: modification time as a Unix timestamp (default: leave unchanged)
- step: execution step (default 1); higher steps run after lower ones
974 def compute_state_ops(self, state): 975 self._ops = {} 976 self._host_ops = {} 977 if not hasattr(self, state): 978 error_quit(f"state method {state} does not exist") 979 method = getattr(self, state) 980 if not callable(method): 981 error_quit(f"state method {state} not callable") 982 try: 983 method() 984 except Exception as e: 985 error_quit(f"error during {state} execution: {e}") 986 return self._ops, self._host_ops
988 def get_new_lab_from_scheme(self): 989 lab = Lab(name=self.running_lab_name) 990 abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name) 991 _used_ports: set = set() 992 xauth_cookie = _resolve_xauth_cookie() 993 for m in self.get_machines(): 994 m.ports = [_resolve_port(p, _used_ports) for p in m.ports] 995 m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True 996 if m.x11_host: 997 m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True 998 if xauth_cookie: 999 m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True 1000 volumes = [] 1001 if len(m.volumes) > 0: 1002 has_private_volume = False 1003 for v in m.volumes: 1004 if len(v) > 2 and v[3] == 'private': 1005 has_private_volume = True 1006 break 1007 1008 if has_private_volume and params.disable_volume_mount_on_root_partition: 1009 private_mount_dir = self.get_private_mount_dir() 1010 p = Path(private_mount_dir) 1011 while not p.exists(): 1012 p = p.parent 1013 if os.stat(p).st_dev == os.stat('/').st_dev: 1014 error_quit( 1015 f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1016 1017 for v in m.volumes: 1018 if len(v) > 2 and v[3] == 'private': 1019 if v[0].startswith('/'): 1020 error_quit(f"volume {v[0]} is private and have an absolute path") 1021 v_host = f"{self.get_private_mount_dir()}/{v[0]}" 1022 os.makedirs(v_host, exist_ok=True) 1023 os.chmod(v_host, 0o777) 1024 else: 1025 if v[0].startswith('/'): 1026 v_host = v[0] 1027 else: 1028 v_host = f"{self.get_user_public_dir()}/{v[0]}" 1029 os.makedirs(v_host, exist_ok=True) 1030 os.chmod(v_host, 0o777) 1031 if v[2] not in ['rw', 'ro']: 1032 error_quit(f"volume mode {v[2]} not supported (only rw and ro)") 1033 1034 volumes.append(f"{v_host}|{v[1]}|{v[2]}") 1035 lab.new_machine(m.name, 1036 exec_commands=m.exec_commands, 1037 sysctls=m.sysctls, 1038 envs=m.envs, 1039 ports=m.ports, 1040 ulimits=m.ulimits, 1041 volumes=volumes, 1042 shell=m.kathara_shell, 1043 image=m.image, 1044 bridged=m.bridged, 1045 privileged=m.privileged, 1046 entrypoint=m.entrypoint, 1047 ) 1048 for net, netAdapter in m.net_adapters.items(): 1049 lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface, 1050 mac_address=str(netAdapter.mac).replace('-', 1051 ':').lower() if netAdapter.mac is not None else None) 1052 self.lab_hash = lab.hash 1053 return lab
1093class NetAdapter: 1094 def __init__(self, network, machine, interface, mac=None, addresses=None): 1095 if addresses is None: 1096 addresses = [] 1097 if not isinstance(machine, Machine): 1098 raise ValueError("machine must an object of class Machine") 1099 if not isinstance(network, Network): 1100 raise ValueError("network must be an instance of Network") 1101 if mac is not None: 1102 first_byte = int(str(mac).replace('-', ':').split(':')[0], 16) 1103 if first_byte & 1: 1104 raise ValueError( 1105 f"MAC address {mac} on machine '{machine.name}': " 1106 f"multicast bit (LSB of first byte) is set — use a unicast address " 1107 f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')." 1108 ) 1109 self.network = network 1110 self.machine = machine 1111 self.interface = interface 1112 self.mac = mac 1113 self.addresses = addresses 1114 machine.net_adapters[network] = self 1115 network.net_adapters[machine] = self
1094 def __init__(self, network, machine, interface, mac=None, addresses=None): 1095 if addresses is None: 1096 addresses = [] 1097 if not isinstance(machine, Machine): 1098 raise ValueError("machine must an object of class Machine") 1099 if not isinstance(network, Network): 1100 raise ValueError("network must be an instance of Network") 1101 if mac is not None: 1102 first_byte = int(str(mac).replace('-', ':').split(':')[0], 16) 1103 if first_byte & 1: 1104 raise ValueError( 1105 f"MAC address {mac} on machine '{machine.name}': " 1106 f"multicast bit (LSB of first byte) is set — use a unicast address " 1107 f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')." 1108 ) 1109 self.network = network 1110 self.machine = machine 1111 self.interface = interface 1112 self.mac = mac 1113 self.addresses = addresses 1114 machine.net_adapters[network] = self 1115 network.net_adapters[machine] = self
1118class Network: 1119 def __init__(self, name, color=None, shape=None): 1120 self.name = name 1121 self.color = color 1122 self.shape = shape 1123 self.net_adapters = {} 1124 1125 def get_machines(self): 1126 for m in self.net_adapters.keys(): 1127 yield m
1130class Machine: 1131 def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="", 1132 cpus=None, ipv6=None, 1133 exec_commands=[], 1134 sysctls={}, 1135 envs={}, 1136 ports=None, ulimits={}, volumes=[], 1137 shell=None, 1138 kathara_shell=None, 1139 privileged=None, entrypoint=None, args=[], 1140 hidden=False, 1141 allow_connection=True, 1142 color=None, 1143 shape=None): 1144 if ports is None: 1145 ports = [] 1146 self.name = name 1147 self.image = image 1148 self.bridged = bridged 1149 self.x11_host = x11_host 1150 self.mem = mem 1151 self.cpus = cpus 1152 self.ipv6 = ipv6 1153 self.exec_commands = exec_commands 1154 self.sysctls = sysctls 1155 # Copy: the constructor's mutable default {} is shared across Machines; 1156 # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP 1157 # for x11_host machines) must not leak into machines that didn't opt in. 1158 self.envs = dict(envs) 1159 self.ports = ports 1160 self.ulimits = ulimits 1161 self.volumes = volumes 1162 self.shell = shell 1163 self.kathara_shell = kathara_shell 1164 self.privileged = privileged 1165 self.entrypoint = entrypoint 1166 self.args = args 1167 self.hidden = hidden 1168 self.allow_connection = allow_connection 1169 self.shape = shape 1170 self.color = color 1171 1172 self.net_adapters = {} 1173 1174 if params.disable_volume_mount_on_root_partition: 1175 for v in self.volumes: 1176 v_host = v[0] 1177 if v_host.startswith('/'): 1178 p = Path(v_host) 1179 while not p.exists(): 1180 p = p.parent 1181 if os.stat(p).st_dev == os.stat('/').st_dev: 1182 error_quit( 1183 f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1184 1185 if self.privileged and not params.allow_privileged_machines: 1186 error_quit( 1187 f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode") 1188 1189 if not self.bridged and len(self.ports) > 0: 1190 error_quit("To add ports in a machine, you need to activate bridged mode")
1131 def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="", 1132 cpus=None, ipv6=None, 1133 exec_commands=[], 1134 sysctls={}, 1135 envs={}, 1136 ports=None, ulimits={}, volumes=[], 1137 shell=None, 1138 kathara_shell=None, 1139 privileged=None, entrypoint=None, args=[], 1140 hidden=False, 1141 allow_connection=True, 1142 color=None, 1143 shape=None): 1144 if ports is None: 1145 ports = [] 1146 self.name = name 1147 self.image = image 1148 self.bridged = bridged 1149 self.x11_host = x11_host 1150 self.mem = mem 1151 self.cpus = cpus 1152 self.ipv6 = ipv6 1153 self.exec_commands = exec_commands 1154 self.sysctls = sysctls 1155 # Copy: the constructor's mutable default {} is shared across Machines; 1156 # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP 1157 # for x11_host machines) must not leak into machines that didn't opt in. 1158 self.envs = dict(envs) 1159 self.ports = ports 1160 self.ulimits = ulimits 1161 self.volumes = volumes 1162 self.shell = shell 1163 self.kathara_shell = kathara_shell 1164 self.privileged = privileged 1165 self.entrypoint = entrypoint 1166 self.args = args 1167 self.hidden = hidden 1168 self.allow_connection = allow_connection 1169 self.shape = shape 1170 self.color = color 1171 1172 self.net_adapters = {} 1173 1174 if params.disable_volume_mount_on_root_partition: 1175 for v in self.volumes: 1176 v_host = v[0] 1177 if v_host.startswith('/'): 1178 p = Path(v_host) 1179 while not p.exists(): 1180 p = p.parent 1181 if os.stat(p).st_dev == os.stat('/').st_dev: 1182 error_quit( 1183 f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)") 1184 1185 if self.privileged and not params.allow_privileged_machines: 1186 error_quit( 1187 f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode") 1188 1189 if not self.bridged and len(self.ports) > 0: 1190 error_quit("To add ports in a machine, you need to activate bridged mode")
1193class Grade0: 1194 """Base class for lab evaluation logic. 1195 1196 Subclasses override :meth:`grade` to *register* tests and questions. The 1197 actual execution happens later in :meth:`run_tests`, which drives a 1198 multi-step loop: 1199 1200 1. Call :meth:`grade` to register commands (they return placeholder values). 1201 2. Execute all registered commands in containers (via :mod:`exetests`). 1202 3. Repeat for each additional step (``self.max_step``). 1203 4. Call :meth:`grade` a final time — commands now return real results. 1204 5. Save the archive. 1205 1206 Key instance attributes available inside ``grade()``: 1207 1208 * ``self.data`` — the ``Data0`` instance (via ``net_scheme``). 1209 * ``self.step`` — current step number (1-based). 1210 * ``self.max_step`` — highest step number registered so far. 1211 """ 1212 1213 def __init__(self, net_scheme): 1214 self.net_scheme = net_scheme 1215 self.archive_dirs = [] 1216 self.files_to_save_in_archives = [] 1217 self._tests = None 1218 self._allow_errors_in_tests = None 1219 self.step = 0 1220 self.max_step = 1 1221 self._questions = None 1222 self._questions_order = None 1223 self._questions_current_order = None 1224 self._cheat_answers = {} 1225 self._answers = {} 1226 self._grade_list = [] 1227 self._grades = {} 1228 self._grade_parts: list[GradePart] = [] 1229 self._errors = [] 1230 self._total_grade_self_eval = 0 1231 self._total_max_self_eval = 0 1232 self._total_grade_exo_eval = 0 1233 self._total_max_exo_eval = 0 1234 self._maximum_mark = params.default_maximum_mark 1235 self._use_numerical_marks = params.use_numerical_marks_by_default 1236 self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default 1237 self._mark_self_eval = None 1238 self._mark_exo_eval = None 1239 self._section_counter = [] 1240 self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)] 1241 self._eval_date = None 1242 self._re_eval_date = None 1243 self._default_language = 'en' 1244 self.auto_eval_count = 0 1245 self._exam_json = None 1246 self.full_reset() 1247 1248 def full_reset(self): 1249 """Reset all state including tests and answers (called from ``__init__``).""" 1250 self.step = 0 1251 self.max_step = 1 1252 self._tests = {} # keys : (machine,step)->(cmd, timeout)->(result, code)) 1253 self._allow_errors_in_tests = {} 1254 self._host_tests = {} # step -> {(command, timeout): (result, code)} 1255 self._allow_errors_in_host_tests = {} # (step, command, timeout) -> True 1256 self._answers = {} # hash -> answer 1257 self._eval_date = None 1258 self.reset_before_grade() 1259 1260 def reset_before_grade(self): 1261 """Clear questions, grade list, and section counters before each call to :meth:`grade`.""" 1262 self._questions = dict() # hash->Question 1263 self._questions_order = dict() # order -> [Question1, Question2, ...] 1264 self._questions_current_order = 100 1265 self._cheat_answers = {} # state->(question_hash->answer) 1266 self._grade_list = [] 1267 self._grades = dict() # title -> GradeElement 1268 self._grade_parts = [] 1269 self._section_counter = [] 1270 1271 def grade(self): 1272 """Override to register tests, questions, and grade elements. 1273 1274 Called multiple times during :meth:`run_tests` — once per step plus a 1275 final time when all results are available. Do **not** produce side-effects 1276 here; only call ``self.test()``, ``self.question_*()`` and 1277 ``self.add_grade_element()`` / ``self.set_grade()``. 1278 """ 1279 self._grade_list = [] 1280 self._grade_parts = [] 1281 1282 def _compute_mark(self, total_grade, total_max): 1283 """Compute a mark from a ``(total_grade, total_max)`` pair. 1284 1285 Returns ``None`` if ``total_max == 0``. 1286 Numerical mode (default): rounded to one decimal, scaled to ``_maximum_mark``. 1287 Letter mode: A+/A/B/C/D/F. 1288 """ 1289 if total_max == 0: 1290 return None 1291 if self._use_numerical_marks: 1292 return math.ceil(10 * self._maximum_mark * total_grade / total_max) / 10 1293 else: 1294 ratio = total_grade / total_max 1295 if ratio >= 18 / 20: 1296 return "A+" 1297 elif ratio >= 16 / 20: 1298 return "A" 1299 elif ratio >= 14 / 20: 1300 return "B" 1301 elif ratio >= 12 / 20: 1302 return "C" 1303 elif ratio >= 10 / 20: 1304 return "D" 1305 else: 1306 return "F" 1307 1308 def mark_self_eval(self): 1309 """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE).""" 1310 return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval) 1311 1312 def mark_exo_eval(self): 1313 """Final mark over elements visible in non-auto eval / outline / sheet.""" 1314 return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval) 1315 1316 def get_data(self): 1317 return self.net_scheme.get_data() 1318 1319 def get_errors(self): 1320 return self._errors 1321 1322 def get_grade_list(self): 1323 return self._grade_list 1324 1325 def get_grade_parts(self): 1326 return self._grade_parts 1327 1328 def get_answers(self): 1329 return self._answers 1330 1331 def get_cheat_answers(self, state: str): 1332 return self._cheat_answers.get(state) 1333 1334 def get_tests(self): 1335 return self._tests 1336 1337 def get_exetests_strings(self, step: int): 1338 result = dict() 1339 for (machine, step1) in self._tests.keys(): 1340 if step != step1: 1341 continue 1342 result[machine] = params.exetests_separator.join( 1343 [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()]) 1344 return result 1345 1346 def get_running_lab_name(self): 1347 return self.net_scheme.running_lab_name 1348 1349 def increment_section_counter(self, level: int): 1350 if len(self._section_counter) < level + 1: 1351 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1352 else: 1353 self._section_counter = self._section_counter[:(level + 1)] 1354 self._section_counter[level] += 1 1355 1356 def set_section_counter(self, level: int, value: int): 1357 if len(self._section_counter) < level + 1: 1358 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1359 else: 1360 self._section_counter = self._section_counter[:(level + 1)] 1361 self._section_counter[level] = value 1362 1363 def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None): 1364 self.increment_section_counter(level) 1365 return self.current_section(level, fmt, show, pad) 1366 1367 def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None): 1368 # fmt is a list of (type, depth[, prefix]) tuples, one per level. 1369 # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number 1370 # depth: how many consecutive counters to display, ending at this level 1371 # prefix: optional string prepended to the result (default: '') 1372 # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1." 1373 if fmt is None: 1374 fmt = self.section_fmt 1375 1376 def _to_roman(n: int) -> str: 1377 vals = [ 1378 (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'), 1379 (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'), 1380 (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'), 1381 ] 1382 result = '' 1383 for value, numeral in vals: 1384 while n >= value: 1385 result += numeral 1386 n -= value 1387 return result 1388 1389 def _to_capital_letter(n: int) -> str: 1390 result = '' 1391 while n > 0: 1392 n, r = divmod(n - 1, 26) 1393 result = chr(ord('A') + r) + result 1394 return result 1395 1396 def _to_lowercase_letter(n: int) -> str: 1397 result = '' 1398 while n > 0: 1399 n, r = divmod(n - 1, 26) 1400 result = chr(ord('a') + r) + result 1401 return result 1402 1403 def _convert(n: int, fmt_type: str) -> str: 1404 match fmt_type: 1405 case 'R': 1406 return _to_roman(n) 1407 case 'r': 1408 return _to_roman(n).lower() 1409 case 'L': 1410 return _to_capital_letter(n) 1411 case 'l': 1412 return _to_lowercase_letter(n) 1413 case _: 1414 return str(n) 1415 1416 entry = fmt[level] if level < len(fmt) else ('N', level + 1) 1417 _, depth = entry[:2] 1418 prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '') 1419 if show is not None: 1420 depth = show 1421 start = max(0, level - depth + 1) 1422 parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0, 1423 fmt[j][0] if j < len(fmt) else 'N') 1424 for j in range(start, level + 1)] 1425 return prefix + ".".join(parts) + ". " 1426 1427 def load_answers(self): 1428 """Load student answers from ``answers.json`` into ``self._answers``.""" 1429 self._answers = {} 1430 try: 1431 fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()), 1432 os.O_RDONLY | os.O_NOFOLLOW) 1433 with os.fdopen(fd) as f: 1434 self._answers = json.load(f) 1435 except (OSError, json.JSONDecodeError): 1436 self._answers = {} 1437 1438 def get_questions_ordered(self): 1439 q = [] 1440 for _, v in sorted(self._questions_order.items()): 1441 q += v 1442 return q 1443 1444 def export_questions(self): 1445 pass 1446 1447 def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='', 1448 default_code: int = 0, allow_error: bool = False): 1449 """Register and retrieve the result of a host-side test command. 1450 1451 On the first call (registration pass) returns *default_value* / *default_code*. 1452 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1453 Set *allow_error* to suppress error recording on non-zero exit. 1454 """ 1455 if params.execute_commands_on_host is False: 1456 sys.exit("test_host() is disabled by params.execute_commands_on_host") 1457 if self.max_step < step: 1458 self.max_step = step 1459 if step not in self._host_tests: 1460 self._host_tests[step] = {} 1461 if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests: 1462 self._allow_errors_in_host_tests[(step, command, timeout)] = True 1463 if (command, timeout) not in self._host_tests[step]: 1464 self._host_tests[step][(command, timeout)] = (default_value, default_code) 1465 return default_value, default_code 1466 return self._host_tests[step][(command, timeout)] 1467 1468 def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='', 1469 default_code: int = 0, allow_error: bool = False): 1470 """Register and retrieve the result of a command executed inside *machine_name*. 1471 1472 On the first call (registration pass) returns *default_value* / *default_code*. 1473 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1474 *timeout* is in seconds. Set *allow_error* to suppress error recording. 1475 """ 1476 if self.max_step < step: 1477 self.max_step = step 1478 if (machine_name, step) not in self._tests: 1479 self._tests[(machine_name, step)] = {} 1480 if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests: 1481 self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True 1482 if (command, timeout) not in self._tests[(machine_name, step)]: 1483 self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code) 1484 return default_value, default_code 1485 return self._tests[(machine_name, step)][(command, timeout)] 1486 1487 @staticmethod 1488 def _apply_section(section: str, title) -> TranslatedText: 1489 """Prepend *section* to every language value in *title*.""" 1490 tt = TranslatedText.from_value(title) 1491 if not section: 1492 return tt 1493 return TranslatedText({lang: section + text for lang, text in tt.items()}) 1494 1495 def question_text(self, title, section='', description='', hash=None, order=None, default_answer='', 1496 cheat_answers=None): 1497 """Register a free-text question and return the student's current answer string. 1498 1499 Returns *default_answer* if the student has not answered yet. 1500 *cheat_answers* maps state names to answer strings used in automated testing. 1501 """ 1502 title = self._apply_section(section, title) 1503 if order is None: 1504 order1 = self._questions_current_order 1505 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1506 else: 1507 order1 = order 1508 1509 q = QuestionText(title, description, hash, order1) 1510 if order1 not in self._questions_order: 1511 self._questions_order[order1] = [] 1512 self._questions_order[order1].append(q) 1513 1514 if q.question_hash in self._questions: 1515 write_error(f"Duplicate question hash {q.question_hash}") 1516 self._questions[q.question_hash] = q 1517 1518 if cheat_answers is not None: 1519 for state, answer in cheat_answers.items(): 1520 if state not in self._cheat_answers: 1521 self._cheat_answers[state] = {} 1522 self._cheat_answers[state][q.question_hash] = answer 1523 1524 if q.question_hash in self._answers: 1525 return self._answers[q.question_hash] 1526 return default_answer 1527 1528 _FORM_FIELD_RE = _re.compile(r'@@\{([^:}]+):([^}]*)\}@@') 1529 1530 def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None): 1531 """Register a form question with inline @@{field_name:regex}@@ fields. 1532 1533 Returns the student's answers as a dict {field_name: value}, or {} if none yet. 1534 cheat_answers format: {state_name: {field_name: value, ...}} 1535 """ 1536 title = self._apply_section(section, title) 1537 from .common import QuestionForm 1538 1539 # The @@{field:regex}@@ markers are language-independent, so when the 1540 # description is a TranslatedText (wrapped in tr()) extract fields from 1541 # its resolved string. The full TranslatedText is still stored on the 1542 # question for per-language rendering. 1543 desc_str = (description.resolve('') 1544 if isinstance(description, TranslatedText) else description) 1545 1546 fields = [] 1547 for m in self._FORM_FIELD_RE.finditer(desc_str): 1548 name, spec = m.group(1), m.group(2) 1549 if spec.startswith('>') or '>>>' in spec: 1550 raw = spec[1:] if spec.startswith('>') else spec 1551 fields.append({"name": name, "choices": [ 1552 c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip() 1553 for c in raw.split('|') 1554 ]}) 1555 elif spec.startswith('?'): 1556 default = spec[1:].strip().lower() not in ('', 'false') 1557 fields.append({"name": name, "checkbox": default}) 1558 else: 1559 fields.append({"name": name, "regex": spec}) 1560 1561 if order is None: 1562 order1 = self._questions_current_order 1563 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1564 else: 1565 order1 = order 1566 1567 q = QuestionForm(title, description, hash, order1, fields) 1568 if order1 not in self._questions_order: 1569 self._questions_order[order1] = [] 1570 self._questions_order[order1].append(q) 1571 1572 if q.question_hash in self._questions: 1573 write_error(f"Duplicate question hash {q.question_hash}") 1574 self._questions[q.question_hash] = q 1575 1576 if cheat_answers is not None: 1577 for state, field_answers in cheat_answers.items(): 1578 if state not in self._cheat_answers: 1579 self._cheat_answers[state] = {} 1580 self._cheat_answers[state][q.question_hash] = json.dumps( 1581 field_answers, ensure_ascii=False 1582 ) 1583 1584 if q.question_hash in self._answers: 1585 try: 1586 return json.loads(self._answers[q.question_hash]) 1587 except (json.JSONDecodeError, TypeError): 1588 return {} 1589 return {} 1590 1591 def question_dummy(self, title, section='', description='', hash=None, order=None): 1592 """Register a display-only block (no answer widget shown to the student).""" 1593 title = self._apply_section(section, title) 1594 if order is None: 1595 order1 = self._questions_current_order 1596 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1597 else: 1598 order1 = order 1599 1600 q = QuestionDummy(title, description, hash, order1) 1601 if order1 not in self._questions_order: 1602 self._questions_order[order1] = [] 1603 self._questions_order[order1].append(q) 1604 1605 if q.question_hash in self._questions: 1606 write_error(f"Duplicate question hash {q.question_hash}") 1607 self._questions[q.question_hash] = q 1608 1609 def add_grade_part(self, title, description=''): 1610 """Register a new :class:`GradePart` group and return it. 1611 1612 Pass the returned object to ``add_grade_element(..., grade_part=...)`` 1613 to associate elements with this part. Parts are displayed in 1614 registration order (with a subtotal row per part) in the GUI 1615 evaluation view and in ``sre outline`` PDFs. 1616 """ 1617 if description: 1618 description = TranslatedText.from_value(description, self._default_language) 1619 gp = GradePart(title=title, description=description) 1620 if any(p.title == title for p in self._grade_parts): 1621 write_error(f"Duplicate grade part title = {title}") 1622 self._grade_parts.append(gp) 1623 return gp 1624 1625 def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE, 1626 grade_part=None): 1627 """Add a graded rubric item. Initial *grade* defaults to 0; use :meth:`set_grade` to update it. 1628 1629 ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only, 1630 ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only, 1631 ``BOTH_EVAL_SCOPE`` (3, default) for both audiences. 1632 1633 ``grade_part`` optionally associates this element with a 1634 :class:`GradePart` previously returned by :meth:`add_grade_part`. 1635 """ 1636 if scope not in params.grade_scopes: 1637 raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}") 1638 if description: 1639 description = TranslatedText.from_value(description, self._default_language) 1640 grade_part_title = None 1641 if grade_part is not None: 1642 if not isinstance(grade_part, GradePart): 1643 raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}") 1644 if grade_part not in self._grade_parts: 1645 write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element") 1646 grade_part_title = grade_part.title 1647 g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope, 1648 grade_part=grade_part_title) 1649 self._grade_list.append(g) 1650 key = _tt_hash_str(title) 1651 if key in self._grades: 1652 write_error(f"Duplicate grade title = {title}") 1653 self._grades[key] = g 1654 1655 def set_grade(self, title, grade): 1656 """Set the numeric *grade* for the element previously registered under *title*.""" 1657 self._grades[_tt_hash_str(title)].grade = grade 1658 1659 def save_lab_info(self): 1660 debug_project = os.path.exists( 1661 params.debug_project_marker_filename(self.net_scheme.running_lab_name) 1662 ) 1663 if debug_project: 1664 visible_machines = list(self.net_scheme.get_machines()) 1665 else: 1666 visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden] 1667 lab_hash = self.net_scheme.get_lab_hash() 1668 1669 def _get_stats(m): 1670 s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None) 1671 return m, s 1672 1673 stats_map = {} 1674 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor: 1675 for m, s in executor.map(_get_stats, visible_machines): 1676 stats_map[m.name] = (m, s) 1677 1678 info_machines = [] 1679 for m in visible_machines: 1680 m, s = stats_map[m.name] 1681 interfaces = [] 1682 for net, netAdapter in m.net_adapters.items(): 1683 interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}")) 1684 info_machines.append( 1685 InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden, 1686 bridged=m.bridged, 1687 x11_host=m.x11_host, 1688 ports=m.ports, 1689 allow_connection=m.allow_connection, 1690 color=m.color or "", 1691 shape=m.shape or "", 1692 interfaces=interfaces)) 1693 1694 network_colors = { 1695 net.name: net.color 1696 for m in visible_machines 1697 for net in m.net_adapters 1698 if net.color 1699 } 1700 network_shapes = { 1701 net.name: net.shape 1702 for m in visible_machines 1703 for net in m.net_adapters 1704 if net.shape 1705 } 1706 1707 module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")] 1708 default_language = getattr(module_rvlab, 'default_language', 'en') 1709 self._default_language = default_language 1710 show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network) 1711 nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name) 1712 nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color) 1713 host_network_exploded = getattr(module_rvlab, 'host_network_exploded', 1714 params.default_host_network_exploded) 1715 host_network_edge_relative_length = float(getattr( 1716 module_rvlab, 'host_network_edge_relative_length', 1717 params.default_host_network_edge_relative_length)) 1718 schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines) 1719 schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap) 1720 1721 if self.step == 0: 1722 self.grade() 1723 self.step += 1 1724 1725 questions = self.get_questions_ordered() 1726 1727 if hasattr(module_rvlab, 'title'): 1728 title = module_rvlab.title 1729 else: 1730 lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name()) 1731 title = lab_name.removesuffix('.py') 1732 title = TranslatedText.from_value(title, default_language) 1733 1734 if hasattr(module_rvlab, 'delay_between_self_grade'): 1735 delay_between_self_grade = module_rvlab.delay_between_self_grade 1736 else: 1737 delay_between_self_grade = 0 1738 1739 if hasattr(module_rvlab, 'allow_self_grade'): 1740 allow_self_grade = module_rvlab.allow_self_grade 1741 else: 1742 allow_self_grade = False 1743 1744 if debug_project: 1745 delay_between_self_grade = 0 1746 allow_self_grade = True 1747 if hasattr(module_rvlab, 'export_kathara_project'): 1748 export_kathara_project = module_rvlab.export_kathara_project 1749 else: 1750 export_kathara_project = False 1751 1752 eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode) 1753 eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False) 1754 1755 informations = TranslatedText.from_value(self.net_scheme.informations, default_language) 1756 for q in questions: 1757 q.title = TranslatedText.from_value(q.title, default_language) 1758 q.description = TranslatedText.from_value(q.description, default_language) 1759 1760 net_scheme_cls = type(self.net_scheme) 1761 if debug_project: 1762 user_allowed_states_raw = {} 1763 for state in net_scheme_cls.get_state_methods(): 1764 desc = '' 1765 for klass in net_scheme_cls.__mro__: 1766 fn = klass.__dict__.get(state) 1767 if fn is not None and getattr(fn, '_is_sre_state', False): 1768 desc = getattr(fn, '_sre_state_description', '') 1769 break 1770 user_allowed_states_raw[state] = desc 1771 else: 1772 user_allowed_states_raw = ( 1773 net_scheme_cls.get_user_allowed_states() 1774 if getattr(module_rvlab, 'allow_user_states', False) else {} 1775 ) 1776 user_allowed_states = { 1777 state: TranslatedText.from_value(desc, default_language) if desc else desc 1778 for state, desc in user_allowed_states_raw.items() 1779 } 1780 1781 if debug_project: 1782 module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False) 1783 admin_only_states = [ 1784 state for state in user_allowed_states_raw 1785 if not module_allows_user_states 1786 or not net_scheme_cls.is_state_user_allowed(state) 1787 ] 1788 else: 1789 admin_only_states = [] 1790 1791 info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash, 1792 title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade, 1793 questions=questions, informations=informations, 1794 export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade, 1795 debug_project=debug_project, 1796 eval_interval_without_exam_mode=eval_interval_without_exam_mode, 1797 eval_before_exit=eval_before_exit, 1798 default_language=default_language, 1799 user_allowed_states=user_allowed_states, 1800 admin_only_states=admin_only_states, 1801 network_colors=network_colors, 1802 network_shapes=network_shapes, 1803 show_nat_network=show_nat_network, 1804 nat_network_name=nat_network_name, 1805 nat_network_color=nat_network_color, 1806 host_network_exploded=host_network_exploded, 1807 host_network_edge_relative_length=host_network_edge_relative_length, 1808 schema_splines=schema_splines, 1809 schema_overlap=schema_overlap) 1810 1811 info_json = info.to_json() 1812 info_filename = params.info_filename(self.net_scheme.running_lab_name) 1813 1814 save_info_json = None 1815 try: 1816 with open(info_filename, "r") as f: 1817 save_info_json = f.read() 1818 except FileNotFoundError: 1819 pass 1820 if save_info_json == info_json: 1821 return 1822 1823 temp_file = tempfile.NamedTemporaryFile( 1824 delete=False, 1825 dir=Path(self.net_scheme.get_public_lab_dir()).parent) 1826 with open(temp_file.name, "w") as f: 1827 print(info_json, file=f) 1828 f.flush() 1829 os.fsync(f.fileno()) 1830 os.chmod(temp_file.name, 0o644) 1831 os.replace(temp_file.name, info_filename) 1832 1833 def add_error(self, error, category=ErrorCategory.ERROR, step: int = 1): 1834 if self.step != step: 1835 return 1836 log_error(error) 1837 self._errors.append((category.value, error)) 1838 1839 def add_warning(self, warning, step: int = 1): 1840 self.add_error(warning, category=ErrorCategory.WARNING, step=step) 1841 1842 @staticmethod 1843 def run_tests_on_machine(machine_name, machine, exetests): 1844 environment = {params.exetests_env_name: exetests} 1845 code, output = machine.api_object.exec_run([params.exetests_machines_path], 1846 stdin=False, 1847 stdout=True, 1848 stderr=False, 1849 tty=False, 1850 environment=environment, 1851 ) 1852 return machine_name, code, output 1853 1854 def run_tests(self): 1855 self._tests = {} 1856 self._section_counter = [] 1857 lab = self.net_scheme.get_lab_from_kathara() 1858 self._errors = [] 1859 self.load_answers() 1860 self._eval_date = datetime.datetime.now().isoformat() 1861 1862 while self.step <= self.max_step: 1863 self.reset_before_grade() 1864 self.grade() 1865 self.step += 1 1866 tests = self.get_tests() 1867 exetests_by_machine = self.get_exetests_strings(self.step) 1868 if SRE.args.debug and self.step <= self.max_step: 1869 log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):") 1870 for machine, cmds in exetests_by_machine.items(): 1871 c = cmds.split(params.exetests_separator) 1872 c1 = " - ".join(c) 1873 log_debug(f"{machine}: {c1}") 1874 1875 results = {} 1876 with ThreadPoolExecutor( 1877 max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor: 1878 futures_to_machines = { 1879 executor.submit( 1880 Grade0.run_tests_on_machine, 1881 machine_name, 1882 machine, 1883 exetests_by_machine[machine_name], 1884 ): machine_name 1885 for machine_name, machine in lab.machines.items() 1886 if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0 1887 } 1888 for future in as_completed(futures_to_machines): 1889 machine_name = futures_to_machines[future] 1890 try: 1891 machine_name, code, output = future.result() 1892 results[machine_name] = (code, output) 1893 except Exception as e: 1894 self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step) 1895 continue 1896 1897 for machine_name, (exetests_code, output) in results.items(): 1898 if (machine_name, self.step) not in self._tests: 1899 self._tests[(machine_name, self.step)] = {} 1900 if exetests_code != 0: 1901 self.add_error( 1902 f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}", 1903 step=self.step) 1904 output1 = output.decode("utf-8") 1905 separator, _, rest = output1.partition("\n") 1906 output2 = rest.split(f"\n{separator}\n") 1907 for i in range(0, len(output2), 2): 1908 ligne1 = "" 1909 try: 1910 ligne1, date1, result = output2[i].split("\n", 2) 1911 except ValueError: 1912 result = "" 1913 if not ligne1: 1914 continue 1915 timeout_s, cmd = ligne1.split(":", 1) 1916 timeout = int(timeout_s) 1917 date2, code_s = output2[i + 1].split("\n", 1) 1918 try: 1919 code = int(code_s.strip()) 1920 except ValueError: 1921 code = -2 1922 self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code", 1923 step=self.step) 1924 if code != 0: 1925 if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False): 1926 self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step) 1927 self._tests[(machine_name, self.step)][cmd, timeout] = (result, code) 1928 if SRE.args.debug: 1929 for machine_name in lab.machines: 1930 if (machine_name, self.step) not in self._tests: 1931 continue 1932 for (cmd, timeout) in self._tests[(machine_name, self.step)]: 1933 log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:") 1934 result, code = self._tests[(machine_name, self.step)][cmd, timeout] 1935 log_debug(result) 1936 log_debug(f"-------- exit code {code}\n") 1937 1938 host_step_cmds = self._host_tests.get(self.step, {}) 1939 if host_step_cmds: 1940 def _run_host_cmd(cmd, t): 1941 from .utils_privileges import preexec_drop_to_sre 1942 run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd 1943 use_shell = params.execute_commands_on_host == "shell" 1944 try: 1945 proc = subprocess.run( 1946 run_cmd, shell=use_shell, capture_output=True, text=True, 1947 timeout=t if t > 0 else None, 1948 preexec_fn=preexec_drop_to_sre, 1949 ) 1950 return cmd, t, proc.stdout, proc.returncode 1951 except subprocess.TimeoutExpired: 1952 return cmd, t, '', -1 1953 except Exception: 1954 return cmd, t, '', -2 1955 1956 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, 1957 len(host_step_cmds))) as executor: 1958 futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t) 1959 for (cmd, t) in host_step_cmds} 1960 for future in as_completed(futures): 1961 cmd, t, result, code = future.result() 1962 if code != 0: 1963 if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False): 1964 self.add_error(f"host test error: {cmd} code={code}", step=self.step) 1965 self._host_tests[self.step][(cmd, t)] = (result, code) 1966 if SRE.args.debug: 1967 log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:") 1968 log_debug(result) 1969 log_debug(f"-------- exit code {code}\n") 1970 1971 if SRE.args.debug and len(self._errors) > 0: 1972 log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors)) 1973 self.compute_total() 1974 self._mark_self_eval = self.mark_self_eval() 1975 self._mark_exo_eval = self.mark_exo_eval() 1976 # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries") 1977 # for (machine, step), cmds in self._tests.items(): 1978 # for (cmd, timeout), (result, code) in cmds.items(): 1979 # log_error(f" [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}") 1980 1981 def compute_total(self): 1982 """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both.""" 1983 self._total_grade_self_eval = 0 1984 self._total_max_self_eval = 0 1985 self._total_grade_exo_eval = 0 1986 self._total_max_exo_eval = 0 1987 for g in self._grade_list: 1988 if g.scope & params.SELF_EVAL_SCOPE: 1989 self._total_grade_self_eval += g.grade 1990 self._total_max_self_eval += g.max_grade 1991 if g.scope & params.EXO_EVAL_SCOPE: 1992 self._total_grade_exo_eval += g.grade 1993 self._total_max_exo_eval += g.max_grade 1994 1995 def save_tests(self): 1996 now = datetime.datetime.now() 1997 if self._exam_json is None: 1998 exam_path = Path(params.sre_pub_dir) / params.exam_json_name 1999 try: 2000 self._exam_json = json.loads(exam_path.read_text()) 2001 except FileNotFoundError: 2002 pass 2003 except Exception as e: 2004 log_error(f"can't read {exam_path}: {e}") 2005 for d1 in self.archive_dirs: 2006 d = Path(d1).expanduser().resolve() 2007 try: 2008 if not d.exists(): 2009 os.mkdir(d, 0o700) 2010 else: 2011 os.listdir(d) # trigger mount / catch stale handle early 2012 permissions = os.stat(d).st_mode 2013 if permissions != 0o700: 2014 os.chmod(d, 0o700) 2015 filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now) 2016 self.save_tests_on_file(str(filename)) 2017 except Exception as e: 2018 log_error(f"can't save archive to {d}: {e}") 2019 2020 def save_tests_on_file(self, filename: str): 2021 files_content = {} 2022 if self.files_to_save_in_archives: 2023 fdir = Path(params.files_dir(self.get_running_lab_name())) 2024 if fdir.is_dir(): 2025 for pattern in self.files_to_save_in_archives: 2026 for fpath in fdir.iterdir(): 2027 if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern): 2028 try: 2029 files_content[fpath.name] = fpath.read_bytes() 2030 except Exception: 2031 pass 2032 archive = { 2033 params.running_lab_name_keyword: self.get_running_lab_name(), 2034 params.eval_date_keyword: self._eval_date, 2035 params.re_eval_date_keyword: self._re_eval_date, 2036 'data_json': self.get_data().to_json(), 2037 'tests': self.get_tests(), 2038 'errors': self.get_errors(), 2039 'answers': self.get_answers(), 2040 'grade_list': [asdict(e) for e in self._grade_list], 2041 'grade_parts': [asdict(p) for p in self._grade_parts], 2042 'total_grade_self_eval': self._total_grade_self_eval, 2043 'total_max_self_eval': self._total_max_self_eval, 2044 'mark_self_eval': self._mark_self_eval, 2045 'total_grade_exo_eval': self._total_grade_exo_eval, 2046 'total_max_exo_eval': self._total_max_exo_eval, 2047 'mark_exo_eval': self._mark_exo_eval, 2048 'maximum_mark': self._maximum_mark, 2049 'files': files_content, 2050 } 2051 if self._exam_json is not None: 2052 archive[params.exam_json_keyword] = self._exam_json 2053 cctx = zstd.ZstdCompressor(level=6) 2054 with tempfile.NamedTemporaryFile( 2055 mode="wb", 2056 dir=os.path.dirname(filename), 2057 delete=False 2058 ) as f: 2059 with cctx.stream_writer(f) as compressor: 2060 packed = msgpack.packb(archive, use_bin_type=True) 2061 compressor.write(packed) 2062 # f.flush() 2063 # os.fsync(f.fileno()) 2064 temp_name = f.name 2065 os.replace(temp_name, filename)
Base class for lab evaluation logic.
Subclasses override grade() to register tests and questions. The
actual execution happens later in run_tests(), which drives a
multi-step loop:
- Call
grade()to register commands (they return placeholder values). - Execute all registered commands in containers (via
exetests). - Repeat for each additional step (
self.max_step). - Call
grade()a final time — commands now return real results. - Save the archive.
Key instance attributes available inside grade():
self.data— theData0instance (vianet_scheme).self.step— current step number (1-based).self.max_step— highest step number registered so far.
1213 def __init__(self, net_scheme): 1214 self.net_scheme = net_scheme 1215 self.archive_dirs = [] 1216 self.files_to_save_in_archives = [] 1217 self._tests = None 1218 self._allow_errors_in_tests = None 1219 self.step = 0 1220 self.max_step = 1 1221 self._questions = None 1222 self._questions_order = None 1223 self._questions_current_order = None 1224 self._cheat_answers = {} 1225 self._answers = {} 1226 self._grade_list = [] 1227 self._grades = {} 1228 self._grade_parts: list[GradePart] = [] 1229 self._errors = [] 1230 self._total_grade_self_eval = 0 1231 self._total_max_self_eval = 0 1232 self._total_grade_exo_eval = 0 1233 self._total_max_exo_eval = 0 1234 self._maximum_mark = params.default_maximum_mark 1235 self._use_numerical_marks = params.use_numerical_marks_by_default 1236 self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default 1237 self._mark_self_eval = None 1238 self._mark_exo_eval = None 1239 self._section_counter = [] 1240 self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)] 1241 self._eval_date = None 1242 self._re_eval_date = None 1243 self._default_language = 'en' 1244 self.auto_eval_count = 0 1245 self._exam_json = None 1246 self.full_reset()
1248 def full_reset(self): 1249 """Reset all state including tests and answers (called from ``__init__``).""" 1250 self.step = 0 1251 self.max_step = 1 1252 self._tests = {} # keys : (machine,step)->(cmd, timeout)->(result, code)) 1253 self._allow_errors_in_tests = {} 1254 self._host_tests = {} # step -> {(command, timeout): (result, code)} 1255 self._allow_errors_in_host_tests = {} # (step, command, timeout) -> True 1256 self._answers = {} # hash -> answer 1257 self._eval_date = None 1258 self.reset_before_grade()
Reset all state including tests and answers (called from __init__).
1260 def reset_before_grade(self): 1261 """Clear questions, grade list, and section counters before each call to :meth:`grade`.""" 1262 self._questions = dict() # hash->Question 1263 self._questions_order = dict() # order -> [Question1, Question2, ...] 1264 self._questions_current_order = 100 1265 self._cheat_answers = {} # state->(question_hash->answer) 1266 self._grade_list = [] 1267 self._grades = dict() # title -> GradeElement 1268 self._grade_parts = [] 1269 self._section_counter = []
Clear questions, grade list, and section counters before each call to grade().
1271 def grade(self): 1272 """Override to register tests, questions, and grade elements. 1273 1274 Called multiple times during :meth:`run_tests` — once per step plus a 1275 final time when all results are available. Do **not** produce side-effects 1276 here; only call ``self.test()``, ``self.question_*()`` and 1277 ``self.add_grade_element()`` / ``self.set_grade()``. 1278 """ 1279 self._grade_list = [] 1280 self._grade_parts = []
Override to register tests, questions, and grade elements.
Called multiple times during run_tests() — once per step plus a
final time when all results are available. Do not produce side-effects
here; only call self.test(), self.question_*() and
self.add_grade_element() / self.set_grade().
1308 def mark_self_eval(self): 1309 """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE).""" 1310 return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval)
Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE).
1312 def mark_exo_eval(self): 1313 """Final mark over elements visible in non-auto eval / outline / sheet.""" 1314 return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval)
Final mark over elements visible in non-auto eval / outline / sheet.
1337 def get_exetests_strings(self, step: int): 1338 result = dict() 1339 for (machine, step1) in self._tests.keys(): 1340 if step != step1: 1341 continue 1342 result[machine] = params.exetests_separator.join( 1343 [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()]) 1344 return result
1349 def increment_section_counter(self, level: int): 1350 if len(self._section_counter) < level + 1: 1351 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1352 else: 1353 self._section_counter = self._section_counter[:(level + 1)] 1354 self._section_counter[level] += 1
1356 def set_section_counter(self, level: int, value: int): 1357 if len(self._section_counter) < level + 1: 1358 self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))] 1359 else: 1360 self._section_counter = self._section_counter[:(level + 1)] 1361 self._section_counter[level] = value
1367 def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None): 1368 # fmt is a list of (type, depth[, prefix]) tuples, one per level. 1369 # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number 1370 # depth: how many consecutive counters to display, ending at this level 1371 # prefix: optional string prepended to the result (default: '') 1372 # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1." 1373 if fmt is None: 1374 fmt = self.section_fmt 1375 1376 def _to_roman(n: int) -> str: 1377 vals = [ 1378 (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'), 1379 (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'), 1380 (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'), 1381 ] 1382 result = '' 1383 for value, numeral in vals: 1384 while n >= value: 1385 result += numeral 1386 n -= value 1387 return result 1388 1389 def _to_capital_letter(n: int) -> str: 1390 result = '' 1391 while n > 0: 1392 n, r = divmod(n - 1, 26) 1393 result = chr(ord('A') + r) + result 1394 return result 1395 1396 def _to_lowercase_letter(n: int) -> str: 1397 result = '' 1398 while n > 0: 1399 n, r = divmod(n - 1, 26) 1400 result = chr(ord('a') + r) + result 1401 return result 1402 1403 def _convert(n: int, fmt_type: str) -> str: 1404 match fmt_type: 1405 case 'R': 1406 return _to_roman(n) 1407 case 'r': 1408 return _to_roman(n).lower() 1409 case 'L': 1410 return _to_capital_letter(n) 1411 case 'l': 1412 return _to_lowercase_letter(n) 1413 case _: 1414 return str(n) 1415 1416 entry = fmt[level] if level < len(fmt) else ('N', level + 1) 1417 _, depth = entry[:2] 1418 prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '') 1419 if show is not None: 1420 depth = show 1421 start = max(0, level - depth + 1) 1422 parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0, 1423 fmt[j][0] if j < len(fmt) else 'N') 1424 for j in range(start, level + 1)] 1425 return prefix + ".".join(parts) + ". "
1427 def load_answers(self): 1428 """Load student answers from ``answers.json`` into ``self._answers``.""" 1429 self._answers = {} 1430 try: 1431 fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()), 1432 os.O_RDONLY | os.O_NOFOLLOW) 1433 with os.fdopen(fd) as f: 1434 self._answers = json.load(f) 1435 except (OSError, json.JSONDecodeError): 1436 self._answers = {}
Load student answers from answers.json into self._answers.
1447 def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='', 1448 default_code: int = 0, allow_error: bool = False): 1449 """Register and retrieve the result of a host-side test command. 1450 1451 On the first call (registration pass) returns *default_value* / *default_code*. 1452 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1453 Set *allow_error* to suppress error recording on non-zero exit. 1454 """ 1455 if params.execute_commands_on_host is False: 1456 sys.exit("test_host() is disabled by params.execute_commands_on_host") 1457 if self.max_step < step: 1458 self.max_step = step 1459 if step not in self._host_tests: 1460 self._host_tests[step] = {} 1461 if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests: 1462 self._allow_errors_in_host_tests[(step, command, timeout)] = True 1463 if (command, timeout) not in self._host_tests[step]: 1464 self._host_tests[step][(command, timeout)] = (default_value, default_code) 1465 return default_value, default_code 1466 return self._host_tests[step][(command, timeout)]
Register and retrieve the result of a host-side test command.
On the first call (registration pass) returns default_value / default_code.
On subsequent calls (result pass) returns the actual (stdout, exit_code).
Set allow_error to suppress error recording on non-zero exit.
1468 def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='', 1469 default_code: int = 0, allow_error: bool = False): 1470 """Register and retrieve the result of a command executed inside *machine_name*. 1471 1472 On the first call (registration pass) returns *default_value* / *default_code*. 1473 On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``. 1474 *timeout* is in seconds. Set *allow_error* to suppress error recording. 1475 """ 1476 if self.max_step < step: 1477 self.max_step = step 1478 if (machine_name, step) not in self._tests: 1479 self._tests[(machine_name, step)] = {} 1480 if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests: 1481 self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True 1482 if (command, timeout) not in self._tests[(machine_name, step)]: 1483 self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code) 1484 return default_value, default_code 1485 return self._tests[(machine_name, step)][(command, timeout)]
Register and retrieve the result of a command executed inside machine_name.
On the first call (registration pass) returns default_value / default_code.
On subsequent calls (result pass) returns the actual (stdout, exit_code).
timeout is in seconds. Set allow_error to suppress error recording.
1495 def question_text(self, title, section='', description='', hash=None, order=None, default_answer='', 1496 cheat_answers=None): 1497 """Register a free-text question and return the student's current answer string. 1498 1499 Returns *default_answer* if the student has not answered yet. 1500 *cheat_answers* maps state names to answer strings used in automated testing. 1501 """ 1502 title = self._apply_section(section, title) 1503 if order is None: 1504 order1 = self._questions_current_order 1505 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1506 else: 1507 order1 = order 1508 1509 q = QuestionText(title, description, hash, order1) 1510 if order1 not in self._questions_order: 1511 self._questions_order[order1] = [] 1512 self._questions_order[order1].append(q) 1513 1514 if q.question_hash in self._questions: 1515 write_error(f"Duplicate question hash {q.question_hash}") 1516 self._questions[q.question_hash] = q 1517 1518 if cheat_answers is not None: 1519 for state, answer in cheat_answers.items(): 1520 if state not in self._cheat_answers: 1521 self._cheat_answers[state] = {} 1522 self._cheat_answers[state][q.question_hash] = answer 1523 1524 if q.question_hash in self._answers: 1525 return self._answers[q.question_hash] 1526 return default_answer
Register a free-text question and return the student's current answer string.
Returns default_answer if the student has not answered yet. cheat_answers maps state names to answer strings used in automated testing.
1530 def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None): 1531 """Register a form question with inline @@{field_name:regex}@@ fields. 1532 1533 Returns the student's answers as a dict {field_name: value}, or {} if none yet. 1534 cheat_answers format: {state_name: {field_name: value, ...}} 1535 """ 1536 title = self._apply_section(section, title) 1537 from .common import QuestionForm 1538 1539 # The @@{field:regex}@@ markers are language-independent, so when the 1540 # description is a TranslatedText (wrapped in tr()) extract fields from 1541 # its resolved string. The full TranslatedText is still stored on the 1542 # question for per-language rendering. 1543 desc_str = (description.resolve('') 1544 if isinstance(description, TranslatedText) else description) 1545 1546 fields = [] 1547 for m in self._FORM_FIELD_RE.finditer(desc_str): 1548 name, spec = m.group(1), m.group(2) 1549 if spec.startswith('>') or '>>>' in spec: 1550 raw = spec[1:] if spec.startswith('>') else spec 1551 fields.append({"name": name, "choices": [ 1552 c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip() 1553 for c in raw.split('|') 1554 ]}) 1555 elif spec.startswith('?'): 1556 default = spec[1:].strip().lower() not in ('', 'false') 1557 fields.append({"name": name, "checkbox": default}) 1558 else: 1559 fields.append({"name": name, "regex": spec}) 1560 1561 if order is None: 1562 order1 = self._questions_current_order 1563 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1564 else: 1565 order1 = order 1566 1567 q = QuestionForm(title, description, hash, order1, fields) 1568 if order1 not in self._questions_order: 1569 self._questions_order[order1] = [] 1570 self._questions_order[order1].append(q) 1571 1572 if q.question_hash in self._questions: 1573 write_error(f"Duplicate question hash {q.question_hash}") 1574 self._questions[q.question_hash] = q 1575 1576 if cheat_answers is not None: 1577 for state, field_answers in cheat_answers.items(): 1578 if state not in self._cheat_answers: 1579 self._cheat_answers[state] = {} 1580 self._cheat_answers[state][q.question_hash] = json.dumps( 1581 field_answers, ensure_ascii=False 1582 ) 1583 1584 if q.question_hash in self._answers: 1585 try: 1586 return json.loads(self._answers[q.question_hash]) 1587 except (json.JSONDecodeError, TypeError): 1588 return {} 1589 return {}
Register a form question with inline @@{field_name:regex}@@ fields.
Returns the student's answers as a dict {field_name: value}, or {} if none yet. cheat_answers format: {state_name: {field_name: value, ...}}
1591 def question_dummy(self, title, section='', description='', hash=None, order=None): 1592 """Register a display-only block (no answer widget shown to the student).""" 1593 title = self._apply_section(section, title) 1594 if order is None: 1595 order1 = self._questions_current_order 1596 self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100 1597 else: 1598 order1 = order 1599 1600 q = QuestionDummy(title, description, hash, order1) 1601 if order1 not in self._questions_order: 1602 self._questions_order[order1] = [] 1603 self._questions_order[order1].append(q) 1604 1605 if q.question_hash in self._questions: 1606 write_error(f"Duplicate question hash {q.question_hash}") 1607 self._questions[q.question_hash] = q
Register a display-only block (no answer widget shown to the student).
1609 def add_grade_part(self, title, description=''): 1610 """Register a new :class:`GradePart` group and return it. 1611 1612 Pass the returned object to ``add_grade_element(..., grade_part=...)`` 1613 to associate elements with this part. Parts are displayed in 1614 registration order (with a subtotal row per part) in the GUI 1615 evaluation view and in ``sre outline`` PDFs. 1616 """ 1617 if description: 1618 description = TranslatedText.from_value(description, self._default_language) 1619 gp = GradePart(title=title, description=description) 1620 if any(p.title == title for p in self._grade_parts): 1621 write_error(f"Duplicate grade part title = {title}") 1622 self._grade_parts.append(gp) 1623 return gp
Register a new GradePart group and return it.
Pass the returned object to add_grade_element(..., grade_part=...)
to associate elements with this part. Parts are displayed in
registration order (with a subtotal row per part) in the GUI
evaluation view and in sre outline PDFs.
1625 def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE, 1626 grade_part=None): 1627 """Add a graded rubric item. Initial *grade* defaults to 0; use :meth:`set_grade` to update it. 1628 1629 ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only, 1630 ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only, 1631 ``BOTH_EVAL_SCOPE`` (3, default) for both audiences. 1632 1633 ``grade_part`` optionally associates this element with a 1634 :class:`GradePart` previously returned by :meth:`add_grade_part`. 1635 """ 1636 if scope not in params.grade_scopes: 1637 raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}") 1638 if description: 1639 description = TranslatedText.from_value(description, self._default_language) 1640 grade_part_title = None 1641 if grade_part is not None: 1642 if not isinstance(grade_part, GradePart): 1643 raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}") 1644 if grade_part not in self._grade_parts: 1645 write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element") 1646 grade_part_title = grade_part.title 1647 g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope, 1648 grade_part=grade_part_title) 1649 self._grade_list.append(g) 1650 key = _tt_hash_str(title) 1651 if key in self._grades: 1652 write_error(f"Duplicate grade title = {title}") 1653 self._grades[key] = g
Add a graded rubric item. Initial grade defaults to 0; use set_grade() to update it.
scope is a bitmask: SELF_EVAL_SCOPE (1) for self-eval only,
EXO_EVAL_SCOPE (2) for non-auto eval / outline / sheet only,
BOTH_EVAL_SCOPE (3, default) for both audiences.
grade_part optionally associates this element with a
GradePart previously returned by add_grade_part().
1655 def set_grade(self, title, grade): 1656 """Set the numeric *grade* for the element previously registered under *title*.""" 1657 self._grades[_tt_hash_str(title)].grade = grade
Set the numeric grade for the element previously registered under title.
1659 def save_lab_info(self): 1660 debug_project = os.path.exists( 1661 params.debug_project_marker_filename(self.net_scheme.running_lab_name) 1662 ) 1663 if debug_project: 1664 visible_machines = list(self.net_scheme.get_machines()) 1665 else: 1666 visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden] 1667 lab_hash = self.net_scheme.get_lab_hash() 1668 1669 def _get_stats(m): 1670 s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None) 1671 return m, s 1672 1673 stats_map = {} 1674 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor: 1675 for m, s in executor.map(_get_stats, visible_machines): 1676 stats_map[m.name] = (m, s) 1677 1678 info_machines = [] 1679 for m in visible_machines: 1680 m, s = stats_map[m.name] 1681 interfaces = [] 1682 for net, netAdapter in m.net_adapters.items(): 1683 interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}")) 1684 info_machines.append( 1685 InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden, 1686 bridged=m.bridged, 1687 x11_host=m.x11_host, 1688 ports=m.ports, 1689 allow_connection=m.allow_connection, 1690 color=m.color or "", 1691 shape=m.shape or "", 1692 interfaces=interfaces)) 1693 1694 network_colors = { 1695 net.name: net.color 1696 for m in visible_machines 1697 for net in m.net_adapters 1698 if net.color 1699 } 1700 network_shapes = { 1701 net.name: net.shape 1702 for m in visible_machines 1703 for net in m.net_adapters 1704 if net.shape 1705 } 1706 1707 module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")] 1708 default_language = getattr(module_rvlab, 'default_language', 'en') 1709 self._default_language = default_language 1710 show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network) 1711 nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name) 1712 nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color) 1713 host_network_exploded = getattr(module_rvlab, 'host_network_exploded', 1714 params.default_host_network_exploded) 1715 host_network_edge_relative_length = float(getattr( 1716 module_rvlab, 'host_network_edge_relative_length', 1717 params.default_host_network_edge_relative_length)) 1718 schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines) 1719 schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap) 1720 1721 if self.step == 0: 1722 self.grade() 1723 self.step += 1 1724 1725 questions = self.get_questions_ordered() 1726 1727 if hasattr(module_rvlab, 'title'): 1728 title = module_rvlab.title 1729 else: 1730 lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name()) 1731 title = lab_name.removesuffix('.py') 1732 title = TranslatedText.from_value(title, default_language) 1733 1734 if hasattr(module_rvlab, 'delay_between_self_grade'): 1735 delay_between_self_grade = module_rvlab.delay_between_self_grade 1736 else: 1737 delay_between_self_grade = 0 1738 1739 if hasattr(module_rvlab, 'allow_self_grade'): 1740 allow_self_grade = module_rvlab.allow_self_grade 1741 else: 1742 allow_self_grade = False 1743 1744 if debug_project: 1745 delay_between_self_grade = 0 1746 allow_self_grade = True 1747 if hasattr(module_rvlab, 'export_kathara_project'): 1748 export_kathara_project = module_rvlab.export_kathara_project 1749 else: 1750 export_kathara_project = False 1751 1752 eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode) 1753 eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False) 1754 1755 informations = TranslatedText.from_value(self.net_scheme.informations, default_language) 1756 for q in questions: 1757 q.title = TranslatedText.from_value(q.title, default_language) 1758 q.description = TranslatedText.from_value(q.description, default_language) 1759 1760 net_scheme_cls = type(self.net_scheme) 1761 if debug_project: 1762 user_allowed_states_raw = {} 1763 for state in net_scheme_cls.get_state_methods(): 1764 desc = '' 1765 for klass in net_scheme_cls.__mro__: 1766 fn = klass.__dict__.get(state) 1767 if fn is not None and getattr(fn, '_is_sre_state', False): 1768 desc = getattr(fn, '_sre_state_description', '') 1769 break 1770 user_allowed_states_raw[state] = desc 1771 else: 1772 user_allowed_states_raw = ( 1773 net_scheme_cls.get_user_allowed_states() 1774 if getattr(module_rvlab, 'allow_user_states', False) else {} 1775 ) 1776 user_allowed_states = { 1777 state: TranslatedText.from_value(desc, default_language) if desc else desc 1778 for state, desc in user_allowed_states_raw.items() 1779 } 1780 1781 if debug_project: 1782 module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False) 1783 admin_only_states = [ 1784 state for state in user_allowed_states_raw 1785 if not module_allows_user_states 1786 or not net_scheme_cls.is_state_user_allowed(state) 1787 ] 1788 else: 1789 admin_only_states = [] 1790 1791 info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash, 1792 title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade, 1793 questions=questions, informations=informations, 1794 export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade, 1795 debug_project=debug_project, 1796 eval_interval_without_exam_mode=eval_interval_without_exam_mode, 1797 eval_before_exit=eval_before_exit, 1798 default_language=default_language, 1799 user_allowed_states=user_allowed_states, 1800 admin_only_states=admin_only_states, 1801 network_colors=network_colors, 1802 network_shapes=network_shapes, 1803 show_nat_network=show_nat_network, 1804 nat_network_name=nat_network_name, 1805 nat_network_color=nat_network_color, 1806 host_network_exploded=host_network_exploded, 1807 host_network_edge_relative_length=host_network_edge_relative_length, 1808 schema_splines=schema_splines, 1809 schema_overlap=schema_overlap) 1810 1811 info_json = info.to_json() 1812 info_filename = params.info_filename(self.net_scheme.running_lab_name) 1813 1814 save_info_json = None 1815 try: 1816 with open(info_filename, "r") as f: 1817 save_info_json = f.read() 1818 except FileNotFoundError: 1819 pass 1820 if save_info_json == info_json: 1821 return 1822 1823 temp_file = tempfile.NamedTemporaryFile( 1824 delete=False, 1825 dir=Path(self.net_scheme.get_public_lab_dir()).parent) 1826 with open(temp_file.name, "w") as f: 1827 print(info_json, file=f) 1828 f.flush() 1829 os.fsync(f.fileno()) 1830 os.chmod(temp_file.name, 0o644) 1831 os.replace(temp_file.name, info_filename)
1842 @staticmethod 1843 def run_tests_on_machine(machine_name, machine, exetests): 1844 environment = {params.exetests_env_name: exetests} 1845 code, output = machine.api_object.exec_run([params.exetests_machines_path], 1846 stdin=False, 1847 stdout=True, 1848 stderr=False, 1849 tty=False, 1850 environment=environment, 1851 ) 1852 return machine_name, code, output
1854 def run_tests(self): 1855 self._tests = {} 1856 self._section_counter = [] 1857 lab = self.net_scheme.get_lab_from_kathara() 1858 self._errors = [] 1859 self.load_answers() 1860 self._eval_date = datetime.datetime.now().isoformat() 1861 1862 while self.step <= self.max_step: 1863 self.reset_before_grade() 1864 self.grade() 1865 self.step += 1 1866 tests = self.get_tests() 1867 exetests_by_machine = self.get_exetests_strings(self.step) 1868 if SRE.args.debug and self.step <= self.max_step: 1869 log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):") 1870 for machine, cmds in exetests_by_machine.items(): 1871 c = cmds.split(params.exetests_separator) 1872 c1 = " - ".join(c) 1873 log_debug(f"{machine}: {c1}") 1874 1875 results = {} 1876 with ThreadPoolExecutor( 1877 max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor: 1878 futures_to_machines = { 1879 executor.submit( 1880 Grade0.run_tests_on_machine, 1881 machine_name, 1882 machine, 1883 exetests_by_machine[machine_name], 1884 ): machine_name 1885 for machine_name, machine in lab.machines.items() 1886 if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0 1887 } 1888 for future in as_completed(futures_to_machines): 1889 machine_name = futures_to_machines[future] 1890 try: 1891 machine_name, code, output = future.result() 1892 results[machine_name] = (code, output) 1893 except Exception as e: 1894 self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step) 1895 continue 1896 1897 for machine_name, (exetests_code, output) in results.items(): 1898 if (machine_name, self.step) not in self._tests: 1899 self._tests[(machine_name, self.step)] = {} 1900 if exetests_code != 0: 1901 self.add_error( 1902 f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}", 1903 step=self.step) 1904 output1 = output.decode("utf-8") 1905 separator, _, rest = output1.partition("\n") 1906 output2 = rest.split(f"\n{separator}\n") 1907 for i in range(0, len(output2), 2): 1908 ligne1 = "" 1909 try: 1910 ligne1, date1, result = output2[i].split("\n", 2) 1911 except ValueError: 1912 result = "" 1913 if not ligne1: 1914 continue 1915 timeout_s, cmd = ligne1.split(":", 1) 1916 timeout = int(timeout_s) 1917 date2, code_s = output2[i + 1].split("\n", 1) 1918 try: 1919 code = int(code_s.strip()) 1920 except ValueError: 1921 code = -2 1922 self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code", 1923 step=self.step) 1924 if code != 0: 1925 if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False): 1926 self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step) 1927 self._tests[(machine_name, self.step)][cmd, timeout] = (result, code) 1928 if SRE.args.debug: 1929 for machine_name in lab.machines: 1930 if (machine_name, self.step) not in self._tests: 1931 continue 1932 for (cmd, timeout) in self._tests[(machine_name, self.step)]: 1933 log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:") 1934 result, code = self._tests[(machine_name, self.step)][cmd, timeout] 1935 log_debug(result) 1936 log_debug(f"-------- exit code {code}\n") 1937 1938 host_step_cmds = self._host_tests.get(self.step, {}) 1939 if host_step_cmds: 1940 def _run_host_cmd(cmd, t): 1941 from .utils_privileges import preexec_drop_to_sre 1942 run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd 1943 use_shell = params.execute_commands_on_host == "shell" 1944 try: 1945 proc = subprocess.run( 1946 run_cmd, shell=use_shell, capture_output=True, text=True, 1947 timeout=t if t > 0 else None, 1948 preexec_fn=preexec_drop_to_sre, 1949 ) 1950 return cmd, t, proc.stdout, proc.returncode 1951 except subprocess.TimeoutExpired: 1952 return cmd, t, '', -1 1953 except Exception: 1954 return cmd, t, '', -2 1955 1956 with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, 1957 len(host_step_cmds))) as executor: 1958 futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t) 1959 for (cmd, t) in host_step_cmds} 1960 for future in as_completed(futures): 1961 cmd, t, result, code = future.result() 1962 if code != 0: 1963 if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False): 1964 self.add_error(f"host test error: {cmd} code={code}", step=self.step) 1965 self._host_tests[self.step][(cmd, t)] = (result, code) 1966 if SRE.args.debug: 1967 log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:") 1968 log_debug(result) 1969 log_debug(f"-------- exit code {code}\n") 1970 1971 if SRE.args.debug and len(self._errors) > 0: 1972 log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors)) 1973 self.compute_total() 1974 self._mark_self_eval = self.mark_self_eval() 1975 self._mark_exo_eval = self.mark_exo_eval() 1976 # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries") 1977 # for (machine, step), cmds in self._tests.items(): 1978 # for (cmd, timeout), (result, code) in cmds.items(): 1979 # log_error(f" [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}")
1981 def compute_total(self): 1982 """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both.""" 1983 self._total_grade_self_eval = 0 1984 self._total_max_self_eval = 0 1985 self._total_grade_exo_eval = 0 1986 self._total_max_exo_eval = 0 1987 for g in self._grade_list: 1988 if g.scope & params.SELF_EVAL_SCOPE: 1989 self._total_grade_self_eval += g.grade 1990 self._total_max_self_eval += g.max_grade 1991 if g.scope & params.EXO_EVAL_SCOPE: 1992 self._total_grade_exo_eval += g.grade 1993 self._total_max_exo_eval += g.max_grade
Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both.
1995 def save_tests(self): 1996 now = datetime.datetime.now() 1997 if self._exam_json is None: 1998 exam_path = Path(params.sre_pub_dir) / params.exam_json_name 1999 try: 2000 self._exam_json = json.loads(exam_path.read_text()) 2001 except FileNotFoundError: 2002 pass 2003 except Exception as e: 2004 log_error(f"can't read {exam_path}: {e}") 2005 for d1 in self.archive_dirs: 2006 d = Path(d1).expanduser().resolve() 2007 try: 2008 if not d.exists(): 2009 os.mkdir(d, 0o700) 2010 else: 2011 os.listdir(d) # trigger mount / catch stale handle early 2012 permissions = os.stat(d).st_mode 2013 if permissions != 0o700: 2014 os.chmod(d, 0o700) 2015 filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now) 2016 self.save_tests_on_file(str(filename)) 2017 except Exception as e: 2018 log_error(f"can't save archive to {d}: {e}")
2020 def save_tests_on_file(self, filename: str): 2021 files_content = {} 2022 if self.files_to_save_in_archives: 2023 fdir = Path(params.files_dir(self.get_running_lab_name())) 2024 if fdir.is_dir(): 2025 for pattern in self.files_to_save_in_archives: 2026 for fpath in fdir.iterdir(): 2027 if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern): 2028 try: 2029 files_content[fpath.name] = fpath.read_bytes() 2030 except Exception: 2031 pass 2032 archive = { 2033 params.running_lab_name_keyword: self.get_running_lab_name(), 2034 params.eval_date_keyword: self._eval_date, 2035 params.re_eval_date_keyword: self._re_eval_date, 2036 'data_json': self.get_data().to_json(), 2037 'tests': self.get_tests(), 2038 'errors': self.get_errors(), 2039 'answers': self.get_answers(), 2040 'grade_list': [asdict(e) for e in self._grade_list], 2041 'grade_parts': [asdict(p) for p in self._grade_parts], 2042 'total_grade_self_eval': self._total_grade_self_eval, 2043 'total_max_self_eval': self._total_max_self_eval, 2044 'mark_self_eval': self._mark_self_eval, 2045 'total_grade_exo_eval': self._total_grade_exo_eval, 2046 'total_max_exo_eval': self._total_max_exo_eval, 2047 'mark_exo_eval': self._mark_exo_eval, 2048 'maximum_mark': self._maximum_mark, 2049 'files': files_content, 2050 } 2051 if self._exam_json is not None: 2052 archive[params.exam_json_keyword] = self._exam_json 2053 cctx = zstd.ZstdCompressor(level=6) 2054 with tempfile.NamedTemporaryFile( 2055 mode="wb", 2056 dir=os.path.dirname(filename), 2057 delete=False 2058 ) as f: 2059 with cctx.stream_writer(f) as compressor: 2060 packed = msgpack.packb(archive, use_bin_type=True) 2061 compressor.write(packed) 2062 # f.flush() 2063 # os.fsync(f.fileno()) 2064 temp_name = f.name 2065 os.replace(temp_name, filename)