diff --git a/base/images/images.toml b/base/images/images.toml index a7e87f5055e..ad7b7d9c50c 100644 --- a/base/images/images.toml +++ b/base/images/images.toml @@ -12,7 +12,10 @@ runtime-package-management = true [images.container-base] description = "Container Base Image" definition = { type = "kiwi", path = "container-base/container-base.kiwi", profile = "core" } -tests.test-suites = [{ name = "static-image-checks" }] +tests.test-suites = [ + { name = "static-image-checks" }, + { name = "container-bvt-tests" }, +] [images.container-base.capabilities] machine-bootable = false @@ -61,4 +64,21 @@ extra-args = [ "--image-path", "{image-path}", "--image-name", "{image-name}", "--capabilities", "{capabilities}", + "-m", "not runtime_container_tests", +] + +# Container Business Validation Tests (BVT) +[test-suites.container-bvt-tests] +type = "pytest" +description = "Container BVT: runtime validation of system resources, networking, filesystem, and package management" + +[test-suites.container-bvt-tests.pytest] +working-dir = "tests" +install = "pyproject" +test-paths = ["cases/container-base/runtime/test_container_bvt.py"] +extra-args = [ + "--image-path", "{image-path}", + "--image-name", "{image-name}", + "--capabilities", "{capabilities}", + "-v", "-s" ] diff --git a/base/images/tests/cases/container-base/runtime/test_container_bvt.py b/base/images/tests/cases/container-base/runtime/test_container_bvt.py new file mode 100644 index 00000000000..ab66697f421 --- /dev/null +++ b/base/images/tests/cases/container-base/runtime/test_container_bvt.py @@ -0,0 +1,475 @@ +# SPDX-License-Identifier: MIT +"""Container BVT (Build Verification Tests). + +All tests run inside a live container via podman exec +""" + +from __future__ import annotations + +import json +import os +import time + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _cmd_ok(container_exec, cmd: str, timeout: int = 10) -> tuple[bool, str]: + """Run cmd; return (success, stdout). Never raises.""" + try: + result = container_exec(cmd, timeout=timeout) + return result.returncode == 0, result.stdout.strip() + except Exception: + return False, "" + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("procps-ng", "gawk") +def test_bvt_system_footprint(container_exec) -> None: + """BVT: Memory, disk, CPU, and process footprint metrics.""" + # Memory + ok, out = _cmd_ok(container_exec, "free -m | awk '/^Mem:/ {print $2, $3}'") + assert ok, "free command failed — procps-ng may be missing" + parts = out.split() + assert len(parts) == 2, f"Unexpected free output: {out!r}" + total_mb = int(parts[0]) + assert total_mb > 0, f"Invalid total memory: {total_mb} MB" + print(f"Memory: total={parts[0]} MB, used={parts[1]} MB") + + # Disk + ok, out = _cmd_ok(container_exec, "df -h / | awk 'NR==2 {print $2, $3, $4}'") + assert ok, "df command failed" + print(f"Disk (total used avail): {out}") + + # CPU + ok, out = _cmd_ok(container_exec, "nproc") + assert ok, "nproc failed" + assert int(out) > 0, f"Unexpected CPU count: {out}" + print(f"CPU cores: {out}") + + # Process count + ok, out = _cmd_ok(container_exec, "ps aux --no-headers | wc -l") + assert ok, "ps failed — procps-ng may be missing" + count = int(out) + assert count >= 1, f"Too few processes: {count}" + print(f"Running processes: {count}") + + # Package count (optional — rpm may be absent in distroless builds) + ok, out = _cmd_ok(container_exec, "rpm -qa | wc -l", timeout=15) + if ok: + print(f"Installed packages: {out}") + else: + print("rpm not available — skipping package count") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("util-linux") +def test_bvt_logging(container_exec) -> None: + """BVT: System logging via logger and log file/journal access.""" + tag = f"bvt_{int(time.time())}" + message = f"BVT log test entry {tag}" + + ok, _ = _cmd_ok(container_exec, f"logger -t bvt_test '{message}'", timeout=10) + assert ok, "logger command failed — util-linux may be missing" + + # Find the entry: try journalctl, then /var/log/messages, then /var/log/syslog + found = False + ok, out = _cmd_ok(container_exec, f"journalctl --no-pager --since '1 minute ago' 2>/dev/null | grep '{tag}' || true", timeout=15) + if ok and tag in out: + found = True + + if not found: + for logfile in ("/var/log/messages", "/var/log/syslog"): + ok, out = _cmd_ok(container_exec, f"tail -50 {logfile} 2>/dev/null | grep '{tag}' || true") + if ok and tag in out: + found = True + break + + # Log entry visibility depends on journald/syslog being wired up in the container. + if found: + print(f"Log entry verified in system logs: {message}") + else: + print(f"WARNING: log entry not immediately visible (journald may not be running in container): {message}") + + +@pytest.mark.require_capability("container") +def test_bvt_mathematical_computing(container_exec) -> None: + """BVT: Shell arithmetic, bc floating-point, and python3 math.""" + # Shell integer arithmetic (always available via bash) + ok, out = _cmd_ok(container_exec, "echo $((2 + 2))") + assert ok and out == "4", f"Basic arithmetic failed: {out!r}" + + ok, out = _cmd_ok(container_exec, "echo $((123 * 456))") + assert ok and out == str(123 * 456), f"Multiplication failed: {out!r}" + + # bc (optional) + ok, out = _cmd_ok(container_exec, "echo 'scale=2; 22/7' | bc", timeout=10) + if ok: + val = float(out) + assert 3.1 < val < 3.2, f"bc pi approximation out of range: {val}" + print(f"bc: 22/7 = {val}") + else: + print("bc not available — skipping floating-point test") + + # python3 (optional) + py_cmd = r"""python3 -c "import math; print(f'{math.pi:.6f}'); print(f'{math.sqrt(16):.1f}'); print('OK')" """ + ok, out = _cmd_ok(container_exec, py_cmd, timeout=15) + if ok and "OK" in out: + assert "3.14159" in out, f"Unexpected pi: {out}" + assert "4.0" in out, f"Unexpected sqrt(16): {out}" + print(f"python3 math verified: {out.splitlines()[0]}") + else: + print("python3 not available — skipping Python math test") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("iproute", "iputils", "hostname") +def test_bvt_networking(container_exec) -> None: + """BVT: Network interfaces, loopback ping, hostname, and routing.""" + # Network interfaces + ok, out = _cmd_ok(container_exec, "ip addr show", timeout=10) + assert ok, "ip command failed — iproute2 may be missing" + inet_lines = [l.strip() for l in out.splitlines() if "inet " in l and "scope" in l] + assert len(inet_lines) >= 1, f"No inet addresses found:\n{out}" + print(f"Network interfaces ({len(inet_lines)} addresses): {inet_lines}") + + # Loopback presence (the inet 127.0.0.1 line above already proves loopback + # exists). ping is informational only: rootless podman with slirp4netns + # often blocks ICMP regardless of iputils being installed. + assert any("127.0.0.1" in l for l in inet_lines), "Loopback interface missing" + ok, out = _cmd_ok(container_exec, "ping -c 2 -W 3 127.0.0.1", timeout=15) + if ok: + print("Loopback ping succeeded") + else: + print(f"Loopback ping not available (likely rootless ICMP restriction): {out[:120]}") + + # Hostname (hostname package was installed via requires_pkg) + ok, out = _cmd_ok(container_exec, "hostname") + assert ok, f"hostname command failed: {out!r}" + print(f"Hostname: {out or '(empty - no --hostname set)'}") + + # Routing table (informational) + ok, out = _cmd_ok(container_exec, "ip route show", timeout=10) + if ok: + print(f"Routing table present, default route: {'default' in out}") + + # DNS config (informational) + ok, out = _cmd_ok(container_exec, "cat /etc/resolv.conf") + if ok: + print(f"DNS resolvers configured: {'nameserver' in out}") + + +@pytest.mark.require_capability("container") +def test_bvt_external_connectivity(container_exec) -> None: + """BVT: External DNS resolution and optional HTTP connectivity.""" + domains = ["google.com", "microsoft.com", "github.com"] + + ok_nslookup, _ = _cmd_ok(container_exec, "which nslookup", timeout=5) + if ok_nslookup: + resolved = sum( + 1 for domain in domains + for ok, out in [_cmd_ok(container_exec, f"nslookup {domain}", timeout=10)] + if ok and "NXDOMAIN" not in out + ) + print(f"DNS resolved {resolved}/{len(domains)} domains") + assert resolved >= 1, f"All DNS resolutions failed for {domains}" + else: + print("nslookup not available — skipping DNS resolution test") + + # HTTP connectivity (optional) + ok_curl, _ = _cmd_ok(container_exec, "which curl", timeout=5) + if ok_curl: + ok, out = _cmd_ok(container_exec, "curl -s --connect-timeout 5 --max-time 10 http://httpbin.org/get", timeout=15) + if ok and '"origin"' in out: + print("External HTTP connectivity verified") + else: + print("External HTTP test skipped or blocked by network policy") + else: + print("curl not available — skipping HTTP connectivity test") + + +@pytest.mark.require_capability("container") +def test_bvt_filesystem_operations(container_exec) -> None: + """BVT: File create, read, chmod, and delete in /tmp.""" + content = "Azure Linux BVT filesystem test" + path = "/tmp/bvt_test_file.txt" + + ok, _ = _cmd_ok(container_exec, f"echo '{content}' > {path}") + assert ok, f"File creation failed: {path}" + + ok, out = _cmd_ok(container_exec, f"cat {path}") + assert ok and content in out, f"File content mismatch: {out!r}" + + ok, out = _cmd_ok(container_exec, f"chmod 644 {path} && stat -c '%a' {path}") + assert ok and "644" in out, f"chmod/stat failed: {out!r}" + + ok, _ = _cmd_ok(container_exec, f"rm {path}") + assert ok, "File deletion failed" + + for sysfile in ("/etc/os-release", "/proc/version"): + ok, _ = _cmd_ok(container_exec, f"test -r {sysfile}") + assert ok, f"Cannot read required system file: {sysfile}" + + print("Filesystem operations verified") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("procps-ng") +def test_bvt_process_management(container_exec) -> None: + """BVT: Background process start, list, and kill.""" + ok, _ = _cmd_ok(container_exec, "sleep 60 &", timeout=5) + assert ok, "Failed to start background process" + + ok, out = _cmd_ok(container_exec, "ps aux | grep '[s]leep 60'", timeout=10) + assert ok and "sleep 60" in out, f"Background process not found in ps: {out}" + + ok, _ = _cmd_ok(container_exec, "pkill -f 'sleep 60'", timeout=10) + assert ok, "pkill failed" + + ok, out = _cmd_ok(container_exec, "ps aux | grep '[s]leep 60' || true") + assert "sleep 60" not in out, "Process still running after pkill" + print("Process management verified") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("shadow-utils") +def test_bvt_user_management(container_exec) -> None: + """BVT: Create, verify, switch to, and delete a transient test user.""" + user = "bvt_testuser" + + ok, out = _cmd_ok(container_exec, f"useradd -m {user}", timeout=15) + assert ok, f"useradd failed — shadow-utils may be missing: {out}" + + ok, out = _cmd_ok(container_exec, f"id {user}") + assert ok and user in out, f"User not found: {out}" + + ok, _ = _cmd_ok(container_exec, f"test -d /home/{user}") + assert ok, f"Home directory /home/{user} not created" + + # Verify user is in passwd file (su may not work in minimal containers without TTY) + ok, out = _cmd_ok(container_exec, f"grep '^{user}:' /etc/passwd") + assert ok, f"User not in passwd file: {user}" + + ok, _ = _cmd_ok(container_exec, f"userdel -r {user}", timeout=15) + assert ok, "userdel failed" + print(f"User management verified for: {user}") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +def test_bvt_package_management(container_exec) -> None: + """BVT: Package manager (tdnf/dnf) cache refresh, list, and info.""" + ok_tdnf, _ = _cmd_ok(container_exec, "which tdnf", timeout=5) + pm = "tdnf" if ok_tdnf else "dnf" + + ok, out = _cmd_ok(container_exec, f"{pm} makecache", timeout=60) + assert ok, f"{pm} makecache failed: {out}" + + ok, out = _cmd_ok(container_exec, f"{pm} list --installed | head -20", timeout=30) + assert ok, f"Package listing failed: {out}" + assert any(kw in out.lower() for kw in ("azure", "bash", "filesystem")), \ + f"Expected Azure Linux packages not found in listing: {out[:200]}" + + ok, out = _cmd_ok(container_exec, f"{pm} info bash", timeout=15) + assert ok, f"Package info for bash failed: {out}" + print(f"Package management verified via {pm}") + + +@pytest.mark.require_capability("container") +def test_bvt_environment_variables(container_exec, container_info: dict) -> None: + """BVT: Essential environment variables and custom variable export.""" + ok, out = _cmd_ok(container_exec, "env") + assert ok, "env command failed" + assert "PATH=" in out, "Required env var PATH not set" + print([l for l in out.splitlines() if l.startswith("PATH=")][0]) + + ok, out = _cmd_ok(container_exec, "export BVT_VAR=azure_linux_bvt && echo $BVT_VAR") + assert ok and "azure_linux_bvt" in out, f"Custom env var not set: {out!r}" + + ok, out = _cmd_ok(container_exec, "echo $HOSTNAME") + assert ok and len(out) > 0, "HOSTNAME is empty" + print(f"Container HOSTNAME: {out}") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("procps-ng") +def test_bvt_container_health_summary(container_exec, container_info: dict) -> None: + """BVT: Aggregated health summary — OS info, memory, processes, packages.""" + health: dict = { + "container_name": container_info["container_name"], + "container_ip": container_info["ip_address"], + "timestamp": int(time.time()), + } + + ok, out = _cmd_ok(container_exec, "grep PRETTY_NAME /etc/os-release | cut -d= -f2 | tr -d '\"'") + assert ok and out, "Could not read /etc/os-release" + health["os"] = out + + ok, out = _cmd_ok(container_exec, "uname -r") + if ok: + health["kernel"] = out + + ok, out = _cmd_ok(container_exec, "uptime -s 2>/dev/null || uptime") + if ok: + health["uptime"] = out + + # Memory: read /proc/meminfo directly with pure shell (no awk dependency) + ok, out = _cmd_ok(container_exec, "grep '^MemTotal:' /proc/meminfo") + if ok and out: + # Format: "MemTotal: 16384000 kB" + parts = out.split() + if len(parts) >= 2 and parts[1].isdigit(): + health["memory_total_mb"] = int(parts[1]) // 1024 + + ok, out = _cmd_ok(container_exec, "ps aux --no-headers | wc -l") + if ok: + health["process_count"] = int(out) + + ok, out = _cmd_ok(container_exec, "rpm -qa | wc -l", timeout=15) + if ok: + health["package_count"] = int(out) + + assert health.get("container_name"), "Missing container name" + assert health.get("container_ip"), "Missing container IP" + assert health.get("os"), "Missing OS information" + assert health.get("memory_total_mb", 0) > 0, "Invalid memory reading" + assert health.get("process_count", 0) >= 1, "No processes detected" + + print("\n" + "=" * 60) + print("CONTAINER BVT HEALTH SUMMARY") + print("=" * 60) + print(json.dumps(health, indent=2)) + print("=" * 60) + + +# --------------------------------------------------------------------------- +# Ports of legacy CBL-Mariner ContainerBase BVT tests +# --------------------------------------------------------------------------- + +# First 50 digits after "3." of pi — used to verify high-precision arithmetic. +_PI_PREFIX_50 = "14159265358979323846264338327950288419716939937510" + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("python3") +def test_bvt_pi_to_1000_places(container_exec) -> None: + """BVT: High-precision pi to 1000 digits via python3 ``decimal``. + + Ports the legacy "Pi to 1000 places". Uses Machin's formula in the + container's Python to avoid extra package dependencies. + """ + py_cmd = "python3 <<'PYEOF'\n" + ( + "from decimal import Decimal, getcontext\n" + "getcontext().prec = 1010\n" + "def atan(x):\n" + " s = Decimal(0); t = x; n = Decimal(1); sign = 1\n" + " x2 = x * x\n" + " while t / n != 0:\n" + " s += sign * t / n\n" + " t *= x2; n += 2; sign = -sign\n" + " return s\n" + "pi = 16 * atan(Decimal(1) / 5) - 4 * atan(Decimal(1) / 239)\n" + "print(str(pi)[:1002])\n" + ) + "PYEOF\n" + ok, out = _cmd_ok(container_exec, py_cmd, timeout=60) + assert ok, f"python3 pi computation failed — python3 may be missing: {out!r}" + # Output is "3." + 1000 digits + assert out.startswith("3."), f"Unexpected pi output prefix: {out[:20]!r}" + digits = out[2:] + assert len(digits) >= 1000, f"Got only {len(digits)} digits of pi" + assert digits.startswith(_PI_PREFIX_50), \ + f"Pi digits incorrect at the start: got {digits[:50]!r}" + print(f"Pi to 1000 places verified (first 50 digits: {digits[:50]})") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("python3") +def test_bvt_pi_repeated_iterations(container_exec) -> None: + """BVT: Compute pi to 1000 places repeatedly (CPU stress + consistency). + + Ports the legacy "Pi to N x 1000 places" — runs the calculation 10 times + and asserts every iteration yields the same prefix. + """ + py_cmd = "python3 <<'PYEOF'\n" + ( + "from decimal import Decimal, getcontext\n" + "getcontext().prec = 1010\n" + "def atan(x):\n" + " s = Decimal(0); t = x; n = Decimal(1); sign = 1\n" + " x2 = x * x\n" + " while t / n != 0:\n" + " s += sign * t / n\n" + " t *= x2; n += 2; sign = -sign\n" + " return s\n" + "results = set()\n" + "for _ in range(10):\n" + " results.add(str(16 * atan(Decimal(1) / 5) - 4 * atan(Decimal(1) / 239))[:52])\n" + "print(len(results), next(iter(results)))\n" + ) + "PYEOF\n" + ok, out = _cmd_ok(container_exec, py_cmd, timeout=120) + assert ok, f"python3 repeated pi computation failed: {out!r}" + parts = out.split(maxsplit=1) + assert len(parts) == 2, f"Unexpected output: {out!r}" + unique_count, sample = int(parts[0]), parts[1] + assert unique_count == 1, f"Pi values diverged across iterations: {unique_count} distinct values" + assert sample.startswith("3."), f"Bad pi sample: {sample!r}" + assert sample[2:].startswith(_PI_PREFIX_50), f"Pi prefix wrong: {sample!r}" + print(f"Pi to 1000 places consistent across 10 iterations: {sample[:52]}") + + +@pytest.mark.require_capability("container") +@pytest.mark.require_capability("runtime-package-management") +@pytest.mark.requires_pkg("curl") +def test_bvt_sustained_http_fetch(container_exec) -> None: + """BVT: Sustained external HTTP — 50 sequential fetches, ≥90% success. + + Ports the legacy "Core Networking Test" (50-iteration page fetch). + + The target URL defaults to ``http://httpbin.org/get`` but can be + overridden by setting the ``BVT_HTTP_ENDPOINT`` environment variable + on the host before running the test suite. If the first probe to the + endpoint fails (e.g. outbound networking is blocked in the CI + environment), the test is skipped rather than failed so that + unrelated image-quality issues are not masked by network policy. + """ + iterations = 50 + url = os.environ.get("BVT_HTTP_ENDPOINT", "http://httpbin.org/get") + + # Probe once first; skip the whole test if outbound networking is blocked. + ok, _ = _cmd_ok( + container_exec, + f"curl -s -o /dev/null -w '%{{http_code}}' --connect-timeout 5 --max-time 10 {url}", + timeout=15, + ) + if not ok: + pytest.skip(f"Outbound HTTP to {url!r} is unreachable — skipping sustained fetch test") + + # One bash loop is far cheaper than 50 podman exec invocations. + cmd = ( + f"i=0; ok=0; while [ $i -lt {iterations} ]; do " + f"curl -s -o /dev/null -w '%{{http_code}}\\n' " + f"--connect-timeout 5 --max-time 10 {url} | " + f"grep -q '^2' && ok=$((ok+1)); i=$((i+1)); done; echo $ok" + ) + ok, out = _cmd_ok(container_exec, cmd, timeout=iterations * 12) + assert ok, f"Sustained fetch loop failed: {out!r}" + success = int(out.strip().splitlines()[-1]) + success_rate = success / iterations + print(f"Sustained HTTP fetch: {success}/{iterations} succeeded ({success_rate:.0%})") + assert success_rate >= 0.9, \ + f"HTTP success rate {success_rate:.0%} below 90% threshold ({success}/{iterations})" diff --git a/base/images/tests/cases/container-base/static/test_container.py b/base/images/tests/cases/container-base/static/test_container.py new file mode 100644 index 00000000000..68642fa5745 --- /dev/null +++ b/base/images/tests/cases/container-base/static/test_container.py @@ -0,0 +1,135 @@ +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import os +import re +from pathlib import Path + +import pytest + +# Default upper-bound for the unpacked container rootfs (MiB). +# Ported from the legacy "Memory Usage Check" / "Container Size Max Check" +# in the CBL-Mariner ContainerBase BVT. Override via env var +# AZL_CONTAINER_MAX_SIZE_MB to adjust the gate per build flavor. +_DEFAULT_MAX_ROOTFS_MB = 250 + + + +def test_no_kernel_modules(rootfs: Path) -> None: + """Container images must not ship kernel modules.""" + for modules_dir in ( + rootfs / "lib" / "modules", + rootfs / "usr" / "lib" / "modules", + ): + if modules_dir.exists(): + versions = list(modules_dir.iterdir()) + assert not versions, ( + f"Container image has kernel modules under {modules_dir}: " + f"{[v.name for v in versions]}" + ) + +def test_file_exists_root(rootfs: Path) -> None: + """Root directory must exist.""" + assert rootfs.exists(), "Root directory does not exist" + + +def test_file_exists_bin_date(rootfs: Path) -> None: + """Date binary must exist and be executable by owner.""" + path = rootfs / "bin" / "date" + assert path.exists(), f"File {path} does not exist" + # Check executable by owner (user x bit) + mode = path.stat().st_mode + assert mode & 0o100, f"File {path} is not executable by owner" + + +def test_file_exists_bin_bash(rootfs: Path) -> None: + """Bash must exist.""" + path = rootfs / "bin" / "bash" + assert path.exists(), f"File {path} does not exist" + + +def test_file_exists_etc_dnf_dnf_conf(rootfs: Path) -> None: + """DNF config must exist.""" + path = rootfs / "etc" / "dnf" / "dnf.conf" + assert path.exists(), f"File {path} does not exist" + + +def test_file_not_exists_etc_dummy(rootfs: Path) -> None: + """Dummy file should not exist (negative test).""" + path = rootfs / "etc" / "dummy" + assert not path.exists(), f"File {path} should not exist" + + +def test_os_release(rootfs: Path, os_release: dict[str, str]) -> None: + """os-release must exist and identify the container variant. + + ID and VERSION_ID are validated globally in cases/test_os_release.py; + here we only assert the container-specific VARIANT_ID. + """ + path = rootfs / "etc" / "os-release" + assert path.exists(), f"File {path} does not exist" + assert os_release.get("VARIANT_ID") == "container", \ + f"Expected VARIANT_ID=container, got {os_release.get('VARIANT_ID')}" + + +def test_passwd(rootfs: Path) -> None: + """passwd must exist and contain a valid root entry.""" + path = rootfs / "etc" / "passwd" + assert path.exists(), f"File {path} does not exist" + content = path.read_text() + # Pattern: root:x:0:0:Super User:/root:/bin/bash or similar + pattern = r"^root:x:0:0:.*:/root:/bin/bash" + assert re.search(pattern, content, re.MULTILINE), \ + f"Root entry not found in passwd file matching pattern {pattern}" + + +def test_license_bash_exists(rootfs: Path) -> None: + """Bash license file must exist.""" + path = rootfs / "usr" / "share" / "licenses" / "bash" / "COPYING" + assert path.exists(), f"License file {path} does not exist" + + +def test_license_coreutils_single_exists(rootfs: Path) -> None: + """coreutils-single license file must exist.""" + path = rootfs / "usr" / "share" / "licenses" / "coreutils-single" / "COPYING" + assert path.exists(), f"License file {path} does not exist" + + +def test_root_home_dir_exists(rootfs: Path) -> None: + """Root user home directory must exist.""" + root_home = rootfs / "root" + assert root_home.exists(), "root home directory does not exist" + + +def test_container_rootfs_size(rootfs: Path) -> None: + """Container rootfs footprint must stay within the configured max. + + Ports the legacy ContainerBase BVT "Memory Usage Check" / + "Container Size Max Check": walks the extracted rootfs and asserts + the total on-disk size is under AZL_CONTAINER_MAX_SIZE_MB (MiB). + """ + raw = os.environ.get("AZL_CONTAINER_MAX_SIZE_MB", str(_DEFAULT_MAX_ROOTFS_MB)) + try: + max_mb = int(raw) + except ValueError: + pytest.fail( + f"AZL_CONTAINER_MAX_SIZE_MB must be an integer (got {raw!r}). " + "Set it to the maximum allowed rootfs size in MiB." + ) + + total_bytes = 0 + for dirpath, _dirnames, filenames in os.walk(rootfs, followlinks=False): + for name in filenames: + fp = Path(dirpath) / name + try: + # lstat: don't follow symlinks, don't double-count targets + total_bytes += fp.lstat().st_size + except (FileNotFoundError, PermissionError): + continue + + total_mb = total_bytes / (1024 * 1024) + print(f"Container rootfs size: {total_mb:.1f} MiB (limit {max_mb} MiB)") + assert total_mb <= max_mb, ( + f"Container rootfs is {total_mb:.1f} MiB, exceeds limit of {max_mb} MiB" + ) diff --git a/base/images/tests/cases/container-base/test_container.py b/base/images/tests/cases/container-base/test_container.py deleted file mode 100644 index 61a6fd5c43e..00000000000 --- a/base/images/tests/cases/container-base/test_container.py +++ /dev/null @@ -1,20 +0,0 @@ -# SPDX-License-Identifier: MIT -"""Container-specific validation tests.""" - -from __future__ import annotations - -from pathlib import Path - - -def test_no_kernel_modules(rootfs: Path) -> None: - """Container images must not ship kernel modules.""" - for modules_dir in ( - rootfs / "lib" / "modules", - rootfs / "usr" / "lib" / "modules", - ): - if modules_dir.exists(): - versions = list(modules_dir.iterdir()) - assert not versions, ( - f"Container image has kernel modules under {modules_dir}: " - f"{[v.name for v in versions]}" - ) diff --git a/base/images/tests/cases/test_os_release.py b/base/images/tests/cases/test_os_release.py index 518b0dcc16d..518124ccf05 100644 --- a/base/images/tests/cases/test_os_release.py +++ b/base/images/tests/cases/test_os_release.py @@ -3,7 +3,6 @@ from __future__ import annotations - def test_os_release_has_required_keys(os_release: dict[str, str]) -> None: """os-release must contain the distro-identifying keys.""" for key in ("NAME", "ID", "VERSION_ID"): diff --git a/base/images/tests/conftest.py b/base/images/tests/conftest.py index 91823ae3223..b21e81949d3 100644 --- a/base/images/tests/conftest.py +++ b/base/images/tests/conftest.py @@ -24,6 +24,12 @@ unmount_vm_image, ) from utils.parsers import parse_os_release, query_rpm_packages +from utils.container_runtime import ( + ContainerExecInstance, + create_container_with_exec, + destroy_exec_container, + resolve_image_reference, +) from utils.pytest_plugin import ( derive_image_type_from_capabilities, detect_image_type, @@ -208,3 +214,160 @@ def partition_table( for p in disk_info.partitions: logger.debug(" %s: type=%s mount=%s size=%d", p.device, p.type, p.mountpoint, p.size_bytes) return disk_info.partitions + + +# --------------------------------------------------------------------------- +# Dynamic Container Testing Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def _container_image_ref(image_path: Path, image_type: str) -> str | None: + """Load the container image once per session and cache the reference. + + This avoids calling ``podman load -i `` for every test when + ``--image-path`` points at an OCI tar archive. For non-container + sessions the fixture returns ``None`` immediately (no podman call). + """ + if image_type != "container": + return None + return resolve_image_reference(image_path) + + +@pytest.fixture +def running_container( + image_path: Path, image_type: str, image_name: str | None, workdir: Path, + _container_image_ref: str | None, + request: pytest.FixtureRequest +) -> ContainerExecInstance: + """Fresh container per test, with exec access and guaranteed teardown. + + A new container is started for every test that requests this + fixture (directly or via ``container_exec`` / ``container_info``) + and destroyed on teardown so tests cannot leak state to one another + (installed packages, created users, processes, files). The container + name is auto-generated to avoid collisions when tests run in + parallel. + + Container startup is gated on **fixture usage**, not on a marker: + requesting the fixture is itself the signal that the test needs a + live container. The only hard gate is image type — for non-container + images the fixture skips so VM-only sessions don't try to start + podman. + """ + if image_type != "container": + pytest.skip("running_container only applicable to container images") + + logger.info("Creating fresh container for test %s", request.node.name) + container = create_container_with_exec( + image_path, + container_name=None, # auto-generate a unique name per test + image_name=image_name, + image_ref=_container_image_ref, # pre-loaded once per session; avoids repeated podman load + ) + + try: + yield container + finally: + logger.info("Destroying container for test %s", request.node.name) + destroy_exec_container(container) + + +@pytest.fixture +def container_exec(running_container: ContainerExecInstance): + """Execute a shell command in the running container via ``podman exec``. + + Returns a callable ``(command: str, timeout: int = 60) -> + subprocess.CompletedProcess[str]``. + + The container starts as-shipped, but tests can opt-in to dependency + installation by declaring ``@pytest.mark.requires_pkg("pkg", ...)`` + on the test function. The autouse ``_apply_requires_pkg`` fixture + (below) installs those packages via ``tdnf`` or ``dnf`` in this same + container before the test body runs, and skips the test if the + install fails. Tests with no ``requires_pkg`` marker see the image + exactly as shipped — the container is never mutated implicitly. + """ + from utils.container_runtime import exec_container_command_raw + + def _exec(command: str, timeout: int = 60) -> subprocess.CompletedProcess[str]: + return exec_container_command_raw( + running_container.container_name, + ["bash", "-c", command], + timeout=timeout, + ) + return _exec + + +@pytest.fixture(autouse=True) +def _apply_requires_pkg(request: pytest.FixtureRequest) -> None: + """Honor ``@pytest.mark.requires_pkg(...)`` on runtime tests. + + Collects all packages declared by ``requires_pkg`` markers on the + test (multiple markers stack), installs them once via ``tdnf`` + (preferred) or ``dnf`` in the live container, and skips the test on + installation failure or when neither package manager is found. + + No-op for tests that don't carry the marker or don't request + ``container_exec`` (e.g. static rootfs tests). + """ + markers = list(request.node.iter_markers("requires_pkg")) + if not markers: + return + if "container_exec" not in request.fixturenames: + pytest.skip("requires_pkg marker is only valid for runtime container tests") + + pkgs: list[str] = [] + for m in markers: + pkgs.extend(m.args) + if not pkgs: + return + + container_exec = request.getfixturevalue("container_exec") + + # Detect which package manager is available in this container. + # AZL images may ship tdnf, dnf, or both; prefer tdnf when present. + pkg_mgr: str | None = None + for candidate in ("tdnf", "dnf"): + probe = container_exec(f"command -v {candidate!s}", timeout=10) + if probe.returncode == 0: + pkg_mgr = candidate + break + if pkg_mgr is None: + pytest.skip( + f"requires_pkg: neither tdnf nor dnf found in container; " + f"cannot install {pkgs!r}" + ) + + cmd = f"{pkg_mgr} install -y " + " ".join(pkgs) + logger.info("requires_pkg: installing %s", pkgs) + result = container_exec(cmd, timeout=300) + if result.returncode != 0: + pytest.skip( + f"requires_pkg: failed to install {pkgs!r} (rc={result.returncode}): " + f"{result.stderr.strip()[:200]}" + ) + + +@pytest.fixture +def container_info(running_container: ContainerExecInstance) -> dict[str, str]: + """Container runtime metadata (id, name, image, IP).""" + try: + from utils.container_runtime import exec_container_command_raw + ip_result = exec_container_command_raw( + running_container.container_name, + ["hostname", "-i"], + timeout=5 + ) + container_ip = ip_result.stdout.strip() if ip_result.returncode == 0 else "127.0.0.1" + except Exception: + container_ip = "127.0.0.1" + + return { + "container_id": running_container.container_id, + "container_name": running_container.container_name, + "image_ref": running_container.image_ref, + "ip_address": container_ip, + } + + diff --git a/base/images/tests/utils/container_runtime.py b/base/images/tests/utils/container_runtime.py new file mode 100644 index 00000000000..5c1bba432a8 --- /dev/null +++ b/base/images/tests/utils/container_runtime.py @@ -0,0 +1,315 @@ +# SPDX-License-Identifier: MIT +"""Container runtime orchestration for dynamic testing. + +Provides fixtures and utilities for creating running containers with exec access +for dynamic integration testing. +""" + +from __future__ import annotations + +import logging +import subprocess +import time +from pathlib import Path +from typing import NamedTuple + +from .tools import NativeTool + +logger = logging.getLogger(__name__) + +# Container runtime tools +PODMAN = NativeTool( + name="podman", + package_hint="podman", + reason="create and manage test containers", + when="container", +) + +class ContainerExecInstance(NamedTuple): + """Running container instance with podman exec access.""" + container_id: str + container_name: str + image_ref: str + + +class ContainerRuntimeError(Exception): + """Container runtime operation failed.""" + pass + + +def _run_container_cmd(cmd: list[str], timeout: int = 300, **kwargs: object) -> subprocess.CompletedProcess[str]: + """Run container command with proper error handling. + + Args: + cmd: Command and arguments to run. + timeout: Maximum seconds to wait (default 300). Prevents operations + like ``podman load`` or ``podman run`` from hanging indefinitely + in CI when storage or network layers stall. + """ + logger.info("Container runtime: %s", " ".join(cmd)) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, **kwargs) + if result.returncode != 0: + logger.error( + "Container command failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, + " ".join(cmd), + result.stdout, + result.stderr, + ) + raise ContainerRuntimeError( + f"Container command failed: {' '.join(cmd)}\n" + f"Error: {result.stderr}" + ) + return result + + +def _parse_loaded_images(load_stdout: str) -> list[str]: + """Extract image references from ``podman load`` stdout. + + podman/docker print one or more lines of the form:: + + Loaded image: localhost/foo:bar + Loaded image(s): localhost/foo:bar,localhost/baz:qux + + We accept both forms, split comma-separated lists, and return the + references in the order they were reported. This is race-free — + unlike scanning ``podman images --sort created``, it cannot be + perturbed by other processes loading or pulling images concurrently. + """ + refs: list[str] = [] + for line in load_stdout.splitlines(): + line = line.strip() + # Match "Loaded image:" and "Loaded image(s):" (case-insensitive). + lowered = line.lower() + for prefix in ("loaded image(s):", "loaded image:"): + if lowered.startswith(prefix): + payload = line[len(prefix):].strip() + # Some versions emit comma-separated lists on one line. + for ref in payload.split(","): + ref = ref.strip() + if ref: + refs.append(ref) + break + return refs + + +def resolve_image_reference(image_path: Path) -> str: + """Return a podman-usable image reference for *image_path*. + + If *image_path* looks like an already-resolved image reference (contains + ``:``) the value is returned as-is. Otherwise the file is loaded via + ``podman load`` and the reference reported by podman is returned. + """ + image_str = str(image_path) + + # If it's already a container reference (contains :), use directly + if ":" in image_str and not image_str.startswith("/"): + return image_str + + # If it's an OCI archive file, load it first + if image_path.exists() and image_path.suffix in (".tar", ".gz", ".xz"): + logger.info("Loading image archive: %s", image_path) + load_cmd = [PODMAN.name, "load", "-i", str(image_path)] + load_result = _run_container_cmd(load_cmd) + + # Parse the actual reference(s) reported by `podman load` rather + # than picking "the most recently created image", which is racy + # under parallel test runs or a busy dev machine where another + # process may have just pulled/loaded an image. + loaded_images = _parse_loaded_images(load_result.stdout) + if not loaded_images: + # Some podman versions write the "Loaded image:" line to + # stderr; fall back to that before giving up. + loaded_images = _parse_loaded_images(load_result.stderr) + if not loaded_images: + raise ContainerRuntimeError( + f"Could not determine image reference from `podman load` output " + f"for {image_path}.\nstdout: {load_result.stdout!r}\n" + f"stderr: {load_result.stderr!r}" + ) + if len(loaded_images) > 1: + logger.info( + "Archive %s contained %d images; using the first: %s (others: %s)", + image_path, len(loaded_images), loaded_images[0], loaded_images[1:], + ) + return loaded_images[0] + + # Assume it's a direct image reference + return image_str + + +def create_container_with_exec( + image_path: Path, + container_name: str | None = None, + image_name: str | None = None, + image_ref: str | None = None, +) -> ContainerExecInstance: + """Create a running container with podman exec access. + + Args: + image_path: Path to image file or image reference. Ignored when + *image_ref* is provided. + container_name: Container name (auto-generated if ``None``). + image_name: Human-readable image name used in log messages. + image_ref: Pre-resolved image reference (skips ``podman load``). + When provided, ``image_path`` is not touched. Use this to + amortize the ~10s tarball load across many container creates. + """ + + # Generate container name if not provided + if container_name is None: + timestamp = int(time.time()) + import random + random_suffix = random.randint(1000, 9999) + container_name = f"azl-test-{timestamp}-{random_suffix}" + + # Get image reference (load if needed) — but skip if caller already + # resolved one for us (e.g. session-scoped fixture). + if image_ref is None: + image_ref = resolve_image_reference(image_path) + logger.info( + "Creating exec container %s from image %s%s", + container_name, image_ref, + f" ({image_name})" if image_name else "", + ) + + # Create and start container + run_cmd = [ + PODMAN.name, "run", "-d", + "--name", container_name, + "--replace", # Replace existing container with same name + "--tmpfs", "/run", + "--tmpfs", "/tmp", + image_ref, + "sleep", "infinity" # Keep container running indefinitely + ] + + result = _run_container_cmd(run_cmd) + container_id = result.stdout.strip() + + # Wait briefly for container to start, then verify exec access. + # If anything goes wrong from here on we MUST tear the container + # down — otherwise a flaky readiness probe leaks containers in CI + # and on dev machines, and subsequent runs fail with name conflicts + # or resource exhaustion. + try: + logger.info("Waiting for exec container %s to start...", container_name) + time.sleep(2) + + test_result = exec_container_command_raw( + container_name, ["echo", "container-ready"] + ) + if test_result.returncode != 0 or "container-ready" not in test_result.stdout: + raise ContainerRuntimeError( + f"Container exec test failed for {container_name} " + f"(rc={test_result.returncode}, stdout={test_result.stdout!r}, " + f"stderr={test_result.stderr!r})" + ) + except BaseException: + # Best-effort cleanup; never let cleanup errors mask the original. + # BaseException covers KeyboardInterrupt / SystemExit too — we + # still want the container removed when the user Ctrl-C's mid-probe. + logger.warning( + "Readiness probe failed for %s; removing leaked container", + container_name, + ) + try: + subprocess.run( + [PODMAN.name, "rm", "-f", container_name], + capture_output=True, text=True, timeout=30, + ) + except Exception as cleanup_exc: # pragma: no cover + logger.warning( + "Failed to remove leaked container %s: %s", + container_name, cleanup_exc, + ) + raise + + logger.info( + "Exec container ready: %s (ID: %s)", + container_name, container_id[:12] + ) + + return ContainerExecInstance( + container_id=container_id, + container_name=container_name, + image_ref=image_ref, + ) + + +def exec_container_command_raw( + container_name: str, + command: list[str], + timeout: int = 60, + **kwargs: object +) -> subprocess.CompletedProcess[str]: + """Execute command in container via podman exec (raw version).""" + exec_cmd = [PODMAN.name, "exec", container_name] + command + + logger.debug("Container exec: %s", " ".join(command)) + return subprocess.run( + exec_cmd, + capture_output=True, + text=True, + timeout=timeout, + **kwargs + ) + + +def exec_container_command( + container: ContainerExecInstance, + command: str, + timeout: int = 60, + check: bool = True +) -> subprocess.CompletedProcess[str]: + """Execute command in container via podman exec. + + Args: + container: Container instance + command: Shell command to execute + timeout: Command timeout in seconds + check: Raise exception on non-zero exit code + + Returns: + Completed process with stdout/stderr + """ + result = exec_container_command_raw( + container.container_name, + ["bash", "-c", command], + timeout=timeout + ) + + if check and result.returncode != 0: + logger.error( + "Container exec failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, command, result.stdout, result.stderr + ) + raise ContainerRuntimeError( + f"Container exec failed: {command}\n" + f"Exit code: {result.returncode}\n" + f"Error: {result.stderr}" + ) + + return result + + +def destroy_exec_container(container: ContainerExecInstance) -> None: + """Stop and remove exec container.""" + logger.info("Destroying exec container %s", container.container_name) + + # Stop container + try: + _run_container_cmd([ + PODMAN.name, "stop", "-t", "10", container.container_name + ]) + except ContainerRuntimeError: + logger.warning("Failed to stop container gracefully") + + # Remove container + try: + _run_container_cmd([ + PODMAN.name, "rm", "-f", container.container_name + ]) + except ContainerRuntimeError: + logger.warning("Failed to remove container") + diff --git a/base/images/tests/utils/pytest_plugin.py b/base/images/tests/utils/pytest_plugin.py index 2528fba187a..9cdb0291738 100644 --- a/base/images/tests/utils/pytest_plugin.py +++ b/base/images/tests/utils/pytest_plugin.py @@ -102,12 +102,25 @@ def pytest_configure(config) -> None: # type: ignore[no-untyped-def] """Register markers and fail fast if required native tools are missing.""" config.addinivalue_line( "markers", - "require_capability(name): skip test unless the image has the named capability", + "require_capability(name): skip test unless the image has the named " + "capability. Each marker takes exactly one capability; stack the " + "decorator to require several.", ) config.addinivalue_line( "markers", "image(name): only run this test when --image-name matches", ) + config.addinivalue_line( + "markers", + "requires_pkg(*names): preinstall the named packages in the live container " + "before the test runs; skip the test if installation fails", + ) + config.addinivalue_line( + "markers", + "runtime_container_tests: auto-applied to tests under cases/*/runtime/; " + "used for -m runtime_container_tests filtering. Container startup is " + "driven by fixture usage (running_container), not by this marker.", + ) from utils.tools import check_tools @@ -135,10 +148,11 @@ def pytest_configure(config) -> None: # type: ignore[no-untyped-def] "Run 'uv run python -m utils.tools' for a full status check." ) - def pytest_runtest_setup(item: pytest.Item) -> None: """Skip tests whose marks are not satisfied.""" # require_capability: skip if image doesn't have the required capability. + # Each marker takes exactly one capability name; stack the decorator + # multiple times to require several capabilities on a single test. caps = parse_capabilities(item.config.getoption("--capabilities", default=None)) for marker in item.iter_markers("require_capability"): required = marker.args[0] if marker.args else None @@ -152,7 +166,6 @@ def pytest_runtest_setup(item: pytest.Item) -> None: if expected and image_name != expected: pytest.skip(f"test is specific to image '{expected}' (running: '{image_name}')") - def pytest_collection_modifyitems(config, items) -> None: # type: ignore[no-untyped-def] """Auto-apply ``@pytest.mark.image("")`` to tests under ``cases//``. @@ -179,3 +192,11 @@ def pytest_collection_modifyitems(config, items) -> None: # type: ignore[no-unt if cases_idx + 2 < len(parts): image_dir = parts[cases_idx + 1] item.add_marker(pytest.mark.image(image_dir)) + + # Auto-apply `runtime_container_tests` marker to any test that + # lives under a `runtime/` subdirectory anywhere below `cases/`. + # This is purely for `-m runtime_container_tests` filtering; the + # `running_container` fixture itself triggers container startup + # on fixture usage, not on this marker. + if "runtime" in parts[cases_idx:]: + item.add_marker(pytest.mark.runtime_container_tests)