Skip to content

Rate Ingestion System - Implementation Status

Last Updated: December 20, 2025

Overview

The Rate Ingestion System extracts structured rate data from radio station rate cards (PDFs, images) using AI-powered OCR and semantic extraction.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Source Files   │────▶│  Mistral OCR     │────▶│  Claude         │
│  (PDF/PNG/JPG)  │     │  /v1/ocr         │     │  Extraction     │
└─────────────────┘     └──────────────────┘     └────────┬────────┘
┌─────────────────┐     ┌──────────────────┐             ▼
│  FastAPI        │◀────│  DuckDB          │◀────┌─────────────────┐
│  REST API       │     │  Database        │     │  Validation     │
└─────────────────┘     └──────────────────┘     └─────────────────┘

Current Status

Processing Progress

Metric Count Notes
Total Rates 4,946 376% increase from baseline
- From Airtable (baseline) 1,040 Original imported data
- From AI Extraction 3,906 Extracted this session
Total Documents 5,085 All rate card files
Documents Completed 383 7.5% processed
Documents Pending 4,520 Remaining to process
Documents Failed 122 OCR or extraction errors
Needs Review 59 Low confidence extractions
Total Stations 1,908 Radio stations in network

Extraction Performance

Metric Value
Avg Rates per Document 10.5
Image Success Rate ~95% (Pixtral)
PDF Success Rate ~65% (Mistral OCR)
Avg Confidence Score 0.75-0.95
Processing Speed ~50 docs / 15 min

Components

1. OCR Service (src/workers/ocr_service.py)

Mistral OCR (PDFs): - Endpoint: POST https://api.mistral.ai/v1/ocr - Model: mistral-ocr-latest - Returns: Markdown-formatted text with table preservation

Pixtral Vision (Images): - Endpoint: POST https://api.mistral.ai/v1/chat/completions - Model: pixtral-large-latest - Supports: PNG, JPG, JPEG

2. Rate Extractor (src/workers/rate_extractor.py)

  • Uses Claude API to extract structured JSON from OCR text
  • Outputs: Station, ad type, time slots, durations, gross/net rates
  • Validates and normalizes all extracted data

3. Document Processor (src/workers/document_processor.py)

Orchestrates the pipeline:

# Process pending documents (recommended batch size: 50)
python3 -u src/workers/document_processor.py --limit 50

# Process specific document
python3 src/workers/document_processor.py --document-id <uuid>

4. REST API (src/api/)

Station Endpoints

Method Endpoint Description
GET /api/v1/stations List stations with filters
GET /api/v1/stations/{callsign} Station details
GET /api/v1/stations/{callsign}/rates Station rates

Rate Card CRUD Endpoints

Rate cards group multiple rates for a station/year. All write operations require JWT authentication.

Method Endpoint Description
GET /api/v1/rate-cards List rate cards with filters
GET /api/v1/rate-cards/{id} Get rate card with embedded rates
POST /api/v1/rate-cards Create rate card with nested rates
PATCH /api/v1/rate-cards/{id} Partial update metadata
DELETE /api/v1/rate-cards/{id} Delete rate card (cascades to rates)

Bulk Rate Operations

Method Endpoint Description
POST /api/v1/rate-cards/{id}/rates Add multiple rates to a card
PATCH /api/v1/rate-cards/{id}/rates Bulk update rates by ID
DELETE /api/v1/rate-cards/{id}/rates Bulk delete rates by ID

Individual Rate CRUD Endpoints

Method Endpoint Description
GET /api/v1/rates Query rates with filters
GET /api/v1/rates/{id} Get single rate
POST /api/v1/rates Create single rate
PUT /api/v1/rates/{id} Replace rate entirely
PATCH /api/v1/rates/{id} Partial update rate
DELETE /api/v1/rates/{id} Delete rate

Specialized Adjustment Endpoints (AI Correction Workflow)

These endpoints allow targeted updates to specific rate fields without affecting others:

Method Endpoint Description
PATCH /api/v1/rates/{id}/pricing Adjust gross/net rates only
PATCH /api/v1/rates/{id}/time-slot Adjust time slot fields only
PATCH /api/v1/rates/{id}/classification Adjust ad_type, preemption only

