SRE.lib_sre

   1import datetime
   2import enum
   3import fnmatch
   4import re as _re
   5import os
   6import shlex
   7import socket
   8import subprocess
   9import sys
  10import tempfile
  11import math
  12
  13from dataclasses import asdict
  14
  15from concurrent.futures import ThreadPoolExecutor, as_completed
  16from typing import Tuple
  17
  18import zstandard as zstd
  19
  20from pathlib import Path
  21
  22import msgpack
  23import json
  24import random
  25from netaddr import EUI
  26from dataclasses import dataclass, fields
  27from ipaddress import (
  28    ip_address,
  29    ip_network,
  30    IPv4Address, IPv6Address,
  31    IPv4Interface,
  32    IPv4Network, IPv6Network
  33)
  34
  35from Kathara.manager.Kathara import Kathara
  36from Kathara.model.Lab import Lab
  37
  38from . import params
  39from .common import (QuestionText, QuestionDummy, GradeElement, GradePart, InfoMachine, InfoLab, InfoInterface,
  40                     TranslatedText, _tt_hash_str)
  41from .utils import log_error, error_quit, log_debug
  42from .params import SRE
  43
  44
  45class ErrorCategory(enum.Enum):
  46    ERROR = "ERROR"
  47    WARNING = "WARNING"
  48
  49
  50def write_error(string):
  51    print(string, file=sys.stderr)
  52
  53
  54def _lookup_translations(caller_globals: dict, text: str, inline: dict) -> dict:
  55    """Merge ``_TRANSLATIONS`` from a module's globals with inline kwargs.
  56
  57    Inline kwargs take priority.  ``None`` values in ``_TRANSLATIONS`` are
  58    skipped (they mark strings not yet translated).
  59    """
  60    result = {}
  61    for lang, strings in caller_globals.get('_TRANSLATIONS', {}).items():
  62        val = strings.get(text)
  63        if val is not None and lang not in inline:
  64            result[lang] = val
  65    result.update(inline)
  66    return result
  67
  68
  69def tr(text: str, **langs) -> TranslatedText:
  70    """Build a TranslatedText with 'en' as the default language key.
  71
  72    Looks up the caller module's ``_TRANSLATIONS`` dict for any language not
  73    already supplied as a keyword argument.  Inline kwargs take priority.
  74    ``None`` values in ``_TRANSLATIONS`` are skipped (placeholder for
  75    untranslated strings).
  76
  77    ``_TRANSLATIONS`` must be defined **before** the first ``tr()`` call in
  78    the file, because module-level expressions are evaluated at import time.
  79
  80    Usage in lab files::
  81
  82        title = tr("My lab")              # translation from _TRANSLATIONS
  83        title = tr("My lab", fr="Mon TP") # inline, retro-compatible
  84
  85    For labs whose primary language is not English, use :func:`make_tr` instead.
  86    """
  87    merged = _lookup_translations(sys._getframe(1).f_globals, text, langs)
  88    return TranslatedText({'en': text, **merged})
  89
  90
  91def make_tr(default_lang: str, translations: dict | None = None):
  92    """Return a tr() function bound to *default_lang* as the first-positional language.
  93
  94    If *translations* is supplied explicitly it is used directly (no frame
  95    inspection).  Otherwise the caller module's globals are inspected for a
  96    ``_TRANSLATIONS`` dict at each call.
  97
  98    In both cases ``_TRANSLATIONS`` (or the dict passed as *translations*) must
  99    be defined **before** the first ``tr()`` call in the file, because
 100    module-level expressions are evaluated at import time.
 101
 102    Usage with explicit dict (no frame inspection)::
 103
 104        _TRANSLATIONS = {'fr': {"Mon TP": "My lab"}}
 105        tr = make_tr('fr', translations=_TRANSLATIONS)
 106        title = tr("Mon TP")   # 'en' from _TRANSLATIONS
 107
 108    Usage with frame inspection::
 109
 110        tr = make_tr('fr')
 111        _TRANSLATIONS = {'en': {"Mon TP": "My lab"}}  # must come before tr() calls
 112        title = tr("Mon TP")
 113    """
 114    if translations is not None:
 115        _wrapped = {'_TRANSLATIONS': translations}
 116
 117        def _tr(text: str, **langs) -> TranslatedText:
 118            merged = _lookup_translations(_wrapped, text, langs)
 119            return TranslatedText({default_lang: text, **merged})
 120    else:
 121        caller_globals = sys._getframe(1).f_globals
 122
 123        def _tr(text: str, **langs) -> TranslatedText:
 124            merged = _lookup_translations(caller_globals, text, langs)
 125            return TranslatedText({default_lang: text, **merged})
 126
 127    return _tr
 128
 129
 130def no_tr(text: str) -> str:
 131    """Marker for prepare-sre-translations: leave this string untranslated.
 132
 133    Identity passthrough at runtime (returns *text* unchanged). It is purely a
 134    hint to the translation tooling so the wrapped string is never wrapped in
 135    tr() nor registered in _TRANSLATIONS — use it for short internal labels
 136    (e.g. grade-element titles) that are not natural-language prose.
 137    """
 138    return text
 139
 140
 141_PORT_WILDCARD_RE = _re.compile(r'^(\d*)(X+):(\d+)(?:/(\w+))?$', _re.IGNORECASE)
 142
 143
 144def _is_port_free(port: int, proto: str) -> bool:
 145    sock_type = socket.SOCK_DGRAM if proto == 'udp' else socket.SOCK_STREAM
 146    with socket.socket(socket.AF_INET, sock_type) as s:
 147        try:
 148            s.bind(('', port))
 149            return True
 150        except OSError:
 151            return False
 152
 153
 154def _resolve_port(spec: str, used: set) -> str:
 155    """Resolve a port spec, replacing trailing X wildcards with a free host port.
 156
 157    E.g. '80XX:80/tcp' -> '8042:80/tcp' (first free port in 8000-8099).
 158    Specs without wildcards are returned unchanged.
 159    `used` is updated in-place to track ports already allocated in this call.
 160    """
 161    m = _PORT_WILDCARD_RE.match(spec)
 162    if not m:
 163        return spec
 164    prefix_str, x_str, container_port, proto = m.group(1), m.group(2), m.group(3), (m.group(4) or 'tcp').lower()
 165    range_size = 10 ** len(x_str)
 166    base = (int(prefix_str) if prefix_str else 0) * range_size
 167    for offset in range(range_size):
 168        host_port = base + offset
 169        key = (host_port, proto)
 170        if key not in used and _is_port_free(host_port, proto):
 171            used.add(key)
 172            return f"{host_port}:{container_port}/{proto}"
 173    error_quit(f"no free {proto} port in range {base}-{base + range_size - 1}")
 174
 175
 176def _resolve_xauth_cookie():
 177    """Return the validated hex X11 magic cookie from $SRE_XAUTH_COOKIE, or None.
 178
 179    The env var is set by sre-wrapper (from `xauth list`) and preserved across
 180    sudo via env_keep.  When `sre start --xauth-file <file>` is used,
 181    action_start parses that file early (with privileges) and overrides this
 182    env var with the file's cookie before NetScheme is built.
 183    """
 184    cookie = os.environ.get(params.sre_xauth_cookie_env_variable)
 185    return cookie if cookie and _re.fullmatch(r'[0-9A-Fa-f]+', cookie) else None
 186
 187
 188class _AppendOp:
 189    """A file-append operation registered via NetScheme0.append_to_file()."""
 190    __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime')
 191
 192    def __init__(self, filename: str, content: bytes,
 193                 permissions: int | None, owner: str | None, mtime: float | None):
 194        self.filename = filename
 195        self.content = content
 196        self.permissions = permissions
 197        self.owner = owner
 198        self.mtime = mtime
 199
 200
 201class _IdempotentAppendOp:
 202    """An idempotent file-append operation registered via NetScheme0.idempotent_append_to_file().
 203
 204    Appends content only if the file does not already end with it.
 205    Optionally sets permissions, ownership, and mtime regardless of whether content was appended.
 206    """
 207    __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime')
 208
 209    def __init__(self, filename: str, content: bytes,
 210                 permissions: int | None, owner: str | None, mtime: float | None):
 211        self.filename = filename
 212        self.content = content
 213        self.permissions = permissions
 214        self.owner = owner
 215        self.mtime = mtime
 216
 217
 218class _FileOp:
 219    """A file-write operation registered via NetScheme0.file()."""
 220    __slots__ = ('filename', 'content', 'permissions', 'owner', 'mtime')
 221
 222    def __init__(self, filename: str, content: bytes, permissions: int, owner: str, mtime: float):
 223        self.filename = filename
 224        self.content = content
 225        self.permissions = permissions
 226        self.owner = owner
 227        self.mtime = mtime
 228
 229
 230class _HostCmdOp:
 231    """A host-side command registered via NetScheme0.host_cmd()."""
 232    __slots__ = ('command',)
 233
 234    def __init__(self, command: str):
 235        self.command = command
 236
 237
 238class _CpFromHostOp:
 239    """A deferred file-copy-from-host operation; file is read at execution time."""
 240    __slots__ = ('src_path', 'dest', 'permissions', 'owner', 'mtime')
 241
 242    def __init__(self, src_path, dest: str, permissions, owner: str, mtime):
 243        self.src_path = src_path
 244        self.dest = dest
 245        self.permissions = permissions
 246        self.owner = owner
 247        self.mtime = mtime
 248
 249
 250class _CpToHostOp:
 251    """Copy a file from a container to the host files_dir."""
 252    __slots__ = ('src_path', 'dest_path', 'permissions')
 253
 254    def __init__(self, src_path: str, dest_path: str, permissions: int = None):
 255        self.src_path = src_path  # absolute path inside the container
 256        self.dest_path = dest_path  # resolved absolute path on the host
 257        self.permissions = permissions  # mode to chmod on the host (None = leave as-is)
 258
 259
 260class _HostCallbackOp:
 261    """A host-side Python callback registered via NetScheme0.host_callback()."""
 262    __slots__ = ('callback',)
 263
 264    def __init__(self, callback):
 265        self.callback = callback
 266
 267
 268class _IPv4InterfaceContainer:
 269    """Holds named IPv4Interface attributes. Automatically available as Data0.ips."""
 270
 271    def __setattr__(self, name, value):
 272        if not isinstance(value, IPv4Interface):
 273            raise TypeError(f"{name}: expected IPv4Interface, got {type(value).__name__}")
 274        super().__setattr__(name, value)
 275
 276    def __getitem__(self, name):
 277        return getattr(self, name)
 278
 279    def to_dict(self):
 280        return {k: str(v) for k, v in self.__dict__.items()}
 281
 282
 283class _IPv4NetContainer:
 284    """Holds named IPv4Network attributes. Automatically available as Data0.nets."""
 285
 286    def __setattr__(self, name, value):
 287        if not isinstance(value, IPv4Network):
 288            raise TypeError(f"{name}: expected IPv4Network, got {type(value).__name__}")
 289        super().__setattr__(name, value)
 290
 291    def __getitem__(self, name):
 292        return getattr(self, name)
 293
 294    def to_dict(self):
 295        return {k: str(v) for k, v in self.__dict__.items()}
 296
 297
 298class _MacContainer:
 299    """Holds named EUI MAC address attributes. Automatically available as Data0.macs."""
 300
 301    def __setattr__(self, name, value):
 302        if not isinstance(value, EUI):
 303            raise TypeError(f"{name}: expected EUI, got {type(value).__name__}")
 304        super().__setattr__(name, value)
 305
 306    def __getitem__(self, name):
 307        return getattr(self, name)
 308
 309    def to_dict(self):
 310        return {k: str(v) for k, v in self.__dict__.items()}
 311
 312
 313@dataclass
 314class Flavor0:
 315    """Base class for optional lab-parameterisation dataclasses.
 316
 317    Subclass with ``@dataclass(slots=True)`` and declare your fields.  If the
 318    module defines ``flavor_form_at_startup = True``, the GUI renders the
 319    ``flavor_form`` string (containing ``@@{field:regex}@@`` markers) as a form
 320    before starting the lab.
 321
 322    Named presets can be attached as class attributes, e.g.::
 323
 324        Flavor.easy = Flavor(nb=1)
 325
 326    and referenced on the CLI with ``--set-flavor-name easy``.
 327
 328    Override :meth:`allowed_by_user` to restrict which values students may choose.
 329    """
 330
 331    _registry = {}
 332
 333    def __init_subclass__(cls, **kwargs):
 334        super().__init_subclass__(**kwargs)
 335        key = f"{cls.__module__}.{cls.__qualname__}"
 336        cls._type_key = key
 337        Flavor0._registry[key] = cls
 338
 339    def to_dict(self):
 340        """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`."""
 341        return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)}
 342
 343    @classmethod
 344    def from_dict(cls, d):
 345        """Reconstruct a ``Flavor0`` instance from a dict produced by :meth:`to_dict`."""
 346        decoded = {k: Data0._decode_value(v) for k, v in d.items()}
 347        return cls(**decoded)  # type: ignore[call-arg]
 348
 349    @classmethod
 350    def from_form_dict(cls, d: dict):
 351        """Build a Flavor from form field values (strings from text inputs, bools from
 352        submit buttons). Declared dataclass fields are coerced to their declared type.
 353        Extra keys in d (form fields not declared in the dataclass) are set as plain
 354        attributes on the instance so that allowed_by_user() can read them.
 355        """
 356        from typing import get_type_hints
 357        hints = get_type_hints(cls)
 358        declared = {f.name for f in fields(cls)}
 359        coerced = {}
 360        for f in fields(cls):
 361            v = d.get(f.name)
 362            if v is None:
 363                continue
 364            t = hints.get(f.name, str)
 365            if not isinstance(t, type) or isinstance(v, t):
 366                coerced[f.name] = v
 367            elif t == bool:
 368                coerced[f.name] = str(v).lower() in ('true', '1', 'yes')
 369            elif t == int:
 370                coerced[f.name] = int(v)
 371            elif t == float:
 372                coerced[f.name] = float(v)
 373            else:
 374                coerced[f.name] = str(v)
 375        obj = cls(**coerced)
 376        for k, v in d.items():
 377            if k not in declared:
 378                setattr(obj, k, v)
 379        return obj
 380
 381    def allowed_by_user(self) -> Tuple[bool, str]:
 382        """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise.
 383
 384        Override in subclasses to restrict which values students can choose.
 385        """
 386        return True, ""
 387
 388
 389class Data0:
 390    """Base class for lab-specific parameter dataclasses.
 391
 392    Subclass with ``@dataclass(slots=True)`` and declare your fields normally.
 393    Three dynamic containers are injected automatically into every instance by
 394    ``__post_init__``:
 395
 396    * ``self.ips``  — :class:`_IPv4InterfaceContainer`: named ``IPv4Interface`` values
 397    * ``self.nets`` — :class:`_IPv4NetContainer`: named ``IPv4Network`` values
 398    * ``self.macs`` — :class:`_MacContainer`: named ``EUI`` MAC-address values
 399
 400    The class-level ``_registry`` maps ``"module.ClassName"`` keys to concrete
 401    subclasses so that ``from_dict``/``unpack``/``from_json`` can reconstruct the
 402    correct type from serialised data without knowing it in advance.
 403
 404    Override ``generate(flavor=None)`` as a ``@classmethod`` to produce a fresh
 405    randomised instance.  The result is serialised to ``data.json`` at lab start
 406    and reloaded for each evaluation.
 407    """
 408
 409    _registry = {}
 410    _rng = random.Random()
 411
 412    def __init_subclass__(cls, **kwargs):
 413        super().__init_subclass__(**kwargs)
 414        key = f"{cls.__module__}.{cls.__qualname__}"
 415        cls._type_key = key
 416        Data0._registry[key] = cls
 417
 418    def __post_init__(self):
 419        """Inject ``ips``, ``nets``, ``macs``, and ``flavor`` into every instance.
 420
 421        Called automatically by the dataclass ``__init__``.  Uses
 422        ``object.__setattr__`` so the injection works even when the subclass
 423        declares ``slots=True``.
 424        """
 425        # Inject ips/nets/flavor into __dict__ (available even with slots=True subclasses
 426        # because Data0 itself has no __slots__, so __dict__ is always inherited).
 427        object.__setattr__(self, 'ips', _IPv4InterfaceContainer())
 428        object.__setattr__(self, 'nets', _IPv4NetContainer())
 429        object.__setattr__(self, 'macs', _MacContainer())
 430        object.__setattr__(self, 'flavor', None)
 431        object.__setattr__(self, '__flavor_name', None)
 432        object.__setattr__(self, '__current_srelab_file', None)
 433
 434    # ---------------- lifecycle hooks ----------------
 435    @classmethod
 436    def compute_pre_generate(cls, flavor=None):
 437        """Hook called just before generate() and after JSON/msgpack deserialization.
 438
 439        Override in a subclass to set class-level derived state from *flavor*.
 440        The default implementation is a no-op (fully retro-compatible).
 441        """
 442        pass
 443
 444    def compute_post_generate(self):
 445        """Hook called after generate() and after JSON/msgpack deserialization.
 446
 447        Override in a subclass to set instance-level derived state from data fields.
 448        The default implementation is a no-op (fully retro-compatible).
 449        """
 450        pass
 451
 452    # ---------------- dict conversion ----------------
 453    def to_dict(self):
 454        """Serialise to a plain dict (JSON-safe).
 455
 456        Dataclass fields are encoded via :meth:`_encode_value`.  ``ips``, ``nets``,
 457        and ``macs`` are stored as nested dicts of strings.  An attached ``Flavor``
 458        is stored under the ``"flavor"`` key with its type key.
 459        """
 460        result = {}
 461        for f in fields(self):
 462            v = getattr(self, f.name)
 463            result[f.name] = self._encode_value(v)
 464        result['ips'] = self.ips.to_dict()
 465        result['nets'] = self.nets.to_dict()
 466        result['macs'] = self.macs.to_dict()
 467        if self.flavor is not None:
 468            result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()}
 469        flavor_name = getattr(self, '__flavor_name', None)
 470        if flavor_name is not None:
 471            result['__flavor_name'] = flavor_name
 472        current_srelab_file = getattr(self, '__current_srelab_file', None)
 473        if current_srelab_file is not None:
 474            result['__current_srelab_file'] = current_srelab_file
 475        return result
 476
 477    @classmethod
 478    def from_dict(cls, d):
 479        """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`.
 480
 481        The concrete subclass is resolved from ``d["__type__"]`` when *cls* is
 482        ``Data0`` itself; otherwise the calling class is used directly.
 483        Raises ``ValueError`` for unknown type keys.
 484        """
 485        d = dict(d)
 486        ips_data = d.pop('ips', {})
 487        nets_data = d.pop('nets', {})
 488        macs_data = d.pop('macs', {})
 489        flavor_data = d.pop('flavor', None)
 490        flavor_name = d.pop('__flavor_name', None)
 491        current_srelab_file = d.pop('__current_srelab_file', None)
 492        decoded = {k: cls._decode_value(v) for k, v in d.items()}
 493        obj = cls(**decoded)
 494        for k, v in ips_data.items():
 495            setattr(obj.ips, k, IPv4Interface(v))
 496        for k, v in nets_data.items():
 497            setattr(obj.nets, k, IPv4Network(v))
 498        for k, v in macs_data.items():
 499            setattr(obj.macs, k, EUI(v))
 500        if flavor_data is not None:
 501            flavor_key = flavor_data.get("__flavor_type__")
 502            if flavor_key not in Flavor0._registry:
 503                raise ValueError(f"unknown Flavor0 type: {flavor_key!r}")
 504            flavor_cls = Flavor0._registry[flavor_key]
 505            object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"]))
 506        object.__setattr__(obj, '__flavor_name', flavor_name)
 507        object.__setattr__(obj, '__current_srelab_file', current_srelab_file)
 508        return obj
 509
 510    # ---------------- value encoding ----------------
 511    @staticmethod
 512    def _encode_value(v):
 513        """Encode a field value for JSON/msgpack storage.
 514
 515        * ``IPv4Address`` / ``IPv6Address`` → ``{"__ip__": "..."}``
 516        * ``IPv4Network`` / ``IPv6Network`` → ``{"__net__": "..."}``
 517        * nested ``Data0`` → ``{"__type__": "...", "data": {...}}``
 518        * all other values are returned unchanged.
 519        """
 520        if isinstance(v, (IPv4Address, IPv6Address)):
 521            return {"__ip__": str(v)}
 522        if isinstance(v, (IPv4Network, IPv6Network)):
 523            return {"__net__": str(v)}
 524
 525        if isinstance(v, Data0):
 526            return {
 527                "__type__": v._type_key,
 528                "data": v.to_dict()
 529            }
 530        return v
 531
 532    @staticmethod
 533    def _decode_value(v):
 534        """Decode a value produced by :meth:`_encode_value` back to its Python type."""
 535        if isinstance(v, dict):
 536            if "__ip__" in v:
 537                return ip_address(v["__ip__"])
 538            if "__net__" in v:
 539                return ip_network(v["__net__"])
 540            if "__type__" in v:
 541                type_key = v["__type__"]
 542                if type_key not in Data0._registry:
 543                    raise ValueError(f"unknown Data0 type: {type_key!r}")
 544                return Data0._registry[type_key].from_dict(v["data"])
 545        return v
 546
 547    # ---------------- msgpack ----------------
 548    def pack(self):
 549        """Serialise to a msgpack binary blob (includes type key for polymorphic reload)."""
 550        return msgpack.packb(
 551            {
 552                "__type__": self._type_key,
 553                "data": self.to_dict()
 554            },
 555            use_bin_type=True
 556        )
 557
 558    @classmethod
 559    def unpack(cls, blob):
 560        """Deserialise a msgpack blob produced by :meth:`pack`.  Raises ``ValueError`` for unknown types."""
 561        obj = msgpack.unpackb(blob, raw=False)
 562        type_key = obj.get("__type__")
 563        if type_key not in Data0._registry:
 564            raise ValueError(f"unknown Data0 type: {type_key!r}")
 565        result = Data0._registry[type_key].from_dict(obj["data"])
 566        type(result).compute_pre_generate(result.flavor)
 567        result.compute_post_generate()
 568        return result
 569
 570    # ---------------- JSON ----------------
 571    def to_json(self):
 572        """Serialise to a JSON string (includes type key for polymorphic reload)."""
 573        return json.dumps({
 574            "__type__": self._type_key,
 575            "data": self.to_dict()
 576        })
 577
 578    @classmethod
 579    def from_json(cls, s):
 580        """Deserialise a JSON string produced by :meth:`to_json`."""
 581        obj = json.loads(s)
 582        concrete = Data0._registry[obj["__type__"]]
 583        result = concrete.from_dict(obj["data"])
 584        type(result).compute_pre_generate(result.flavor)
 585        result.compute_post_generate()
 586        return result
 587
 588    @classmethod
 589    def load_from_json_file(cls, filename):
 590        """
 591        Load a Data0-derived object from a JSON file.
 592        The concrete class is resolved automatically.
 593        """
 594
 595        path = Path(filename)
 596        with path.open("r", encoding="utf-8") as f:
 597            obj = json.load(f)
 598        concrete = Data0._registry[obj["__type__"]]
 599        result = concrete.from_dict(obj["data"])
 600        type(result).compute_pre_generate(result.flavor)
 601        result.compute_post_generate()
 602        return result
 603
 604    def save_to_json_file(self, filename):
 605        """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private)."""
 606        path = Path(filename)
 607        fd = os.open(
 608            path,
 609            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
 610            0o600
 611        )
 612        with os.fdopen(fd, "w", encoding="utf-8") as f:
 613            json.dump(
 614                {
 615                    "__type__": self._type_key,
 616                    "data": self.to_dict()
 617                },
 618                f,
 619                indent=2  # optional but operationally useful
 620            )
 621
 622
 623def sre_state(fn=None, *, user_allowed=False, description=''):
 624    def decorator(f):
 625        f._is_sre_state = True
 626        f._sre_state_user_allowed = user_allowed
 627        f._sre_state_description = description
 628        return f
 629
 630    if fn is not None:
 631        return decorator(fn)
 632    return decorator
 633
 634
 635class NetScheme0:
 636    """Base class for lab network topology definitions.
 637
 638    Subclasses declare the topology as class-level dicts and implement state
 639    methods decorated with :func:`sre_state`.
 640
 641    Class-level attributes:
 642
 643    * ``_machine_specs`` — ``{name: {Machine kwargs}}`` — one entry per container.
 644    * ``_network_specs`` — ``{net_name: {color: ...}}`` — optional display hints.
 645    * ``_topology``      — ``{net_name: [machine, ...]}`` or ``{net_name: {machine: iface}}``
 646      — which machines connect to each network and on which interface.
 647
 648    If any of these three attributes is not defined in the subclass (neither as a class
 649    variable nor as a property), the corresponding attribute is read from ``data``
 650    (``data.topology``, ``data.machine_specs``, ``data.network_specs``).
 651    Explicit subclass definitions always take precedence.
 652
 653    Instance attributes set by ``__init__``:
 654
 655    * ``self.data`` — the ``Data0`` instance for this lab run.
 656    * ``self.informations`` — Markdown text shown in the Informations tab (set this in ``__init__``).
 657    * Named machine and network objects (e.g. ``self.router``, ``self.lan``).
 658
 659    The ``initial`` state method is always called at ``sre start``.  Additional
 660    states are applied with ``sre state <lab> <state_name>``.
 661    """
 662
 663    _machine_specs = {}
 664    _network_specs = {}  # {net_name: {'color': ...}} — optional colors for networks
 665    _topology = {}  # {net_name: [m, ...] or {m: iface, ...}} — mixed allowed
 666
 667    def _resolve_spec(self, attr: str, data_attr: str):
 668        """Return the topology/spec dict for *attr*.
 669
 670        Resolution order:
 671        1. If the concrete subclass (or any class before ``NetScheme0`` in the MRO)
 672           defines *attr* as a class variable or property, return that value.
 673        2. Otherwise look for *data_attr* on ``self.data`` (checks instance attributes
 674           first, then class-level attributes set e.g. by ``compute_pre_generate``).
 675        3. If neither source provides the attribute, return the ``NetScheme0`` default
 676           (an empty dict), which is equivalent to "no topology/specs defined".
 677        """
 678        for cls in type(self).__mro__:
 679            if cls is NetScheme0:
 680                break
 681            if attr in cls.__dict__:
 682                return getattr(self, attr)
 683        return getattr(self.data, data_attr, getattr(NetScheme0, attr))
 684
 685    def __init__(self, data, running_lab_name, lab_hash=None):
 686        """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs.
 687
 688        Args:
 689            data: a ``Data0`` instance containing lab-specific parameters.
 690            running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``.
 691            lab_hash: optional Kathara lab hash (resolved lazily if omitted).
 692        """
 693        self.data = data
 694        self.running_lab_name = running_lab_name
 695        self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name))
 696        self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name)
 697        self.lab_hash = lab_hash
 698        self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name)
 699        self.informations = ""
 700
 701        _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs')
 702        _network_specs = self._resolve_spec('_network_specs', 'network_specs')
 703        _topology = self._resolve_spec('_topology', 'topology')
 704
 705        for name, parameters in _machine_specs.items():
 706            setattr(self, name, Machine(name=name, **parameters))
 707
 708        # Create networks from _network_specs (with optional color), then remaining from _topology
 709        for name, parameters in _network_specs.items():
 710            setattr(self, name, Network(name=name, **parameters))
 711
 712        for net_name in _topology:
 713            if not hasattr(self, net_name):
 714                setattr(self, net_name, Network(name=net_name))
 715
 716        # Create NetAdapters from _topology.
 717        # Each value is either a list (auto interface) or a dict {machine: iface_spec}.
 718        # iface_spec can be: None (auto), an int, or a (int, mac) tuple.
 719        # Interface auto-assignment counts prior connections per machine across all networks.
 720        _iface_counter: dict[str, int] = {}
 721        for net_name, machines in _topology.items():
 722            net = getattr(self, net_name)
 723            if isinstance(machines, dict):
 724                items = machines.items()
 725            else:
 726                items = ((m, None) for m in machines)
 727            for mname, iface_spec in items:
 728                machine = getattr(self, mname)
 729                if isinstance(iface_spec, tuple):
 730                    iface, mac = iface_spec
 731                else:
 732                    iface, mac = iface_spec, None
 733                if iface is None:
 734                    iface = _iface_counter.get(mname, 0)
 735                _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1
 736                NetAdapter(network=net, machine=machine, interface=iface, mac=mac)
 737
 738        self._ops: dict[str, list] = {}  # machine → [str | _FileOp, ...]
 739        self._host_ops: dict[int, list] = {}
 740
 741        self.net_config = None
 742
 743    def host_interfaces_from_topology(self) -> dict:
 744        """Return {machine_name: [net_name, ...]} derived from the resolved topology."""
 745        topology = self._resolve_spec('_topology', 'topology')
 746        result = {}
 747        for net_name, machines in topology.items():
 748            names = machines.keys() if isinstance(machines, dict) else machines
 749            for mname in names:
 750                result.setdefault(mname, []).append(net_name)
 751        return result
 752
 753    @sre_state(user_allowed=False)
 754    def initial(self):
 755        pass
 756
 757    @classmethod
 758    def get_state_methods(cls):
 759        """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy."""
 760        names = set()
 761        for klass in cls.__mro__:
 762            for name, fn in klass.__dict__.items():
 763                if callable(fn) and getattr(fn, '_is_sre_state', False):
 764                    names.add(name)
 765        return sorted(names)
 766
 767    @classmethod
 768    def is_state_user_allowed(cls, state):
 769        """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``."""
 770        for klass in cls.__mro__:
 771            fn = klass.__dict__.get(state)
 772            if fn is not None and getattr(fn, '_is_sre_state', False):
 773                return getattr(fn, '_sre_state_user_allowed', False)
 774        return False
 775
 776    @classmethod
 777    def get_user_allowed_states(cls):
 778        """Return ``{state_name: description}`` for all user-allowed states."""
 779        result = {}
 780        for state in cls.get_state_methods():
 781            for klass in cls.__mro__:
 782                fn = klass.__dict__.get(state)
 783                if fn is not None and getattr(fn, '_is_sre_state', False):
 784                    if getattr(fn, '_sre_state_user_allowed', False):
 785                        result[state] = getattr(fn, '_sre_state_description', '')
 786                    break
 787        return result
 788
 789    def get_data(self):
 790        return self.data
 791
 792    def get_machine(self, machine_name):
 793        """Return the :class:`Machine` with *machine_name*, or ``None`` if not found."""
 794        if not hasattr(self, machine_name):
 795            return None
 796        machine = getattr(self, machine_name)
 797        if not isinstance(machine, Machine):
 798            return None
 799        return machine
 800
 801    def get_network(self, network_name):
 802        """Return the :class:`Network` with *network_name*, or ``None`` if not found."""
 803        if not hasattr(self, network_name):
 804            return None
 805        machine = getattr(self, network_name)
 806        if not isinstance(machine, Network):
 807            return None
 808        return machine
 809
 810    def get_machines(self):
 811        for _, value in self.__dict__.items():
 812            if isinstance(value, Machine):
 813                yield value
 814
 815    def get_machine_names(self):
 816        return [m.name for m in self.get_machines()]
 817
 818    def has_privileged_machines(self):
 819        return any(m.privileged for m in self.get_machines())
 820
 821    def get_accessible_machine_names(self):
 822        return [m.name for m in self.get_machines()
 823                if not m.hidden and m.allow_connection]
 824
 825    def get_visible_machine_names(self):
 826        return [m.name for m in self.get_visibles_machines()]
 827
 828    def get_visibles_machines(self):
 829        for _, value in self.__dict__.items():
 830            if isinstance(value, Machine):
 831                if not value.hidden:
 832                    yield value
 833
 834    def get_networks(self):
 835        for _, value in self.__dict__.items():
 836            if isinstance(value, Network):
 837                yield value
 838
 839    def get_topology(self):
 840        return self._resolve_spec('_topology', 'topology')
 841
 842    def get_machine_specs(self):
 843        return self._resolve_spec('_machine_specs', 'machine_specs')
 844
 845    def get_network_specs(self):
 846        return self._resolve_spec('_network_specs', 'network_specs')
 847
 848    def cmd(self, machine, command, step=1):
 849        """Register a shell command to execute inside *machine*'s container at *step*."""
 850        if SRE.args.debug:
 851            print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr)
 852        self._ops.setdefault(step, {}).setdefault(machine, []).append(command)
 853
 854    def host_cmd(self, command, step=1):
 855        """Register a shell command to execute on the **host** (not inside a container) at *step*.
 856
 857        Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise.
 858        """
 859        if params.execute_commands_on_host is False:
 860            sys.exit("host_cmd() is disabled by params.execute_commands_on_host")
 861        if SRE.args.debug:
 862            print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr)
 863        self._host_ops.setdefault(step, []).append(_HostCmdOp(command))
 864
 865    def host_callback(self, callback, step=1):
 866        """Register a Python callable to invoke on the host at *step* (called with no arguments)."""
 867        if SRE.args.debug:
 868            print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}",
 869                  file=sys.stderr)
 870        self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback))
 871
 872    def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None,
 873                     mtime: float = None, step=1):
 874        """Register a file copy from the host into *machine*'s container at *step*.
 875
 876        *src* may be relative (resolved against ``params.files_dir``).
 877        *dest* is the absolute path inside the container.
 878        """
 879        orig_path = Path(src)
 880        if not orig_path.is_absolute():
 881            orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path
 882        if SRE.args.debug:
 883            print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr)
 884        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 885            _CpFromHostOp(orig_path, dest, permissions, owner, mtime)
 886        )
 887
 888    def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1):
 889        """Copy file `path` from `machine` to the host files_dir.
 890
 891        Args:
 892            machine:     container name
 893            path:        absolute path of the file inside the container
 894            dest:        relative destination path inside params.files_dir(running_lab_name)
 895            permissions: file mode to apply on the host copy (default: None, leave as written)
 896            step:        execution step (default 1)
 897        """
 898        files_dir = Path(params.files_dir(self.running_lab_name)).resolve()
 899        dest_path = (files_dir / dest).resolve()
 900        if not dest_path.is_relative_to(files_dir):
 901            error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'")
 902        if SRE.args.debug:
 903            print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr)
 904        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 905            _CpToHostOp(path, str(dest_path), permissions)
 906        )
 907
 908    def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1):
 909        """Register a file to create/overwrite on `machine` during the current state.
 910
 911        Args:
 912            machine:     machine name (str)
 913            filename:    absolute path inside the container (e.g. "/etc/myconfig")
 914            content:     file content (str or bytes)
 915            permissions: octal mode (default 0o644)
 916            owner:       "user:group" string (default "root:root")
 917            mtime:       modification time as a Unix timestamp (float); defaults to now
 918            step:        execution step (default 1); higher steps run after lower ones
 919        """
 920        import time as _time
 921        raw = content.encode() if isinstance(content, str) else content
 922        if SRE.args.debug:
 923            print(
 924                f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)",
 925                file=sys.stderr)
 926        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 927            _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time())
 928        )
 929
 930    def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
 931        """Append content to a file on `machine`; create the file if it does not exist.
 932
 933        Args:
 934            machine:     machine name (str)
 935            filename:    absolute path inside the container (e.g. "/etc/hosts")
 936            content:     content to append (str or bytes)
 937            permissions: octal mode to set after appending (default: leave unchanged)
 938            owner:       "user:group" to set after appending (default: leave unchanged)
 939            mtime:       modification time as a Unix timestamp (default: leave unchanged)
 940            step:        execution step (default 1); higher steps run after lower ones
 941        """
 942        raw = content.encode() if isinstance(content, str) else content
 943        if SRE.args.debug:
 944            print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr)
 945        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 946            _AppendOp(filename, raw, permissions, owner, mtime)
 947        )
 948
 949    def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
 950        """Append content to a file on `machine` only if the file does not already end with it.
 951
 952        Idempotent version of append_to_file: safe to call multiple times — the content
 953        is appended at most once per application.  If permissions, owner, or mtime are
 954        provided they are applied regardless of whether content was appended.
 955
 956        Args:
 957            machine:     machine name (str)
 958            filename:    absolute path inside the container (e.g. "/etc/hosts")
 959            content:     content to append (str or bytes)
 960            permissions: octal mode to set after the check (default: leave unchanged)
 961            owner:       "user:group" to set after the check (default: leave unchanged)
 962            mtime:       modification time as a Unix timestamp (default: leave unchanged)
 963            step:        execution step (default 1); higher steps run after lower ones
 964        """
 965        raw = content.encode() if isinstance(content, str) else content
 966        if SRE.args.debug:
 967            print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)",
 968                  file=sys.stderr)
 969        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 970            _IdempotentAppendOp(filename, raw, permissions, owner, mtime)
 971        )
 972
 973    def compute_state_ops(self, state):
 974        self._ops = {}
 975        self._host_ops = {}
 976        if not hasattr(self, state):
 977            error_quit(f"state method {state} does not exist")
 978        method = getattr(self, state)
 979        if not callable(method):
 980            error_quit(f"state method {state} not callable")
 981        try:
 982            method()
 983        except Exception as e:
 984            error_quit(f"error during {state} execution: {e}")
 985        return self._ops, self._host_ops
 986
 987    def get_new_lab_from_scheme(self):
 988        lab = Lab(name=self.running_lab_name)
 989        abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name)
 990        _used_ports: set = set()
 991        xauth_cookie = _resolve_xauth_cookie()
 992        for m in self.get_machines():
 993            m.ports = [_resolve_port(p, _used_ports) for p in m.ports]
 994            m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True
 995            if m.x11_host:
 996                m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True
 997                if xauth_cookie:
 998                    m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True
 999            volumes = []
