File-Based ETL Agent Blueprint

AI agent that builds ETL pipelines from CSV and JSON files: infers schemas, validates data, transforms and cleans, validates output, and generates documentation. Self-contained, no external DB needed.

June 11, 2026
etl-agent, data-pipeline, csv, json, data-transformation, agent-blueprint

File-Based ETL Agent

An AI agent that builds end-to-end ETL pipelines from flat files. It infers schemas, validates input, cleans and transforms data, validates output quality, and generates pipeline documentation. Entirely file-based — reads CSVs/JSONs in, writes transformed files out. No database or orchestrator required.

Note:

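This agent is designed for data analysts and engineers who need repeatable transformations on file-based data. Point it at a directory of CSVs, describe the target schema, and it builds the pipeline. Data is read from and written to local files, with no cloud storage or database dependencies; the one external service is the OpenAI API, which receives the sampled rows and schema summaries that the tools return.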

Agent File Structure

file-etl-agent/
  agent.py
  tools.py
  config.json

Setup

1

Install Dependencies

Install the OpenAI client plus pandas for data operations.

pip install openai pandas
2

Create config.json

Set your API key, the model, and the input, output, and pipeline paths.

{
  "openai_api_key": "sk-...",
  "model": "gpt-4o",
  "max_iterations": 10,
  "input_directory": "./data/raw",
  "output_directory": "./data/processed",
  "pipeline_file": "./pipeline.py",
  "max_input_rows": 1000
}

Note:

The agent writes transformation code to pipeline_file. Point it at a new file or one you're comfortable overwriting.
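Because agent.py reads most of these keys with a fallback but requires openai_api_key, a loader that fails early on a bad config can save a wasted run. The following is a minimal sketch, not part of the blueprint: load_config is a hypothetical helper, the defaults mirror the fallbacks agent.py passes to config.get, and the fallback to the OPENAI_API_KEY environment variable is an assumption about how you might want to keep the key out of the file.

```python
import json
import os

# Defaults mirror the fallbacks used by config.get(...) in agent.py.
DEFAULTS = {
    "model": "gpt-4o",
    "max_iterations": 10,
    "input_directory": "./data/raw",
    "output_directory": "./data/processed",
    "pipeline_file": "./pipeline.py",
    "max_input_rows": 1000,
}


def load_config(path="config.json"):
    """Hypothetical helper: load config.json, fill defaults, fail early."""
    with open(path) as f:
        config = {**DEFAULTS, **json.load(f)}

    # Assumption: the key may come from the environment instead of the file.
    config.setdefault("openai_api_key", os.environ.get("OPENAI_API_KEY"))
    if not config["openai_api_key"]:
        raise ValueError("openai_api_key missing from config and OPENAI_API_KEY not set")

    for key in ("max_iterations", "max_input_rows"):
        if not isinstance(config[key], int) or config[key] <= 0:
            raise ValueError(f"{key} must be a positive integer, got {config[key]!r}")
    return config
```

Swapping this in for the plain json.load call in agent.py would be one way to use it; the rest of the agent only needs the returned dictionary.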

3

Verify

Run the agent on a sample data directory.

python agent.py --task "Clean and deduplicate sales data, output as CSV with consistent date format, remove rows with null revenue"

The agent should analyze input files, generate a pipeline script, run it, and validate the output.
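To try the command above without real data, a small messy file is enough. This sketch writes one using only the standard library. The column names follow the sales example in the walkthrough; the rows, the file name, and the make_sample_sales helper are made up for illustration.

```python
import csv
import os

# Illustrative rows only: mixed date formats, a null revenue, and a duplicate.
ROWS = [
    ["date", "product", "revenue", "region", "rep"],
    ["01/15/2024", "Widget", "120.50", "West", "alice"],
    ["2024-01-15", "Widget", "120.50", "West", "alice"],  # same sale, ISO date
    ["2024-01-16", "Gadget", "", "East", "bob"],          # null revenue
    ["01/17/2024", "Gadget", "89.00", "East", "bob"],
    ["01/17/2024", "Gadget", "89.00", "East", "bob"],     # exact duplicate
]


def make_sample_sales(directory="./data/raw", name="sales_sample.csv"):
    """Hypothetical helper: write a small messy CSV for a first agent run."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, name)
    with open(path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(ROWS)
    return path
```

Calling make_sample_sales() once before the first run gives the agent a file that exercises date normalization, null handling, and deduplication.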

System Prompt

You are a data engineer specializing in file-based ETL pipelines. Your job
is to build, run, and validate data transformation pipelines. Follow this protocol:

1. THOUGHT: What data is available? What does the user need?
2. ACTION: List input files, sample data, infer schemas
3. Design the pipeline: which files to read, what transformations to apply,
   how to validate output
4. Write the pipeline as a Python script using pandas
5. Run the pipeline — if it fails, analyze the error and fix the script
6. Validate output: check row counts, null percentages, schema consistency
7. FINAL_PIPELINE: The working pipeline script + validation report + documentation

Rules:
- Use pandas for all transformations — no Spark, no Dask, no external DBs
- The pipeline must be idempotent (running it twice produces the same output)
- Handle common data issues: nulls, duplicates, type mismatches, encoding errors
- Validate that the output matches the user's stated requirements
- Write clear comments in the pipeline script explaining each step
- If input data is > 1000 rows, sample it for analysis but process the full dataset
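The idempotency rule can be checked mechanically: run the pipeline twice and compare the output files byte for byte. Below is a minimal sketch, assuming the outputs live in one flat directory. dir_fingerprint and is_idempotent are hypothetical helpers, not tools the agent ships with.

```python
import hashlib
import os
import subprocess
import sys


def dir_fingerprint(directory):
    """Map each file name in `directory` to the SHA-256 of its contents."""
    digests = {}
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path, "rb") as f:
                digests[name] = hashlib.sha256(f.read()).hexdigest()
    return digests


def is_idempotent(pipeline_file, output_dir):
    """Run the pipeline twice; True if both runs leave identical outputs."""
    fingerprints = []
    for _ in range(2):
        subprocess.run([sys.executable, pipeline_file], check=True)
        fingerprints.append(dir_fingerprint(output_dir))
    return fingerprints[0] == fingerprints[1]
```

A pipeline that appends to its output, or writes a timestamp into it, fails this check.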

Tool Definitions

Agent Tools

list_input_files
List all CSV and JSON files in the input directory with file sizes and row counts.

Values: none

sample_data
Read the first N rows of a file plus schema info (column names, dtypes, null counts).

Values: path: string, n_rows?: int (default 50)

infer_schema
Infer the schema of a file: column names, inferred types, unique value counts, null percentages, sample values.

Values: path: string

write_pipeline
Write the ETL pipeline script to the configured pipeline file. Overwrites existing content.

Values: code: string

run_pipeline
Execute the pipeline script and capture stdout, stderr, and exit code.

Values: none

validate_output
Check output files: row count, null %, schema consistency, dedup check, and comparison against input stats.

Values: none

Tool Implementation

# tools.py
import os
import json
import subprocess
import pandas as pd

INPUT_DIR = None
OUTPUT_DIR = None
PIPELINE_FILE = None
MAX_INPUT_ROWS = 1000

def list_input_files():
    if not os.path.exists(INPUT_DIR):
        return f"ERROR: Input directory not found: {INPUT_DIR}"
    files = []
    for f in sorted(os.listdir(INPUT_DIR)):
        full = os.path.join(INPUT_DIR, f)
        if f.endswith((".csv", ".json", ".jsonl")):
            size_kb = os.path.getsize(full) / 1024
            try:
                # Read each file once. pandas accepts nrows for JSON only
                # when lines=True, so plain .json files are loaded whole.
                if f.endswith(".csv"):
                    df = pd.read_csv(full)
                else:
                    df = pd.read_json(full, lines=f.endswith(".jsonl"))
                cols = list(df.columns)
                files.append(f"{f} | {size_kb:.0f}KB | {len(df)} rows | columns: {', '.join(cols[:10])}")
            except Exception as e:
                files.append(f"{f} | {size_kb:.0f}KB | ERROR: {str(e)[:80]}")
    return "\n".join(files) if files else f"No CSV/JSON files in {INPUT_DIR}"

def sample_data(path, n_rows=50):
    full = os.path.join(INPUT_DIR, path) if not path.startswith("/") else path
    if not os.path.exists(full):
        return f"ERROR: File not found: {path}"
    try:
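        # Count rows cheaply (line count for CSV/JSONL; None for JSON arrays)
        total_rows = _count_rows(full)
        # Load at most MAX_INPUT_ROWS rows for inspection
        df = _read_file(full)
        sample = df.head(n_rows)
        # Compare against None so that an empty file still reports "0 rows"
        shape_note = f"{total_rows} rows" if total_rows is not None else f"{len(df)} rows (sampled)"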
        return f"Shape: ({shape_note}, {len(df.columns)} columns)\nColumns: {list(df.columns)}\nDtypes:\n{df.dtypes.to_string()}\n\nFirst {min(n_rows, len(df))} rows:\n{sample.to_string()}"
    except Exception as e:
        return f"ERROR reading {path}: {e}"

def infer_schema(path):
    full = os.path.join(INPUT_DIR, path) if not path.startswith("/") else path
    if not os.path.exists(full):
        return f"ERROR: File not found: {path}"
    try:
        df = _read_file(full)
        info = []
        for col in df.columns:
            null_count = df[col].isnull().sum()
            null_pct = (null_count / len(df)) * 100
            unique = df[col].nunique()
            dtype = str(df[col].dtype)
            sample_vals = df[col].dropna().head(3).tolist()
            info.append({
                "column": col,
                "dtype": dtype,
                "null_pct": round(null_pct, 1),
                "unique_values": unique,
                "sample": sample_vals
            })
        return json.dumps(info, indent=2, default=str)
    except Exception as e:
        return f"ERROR: {e}"

def write_pipeline(code):
    os.makedirs(os.path.dirname(PIPELINE_FILE) if os.path.dirname(PIPELINE_FILE) else ".", exist_ok=True)
    with open(PIPELINE_FILE, "w") as f:
        f.write(code)
    return f"Pipeline written to {PIPELINE_FILE} ({len(code)} bytes)"

def run_pipeline():
    try:
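        # Resolve to an absolute path first: cwd changes below, and a relative
        # script path would otherwise be looked up inside the new cwd.
        script = os.path.abspath(PIPELINE_FILE)
        result = subprocess.run(
            ["python", script],
            capture_output=True, text=True, timeout=120,
            cwd=os.path.dirname(script)
        )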
        output = result.stdout[-4000:] if len(result.stdout) > 4000 else result.stdout
        errors = result.stderr[-2000:] if len(result.stderr) > 2000 else result.stderr
        return f"Exit code: {result.returncode}\n\nSTDOUT:\n{output}\n\nSTDERR:\n{errors}"
    except subprocess.TimeoutExpired:
        return "Pipeline timed out after 120s. Check for infinite loops or very large datasets."

def validate_output():
    if not os.path.exists(OUTPUT_DIR):
        return "ERROR: Output directory does not exist. Pipeline may have failed to create it."
    files = [f for f in os.listdir(OUTPUT_DIR) if f.endswith((".csv", ".json", ".jsonl"))]
    if not files:
        return "WARNING: No output files found. Pipeline may have produced no data."

    report = []
    for f in sorted(files):
        full = os.path.join(OUTPUT_DIR, f)
        try:
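            # Read the whole file: _read_file caps at MAX_INPUT_ROWS, which
            # would understate row and duplicate counts for larger outputs.
            if f.endswith(".csv"):
                df = pd.read_csv(full)
            else:
                df = pd.read_json(full, lines=f.endswith(".jsonl"))
            n = max(len(df), 1)  # avoid dividing by zero on empty outputs
            nulls = {c: round(df[c].isnull().sum() / n * 100, 1) for c in df.columns}
            dups = df.duplicated().sum()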
            report.append({
                "file": f,
                "rows": len(df),
                "columns": list(df.columns),
                "null_percentages": nulls,
                "duplicate_rows": int(dups),
                "size_kb": round(os.path.getsize(full) / 1024, 1)
            })
        except Exception as e:
            report.append({"file": f, "error": str(e)})
    return json.dumps(report, indent=2, default=str)

def _read_file(path):
    if path.endswith(".csv"):
        return pd.read_csv(path, nrows=MAX_INPUT_ROWS)
    elif path.endswith(".jsonl"):
        return pd.read_json(path, lines=True, nrows=MAX_INPUT_ROWS)
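    else:
        # pandas accepts nrows only with lines=True, so load the JSON
        # document whole and truncate afterwards.
        return pd.read_json(path).head(MAX_INPUT_ROWS)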


def _count_rows(path):
    """Count rows in a file without loading all data."""
    if path.endswith(".csv"):
        with open(path, "r") as f:
            return sum(1 for _ in f) - 1  # subtract header
    elif path.endswith(".jsonl"):
        with open(path, "r") as f:
            return sum(1 for line in f if line.strip())
    else:
        # A JSON array cannot be counted without parsing the whole document
        return None
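
One caveat with counting physical lines, as _count_rows does for CSV: a quoted field that contains a newline spans several lines, so the count comes out too high. If that matters for your data, the standard csv module can count logical records without building a DataFrame. A minimal sketch follows; count_csv_records is a hypothetical alternative, not part of tools.py, and the UTF-8 default is an assumption.

```python
import csv


def count_csv_records(path, encoding="utf-8"):
    """Count data records in a CSV, treating quoted multi-line fields as one row."""
    # newline="" lets the csv module handle embedded newlines itself.
    with open(path, "r", newline="", encoding=encoding) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row, if the file has one
        # Blank lines come back as empty lists; leave them out of the count.
        return sum(1 for row in reader if row)
```

This is slower than a raw line count because every record is parsed, but it still streams the file instead of loading it.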

Agent Initialization

# agent.py
import os
import json
import argparse
from openai import OpenAI
import tools as agent_tools

TOOL_SCHEMAS = [
    {
        "type": "function",
        "function": {
            "name": "list_input_files",
            "description": "List CSV/JSON files in the input directory with sizes and row counts",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    },
    {
        "type": "function",
        "function": {
            "name": "sample_data",
            "description": "Read first N rows of a file with schema and dtype info",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "n_rows": {"type": "integer", "default": 50}
                },
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "infer_schema",
            "description": "Infer schema: column types, null %, unique counts, sample values",
            "parameters": {
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "write_pipeline",
            "description": "Write the ETL pipeline script to the configured pipeline file",
            "parameters": {
                "type": "object",
                "properties": {"code": {"type": "string"}},
                "required": ["code"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "run_pipeline",
            "description": "Execute the pipeline script and return stdout/stderr",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    },
    {
        "type": "function",
        "function": {
            "name": "validate_output",
            "description": "Check output files: row count, null %, duplicates, schema",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    }
]

SYSTEM_PROMPT = """You are a data engineer specializing in file-based ETL pipelines.
Your job is to build, run, and validate data transformation pipelines.

Protocol:
1. THOUGHT: What data is available? What does the user need?
2. ACTION: List input files, sample data, infer schemas
3. Design the pipeline: which files to read, what transformations,
   how to validate output
4. Write the pipeline as a Python script using pandas
5. Run the pipeline — fix errors until it works
6. Validate output against user requirements
7. FINAL_PIPELINE: working pipeline script + validation report + documentation

Rules:
- Use pandas for all transformations — no external DBs or orchestrators
- Pipeline must be idempotent
- Handle: nulls, duplicates, type mismatches, encoding errors
- Validate output matches user requirements
- Write clear comments in the pipeline
- Sample data for analysis, process full dataset for final run"""


def run_agent(task: str, config: dict):
    client = OpenAI(api_key=config["openai_api_key"])
    model = config.get("model", "gpt-4o")

    agent_tools.INPUT_DIR = config.get("input_directory", "./data/raw")
    agent_tools.OUTPUT_DIR = config.get("output_directory", "./data/processed")
    agent_tools.PIPELINE_FILE = config.get("pipeline_file", "./pipeline.py")
    agent_tools.MAX_INPUT_ROWS = config.get("max_input_rows", 1000)

    os.makedirs(agent_tools.OUTPUT_DIR, exist_ok=True)

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Build an ETL pipeline for this task: {task}"}
    ]

    for i in range(config.get("max_iterations", 10)):
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            temperature=0.1
        )

        msg = response.choices[0].message
        messages.append(msg)

        if msg.content and "FINAL_PIPELINE:" in msg.content:
            return msg.content.split("FINAL_PIPELINE:", 1)[1].strip()

        if not msg.tool_calls:
            messages.append({
                "role": "user",
                "content": "Continue. Use tools to analyze data, write the pipeline, run it, and validate. End with FINAL_PIPELINE."
            })
            continue

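        # Only tools declared in TOOL_SCHEMAS may be called by name.
        allowed = {schema["function"]["name"] for schema in TOOL_SCHEMAS}
        for tool_call in msg.tool_calls:
            func_name = tool_call.function.name
            try:
                if func_name not in allowed:
                    raise ValueError(f"unknown tool {func_name!r}")
                func_args = json.loads(tool_call.function.arguments or "{}")
                result = getattr(agent_tools, func_name)(**func_args)
            except Exception as e:
                # Return the failure to the model instead of crashing the loop
                result = f"ERROR calling {func_name}: {e}"
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })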

    return "Agent reached max iterations."


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True, help="ETL task description")
    parser.add_argument("--config", default="config.json")
    args = parser.parse_args()

    with open(args.config) as f:
        config = json.load(f)

    result = run_agent(args.task, config)
    print(result)
    print(f"\nPipeline saved to: {config.get('pipeline_file', './pipeline.py')}")
    print(f"Output files in: {config.get('output_directory', './data/processed')}")

Walkthrough

Cleaning and deduplicating a messy sales dataset.

1

Agent surveys input files

list_input_files() returns:

  • sales_2024.csv | 854KB | 12,430 rows | columns: date, product, revenue, region, rep
  • customers.json | 112KB | 2,840 rows | columns: id, name, segment, since

infer_schema("sales_2024.csv") reveals issues: 8% null revenue, mixed date formats (MM/DD/YYYY and YYYY-MM-DD), duplicate rows by product+date+region.
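
infer_schema reports dtypes and null percentages, but a column with mixed date formats simply shows up as a string column. Spotting the mix takes one more check: try each candidate format against the sampled values and count the matches. This is a minimal standard-library sketch; the helper name and the candidate list are assumptions, and it covers only the two formats named above.

```python
from collections import Counter
from datetime import datetime

# Assumed candidates: the two formats seen in the walkthrough data.
CANDIDATE_FORMATS = {"MM/DD/YYYY": "%m/%d/%Y", "YYYY-MM-DD": "%Y-%m-%d"}


def date_format_counts(values):
    """Count how many values match each candidate format (or none of them)."""
    counts = Counter()
    for value in values:
        for label, fmt in CANDIDATE_FORMATS.items():
            try:
                datetime.strptime(str(value).strip(), fmt)
            except ValueError:
                continue  # not this format, try the next one
            counts[label] += 1
            break
        else:
            # No candidate format matched this value.
            counts["unparsed"] += 1
    return counts


print(date_format_counts(["01/15/2024", "2024-01-16", "16 Jan 2024"]))
```

More than one non-zero bucket means the column needs explicit normalization before it can be sorted or deduplicated reliably.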

2

Agent writes the pipeline

write_pipeline(code) creates a script that:

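  • Parses dates with pd.to_datetime(..., errors='coerce'); on pandas 2.x the mixed formats also need format='mixed' or one parsing pass per format, otherwise values that do not match the inferred format are coerced to NaT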
  • Drops rows with null revenue
  • Deduplicates on ['date', 'product', 'region']
  • Joins with customers.json on customer_id
  • Outputs clean_sales.csv and summary_stats.json
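
The core of such a script might look like the sketch below. It is an illustration, not the agent's actual output: clean_sales is a hypothetical function, the customer join and the summary file are left out, and dates are parsed with one pass per format so the result does not depend on how a given pandas version infers mixed formats.

```python
import pandas as pd


def clean_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize dates, drop null revenue, and deduplicate (hypothetical sketch)."""
    out = df.copy()

    # Parse each known format separately; rows matching neither become NaT.
    us = pd.to_datetime(out["date"], format="%m/%d/%Y", errors="coerce")
    iso = pd.to_datetime(out["date"], format="%Y-%m-%d", errors="coerce")
    out["date"] = us.fillna(iso).dt.strftime("%Y-%m-%d")

    # Drop rows without revenue, then rows repeating the same sale.
    out = out.dropna(subset=["revenue"])
    out = out.drop_duplicates(subset=["date", "product", "region"])

    # A fixed sort order keeps the output identical from run to run.
    return out.sort_values(["date", "product", "region"]).reset_index(drop=True)
```

In a full script this would sit between pd.read_csv on the raw file and to_csv(index=False) on the output path. Normalizing dates before deduplicating matters: the same sale recorded once as 01/15/2024 and once as 2024-01-15 only collapses into one row after both are in the same format.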
3

Runs and fixes the pipeline

run_pipeline() fails on first attempt — pd.merge() raises a KeyError because the CSV column is rep but the script references customer_id. The agent reads the error, corrects the column name, rewrites the script, and runs again. Passes on second attempt.
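
A failure like this is cheap to catch before the merge runs. The sketch below checks that every join key exists on both sides and raises an error that names the missing column and lists the available ones, which gives the agent (or a human) more to work with than a bare KeyError. check_join_keys is a hypothetical helper; it takes plain column-name lists, so it can be called with list(df.columns).

```python
def check_join_keys(left_columns, right_columns, left_on, right_on):
    """Raise ValueError if a join key is missing from either side's columns."""
    problems = []
    for side, columns, keys in (
        ("left", left_columns, left_on),
        ("right", right_columns, right_on),
    ):
        missing = [k for k in keys if k not in columns]
        if missing:
            problems.append(f"{side} side is missing {missing}; available: {list(columns)}")
    if problems:
        raise ValueError("Cannot join: " + " | ".join(problems))


# Column names from the walkthrough: the sales file has no customer_id.
sales_cols = ["date", "product", "revenue", "region", "rep"]
customer_cols = ["id", "name", "segment", "since"]
try:
    check_join_keys(sales_cols, customer_cols, ["customer_id"], ["id"])
except ValueError as err:
    print(err)
```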

4

Validates output

validate_output() confirms:

  • clean_sales.csv: 11,238 rows (1,192 duplicates removed), 0% null revenue, consistent date format
  • summary_stats.json: revenue by region, top products, monthly trend
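
The row counts in a report like this should reconcile with the input: input rows minus every category of removed rows equals output rows. Here is a minimal sketch of that check, using the figures from this walkthrough. reconcile_rows is a hypothetical helper, not part of validate_output.

```python
def reconcile_rows(input_rows, output_rows, removed):
    """Return the number of rows not explained by the `removed` categories."""
    return input_rows - output_rows - sum(removed.values())


# Figures from the walkthrough: 12,430 rows in, 11,238 rows out.
unexplained = reconcile_rows(
    input_rows=12_430,
    output_rows=11_238,
    removed={"duplicates": 1_192, "null_revenue": 0},
)
print(f"Unexplained row difference: {unexplained}")  # prints 0 for these figures
```

A non-zero result means rows were dropped or added by a step the report does not account for, for example a join that fans out.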
5

Delivers FINAL_PIPELINE

The agent returns the working pipeline script plus a validation report:

FINAL_PIPELINE:
- Input: 12,430 raw rows from sales_2024.csv + 2,840 customers
- Output: 11,238 clean rows in clean_sales.csv
- Removed: 1,192 duplicates, 0 null-revenue rows
- Fixed: mixed date formats, column name mismatch (rep → customer_id)
- Pipeline is idempotent and documented with inline comments

Customization

Pipeline Settings

input_directory
Where the agent looks for CSV and JSON input files. Supports .csv, .json, and .jsonl formats.

Values: path, resolved relative to the working directory agent.py is run from

output_directory
Where transformed files are written. Created automatically if it doesn't exist.

Values: path, resolved relative to the working directory agent.py is run from

pipeline_file
Where the generated pipeline script is saved. Overwritten on each run.

Values: path to .py file

max_input_rows
Rows to sample during schema inference. The pipeline processes all rows, but analysis uses this limit to stay fast.

Values: 100-10000 (default 1000)

Note:

Reusable pipeline. The generated pipeline script is a standalone Python file. After the agent creates it, you can run it directly with python pipeline.py — no agent needed for subsequent runs. This is the key difference from other blueprints: the output is a reusable asset.

Key Takeaway

A file-based ETL agent is most valuable when you have messy datasets that need repeatable cleaning. The agent generates a documented, idempotent pipeline you can commit to your repo. For one-off data analysis, a direct pandas script is faster. Use the agent when the pipeline will run regularly or when non-technical team members need to understand what the transformations do.