Importing Transcripts

Overview

You can populate a transcript database in a variety of ways depending on where your transcript data lives and how it is managed:

  1. Inspect Logs: Read transcript data from Inspect eval log files.

  2. LangSmith and Logfire: Read transcript data from LLM observability platforms.

  3. Transcript API: Python API for creating and inserting transcripts.

  4. Arrow Import: Efficient direct insertion using RecordBatchReader.

  5. Parquet Data Lake: Use an existing data lake not created using Inspect Scout.

We’ll cover each of these in turn below. Before proceeding, though, be sure to familiarize yourself with the Database Schema and make a plan for how you want to map your data into it.

Inspect Logs

While Scout can read Inspect logs directly, you might prefer to keep them in a transcript database. You can easily import Inspect logs as follows:

from inspect_scout import transcripts_db, transcripts_from

async with transcripts_db("s3://my-transcript-db/") as db:
    await db.insert(transcripts_from("./logs"))

You could also insert a filtered list of transcripts:

from inspect_scout import columns as c

async with transcripts_db("s3://my-transcript-db/") as db:
    transcripts = (
        transcripts_from("./logs")
        .where(c.task_set == "cybench")
        .where(c.model.like("openai/%"))
    )
    await db.insert(transcripts)

LangSmith

LangSmith is LangChain’s platform for tracing, evaluating, and monitoring LLM applications. Scout can import transcripts from LangSmith traces, supporting:

  • LangChain agents and chains (via langchain tracing)
  • OpenAI API calls (via wrap_openai)
  • Anthropic API calls (via wrap_anthropic)

Use the langsmith() transcript source to import traces from a LangSmith project. You can filter by tags, time range, or using LangSmith’s filter syntax:

from inspect_scout import transcripts_db
from inspect_scout.sources import langsmith

async with transcripts_db("s3://my-transcript-db/") as db:
    await db.insert(langsmith(
        project="my-langsmith-project",
        tags=["production"],
    ))
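
For example, here is a sketch that combines a time range with LangSmith’s filter syntax. Note that the start_time and filter parameter names below are assumptions; check the langsmith() source reference for the exact signature:

from datetime import datetime, timedelta

from inspect_scout import transcripts_db
from inspect_scout.sources import langsmith

async with transcripts_db("s3://my-transcript-db/") as db:
    await db.insert(langsmith(
        project="my-langsmith-project",
        # assumed parameter name for restricting the time range
        start_time=datetime.now() - timedelta(days=7),
        # assumed parameter name; the value uses LangSmith's filter syntax
        filter='has(tags, "production")',
    ))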

Note: Authentication

Set the LANGSMITH_API_KEY environment variable to authenticate with LangSmith. You can create an API key from LangSmith Settings.

Logfire

Logfire is Pydantic’s observability platform for Python applications. Scout can import transcripts from Logfire traces created by any of the supported instrumentors:

  • Pydantic AI (logfire.instrument_pydantic_ai())
  • OpenAI (logfire.instrument_openai())
  • Anthropic (logfire.instrument_anthropic())
  • Google GenAI (logfire.instrument_google_genai())

Use the logfire() transcript source to import traces. You can filter by time range, specific trace ID, or using SQL WHERE fragments:

from inspect_scout import transcripts_db
from inspect_scout.sources import logfire

async with transcripts_db("s3://my-transcript-db/") as db:
    await db.insert(logfire(
        filter="tags @> ARRAY['production']",
    ))
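
A time range can also be expressed as a SQL WHERE fragment. The sketch below assumes the underlying Logfire records expose a start_timestamp column; adjust the column name to match your Logfire schema:

from inspect_scout import transcripts_db
from inspect_scout.sources import logfire

async with transcripts_db("s3://my-transcript-db/") as db:
    await db.insert(logfire(
        # restrict the import to the last 7 days
        # (start_timestamp is an assumed column name)
        filter="start_timestamp >= now() - interval '7 days'",
    ))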

Note: Authentication

Set the LOGFIRE_READ_TOKEN environment variable to authenticate with Logfire. You can create a read token from Logfire Settings > Read Tokens.

Transcript API

You can import transcripts from any source so long as you can create Transcript objects to be imported. In this example, imagine we have a read_json_transcripts() function which can read transcripts from an external JSON transcript format:

from inspect_scout import transcripts_db

from .readers import read_json_transcripts

# create/open database
async with transcripts_db("s3://my-transcripts") as db:

    # read transcripts to insert
    transcripts = read_json_transcripts()

    # insert into database
    await db.insert(transcripts)

Once you’ve created a database and populated it with transcripts, you can read from it using transcripts_from():

from inspect_scout import scan, transcripts_from

scan(
    scanners=[...],
    transcripts=transcripts_from("s3://my-transcripts")
)

Streaming

Each call to db.insert() creates at least one Parquet file, splitting transcripts across multiple files as required (typically 75-100MB each). This produces a storage layout optimized for fast queries and content reading. Consequently, when importing a large number of transcripts you should write a single generator that yields them rather than making many calls to db.insert() (which is likely to result in more Parquet files than is ideal).

For example, we might implement read_json_transcripts() like this:

from pathlib import Path
from typing import AsyncIterator
from inspect_scout import Transcript

async def read_json_transcripts(dir: Path) -> AsyncIterator[Transcript]:
    json_files = list(dir.rglob("*.json"))
    for json_file in json_files:
        yield await json_to_transcript(json_file)

async def json_to_transcript(json_file: Path) -> Transcript:
    # convert json_file to Transcript
    return Transcript(...)

We can then pass the async iterator returned by this function directly to db.insert():

async with transcripts_db("s3://my-transcripts") as db:
    await db.insert(read_json_transcripts(Path("path/to/json/files")))

Note that transcript insertion is idempotent—once a transcript with a given ID has been inserted it will not be inserted again. This means that you can safely resume imports that are interrupted, and only new transcripts will be added.
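
For example, if a long-running import is interrupted you can simply run it again, and only the transcripts that were not yet inserted will be added:

from pathlib import Path

async with transcripts_db("s3://my-transcripts") as db:
    # safe to re-run after an interruption: transcripts whose IDs are
    # already in the database are skipped, so only new ones are added
    await db.insert(read_json_transcripts(Path("path/to/json/files")))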

Transcripts

Here is how we might implement json_to_transcript():

import json
from pathlib import Path
from typing import Any

from inspect_ai.model import (
    ChatMessage, messages_from_openai, model_output_from_openai
)
from inspect_scout import Transcript

async def json_to_transcript(json_file: Path) -> Transcript:
    with open(json_file, "r") as f:
        json_data: dict[str, Any] = json.load(f)
    return Transcript(
        transcript_id=json_data["trace_id"],
        source_type="abracadabra",
        source_id=json_data["project_id"],
        metadata=json_data["attributes"],
        messages=await json_to_messages(
            input=json_data["inputs"]["messages"],
            output=json_data["output"]
        )
    )

# convert raw model input and output to inspect messages
async def json_to_messages(
    input: list[dict[str, Any]], output: dict[str, Any]
) -> list[ChatMessage]:
    # start with input messages
    messages = await messages_from_openai(input)

    # extract and append assistant message from output
    model_output = await model_output_from_openai(output)
    messages.append(model_output.message)

    # return full message history for transcript
    return messages

Note that we use the messages_from_openai() and model_output_from_openai() functions from inspect_ai to convert the raw model payloads in the trace data to the correct types for the transcript database.

The most important fields to populate are transcript_id and messages. The source_* fields are also useful for providing additional context. The metadata field, while not required, is a convenient way to provide additional transcript attributes that may be useful for filtering or analysis. The events field is not required and is useful primarily for more complex multi-agent transcripts.
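
For instance, a minimal transcript might populate only the core fields plus a few metadata attributes for later filtering. The attribute names below are purely illustrative, and messages is assumed to be a list of ChatMessage values built as shown above:

from inspect_scout import Transcript

transcript = Transcript(
    # required: unique ID and message history
    transcript_id="trace-001",
    messages=messages,
    # recommended: where the transcript came from
    source_type="myapp",
    source_id="project-1",
    # optional: attributes useful for filtering or analysis
    metadata={"environment": "production", "customer_tier": "enterprise"},
)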

Arrow Import

In some cases you may already have Arrow-accessible data (e.g. from Parquet files or a database that can yield Arrow record batches) that you want to insert directly into a transcript database. So long as your data conforms to the schema, you can do this by passing a PyArrow RecordBatchReader to db.insert().

For example, to read from existing Parquet files using the PyArrow dataset API:

import pyarrow.dataset as ds
from inspect_scout import transcripts_db

# read from existing parquet files
dataset = ds.dataset("path/to/parquet/files", format="parquet")
reader = dataset.scanner().to_reader()

async with transcripts_db("s3://my-transcripts") as db:
    await db.insert(reader)

You can also use DuckDB to query and transform data before import:

import duckdb
from inspect_scout import transcripts_db

conn = duckdb.connect("my_database.db")
reader = conn.execute("""
    SELECT
        trace_id as transcript_id,
        messages,
        'myapp' as source_type,
        project_id as source_id
    FROM traces
""").fetch_record_batch()

async with transcripts_db("s3://my-transcripts") as db:
    await db.insert(reader)

Parquet Data Lake

If you have transcripts already stored in Parquet format you don’t need to use db.insert() at all. So long as your Parquet files conform to the transcript database schema then you can read them directly using transcripts_from(). For example:

from inspect_scout import transcripts_from

# read from an existing parquet data lake
transcripts = transcripts_from(
    "s3://my-transcripts-data-lake/cyber"
)

Note that several fields in the database schema are marked as JSON; these should be encoded as serialized JSON strings within your Parquet files.
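
For example, here is a sketch of writing such a file with PyArrow, assuming metadata and messages are among the JSON-typed columns (consult the Database Schema for the full column list; the row content, including the message structure, is purely illustrative and must match your schema's expected encoding):

import json

import pyarrow as pa
import pyarrow.parquet as pq

# illustrative rows; column names follow the transcript database schema
rows = [
    {
        "transcript_id": "trace-001",
        "source_type": "myapp",
        "source_id": "project-1",
        # JSON-typed columns are stored as serialized JSON strings
        "metadata": json.dumps({"environment": "production"}),
        "messages": json.dumps([
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there!"},
        ]),
    },
]

pq.write_table(
    pa.Table.from_pylist(rows),
    "my-transcripts-data-lake/cyber/part-000.parquet",
)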

You can validate that your custom data lake conforms to the schema using the scout db validate command. For example:

scout db validate s3://my-transcripts-data-lake/cyber

Transcript Index

An additional step you will likely want to take for a custom data lake is building a transcript index. While not required, an index can make querying across large sets of transcripts dramatically faster (10x or better) and improves the handling of large databases in Scout View.

To build an index, use the scout db index command. For example:

scout db index s3://my-transcripts-data-lake/cyber

You should run this command whenever you add or remove transcripts from your data lake (transcripts will not be visible to clients until the index is updated). While building an index is not required, it is highly recommended if you want optimal query performance.