1000            if len(m.volumes) > 0:
1001                has_private_volume = False
1002                for v in m.volumes:
1003                    if len(v) > 2 and v[3] == 'private':
1004                        has_private_volume = True
1005                        break
1006
1007                if has_private_volume and params.disable_volume_mount_on_root_partition:
1008                    private_mount_dir = self.get_private_mount_dir()
1009                    p = Path(private_mount_dir)
1010                    while not p.exists():
1011                        p = p.parent
1012                    if os.stat(p).st_dev == os.stat('/').st_dev:
1013                        error_quit(
1014                            f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1015
1016                for v in m.volumes:
1017                    if len(v) > 2 and v[3] == 'private':
1018                        if v[0].startswith('/'):
1019                            error_quit(f"volume {v[0]} is private and have an absolute path")
1020                        v_host = f"{self.get_private_mount_dir()}/{v[0]}"
1021                        os.makedirs(v_host, exist_ok=True)
1022                        os.chmod(v_host, 0o777)
1023                    else:
1024                        if v[0].startswith('/'):
1025                            v_host = v[0]
1026                        else:
1027                            v_host = f"{self.get_user_public_dir()}/{v[0]}"
1028                            os.makedirs(v_host, exist_ok=True)
1029                            os.chmod(v_host, 0o777)
1030                    if v[2] not in ['rw', 'ro']:
1031                        error_quit(f"volume mode {v[2]} not supported (only rw and ro)")
1032
1033                    volumes.append(f"{v_host}|{v[1]}|{v[2]}")
1034            lab.new_machine(m.name,
1035                            exec_commands=m.exec_commands,
1036                            sysctls=m.sysctls,
1037                            envs=m.envs,
1038                            ports=m.ports,
1039                            ulimits=m.ulimits,
1040                            volumes=volumes,
1041                            shell=m.kathara_shell,
1042                            image=m.image,
1043                            bridged=m.bridged,
1044                            privileged=m.privileged,
1045                            entrypoint=m.entrypoint,
1046                            )
1047            for net, netAdapter in m.net_adapters.items():
1048                lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface,
1049                                            mac_address=str(netAdapter.mac).replace('-',
1050                                                                                    ':').lower() if netAdapter.mac is not None else None)
1051        self.lab_hash = lab.hash
1052        return lab
1053
1054    def get_lab_hash(self):
1055        if self.lab_hash is None:
1056            lab = Lab(name=self.running_lab_name)
1057            self.lab_hash = lab.hash
1058        return self.lab_hash
1059
1060    def get_lab_from_kathara(self):
1061        lab_hash = self.get_lab_hash()
1062        return Kathara.get_instance().get_lab_from_api(lab_hash=lab_hash)
1063
1064    def get_public_lab_dir(self):
1065        return params.public_lab_dir(self.running_lab_name)
1066
1067    def get_private_lab_dir(self):
1068        return params.private_lab_dir(self.running_lab_name)
1069
1070    def get_files_dir(self):
1071        return params.files_dir(self.running_lab_name)
1072
1073    def get_private_mount_dir(self):
1074        return params.private_mount_dir(self.running_lab_name)
1075
1076    def get_user_public_dir(self):
1077        try:
1078            return Path(params.link_to_user_public_dir(self.running_lab_name)).readlink()
1079        except OSError:
1080            return Path(params.link_to_user_public_dir(self.running_lab_name))
1081
1082    def get_shared_dir(self):
1083        return f"{self.get_user_public_dir()}/{params.shared_dir_name}"
1084
1085    def answers_dir(self):
1086        return params.answers_dir(self.running_lab_name)
1087
1088    def answers_file(self):
1089        return params.answers_filename(self.running_lab_name)
1090
1091
1092class NetAdapter:
1093    def __init__(self, network, machine, interface, mac=None, addresses=None):
1094        if addresses is None:
1095            addresses = []
1096        if not isinstance(machine, Machine):
1097            raise ValueError("machine must an object of class Machine")
1098        if not isinstance(network, Network):
1099            raise ValueError("network must be an instance of Network")
1100        if mac is not None:
1101            first_byte = int(str(mac).replace('-', ':').split(':')[0], 16)
1102            if first_byte & 1:
1103                raise ValueError(
1104                    f"MAC address {mac} on machine '{machine.name}': "
1105                    f"multicast bit (LSB of first byte) is set — use a unicast address "
1106                    f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')."
1107                )
1108        self.network = network
1109        self.machine = machine
1110        self.interface = interface
1111        self.mac = mac
1112        self.addresses = addresses
1113        machine.net_adapters[network] = self
1114        network.net_adapters[machine] = self
1115
1116
1117class Network:
1118    def __init__(self, name, color=None, shape=None):
1119        self.name = name
1120        self.color = color
1121        self.shape = shape
1122        self.net_adapters = {}
1123
1124    def get_machines(self):
1125        for m in self.net_adapters.keys():
1126            yield m
1127
1128
1129class Machine:
1130    def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="",
1131                 cpus=None, ipv6=None,
1132                 exec_commands=[],
1133                 sysctls={},
1134                 envs={},
1135                 ports=None, ulimits={}, volumes=[],
1136                 shell=None,
1137                 kathara_shell=None,
1138                 privileged=None, entrypoint=None, args=[],
1139                 hidden=False,
1140                 allow_connection=True,
1141                 color=None,
1142                 shape=None):
1143        if ports is None:
1144            ports = []
1145        self.name = name
1146        self.image = image
1147        self.bridged = bridged
1148        self.x11_host = x11_host
1149        self.mem = mem
1150        self.cpus = cpus
1151        self.ipv6 = ipv6
1152        self.exec_commands = exec_commands
1153        self.sysctls = sysctls
1154        # Copy: the constructor's mutable default {} is shared across Machines;
1155        # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP
1156        # for x11_host machines) must not leak into machines that didn't opt in.
1157        self.envs = dict(envs)
1158        self.ports = ports
1159        self.ulimits = ulimits
1160        self.volumes = volumes
1161        self.shell = shell
1162        self.kathara_shell = kathara_shell
1163        self.privileged = privileged
1164        self.entrypoint = entrypoint
1165        self.args = args
1166        self.hidden = hidden
1167        self.allow_connection = allow_connection
1168        self.shape = shape
1169        self.color = color
1170
1171        self.net_adapters = {}
1172
1173        if params.disable_volume_mount_on_root_partition:
1174            for v in self.volumes:
1175                v_host = v[0]
1176                if v_host.startswith('/'):
1177                    p = Path(v_host)
1178                    while not p.exists():
1179                        p = p.parent
1180                    if os.stat(p).st_dev == os.stat('/').st_dev:
1181                        error_quit(
1182                            f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1183
1184        if self.privileged and not params.allow_privileged_machines:
1185            error_quit(
1186                f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode")
1187
1188        if not self.bridged and len(self.ports) > 0:
1189            error_quit("To add ports in a machine, you need to activate bridged mode")
1190
1191
1192class Grade0:
1193    """Base class for lab evaluation logic.
1194
1195    Subclasses override :meth:`grade` to *register* tests and questions.  The
1196    actual execution happens later in :meth:`run_tests`, which drives a
1197    multi-step loop:
1198
1199    1. Call :meth:`grade` to register commands (they return placeholder values).
1200    2. Execute all registered commands in containers (via :mod:`exetests`).
1201    3. Repeat for each additional step (``self.max_step``).
1202    4. Call :meth:`grade` a final time — commands now return real results.
1203    5. Save the archive.
1204
1205    Key instance attributes available inside ``grade()``:
1206
1207    * ``self.data`` — the ``Data0`` instance (via ``net_scheme``).
1208    * ``self.step`` — current step number (1-based).
1209    * ``self.max_step`` — highest step number registered so far.
1210    """
1211
1212    def __init__(self, net_scheme):
1213        self.net_scheme = net_scheme
1214        self.archive_dirs = []
1215        self.files_to_save_in_archives = []
1216        self._tests = None
1217        self._allow_errors_in_tests = None
1218        self.step = 0
1219        self.max_step = 1
1220        self._questions = None
1221        self._questions_order = None
1222        self._questions_current_order = None
1223        self._cheat_answers = {}
1224        self._answers = {}
1225        self._grade_list = []
1226        self._grades = {}
1227        self._grade_parts: list[GradePart] = []
1228        self._errors = []
1229        self._total_grade_self_eval = 0
1230        self._total_max_self_eval = 0
1231        self._total_grade_exo_eval = 0
1232        self._total_max_exo_eval = 0
1233        self._maximum_mark = params.default_maximum_mark
1234        self._use_numerical_marks = params.use_numerical_marks_by_default
1235        self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default
1236        self._mark_self_eval = None
1237        self._mark_exo_eval = None
1238        self._section_counter = []
1239        self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)]
1240        self._eval_date = None
1241        self._re_eval_date = None
1242        self._default_language = 'en'
1243        self.auto_eval_count = 0
1244        self._exam_json = None
1245        self.full_reset()
1246
1247    def full_reset(self):
1248        """Reset all state including tests and answers (called from ``__init__``)."""
1249        self.step = 0
1250        self.max_step = 1
1251        self._tests = {}  # keys : (machine,step)->(cmd, timeout)->(result, code))
1252        self._allow_errors_in_tests = {}
1253        self._host_tests = {}  # step -> {(command, timeout): (result, code)}
1254        self._allow_errors_in_host_tests = {}  # (step, command, timeout) -> True
1255        self._answers = {}  # hash -> answer
1256        self._eval_date = None
1257        self.reset_before_grade()
1258
1259    def reset_before_grade(self):
1260        """Clear questions, grade list, and section counters before each call to :meth:`grade`."""
1261        self._questions = dict()  # hash->Question
1262        self._questions_order = dict()  # order -> [Question1, Question2, ...]
1263        self._questions_current_order = 100
1264        self._cheat_answers = {}  # state->(question_hash->answer)
1265        self._grade_list = []
1266        self._grades = dict()  # title -> GradeElement
1267        self._grade_parts = []
1268        self._section_counter = []
1269
1270    def grade(self):
1271        """Override to register tests, questions, and grade elements.
1272
1273        Called multiple times during :meth:`run_tests` — once per step plus a
1274        final time when all results are available.  Do **not** produce side-effects
1275        here; only call ``self.test()``, ``self.question_*()`` and
1276        ``self.add_grade_element()`` / ``self.set_grade()``.
1277        """
1278        self._grade_list = []
1279        self._grade_parts = []
1280
1281    def _compute_mark(self, total_grade, total_max):
1282        """Compute a mark from a ``(total_grade, total_max)`` pair.
1283
1284        Returns ``None`` if ``total_max == 0``.
1285        Numerical mode (default): rounded to one decimal, scaled to ``_maximum_mark``.
1286        Letter mode: A+/A/B/C/D/F.
1287        """
1288        if total_max == 0:
1289            return None
1290        if self._use_numerical_marks:
1291            return math.ceil(10 * self._maximum_mark * total_grade / total_max) / 10
1292        else:
1293            ratio = total_grade / total_max
1294            if ratio >= 18 / 20:
1295                return "A+"
1296            elif ratio >= 16 / 20:
1297                return "A"
1298            elif ratio >= 14 / 20:
1299                return "B"
1300            elif ratio >= 12 / 20:
1301                return "C"
1302            elif ratio >= 10 / 20:
1303                return "D"
1304            else:
1305                return "F"
1306
1307    def mark_self_eval(self):
1308        """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE)."""
1309        return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval)
1310
1311    def mark_exo_eval(self):
1312        """Final mark over elements visible in non-auto eval / outline / sheet."""
1313        return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval)
1314
1315    def get_data(self):
1316        return self.net_scheme.get_data()
1317
1318    def get_errors(self):
1319        return self._errors
1320
1321    def get_grade_list(self):
1322        return self._grade_list
1323
1324    def get_grade_parts(self):
1325        return self._grade_parts
1326
1327    def get_answers(self):
1328        return self._answers
1329
1330    def get_cheat_answers(self, state: str):
1331        return self._cheat_answers.get(state)
1332
1333    def get_tests(self):
1334        return self._tests
1335
1336    def get_exetests_strings(self, step: int):
1337        result = dict()
1338        for (machine, step1) in self._tests.keys():
1339            if step != step1:
1340                continue
1341            result[machine] = params.exetests_separator.join(
1342                [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()])
1343        return result
1344
1345    def get_running_lab_name(self):
1346        return self.net_scheme.running_lab_name
1347
1348    def increment_section_counter(self, level: int):
1349        if len(self._section_counter) < level + 1:
1350            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1351        else:
1352            self._section_counter = self._section_counter[:(level + 1)]
1353        self._section_counter[level] += 1
1354
1355    def set_section_counter(self, level: int, value: int):
1356        if len(self._section_counter) < level + 1:
1357            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1358        else:
1359            self._section_counter = self._section_counter[:(level + 1)]
1360        self._section_counter[level] = value
1361
1362    def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1363        self.increment_section_counter(level)
1364        return self.current_section(level, fmt, show, pad)
1365
1366    def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1367        # fmt is a list of (type, depth[, prefix]) tuples, one per level.
1368        # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number
1369        # depth: how many consecutive counters to display, ending at this level
1370        # prefix: optional string prepended to the result (default: '')
1371        # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1."
1372        if fmt is None:
1373            fmt = self.section_fmt
1374
1375        def _to_roman(n: int) -> str:
1376            vals = [
1377                (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'),
1378                (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'),
1379                (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'),
1380            ]
1381            result = ''
1382            for value, numeral in vals:
1383                while n >= value:
1384                    result += numeral
1385                    n -= value
1386            return result
1387
1388        def _to_capital_letter(n: int) -> str:
1389            result = ''
1390            while n > 0:
1391                n, r = divmod(n - 1, 26)
1392                result = chr(ord('A') + r) + result
1393            return result
1394
1395        def _to_lowercase_letter(n: int) -> str:
1396            result = ''
1397            while n > 0:
1398                n, r = divmod(n - 1, 26)
1399                result = chr(ord('a') + r) + result
1400            return result
1401
1402        def _convert(n: int, fmt_type: str) -> str:
1403            match fmt_type:
1404                case 'R':
1405                    return _to_roman(n)
1406                case 'r':
1407                    return _to_roman(n).lower()
1408                case 'L':
1409                    return _to_capital_letter(n)
1410                case 'l':
1411                    return _to_lowercase_letter(n)
1412                case _:
1413                    return str(n)
1414
1415        entry = fmt[level] if level < len(fmt) else ('N', level + 1)
1416        _, depth = entry[:2]
1417        prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '')
1418        if show is not None:
1419            depth = show
1420        start = max(0, level - depth + 1)
1421        parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0,
1422                          fmt[j][0] if j < len(fmt) else 'N')
1423                 for j in range(start, level + 1)]
1424        return prefix + ".".join(parts) + ". "
1425
1426    def load_answers(self):
1427        """Load student answers from ``answers.json`` into ``self._answers``."""
1428        self._answers = {}
1429        try:
1430            fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()),
1431                         os.O_RDONLY | os.O_NOFOLLOW)
1432            with os.fdopen(fd) as f:
1433                self._answers = json.load(f)
1434        except (OSError, json.JSONDecodeError):
1435            self._answers = {}
1436
1437    def get_questions_ordered(self):
1438        q = []
1439        for _, v in sorted(self._questions_order.items()):
1440            q += v
1441        return q
1442
1443    def export_questions(self):
1444        pass
1445
1446    def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='',
1447                  default_code: int = 0, allow_error: bool = False):
1448        """Register and retrieve the result of a host-side test command.
1449
1450        On the first call (registration pass) returns *default_value* / *default_code*.
1451        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1452        Set *allow_error* to suppress error recording on non-zero exit.
1453        """
1454        if params.execute_commands_on_host is False:
1455            sys.exit("test_host() is disabled by params.execute_commands_on_host")
1456        if self.max_step < step:
1457            self.max_step = step
1458        if step not in self._host_tests:
1459            self._host_tests[step] = {}
1460        if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests:
1461            self._allow_errors_in_host_tests[(step, command, timeout)] = True
1462        if (command, timeout) not in self._host_tests[step]:
1463            self._host_tests[step][(command, timeout)] = (default_value, default_code)
1464            return default_value, default_code
1465        return self._host_tests[step][(command, timeout)]
1466
1467    def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='',
1468             default_code: int = 0, allow_error: bool = False):
1469        """Register and retrieve the result of a command executed inside *machine_name*.
1470
1471        On the first call (registration pass) returns *default_value* / *default_code*.
1472        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1473        *timeout* is in seconds.  Set *allow_error* to suppress error recording.
1474        """
1475        if self.max_step < step:
1476            self.max_step = step
1477        if (machine_name, step) not in self._tests:
1478            self._tests[(machine_name, step)] = {}
1479        if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests:
1480            self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True
1481        if (command, timeout) not in self._tests[(machine_name, step)]:
1482            self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code)
1483            return default_value, default_code
1484        return self._tests[(machine_name, step)][(command, timeout)]
1485
1486    @staticmethod
1487    def _apply_section(section: str, title) -> TranslatedText:
1488        """Prepend *section* to every language value in *title*."""
1489        tt = TranslatedText.from_value(title)
1490        if not section:
1491            return tt
1492        return TranslatedText({lang: section + text for lang, text in tt.items()})
1493
1494    def question_text(self, title, section='', description='', hash=None, order=None, default_answer='',
1495                      cheat_answers=None):
1496        """Register a free-text question and return the student's current answer string.
1497
1498        Returns *default_answer* if the student has not answered yet.
1499        *cheat_answers* maps state names to answer strings used in automated testing.
1500        """
1501        title = self._apply_section(section, title)
1502        if order is None:
1503            order1 = self._questions_current_order
1504            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1505        else:
1506            order1 = order
1507
1508        q = QuestionText(title, description, hash, order1)
1509        if order1 not in self._questions_order:
1510            self._questions_order[order1] = []
1511        self._questions_order[order1].append(q)
1512
1513        if q.question_hash in self._questions:
1514            write_error(f"Duplicate question hash {q.question_hash}")
1515        self._questions[q.question_hash] = q
1516
1517        if cheat_answers is not None:
1518            for state, answer in cheat_answers.items():
1519                if state not in self._cheat_answers:
1520                    self._cheat_answers[state] = {}
1521                self._cheat_answers[state][q.question_hash] = answer
1522
1523        if q.question_hash in self._answers:
1524            return self._answers[q.question_hash]
1525        return default_answer
1526
1527    _FORM_FIELD_RE = _re.compile(r'@@\{([^:}]+):([^}]*)\}@@')
1528
1529    def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None):
1530        """Register a form question with inline @@{field_name:regex}@@ fields.
1531
1532        Returns the student's answers as a dict {field_name: value}, or {} if none yet.
1533        cheat_answers format: {state_name: {field_name: value, ...}}
1534        """
1535        title = self._apply_section(section, title)
1536        from .common import QuestionForm
1537
1538        # The @@{field:regex}@@ markers are language-independent, so when the
1539        # description is a TranslatedText (wrapped in tr()) extract fields from
1540        # its resolved string. The full TranslatedText is still stored on the
1541        # question for per-language rendering.
1542        desc_str = (description.resolve('')
1543                    if isinstance(description, TranslatedText) else description)
1544
1545        fields = []
1546        for m in self._FORM_FIELD_RE.finditer(desc_str):
1547            name, spec = m.group(1), m.group(2)
1548            if spec.startswith('>') or '>>>' in spec:
1549                raw = spec[1:] if spec.startswith('>') else spec
1550                fields.append({"name": name, "choices": [
1551                    c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip()
1552                    for c in raw.split('|')
1553                ]})
1554            elif spec.startswith('?'):
1555                default = spec[1:].strip().lower() not in ('', 'false')
1556                fields.append({"name": name, "checkbox": default})
1557            else:
1558                fields.append({"name": name, "regex": spec})
1559
1560        if order is None:
1561            order1 = self._questions_current_order
1562            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1563        else:
1564            order1 = order
1565
1566        q = QuestionForm(title, description, hash, order1, fields)
1567        if order1 not in self._questions_order:
1568            self._questions_order[order1] = []
1569        self._questions_order[order1].append(q)
1570
1571        if q.question_hash in self._questions:
1572            write_error(f"Duplicate question hash {q.question_hash}")
1573        self._questions[q.question_hash] = q
1574
1575        if cheat_answers is not None:
1576            for state, field_answers in cheat_answers.items():
1577                if state not in self._cheat_answers:
1578                    self._cheat_answers[state] = {}
1579                self._cheat_answers[state][q.question_hash] = json.dumps(
1580                    field_answers, ensure_ascii=False
1581                )
1582
1583        if q.question_hash in self._answers:
1584            try:
1585                return json.loads(self._answers[q.question_hash])
1586            except (json.JSONDecodeError, TypeError):
1587                return {}
1588        return {}
1589
1590    def question_dummy(self, title, section='', description='', hash=None, order=None):
1591        """Register a display-only block (no answer widget shown to the student)."""
1592        title = self._apply_section(section, title)
1593        if order is None:
1594            order1 = self._questions_current_order
1595            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1596        else:
1597            order1 = order
1598
1599        q = QuestionDummy(title, description, hash, order1)
1600        if order1 not in self._questions_order:
1601            self._questions_order[order1] = []
1602        self._questions_order[order1].append(q)
1603
1604        if q.question_hash in self._questions:
1605            write_error(f"Duplicate question hash {q.question_hash}")
1606        self._questions[q.question_hash] = q
1607
1608    def add_grade_part(self, title, description=''):
1609        """Register a new :class:`GradePart` group and return it.
1610
1611        Pass the returned object to ``add_grade_element(..., grade_part=...)``
1612        to associate elements with this part.  Parts are displayed in
1613        registration order (with a subtotal row per part) in the GUI
1614        evaluation view and in ``sre outline`` PDFs.
1615        """
1616        if description:
1617            description = TranslatedText.from_value(description, self._default_language)
1618        gp = GradePart(title=title, description=description)
1619        if any(p.title == title for p in self._grade_parts):
1620            write_error(f"Duplicate grade part title = {title}")
1621        self._grade_parts.append(gp)
1622        return gp
1623
1624    def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE,
1625                          grade_part=None):
1626        """Add a graded rubric item.  Initial *grade* defaults to 0; use :meth:`set_grade` to update it.
1627
1628        ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only,
1629        ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only,
1630        ``BOTH_EVAL_SCOPE`` (3, default) for both audiences.
1631
1632        ``grade_part`` optionally associates this element with a
1633        :class:`GradePart` previously returned by :meth:`add_grade_part`.
1634        """
1635        if scope not in params.grade_scopes:
1636            raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}")
1637        if description:
1638            description = TranslatedText.from_value(description, self._default_language)
1639        grade_part_title = None
1640        if grade_part is not None:
1641            if not isinstance(grade_part, GradePart):
1642                raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}")
1643            if grade_part not in self._grade_parts:
1644                write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element")
1645            grade_part_title = grade_part.title
1646        g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope,
1647                         grade_part=grade_part_title)
1648        self._grade_list.append(g)
1649        key = _tt_hash_str(title)
1650        if key in self._grades:
1651            write_error(f"Duplicate grade title = {title}")
1652        self._grades[key] = g
1653
1654    def set_grade(self, title, grade):
1655        """Set the numeric *grade* for the element previously registered under *title*."""
1656        self._grades[_tt_hash_str(title)].grade = grade
1657
1658    def save_lab_info(self):
1659        debug_project = os.path.exists(
1660            params.debug_project_marker_filename(self.net_scheme.running_lab_name)
1661        )
1662        if debug_project:
1663            visible_machines = list(self.net_scheme.get_machines())
1664        else:
1665            visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden]
1666        lab_hash = self.net_scheme.get_lab_hash()
1667
1668        def _get_stats(m):
1669            s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None)
1670            return m, s
1671
1672        stats_map = {}
1673        with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor:
1674            for m, s in executor.map(_get_stats, visible_machines):
1675                stats_map[m.name] = (m, s)
1676
1677        info_machines = []
1678        for m in visible_machines:
1679            m, s = stats_map[m.name]
1680            interfaces = []
1681            for net, netAdapter in m.net_adapters.items():
1682                interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}"))
1683            info_machines.append(
1684                InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden,
1685                            bridged=m.bridged,
1686                            x11_host=m.x11_host,
1687                            ports=m.ports,
1688                            allow_connection=m.allow_connection,
1689                            color=m.color or "",
1690                            shape=m.shape or "",
1691                            interfaces=interfaces))
1692
1693        network_colors = {
1694            net.name: net.color
1695            for m in visible_machines
1696            for net in m.net_adapters
1697            if net.color
1698        }
1699        network_shapes = {
1700            net.name: net.shape
1701            for m in visible_machines
1702            for net in m.net_adapters
1703            if net.shape
1704        }
1705
1706        module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")]
1707        default_language = getattr(module_rvlab, 'default_language', 'en')
1708        self._default_language = default_language
1709        show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network)
1710        nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name)
1711        nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color)
1712        host_network_exploded = getattr(module_rvlab, 'host_network_exploded',
1713                                        params.default_host_network_exploded)
1714        host_network_edge_relative_length = float(getattr(
1715            module_rvlab, 'host_network_edge_relative_length',
1716            params.default_host_network_edge_relative_length))
1717        schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines)
1718        schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap)
1719
1720        if self.step == 0:
1721            self.grade()
1722            self.step += 1
1723
1724        questions = self.get_questions_ordered()
1725
1726        if hasattr(module_rvlab, 'title'):
1727            title = module_rvlab.title
1728        else:
1729            lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name())
1730            title = lab_name.removesuffix('.py')
1731        title = TranslatedText.from_value(title, default_language)
1732
1733        if hasattr(module_rvlab, 'delay_between_self_grade'):
1734            delay_between_self_grade = module_rvlab.delay_between_self_grade
1735        else:
1736            delay_between_self_grade = 0
1737
1738        if hasattr(module_rvlab, 'allow_self_grade'):
1739            allow_self_grade = module_rvlab.allow_self_grade
1740        else:
1741            allow_self_grade = False
1742
1743        if debug_project:
1744            delay_between_self_grade = 0
1745            allow_self_grade = True
1746        if hasattr(module_rvlab, 'export_kathara_project'):
1747            export_kathara_project = module_rvlab.export_kathara_project
1748        else:
1749            export_kathara_project = False
1750
1751        eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode)
1752        eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False)
1753
1754        informations = TranslatedText.from_value(self.net_scheme.informations, default_language)
1755        for q in questions:
1756            q.title = TranslatedText.from_value(q.title, default_language)
1757            q.description = TranslatedText.from_value(q.description, default_language)
1758
1759        net_scheme_cls = type(self.net_scheme)
1760        if debug_project:
1761            user_allowed_states_raw = {}
1762            for state in net_scheme_cls.get_state_methods():
1763                desc = ''
1764                for klass in net_scheme_cls.__mro__:
1765                    fn = klass.__dict__.get(state)
1766                    if fn is not None and getattr(fn, '_is_sre_state', False):
1767                        desc = getattr(fn, '_sre_state_description', '')
1768                        break
1769                user_allowed_states_raw[state] = desc
1770        else:
1771            user_allowed_states_raw = (
1772                net_scheme_cls.get_user_allowed_states()
1773                if getattr(module_rvlab, 'allow_user_states', False) else {}
1774            )
1775        user_allowed_states = {
1776            state: TranslatedText.from_value(desc, default_language) if desc else desc
1777            for state, desc in user_allowed_states_raw.items()
1778        }
1779
1780        if debug_project:
1781            module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False)
1782            admin_only_states = [
1783                state for state in user_allowed_states_raw
1784                if not module_allows_user_states
1785                or not net_scheme_cls.is_state_user_allowed(state)
1786            ]
1787        else:
1788            admin_only_states = []
1789
1790        info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash,
1791                       title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade,
1792                       questions=questions, informations=informations,
1793                       export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade,
1794                       debug_project=debug_project,
1795                       eval_interval_without_exam_mode=eval_interval_without_exam_mode,
1796                       eval_before_exit=eval_before_exit,
1797                       default_language=default_language,
1798                       user_allowed_states=user_allowed_states,
1799                       admin_only_states=admin_only_states,
1800                       network_colors=network_colors,
1801                       network_shapes=network_shapes,
1802                       show_nat_network=show_nat_network,
1803                       nat_network_name=nat_network_name,
1804                       nat_network_color=nat_network_color,
1805                       host_network_exploded=host_network_exploded,
1806                       host_network_edge_relative_length=host_network_edge_relative_length,
1807                       schema_splines=schema_splines,
1808                       schema_overlap=schema_overlap)
1809
1810        info_json = info.to_json()
1811        info_filename = params.info_filename(self.net_scheme.running_lab_name)
1812
1813        save_info_json = None
1814        try:
1815            with open(info_filename, "r") as f:
1816                save_info_json = f.read()
1817        except FileNotFoundError:
1818            pass
1819        if save_info_json == info_json:
1820            return
1821
1822        temp_file = tempfile.NamedTemporaryFile(
1823            delete=False,
1824            dir=Path(self.net_scheme.get_public_lab_dir()).parent)
1825        with open(temp_file.name, "w") as f:
1826            print(info_json, file=f)
1827            f.flush()
1828            os.fsync(f.fileno())
1829            os.chmod(temp_file.name, 0o644)
1830        os.replace(temp_file.name, info_filename)
1831
1832    def add_error(self, error, category=ErrorCategory.ERROR, step: int = 1):
1833        if self.step != step:
1834            return
1835        log_error(error)
1836        self._errors.append((category.value, error))
1837
1838    def add_warning(self, warning, step: int = 1):
1839        self.add_error(warning, category=ErrorCategory.WARNING, step=step)
1840
1841    @staticmethod
1842    def run_tests_on_machine(machine_name, machine, exetests):
1843        environment = {params.exetests_env_name: exetests}
1844        code, output = machine.api_object.exec_run([params.exetests_machines_path],
1845                                                   stdin=False,
1846                                                   stdout=True,
1847                                                   stderr=False,
1848                                                   tty=False,
1849                                                   environment=environment,
1850                                                   )
1851        return machine_name, code, output
1852
1853    def run_tests(self):
1854        self._tests = {}
1855        self._section_counter = []
1856        lab = self.net_scheme.get_lab_from_kathara()
1857        self._errors = []
1858        self.load_answers()
1859        self._eval_date = datetime.datetime.now().isoformat()
1860
1861        while self.step <= self.max_step:
1862            self.reset_before_grade()
1863            self.grade()
1864            self.step += 1
1865            tests = self.get_tests()
1866            exetests_by_machine = self.get_exetests_strings(self.step)
1867            if SRE.args.debug and self.step <= self.max_step:
1868                log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):")
1869                for machine, cmds in exetests_by_machine.items():
1870                    c = cmds.split(params.exetests_separator)
1871                    c1 = " - ".join(c)
1872                    log_debug(f"{machine}: {c1}")
1873
1874            results = {}
1875            with ThreadPoolExecutor(
1876                    max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor:
1877                futures_to_machines = {
1878                    executor.submit(
1879                        Grade0.run_tests_on_machine,
1880                        machine_name,
1881                        machine,
1882                        exetests_by_machine[machine_name],
1883                    ): machine_name
1884                    for machine_name, machine in lab.machines.items()
1885                    if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0
1886                }
1887                for future in as_completed(futures_to_machines):
1888                    machine_name = futures_to_machines[future]
1889                    try:
1890                        machine_name, code, output = future.result()
1891                        results[machine_name] = (code, output)
1892                    except Exception as e:
1893                        self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step)
1894                        continue
1895
1896            for machine_name, (exetests_code, output) in results.items():
1897                if (machine_name, self.step) not in self._tests:
1898                    self._tests[(machine_name, self.step)] = {}
1899                if exetests_code != 0:
1900                    self.add_error(
1901                        f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}",
1902                        step=self.step)
1903                output1 = output.decode("utf-8")
1904                separator, _, rest = output1.partition("\n")
1905                output2 = rest.split(f"\n{separator}\n")
1906                for i in range(0, len(output2), 2):
1907                    ligne1 = ""
1908                    try:
1909                        ligne1, date1, result = output2[i].split("\n", 2)
1910                    except ValueError:
1911                        result = ""
1912                    if not ligne1:
1913                        continue
1914                    timeout_s, cmd = ligne1.split(":", 1)
1915                    timeout = int(timeout_s)
1916                    date2, code_s = output2[i + 1].split("\n", 1)
1917                    try:
1918                        code = int(code_s.strip())
1919                    except ValueError:
1920                        code = -2
1921                        self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code",
1922                                       step=self.step)
1923                    if code != 0:
1924                        if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False):
1925                            self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step)
1926                    self._tests[(machine_name, self.step)][cmd, timeout] = (result, code)
1927            if SRE.args.debug:
1928                for machine_name in lab.machines:
1929                    if (machine_name, self.step) not in self._tests:
1930                        continue
1931                    for (cmd, timeout) in self._tests[(machine_name, self.step)]:
1932                        log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:")
1933                        result, code = self._tests[(machine_name, self.step)][cmd, timeout]
1934                        log_debug(result)
1935                        log_debug(f"-------- exit code {code}\n")
1936
1937            host_step_cmds = self._host_tests.get(self.step, {})
1938            if host_step_cmds:
1939                def _run_host_cmd(cmd, t):
1940                    from .utils_privileges import preexec_drop_to_sre
1941                    run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd
1942                    use_shell = params.execute_commands_on_host == "shell"
1943                    try:
1944                        proc = subprocess.run(
1945                            run_cmd, shell=use_shell, capture_output=True, text=True,
1946                            timeout=t if t > 0 else None,
1947                            preexec_fn=preexec_drop_to_sre,
1948                        )
1949                        return cmd, t, proc.stdout, proc.returncode
1950                    except subprocess.TimeoutExpired:
1951                        return cmd, t, '', -1
1952                    except Exception:
1953                        return cmd, t, '', -2
1954
1955                with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency,
1956                                                        len(host_step_cmds))) as executor:
1957                    futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t)
1958                               for (cmd, t) in host_step_cmds}
1959                    for future in as_completed(futures):
1960                        cmd, t, result, code = future.result()
1961                        if code != 0:
1962                            if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False):
1963                                self.add_error(f"host test error: {cmd} code={code}", step=self.step)
1964                        self._host_tests[self.step][(cmd, t)] = (result, code)
1965                        if SRE.args.debug:
1966                            log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:")
1967                            log_debug(result)
1968                            log_debug(f"-------- exit code {code}\n")
1969
1970        if SRE.args.debug and len(self._errors) > 0:
1971            log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors))
1972        self.compute_total()
1973        self._mark_self_eval = self.mark_self_eval()
1974        self._mark_exo_eval = self.mark_exo_eval()
1975        # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries")
1976        # for (machine, step), cmds in self._tests.items():
1977        #     for (cmd, timeout), (result, code) in cmds.items():
1978        #         log_error(f"  [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}")
1979
1980    def compute_total(self):
1981        """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both."""
1982        self._total_grade_self_eval = 0
1983        self._total_max_self_eval = 0
1984        self._total_grade_exo_eval = 0
1985        self._total_max_exo_eval = 0
1986        for g in self._grade_list:
1987            if g.scope & params.SELF_EVAL_SCOPE:
1988                self._total_grade_self_eval += g.grade
1989                self._total_max_self_eval += g.max_grade
1990            if g.scope & params.EXO_EVAL_SCOPE:
1991                self._total_grade_exo_eval += g.grade
1992                self._total_max_exo_eval += g.max_grade
1993
1994    def save_tests(self):
1995        now = datetime.datetime.now()
1996        if self._exam_json is None:
1997            exam_path = Path(params.sre_pub_dir) / params.exam_json_name
1998            try:
1999                self._exam_json = json.loads(exam_path.read_text())
2000            except FileNotFoundError:
2001                pass
2002            except Exception as e:
2003                log_error(f"can't read {exam_path}: {e}")
2004        for d1 in self.archive_dirs:
2005            d = Path(d1).expanduser().resolve()
2006            try:
2007                if not d.exists():
2008                    os.mkdir(d, 0o700)
2009                else:
2010                    os.listdir(d)  # trigger mount / catch stale handle early
2011                    permissions = os.stat(d).st_mode
2012                    if permissions != 0o700:
2013                        os.chmod(d, 0o700)
2014                filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now)
2015                self.save_tests_on_file(str(filename))
2016            except Exception as e:
2017                log_error(f"can't save archive to {d}: {e}")
2018
2019    def save_tests_on_file(self, filename: str):
2020        files_content = {}
2021        if self.files_to_save_in_archives:
2022            fdir = Path(params.files_dir(self.get_running_lab_name()))
2023            if fdir.is_dir():
2024                for pattern in self.files_to_save_in_archives:
2025                    for fpath in fdir.iterdir():
2026                        if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern):
2027                            try:
2028                                files_content[fpath.name] = fpath.read_bytes()
2029                            except Exception:
2030                                pass
2031        archive = {
2032            params.running_lab_name_keyword: self.get_running_lab_name(),
2033            params.eval_date_keyword: self._eval_date,
2034            params.re_eval_date_keyword: self._re_eval_date,
2035            'data_json': self.get_data().to_json(),
2036            'tests': self.get_tests(),
2037            'errors': self.get_errors(),
2038            'answers': self.get_answers(),
2039            'grade_list': [asdict(e) for e in self._grade_list],
2040            'grade_parts': [asdict(p) for p in self._grade_parts],
2041            'total_grade_self_eval': self._total_grade_self_eval,
2042            'total_max_self_eval': self._total_max_self_eval,
2043            'mark_self_eval': self._mark_self_eval,
2044            'total_grade_exo_eval': self._total_grade_exo_eval,
2045            'total_max_exo_eval': self._total_max_exo_eval,
2046            'mark_exo_eval': self._mark_exo_eval,
2047            'maximum_mark': self._maximum_mark,
2048            'files': files_content,
2049        }
2050        if self._exam_json is not None:
2051            archive[params.exam_json_keyword] = self._exam_json
2052        cctx = zstd.ZstdCompressor(level=6)
2053        with tempfile.NamedTemporaryFile(
2054                mode="wb",
2055                dir=os.path.dirname(filename),
2056                delete=False
2057        ) as f:
2058            with cctx.stream_writer(f) as compressor:
2059                packed = msgpack.packb(archive, use_bin_type=True)
2060                compressor.write(packed)
2061            # f.flush()
2062            # os.fsync(f.fileno())
2063            temp_name = f.name
2064        os.replace(temp_name, filename)
class ErrorCategory(enum.Enum):
46class ErrorCategory(enum.Enum):
47    ERROR = "ERROR"
48    WARNING = "WARNING"
ERROR = <ErrorCategory.ERROR: 'ERROR'>
WARNING = <ErrorCategory.WARNING: 'WARNING'>
def write_error(string):
51def write_error(string):
52    print(string, file=sys.stderr)
def tr(text: str, **langs) -> SRE.common.TranslatedText:
70def tr(text: str, **langs) -> TranslatedText:
71    """Build a TranslatedText with 'en' as the default language key.
72
73    Looks up the caller module's ``_TRANSLATIONS`` dict for any language not
74    already supplied as a keyword argument.  Inline kwargs take priority.
75    ``None`` values in ``_TRANSLATIONS`` are skipped (placeholder for
76    untranslated strings).
77
78    ``_TRANSLATIONS`` must be defined **before** the first ``tr()`` call in
79    the file, because module-level expressions are evaluated at import time.
80
81    Usage in lab files::
82
83        title = tr("My lab")              # translation from _TRANSLATIONS
84        title = tr("My lab", fr="Mon TP") # inline, retro-compatible
85
86    For labs whose primary language is not English, use :func:`make_tr` instead.
87    """
88    merged = _lookup_translations(sys._getframe(1).f_globals, text, langs)
89    return TranslatedText({'en': text, **merged})

