DeepSeek Data Extraction: High-Volume JSON Pipelines

Leverage DeepSeek's JSON mode and aggressive pricing for massive data extraction. Cache-aware batch design, retry patterns for empty JSON output, and production extraction pipeline architecture.

June 11, 2026
DeepSeek, Data Extraction, JSON, Batch Processing, Structured Output

DeepSeek's combination of JSON output mode and aggressive pricing ($0.14 per million input tokens on Flash) makes it the most cost-effective model for high-volume structured extraction. At 10-50x cheaper than Claude or GPT, extraction pipelines that were cost-prohibitive become viable: processing a million 100-token documents costs about $14 in input tokens on Flash, versus about $250 on GPT-4o.

But DeepSeek's JSON mode has a known quirk: occasional empty output. Production extraction pipelines need retry logic, schema validation, and cache-aware batching. This page covers the patterns that make DeepSeek extraction reliable at scale.

JSON Output Mode

Enabling JSON Mode

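import json
from openai import OpenAI

# DeepSeek is called here through the OpenAI SDK with a custom base_url
client = OpenAI(base_url="https://api.deepseek.com", api_key="<key>")
invoice_text = "Invoice #INV-001 ... Total: $125.50"  # sample input

response = client.chat.completions.create(
    model="deepseek-v4-flash",  # Flash for cost efficiency
    messages=[
        {"role": "system", "content": "Extract data in JSON format. Output ONLY valid JSON."},
        {"role": "user", "content": f"Parse this invoice:\n\n{invoice_text}"}
    ],
    response_format={"type": "json_object"},
    max_tokens=2048  # Set appropriately to prevent truncation
)

data = json.loads(response.choices[0].message.content)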

Critical Requirements

  1. Must include "json" in the prompt — the system or user message must contain the word "json"
  2. Set adequate max_tokens — truncated JSON is invalid JSON
  3. Provide an example of desired format — guides the model's output structure
  4. Handle empty output — JSON mode occasionally returns empty content
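
The requirements above can be combined in a small helper. This is a minimal sketch rather than reference usage: `build_messages` and `is_empty_output` are hypothetical names, and the example invoice fields are illustrative only.

```python
import json
from typing import Optional

def build_messages(document: str, example: dict) -> list:
    """Build chat messages that mention "json" and embed a format example."""
    system = (
        "Extract data in JSON format. Output ONLY valid JSON.\n"
        "Example of the expected output:\n"
        + json.dumps(example, indent=2)
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": document},
    ]

def is_empty_output(content: Optional[str]) -> bool:
    """True when JSON mode returned nothing usable, so the caller should retry."""
    return content is None or not content.strip()

# Illustrative example object; real schemas will have more fields
example = {"invoice_number": "INV-001", "total": 125.5}
messages = build_messages("Invoice #INV-042 ... Total: 99.00", example)
```

The returned `messages` list can be passed directly as the `messages` argument of the call shown earlier, and `is_empty_output` can gate the retry.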

Production Extraction Pipeline

Architecture

import json
import asyncio
from openai import AsyncOpenAI, APIError

class DeepSeekExtractor:
    def __init__(self, model="deepseek-v4-flash", max_retries=3):
        self.client = AsyncOpenAI(
            base_url="https://api.deepseek.com",
            api_key="<key>"
        )
        self.model = model
        self.max_retries = max_retries

    async def extract(self, document: str, schema: dict) -> dict | None:
        """Extract structured data from a document with retry logic."""

        system_prompt = f"""Extract data from the document in JSON format.
Output ONLY valid JSON matching this schema:
{json.dumps(schema, indent=2)}

Rules:
- Return ONLY the JSON object, no markdown fences, no explanatory text
- If a field is missing, use null
- For dates, use ISO 8601 format (YYYY-MM-DD)
- For currency, use numbers without symbols"""

        for attempt in range(self.max_retries):
            try:
                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": document}
                    ],
                    response_format={"type": "json_object"},
                    max_tokens=4096
                )

                content = response.choices[0].message.content

                # Handle empty output (known DeepSeek JSON mode issue)
                if not content or not content.strip():
                    print(f"Empty output on attempt {attempt + 1}, retrying...")
                    continue

                return json.loads(content)

            except json.JSONDecodeError:
                print(f"Invalid JSON on attempt {attempt + 1}, retrying...")
                continue
            except APIError as e:
                print(f"API error on attempt {attempt + 1}: {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

        return None  # All retries exhausted
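
The retry loop treats empty and invalid output alike, but telling the failure modes apart can help with tuning. The sketch below is one possible way to classify a response before retrying. `parse_json_content` is a hypothetical helper, and it assumes the API reports `finish_reason == "length"` when `max_tokens` cuts the output short, as OpenAI-compatible chat completion APIs do.

```python
import json

def parse_json_content(content, finish_reason):
    """Return (data, error). error is None on success, else a short reason."""
    if content is None or not content.strip():
        return None, "empty"          # the known empty-output case: retry
    if finish_reason == "length":
        return None, "truncated"      # raise max_tokens instead of retrying as-is
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return None, "invalid_json"
    if not isinstance(data, dict):
        return None, "not_an_object"  # e.g. a bare list or string
    return data, None
```

Inside `extract`, this could be called with `response.choices[0].message.content` and `response.choices[0].finish_reason`. Logging the reason per attempt shows whether failures come from empty output or from an undersized `max_tokens`.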

Batch Processing with Cache Optimization

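async def batch_extract(documents: list[str], schema: dict, max_concurrency: int = 20):
    """Extract from many documents concurrently, sharing one cacheable system prompt."""

    extractor = DeepSeekExtractor()
    # Cap in-flight requests (20 is an arbitrary default; tune it). An unbounded
    # gather over thousands of documents opens every request at once.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def extract_one(doc: str):
        async with semaphore:
            return await extractor.extract(doc, schema)

    # Every call uses the same schema, so the system prompt (which includes
    # the schema) is an identical prefix that the context cache can reuse
    results = await asyncio.gather(*[extract_one(doc) for doc in documents])

    return results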

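# Process 10,000 invoices: about $1.40 in input tokens on Flash,
# assuming ~1,000 input tokens per invoice
schema = {
    "invoice_number": "string",
    "date": "string (ISO 8601)",
    "vendor": {"name": "string", "tax_id": "string|null"},
    "total": "number",
    "currency": "string (ISO 4217)"
}

async def main():
    documents = load_invoices(10_000)  # load_invoices: your own loader, not defined here
    return await batch_extract(documents, schema)

results = asyncio.run(main())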
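
When a job mixes document types, grouping by schema keeps each batch on a single system prompt. The following is a sketch of that grouping step only. `group_by_schema` is a hypothetical helper, and the schemas are illustrative.

```python
import json
from collections import defaultdict

def group_by_schema(jobs):
    """Group (document, schema) pairs so each group shares one system prompt."""
    groups = defaultdict(list)
    for document, schema in jobs:
        # sort_keys makes the key independent of dict insertion order
        key = json.dumps(schema, sort_keys=True)
        groups[key].append(document)
    # Return one canonical schema per group so the rendered prompt is identical
    return [(json.loads(key), docs) for key, docs in groups.items()]

invoice_schema = {"invoice_number": "string", "total": "number"}
receipt_schema = {"merchant": "string", "total": "number"}
jobs = [
    ("invoice A", invoice_schema),
    ("receipt B", receipt_schema),
    # Same schema as invoice_schema, written with a different key order
    ("invoice C", {"total": "number", "invoice_number": "string"}),
]
batches = group_by_schema(jobs)
```

Each `(schema, docs)` pair can then be passed to `batch_extract(docs, schema)`. Using the canonical schema matters because the extractor renders the schema with `json.dumps`, so two dicts with the same fields in a different order would otherwise produce different prompt prefixes.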

Schema Definition Patterns

Defensive Schema Design

# GOOD: Defensive schema with null handling
schema = {
    "invoice_number": "string or null — if not found, use null",
    "date": "string or null — ISO 8601 format if present",
    "vendor": {
        "name": "string — 'Unknown Vendor' if missing",
        "tax_id": "string or null"
    },
    "line_items": [{
        "description": "string",
        "quantity": "number",
        "unit_price": "number",
        "total": "number"
    }],
    "subtotal": "number or null",
    "tax": "number or null",
    "total": "number or null",
    "currency": "string or null — default null if not specified"
}

# BAD: Schema assumes all fields present
schema = {
    "invoice_number": "string",
    "date": "string",
    "total": "number"
}
# With no null option, the model may invent values for missing fields

Field Aliases

system_prompt = """Extract data in JSON format.

FIELD DEFINITIONS:
- invoice_number: The invoice ID. May be labeled as:
  'Invoice #', 'Inv No.', 'Reference', 'Document Number', 'Bill No.'
- total: The final amount. May be labeled as:
  'Total', 'Grand Total', 'Amount Due', 'Balance', 'Pay This Amount'

If multiple labels are found, use the most specific one.
"""

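Alias lists tend to grow as new vendors appear, so it can help to keep them in data and render the prompt section from it. A minimal sketch, assuming a hypothetical `FIELD_ALIASES` mapping and `build_field_definitions` helper:

```python
def build_field_definitions(aliases: dict) -> str:
    """Render a FIELD DEFINITIONS prompt section from an alias mapping."""
    lines = ["FIELD DEFINITIONS:"]
    for field, spec in aliases.items():
        labels = ", ".join(f"'{label}'" for label in spec["labels"])
        lines.append(f"- {field}: {spec['description']}. May be labeled as:")
        lines.append(f"  {labels}")
    return "\n".join(lines)

# Hypothetical alias table; extend it as new label variants are observed
FIELD_ALIASES = {
    "invoice_number": {
        "description": "The invoice ID",
        "labels": ["Invoice #", "Inv No.", "Reference"],
    },
    "total": {
        "description": "The final amount",
        "labels": ["Total", "Grand Total", "Amount Due"],
    },
}

system_prompt = (
    "Extract data in JSON format.\n\n" + build_field_definitions(FIELD_ALIASES)
)
```

Because dicts preserve insertion order, the rendered text is deterministic for a given mapping, which keeps the prompt prefix stable between runs.
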
Quality Assurance Pipeline

Validation Layer

def validate_extraction(data: dict, schema: dict) -> list[str]:
    """Validate extracted data against expected schema."""
    issues = []

    def is_number(value) -> bool:
        # bool is a subclass of int, so exclude it explicitly
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    # Required field check
    for field in schema:
        if field not in data:
            issues.append(f"Missing required field: {field}")

    # Type checking
    if data.get("total") is not None and not is_number(data["total"]):
        issues.append(f"total should be number, got {type(data['total']).__name__}")

    # Cross-field validation: compare only when all three are numbers,
    # so a legitimate tax of 0 is still checked
    amounts = [data.get(key) for key in ("subtotal", "tax", "total")]
    if all(is_number(amount) for amount in amounts):
        expected = data["subtotal"] + data["tax"]
        if abs(expected - data["total"]) > 0.01:
            issues.append(f"total ({data['total']}) != subtotal + tax ({expected})")

    return issues
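
The type check above is hard-coded to `total`. One way to generalize it is to read the expected type from the schema's own description strings. This sketch assumes the convention used in this page's schemas, where each description starts with `string` or `number` and mentions `null` when null is allowed. `check_types` is a hypothetical helper and only covers top-level scalar fields.

```python
def check_types(data: dict, schema: dict) -> list:
    """Check top-level scalar fields against 'string'/'number' descriptions."""
    expected_types = {"string": str, "number": (int, float)}
    issues = []
    for field, spec in schema.items():
        # Nested objects/lists and missing fields are left to other checks
        if not isinstance(spec, str) or field not in data:
            continue
        value = data[field]
        if value is None:
            if "null" not in spec:
                issues.append(f"{field} is null but schema does not allow null")
            continue
        words = spec.split()
        expected = expected_types.get(words[0]) if words else None
        if expected is None:
            continue  # unrecognized type word: skip rather than guess
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            issues.append(
                f"{field} should be {words[0]}, got {type(value).__name__}"
            )
    return issues

schema = {
    "invoice_number": "string or null",
    "total": "number",
    "currency": "string or null",
}
data = {"invoice_number": None, "total": "99.00", "currency": "USD"}
issues = check_types(data, schema)
```

Here `total` arrives as a string, a common failure when the source document prints amounts with formatting, so it is reported while the nullable `invoice_number` passes.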

Two-Pass Extraction for Critical Documents

async def critical_extraction(document: str, schema: dict) -> dict | None:
    """Two-pass extraction with reconciliation for high-value documents."""

    extractor = DeepSeekExtractor(model="deepseek-v4-pro")  # Pro for accuracy

    # Pass 1: Standard extraction
    result_1 = await extractor.extract(document, schema)

    # Pass 2: Same document with an alternative instruction prepended to the
    # user message (different phrasing reduces systematic errors)
    alt_prompt = f"Parse this document. Extract: {json.dumps(schema)}"
    result_2 = await extractor.extract(f"{alt_prompt}\n\n{document}", schema)

    # Compare and reconcile
    if result_1 and result_2:
        for key in schema:
            if result_1.get(key) != result_2.get(key):
                # Disagreement: flag for human review
                result_1[f"_{key}_DISPUTED"] = True
                result_1[f"_{key}_alt"] = result_2.get(key)

    # Fall back to the second pass if the first returned nothing
    return result_1 if result_1 is not None else result_2

Cost Analysis

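| Volume  | Documents | Avg Tokens/Doc | Flash Cost | GPT-4o Cost | Savings |
|---------|-----------|----------------|------------|-------------|---------|
| Small   | 1,000     | 1,000          | $0.14      | $2.50       | 94%     |
| Medium  | 10,000    | 500            | $0.70      | $12.50      | 94%     |
| Large   | 100,000   | 250            | $3.50      | $62.50      | 94%     |
| Massive | 1,000,000 | 100            | $14.00     | $250.00     | 94%     |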

With context caching (repeated schema + system prompt), effective costs drop further — cache-hit input tokens cost $0.0028/M instead of $0.14/M.
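
The table figures match input-token pricing alone (documents × tokens × price per million). The sketch below reproduces that arithmetic and adds a cached prefix. `estimate_input_cost` is a hypothetical helper, the two prices are the ones quoted on this page, and output tokens are deliberately left out.

```python
FLASH_INPUT_PER_M = 0.14        # USD per million cache-miss input tokens
FLASH_CACHE_HIT_PER_M = 0.0028  # USD per million cache-hit input tokens

def estimate_input_cost(num_docs, doc_tokens, prefix_tokens=0, cache_hit_rate=0.0):
    """Estimate input-token cost in USD; output tokens are not included.

    prefix_tokens is the shared system prompt length per request, and
    cache_hit_rate is the fraction of requests whose prefix is served from cache.
    """
    cached = num_docs * prefix_tokens * cache_hit_rate
    uncached = num_docs * (doc_tokens + prefix_tokens * (1 - cache_hit_rate))
    total = cached * FLASH_CACHE_HIT_PER_M + uncached * FLASH_INPUT_PER_M
    return total / 1_000_000

# "Medium" row of the table: 10,000 documents at 500 tokens each
medium = estimate_input_cost(10_000, 500)

# Same workload with an assumed 300-token system prompt, uncached vs fully cached
uncached_prefix = estimate_input_cost(10_000, 500, prefix_tokens=300)
cached_prefix = estimate_input_cost(10_000, 500, prefix_tokens=300, cache_hit_rate=1.0)
```

Only the prefix benefits from caching in this model, so the saving grows with the length of the system prompt relative to the documents.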

Note:

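Pro Move: For recurring extraction (daily invoices, weekly reports), use a fixed system prompt that never changes, so it becomes a stable cache prefix and only the document (user message) varies. After the first few requests, the prefix tokens are billed at the cache-hit rate. The share of input tokens served from cache depends on how long the prefix is relative to each document: it can exceed 90% when documents are much shorter than the prefix, and it is lower when long documents dominate the request.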

Note:

Empty output mitigation: JSON mode occasionally returns empty content. Always implement retry logic (2-3 attempts). If empty output persists, try rephrasing the prompt — add the word "json" in a different position or provide a more explicit format example. The issue is intermittent and retry typically resolves it.

  • Cost Optimization Patterns — Cache-aware batching is critical for high-volume extraction pipelines.
  • Context Caching — Design extraction prompts for maximum cache hits when processing repeated document types.