Rate Analytics Endpoints

Method Endpoint Description
GET /api/v1/rates/summary Aggregate statistics
GET /api/v1/rates/by-state Rates grouped by state
GET /api/v1/rates/by-ad-type Rates grouped by ad type
GET /api/v1/rates/by-duration Rates grouped by duration
GET /api/v1/rates/years Available rate card years
GET /api/v1/rates/count Count rates with filters

Document Endpoints

Method Endpoint Description
GET /api/v1/documents List documents
GET /api/v1/documents/pending Processing queue

API Features

Auto-Calculation

When creating or updating rates, if only one of gross_rate or net_rate is provided: - net_rate = gross_rate * 0.85 (15% commission) - gross_rate = net_rate / 0.85

Station Denormalization

When creating rates, station_callsign and station_state are automatically populated from the station record for query efficiency.

Atomic Operations

  • POST /rate-cards with nested rates creates all records in a single transaction
  • Bulk create operations are atomic (all-or-nothing)
  • Bulk update/delete allows partial success with error reporting

API Usage Examples

Create Rate Card with Nested Rates

curl -X POST "https://ruralamfm.nominate.ai/api/v1/rate-cards" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "station_id": "abc-123-uuid",
    "year": 2025,
    "rate_type": "Political",
    "rates": [
      {
        "effective_date": "2025-01-01",
        "ad_type": "candidate",
        "duration_seconds": 60,
        "gross_rate": 45.00,
        "slot_days": "m-f",
        "slot_start_time": "06:00",
        "slot_end_time": "10:00"
      },
      {
        "effective_date": "2025-01-01",
        "ad_type": "issue",
        "duration_seconds": 30,
        "gross_rate": 25.00,
        "slot_days": "m-f"
      }
    ]
  }'

Correct Rate Pricing (AI Workflow)

# Fix gross rate - net_rate auto-calculated as 42.50
curl -X PATCH "https://ruralamfm.nominate.ai/api/v1/rates/rate-123/pricing" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"gross_rate": 50.00}'

Correct Time Slot

curl -X PATCH "https://ruralamfm.nominate.ai/api/v1/rates/rate-123/time-slot" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "slot_days": "m-sat",
    "slot_start_time": "05:00",
    "slot_end_time": "09:00"
  }'

Bulk Add Rates to Existing Card

curl -X POST "https://ruralamfm.nominate.ai/api/v1/rate-cards/card-123/rates" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "rates": [
      {"effective_date": "2025-01-01", "ad_type": "candidate", "duration_seconds": 15, "gross_rate": 15.00},
      {"effective_date": "2025-01-01", "ad_type": "candidate", "duration_seconds": 90, "gross_rate": 65.00}
    ]
  }'

Query Rates with Filters

# Get Texas candidate rates for 60-second spots
curl "https://ruralamfm.nominate.ai/api/v1/rates?state=TX&ad_type=candidate&duration=60"

# Get rates between $20-$50
curl "https://ruralamfm.nominate.ai/api/v1/rates?min_rate=20&max_rate=50&limit=50"

Data Model

Rate Structure

{
    "station_callsign": "WXXX-AM",
    "ad_type": "candidate|issue|all",
    "rate_context": "in_window|out_of_window|all",
    "preemption": "non_preemptible|preemptible",
    "slot_days": "m-f|m-sat|sat-sun|all",
    "slot_start_time": "06:00",
    "slot_end_time": "10:00",
    "duration_seconds": 60,
    "gross_rate": 25.00,
    "net_rate": 21.25
}

Document Status Flow

pending → processing → completed
                    → needs_review (confidence < 0.5)
                    → failed (OCR or extraction error)

Next Steps

Immediate Priority

  1. Complete Batch Processing
  2. 4,520 documents remaining (~90 batches of 50)
  3. Estimated time: ~22 hours of processing
  4. Run command: python3 -u src/workers/document_processor.py --limit 50

  5. Review Failed Documents

  6. 122 failed documents need investigation
  7. Common issues: corrupt PDFs, scanned images with poor quality
  8. Consider manual data entry for critical stations

  9. Review Low-Confidence Extractions

  10. 59 documents marked as needs_review
  11. Build simple review UI or export for manual verification