Build a TranslatedText with 'en' as the default language key.

Looks up the caller module's _TRANSLATIONS dict for any language not already supplied as a keyword argument. Inline kwargs take priority. None values in _TRANSLATIONS are skipped (placeholder for untranslated strings).

_TRANSLATIONS must be defined before the first tr() call in the file, because module-level expressions are evaluated at import time.

Usage in lab files::

title = tr("My lab")              # translation from _TRANSLATIONS
title = tr("My lab", fr="Mon TP") # inline, retro-compatible

For labs whose primary language is not English, use make_tr() instead.

def make_tr(default_lang: str, translations: dict | None = None):
 92def make_tr(default_lang: str, translations: dict | None = None):
 93    """Return a tr() function bound to *default_lang* as the first-positional language.
 94
 95    If *translations* is supplied explicitly it is used directly (no frame
 96    inspection).  Otherwise the caller module's globals are inspected for a
 97    ``_TRANSLATIONS`` dict at each call.
 98
 99    In both cases ``_TRANSLATIONS`` (or the dict passed as *translations*) must
100    be defined **before** the first ``tr()`` call in the file, because
101    module-level expressions are evaluated at import time.
102
103    Usage with explicit dict (no frame inspection)::
104
105        _TRANSLATIONS = {'fr': {"Mon TP": "My lab"}}
106        tr = make_tr('fr', translations=_TRANSLATIONS)
107        title = tr("Mon TP")   # 'en' from _TRANSLATIONS
108
109    Usage with frame inspection::
110
111        tr = make_tr('fr')
112        _TRANSLATIONS = {'en': {"Mon TP": "My lab"}}  # must come before tr() calls
113        title = tr("Mon TP")
114    """
115    if translations is not None:
116        _wrapped = {'_TRANSLATIONS': translations}
117
118        def _tr(text: str, **langs) -> TranslatedText:
119            merged = _lookup_translations(_wrapped, text, langs)
120            return TranslatedText({default_lang: text, **merged})
121    else:
122        caller_globals = sys._getframe(1).f_globals
123
124        def _tr(text: str, **langs) -> TranslatedText:
125            merged = _lookup_translations(caller_globals, text, langs)
126            return TranslatedText({default_lang: text, **merged})
127
128    return _tr

Return a tr() function bound to default_lang as the first-positional language.

If translations is supplied explicitly it is used directly (no frame inspection). Otherwise the caller module's globals are inspected for a _TRANSLATIONS dict at each call.

In both cases _TRANSLATIONS (or the dict passed as translations) must be defined before the first tr() call in the file, because module-level expressions are evaluated at import time.

Usage with explicit dict (no frame inspection)::

_TRANSLATIONS = {'fr': {"Mon TP": "My lab"}}
tr = make_tr('fr', translations=_TRANSLATIONS)
title = tr("Mon TP")   # 'en' from _TRANSLATIONS

Usage with frame inspection::

tr = make_tr('fr')
_TRANSLATIONS = {'en': {"Mon TP": "My lab"}}  # must come before tr() calls
title = tr("Mon TP")
def no_tr(text: str) -> str:
131def no_tr(text: str) -> str:
132    """Marker for prepare-sre-translations: leave this string untranslated.
133
134    Identity passthrough at runtime (returns *text* unchanged). It is purely a
135    hint to the translation tooling so the wrapped string is never wrapped in
136    tr() nor registered in _TRANSLATIONS — use it for short internal labels
137    (e.g. grade-element titles) that are not natural-language prose.
138    """
139    return text

Marker for prepare-sre-translations: leave this string untranslated.

Identity passthrough at runtime (returns text unchanged). It is purely a hint to the translation tooling so the wrapped string is never wrapped in tr() nor registered in _TRANSLATIONS — use it for short internal labels (e.g. grade-element titles) that are not natural-language prose.

@dataclass
class Flavor0:
314@dataclass
315class Flavor0:
316    """Base class for optional lab-parameterisation dataclasses.
317
318    Subclass with ``@dataclass(slots=True)`` and declare your fields.  If the
319    module defines ``flavor_form_at_startup = True``, the GUI renders the
320    ``flavor_form`` string (containing ``@@{field:regex}@@`` markers) as a form
321    before starting the lab.
322
323    Named presets can be attached as class attributes, e.g.::
324
325        Flavor.easy = Flavor(nb=1)
326
327    and referenced on the CLI with ``--set-flavor-name easy``.
328
329    Override :meth:`allowed_by_user` to restrict which values students may choose.
330    """
331
332    _registry = {}
333
334    def __init_subclass__(cls, **kwargs):
335        super().__init_subclass__(**kwargs)
336        key = f"{cls.__module__}.{cls.__qualname__}"
337        cls._type_key = key
338        Flavor0._registry[key] = cls
339
340    def to_dict(self):
341        """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`."""
342        return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)}
343
344    @classmethod
345    def from_dict(cls, d):
346        """Reconstruct a ``Flavor0`` instance from a dict produced by :meth:`to_dict`."""
347        decoded = {k: Data0._decode_value(v) for k, v in d.items()}
348        return cls(**decoded)  # type: ignore[call-arg]
349
350    @classmethod
351    def from_form_dict(cls, d: dict):
352        """Build a Flavor from form field values (strings from text inputs, bools from
353        submit buttons). Declared dataclass fields are coerced to their declared type.
354        Extra keys in d (form fields not declared in the dataclass) are set as plain
355        attributes on the instance so that allowed_by_user() can read them.
356        """
357        from typing import get_type_hints
358        hints = get_type_hints(cls)
359        declared = {f.name for f in fields(cls)}
360        coerced = {}
361        for f in fields(cls):
362            v = d.get(f.name)
363            if v is None:
364                continue
365            t = hints.get(f.name, str)
366            if not isinstance(t, type) or isinstance(v, t):
367                coerced[f.name] = v
368            elif t == bool:
369                coerced[f.name] = str(v).lower() in ('true', '1', 'yes')
370            elif t == int:
371                coerced[f.name] = int(v)
372            elif t == float:
373                coerced[f.name] = float(v)
374            else:
375                coerced[f.name] = str(v)
376        obj = cls(**coerced)
377        for k, v in d.items():
378            if k not in declared:
379                setattr(obj, k, v)
380        return obj
381
382    def allowed_by_user(self) -> Tuple[bool, str]:
383        """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise.
384
385        Override in subclasses to restrict which values students can choose.
386        """
387        return True, ""

