Source code for dash_evals.runner.sandboxes.podman.podman

"""Simple Podman sandbox environment for inspect_ai.

This module provides a minimal PodmanSandboxEnvironment that uses the Podman CLI
directly (not podman-compose) for running containers.
"""

import asyncio
import base64
import logging
import os
import tempfile
from pathlib import Path
from typing import Literal, Union, overload

from inspect_ai.util._sandbox.environment import (
    SandboxConnection,
    SandboxEnvironment,
    SandboxEnvironmentConfigType,
)
from inspect_ai.util._sandbox.limits import SandboxEnvironmentLimits
from inspect_ai.util._sandbox.registry import sandboxenv
from inspect_ai.util._subprocess import ExecResult

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default Flutter sandbox image (built from dataset/sandboxes/podman/Containerfile).
# Used by PodmanSandboxEnvironment.sample_init when the sandbox config is not
# a plain image name.
DEFAULT_IMAGE = "localhost/flutter-sandbox:latest"


@sandboxenv(name="podman")
class PodmanSandboxEnvironment(SandboxEnvironment):
    """Simple Podman-based sandbox environment.

    One detached container is started per sample with ``podman run``; commands
    and file transfers are performed through ``podman exec``.
    """

    def __init__(self, container_id: str, working_dir: str = "/workspace"):
        """Bind this environment to an already-running container.

        Args:
            container_id: ID of the container that commands are executed in.
            working_dir: Directory used by ``exec`` when no ``cwd`` is given.
        """
        super().__init__()
        self.container_id = container_id
        self._working_dir = working_dir

    @classmethod
    def config_files(cls) -> list[str]:
        """Return the config file names associated with this sandbox type."""
        return ["compose.yaml", "Containerfile", "Dockerfile"]

    @classmethod
    def default_concurrency(cls) -> int | None:
        """Return the default sandbox concurrency: twice the CPU count."""
        return (os.cpu_count() or 1) * 2

    @classmethod
    async def task_init(
        cls, task_name: str, config: SandboxEnvironmentConfigType | None
    ) -> None:
        """Validate podman is available.

        Raises:
            RuntimeError: If the podman executable cannot be found or
                ``podman --version`` exits with a non-zero status.
        """
        # Only the spawn itself can raise FileNotFoundError, so keep the
        # try body limited to it.
        try:
            proc = await asyncio.create_subprocess_exec(
                "podman",
                "--version",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except FileNotFoundError as err:
            raise RuntimeError(
                "Podman executable not found. Please ensure podman is installed and in your PATH."
            ) from err

        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(
                f"Podman check failed: {stderr.decode('utf-8', errors='replace')}"
            )

    @classmethod
    async def sample_init(
        cls,
        task_name: str,
        config: SandboxEnvironmentConfigType | None,
        metadata: dict[str, str],
    ) -> dict[str, SandboxEnvironment]:
        """Start a container for this sample.

        Args:
            task_name: Name of the running task (unused).
            config: If a string that does not end in ``.yaml``/``.yml``, it is
                used as the image name; otherwise ``DEFAULT_IMAGE`` is used.
            metadata: Sample metadata (unused).

        Returns:
            A mapping with a single ``"default"`` environment bound to the
            newly started container.

        Raises:
            RuntimeError: If podman is missing or the container fails to start.
        """
        # Determine image from config or use default
        image = DEFAULT_IMAGE
        if isinstance(config, str) and not config.endswith((".yaml", ".yml")):
            image = config

        # Start container (no TTY to avoid control chars, sleep to keep running).
        # Mount the host temp dir at the same path so workspace files copied
        # by setup_workspace are accessible inside the container.
        tmp_dir = tempfile.gettempdir()
        cmd = [
            "podman",
            "run",
            "-d",
            "--rm",
            "-v",
            f"{tmp_dir}:{tmp_dir}",
            image,
            "sleep",
            "infinity",
        ]

        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except FileNotFoundError as err:
            raise RuntimeError(
                "Podman executable not found. Please ensure podman is installed and in your PATH."
            ) from err

        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(
                "Failed to start podman container: "
                f"{stderr.decode('utf-8', errors='replace')}"
            )

        # `podman run -d` prints the new container's ID on stdout.
        container_id = stdout.decode().strip()
        logger.info("Started podman container: %s", container_id[:12])
        return {"default": cls(container_id=container_id)}

    @staticmethod
    async def _remove_container(container_id: str) -> None:
        """Force-remove a container and wait for ``podman rm`` to exit."""
        proc = await asyncio.create_subprocess_exec(
            "podman",
            "rm",
            "-f",
            container_id,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        # Without this wait the caller could return (and the event loop shut
        # down) before the container is gone, and the child is never reaped.
        await proc.wait()

    @classmethod
    async def sample_cleanup(
        cls,
        task_name: str,
        config: SandboxEnvironmentConfigType | None,
        environments: dict[str, SandboxEnvironment],
        interrupted: bool,
    ) -> None:
        """Stop and remove containers.

        Only environments that are ``PodmanSandboxEnvironment`` instances are
        touched; ``task_name``, ``config`` and ``interrupted`` are unused.
        """
        for env in environments.values():
            if isinstance(env, PodmanSandboxEnvironment):
                logger.info("Cleaning up container: %s", env.container_id[:12])
                await cls._remove_container(env.container_id)

    @classmethod
    async def task_cleanup(
        cls, task_name: str, config: SandboxEnvironmentConfigType | None, cleanup: bool
    ) -> None:
        """No task-level cleanup needed - containers are removed per-sample."""
        pass

    @classmethod
    async def cli_cleanup(cls, id: str | None) -> None:
        """CLI cleanup for orphaned containers.

        Args:
            id: Container ID to remove; nothing is done when it is empty.
        """
        if id:
            await cls._remove_container(id)

    async def exec(
        self,
        cmd: list[str],
        input: str | bytes | None = None,
        cwd: str | None = None,
        env: dict[str, str] | None = None,
        user: str | None = None,
        timeout: int | None = None,
        timeout_retry: bool = True,
        concurrency: bool = True,
        truncate: bool = True,
    ) -> ExecResult[str]:
        """Execute command inside the container.

        Args:
            cmd: Command and arguments to run.
            input: Optional data written to the command's stdin; ``str`` is
                encoded with the default codec, ``bytes`` is passed through.
            cwd: Working directory; defaults to the environment's working dir.
            env: Extra environment variables for the command.
            user: User to run the command as.
            timeout: Seconds to wait before killing the command.
            timeout_retry: Accepted for interface compatibility; unused here.
            concurrency: Accepted for interface compatibility; unused here.
            truncate: If true, cap stdout/stderr at
                ``SandboxEnvironmentLimits.MAX_EXEC_OUTPUT_SIZE`` characters.

        Returns:
            The command's exit status and decoded stdout/stderr.

        Raises:
            TimeoutError: If the command does not finish within ``timeout``.
        """
        podman_cmd = [
            "podman",
            "exec",
            "-i",
            "--workdir",
            cwd if cwd else self._working_dir,
        ]
        if user:
            podman_cmd.extend(["--user", user])
        for key, value in (env or {}).items():
            podman_cmd.extend(["--env", f"{key}={value}"])
        podman_cmd.append(self.container_id)
        podman_cmd.extend(cmd)

        proc = await asyncio.create_subprocess_exec(
            *podman_cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdin_data = input.encode() if isinstance(input, str) else input
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(input=stdin_data),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            # Reap the child so it does not linger as a zombie and its pipe
            # transports are closed.
            await proc.wait()
            raise TimeoutError(f"Command timed out after {timeout}s") from None

        stdout_decoded = stdout.decode("utf-8", errors="replace")
        stderr_decoded = stderr.decode("utf-8", errors="replace")

        # Truncate if too large
        if truncate:
            limit = SandboxEnvironmentLimits.MAX_EXEC_OUTPUT_SIZE
            if len(stdout_decoded) > limit:
                stdout_decoded = stdout_decoded[:limit] + "...[TRUNCATED]"
            if len(stderr_decoded) > limit:
                stderr_decoded = stderr_decoded[:limit] + "...[TRUNCATED]"

        return ExecResult(
            success=(proc.returncode == 0),
            returncode=proc.returncode or 0,
            stdout=stdout_decoded,
            stderr=stderr_decoded,
        )

    async def write_file(self, file: str, contents: str | bytes) -> None:
        """Write file to container.

        Parent directories are created first. The path is handed to the shell
        as a positional parameter rather than interpolated into the script, so
        quotes, ``$`` and backticks in ``file`` are treated literally.

        Args:
            file: Destination path inside the container.
            contents: Text or binary data to write.

        Raises:
            PermissionError: If the container reports a permission error.
            RuntimeError: If the write fails for any other reason.
        """
        # Ensure directory exists ("--" guards against paths starting with "-")
        dir_path = Path(file).parent.as_posix()
        if dir_path and dir_path != ".":
            await self.exec(["mkdir", "-p", "--", dir_path])

        if isinstance(contents, bytes):
            # Binary content travels base64-encoded over stdin (not the
            # command line) and is decoded inside the container.
            script = 'base64 -d > "$1"'
            payload = base64.b64encode(contents).decode("ascii")
        else:
            script = 'cat > "$1"'
            payload = contents

        # `sh -c SCRIPT NAME ARG` binds NAME to $0 and ARG to $1.
        result = await self.exec(["sh", "-c", script, "sh", file], input=payload)

        if not result.success:
            if "permission denied" in result.stderr.lower():
                raise PermissionError(f"Permission denied writing {file}")
            raise RuntimeError(f"Failed to write file {file}: {result.stderr}")

    @staticmethod
    def _raise_read_error(file: str, stderr: str) -> None:
        """Raise the exception matching a failed read's stderr output."""
        if "No such file" in stderr:
            raise FileNotFoundError(f"File not found: {file}")
        if "permission denied" in stderr.lower():
            raise PermissionError(f"Permission denied reading {file}")
        raise RuntimeError(f"Failed to read file {file}: {stderr}")

    @overload
    async def read_file(self, file: str, text: Literal[True] = True) -> str: ...

    @overload
    async def read_file(self, file: str, text: Literal[False]) -> bytes: ...

    async def read_file(self, file: str, text: bool = True) -> Union[str, bytes]:
        """Read file from container.

        Args:
            file: Path of the file inside the container.
            text: If true return the contents as ``str``; otherwise ``bytes``.

        Returns:
            The file contents.

        Raises:
            FileNotFoundError: If the file does not exist.
            PermissionError: If the container reports a permission error.
            RuntimeError: If the read fails otherwise, or the contents exceed
                ``SandboxEnvironmentLimits.MAX_READ_FILE_SIZE``.
        """
        if text:
            # Text mode: use cat directly
            command = ["cat", "--", file]
        else:
            # Binary mode: use base64 to transfer safely (-w0 disables line
            # wrapping). The path is passed as an argv entry, so no shell
            # quoting is involved.
            command = ["base64", "-w0", "--", file]

        result = await self.exec(command, truncate=False)
        if not result.success:
            self._raise_read_error(file, result.stderr)

        data: Union[str, bytes]
        if text:
            data = result.stdout
        else:
            data = base64.b64decode(result.stdout.strip())

        if len(data) > SandboxEnvironmentLimits.MAX_READ_FILE_SIZE:
            raise RuntimeError(f"File {file} exceeds size limit")
        return data

    async def connection(self, *, user: str | None = None) -> SandboxConnection:
        """Get connection info for debugging.

        Args:
            user: Optional user to open the shell as.

        Returns:
            Connection details whose command opens an interactive
            ``/bin/bash`` in the container via ``podman exec -it``.
        """
        cmd_parts = ["podman", "exec", "-it"]
        if user:
            cmd_parts.extend(["--user", user])
        cmd_parts.extend([self.container_id, "/bin/bash"])

        return SandboxConnection(
            type="podman",
            command=" ".join(cmd_parts),
            vscode_command=None,
            ports=None,
            container=self.container_id,
        )

    def default_polling_interval(self) -> float:
        """Return the default polling interval (0.2)."""
        return 0.2