Short-Term Improvements

  1. Parallel Processing
  2. Current: Sequential processing (1 doc at a time)
  3. Goal: Process 3-5 documents concurrently
  4. Requires: Rate limiting for API calls

  5. Retry Logic for Failed Documents

  6. Add exponential backoff for API timeouts
  7. Retry with different OCR strategy (PDF → image conversion)

  8. Accuracy Validation

  9. Compare extracted rates vs Airtable ground truth
  10. Calculate precision/recall metrics
  11. Identify systematic extraction errors

Future Enhancements

  1. Email Ingestion: Process rate cards from email attachments
  2. Change Detection: Alert when rates differ from previous version
  3. LUR Calculation: Compute Lowest Unit Rate windows automatically
  4. Reporting: Generate rate comparison reports by market/state
  5. Deduplication: Detect and merge duplicate rate entries

Configuration

Environment variables (.env):

# Anthropic Claude
CLAUDE_API_KEY=sk-ant-...
CLAUDE_MODEL=claude-sonnet-4-5-20250929

# Mistral AI
MISTRAL_API_KEY=...
MISTRAL_OCR_MODEL=mistral-ocr-latest
MISTRAL_VISION_MODEL=pixtral-large-latest

API Documentation

Interactive docs available at: - Swagger UI: https://ruralamfm.nominate.ai/docs - ReDoc: https://ruralamfm.nominate.ai/redoc - OpenAPI JSON: https://ruralamfm.nominate.ai/openapi.json

File Structure

src/
├── api/
│   ├── main.py              # FastAPI app with full documentation
│   ├── database.py          # DuckDB connection
│   ├── auth.py              # JWT authentication
│   ├── models/
│   │   ├── station.py       # Station Pydantic models
│   │   ├── rate.py          # Rate + RateCard models (CRUD, adjustments, bulk ops)
│   │   ├── document.py      # Document models
│   │   └── user.py          # User models
│   └── routes/
│       ├── stations.py      # Station endpoints
│       ├── rate_cards.py    # Rate card CRUD + bulk operations
│       ├── rates.py         # Rate CRUD + adjustment endpoints
│       ├── documents.py     # Document endpoints
│       └── auth.py          # Auth endpoints
├── workers/
│   ├── ocr_service.py       # Mistral OCR/Pixtral Vision
│   ├── rate_extractor.py    # Claude extraction
│   ├── document_processor.py # Pipeline orchestrator
│   └── csv_seeder.py        # Airtable data import
scripts/
└── create_schema.py         # DuckDB schema
docs/
├── RATE-INGESTION.md        # Business requirements
└── RATE-INGESTION-STATUS.md # This file

Running the Pipeline

Prerequisites

# Activate virtual environment
source ~/.pyenv/versions/nominates/bin/activate

# Verify API keys are set
python3 -c "from scripts.config import config; print('Claude:', bool(config.get('CLAUDE_API_KEY'))); print('Mistral:', bool(config.get('MISTRAL_API_KEY')))"

Process Documents

# Process 50 documents (recommended batch size)
python3 -u src/workers/document_processor.py --limit 50

# Monitor progress
watch -n 60 "python3 -c \"import duckdb; c=duckdb.connect('db/cbradio.db'); print('Rates:', c.execute('SELECT COUNT(*) FROM rate').fetchone()[0]); print('Completed:', c.execute(\\\"SELECT COUNT(*) FROM document WHERE extraction_status='completed'\\\").fetchone()[0])\""

Check Status

python3 -c "
import duckdb
conn = duckdb.connect('db/cbradio.db', read_only=True)
print('Total Rates:', conn.execute('SELECT COUNT(*) FROM rate').fetchone()[0])
print('Completed Docs:', conn.execute(\"SELECT COUNT(*) FROM document WHERE extraction_status='completed'\").fetchone()[0])
print('Pending Docs:', conn.execute(\"SELECT COUNT(*) FROM document WHERE extraction_status='pending'\").fetchone()[0])
"