Base class for optional lab-parameterisation dataclasses.

Subclass with @dataclass(slots=True) and declare your fields. If the module defines flavor_form_at_startup = True, the GUI renders the flavor_form string (containing @@{field:regex}@@ markers) as a form before starting the lab.

Named presets can be attached as class attributes, e.g.::

Flavor.easy = Flavor(nb=1)

and referenced on the CLI with --set-flavor-name easy.

Override allowed_by_user() to restrict which values students may choose.

def to_dict(self):
340    def to_dict(self):
341        """Serialise dataclass fields to a plain dict using :meth:`Data0._encode_value`."""
342        return {f.name: Data0._encode_value(getattr(self, f.name)) for f in fields(self)}

Serialise dataclass fields to a plain dict using Data0._encode_value().

@classmethod
def from_dict(cls, d):
344    @classmethod
345    def from_dict(cls, d):
346        """Reconstruct a ``Flavor0`` instance from a dict produced by :meth:`to_dict`."""
347        decoded = {k: Data0._decode_value(v) for k, v in d.items()}
348        return cls(**decoded)  # type: ignore[call-arg]

Reconstruct a Flavor0 instance from a dict produced by to_dict().

@classmethod
def from_form_dict(cls, d: dict):
350    @classmethod
351    def from_form_dict(cls, d: dict):
352        """Build a Flavor from form field values (strings from text inputs, bools from
353        submit buttons). Declared dataclass fields are coerced to their declared type.
354        Extra keys in d (form fields not declared in the dataclass) are set as plain
355        attributes on the instance so that allowed_by_user() can read them.
356        """
357        from typing import get_type_hints
358        hints = get_type_hints(cls)
359        declared = {f.name for f in fields(cls)}
360        coerced = {}
361        for f in fields(cls):
362            v = d.get(f.name)
363            if v is None:
364                continue
365            t = hints.get(f.name, str)
366            if not isinstance(t, type) or isinstance(v, t):
367                coerced[f.name] = v
368            elif t == bool:
369                coerced[f.name] = str(v).lower() in ('true', '1', 'yes')
370            elif t == int:
371                coerced[f.name] = int(v)
372            elif t == float:
373                coerced[f.name] = float(v)
374            else:
375                coerced[f.name] = str(v)
376        obj = cls(**coerced)
377        for k, v in d.items():
378            if k not in declared:
379                setattr(obj, k, v)
380        return obj

Build a Flavor from form field values (strings from text inputs, bools from submit buttons). Declared dataclass fields are coerced to their declared type. Extra keys in d (form fields not declared in the dataclass) are set as plain attributes on the instance so that allowed_by_user() can read them.

def allowed_by_user(self) -> Tuple[bool, str]:
382    def allowed_by_user(self) -> Tuple[bool, str]:
383        """Return ``(True, "")`` if the student may use this flavor; ``(False, reason)`` otherwise.
384
385        Override in subclasses to restrict which values students can choose.
386        """
387        return True, ""

Return (True, "") if the student may use this flavor; (False, reason) otherwise.

Override in subclasses to restrict which values students can choose.

class Data0:
390class Data0:
391    """Base class for lab-specific parameter dataclasses.
392
393    Subclass with ``@dataclass(slots=True)`` and declare your fields normally.
394    Three dynamic containers are injected automatically into every instance by
395    ``__post_init__``:
396
397    * ``self.ips``  — :class:`_IPv4InterfaceContainer`: named ``IPv4Interface`` values
398    * ``self.nets`` — :class:`_IPv4NetContainer`: named ``IPv4Network`` values
399    * ``self.macs`` — :class:`_MacContainer`: named ``EUI`` MAC-address values
400
401    The class-level ``_registry`` maps ``"module.ClassName"`` keys to concrete
402    subclasses so that ``from_dict``/``unpack``/``from_json`` can reconstruct the
403    correct type from serialised data without knowing it in advance.
404
405    Override ``generate(flavor=None)`` as a ``@classmethod`` to produce a fresh
406    randomised instance.  The result is serialised to ``data.json`` at lab start
407    and reloaded for each evaluation.
408    """
409
410    _registry = {}
411    _rng = random.Random()
412
413    def __init_subclass__(cls, **kwargs):
414        super().__init_subclass__(**kwargs)
415        key = f"{cls.__module__}.{cls.__qualname__}"
416        cls._type_key = key
417        Data0._registry[key] = cls
418
419    def __post_init__(self):
420        """Inject ``ips``, ``nets``, ``macs``, and ``flavor`` into every instance.
421
422        Called automatically by the dataclass ``__init__``.  Uses
423        ``object.__setattr__`` so the injection works even when the subclass
424        declares ``slots=True``.
425        """
426        # Inject ips/nets/flavor into __dict__ (available even with slots=True subclasses
427        # because Data0 itself has no __slots__, so __dict__ is always inherited).
428        object.__setattr__(self, 'ips', _IPv4InterfaceContainer())
429        object.__setattr__(self, 'nets', _IPv4NetContainer())
430        object.__setattr__(self, 'macs', _MacContainer())
431        object.__setattr__(self, 'flavor', None)
432        object.__setattr__(self, '__flavor_name', None)
433        object.__setattr__(self, '__current_srelab_file', None)
434
435    # ---------------- lifecycle hooks ----------------
436    @classmethod
437    def compute_pre_generate(cls, flavor=None):
438        """Hook called just before generate() and after JSON/msgpack deserialization.
439
440        Override in a subclass to set class-level derived state from *flavor*.
441        The default implementation is a no-op (fully retro-compatible).
442        """
443        pass
444
445    def compute_post_generate(self):
446        """Hook called after generate() and after JSON/msgpack deserialization.
447
448        Override in a subclass to set instance-level derived state from data fields.
449        The default implementation is a no-op (fully retro-compatible).
450        """
451        pass
452
453    # ---------------- dict conversion ----------------
454    def to_dict(self):
455        """Serialise to a plain dict (JSON-safe).
456
457        Dataclass fields are encoded via :meth:`_encode_value`.  ``ips``, ``nets``,
458        and ``macs`` are stored as nested dicts of strings.  An attached ``Flavor``
459        is stored under the ``"flavor"`` key with its type key.
460        """
461        result = {}
462        for f in fields(self):
463            v = getattr(self, f.name)
464            result[f.name] = self._encode_value(v)
465        result['ips'] = self.ips.to_dict()
466        result['nets'] = self.nets.to_dict()
467        result['macs'] = self.macs.to_dict()
468        if self.flavor is not None:
469            result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()}
470        flavor_name = getattr(self, '__flavor_name', None)
471        if flavor_name is not None:
472            result['__flavor_name'] = flavor_name
473        current_srelab_file = getattr(self, '__current_srelab_file', None)
474        if current_srelab_file is not None:
475            result['__current_srelab_file'] = current_srelab_file
476        return result
477
478    @classmethod
479    def from_dict(cls, d):
480        """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`.
481
482        The concrete subclass is resolved from ``d["__type__"]`` when *cls* is
483        ``Data0`` itself; otherwise the calling class is used directly.
484        Raises ``ValueError`` for unknown type keys.
485        """
486        d = dict(d)
487        ips_data = d.pop('ips', {})
488        nets_data = d.pop('nets', {})
489        macs_data = d.pop('macs', {})
490        flavor_data = d.pop('flavor', None)
491        flavor_name = d.pop('__flavor_name', None)
492        current_srelab_file = d.pop('__current_srelab_file', None)
493        decoded = {k: cls._decode_value(v) for k, v in d.items()}
494        obj = cls(**decoded)
495        for k, v in ips_data.items():
496            setattr(obj.ips, k, IPv4Interface(v))
497        for k, v in nets_data.items():
498            setattr(obj.nets, k, IPv4Network(v))
499        for k, v in macs_data.items():
500            setattr(obj.macs, k, EUI(v))
501        if flavor_data is not None:
502            flavor_key = flavor_data.get("__flavor_type__")
503            if flavor_key not in Flavor0._registry:
504                raise ValueError(f"unknown Flavor0 type: {flavor_key!r}")
505            flavor_cls = Flavor0._registry[flavor_key]
506            object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"]))
507        object.__setattr__(obj, '__flavor_name', flavor_name)
508        object.__setattr__(obj, '__current_srelab_file', current_srelab_file)
509        return obj
510
511    # ---------------- value encoding ----------------
512    @staticmethod
513    def _encode_value(v):
514        """Encode a field value for JSON/msgpack storage.
515
516        * ``IPv4Address`` / ``IPv6Address`` → ``{"__ip__": "..."}``
517        * ``IPv4Network`` / ``IPv6Network`` → ``{"__net__": "..."}``
518        * nested ``Data0`` → ``{"__type__": "...", "data": {...}}``
519        * all other values are returned unchanged.
520        """
521        if isinstance(v, (IPv4Address, IPv6Address)):
522            return {"__ip__": str(v)}
523        if isinstance(v, (IPv4Network, IPv6Network)):
524            return {"__net__": str(v)}
525
526        if isinstance(v, Data0):
527            return {
528                "__type__": v._type_key,
529                "data": v.to_dict()
530            }
531        return v
532
533    @staticmethod
534    def _decode_value(v):
535        """Decode a value produced by :meth:`_encode_value` back to its Python type."""
536        if isinstance(v, dict):
537            if "__ip__" in v:
538                return ip_address(v["__ip__"])
539            if "__net__" in v:
540                return ip_network(v["__net__"])
541            if "__type__" in v:
542                type_key = v["__type__"]
543                if type_key not in Data0._registry:
544                    raise ValueError(f"unknown Data0 type: {type_key!r}")
545                return Data0._registry[type_key].from_dict(v["data"])
546        return v
547
548    # ---------------- msgpack ----------------
549    def pack(self):
550        """Serialise to a msgpack binary blob (includes type key for polymorphic reload)."""
551        return msgpack.packb(
552            {
553                "__type__": self._type_key,
554                "data": self.to_dict()
555            },
556            use_bin_type=True
557        )
558
559    @classmethod
560    def unpack(cls, blob):
561        """Deserialise a msgpack blob produced by :meth:`pack`.  Raises ``ValueError`` for unknown types."""
562        obj = msgpack.unpackb(blob, raw=False)
563        type_key = obj.get("__type__")
564        if type_key not in Data0._registry:
565            raise ValueError(f"unknown Data0 type: {type_key!r}")
566        result = Data0._registry[type_key].from_dict(obj["data"])
567        type(result).compute_pre_generate(result.flavor)
568        result.compute_post_generate()
569        return result
570
571    # ---------------- JSON ----------------
572    def to_json(self):
573        """Serialise to a JSON string (includes type key for polymorphic reload)."""
574        return json.dumps({
575            "__type__": self._type_key,
576            "data": self.to_dict()
577        })
578
579    @classmethod
580    def from_json(cls, s):
581        """Deserialise a JSON string produced by :meth:`to_json`."""
582        obj = json.loads(s)
583        concrete = Data0._registry[obj["__type__"]]
584        result = concrete.from_dict(obj["data"])
585        type(result).compute_pre_generate(result.flavor)
586        result.compute_post_generate()
587        return result
588
589    @classmethod
590    def load_from_json_file(cls, filename):
591        """
592        Load a Data0-derived object from a JSON file.
593        The concrete class is resolved automatically.
594        """
595
596        path = Path(filename)
597        with path.open("r", encoding="utf-8") as f:
598            obj = json.load(f)
599        concrete = Data0._registry[obj["__type__"]]
600        result = concrete.from_dict(obj["data"])
601        type(result).compute_pre_generate(result.flavor)
602        result.compute_post_generate()
603        return result
604
605    def save_to_json_file(self, filename):
606        """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private)."""
607        path = Path(filename)
608        fd = os.open(
609            path,
610            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
611            0o600
612        )
613        with os.fdopen(fd, "w", encoding="utf-8") as f:
614            json.dump(
615                {
616                    "__type__": self._type_key,
617                    "data": self.to_dict()
618                },
619                f,
620                indent=2  # optional but operationally useful
621            )

Base class for lab-specific parameter dataclasses.

Subclass with @dataclass(slots=True) and declare your fields normally. Three dynamic containers are injected automatically into every instance by __post_init__:

  • self.ips — _IPv4InterfaceContainer: named IPv4Interface values
  • self.nets — _IPv4NetContainer: named IPv4Network values
  • self.macs — _MacContainer: named EUI MAC-address values

The class-level _registry maps "module.ClassName" keys to concrete subclasses so that from_dict/unpack/from_json can reconstruct the correct type from serialised data without knowing it in advance.

Override generate(flavor=None) as a @classmethod to produce a fresh randomised instance. The result is serialised to data.json at lab start and reloaded for each evaluation.

@classmethod
def compute_pre_generate(cls, flavor=None):
436    @classmethod
437    def compute_pre_generate(cls, flavor=None):
438        """Hook called just before generate() and after JSON/msgpack deserialization.
439
440        Override in a subclass to set class-level derived state from *flavor*.
441        The default implementation is a no-op (fully retro-compatible).
442        """
443        pass

Hook called just before generate() and after JSON/msgpack deserialization.

Override in a subclass to set class-level derived state from flavor. The default implementation is a no-op (fully retro-compatible).

def compute_post_generate(self):
445    def compute_post_generate(self):
446        """Hook called after generate() and after JSON/msgpack deserialization.
447
448        Override in a subclass to set instance-level derived state from data fields.
449        The default implementation is a no-op (fully retro-compatible).
450        """
451        pass

Hook called after generate() and after JSON/msgpack deserialization.

Override in a subclass to set instance-level derived state from data fields. The default implementation is a no-op (fully retro-compatible).

def to_dict(self):
454    def to_dict(self):
455        """Serialise to a plain dict (JSON-safe).
456
457        Dataclass fields are encoded via :meth:`_encode_value`.  ``ips``, ``nets``,
458        and ``macs`` are stored as nested dicts of strings.  An attached ``Flavor``
459        is stored under the ``"flavor"`` key with its type key.
460        """
461        result = {}
462        for f in fields(self):
463            v = getattr(self, f.name)
464            result[f.name] = self._encode_value(v)
465        result['ips'] = self.ips.to_dict()
466        result['nets'] = self.nets.to_dict()
467        result['macs'] = self.macs.to_dict()
468        if self.flavor is not None:
469            result['flavor'] = {"__flavor_type__": self.flavor._type_key, "data": self.flavor.to_dict()}
470        flavor_name = getattr(self, '__flavor_name', None)
471        if flavor_name is not None:
472            result['__flavor_name'] = flavor_name
473        current_srelab_file = getattr(self, '__current_srelab_file', None)
474        if current_srelab_file is not None:
475            result['__current_srelab_file'] = current_srelab_file
476        return result

Serialise to a plain dict (JSON-safe).

Dataclass fields are encoded via _encode_value(). ips, nets, and macs are stored as nested dicts of strings. An attached Flavor is stored under the "flavor" key with its type key.

@classmethod
def from_dict(cls, d):
478    @classmethod
479    def from_dict(cls, d):
480        """Reconstruct a ``Data0`` instance from a plain dict produced by :meth:`to_dict`.
481
482        The concrete subclass is resolved from ``d["__type__"]`` when *cls* is
483        ``Data0`` itself; otherwise the calling class is used directly.
484        Raises ``ValueError`` for unknown type keys.
485        """
486        d = dict(d)
487        ips_data = d.pop('ips', {})
488        nets_data = d.pop('nets', {})
489        macs_data = d.pop('macs', {})
490        flavor_data = d.pop('flavor', None)
491        flavor_name = d.pop('__flavor_name', None)
492        current_srelab_file = d.pop('__current_srelab_file', None)
493        decoded = {k: cls._decode_value(v) for k, v in d.items()}
494        obj = cls(**decoded)
495        for k, v in ips_data.items():
496            setattr(obj.ips, k, IPv4Interface(v))
497        for k, v in nets_data.items():
498            setattr(obj.nets, k, IPv4Network(v))
499        for k, v in macs_data.items():
500            setattr(obj.macs, k, EUI(v))
501        if flavor_data is not None:
502            flavor_key = flavor_data.get("__flavor_type__")
503            if flavor_key not in Flavor0._registry:
504                raise ValueError(f"unknown Flavor0 type: {flavor_key!r}")
505            flavor_cls = Flavor0._registry[flavor_key]
506            object.__setattr__(obj, 'flavor', flavor_cls.from_dict(flavor_data["data"]))
507        object.__setattr__(obj, '__flavor_name', flavor_name)
508        object.__setattr__(obj, '__current_srelab_file', current_srelab_file)
509        return obj

Reconstruct a Data0 instance from a plain dict produced by to_dict().

The concrete subclass is resolved from d["__type__"] when cls is Data0 itself; otherwise the calling class is used directly. Raises ValueError for unknown type keys.

def pack(self):
549    def pack(self):
550        """Serialise to a msgpack binary blob (includes type key for polymorphic reload)."""
551        return msgpack.packb(
552            {
553                "__type__": self._type_key,
554                "data": self.to_dict()
555            },
556            use_bin_type=True
557        )

Serialise to a msgpack binary blob (includes type key for polymorphic reload).

@classmethod
def unpack(cls, blob):
559    @classmethod
560    def unpack(cls, blob):
561        """Deserialise a msgpack blob produced by :meth:`pack`.  Raises ``ValueError`` for unknown types."""
562        obj = msgpack.unpackb(blob, raw=False)
563        type_key = obj.get("__type__")
564        if type_key not in Data0._registry:
565            raise ValueError(f"unknown Data0 type: {type_key!r}")
566        result = Data0._registry[type_key].from_dict(obj["data"])
567        type(result).compute_pre_generate(result.flavor)
568        result.compute_post_generate()
569        return result

Deserialise a msgpack blob produced by pack(). Raises ValueError for unknown types.

def to_json(self):
572    def to_json(self):
573        """Serialise to a JSON string (includes type key for polymorphic reload)."""
574        return json.dumps({
575            "__type__": self._type_key,
576            "data": self.to_dict()
577        })

Serialise to a JSON string (includes type key for polymorphic reload).

@classmethod
def from_json(cls, s):
579    @classmethod
580    def from_json(cls, s):
581        """Deserialise a JSON string produced by :meth:`to_json`."""
582        obj = json.loads(s)
583        concrete = Data0._registry[obj["__type__"]]
584        result = concrete.from_dict(obj["data"])
585        type(result).compute_pre_generate(result.flavor)
586        result.compute_post_generate()
587        return result

Deserialise a JSON string produced by to_json().

@classmethod
def load_from_json_file(cls, filename):
589    @classmethod
590    def load_from_json_file(cls, filename):
591        """
592        Load a Data0-derived object from a JSON file.
593        The concrete class is resolved automatically.
594        """
595
596        path = Path(filename)
597        with path.open("r", encoding="utf-8") as f:
598            obj = json.load(f)
599        concrete = Data0._registry[obj["__type__"]]
600        result = concrete.from_dict(obj["data"])
601        type(result).compute_pre_generate(result.flavor)
602        result.compute_post_generate()
603        return result

Load a Data0-derived object from a JSON file. The concrete class is resolved automatically.

def save_to_json_file(self, filename):
605    def save_to_json_file(self, filename):
606        """Write the instance to *filename* as JSON, mode 0o600 (lab secrets stay private)."""
607        path = Path(filename)
608        fd = os.open(
609            path,
610            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
611            0o600
612        )
613        with os.fdopen(fd, "w", encoding="utf-8") as f:
614            json.dump(
615                {
616                    "__type__": self._type_key,
617                    "data": self.to_dict()
618                },
619                f,
620                indent=2  # optional but operationally useful
621            )

Write the instance to filename as JSON, mode 0o600 (lab secrets stay private).

def sre_state(fn=None, *, user_allowed=False, description=''):
624def sre_state(fn=None, *, user_allowed=False, description=''):
625    def decorator(f):
626        f._is_sre_state = True
627        f._sre_state_user_allowed = user_allowed
628        f._sre_state_description = description
629        return f
630
631    if fn is not None:
632        return decorator(fn)
633    return decorator
class NetScheme0:
 636class NetScheme0:
 637    """Base class for lab network topology definitions.
 638
 639    Subclasses declare the topology as class-level dicts and implement state
 640    methods decorated with :func:`sre_state`.
 641
 642    Class-level attributes:
 643
 644    * ``_machine_specs`` — ``{name: {Machine kwargs}}`` — one entry per container.
 645    * ``_network_specs`` — ``{net_name: {color: ...}}`` — optional display hints.
 646    * ``_topology``      — ``{net_name: [machine, ...]}`` or ``{net_name: {machine: iface}}``
 647      — which machines connect to each network and on which interface.
 648
 649    If any of these three attributes is not defined in the subclass (neither as a class
 650    variable nor as a property), the corresponding attribute is read from ``data``
 651    (``data.topology``, ``data.machine_specs``, ``data.network_specs``).
 652    Explicit subclass definitions always take precedence.
 653
 654    Instance attributes set by ``__init__``:
 655
 656    * ``self.data`` — the ``Data0`` instance for this lab run.
 657    * ``self.informations`` — Markdown text shown in the Informations tab (set this in ``__init__``).
 658    * Named machine and network objects (e.g. ``self.router``, ``self.lan``).
 659
 660    The ``initial`` state method is always called at ``sre start``.  Additional
 661    states are applied with ``sre state <lab> <state_name>``.
 662    """
 663
 664    _machine_specs = {}
 665    _network_specs = {}  # {net_name: {'color': ...}} — optional colors for networks
 666    _topology = {}  # {net_name: [m, ...] or {m: iface, ...}} — mixed allowed
 667
 668    def _resolve_spec(self, attr: str, data_attr: str):
 669        """Return the topology/spec dict for *attr*.
 670
 671        Resolution order:
 672        1. If the concrete subclass (or any class before ``NetScheme0`` in the MRO)
 673           defines *attr* as a class variable or property, return that value.
 674        2. Otherwise look for *data_attr* on ``self.data`` (checks instance attributes
 675           first, then class-level attributes set e.g. by ``compute_pre_generate``).
 676        3. If neither source provides the attribute, return the ``NetScheme0`` default
 677           (an empty dict), which is equivalent to "no topology/specs defined".
 678        """
 679        for cls in type(self).__mro__:
 680            if cls is NetScheme0:
 681                break
 682            if attr in cls.__dict__:
 683                return getattr(self, attr)
 684        return getattr(self.data, data_attr, getattr(NetScheme0, attr))
 685
 686    def __init__(self, data, running_lab_name, lab_hash=None):
 687        """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs.
 688
 689        Args:
 690            data: a ``Data0`` instance containing lab-specific parameters.
 691            running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``.
 692            lab_hash: optional Kathara lab hash (resolved lazily if omitted).
 693        """
 694        self.data = data
 695        self.running_lab_name = running_lab_name
 696        self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name))
 697        self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name)
 698        self.lab_hash = lab_hash
 699        self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name)
 700        self.informations = ""
 701
 702        _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs')
 703        _network_specs = self._resolve_spec('_network_specs', 'network_specs')
 704        _topology = self._resolve_spec('_topology', 'topology')
 705
 706        for name, parameters in _machine_specs.items():
 707            setattr(self, name, Machine(name=name, **parameters))
 708
 709        # Create networks from _network_specs (with optional color), then remaining from _topology
 710        for name, parameters in _network_specs.items():
 711            setattr(self, name, Network(name=name, **parameters))
 712
 713        for net_name in _topology:
 714            if not hasattr(self, net_name):
 715                setattr(self, net_name, Network(name=net_name))
 716
 717        # Create NetAdapters from _topology.
 718        # Each value is either a list (auto interface) or a dict {machine: iface_spec}.
 719        # iface_spec can be: None (auto), an int, or a (int, mac) tuple.
 720        # Interface auto-assignment counts prior connections per machine across all networks.
 721        _iface_counter: dict[str, int] = {}
 722        for net_name, machines in _topology.items():
 723            net = getattr(self, net_name)
 724            if isinstance(machines, dict):
 725                items = machines.items()
 726            else:
 727                items = ((m, None) for m in machines)
 728            for mname, iface_spec in items:
 729                machine = getattr(self, mname)
 730                if isinstance(iface_spec, tuple):
 731                    iface, mac = iface_spec
 732                else:
 733                    iface, mac = iface_spec, None
 734                if iface is None:
 735                    iface = _iface_counter.get(mname, 0)
 736                _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1
 737                NetAdapter(network=net, machine=machine, interface=iface, mac=mac)
 738
 739        self._ops: dict[str, list] = {}  # machine → [str | _FileOp, ...]
 740        self._host_ops: dict[int, list] = {}
 741
 742        self.net_config = None
 743
 744    def host_interfaces_from_topology(self) -> dict:
 745        """Return {machine_name: [net_name, ...]} derived from the resolved topology."""
 746        topology = self._resolve_spec('_topology', 'topology')
 747        result = {}
 748        for net_name, machines in topology.items():
 749            names = machines.keys() if isinstance(machines, dict) else machines
 750            for mname in names:
 751                result.setdefault(mname, []).append(net_name)
 752        return result
 753
 754    @sre_state(user_allowed=False)
 755    def initial(self):
 756        pass
 757
 758    @classmethod
 759    def get_state_methods(cls):
 760        """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy."""
 761        names = set()
 762        for klass in cls.__mro__:
 763            for name, fn in klass.__dict__.items():
 764                if callable(fn) and getattr(fn, '_is_sre_state', False):
 765                    names.add(name)
 766        return sorted(names)
 767
 768    @classmethod
 769    def is_state_user_allowed(cls, state):
 770        """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``."""
 771        for klass in cls.__mro__:
 772            fn = klass.__dict__.get(state)
 773            if fn is not None and getattr(fn, '_is_sre_state', False):
 774                return getattr(fn, '_sre_state_user_allowed', False)
 775        return False
 776
 777    @classmethod
 778    def get_user_allowed_states(cls):
 779        """Return ``{state_name: description}`` for all user-allowed states."""
 780        result = {}
 781        for state in cls.get_state_methods():
 782            for klass in cls.__mro__:
 783                fn = klass.__dict__.get(state)
 784                if fn is not None and getattr(fn, '_is_sre_state', False):
 785                    if getattr(fn, '_sre_state_user_allowed', False):
 786                        result[state] = getattr(fn, '_sre_state_description', '')
 787                    break
 788        return result
 789
 790    def get_data(self):
 791        return self.data
 792
 793    def get_machine(self, machine_name):
 794        """Return the :class:`Machine` with *machine_name*, or ``None`` if not found."""
 795        if not hasattr(self, machine_name):
 796            return None
 797        machine = getattr(self, machine_name)
 798        if not isinstance(machine, Machine):
 799            return None
 800        return machine
 801
 802    def get_network(self, network_name):
 803        """Return the :class:`Network` with *network_name*, or ``None`` if not found."""
 804        if not hasattr(self, network_name):
 805            return None
 806        machine = getattr(self, network_name)
 807        if not isinstance(machine, Network):
 808            return None
 809        return machine
 810
 811    def get_machines(self):
 812        for _, value in self.__dict__.items():
 813            if isinstance(value, Machine):
 814                yield value
 815
 816    def get_machine_names(self):
 817        return [m.name for m in self.get_machines()]
 818
 819    def has_privileged_machines(self):
 820        return any(m.privileged for m in self.get_machines())
 821
 822    def get_accessible_machine_names(self):
 823        return [m.name for m in self.get_machines()
 824                if not m.hidden and m.allow_connection]
 825
 826    def get_visible_machine_names(self):
 827        return [m.name for m in self.get_visibles_machines()]
 828
 829    def get_visibles_machines(self):
 830        for _, value in self.__dict__.items():
 831            if isinstance(value, Machine):
 832                if not value.hidden:
 833                    yield value
 834
 835    def get_networks(self):
 836        for _, value in self.__dict__.items():
 837            if isinstance(value, Network):
 838                yield value
 839
 840    def get_topology(self):
 841        return self._resolve_spec('_topology', 'topology')
 842
 843    def get_machine_specs(self):
 844        return self._resolve_spec('_machine_specs', 'machine_specs')
 845
 846    def get_network_specs(self):
 847        return self._resolve_spec('_network_specs', 'network_specs')
 848
 849    def cmd(self, machine, command, step=1):
 850        """Register a shell command to execute inside *machine*'s container at *step*."""
 851        if SRE.args.debug:
 852            print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr)
 853        self._ops.setdefault(step, {}).setdefault(machine, []).append(command)
 854
 855    def host_cmd(self, command, step=1):
 856        """Register a shell command to execute on the **host** (not inside a container) at *step*.
 857
 858        Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise.
 859        """
 860        if params.execute_commands_on_host is False:
 861            sys.exit("host_cmd() is disabled by params.execute_commands_on_host")
 862        if SRE.args.debug:
 863            print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr)
 864        self._host_ops.setdefault(step, []).append(_HostCmdOp(command))
 865
 866    def host_callback(self, callback, step=1):
 867        """Register a Python callable to invoke on the host at *step* (called with no arguments)."""
 868        if SRE.args.debug:
 869            print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}",
 870                  file=sys.stderr)
 871        self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback))
 872
 873    def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None,
 874                     mtime: float = None, step=1):
 875        """Register a file copy from the host into *machine*'s container at *step*.
 876
 877        *src* may be relative (resolved against ``params.files_dir``).
 878        *dest* is the absolute path inside the container.
 879        """
 880        orig_path = Path(src)
 881        if not orig_path.is_absolute():
 882            orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path
 883        if SRE.args.debug:
 884            print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr)
 885        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 886            _CpFromHostOp(orig_path, dest, permissions, owner, mtime)
 887        )
 888
 889    def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1):
 890        """Copy file `path` from `machine` to the host files_dir.
 891
 892        Args:
 893            machine:     container name
 894            path:        absolute path of the file inside the container
 895            dest:        relative destination path inside params.files_dir(running_lab_name)
 896            permissions: file mode to apply on the host copy (default: None, leave as written)
 897            step:        execution step (default 1)
 898        """
 899        files_dir = Path(params.files_dir(self.running_lab_name)).resolve()
 900        dest_path = (files_dir / dest).resolve()
 901        if not dest_path.is_relative_to(files_dir):
 902            error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'")
 903        if SRE.args.debug:
 904            print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr)
 905        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 906            _CpToHostOp(path, str(dest_path), permissions)
 907        )
 908
 909    def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1):
 910        """Register a file to create/overwrite on `machine` during the current state.
 911
 912        Args:
 913            machine:     machine name (str)
 914            filename:    absolute path inside the container (e.g. "/etc/myconfig")
 915            content:     file content (str or bytes)
 916            permissions: octal mode (default 0o644)
 917            owner:       "user:group" string (default "root:root")
 918            mtime:       modification time as a Unix timestamp (float); defaults to now
 919            step:        execution step (default 1); higher steps run after lower ones
 920        """
 921        import time as _time
 922        raw = content.encode() if isinstance(content, str) else content
 923        if SRE.args.debug:
 924            print(
 925                f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)",
 926                file=sys.stderr)
 927        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 928            _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time())
 929        )
 930
 931    def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
 932        """Append content to a file on `machine`; create the file if it does not exist.
 933
 934        Args:
 935            machine:     machine name (str)
 936            filename:    absolute path inside the container (e.g. "/etc/hosts")
 937            content:     content to append (str or bytes)
 938            permissions: octal mode to set after appending (default: leave unchanged)
 939            owner:       "user:group" to set after appending (default: leave unchanged)
 940            mtime:       modification time as a Unix timestamp (default: leave unchanged)
 941            step:        execution step (default 1); higher steps run after lower ones
 942        """
 943        raw = content.encode() if isinstance(content, str) else content
 944        if SRE.args.debug:
 945            print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr)
 946        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 947            _AppendOp(filename, raw, permissions, owner, mtime)
 948        )
 949
 950    def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
 951        """Append content to a file on `machine` only if the file does not already end with it.
 952
 953        Idempotent version of append_to_file: safe to call multiple times — the content
 954        is appended at most once per application.  If permissions, owner, or mtime are
 955        provided they are applied regardless of whether content was appended.
 956
 957        Args:
 958            machine:     machine name (str)
 959            filename:    absolute path inside the container (e.g. "/etc/hosts")
 960            content:     content to append (str or bytes)
 961            permissions: octal mode to set after the check (default: leave unchanged)
 962            owner:       "user:group" to set after the check (default: leave unchanged)
 963            mtime:       modification time as a Unix timestamp (default: leave unchanged)
 964            step:        execution step (default 1); higher steps run after lower ones
 965        """
 966        raw = content.encode() if isinstance(content, str) else content
 967        if SRE.args.debug:
 968            print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)",
 969                  file=sys.stderr)
 970        self._ops.setdefault(step, {}).setdefault(machine, []).append(
 971            _IdempotentAppendOp(filename, raw, permissions, owner, mtime)
 972        )
 973
 974    def compute_state_ops(self, state):
 975        self._ops = {}
 976        self._host_ops = {}
 977        if not hasattr(self, state):
 978            error_quit(f"state method {state} does not exist")
 979        method = getattr(self, state)
 980        if not callable(method):
 981            error_quit(f"state method {state} not callable")
 982        try:
 983            method()
 984        except Exception as e:
 985            error_quit(f"error during {state} execution: {e}")
 986        return self._ops, self._host_ops
 987
 988    def get_new_lab_from_scheme(self):
 989        lab = Lab(name=self.running_lab_name)
 990        abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name)
 991        _used_ports: set = set()
 992        xauth_cookie = _resolve_xauth_cookie()
 993        for m in self.get_machines():
 994            m.ports = [_resolve_port(p, _used_ports) for p in m.ports]
 995            m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True
 996            if m.x11_host:
 997                m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True
 998                if xauth_cookie:
 999                    m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True
