Advanced

Metadata

Flow supports two types of metadata with distinct purposes: metadata and flow_metadata.

metadata (Inspect AI Metadata)

The metadata field in FlowOptions and FlowTask is passed directly to Inspect AI and stored in evaluation logs. Use this for tracking experiment information that should be accessible in Inspect AI’s log viewer and analysis tools.

Example:

config.py
from inspect_flow import FlowOptions, FlowSpec, FlowTask

FlowSpec(
    log_dir="logs",
    options=FlowOptions(
        metadata={
            "experiment": "baseline_v1",
            "hypothesis": "Higher temperature improves creative tasks",
            "hardware": "A100-80GB",
        }
    ),
    tasks=[
        FlowTask(
            name="inspect_evals/gpqa_diamond",
            model="openai/gpt-4o",
            metadata={
                "task_variant": "chemistry_subset",
                "note": "Testing with reduced context",
            },
        )
    ],
)

The metadata from FlowOptions is applied globally to all tasks in the evaluation run, while task-level metadata is specific to each task. These metadata dictionaries are merged in Inspect AI, with task-level metadata keys overriding the global options.
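The merge behaves like a standard dictionary update. A minimal stand-alone sketch (plain Python, not Flow's internal code) of how the global and task-level dictionaries from the example above combine:

```python
# Sketch of the merge semantics: task-level keys override global keys
global_metadata = {
    "experiment": "baseline_v1",
    "hypothesis": "Higher temperature improves creative tasks",
    "hardware": "A100-80GB",
}
task_metadata = {
    "task_variant": "chemistry_subset",
    "note": "Testing with reduced context",
}

# Later entries win, so a key present in both dictionaries
# would take the task-level value
merged = {**global_metadata, **task_metadata}
```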

flow_metadata (Flow-Only Metadata)

The flow_metadata field is available on FlowSpec, FlowTask, FlowModel, FlowScorer, FlowSolver, and FlowAgent. This metadata is not passed to Inspect AI—it exists only in the Flow configuration and is useful for configuration-time logic and organization.

Use cases:

  • Filtering or selecting configurations based on properties
  • Organizing complex configuration generation logic
  • Documenting configuration decisions
  • Annotating configs without polluting Inspect AI logs

Example: Configuration-time filtering

config.py
from inspect_flow import FlowModel, FlowSpec, tasks_matrix

# Define models with metadata about capabilities
models = [
    FlowModel(name="openai/gpt-4o", flow_metadata={"context_window": 128000}),
    FlowModel(name="openai/gpt-4o-mini", flow_metadata={"context_window": 128000}),
    FlowModel(
        name="anthropic/claude-3-5-sonnet", flow_metadata={"context_window": 200000}
    ),
]

# Filter to only long-context models
long_context_models = [
    m for m in models if (m.flow_metadata or {}).get("context_window", 0) >= 128000
]

FlowSpec(
    log_dir="logs",
    tasks=tasks_matrix(
        task="long_context_task",
        model=long_context_models,
    ),
)

Parameterized Jobs

Flow Args (--arg)

Pass custom variables to Python config files using --arg or the INSPECT_FLOW_ARG environment variable. Use this for dynamic configuration that isn’t available via --set. To access the args, the last statement in the config file should be a function that returns a FlowSpec; the function will be called with any provided args:

flow run config.py --arg task_min_priority=2
config.py
from inspect_flow import FlowSpec, FlowTask

all_tasks = [
    FlowTask(name="task_easy", flow_metadata={"priority": 1}),
    FlowTask(name="task_medium", flow_metadata={"priority": 2}),
    FlowTask(name="task_hard", flow_metadata={"priority": 3}),
]


def spec(task_min_priority: int = 1) -> FlowSpec:
    return FlowSpec(
        log_dir="logs",
        tasks=[
            t
            for t in all_tasks
            if (t.flow_metadata or {}).get("priority", 0) >= task_min_priority
        ],
    )

Template Substitution

Use {field_name} syntax to reference other FlowSpec configuration values. Substitutions are applied after the config is loaded:

FlowSpec(
    log_dir="logs/my_eval",
    options=FlowOptions(bundle_dir="{log_dir}/bundle"),
    # Result: bundle_dir="logs/my_eval/bundle"
)

For nested fields, use bracket notation: {options[eval_set_id]} or {flow_metadata[key]}. Substitutions are resolved recursively until no more remain.
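These resolution rules can be illustrated with a short stand-alone sketch (this is not Flow's implementation, just the substitution semantics described above):

```python
import re


def substitute(value: str, fields: dict) -> str:
    """Resolve {field} and {field[key]} references until none remain."""
    pattern = re.compile(r"\{([^{}]+)\}")
    while (m := pattern.search(value)) is not None:
        # Bracket notation ("options[eval_set_id]") becomes a nested lookup
        parts = [p for p in re.split(r"[\[\]]", m.group(1)) if p]
        resolved = fields
        for part in parts:
            resolved = resolved[part]
        value = value[: m.start()] + str(resolved) + value[m.end() :]
    return value


fields = {"log_dir": "logs/my_eval", "options": {"eval_set_id": "run-1"}}
substitute("{log_dir}/bundle", fields)        # "logs/my_eval/bundle"
substitute("{options[eval_set_id]}", fields)  # "run-1"
```

Because the loop re-scans the string after each replacement, a substituted value that itself contains a {field} reference is resolved on the next pass, matching the recursive behavior described above.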

Viewer Bundling

Viewer bundling works the same way as eval_set() in Inspect AI and is configurable via FlowOptions. In addition, Flow can print a shareable bundle URL for users running evaluations.

Convert local bundle paths to public URLs for sharing evaluation results. The bundle_url_mappings option in FlowOptions applies string replacements to bundle_dir to generate a shareable URL, which is printed to stdout after the evaluation completes.

config.py
from inspect_flow import FlowOptions, FlowSpec, FlowTask

FlowSpec(
    log_dir="logs/my_eval",
    options=FlowOptions(
        bundle_dir="s3://my-bucket/bundles/my_eval",
        bundle_url_mappings={"s3://my-bucket": "https://my-bucket.s3.amazonaws.com"},
    ),
    tasks=[FlowTask(name="task", model="openai/gpt-4o")],
)

After running, this prints: Bundle URL: https://my-bucket.s3.amazonaws.com/bundles/my_eval

Use this when storing bundles on cloud storage like S3 or on servers with public HTTP access. Multiple mappings are applied in order.
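As a sketch of the ordered-replacement behavior (assumed semantics, not Flow's internal code):

```python
def bundle_url(bundle_dir: str, mappings: dict[str, str]) -> str:
    """Apply each mapping in order as a plain string replacement."""
    url = bundle_dir
    for old, new in mappings.items():  # dicts preserve insertion order
        url = url.replace(old, new)
    return url


bundle_url(
    "s3://my-bucket/bundles/my_eval",
    {"s3://my-bucket": "https://my-bucket.s3.amazonaws.com"},
)
# "https://my-bucket.s3.amazonaws.com/bundles/my_eval"
```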

Bundle URL mappings pair well with the spec inheritance feature: configure the same mapping once and it applies to every config in a repository.

Config Scripts

When loading a configuration file, Flow expects the last expression to either be a FlowSpec or a function that returns a FlowSpec. Other than this requirement, the configuration file may execute arbitrary code.

after_load

Configuration scripts are executed while the spec is loading. At that point the spec is in an intermediate state: includes may not have been processed, overrides may not have been applied, and template substitutions will not have run. To run code after the spec is fully loaded, a script can decorate a function with @after_load.

The decorated function may optionally accept the following arguments:

  • spec - the fully loaded FlowSpec
  • files - the list of configuration files that were loaded
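
Both arguments are optional, so a hook can accept either or both. A minimal sketch (the function name and print output are illustrative, not part of the Flow API):

```python
from inspect_flow import FlowSpec, after_load


@after_load
def summarize(spec: FlowSpec, files: list[str]) -> None:
    # Runs once the spec is fully loaded; purely illustrative
    print(f"Loaded {len(spec.tasks or [])} task(s) from {len(files)} config file(s)")
```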

One example of functionality that can be implemented with this feature is validation code that enforces constraints. Rather than repeating the validation in every Flow configuration file, it can live in a _flow.py file that is automatically included.

Prevent Runs with Uncommitted Changes

Place a _flow.py file at your repository root to validate that all configs are in clean git repositories. This validation runs automatically for all configs in subdirectories.

_flow.py
import subprocess
from pathlib import Path

from inspect_flow import after_load


def check_repo(path: str) -> None:
    abs_path = Path(path).resolve()
    check_dir = abs_path if abs_path.is_dir() else abs_path.parent

    result = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=check_dir,
        capture_output=True,
        text=True,
        check=True,
    )

    if result.stdout.strip():
        raise RuntimeError(f"The repository at {check_dir} has uncommitted changes.")


@after_load
def validate_no_dirty_repo(files: list[str]) -> None:
    # Check no config files are in a dirty git repo
    for path in files:
        check_repo(path)
config.py
# Automatically inherits _flow.py
from inspect_flow import FlowSpec, FlowTask

FlowSpec(
    log_dir="logs",
    tasks=[FlowTask(name="inspect_evals/gpqa_diamond", model="openai/gpt-4o")],
)
# Will fail if uncommitted changes exist in the repository

Lock Config Fields

A _flow.py file can prevent configs from overriding critical settings:

_flow.py
from inspect_flow import FlowOptions, FlowSpec, after_load

MAX_SAMPLES = 16


@after_load
def validate_max_samples(spec: FlowSpec) -> None:
    if not spec.options or spec.options.max_samples != MAX_SAMPLES:
        raise ValueError("Do not override max_samples!")


FlowSpec(
    options=FlowOptions(max_samples=MAX_SAMPLES),
)
config.py
# Automatically inherits _flow.py
from inspect_flow import FlowOptions, FlowSpec, FlowTask

FlowSpec(
    log_dir="logs",
    options=FlowOptions(max_samples=32),  # Will raise ValueError!
    tasks=[FlowTask(name="inspect_evals/gpqa_diamond", model="openai/gpt-4o")],
)

This pattern is useful for enforcing organizational standards (resource limits, safety constraints, etc.) across all evaluation configs in a repository.