1000            volumes = []
1001            if len(m.volumes) > 0:
1002                has_private_volume = False
1003                for v in m.volumes:
1004                    if len(v) > 2 and v[3] == 'private':
1005                        has_private_volume = True
1006                        break
1007
1008                if has_private_volume and params.disable_volume_mount_on_root_partition:
1009                    private_mount_dir = self.get_private_mount_dir()
1010                    p = Path(private_mount_dir)
1011                    while not p.exists():
1012                        p = p.parent
1013                    if os.stat(p).st_dev == os.stat('/').st_dev:
1014                        error_quit(
1015                            f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1016
1017                for v in m.volumes:
1018                    if len(v) > 2 and v[3] == 'private':
1019                        if v[0].startswith('/'):
1020                            error_quit(f"volume {v[0]} is private and have an absolute path")
1021                        v_host = f"{self.get_private_mount_dir()}/{v[0]}"
1022                        os.makedirs(v_host, exist_ok=True)
1023                        os.chmod(v_host, 0o777)
1024                    else:
1025                        if v[0].startswith('/'):
1026                            v_host = v[0]
1027                        else:
1028                            v_host = f"{self.get_user_public_dir()}/{v[0]}"
1029                            os.makedirs(v_host, exist_ok=True)
1030                            os.chmod(v_host, 0o777)
1031                    if v[2] not in ['rw', 'ro']:
1032                        error_quit(f"volume mode {v[2]} not supported (only rw and ro)")
1033
1034                    volumes.append(f"{v_host}|{v[1]}|{v[2]}")
1035            lab.new_machine(m.name,
1036                            exec_commands=m.exec_commands,
1037                            sysctls=m.sysctls,
1038                            envs=m.envs,
1039                            ports=m.ports,
1040                            ulimits=m.ulimits,
1041                            volumes=volumes,
1042                            shell=m.kathara_shell,
1043                            image=m.image,
1044                            bridged=m.bridged,
1045                            privileged=m.privileged,
1046                            entrypoint=m.entrypoint,
1047                            )
1048            for net, netAdapter in m.net_adapters.items():
1049                lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface,
1050                                            mac_address=str(netAdapter.mac).replace('-',
1051                                                                                    ':').lower() if netAdapter.mac is not None else None)
1052        self.lab_hash = lab.hash
1053        return lab
1054
1055    def get_lab_hash(self):
1056        if self.lab_hash is None:
1057            lab = Lab(name=self.running_lab_name)
1058            self.lab_hash = lab.hash
1059        return self.lab_hash
1060
1061    def get_lab_from_kathara(self):
1062        lab_hash = self.get_lab_hash()
1063        return Kathara.get_instance().get_lab_from_api(lab_hash=lab_hash)
1064
1065    def get_public_lab_dir(self):
1066        return params.public_lab_dir(self.running_lab_name)
1067
1068    def get_private_lab_dir(self):
1069        return params.private_lab_dir(self.running_lab_name)
1070
1071    def get_files_dir(self):
1072        return params.files_dir(self.running_lab_name)
1073
1074    def get_private_mount_dir(self):
1075        return params.private_mount_dir(self.running_lab_name)
1076
1077    def get_user_public_dir(self):
1078        try:
1079            return Path(params.link_to_user_public_dir(self.running_lab_name)).readlink()
1080        except OSError:
1081            return Path(params.link_to_user_public_dir(self.running_lab_name))
1082
1083    def get_shared_dir(self):
1084        return f"{self.get_user_public_dir()}/{params.shared_dir_name}"
1085
1086    def answers_dir(self):
1087        return params.answers_dir(self.running_lab_name)
1088
1089    def answers_file(self):
1090        return params.answers_filename(self.running_lab_name)

Base class for lab network topology definitions.

Subclasses declare the topology as class-level dicts and implement state methods decorated with sre_state().

Class-level attributes:

  • _machine_specs — {name: {Machine kwargs}} — one entry per container.
  • _network_specs — {net_name: {color: ...}} — optional display hints.
  • _topology — {net_name: [machine, ...]} or {net_name: {machine: iface}} — which machines connect to each network and on which interface.

If any of these three attributes is not defined in the subclass (neither as a class variable nor as a property), the corresponding attribute is read from data (data.topology, data.machine_specs, data.network_specs). Explicit subclass definitions always take precedence.

Instance attributes set by __init__:

  • self.data — the Data0 instance for this lab run.
  • self.informations — Markdown text shown in the Informations tab (set this in __init__).
  • Named machine and network objects (e.g. self.router, self.lan).

The initial state method is always called at sre start. Additional states are applied with sre state <lab> <state_name>.

NetScheme0(data, running_lab_name, lab_hash=None)
686    def __init__(self, data, running_lab_name, lab_hash=None):
687        """Build all ``Machine``, ``Network``, and ``NetAdapter`` objects from the class-level specs.
688
689        Args:
690            data: a ``Data0`` instance containing lab-specific parameters.
691            running_lab_name: the runtime identifier ``{ts}@@@{lab}@@@{user}``.
692            lab_hash: optional Kathara lab hash (resolved lazily if omitted).
693        """
694        self.data = data
695        self.running_lab_name = running_lab_name
696        self.debug_project = os.path.exists(params.debug_project_marker_filename(running_lab_name))
697        self.lab_name = params.get_lab_name_from_running_lab_name(running_lab_name)
698        self.lab_hash = lab_hash
699        self.current_srelab_file = params.get_current_srelab_file_from_running_lab_name(running_lab_name)
700        self.informations = ""
701
702        _machine_specs = self._resolve_spec('_machine_specs', 'machine_specs')
703        _network_specs = self._resolve_spec('_network_specs', 'network_specs')
704        _topology = self._resolve_spec('_topology', 'topology')
705
706        for name, parameters in _machine_specs.items():
707            setattr(self, name, Machine(name=name, **parameters))
708
709        # Create networks from _network_specs (with optional color), then remaining from _topology
710        for name, parameters in _network_specs.items():
711            setattr(self, name, Network(name=name, **parameters))
712
713        for net_name in _topology:
714            if not hasattr(self, net_name):
715                setattr(self, net_name, Network(name=net_name))
716
717        # Create NetAdapters from _topology.
718        # Each value is either a list (auto interface) or a dict {machine: iface_spec}.
719        # iface_spec can be: None (auto), an int, or a (int, mac) tuple.
720        # Interface auto-assignment counts prior connections per machine across all networks.
721        _iface_counter: dict[str, int] = {}
722        for net_name, machines in _topology.items():
723            net = getattr(self, net_name)
724            if isinstance(machines, dict):
725                items = machines.items()
726            else:
727                items = ((m, None) for m in machines)
728            for mname, iface_spec in items:
729                machine = getattr(self, mname)
730                if isinstance(iface_spec, tuple):
731                    iface, mac = iface_spec
732                else:
733                    iface, mac = iface_spec, None
734                if iface is None:
735                    iface = _iface_counter.get(mname, 0)
736                _iface_counter[mname] = max(_iface_counter.get(mname, 0), iface) + 1
737                NetAdapter(network=net, machine=machine, interface=iface, mac=mac)
738
739        self._ops: dict[str, list] = {}  # machine → [str | _FileOp, ...]
740        self._host_ops: dict[int, list] = {}
741
742        self.net_config = None

Build all Machine, Network, and NetAdapter objects from the class-level specs.

Arguments:
  • data: a Data0 instance containing lab-specific parameters.
  • running_lab_name: the runtime identifier {ts}@@@{lab}@@@{user}.
  • lab_hash: optional Kathara lab hash (resolved lazily if omitted).
data
running_lab_name
debug_project
lab_name
lab_hash
current_srelab_file
informations
net_config
def host_interfaces_from_topology(self) -> dict:
744    def host_interfaces_from_topology(self) -> dict:
745        """Return {machine_name: [net_name, ...]} derived from the resolved topology."""
746        topology = self._resolve_spec('_topology', 'topology')
747        result = {}
748        for net_name, machines in topology.items():
749            names = machines.keys() if isinstance(machines, dict) else machines
750            for mname in names:
751                result.setdefault(mname, []).append(net_name)
752        return result

Return {machine_name: [net_name, ...]} derived from the resolved topology.

@sre_state(user_allowed=False)
def initial(self):
754    @sre_state(user_allowed=False)
755    def initial(self):
756        pass
@classmethod
def get_state_methods(cls):
758    @classmethod
759    def get_state_methods(cls):
760        """Return a sorted list of all ``@sre_state``-decorated method names in this class hierarchy."""
761        names = set()
762        for klass in cls.__mro__:
763            for name, fn in klass.__dict__.items():
764                if callable(fn) and getattr(fn, '_is_sre_state', False):
765                    names.add(name)
766        return sorted(names)

Return a sorted list of all @sre_state-decorated method names in this class hierarchy.

@classmethod
def is_state_user_allowed(cls, state):
768    @classmethod
769    def is_state_user_allowed(cls, state):
770        """Return ``True`` if *state* was decorated with ``@sre_state(user_allowed=True)``."""
771        for klass in cls.__mro__:
772            fn = klass.__dict__.get(state)
773            if fn is not None and getattr(fn, '_is_sre_state', False):
774                return getattr(fn, '_sre_state_user_allowed', False)
775        return False

Return True if state was decorated with @sre_state(user_allowed=True).

@classmethod
def get_user_allowed_states(cls):
777    @classmethod
778    def get_user_allowed_states(cls):
779        """Return ``{state_name: description}`` for all user-allowed states."""
780        result = {}
781        for state in cls.get_state_methods():
782            for klass in cls.__mro__:
783                fn = klass.__dict__.get(state)
784                if fn is not None and getattr(fn, '_is_sre_state', False):
785                    if getattr(fn, '_sre_state_user_allowed', False):
786                        result[state] = getattr(fn, '_sre_state_description', '')
787                    break
788        return result

Return {state_name: description} for all user-allowed states.

def get_data(self):
790    def get_data(self):
791        return self.data
def get_machine(self, machine_name):
793    def get_machine(self, machine_name):
794        """Return the :class:`Machine` with *machine_name*, or ``None`` if not found."""
795        if not hasattr(self, machine_name):
796            return None
797        machine = getattr(self, machine_name)
798        if not isinstance(machine, Machine):
799            return None
800        return machine

Return the Machine with machine_name, or None if not found.

def get_network(self, network_name):
802    def get_network(self, network_name):
803        """Return the :class:`Network` with *network_name*, or ``None`` if not found."""
804        if not hasattr(self, network_name):
805            return None
806        machine = getattr(self, network_name)
807        if not isinstance(machine, Network):
808            return None
809        return machine

Return the Network with network_name, or None if not found.

def get_machines(self):
811    def get_machines(self):
812        for _, value in self.__dict__.items():
813            if isinstance(value, Machine):
814                yield value
def get_machine_names(self):
816    def get_machine_names(self):
817        return [m.name for m in self.get_machines()]
def has_privileged_machines(self):
819    def has_privileged_machines(self):
820        return any(m.privileged for m in self.get_machines())
def get_accessible_machine_names(self):
822    def get_accessible_machine_names(self):
823        return [m.name for m in self.get_machines()
824                if not m.hidden and m.allow_connection]
def get_visible_machine_names(self):
826    def get_visible_machine_names(self):
827        return [m.name for m in self.get_visibles_machines()]
def get_visibles_machines(self):
829    def get_visibles_machines(self):
830        for _, value in self.__dict__.items():
831            if isinstance(value, Machine):
832                if not value.hidden:
833                    yield value
def get_networks(self):
835    def get_networks(self):
836        for _, value in self.__dict__.items():
837            if isinstance(value, Network):
838                yield value
def get_topology(self):
840    def get_topology(self):
841        return self._resolve_spec('_topology', 'topology')
def get_machine_specs(self):
843    def get_machine_specs(self):
844        return self._resolve_spec('_machine_specs', 'machine_specs')
def get_network_specs(self):
846    def get_network_specs(self):
847        return self._resolve_spec('_network_specs', 'network_specs')
def cmd(self, machine, command, step=1):
849    def cmd(self, machine, command, step=1):
850        """Register a shell command to execute inside *machine*'s container at *step*."""
851        if SRE.args.debug:
852            print(f"[state] [{machine}] CMD (step={step}): {command}", file=sys.stderr)
853        self._ops.setdefault(step, {}).setdefault(machine, []).append(command)

Register a shell command to execute inside machine's container at step.

def host_cmd(self, command, step=1):
855    def host_cmd(self, command, step=1):
856        """Register a shell command to execute on the **host** (not inside a container) at *step*.
857
858        Requires ``params.execute_commands_on_host`` to be enabled; aborts otherwise.
859        """
860        if params.execute_commands_on_host is False:
861            sys.exit("host_cmd() is disabled by params.execute_commands_on_host")
862        if SRE.args.debug:
863            print(f"[state] HOST_CMD (step={step}): {command}", file=sys.stderr)
864        self._host_ops.setdefault(step, []).append(_HostCmdOp(command))

Register a shell command to execute on the host (not inside a container) at step.

Requires params.execute_commands_on_host to be enabled; aborts otherwise.

def host_callback(self, callback, step=1):
866    def host_callback(self, callback, step=1):
867        """Register a Python callable to invoke on the host at *step* (called with no arguments)."""
868        if SRE.args.debug:
869            print(f"[state] HOST_CALLBACK (step={step}): {getattr(callback, '__name__', repr(callback))}",
870                  file=sys.stderr)
871        self._host_ops.setdefault(step, []).append(_HostCallbackOp(callback))

Register a Python callable to invoke on the host at step (called with no arguments).

def cp_from_host( self, src: str, machine: str, dest: str, owner: str = 'root:root', permissions: int = None, mtime: float = None, step=1):
873    def cp_from_host(self, src: str, machine: str, dest: str, owner: str = "root:root", permissions: int = None,
874                     mtime: float = None, step=1):
875        """Register a file copy from the host into *machine*'s container at *step*.
876
877        *src* may be relative (resolved against ``params.files_dir``).
878        *dest* is the absolute path inside the container.
879        """
880        orig_path = Path(src)
881        if not orig_path.is_absolute():
882            orig_path = Path(params.files_dir(self.running_lab_name)) / orig_path
883        if SRE.args.debug:
884            print(f"[state] [{machine}] CP (step={step}): {orig_path} -> {dest} (owner={owner})", file=sys.stderr)
885        self._ops.setdefault(step, {}).setdefault(machine, []).append(
886            _CpFromHostOp(orig_path, dest, permissions, owner, mtime)
887        )

Register a file copy from the host into machine's container at step.

src may be relative (resolved against params.files_dir). dest is the absolute path inside the container.

def cp_to_host( self, machine: str, path: str, dest: str, permissions: int = None, step=1):
889    def cp_to_host(self, machine: str, path: str, dest: str, permissions: int = None, step=1):
890        """Copy file `path` from `machine` to the host files_dir.
891
892        Args:
893            machine:     container name
894            path:        absolute path of the file inside the container
895            dest:        relative destination path inside params.files_dir(running_lab_name)
896            permissions: file mode to apply on the host copy (default: None, leave as written)
897            step:        execution step (default 1)
898        """
899        files_dir = Path(params.files_dir(self.running_lab_name)).resolve()
900        dest_path = (files_dir / dest).resolve()
901        if not dest_path.is_relative_to(files_dir):
902            error_quit(f"cp_to_host: dest '{dest}' is outside files_dir '{files_dir}'")
903        if SRE.args.debug:
904            print(f"[state] [{machine}] CP_TO_HOST (step={step}): {path} -> {dest_path}", file=sys.stderr)
905        self._ops.setdefault(step, {}).setdefault(machine, []).append(
906            _CpToHostOp(path, str(dest_path), permissions)
907        )

Copy file path from machine to the host files_dir.

Arguments:
  • machine: container name
  • path: absolute path of the file inside the container
  • dest: relative destination path inside params.files_dir(running_lab_name)
  • permissions: file mode to apply on the host copy (default: None, leave as written)
  • step: execution step (default 1)
def file( self, machine, filename, content, permissions=420, owner='root:root', mtime=None, step=1):
909    def file(self, machine, filename, content, permissions=0o644, owner="root:root", mtime=None, step=1):
910        """Register a file to create/overwrite on `machine` during the current state.
911
912        Args:
913            machine:     machine name (str)
914            filename:    absolute path inside the container (e.g. "/etc/myconfig")
915            content:     file content (str or bytes)
916            permissions: octal mode (default 0o644)
917            owner:       "user:group" string (default "root:root")
918            mtime:       modification time as a Unix timestamp (float); defaults to now
919            step:        execution step (default 1); higher steps run after lower ones
920        """
921        import time as _time
922        raw = content.encode() if isinstance(content, str) else content
923        if SRE.args.debug:
924            print(
925                f"[state] [{machine}] FILE (step={step}): {filename} (permissions={permissions:#o}, owner={owner}, size={len(raw)}B)",
926                file=sys.stderr)
927        self._ops.setdefault(step, {}).setdefault(machine, []).append(
928            _FileOp(filename, raw, permissions, owner, mtime if mtime is not None else _time.time())
929        )

Register a file to create/overwrite on machine during the current state.

Arguments:
  • machine: machine name (str)
  • filename: absolute path inside the container (e.g. "/etc/myconfig")
  • content: file content (str or bytes)
  • permissions: octal mode (default 0o644)
  • owner: "user:group" string (default "root:root")
  • mtime: modification time as a Unix timestamp (float); defaults to now
  • step: execution step (default 1); higher steps run after lower ones
def append_to_file( self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
931    def append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
932        """Append content to a file on `machine`; create the file if it does not exist.
933
934        Args:
935            machine:     machine name (str)
936            filename:    absolute path inside the container (e.g. "/etc/hosts")
937            content:     content to append (str or bytes)
938            permissions: octal mode to set after appending (default: leave unchanged)
939            owner:       "user:group" to set after appending (default: leave unchanged)
940            mtime:       modification time as a Unix timestamp (default: leave unchanged)
941            step:        execution step (default 1); higher steps run after lower ones
942        """
943        raw = content.encode() if isinstance(content, str) else content
944        if SRE.args.debug:
945            print(f"[state] [{machine}] APPEND (step={step}): {filename} (size={len(raw)}B)", file=sys.stderr)
946        self._ops.setdefault(step, {}).setdefault(machine, []).append(
947            _AppendOp(filename, raw, permissions, owner, mtime)
948        )

Append content to a file on machine; create the file if it does not exist.

Arguments:
  • machine: machine name (str)
  • filename: absolute path inside the container (e.g. "/etc/hosts")
  • content: content to append (str or bytes)
  • permissions: octal mode to set after appending (default: leave unchanged)
  • owner: "user:group" to set after appending (default: leave unchanged)
  • mtime: modification time as a Unix timestamp (default: leave unchanged)
  • step: execution step (default 1); higher steps run after lower ones
def idempotent_append_to_file( self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
950    def idempotent_append_to_file(self, machine, filename, content, permissions=None, owner=None, mtime=None, step=1):
951        """Append content to a file on `machine` only if the file does not already end with it.
952
953        Idempotent version of append_to_file: safe to call multiple times — the content
954        is appended at most once per application.  If permissions, owner, or mtime are
955        provided they are applied regardless of whether content was appended.
956
957        Args:
958            machine:     machine name (str)
959            filename:    absolute path inside the container (e.g. "/etc/hosts")
960            content:     content to append (str or bytes)
961            permissions: octal mode to set after the check (default: leave unchanged)
962            owner:       "user:group" to set after the check (default: leave unchanged)
963            mtime:       modification time as a Unix timestamp (default: leave unchanged)
964            step:        execution step (default 1); higher steps run after lower ones
965        """
966        raw = content.encode() if isinstance(content, str) else content
967        if SRE.args.debug:
968            print(f"[state] [{machine}] IDEMPOTENT_APPEND (step={step}): {filename} (size={len(raw)}B)",
969                  file=sys.stderr)
970        self._ops.setdefault(step, {}).setdefault(machine, []).append(
971            _IdempotentAppendOp(filename, raw, permissions, owner, mtime)
972        )

Append content to a file on machine only if the file does not already end with it.

Idempotent version of append_to_file: safe to call multiple times — the content is appended at most once per application. If permissions, owner, or mtime are provided they are applied regardless of whether content was appended.

Arguments:
  • machine: machine name (str)
  • filename: absolute path inside the container (e.g. "/etc/hosts")
  • content: content to append (str or bytes)
  • permissions: octal mode to set after the check (default: leave unchanged)
  • owner: "user:group" to set after the check (default: leave unchanged)
  • mtime: modification time as a Unix timestamp (default: leave unchanged)
  • step: execution step (default 1); higher steps run after lower ones
def compute_state_ops(self, state):
974    def compute_state_ops(self, state):
975        self._ops = {}
976        self._host_ops = {}
977        if not hasattr(self, state):
978            error_quit(f"state method {state} does not exist")
979        method = getattr(self, state)
980        if not callable(method):
981            error_quit(f"state method {state} not callable")
982        try:
983            method()
984        except Exception as e:
985            error_quit(f"error during {state} execution: {e}")
986        return self._ops, self._host_ops
def get_new_lab_from_scheme(self):
 988    def get_new_lab_from_scheme(self):
 989        lab = Lab(name=self.running_lab_name)
 990        abb_lab_name = params.get_abbreviated_lab_name_from_running_lab_name(self.running_lab_name)
 991        _used_ports: set = set()
 992        xauth_cookie = _resolve_xauth_cookie()
 993        for m in self.get_machines():
 994            m.ports = [_resolve_port(p, _used_ports) for p in m.ports]
 995            m.envs[f"{params.sre_name_env_variable}={abb_lab_name}"] = True
 996            if m.x11_host:
 997                m.envs[f"{params.sre_host_ip_env_variable}={params.sre_host_ip}"] = True
 998                if xauth_cookie:
 999                    m.envs[f"{params.sre_xauth_cookie_env_variable}={xauth_cookie}"] = True
1000            volumes = []
1001            if len(m.volumes) > 0:
1002                has_private_volume = False
1003                for v in m.volumes:
1004                    if len(v) > 2 and v[3] == 'private':
1005                        has_private_volume = True
1006                        break
1007
1008                if has_private_volume and params.disable_volume_mount_on_root_partition:
1009                    private_mount_dir = self.get_private_mount_dir()
1010                    p = Path(private_mount_dir)
1011                    while not p.exists():
1012                        p = p.parent
1013                    if os.stat(p).st_dev == os.stat('/').st_dev:
1014                        error_quit(
1015                            f"Machine '{m.name}': private mount dir '{private_mount_dir}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1016
1017                for v in m.volumes:
1018                    if len(v) > 2 and v[3] == 'private':
1019                        if v[0].startswith('/'):
1020                            error_quit(f"volume {v[0]} is private and have an absolute path")
1021                        v_host = f"{self.get_private_mount_dir()}/{v[0]}"
1022                        os.makedirs(v_host, exist_ok=True)
1023                        os.chmod(v_host, 0o777)
1024                    else:
1025                        if v[0].startswith('/'):
1026                            v_host = v[0]
1027                        else:
1028                            v_host = f"{self.get_user_public_dir()}/{v[0]}"
1029                            os.makedirs(v_host, exist_ok=True)
1030                            os.chmod(v_host, 0o777)
1031                    if v[2] not in ['rw', 'ro']:
1032                        error_quit(f"volume mode {v[2]} not supported (only rw and ro)")
1033
1034                    volumes.append(f"{v_host}|{v[1]}|{v[2]}")
1035            lab.new_machine(m.name,
1036                            exec_commands=m.exec_commands,
1037                            sysctls=m.sysctls,
1038                            envs=m.envs,
1039                            ports=m.ports,
1040                            ulimits=m.ulimits,
1041                            volumes=volumes,
1042                            shell=m.kathara_shell,
1043                            image=m.image,
1044                            bridged=m.bridged,
1045                            privileged=m.privileged,
1046                            entrypoint=m.entrypoint,
1047                            )
1048            for net, netAdapter in m.net_adapters.items():
1049                lab.connect_machine_to_link(m.name, net.name, machine_iface_number=netAdapter.interface,
1050                                            mac_address=str(netAdapter.mac).replace('-',
1051                                                                                    ':').lower() if netAdapter.mac is not None else None)
1052        self.lab_hash = lab.hash
1053        return lab
def get_lab_hash(self):
1055    def get_lab_hash(self):
1056        if self.lab_hash is None:
1057            lab = Lab(name=self.running_lab_name)
1058            self.lab_hash = lab.hash
1059        return self.lab_hash
def get_lab_from_kathara(self):
1061    def get_lab_from_kathara(self):
1062        lab_hash = self.get_lab_hash()
1063        return Kathara.get_instance().get_lab_from_api(lab_hash=lab_hash)
def get_public_lab_dir(self):
1065    def get_public_lab_dir(self):
1066        return params.public_lab_dir(self.running_lab_name)
def get_private_lab_dir(self):
1068    def get_private_lab_dir(self):
1069        return params.private_lab_dir(self.running_lab_name)
def get_files_dir(self):
1071    def get_files_dir(self):
1072        return params.files_dir(self.running_lab_name)
def get_private_mount_dir(self):
1074    def get_private_mount_dir(self):
1075        return params.private_mount_dir(self.running_lab_name)
def get_user_public_dir(self):
1077    def get_user_public_dir(self):
1078        try:
1079            return Path(params.link_to_user_public_dir(self.running_lab_name)).readlink()
1080        except OSError:
1081            return Path(params.link_to_user_public_dir(self.running_lab_name))
def get_shared_dir(self):
1083    def get_shared_dir(self):
1084        return f"{self.get_user_public_dir()}/{params.shared_dir_name}"
def answers_dir(self):
1086    def answers_dir(self):
1087        return params.answers_dir(self.running_lab_name)
def answers_file(self):
1089    def answers_file(self):
1090        return params.answers_filename(self.running_lab_name)
class NetAdapter:
1093class NetAdapter:
1094    def __init__(self, network, machine, interface, mac=None, addresses=None):
1095        if addresses is None:
1096            addresses = []
1097        if not isinstance(machine, Machine):
1098            raise ValueError("machine must an object of class Machine")
1099        if not isinstance(network, Network):
1100            raise ValueError("network must be an instance of Network")
1101        if mac is not None:
1102            first_byte = int(str(mac).replace('-', ':').split(':')[0], 16)
1103            if first_byte & 1:
1104                raise ValueError(
1105                    f"MAC address {mac} on machine '{machine.name}': "
1106                    f"multicast bit (LSB of first byte) is set — use a unicast address "
1107                    f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')."
1108                )
1109        self.network = network
1110        self.machine = machine
1111        self.interface = interface
1112        self.mac = mac
1113        self.addresses = addresses
1114        machine.net_adapters[network] = self
1115        network.net_adapters[machine] = self
NetAdapter(network, machine, interface, mac=None, addresses=None)
1094    def __init__(self, network, machine, interface, mac=None, addresses=None):
1095        if addresses is None:
1096            addresses = []
1097        if not isinstance(machine, Machine):
1098            raise ValueError("machine must an object of class Machine")
1099        if not isinstance(network, Network):
1100            raise ValueError("network must be an instance of Network")
1101        if mac is not None:
1102            first_byte = int(str(mac).replace('-', ':').split(':')[0], 16)
1103            if first_byte & 1:
1104                raise ValueError(
1105                    f"MAC address {mac} on machine '{machine.name}': "
1106                    f"multicast bit (LSB of first byte) is set — use a unicast address "
1107                    f"(first byte must be even, e.g. replace '{hex(first_byte)}' with '{hex(first_byte & 0xfe)}')."
1108                )
1109        self.network = network
1110        self.machine = machine
1111        self.interface = interface
1112        self.mac = mac
1113        self.addresses = addresses
1114        machine.net_adapters[network] = self
1115        network.net_adapters[machine] = self
network
machine
interface
mac
addresses
class Network:
1118class Network:
1119    def __init__(self, name, color=None, shape=None):
1120        self.name = name
1121        self.color = color
1122        self.shape = shape
1123        self.net_adapters = {}
1124
1125    def get_machines(self):
1126        for m in self.net_adapters.keys():
1127            yield m
Network(name, color=None, shape=None)
1119    def __init__(self, name, color=None, shape=None):
1120        self.name = name
1121        self.color = color
1122        self.shape = shape
1123        self.net_adapters = {}
name
color
shape
net_adapters
def get_machines(self):
1125    def get_machines(self):
1126        for m in self.net_adapters.keys():
1127            yield m
class Machine:
1130class Machine:
1131    def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="",
1132                 cpus=None, ipv6=None,
1133                 exec_commands=[],
1134                 sysctls={},
1135                 envs={},
1136                 ports=None, ulimits={}, volumes=[],
1137                 shell=None,
1138                 kathara_shell=None,
1139                 privileged=None, entrypoint=None, args=[],
1140                 hidden=False,
1141                 allow_connection=True,
1142                 color=None,
1143                 shape=None):
1144        if ports is None:
1145            ports = []
1146        self.name = name
1147        self.image = image
1148        self.bridged = bridged
1149        self.x11_host = x11_host
1150        self.mem = mem
1151        self.cpus = cpus
1152        self.ipv6 = ipv6
1153        self.exec_commands = exec_commands
1154        self.sysctls = sysctls
1155        # Copy: the constructor's mutable default {} is shared across Machines;
1156        # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP
1157        # for x11_host machines) must not leak into machines that didn't opt in.
1158        self.envs = dict(envs)
1159        self.ports = ports
1160        self.ulimits = ulimits
1161        self.volumes = volumes
1162        self.shell = shell
1163        self.kathara_shell = kathara_shell
1164        self.privileged = privileged
1165        self.entrypoint = entrypoint
1166        self.args = args
1167        self.hidden = hidden
1168        self.allow_connection = allow_connection
1169        self.shape = shape
1170        self.color = color
1171
1172        self.net_adapters = {}
1173
1174        if params.disable_volume_mount_on_root_partition:
1175            for v in self.volumes:
1176                v_host = v[0]
1177                if v_host.startswith('/'):
1178                    p = Path(v_host)
1179                    while not p.exists():
1180                        p = p.parent
1181                    if os.stat(p).st_dev == os.stat('/').st_dev:
1182                        error_quit(
1183                            f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1184
1185        if self.privileged and not params.allow_privileged_machines:
1186            error_quit(
1187                f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode")
1188
1189        if not self.bridged and len(self.ports) > 0:
1190            error_quit("To add ports in a machine, you need to activate bridged mode")
Machine( name, image='sysreseval/base:1.28', bridged=False, x11_host=False, mem='', cpus=None, ipv6=None, exec_commands=[], sysctls={}, envs={}, ports=None, ulimits={}, volumes=[], shell=None, kathara_shell=None, privileged=None, entrypoint=None, args=[], hidden=False, allow_connection=True, color=None, shape=None)
1131    def __init__(self, name, image=params.default_docker_image, bridged=False, x11_host=False, mem="",
1132                 cpus=None, ipv6=None,
1133                 exec_commands=[],
1134                 sysctls={},
1135                 envs={},
1136                 ports=None, ulimits={}, volumes=[],
1137                 shell=None,
1138                 kathara_shell=None,
1139                 privileged=None, entrypoint=None, args=[],
1140                 hidden=False,
1141                 allow_connection=True,
1142                 color=None,
1143                 shape=None):
1144        if ports is None:
1145            ports = []
1146        self.name = name
1147        self.image = image
1148        self.bridged = bridged
1149        self.x11_host = x11_host
1150        self.mem = mem
1151        self.cpus = cpus
1152        self.ipv6 = ipv6
1153        self.exec_commands = exec_commands
1154        self.sysctls = sysctls
1155        # Copy: the constructor's mutable default {} is shared across Machines;
1156        # post-construction mutations in get_new_lab_from_scheme (e.g. SRE_HOST_IP
1157        # for x11_host machines) must not leak into machines that didn't opt in.
1158        self.envs = dict(envs)
1159        self.ports = ports
1160        self.ulimits = ulimits
1161        self.volumes = volumes
1162        self.shell = shell
1163        self.kathara_shell = kathara_shell
1164        self.privileged = privileged
1165        self.entrypoint = entrypoint
1166        self.args = args
1167        self.hidden = hidden
1168        self.allow_connection = allow_connection
1169        self.shape = shape
1170        self.color = color
1171
1172        self.net_adapters = {}
1173
1174        if params.disable_volume_mount_on_root_partition:
1175            for v in self.volumes:
1176                v_host = v[0]
1177                if v_host.startswith('/'):
1178                    p = Path(v_host)
1179                    while not p.exists():
1180                        p = p.parent
1181                    if os.stat(p).st_dev == os.stat('/').st_dev:
1182                        error_quit(
1183                            f"Machine '{self.name}': volume host path '{v_host}' is on the root partition (disable_volume_mount_on_root_partition=True)")
1184
1185        if self.privileged and not params.allow_privileged_machines:
1186            error_quit(
1187                f"Machine '{self.name}': privileged mode is disabled (allow_privileged_machines=False). Can't run in privileged mode")
1188
1189        if not self.bridged and len(self.ports) > 0:
1190            error_quit("To add ports in a machine, you need to activate bridged mode")
name
image
bridged
x11_host
mem
cpus
ipv6
exec_commands
sysctls
envs
ports
ulimits
volumes
shell
kathara_shell
privileged
entrypoint
args
hidden
allow_connection
shape
color
net_adapters
class Grade0:
1193class Grade0:
1194    """Base class for lab evaluation logic.
1195
1196    Subclasses override :meth:`grade` to *register* tests and questions.  The
1197    actual execution happens later in :meth:`run_tests`, which drives a
1198    multi-step loop:
1199
1200    1. Call :meth:`grade` to register commands (they return placeholder values).
1201    2. Execute all registered commands in containers (via :mod:`exetests`).
1202    3. Repeat for each additional step (``self.max_step``).
1203    4. Call :meth:`grade` a final time — commands now return real results.
1204    5. Save the archive.
1205
1206    Key instance attributes available inside ``grade()``:
1207
1208    * ``self.data`` — the ``Data0`` instance (via ``net_scheme``).
1209    * ``self.step`` — current step number (1-based).
1210    * ``self.max_step`` — highest step number registered so far.
1211    """
1212
1213    def __init__(self, net_scheme):
1214        self.net_scheme = net_scheme
1215        self.archive_dirs = []
1216        self.files_to_save_in_archives = []
1217        self._tests = None
1218        self._allow_errors_in_tests = None
1219        self.step = 0
1220        self.max_step = 1
1221        self._questions = None
1222        self._questions_order = None
1223        self._questions_current_order = None
1224        self._cheat_answers = {}
1225        self._answers = {}
1226        self._grade_list = []
1227        self._grades = {}
1228        self._grade_parts: list[GradePart] = []
1229        self._errors = []
1230        self._total_grade_self_eval = 0
1231        self._total_max_self_eval = 0
1232        self._total_grade_exo_eval = 0
1233        self._total_max_exo_eval = 0
1234        self._maximum_mark = params.default_maximum_mark
1235        self._use_numerical_marks = params.use_numerical_marks_by_default
1236        self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default
1237        self._mark_self_eval = None
1238        self._mark_exo_eval = None
1239        self._section_counter = []
1240        self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)]
1241        self._eval_date = None
1242        self._re_eval_date = None
1243        self._default_language = 'en'
1244        self.auto_eval_count = 0
1245        self._exam_json = None
1246        self.full_reset()
1247
1248    def full_reset(self):
1249        """Reset all state including tests and answers (called from ``__init__``)."""
1250        self.step = 0
1251        self.max_step = 1
1252        self._tests = {}  # keys : (machine,step)->(cmd, timeout)->(result, code))
1253        self._allow_errors_in_tests = {}
1254        self._host_tests = {}  # step -> {(command, timeout): (result, code)}
1255        self._allow_errors_in_host_tests = {}  # (step, command, timeout) -> True
1256        self._answers = {}  # hash -> answer
1257        self._eval_date = None
1258        self.reset_before_grade()
1259
1260    def reset_before_grade(self):
1261        """Clear questions, grade list, and section counters before each call to :meth:`grade`."""
1262        self._questions = dict()  # hash->Question
1263        self._questions_order = dict()  # order -> [Question1, Question2, ...]
1264        self._questions_current_order = 100
1265        self._cheat_answers = {}  # state->(question_hash->answer)
1266        self._grade_list = []
1267        self._grades = dict()  # title -> GradeElement
1268        self._grade_parts = []
1269        self._section_counter = []
1270
1271    def grade(self):
1272        """Override to register tests, questions, and grade elements.
1273
1274        Called multiple times during :meth:`run_tests` — once per step plus a
1275        final time when all results are available.  Do **not** produce side-effects
1276        here; only call ``self.test()``, ``self.question_*()`` and
1277        ``self.add_grade_element()`` / ``self.set_grade()``.
1278        """
1279        self._grade_list = []
1280        self._grade_parts = []
1281
1282    def _compute_mark(self, total_grade, total_max):
1283        """Compute a mark from a ``(total_grade, total_max)`` pair.
1284
1285        Returns ``None`` if ``total_max == 0``.
1286        Numerical mode (default): rounded to one decimal, scaled to ``_maximum_mark``.
1287        Letter mode: A+/A/B/C/D/F.
1288        """
1289        if total_max == 0:
1290            return None
1291        if self._use_numerical_marks:
1292            return math.ceil(10 * self._maximum_mark * total_grade / total_max) / 10
1293        else:
1294            ratio = total_grade / total_max
1295            if ratio >= 18 / 20:
1296                return "A+"
1297            elif ratio >= 16 / 20:
1298                return "A"
1299            elif ratio >= 14 / 20:
1300                return "B"
1301            elif ratio >= 12 / 20:
1302                return "C"
1303            elif ratio >= 10 / 20:
1304                return "D"
1305            else:
1306                return "F"
1307
1308    def mark_self_eval(self):
1309        """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE)."""
1310        return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval)
1311
1312    def mark_exo_eval(self):
1313        """Final mark over elements visible in non-auto eval / outline / sheet."""
1314        return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval)
1315
1316    def get_data(self):
1317        return self.net_scheme.get_data()
1318
1319    def get_errors(self):
1320        return self._errors
1321
1322    def get_grade_list(self):
1323        return self._grade_list
1324
1325    def get_grade_parts(self):
1326        return self._grade_parts
1327
1328    def get_answers(self):
1329        return self._answers
1330
1331    def get_cheat_answers(self, state: str):
1332        return self._cheat_answers.get(state)
1333
1334    def get_tests(self):
1335        return self._tests
1336
1337    def get_exetests_strings(self, step: int):
1338        result = dict()
1339        for (machine, step1) in self._tests.keys():
1340            if step != step1:
1341                continue
1342            result[machine] = params.exetests_separator.join(
1343                [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()])
1344        return result
1345
1346    def get_running_lab_name(self):
1347        return self.net_scheme.running_lab_name
1348
1349    def increment_section_counter(self, level: int):
1350        if len(self._section_counter) < level + 1:
1351            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1352        else:
1353            self._section_counter = self._section_counter[:(level + 1)]
1354        self._section_counter[level] += 1
1355
1356    def set_section_counter(self, level: int, value: int):
1357        if len(self._section_counter) < level + 1:
1358            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1359        else:
1360            self._section_counter = self._section_counter[:(level + 1)]
1361        self._section_counter[level] = value
1362
1363    def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1364        self.increment_section_counter(level)
1365        return self.current_section(level, fmt, show, pad)
1366
1367    def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1368        # fmt is a list of (type, depth[, prefix]) tuples, one per level.
1369        # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number
1370        # depth: how many consecutive counters to display, ending at this level
1371        # prefix: optional string prepended to the result (default: '')
1372        # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1."
1373        if fmt is None:
1374            fmt = self.section_fmt
1375
1376        def _to_roman(n: int) -> str:
1377            vals = [
1378                (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'),
1379                (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'),
1380                (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'),
1381            ]
1382            result = ''
1383            for value, numeral in vals:
1384                while n >= value:
1385                    result += numeral
1386                    n -= value
1387            return result
1388
1389        def _to_capital_letter(n: int) -> str:
1390            result = ''
1391            while n > 0:
1392                n, r = divmod(n - 1, 26)
1393                result = chr(ord('A') + r) + result
1394            return result
1395
1396        def _to_lowercase_letter(n: int) -> str:
1397            result = ''
1398            while n > 0:
1399                n, r = divmod(n - 1, 26)
1400                result = chr(ord('a') + r) + result
1401            return result
1402
1403        def _convert(n: int, fmt_type: str) -> str:
1404            match fmt_type:
1405                case 'R':
1406                    return _to_roman(n)
1407                case 'r':
1408                    return _to_roman(n).lower()
1409                case 'L':
1410                    return _to_capital_letter(n)
1411                case 'l':
1412                    return _to_lowercase_letter(n)
1413                case _:
1414                    return str(n)
1415
1416        entry = fmt[level] if level < len(fmt) else ('N', level + 1)
1417        _, depth = entry[:2]
1418        prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '')
1419        if show is not None:
1420            depth = show
1421        start = max(0, level - depth + 1)
1422        parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0,
1423                          fmt[j][0] if j < len(fmt) else 'N')
1424                 for j in range(start, level + 1)]
1425        return prefix + ".".join(parts) + ". "
1426
1427    def load_answers(self):
1428        """Load student answers from ``answers.json`` into ``self._answers``."""
1429        self._answers = {}
1430        try:
1431            fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()),
1432                         os.O_RDONLY | os.O_NOFOLLOW)
1433            with os.fdopen(fd) as f:
1434                self._answers = json.load(f)
1435        except (OSError, json.JSONDecodeError):
1436            self._answers = {}
1437
1438    def get_questions_ordered(self):
1439        q = []
1440        for _, v in sorted(self._questions_order.items()):
1441            q += v
1442        return q
1443
1444    def export_questions(self):
1445        pass
1446
1447    def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='',
1448                  default_code: int = 0, allow_error: bool = False):
1449        """Register and retrieve the result of a host-side test command.
1450
1451        On the first call (registration pass) returns *default_value* / *default_code*.
1452        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1453        Set *allow_error* to suppress error recording on non-zero exit.
1454        """
1455        if params.execute_commands_on_host is False:
1456            sys.exit("test_host() is disabled by params.execute_commands_on_host")
1457        if self.max_step < step:
1458            self.max_step = step
1459        if step not in self._host_tests:
1460            self._host_tests[step] = {}
1461        if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests:
1462            self._allow_errors_in_host_tests[(step, command, timeout)] = True
1463        if (command, timeout) not in self._host_tests[step]:
1464            self._host_tests[step][(command, timeout)] = (default_value, default_code)
1465            return default_value, default_code
1466        return self._host_tests[step][(command, timeout)]
1467
1468    def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='',
1469             default_code: int = 0, allow_error: bool = False):
1470        """Register and retrieve the result of a command executed inside *machine_name*.
1471
1472        On the first call (registration pass) returns *default_value* / *default_code*.
1473        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1474        *timeout* is in seconds.  Set *allow_error* to suppress error recording.
1475        """
1476        if self.max_step < step:
1477            self.max_step = step
1478        if (machine_name, step) not in self._tests:
1479            self._tests[(machine_name, step)] = {}
1480        if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests:
1481            self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True
1482        if (command, timeout) not in self._tests[(machine_name, step)]:
1483            self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code)
1484            return default_value, default_code
1485        return self._tests[(machine_name, step)][(command, timeout)]
1486
1487    @staticmethod
1488    def _apply_section(section: str, title) -> TranslatedText:
1489        """Prepend *section* to every language value in *title*."""
1490        tt = TranslatedText.from_value(title)
1491        if not section:
1492            return tt
1493        return TranslatedText({lang: section + text for lang, text in tt.items()})
1494
1495    def question_text(self, title, section='', description='', hash=None, order=None, default_answer='',
1496                      cheat_answers=None):
1497        """Register a free-text question and return the student's current answer string.
1498
1499        Returns *default_answer* if the student has not answered yet.
1500        *cheat_answers* maps state names to answer strings used in automated testing.
1501        """
1502        title = self._apply_section(section, title)
1503        if order is None:
1504            order1 = self._questions_current_order
1505            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1506        else:
1507            order1 = order
1508
1509        q = QuestionText(title, description, hash, order1)
1510        if order1 not in self._questions_order:
1511            self._questions_order[order1] = []
1512        self._questions_order[order1].append(q)
1513
1514        if q.question_hash in self._questions:
1515            write_error(f"Duplicate question hash {q.question_hash}")
1516        self._questions[q.question_hash] = q
1517
1518        if cheat_answers is not None:
1519            for state, answer in cheat_answers.items():
1520                if state not in self._cheat_answers:
1521                    self._cheat_answers[state] = {}
1522                self._cheat_answers[state][q.question_hash] = answer
1523
1524        if q.question_hash in self._answers:
1525            return self._answers[q.question_hash]
1526        return default_answer
1527
1528    _FORM_FIELD_RE = _re.compile(r'@@\{([^:}]+):([^}]*)\}@@')
1529
1530    def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None):
1531        """Register a form question with inline @@{field_name:regex}@@ fields.
1532
1533        Returns the student's answers as a dict {field_name: value}, or {} if none yet.
1534        cheat_answers format: {state_name: {field_name: value, ...}}
1535        """
1536        title = self._apply_section(section, title)
1537        from .common import QuestionForm
1538
1539        # The @@{field:regex}@@ markers are language-independent, so when the
1540        # description is a TranslatedText (wrapped in tr()) extract fields from
1541        # its resolved string. The full TranslatedText is still stored on the
1542        # question for per-language rendering.
1543        desc_str = (description.resolve('')
1544                    if isinstance(description, TranslatedText) else description)
1545
1546        fields = []
1547        for m in self._FORM_FIELD_RE.finditer(desc_str):
1548            name, spec = m.group(1), m.group(2)
1549            if spec.startswith('>') or '>>>' in spec:
1550                raw = spec[1:] if spec.startswith('>') else spec
1551                fields.append({"name": name, "choices": [
1552                    c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip()
1553                    for c in raw.split('|')
1554                ]})
1555            elif spec.startswith('?'):
1556                default = spec[1:].strip().lower() not in ('', 'false')
1557                fields.append({"name": name, "checkbox": default})
1558            else:
1559                fields.append({"name": name, "regex": spec})
1560
1561        if order is None:
1562            order1 = self._questions_current_order
1563            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1564        else:
1565            order1 = order
1566
1567        q = QuestionForm(title, description, hash, order1, fields)
1568        if order1 not in self._questions_order:
1569            self._questions_order[order1] = []
1570        self._questions_order[order1].append(q)
1571
1572        if q.question_hash in self._questions:
1573            write_error(f"Duplicate question hash {q.question_hash}")
1574        self._questions[q.question_hash] = q
1575
1576        if cheat_answers is not None:
1577            for state, field_answers in cheat_answers.items():
1578                if state not in self._cheat_answers:
1579                    self._cheat_answers[state] = {}
1580                self._cheat_answers[state][q.question_hash] = json.dumps(
1581                    field_answers, ensure_ascii=False
1582                )
1583
1584        if q.question_hash in self._answers:
1585            try:
1586                return json.loads(self._answers[q.question_hash])
1587            except (json.JSONDecodeError, TypeError):
1588                return {}
1589        return {}
1590
1591    def question_dummy(self, title, section='', description='', hash=None, order=None):
1592        """Register a display-only block (no answer widget shown to the student)."""
1593        title = self._apply_section(section, title)
1594        if order is None:
1595            order1 = self._questions_current_order
1596            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1597        else:
1598            order1 = order
1599
1600        q = QuestionDummy(title, description, hash, order1)
1601        if order1 not in self._questions_order:
1602            self._questions_order[order1] = []
1603        self._questions_order[order1].append(q)
1604
1605        if q.question_hash in self._questions:
1606            write_error(f"Duplicate question hash {q.question_hash}")
1607        self._questions[q.question_hash] = q
1608
1609    def add_grade_part(self, title, description=''):
1610        """Register a new :class:`GradePart` group and return it.
1611
1612        Pass the returned object to ``add_grade_element(..., grade_part=...)``
1613        to associate elements with this part.  Parts are displayed in
1614        registration order (with a subtotal row per part) in the GUI
1615        evaluation view and in ``sre outline`` PDFs.
1616        """
1617        if description:
1618            description = TranslatedText.from_value(description, self._default_language)
1619        gp = GradePart(title=title, description=description)
1620        if any(p.title == title for p in self._grade_parts):
1621            write_error(f"Duplicate grade part title = {title}")
1622        self._grade_parts.append(gp)
1623        return gp
1624
1625    def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE,
1626                          grade_part=None):
1627        """Add a graded rubric item.  Initial *grade* defaults to 0; use :meth:`set_grade` to update it.
1628
1629        ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only,
1630        ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only,
1631        ``BOTH_EVAL_SCOPE`` (3, default) for both audiences.
1632
1633        ``grade_part`` optionally associates this element with a
1634        :class:`GradePart` previously returned by :meth:`add_grade_part`.
1635        """
1636        if scope not in params.grade_scopes:
1637            raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}")
1638        if description:
1639            description = TranslatedText.from_value(description, self._default_language)
1640        grade_part_title = None
1641        if grade_part is not None:
1642            if not isinstance(grade_part, GradePart):
1643                raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}")
1644            if grade_part not in self._grade_parts:
1645                write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element")
1646            grade_part_title = grade_part.title
1647        g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope,
1648                         grade_part=grade_part_title)
1649        self._grade_list.append(g)
1650        key = _tt_hash_str(title)
1651        if key in self._grades:
1652            write_error(f"Duplicate grade title = {title}")
1653        self._grades[key] = g
1654
1655    def set_grade(self, title, grade):
1656        """Set the numeric *grade* for the element previously registered under *title*."""
1657        self._grades[_tt_hash_str(title)].grade = grade
1658
1659    def save_lab_info(self):
1660        debug_project = os.path.exists(
1661            params.debug_project_marker_filename(self.net_scheme.running_lab_name)
1662        )
1663        if debug_project:
1664            visible_machines = list(self.net_scheme.get_machines())
1665        else:
1666            visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden]
1667        lab_hash = self.net_scheme.get_lab_hash()
1668
1669        def _get_stats(m):
1670            s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None)
1671            return m, s
1672
1673        stats_map = {}
1674        with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor:
1675            for m, s in executor.map(_get_stats, visible_machines):
1676                stats_map[m.name] = (m, s)
1677
1678        info_machines = []
1679        for m in visible_machines:
1680            m, s = stats_map[m.name]
1681            interfaces = []
1682            for net, netAdapter in m.net_adapters.items():
1683                interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}"))
1684            info_machines.append(
1685                InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden,
1686                            bridged=m.bridged,
1687                            x11_host=m.x11_host,
1688                            ports=m.ports,
1689                            allow_connection=m.allow_connection,
1690                            color=m.color or "",
1691                            shape=m.shape or "",
1692                            interfaces=interfaces))
1693
1694        network_colors = {
1695            net.name: net.color
1696            for m in visible_machines
1697            for net in m.net_adapters
1698            if net.color
1699        }
1700        network_shapes = {
1701            net.name: net.shape
1702            for m in visible_machines
1703            for net in m.net_adapters
1704            if net.shape
1705        }
1706
1707        module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")]
1708        default_language = getattr(module_rvlab, 'default_language', 'en')
1709        self._default_language = default_language
1710        show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network)
1711        nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name)
1712        nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color)
1713        host_network_exploded = getattr(module_rvlab, 'host_network_exploded',
1714                                        params.default_host_network_exploded)
1715        host_network_edge_relative_length = float(getattr(
1716            module_rvlab, 'host_network_edge_relative_length',
1717            params.default_host_network_edge_relative_length))
1718        schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines)
1719        schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap)
1720
1721        if self.step == 0:
1722            self.grade()
1723            self.step += 1
1724
1725        questions = self.get_questions_ordered()
1726
1727        if hasattr(module_rvlab, 'title'):
1728            title = module_rvlab.title
1729        else:
1730            lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name())
1731            title = lab_name.removesuffix('.py')
1732        title = TranslatedText.from_value(title, default_language)
1733
1734        if hasattr(module_rvlab, 'delay_between_self_grade'):
1735            delay_between_self_grade = module_rvlab.delay_between_self_grade
1736        else:
1737            delay_between_self_grade = 0
1738
1739        if hasattr(module_rvlab, 'allow_self_grade'):
1740            allow_self_grade = module_rvlab.allow_self_grade
1741        else:
1742            allow_self_grade = False
1743
1744        if debug_project:
1745            delay_between_self_grade = 0
1746            allow_self_grade = True
1747        if hasattr(module_rvlab, 'export_kathara_project'):
1748            export_kathara_project = module_rvlab.export_kathara_project
1749        else:
1750            export_kathara_project = False
1751
1752        eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode)
1753        eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False)
1754
1755        informations = TranslatedText.from_value(self.net_scheme.informations, default_language)
1756        for q in questions:
1757            q.title = TranslatedText.from_value(q.title, default_language)
1758            q.description = TranslatedText.from_value(q.description, default_language)
1759
1760        net_scheme_cls = type(self.net_scheme)
1761        if debug_project:
1762            user_allowed_states_raw = {}
1763            for state in net_scheme_cls.get_state_methods():
1764                desc = ''
1765                for klass in net_scheme_cls.__mro__:
1766                    fn = klass.__dict__.get(state)
1767                    if fn is not None and getattr(fn, '_is_sre_state', False):
1768                        desc = getattr(fn, '_sre_state_description', '')
1769                        break
1770                user_allowed_states_raw[state] = desc
1771        else:
1772            user_allowed_states_raw = (
1773                net_scheme_cls.get_user_allowed_states()
1774                if getattr(module_rvlab, 'allow_user_states', False) else {}
1775            )
1776        user_allowed_states = {
1777            state: TranslatedText.from_value(desc, default_language) if desc else desc
1778            for state, desc in user_allowed_states_raw.items()
1779        }
1780
1781        if debug_project:
1782            module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False)
1783            admin_only_states = [
1784                state for state in user_allowed_states_raw
1785                if not module_allows_user_states
1786                or not net_scheme_cls.is_state_user_allowed(state)
1787            ]
1788        else:
1789            admin_only_states = []
1790
1791        info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash,
1792                       title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade,
1793                       questions=questions, informations=informations,
1794                       export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade,
1795                       debug_project=debug_project,
1796                       eval_interval_without_exam_mode=eval_interval_without_exam_mode,
1797                       eval_before_exit=eval_before_exit,
1798                       default_language=default_language,
1799                       user_allowed_states=user_allowed_states,
1800                       admin_only_states=admin_only_states,
1801                       network_colors=network_colors,
1802                       network_shapes=network_shapes,
1803                       show_nat_network=show_nat_network,
1804                       nat_network_name=nat_network_name,
1805                       nat_network_color=nat_network_color,
1806                       host_network_exploded=host_network_exploded,
1807                       host_network_edge_relative_length=host_network_edge_relative_length,
1808                       schema_splines=schema_splines,
1809                       schema_overlap=schema_overlap)
1810
1811        info_json = info.to_json()
1812        info_filename = params.info_filename(self.net_scheme.running_lab_name)
1813
1814        save_info_json = None
1815        try:
1816            with open(info_filename, "r") as f:
1817                save_info_json = f.read()
1818        except FileNotFoundError:
1819            pass
1820        if save_info_json == info_json:
1821            return
1822
1823        temp_file = tempfile.NamedTemporaryFile(
1824            delete=False,
1825            dir=Path(self.net_scheme.get_public_lab_dir()).parent)
1826        with open(temp_file.name, "w") as f:
1827            print(info_json, file=f)
1828            f.flush()
1829            os.fsync(f.fileno())
1830            os.chmod(temp_file.name, 0o644)
1831        os.replace(temp_file.name, info_filename)
1832
1833    def add_error(self, error, category=ErrorCategory.ERROR, step: int = 1):
1834        if self.step != step:
1835            return
1836        log_error(error)
1837        self._errors.append((category.value, error))
1838
1839    def add_warning(self, warning, step: int = 1):
1840        self.add_error(warning, category=ErrorCategory.WARNING, step=step)
1841
1842    @staticmethod
1843    def run_tests_on_machine(machine_name, machine, exetests):
1844        environment = {params.exetests_env_name: exetests}
1845        code, output = machine.api_object.exec_run([params.exetests_machines_path],
1846                                                   stdin=False,
1847                                                   stdout=True,
1848                                                   stderr=False,
1849                                                   tty=False,
1850                                                   environment=environment,
1851                                                   )
1852        return machine_name, code, output
1853
1854    def run_tests(self):
1855        self._tests = {}
1856        self._section_counter = []
1857        lab = self.net_scheme.get_lab_from_kathara()
1858        self._errors = []
1859        self.load_answers()
1860        self._eval_date = datetime.datetime.now().isoformat()
1861
1862        while self.step <= self.max_step:
1863            self.reset_before_grade()
1864            self.grade()
1865            self.step += 1
1866            tests = self.get_tests()
1867            exetests_by_machine = self.get_exetests_strings(self.step)
1868            if SRE.args.debug and self.step <= self.max_step:
1869                log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):")
1870                for machine, cmds in exetests_by_machine.items():
1871                    c = cmds.split(params.exetests_separator)
1872                    c1 = " - ".join(c)
1873                    log_debug(f"{machine}: {c1}")
1874
1875            results = {}
1876            with ThreadPoolExecutor(
1877                    max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor:
1878                futures_to_machines = {
1879                    executor.submit(
1880                        Grade0.run_tests_on_machine,
1881                        machine_name,
1882                        machine,
1883                        exetests_by_machine[machine_name],
1884                    ): machine_name
1885                    for machine_name, machine in lab.machines.items()
1886                    if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0
1887                }
1888                for future in as_completed(futures_to_machines):
1889                    machine_name = futures_to_machines[future]
1890                    try:
1891                        machine_name, code, output = future.result()
1892                        results[machine_name] = (code, output)
1893                    except Exception as e:
1894                        self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step)
1895                        continue
1896
1897            for machine_name, (exetests_code, output) in results.items():
1898                if (machine_name, self.step) not in self._tests:
1899                    self._tests[(machine_name, self.step)] = {}
1900                if exetests_code != 0:
1901                    self.add_error(
1902                        f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}",
1903                        step=self.step)
1904                output1 = output.decode("utf-8")
1905                separator, _, rest = output1.partition("\n")
1906                output2 = rest.split(f"\n{separator}\n")
1907                for i in range(0, len(output2), 2):
1908                    ligne1 = ""
1909                    try:
1910                        ligne1, date1, result = output2[i].split("\n", 2)
1911                    except ValueError:
1912                        result = ""
1913                    if not ligne1:
1914                        continue
1915                    timeout_s, cmd = ligne1.split(":", 1)
1916                    timeout = int(timeout_s)
1917                    date2, code_s = output2[i + 1].split("\n", 1)
1918                    try:
1919                        code = int(code_s.strip())
1920                    except ValueError:
1921                        code = -2
1922                        self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code",
1923                                       step=self.step)
1924                    if code != 0:
1925                        if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False):
1926                            self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step)
1927                    self._tests[(machine_name, self.step)][cmd, timeout] = (result, code)
1928            if SRE.args.debug:
1929                for machine_name in lab.machines:
1930                    if (machine_name, self.step) not in self._tests:
1931                        continue
1932                    for (cmd, timeout) in self._tests[(machine_name, self.step)]:
1933                        log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:")
1934                        result, code = self._tests[(machine_name, self.step)][cmd, timeout]
1935                        log_debug(result)
1936                        log_debug(f"-------- exit code {code}\n")
1937
1938            host_step_cmds = self._host_tests.get(self.step, {})
1939            if host_step_cmds:
1940                def _run_host_cmd(cmd, t):
1941                    from .utils_privileges import preexec_drop_to_sre
1942                    run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd
1943                    use_shell = params.execute_commands_on_host == "shell"
1944                    try:
1945                        proc = subprocess.run(
1946                            run_cmd, shell=use_shell, capture_output=True, text=True,
1947                            timeout=t if t > 0 else None,
1948                            preexec_fn=preexec_drop_to_sre,
1949                        )
1950                        return cmd, t, proc.stdout, proc.returncode
1951                    except subprocess.TimeoutExpired:
1952                        return cmd, t, '', -1
1953                    except Exception:
1954                        return cmd, t, '', -2
1955
1956                with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency,
1957                                                        len(host_step_cmds))) as executor:
1958                    futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t)
1959                               for (cmd, t) in host_step_cmds}
1960                    for future in as_completed(futures):
1961                        cmd, t, result, code = future.result()
1962                        if code != 0:
1963                            if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False):
1964                                self.add_error(f"host test error: {cmd} code={code}", step=self.step)
1965                        self._host_tests[self.step][(cmd, t)] = (result, code)
1966                        if SRE.args.debug:
1967                            log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:")
1968                            log_debug(result)
1969                            log_debug(f"-------- exit code {code}\n")
1970
1971        if SRE.args.debug and len(self._errors) > 0:
1972            log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors))
1973        self.compute_total()
1974        self._mark_self_eval = self.mark_self_eval()
1975        self._mark_exo_eval = self.mark_exo_eval()
1976        # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries")
1977        # for (machine, step), cmds in self._tests.items():
1978        #     for (cmd, timeout), (result, code) in cmds.items():
1979        #         log_error(f"  [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}")
1980
1981    def compute_total(self):
1982        """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both."""
1983        self._total_grade_self_eval = 0
1984        self._total_max_self_eval = 0
1985        self._total_grade_exo_eval = 0
1986        self._total_max_exo_eval = 0
1987        for g in self._grade_list:
1988            if g.scope & params.SELF_EVAL_SCOPE:
1989                self._total_grade_self_eval += g.grade
1990                self._total_max_self_eval += g.max_grade
1991            if g.scope & params.EXO_EVAL_SCOPE:
1992                self._total_grade_exo_eval += g.grade
1993                self._total_max_exo_eval += g.max_grade
1994
1995    def save_tests(self):
1996        now = datetime.datetime.now()
1997        if self._exam_json is None:
1998            exam_path = Path(params.sre_pub_dir) / params.exam_json_name
1999            try:
2000                self._exam_json = json.loads(exam_path.read_text())
2001            except FileNotFoundError:
2002                pass
2003            except Exception as e:
2004                log_error(f"can't read {exam_path}: {e}")
2005        for d1 in self.archive_dirs:
2006            d = Path(d1).expanduser().resolve()
2007            try:
2008                if not d.exists():
2009                    os.mkdir(d, 0o700)
2010                else:
2011                    os.listdir(d)  # trigger mount / catch stale handle early
2012                    permissions = os.stat(d).st_mode
2013                    if permissions != 0o700:
2014                        os.chmod(d, 0o700)
2015                filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now)
2016                self.save_tests_on_file(str(filename))
2017            except Exception as e:
2018                log_error(f"can't save archive to {d}: {e}")
2019
2020    def save_tests_on_file(self, filename: str):
2021        files_content = {}
2022        if self.files_to_save_in_archives:
2023            fdir = Path(params.files_dir(self.get_running_lab_name()))
2024            if fdir.is_dir():
2025                for pattern in self.files_to_save_in_archives:
2026                    for fpath in fdir.iterdir():
2027                        if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern):
2028                            try:
2029                                files_content[fpath.name] = fpath.read_bytes()
2030                            except Exception:
2031                                pass
2032        archive = {
2033            params.running_lab_name_keyword: self.get_running_lab_name(),
2034            params.eval_date_keyword: self._eval_date,
2035            params.re_eval_date_keyword: self._re_eval_date,
2036            'data_json': self.get_data().to_json(),
2037            'tests': self.get_tests(),
2038            'errors': self.get_errors(),
2039            'answers': self.get_answers(),
2040            'grade_list': [asdict(e) for e in self._grade_list],
2041            'grade_parts': [asdict(p) for p in self._grade_parts],
2042            'total_grade_self_eval': self._total_grade_self_eval,
2043            'total_max_self_eval': self._total_max_self_eval,
2044            'mark_self_eval': self._mark_self_eval,
2045            'total_grade_exo_eval': self._total_grade_exo_eval,
2046            'total_max_exo_eval': self._total_max_exo_eval,
2047            'mark_exo_eval': self._mark_exo_eval,
2048            'maximum_mark': self._maximum_mark,
2049            'files': files_content,
2050        }
2051        if self._exam_json is not None:
2052            archive[params.exam_json_keyword] = self._exam_json
2053        cctx = zstd.ZstdCompressor(level=6)
2054        with tempfile.NamedTemporaryFile(
2055                mode="wb",
2056                dir=os.path.dirname(filename),
2057                delete=False
2058        ) as f:
2059            with cctx.stream_writer(f) as compressor:
2060                packed = msgpack.packb(archive, use_bin_type=True)
2061                compressor.write(packed)
2062            # f.flush()
2063            # os.fsync(f.fileno())
2064            temp_name = f.name
2065        os.replace(temp_name, filename)

Base class for lab evaluation logic.

Subclasses override grade() to register tests and questions. The actual execution happens later in run_tests(), which drives a multi-step loop:

  1. Call grade() to register commands (they return placeholder values).
  2. Execute all registered commands in containers (via exetests).
  3. Repeat for each additional step (self.max_step).
  4. Call grade() a final time — commands now return real results.
  5. Save the archive.

Key instance attributes available inside grade():

  • self.data — the Data0 instance (via net_scheme).
  • self.step — current step number (1-based).
  • self.max_step — highest step number registered so far.
Grade0(net_scheme)
1213    def __init__(self, net_scheme):
1214        self.net_scheme = net_scheme
1215        self.archive_dirs = []
1216        self.files_to_save_in_archives = []
1217        self._tests = None
1218        self._allow_errors_in_tests = None
1219        self.step = 0
1220        self.max_step = 1
1221        self._questions = None
1222        self._questions_order = None
1223        self._questions_current_order = None
1224        self._cheat_answers = {}
1225        self._answers = {}
1226        self._grade_list = []
1227        self._grades = {}
1228        self._grade_parts: list[GradePart] = []
1229        self._errors = []
1230        self._total_grade_self_eval = 0
1231        self._total_max_self_eval = 0
1232        self._total_grade_exo_eval = 0
1233        self._total_max_exo_eval = 0
1234        self._maximum_mark = params.default_maximum_mark
1235        self._use_numerical_marks = params.use_numerical_marks_by_default
1236        self._display_marks_in_auto_evaluations = params.display_marks_in_auto_evaluations_by_default
1237        self._mark_self_eval = None
1238        self._mark_exo_eval = None
1239        self._section_counter = []
1240        self.section_fmt = [('R', 1), ('N', 1), ('l', 2), ('N', 3)]
1241        self._eval_date = None
1242        self._re_eval_date = None
1243        self._default_language = 'en'
1244        self.auto_eval_count = 0
1245        self._exam_json = None
1246        self.full_reset()
net_scheme
archive_dirs
files_to_save_in_archives
step
max_step
section_fmt
auto_eval_count
def full_reset(self):
1248    def full_reset(self):
1249        """Reset all state including tests and answers (called from ``__init__``)."""
1250        self.step = 0
1251        self.max_step = 1
1252        self._tests = {}  # keys : (machine,step)->(cmd, timeout)->(result, code))
1253        self._allow_errors_in_tests = {}
1254        self._host_tests = {}  # step -> {(command, timeout): (result, code)}
1255        self._allow_errors_in_host_tests = {}  # (step, command, timeout) -> True
1256        self._answers = {}  # hash -> answer
1257        self._eval_date = None
1258        self.reset_before_grade()

Reset all state including tests and answers (called from __init__).

def reset_before_grade(self):
1260    def reset_before_grade(self):
1261        """Clear questions, grade list, and section counters before each call to :meth:`grade`."""
1262        self._questions = dict()  # hash->Question
1263        self._questions_order = dict()  # order -> [Question1, Question2, ...]
1264        self._questions_current_order = 100
1265        self._cheat_answers = {}  # state->(question_hash->answer)
1266        self._grade_list = []
1267        self._grades = dict()  # title -> GradeElement
1268        self._grade_parts = []
1269        self._section_counter = []

Clear questions, grade list, and section counters before each call to grade().

def grade(self):
1271    def grade(self):
1272        """Override to register tests, questions, and grade elements.
1273
1274        Called multiple times during :meth:`run_tests` — once per step plus a
1275        final time when all results are available.  Do **not** produce side-effects
1276        here; only call ``self.test()``, ``self.question_*()`` and
1277        ``self.add_grade_element()`` / ``self.set_grade()``.
1278        """
1279        self._grade_list = []
1280        self._grade_parts = []

Override to register tests, questions, and grade elements.

Called multiple times during run_tests() — once per step plus a final time when all results are available. Do not produce side-effects here; only call self.test(), self.question_*() and self.add_grade_element() / self.set_grade().

def mark_self_eval(self):
1308    def mark_self_eval(self):
1309        """Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE)."""
1310        return self._compute_mark(self._total_grade_self_eval, self._total_max_self_eval)

Final mark over elements visible in self-eval (scope & SELF_EVAL_SCOPE).

def mark_exo_eval(self):
1312    def mark_exo_eval(self):
1313        """Final mark over elements visible in non-auto eval / outline / sheet."""
1314        return self._compute_mark(self._total_grade_exo_eval, self._total_max_exo_eval)

Final mark over elements visible in non-auto eval / outline / sheet.

def get_data(self):
1316    def get_data(self):
1317        return self.net_scheme.get_data()
def get_errors(self):
1319    def get_errors(self):
1320        return self._errors
def get_grade_list(self):
1322    def get_grade_list(self):
1323        return self._grade_list
def get_grade_parts(self):
1325    def get_grade_parts(self):
1326        return self._grade_parts
def get_answers(self):
1328    def get_answers(self):
1329        return self._answers
def get_cheat_answers(self, state: str):
1331    def get_cheat_answers(self, state: str):
1332        return self._cheat_answers.get(state)
def get_tests(self):
1334    def get_tests(self):
1335        return self._tests
def get_exetests_strings(self, step: int):
1337    def get_exetests_strings(self, step: int):
1338        result = dict()
1339        for (machine, step1) in self._tests.keys():
1340            if step != step1:
1341                continue
1342            result[machine] = params.exetests_separator.join(
1343                [f"{timeout}:{cmd}" for (cmd, timeout) in self._tests[(machine, step)].keys()])
1344        return result
def get_running_lab_name(self):
1346    def get_running_lab_name(self):
1347        return self.net_scheme.running_lab_name
def increment_section_counter(self, level: int):
1349    def increment_section_counter(self, level: int):
1350        if len(self._section_counter) < level + 1:
1351            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1352        else:
1353            self._section_counter = self._section_counter[:(level + 1)]
1354        self._section_counter[level] += 1
def set_section_counter(self, level: int, value: int):
1356    def set_section_counter(self, level: int, value: int):
1357        if len(self._section_counter) < level + 1:
1358            self._section_counter += [0 for i in range(level + 1 - len(self._section_counter))]
1359        else:
1360            self._section_counter = self._section_counter[:(level + 1)]
1361        self._section_counter[level] = value
def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1363    def section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1364        self.increment_section_counter(level)
1365        return self.current_section(level, fmt, show, pad)
def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1367    def current_section(self, level: int = 0, fmt=None, show: int = None, pad: str = None):
1368        # fmt is a list of (type, depth[, prefix]) tuples, one per level.
1369        # type: R=ROMAN, r=roman, L=Letters, l=letters, N=number
1370        # depth: how many consecutive counters to display, ending at this level
1371        # prefix: optional string prepended to the result (default: '')
1372        # e.g. [('R',1),('N',2,' ')] → level 0: "I." / level 1: " I.1."
1373        if fmt is None:
1374            fmt = self.section_fmt
1375
1376        def _to_roman(n: int) -> str:
1377            vals = [
1378                (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'),
1379                (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'),
1380                (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'),
1381            ]
1382            result = ''
1383            for value, numeral in vals:
1384                while n >= value:
1385                    result += numeral
1386                    n -= value
1387            return result
1388
1389        def _to_capital_letter(n: int) -> str:
1390            result = ''
1391            while n > 0:
1392                n, r = divmod(n - 1, 26)
1393                result = chr(ord('A') + r) + result
1394            return result
1395
1396        def _to_lowercase_letter(n: int) -> str:
1397            result = ''
1398            while n > 0:
1399                n, r = divmod(n - 1, 26)
1400                result = chr(ord('a') + r) + result
1401            return result
1402
1403        def _convert(n: int, fmt_type: str) -> str:
1404            match fmt_type:
1405                case 'R':
1406                    return _to_roman(n)
1407                case 'r':
1408                    return _to_roman(n).lower()
1409                case 'L':
1410                    return _to_capital_letter(n)
1411                case 'l':
1412                    return _to_lowercase_letter(n)
1413                case _:
1414                    return str(n)
1415
1416        entry = fmt[level] if level < len(fmt) else ('N', level + 1)
1417        _, depth = entry[:2]
1418        prefix = pad if pad is not None else (entry[2] if len(entry) > 2 else '')
1419        if show is not None:
1420            depth = show
1421        start = max(0, level - depth + 1)
1422        parts = [_convert(self._section_counter[j] if j < len(self._section_counter) else 0,
1423                          fmt[j][0] if j < len(fmt) else 'N')
1424                 for j in range(start, level + 1)]
1425        return prefix + ".".join(parts) + ". "
def load_answers(self):
1427    def load_answers(self):
1428        """Load student answers from ``answers.json`` into ``self._answers``."""
1429        self._answers = {}
1430        try:
1431            fd = os.open(params.answers_filename(running_lab_name=self.get_running_lab_name()),
1432                         os.O_RDONLY | os.O_NOFOLLOW)
1433            with os.fdopen(fd) as f:
1434                self._answers = json.load(f)
1435        except (OSError, json.JSONDecodeError):
1436            self._answers = {}

Load student answers from answers.json into self._answers.

def get_questions_ordered(self):
1438    def get_questions_ordered(self):
1439        q = []
1440        for _, v in sorted(self._questions_order.items()):
1441            q += v
1442        return q
def export_questions(self):
1444    def export_questions(self):
1445        pass
def test_host( self, command, step=1, timeout: int = 20, default_value='', default_code: int = 0, allow_error: bool = False):
1447    def test_host(self, command, step=1, timeout: int = params.default_timeout, default_value='',
1448                  default_code: int = 0, allow_error: bool = False):
1449        """Register and retrieve the result of a host-side test command.
1450
1451        On the first call (registration pass) returns *default_value* / *default_code*.
1452        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1453        Set *allow_error* to suppress error recording on non-zero exit.
1454        """
1455        if params.execute_commands_on_host is False:
1456            sys.exit("test_host() is disabled by params.execute_commands_on_host")
1457        if self.max_step < step:
1458            self.max_step = step
1459        if step not in self._host_tests:
1460            self._host_tests[step] = {}
1461        if allow_error and (step, command, timeout) not in self._allow_errors_in_host_tests:
1462            self._allow_errors_in_host_tests[(step, command, timeout)] = True
1463        if (command, timeout) not in self._host_tests[step]:
1464            self._host_tests[step][(command, timeout)] = (default_value, default_code)
1465            return default_value, default_code
1466        return self._host_tests[step][(command, timeout)]

Register and retrieve the result of a host-side test command.

On the first call (registration pass) returns default_value / default_code. On subsequent calls (result pass) returns the actual (stdout, exit_code). Set allow_error to suppress error recording on non-zero exit.

def test( self, machine_name, command, step=1, timeout: int = 20, default_value='', default_code: int = 0, allow_error: bool = False):
1468    def test(self, machine_name, command, step=1, timeout: int = params.default_timeout, default_value='',
1469             default_code: int = 0, allow_error: bool = False):
1470        """Register and retrieve the result of a command executed inside *machine_name*.
1471
1472        On the first call (registration pass) returns *default_value* / *default_code*.
1473        On subsequent calls (result pass) returns the actual ``(stdout, exit_code)``.
1474        *timeout* is in seconds.  Set *allow_error* to suppress error recording.
1475        """
1476        if self.max_step < step:
1477            self.max_step = step
1478        if (machine_name, step) not in self._tests:
1479            self._tests[(machine_name, step)] = {}
1480        if allow_error and (machine_name, step, command, timeout) not in self._allow_errors_in_tests:
1481            self._allow_errors_in_tests[(machine_name, step, command, timeout)] = True
1482        if (command, timeout) not in self._tests[(machine_name, step)]:
1483            self._tests[(machine_name, step)][(command, timeout)] = (default_value, default_code)
1484            return default_value, default_code
1485        return self._tests[(machine_name, step)][(command, timeout)]

Register and retrieve the result of a command executed inside machine_name.

On the first call (registration pass) returns default_value / default_code. On subsequent calls (result pass) returns the actual (stdout, exit_code). timeout is in seconds. Set allow_error to suppress error recording.

def question_text( self, title, section='', description='', hash=None, order=None, default_answer='', cheat_answers=None):
1495    def question_text(self, title, section='', description='', hash=None, order=None, default_answer='',
1496                      cheat_answers=None):
1497        """Register a free-text question and return the student's current answer string.
1498
1499        Returns *default_answer* if the student has not answered yet.
1500        *cheat_answers* maps state names to answer strings used in automated testing.
1501        """
1502        title = self._apply_section(section, title)
1503        if order is None:
1504            order1 = self._questions_current_order
1505            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1506        else:
1507            order1 = order
1508
1509        q = QuestionText(title, description, hash, order1)
1510        if order1 not in self._questions_order:
1511            self._questions_order[order1] = []
1512        self._questions_order[order1].append(q)
1513
1514        if q.question_hash in self._questions:
1515            write_error(f"Duplicate question hash {q.question_hash}")
1516        self._questions[q.question_hash] = q
1517
1518        if cheat_answers is not None:
1519            for state, answer in cheat_answers.items():
1520                if state not in self._cheat_answers:
1521                    self._cheat_answers[state] = {}
1522                self._cheat_answers[state][q.question_hash] = answer
1523
1524        if q.question_hash in self._answers:
1525            return self._answers[q.question_hash]
1526        return default_answer

Register a free-text question and return the student's current answer string.

Returns default_answer if the student has not answered yet. cheat_answers maps state names to answer strings used in automated testing.

def question_form( self, title, section='', description='', hash=None, order=None, cheat_answers=None):
1530    def question_form(self, title, section='', description='', hash=None, order=None, cheat_answers=None):
1531        """Register a form question with inline @@{field_name:regex}@@ fields.
1532
1533        Returns the student's answers as a dict {field_name: value}, or {} if none yet.
1534        cheat_answers format: {state_name: {field_name: value, ...}}
1535        """
1536        title = self._apply_section(section, title)
1537        from .common import QuestionForm
1538
1539        # The @@{field:regex}@@ markers are language-independent, so when the
1540        # description is a TranslatedText (wrapped in tr()) extract fields from
1541        # its resolved string. The full TranslatedText is still stored on the
1542        # question for per-language rendering.
1543        desc_str = (description.resolve('')
1544                    if isinstance(description, TranslatedText) else description)
1545
1546        fields = []
1547        for m in self._FORM_FIELD_RE.finditer(desc_str):
1548            name, spec = m.group(1), m.group(2)
1549            if spec.startswith('>') or '>>>' in spec:
1550                raw = spec[1:] if spec.startswith('>') else spec
1551                fields.append({"name": name, "choices": [
1552                    c.strip().split('>>>')[1].strip() if '>>>' in c.strip() else c.strip()
1553                    for c in raw.split('|')
1554                ]})
1555            elif spec.startswith('?'):
1556                default = spec[1:].strip().lower() not in ('', 'false')
1557                fields.append({"name": name, "checkbox": default})
1558            else:
1559                fields.append({"name": name, "regex": spec})
1560
1561        if order is None:
1562            order1 = self._questions_current_order
1563            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1564        else:
1565            order1 = order
1566
1567        q = QuestionForm(title, description, hash, order1, fields)
1568        if order1 not in self._questions_order:
1569            self._questions_order[order1] = []
1570        self._questions_order[order1].append(q)
1571
1572        if q.question_hash in self._questions:
1573            write_error(f"Duplicate question hash {q.question_hash}")
1574        self._questions[q.question_hash] = q
1575
1576        if cheat_answers is not None:
1577            for state, field_answers in cheat_answers.items():
1578                if state not in self._cheat_answers:
1579                    self._cheat_answers[state] = {}
1580                self._cheat_answers[state][q.question_hash] = json.dumps(
1581                    field_answers, ensure_ascii=False
1582                )
1583
1584        if q.question_hash in self._answers:
1585            try:
1586                return json.loads(self._answers[q.question_hash])
1587            except (json.JSONDecodeError, TypeError):
1588                return {}
1589        return {}

Register a form question with inline @@{field_name:regex}@@ fields.

Returns the student's answers as a dict {field_name: value}, or {} if none yet. cheat_answers format: {state_name: {field_name: value, ...}}

def question_dummy(self, title, section='', description='', hash=None, order=None):
1591    def question_dummy(self, title, section='', description='', hash=None, order=None):
1592        """Register a display-only block (no answer widget shown to the student)."""
1593        title = self._apply_section(section, title)
1594        if order is None:
1595            order1 = self._questions_current_order
1596            self._questions_current_order = ((self._questions_current_order // 100) + 1) * 100
1597        else:
1598            order1 = order
1599
1600        q = QuestionDummy(title, description, hash, order1)
1601        if order1 not in self._questions_order:
1602            self._questions_order[order1] = []
1603        self._questions_order[order1].append(q)
1604
1605        if q.question_hash in self._questions:
1606            write_error(f"Duplicate question hash {q.question_hash}")
1607        self._questions[q.question_hash] = q

Register a display-only block (no answer widget shown to the student).

def add_grade_part(self, title, description=''):
1609    def add_grade_part(self, title, description=''):
1610        """Register a new :class:`GradePart` group and return it.
1611
1612        Pass the returned object to ``add_grade_element(..., grade_part=...)``
1613        to associate elements with this part.  Parts are displayed in
1614        registration order (with a subtotal row per part) in the GUI
1615        evaluation view and in ``sre outline`` PDFs.
1616        """
1617        if description:
1618            description = TranslatedText.from_value(description, self._default_language)
1619        gp = GradePart(title=title, description=description)
1620        if any(p.title == title for p in self._grade_parts):
1621            write_error(f"Duplicate grade part title = {title}")
1622        self._grade_parts.append(gp)
1623        return gp

Register a new GradePart group and return it.

Pass the returned object to add_grade_element(..., grade_part=...) to associate elements with this part. Parts are displayed in registration order (with a subtotal row per part) in the GUI evaluation view and in sre outline PDFs.

def add_grade_element( self, title, max_grade, description='', grade=0, scope=3, grade_part=None):
1625    def add_grade_element(self, title, max_grade, description='', grade=0, scope=params.BOTH_EVAL_SCOPE,
1626                          grade_part=None):
1627        """Add a graded rubric item.  Initial *grade* defaults to 0; use :meth:`set_grade` to update it.
1628
1629        ``scope`` is a bitmask: ``SELF_EVAL_SCOPE`` (1) for self-eval only,
1630        ``EXO_EVAL_SCOPE`` (2) for non-auto eval / outline / sheet only,
1631        ``BOTH_EVAL_SCOPE`` (3, default) for both audiences.
1632
1633        ``grade_part`` optionally associates this element with a
1634        :class:`GradePart` previously returned by :meth:`add_grade_part`.
1635        """
1636        if scope not in params.grade_scopes:
1637            raise ValueError(f"Invalid scope {scope!r}; expected one of {params.grade_scopes}")
1638        if description:
1639            description = TranslatedText.from_value(description, self._default_language)
1640        grade_part_title = None
1641        if grade_part is not None:
1642            if not isinstance(grade_part, GradePart):
1643                raise TypeError(f"grade_part must be a GradePart, got {type(grade_part).__name__}")
1644            if grade_part not in self._grade_parts:
1645                write_error(f"Unregistered grade part {grade_part.title!r} passed to add_grade_element")
1646            grade_part_title = grade_part.title
1647        g = GradeElement(title=title, max_grade=max_grade, description=description, grade=grade, scope=scope,
1648                         grade_part=grade_part_title)
1649        self._grade_list.append(g)
1650        key = _tt_hash_str(title)
1651        if key in self._grades:
1652            write_error(f"Duplicate grade title = {title}")
1653        self._grades[key] = g

Add a graded rubric item. Initial grade defaults to 0; use set_grade() to update it.

scope is a bitmask: SELF_EVAL_SCOPE (1) for self-eval only, EXO_EVAL_SCOPE (2) for non-auto eval / outline / sheet only, BOTH_EVAL_SCOPE (3, default) for both audiences.

grade_part optionally associates this element with a GradePart previously returned by add_grade_part().

def set_grade(self, title, grade):
1655    def set_grade(self, title, grade):
1656        """Set the numeric *grade* for the element previously registered under *title*."""
1657        self._grades[_tt_hash_str(title)].grade = grade

Set the numeric grade for the element previously registered under title.

def save_lab_info(self):
1659    def save_lab_info(self):
1660        debug_project = os.path.exists(
1661            params.debug_project_marker_filename(self.net_scheme.running_lab_name)
1662        )
1663        if debug_project:
1664            visible_machines = list(self.net_scheme.get_machines())
1665        else:
1666            visible_machines = [m for m in self.net_scheme.get_machines() if not m.hidden]
1667        lab_hash = self.net_scheme.get_lab_hash()
1668
1669        def _get_stats(m):
1670            s = next(Kathara.get_instance().get_machine_stats(lab_hash=lab_hash, machine_name=m.name), None)
1671            return m, s
1672
1673        stats_map = {}
1674        with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency, len(visible_machines))) as executor:
1675            for m, s in executor.map(_get_stats, visible_machines):
1676                stats_map[m.name] = (m, s)
1677
1678        info_machines = []
1679        for m in visible_machines:
1680            m, s = stats_map[m.name]
1681            interfaces = []
1682            for net, netAdapter in m.net_adapters.items():
1683                interfaces.append(InfoInterface(network=net.name, interface_name=f"eth{netAdapter.interface}"))
1684            info_machines.append(
1685                InfoMachine(name=m.name, status=s.status if s is not None else "", hidden=m.hidden,
1686                            bridged=m.bridged,
1687                            x11_host=m.x11_host,
1688                            ports=m.ports,
1689                            allow_connection=m.allow_connection,
1690                            color=m.color or "",
1691                            shape=m.shape or "",
1692                            interfaces=interfaces))
1693
1694        network_colors = {
1695            net.name: net.color
1696            for m in visible_machines
1697            for net in m.net_adapters
1698            if net.color
1699        }
1700        network_shapes = {
1701            net.name: net.shape
1702            for m in visible_machines
1703            for net in m.net_adapters
1704            if net.shape
1705        }
1706
1707        module_rvlab = sys.modules[params.srelab_py_name.removesuffix(".py")]
1708        default_language = getattr(module_rvlab, 'default_language', 'en')
1709        self._default_language = default_language
1710        show_nat_network = getattr(module_rvlab, 'show_nat_network', params.default_show_nat_network)
1711        nat_network_name = getattr(module_rvlab, 'host_network_name', params.default_host_network_name)
1712        nat_network_color = getattr(module_rvlab, 'host_network_color', params.default_host_network_color)
1713        host_network_exploded = getattr(module_rvlab, 'host_network_exploded',
1714                                        params.default_host_network_exploded)
1715        host_network_edge_relative_length = float(getattr(
1716            module_rvlab, 'host_network_edge_relative_length',
1717            params.default_host_network_edge_relative_length))
1718        schema_splines = getattr(module_rvlab, 'schema_splines', params.graphviz_default_splines)
1719        schema_overlap = getattr(module_rvlab, 'schema_overlap', params.graphviz_default_overlap)
1720
1721        if self.step == 0:
1722            self.grade()
1723            self.step += 1
1724
1725        questions = self.get_questions_ordered()
1726
1727        if hasattr(module_rvlab, 'title'):
1728            title = module_rvlab.title
1729        else:
1730            lab_name = params.get_lab_name_from_running_lab_name(self.get_running_lab_name())
1731            title = lab_name.removesuffix('.py')
1732        title = TranslatedText.from_value(title, default_language)
1733
1734        if hasattr(module_rvlab, 'delay_between_self_grade'):
1735            delay_between_self_grade = module_rvlab.delay_between_self_grade
1736        else:
1737            delay_between_self_grade = 0
1738
1739        if hasattr(module_rvlab, 'allow_self_grade'):
1740            allow_self_grade = module_rvlab.allow_self_grade
1741        else:
1742            allow_self_grade = False
1743
1744        if debug_project:
1745            delay_between_self_grade = 0
1746            allow_self_grade = True
1747        if hasattr(module_rvlab, 'export_kathara_project'):
1748            export_kathara_project = module_rvlab.export_kathara_project
1749        else:
1750            export_kathara_project = False
1751
1752        eval_interval_without_exam_mode = getattr(module_rvlab, 'eval_interval_without_exam_mode', params.default_eval_interval_without_exam_mode)
1753        eval_before_exit = getattr(module_rvlab, 'eval_before_exit', False)
1754
1755        informations = TranslatedText.from_value(self.net_scheme.informations, default_language)
1756        for q in questions:
1757            q.title = TranslatedText.from_value(q.title, default_language)
1758            q.description = TranslatedText.from_value(q.description, default_language)
1759
1760        net_scheme_cls = type(self.net_scheme)
1761        if debug_project:
1762            user_allowed_states_raw = {}
1763            for state in net_scheme_cls.get_state_methods():
1764                desc = ''
1765                for klass in net_scheme_cls.__mro__:
1766                    fn = klass.__dict__.get(state)
1767                    if fn is not None and getattr(fn, '_is_sre_state', False):
1768                        desc = getattr(fn, '_sre_state_description', '')
1769                        break
1770                user_allowed_states_raw[state] = desc
1771        else:
1772            user_allowed_states_raw = (
1773                net_scheme_cls.get_user_allowed_states()
1774                if getattr(module_rvlab, 'allow_user_states', False) else {}
1775            )
1776        user_allowed_states = {
1777            state: TranslatedText.from_value(desc, default_language) if desc else desc
1778            for state, desc in user_allowed_states_raw.items()
1779        }
1780
1781        if debug_project:
1782            module_allows_user_states = getattr(module_rvlab, 'allow_user_states', False)
1783            admin_only_states = [
1784                state for state in user_allowed_states_raw
1785                if not module_allows_user_states
1786                or not net_scheme_cls.is_state_user_allowed(state)
1787            ]
1788        else:
1789            admin_only_states = []
1790
1791        info = InfoLab(lab_name=self.net_scheme.lab_name, lab_hash=self.net_scheme.lab_hash,
1792                       title=title, machines=info_machines, delay_between_self_grade=delay_between_self_grade,
1793                       questions=questions, informations=informations,
1794                       export_kathara_project=export_kathara_project, allow_self_grade=allow_self_grade,
1795                       debug_project=debug_project,
1796                       eval_interval_without_exam_mode=eval_interval_without_exam_mode,
1797                       eval_before_exit=eval_before_exit,
1798                       default_language=default_language,
1799                       user_allowed_states=user_allowed_states,
1800                       admin_only_states=admin_only_states,
1801                       network_colors=network_colors,
1802                       network_shapes=network_shapes,
1803                       show_nat_network=show_nat_network,
1804                       nat_network_name=nat_network_name,
1805                       nat_network_color=nat_network_color,
1806                       host_network_exploded=host_network_exploded,
1807                       host_network_edge_relative_length=host_network_edge_relative_length,
1808                       schema_splines=schema_splines,
1809                       schema_overlap=schema_overlap)
1810
1811        info_json = info.to_json()
1812        info_filename = params.info_filename(self.net_scheme.running_lab_name)
1813
1814        save_info_json = None
1815        try:
1816            with open(info_filename, "r") as f:
1817                save_info_json = f.read()
1818        except FileNotFoundError:
1819            pass
1820        if save_info_json == info_json:
1821            return
1822
1823        temp_file = tempfile.NamedTemporaryFile(
1824            delete=False,
1825            dir=Path(self.net_scheme.get_public_lab_dir()).parent)
1826        with open(temp_file.name, "w") as f:
1827            print(info_json, file=f)
1828            f.flush()
1829            os.fsync(f.fileno())
1830            os.chmod(temp_file.name, 0o644)
1831        os.replace(temp_file.name, info_filename)
def add_error(self, error, category=<ErrorCategory.ERROR: 'ERROR'>, step: int = 1):
1833    def add_error(self, error, category=ErrorCategory.ERROR, step: int = 1):
1834        if self.step != step:
1835            return
1836        log_error(error)
1837        self._errors.append((category.value, error))
def add_warning(self, warning, step: int = 1):
1839    def add_warning(self, warning, step: int = 1):
1840        self.add_error(warning, category=ErrorCategory.WARNING, step=step)
@staticmethod
def run_tests_on_machine(machine_name, machine, exetests):
1842    @staticmethod
1843    def run_tests_on_machine(machine_name, machine, exetests):
1844        environment = {params.exetests_env_name: exetests}
1845        code, output = machine.api_object.exec_run([params.exetests_machines_path],
1846                                                   stdin=False,
1847                                                   stdout=True,
1848                                                   stderr=False,
1849                                                   tty=False,
1850                                                   environment=environment,
1851                                                   )
1852        return machine_name, code, output
def run_tests(self):
1854    def run_tests(self):
1855        self._tests = {}
1856        self._section_counter = []
1857        lab = self.net_scheme.get_lab_from_kathara()
1858        self._errors = []
1859        self.load_answers()
1860        self._eval_date = datetime.datetime.now().isoformat()
1861
1862        while self.step <= self.max_step:
1863            self.reset_before_grade()
1864            self.grade()
1865            self.step += 1
1866            tests = self.get_tests()
1867            exetests_by_machine = self.get_exetests_strings(self.step)
1868            if SRE.args.debug and self.step <= self.max_step:
1869                log_debug(f"Commands step {self.step} (after running grade on step {self.step - 1}):")
1870                for machine, cmds in exetests_by_machine.items():
1871                    c = cmds.split(params.exetests_separator)
1872                    c1 = " - ".join(c)
1873                    log_debug(f"{machine}: {c1}")
1874
1875            results = {}
1876            with ThreadPoolExecutor(
1877                    max_workers=max(1, min(params.max_docker_concurrency, len(lab.machines)))) as executor:
1878                futures_to_machines = {
1879                    executor.submit(
1880                        Grade0.run_tests_on_machine,
1881                        machine_name,
1882                        machine,
1883                        exetests_by_machine[machine_name],
1884                    ): machine_name
1885                    for machine_name, machine in lab.machines.items()
1886                    if machine_name in exetests_by_machine and len(exetests_by_machine[machine_name]) > 0
1887                }
1888                for future in as_completed(futures_to_machines):
1889                    machine_name = futures_to_machines[future]
1890                    try:
1891                        machine_name, code, output = future.result()
1892                        results[machine_name] = (code, output)
1893                    except Exception as e:
1894                        self.add_error(f"error during test execution on machine {machine_name}: {e}", step=self.step)
1895                        continue
1896
1897            for machine_name, (exetests_code, output) in results.items():
1898                if (machine_name, self.step) not in self._tests:
1899                    self._tests[(machine_name, self.step)] = {}
1900                if exetests_code != 0:
1901                    self.add_error(
1902                        f"exetests error on {machine_name}: {exetests_by_machine[machine_name]} -- return code {exetests_code}",
1903                        step=self.step)
1904                output1 = output.decode("utf-8")
1905                separator, _, rest = output1.partition("\n")
1906                output2 = rest.split(f"\n{separator}\n")
1907                for i in range(0, len(output2), 2):
1908                    ligne1 = ""
1909                    try:
1910                        ligne1, date1, result = output2[i].split("\n", 2)
1911                    except ValueError:
1912                        result = ""
1913                    if not ligne1:
1914                        continue
1915                    timeout_s, cmd = ligne1.split(":", 1)
1916                    timeout = int(timeout_s)
1917                    date2, code_s = output2[i + 1].split("\n", 1)
1918                    try:
1919                        code = int(code_s.strip())
1920                    except ValueError:
1921                        code = -2
1922                        self.add_error(f"test error on {machine_name}:{self.step}:{cmd} illegal error code",
1923                                       step=self.step)
1924                    if code != 0:
1925                        if not self._allow_errors_in_tests.get((machine_name, self.step, cmd, timeout), False):
1926                            self.add_error(f"test error on {machine_name}:{cmd} code={code}", step=self.step)
1927                    self._tests[(machine_name, self.step)][cmd, timeout] = (result, code)
1928            if SRE.args.debug:
1929                for machine_name in lab.machines:
1930                    if (machine_name, self.step) not in self._tests:
1931                        continue
1932                    for (cmd, timeout) in self._tests[(machine_name, self.step)]:
1933                        log_debug(f"machine {machine_name} - step {self.step} - command {cmd} - timeout {timeout}:")
1934                        result, code = self._tests[(machine_name, self.step)][cmd, timeout]
1935                        log_debug(result)
1936                        log_debug(f"-------- exit code {code}\n")
1937
1938            host_step_cmds = self._host_tests.get(self.step, {})
1939            if host_step_cmds:
1940                def _run_host_cmd(cmd, t):
1941                    from .utils_privileges import preexec_drop_to_sre
1942                    run_cmd = shlex.split(cmd) if params.execute_commands_on_host == "split" else cmd
1943                    use_shell = params.execute_commands_on_host == "shell"
1944                    try:
1945                        proc = subprocess.run(
1946                            run_cmd, shell=use_shell, capture_output=True, text=True,
1947                            timeout=t if t > 0 else None,
1948                            preexec_fn=preexec_drop_to_sre,
1949                        )
1950                        return cmd, t, proc.stdout, proc.returncode
1951                    except subprocess.TimeoutExpired:
1952                        return cmd, t, '', -1
1953                    except Exception:
1954                        return cmd, t, '', -2
1955
1956                with ThreadPoolExecutor(max_workers=min(params.max_docker_concurrency,
1957                                                        len(host_step_cmds))) as executor:
1958                    futures = {executor.submit(_run_host_cmd, cmd, t): (cmd, t)
1959                               for (cmd, t) in host_step_cmds}
1960                    for future in as_completed(futures):
1961                        cmd, t, result, code = future.result()
1962                        if code != 0:
1963                            if not self._allow_errors_in_host_tests.get((self.step, cmd, t), False):
1964                                self.add_error(f"host test error: {cmd} code={code}", step=self.step)
1965                        self._host_tests[self.step][(cmd, t)] = (result, code)
1966                        if SRE.args.debug:
1967                            log_debug(f"host - step {self.step} - command {cmd} - timeout {t}:")
1968                            log_debug(result)
1969                            log_debug(f"-------- exit code {code}\n")
1970
1971        if SRE.args.debug and len(self._errors) > 0:
1972            log_debug(f"{len(self._errors)} errors:\n " + "\n".join(f"[{e[0]}] {e[1]}" for e in self._errors))
1973        self.compute_total()
1974        self._mark_self_eval = self.mark_self_eval()
1975        self._mark_exo_eval = self.mark_exo_eval()
1976        # log_error(f"DEBUG _tests at save: {len(self._tests)} machine-step entries")
1977        # for (machine, step), cmds in self._tests.items():
1978        #     for (cmd, timeout), (result, code) in cmds.items():
1979        #         log_error(f"  [{machine}][step={step}] cmd={cmd!r} result_len={len(result)} code={code}")
def compute_total(self):
1981    def compute_total(self):
1982        """Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both."""
1983        self._total_grade_self_eval = 0
1984        self._total_max_self_eval = 0
1985        self._total_grade_exo_eval = 0
1986        self._total_max_exo_eval = 0
1987        for g in self._grade_list:
1988            if g.scope & params.SELF_EVAL_SCOPE:
1989                self._total_grade_self_eval += g.grade
1990                self._total_max_self_eval += g.max_grade
1991            if g.scope & params.EXO_EVAL_SCOPE:
1992                self._total_grade_exo_eval += g.grade
1993                self._total_max_exo_eval += g.max_grade

Accumulate per-scope totals in one pass; BOTH-scope elements contribute to both.

def save_tests(self):
1995    def save_tests(self):
1996        now = datetime.datetime.now()
1997        if self._exam_json is None:
1998            exam_path = Path(params.sre_pub_dir) / params.exam_json_name
1999            try:
2000                self._exam_json = json.loads(exam_path.read_text())
2001            except FileNotFoundError:
2002                pass
2003            except Exception as e:
2004                log_error(f"can't read {exam_path}: {e}")
2005        for d1 in self.archive_dirs:
2006            d = Path(d1).expanduser().resolve()
2007            try:
2008                if not d.exists():
2009                    os.mkdir(d, 0o700)
2010                else:
2011                    os.listdir(d)  # trigger mount / catch stale handle early
2012                    permissions = os.stat(d).st_mode
2013                    if permissions != 0o700:
2014                        os.chmod(d, 0o700)
2015                filename = d / params.get_archive_name(self.net_scheme.running_lab_name, now)
2016                self.save_tests_on_file(str(filename))
2017            except Exception as e:
2018                log_error(f"can't save archive to {d}: {e}")
def save_tests_on_file(self, filename: str):
2020    def save_tests_on_file(self, filename: str):
2021        files_content = {}
2022        if self.files_to_save_in_archives:
2023            fdir = Path(params.files_dir(self.get_running_lab_name()))
2024            if fdir.is_dir():
2025                for pattern in self.files_to_save_in_archives:
2026                    for fpath in fdir.iterdir():
2027                        if fpath.is_file() and fnmatch.fnmatch(fpath.name, pattern):
2028                            try:
2029                                files_content[fpath.name] = fpath.read_bytes()
2030                            except Exception:
2031                                pass
2032        archive = {
2033            params.running_lab_name_keyword: self.get_running_lab_name(),
2034            params.eval_date_keyword: self._eval_date,
2035            params.re_eval_date_keyword: self._re_eval_date,
2036            'data_json': self.get_data().to_json(),
2037            'tests': self.get_tests(),
2038            'errors': self.get_errors(),
2039            'answers': self.get_answers(),
2040            'grade_list': [asdict(e) for e in self._grade_list],
2041            'grade_parts': [asdict(p) for p in self._grade_parts],
2042            'total_grade_self_eval': self._total_grade_self_eval,
2043            'total_max_self_eval': self._total_max_self_eval,
2044            'mark_self_eval': self._mark_self_eval,
2045            'total_grade_exo_eval': self._total_grade_exo_eval,
2046            'total_max_exo_eval': self._total_max_exo_eval,
2047            'mark_exo_eval': self._mark_exo_eval,
2048            'maximum_mark': self._maximum_mark,
2049            'files': files_content,
2050        }
2051        if self._exam_json is not None:
2052            archive[params.exam_json_keyword] = self._exam_json
2053        cctx = zstd.ZstdCompressor(level=6)
2054        with tempfile.NamedTemporaryFile(
2055                mode="wb",
2056                dir=os.path.dirname(filename),
2057                delete=False
2058        ) as f:
2059            with cctx.stream_writer(f) as compressor:
2060                packed = msgpack.packb(archive, use_bin_type=True)
2061                compressor.write(packed)
2062            # f.flush()
2063            # os.fsync(f.fileno())
2064            temp_name = f.name
2065        os.replace(temp_name, filename)