Rate Ingestion System - Implementation Status
Last Updated: December 20, 2025
Overview
The Rate Ingestion System extracts structured rate data from radio station rate cards (PDFs, images) using AI-powered OCR and semantic extraction.
Architecture
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Source Files   │────▶│   Mistral OCR    │────▶│     Claude      │
│  (PDF/PNG/JPG)  │     │     /v1/ocr      │     │   Extraction    │
└─────────────────┘     └──────────────────┘     └────────┬────────┘
                                                          │
┌─────────────────┐     ┌──────────────────┐              ▼
│    FastAPI      │◀────│     DuckDB       │◀────┌─────────────────┐
│    REST API     │     │    Database      │     │   Validation    │
└─────────────────┘     └──────────────────┘     └─────────────────┘
Current Status
Processing Progress
| Metric | Count | Notes |
|---|---|---|
| Total Rates | 4,946 | 376% increase from baseline |
| - From Airtable (baseline) | 1,040 | Original imported data |
| - From AI Extraction | 3,906 | Extracted this session |
| Total Documents | 5,085 | All rate card files |
| Documents Completed | 383 | 7.5% processed |
| Documents Pending | 4,520 | Remaining to process |
| Documents Failed | 122 | OCR or extraction errors |
| Needs Review | 59 | Low confidence extractions |
| Total Stations | 1,908 | Radio stations in network |
Extraction Performance
| Metric | Value |
|---|---|
| Avg Rates per Document | 10.5 |
| Image Success Rate | ~95% (Pixtral) |
| PDF Success Rate | ~65% (Mistral OCR) |
| Avg Confidence Score | 0.75-0.95 |
| Processing Speed | ~50 docs / 15 min |
Components
1. OCR Service (src/workers/ocr_service.py)
Mistral OCR (PDFs):
- Endpoint: POST https://api.mistral.ai/v1/ocr
- Model: mistral-ocr-latest
- Returns: Markdown-formatted text with table preservation
Pixtral Vision (Images):
- Endpoint: POST https://api.mistral.ai/v1/chat/completions
- Model: pixtral-large-latest
- Supports: PNG, JPG, JPEG
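The split between the two Mistral endpoints can be pictured as a small routing function. This is a minimal sketch, not the code in ocr_service.py: the names `OcrBackend` and `select_ocr_backend` are hypothetical, while the endpoints, model names, and supported extensions are the ones listed above.

```python
from pathlib import Path
from typing import NamedTuple


class OcrBackend(NamedTuple):
    """Hypothetical container for the endpoint/model pair used for one file."""
    endpoint: str
    model: str


# Endpoints and model names as listed in this document.
MISTRAL_OCR = OcrBackend("https://api.mistral.ai/v1/ocr", "mistral-ocr-latest")
PIXTRAL_VISION = OcrBackend(
    "https://api.mistral.ai/v1/chat/completions", "pixtral-large-latest"
)

IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg"}


def select_ocr_backend(path: str) -> OcrBackend:
    """Pick the OCR backend from the file extension (case-insensitive)."""
    suffix = Path(path).suffix.lower()
    if suffix == ".pdf":
        return MISTRAL_OCR
    if suffix in IMAGE_EXTENSIONS:
        return PIXTRAL_VISION
    raise ValueError(f"Unsupported rate card format: {path!r}")
```

Routing on the extension alone is an assumption of this sketch; a mislabeled file would need content sniffing instead.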
2. Rate Extractor (src/workers/rate_extractor.py)
- Uses Claude API to extract structured JSON from OCR text
- Outputs: Station, ad type, time slots, durations, gross/net rates
- Validates and normalizes all extracted data
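To illustrate what "validates and normalizes" might involve, here is a hedged sketch of a per-rate cleanup step. The function name `normalize_rate` and every rule in it (uppercasing callsigns, defaulting slot_days to "all", stripping "$" and "," from amounts) are assumptions for illustration, not the rules in rate_extractor.py. The allowed values come from the Rate Structure in the Data Model section.

```python
AD_TYPES = {"candidate", "issue", "all"}
SLOT_DAYS = {"m-f", "m-sat", "sat-sun", "all"}


def normalize_rate(raw: dict) -> dict:
    """Return a cleaned copy of one extracted rate, or raise ValueError."""
    ad_type = str(raw.get("ad_type", "")).strip().lower()
    if ad_type not in AD_TYPES:
        raise ValueError(f"unknown ad_type: {ad_type!r}")

    # Assumption: a missing slot_days means the rate applies to all days.
    slot_days = str(raw.get("slot_days", "all")).strip().lower()
    if slot_days not in SLOT_DAYS:
        raise ValueError(f"unknown slot_days: {slot_days!r}")

    duration = int(raw["duration_seconds"])
    if duration <= 0:
        raise ValueError("duration_seconds must be positive")

    # Assumption: amounts may arrive as strings such as "$1,250.00".
    gross = float(str(raw["gross_rate"]).replace("$", "").replace(",", ""))
    if gross < 0:
        raise ValueError("gross_rate must not be negative")

    return {
        "station_callsign": str(raw["station_callsign"]).strip().upper(),
        "ad_type": ad_type,
        "slot_days": slot_days,
        "duration_seconds": duration,
        "gross_rate": round(gross, 2),
    }
```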
3. Document Processor (src/workers/document_processor.py)
Orchestrates the pipeline:
# Process pending documents (recommended batch size: 50)
python3 -u src/workers/document_processor.py --limit 50
# Process specific document
python3 src/workers/document_processor.py --document-id <uuid>
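The batch behavior behind `--limit` can be sketched as a loop over pending documents. This is an illustrative outline only: `run_batch`, its injected `ocr` and `extract` callables, and the summary keys are hypothetical, and continuing past a failed document is an assumption rather than confirmed behavior of document_processor.py.

```python
from typing import Callable, Iterable


def run_batch(
    documents: Iterable[dict],
    ocr: Callable[[dict], str],
    extract: Callable[[str], list],
    limit: int = 50,
) -> dict:
    """Run OCR then extraction over at most `limit` documents, one at a time."""
    summary = {"processed": 0, "failed": 0, "rates": 0}
    for index, doc in enumerate(documents):
        if index >= limit:
            break
        try:
            text = ocr(doc)        # stage 1: OCR to text
            rates = extract(text)  # stage 2: structured rate extraction
        except Exception:
            # Assumption: a stage error fails this document, the batch goes on.
            summary["failed"] += 1
            continue
        summary["processed"] += 1
        summary["rates"] += len(rates)
    return summary
```

Passing the stages in as callables keeps the loop testable without network access to the OCR or extraction APIs.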
4. REST API (src/api/)
Station Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/stations | List stations with filters |
| GET | /api/v1/stations/{callsign} | Station details |
| GET | /api/v1/stations/{callsign}/rates | Station rates |
Rate Card CRUD Endpoints
Rate cards group multiple rates for a station/year. All write operations require JWT authentication.
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/rate-cards | List rate cards with filters |
| GET | /api/v1/rate-cards/{id} | Get rate card with embedded rates |
| POST | /api/v1/rate-cards | Create rate card with nested rates |
| PATCH | /api/v1/rate-cards/{id} | Partial update metadata |
| DELETE | /api/v1/rate-cards/{id} | Delete rate card (cascades to rates) |
Bulk Rate Operations
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/rate-cards/{id}/rates | Add multiple rates to a card |
| PATCH | /api/v1/rate-cards/{id}/rates | Bulk update rates by ID |
| DELETE | /api/v1/rate-cards/{id}/rates | Bulk delete rates by ID |
Individual Rate CRUD Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/rates | Query rates with filters |
| GET | /api/v1/rates/{id} | Get single rate |
| POST | /api/v1/rates | Create single rate |
| PUT | /api/v1/rates/{id} | Replace rate entirely |
| PATCH | /api/v1/rates/{id} | Partial update rate |
| DELETE | /api/v1/rates/{id} | Delete rate |
Specialized Adjustment Endpoints (AI Correction Workflow)
These endpoints allow targeted updates to specific rate fields without affecting others:
| Method | Endpoint | Description |
|---|---|---|
| PATCH | /api/v1/rates/{id}/pricing | Adjust gross/net rates only |
| PATCH | /api/v1/rates/{id}/time-slot | Adjust time slot fields only |
| PATCH | /api/v1/rates/{id}/classification | Adjust ad_type, preemption only |
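One way to picture "targeted updates without affecting others" is a per-endpoint whitelist of fields. The sketch below is illustrative: `apply_adjustment` and `ADJUSTABLE_FIELDS` are hypothetical names, the time-slot field list is inferred from the time-slot example in this document, and rejecting (rather than silently ignoring) out-of-scope fields is an assumption.

```python
# Fields each adjustment endpoint is allowed to touch (assumed grouping).
ADJUSTABLE_FIELDS = {
    "pricing": {"gross_rate", "net_rate"},
    "time-slot": {"slot_days", "slot_start_time", "slot_end_time"},
    "classification": {"ad_type", "preemption"},
}


def apply_adjustment(rate: dict, kind: str, changes: dict) -> dict:
    """Return a copy of `rate` with only the fields allowed for `kind` updated."""
    allowed = ADJUSTABLE_FIELDS[kind]
    rejected = set(changes) - allowed
    if rejected:
        raise ValueError(f"{kind} adjustment cannot modify: {sorted(rejected)}")
    # Merge into a new dict so the caller's original record is left untouched.
    return {**rate, **changes}
```

For example, a time-slot correction changes `slot_days` while `gross_rate` and `ad_type` keep their stored values.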
Rate Analytics Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/rates/summary | Aggregate statistics |
| GET | /api/v1/rates/by-state | Rates grouped by state |
| GET | /api/v1/rates/by-ad-type | Rates grouped by ad type |
| GET | /api/v1/rates/by-duration | Rates grouped by duration |
| GET | /api/v1/rates/years | Available rate card years |
| GET | /api/v1/rates/count | Count rates with filters |
Document Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/documents | List documents |
| GET | /api/v1/documents/pending | Processing queue |
API Features
Auto-Calculation
When creating or updating rates, if only one of gross_rate or net_rate is provided, the other is derived assuming a 15% commission:
- net_rate = gross_rate * 0.85
- gross_rate = net_rate / 0.85
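The two formulas above can be sketched as a single helper. This is a minimal illustration, not the API's implementation: the name `fill_rates`, the use of `Decimal`, rounding to cents with half-up rounding, and leaving both values untouched when both are supplied are all assumptions.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Tuple

NET_FACTOR = Decimal("0.85")  # net = gross minus the 15% commission
CENT = Decimal("0.01")


def fill_rates(
    gross: Optional[str] = None, net: Optional[str] = None
) -> Tuple[Decimal, Decimal]:
    """Derive the missing rate; amounts are passed as strings to avoid float error."""
    if gross is None and net is None:
        raise ValueError("provide gross_rate, net_rate, or both")
    if gross is not None and net is not None:
        return Decimal(gross), Decimal(net)  # both supplied: nothing to derive
    if gross is not None:
        g = Decimal(gross)
        return g, (g * NET_FACTOR).quantize(CENT, rounding=ROUND_HALF_UP)
    n = Decimal(net)
    return (n / NET_FACTOR).quantize(CENT, rounding=ROUND_HALF_UP), n
```

With a gross rate of 50.00 this yields a net rate of 42.50, and a net rate of 21.25 yields a gross rate of 25.00, matching the figures used elsewhere in this document.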
Station Denormalization
When creating rates, station_callsign and station_state are automatically populated from the station record for query efficiency.
Atomic Operations
- POST /rate-cards with nested rates creates all records in a single transaction
- Bulk create operations are atomic (all-or-nothing)
- Bulk update/delete allows partial success with error reporting
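The difference between all-or-nothing creates and partial-success updates can be sketched as follows. To keep the example runnable with the standard library alone, it uses `sqlite3` as a stand-in for DuckDB. The two-column `rate` table and the function names are hypothetical, and the real API's error format is not shown in this document.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """In-memory stand-in for the rate table (reduced to two columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE rate (id TEXT PRIMARY KEY, gross_rate REAL NOT NULL)")
    return conn


def bulk_create(conn: sqlite3.Connection, rates: list) -> None:
    """All-or-nothing insert: any invalid row rolls back the whole batch."""
    with conn:  # commits on success, rolls back if an exception escapes
        for rate in rates:
            conn.execute(
                "INSERT INTO rate (id, gross_rate) VALUES (?, ?)",
                (rate["id"], rate["gross_rate"]),
            )


def bulk_update(conn: sqlite3.Connection, updates: list) -> dict:
    """Partial success: each update runs in its own transaction; errors are collected."""
    result = {"updated": [], "errors": []}
    for item in updates:
        try:
            with conn:
                cur = conn.execute(
                    "UPDATE rate SET gross_rate = ? WHERE id = ?",
                    (item["gross_rate"], item["id"]),
                )
                if cur.rowcount == 0:
                    raise LookupError(f"rate {item['id']} not found")
            result["updated"].append(item["id"])
        except (sqlite3.Error, LookupError) as exc:
            result["errors"].append({"id": item["id"], "error": str(exc)})
    return result
```

The design choice is where the transaction boundary sits: around the whole batch for creates, around each item for updates and deletes.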
API Usage Examples
Create Rate Card with Nested Rates
curl -X POST "https://ruralamfm.nominate.ai/api/v1/rate-cards" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"station_id": "abc-123-uuid",
"year": 2025,
"rate_type": "Political",
"rates": [
{
"effective_date": "2025-01-01",
"ad_type": "candidate",
"duration_seconds": 60,
"gross_rate": 45.00,
"slot_days": "m-f",
"slot_start_time": "06:00",
"slot_end_time": "10:00"
},
{
"effective_date": "2025-01-01",
"ad_type": "issue",
"duration_seconds": 30,
"gross_rate": 25.00,
"slot_days": "m-f"
}
]
}'
Correct Rate Pricing (AI Workflow)
# Fix gross rate - net_rate auto-calculated as 42.50
curl -X PATCH "https://ruralamfm.nominate.ai/api/v1/rates/rate-123/pricing" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"gross_rate": 50.00}'
Correct Time Slot
curl -X PATCH "https://ruralamfm.nominate.ai/api/v1/rates/rate-123/time-slot" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"slot_days": "m-sat",
"slot_start_time": "05:00",
"slot_end_time": "09:00"
}'
Bulk Add Rates to Existing Card
curl -X POST "https://ruralamfm.nominate.ai/api/v1/rate-cards/card-123/rates" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"rates": [
{"effective_date": "2025-01-01", "ad_type": "candidate", "duration_seconds": 15, "gross_rate": 15.00},
{"effective_date": "2025-01-01", "ad_type": "candidate", "duration_seconds": 90, "gross_rate": 65.00}
]
}'
Query Rates with Filters
# Get Texas candidate rates for 60-second spots
curl "https://ruralamfm.nominate.ai/api/v1/rates?state=TX&ad_type=candidate&duration=60"
# Get rates between $20-$50
curl "https://ruralamfm.nominate.ai/api/v1/rates?min_rate=20&max_rate=50&limit=50"
Data Model
Rate Structure
{
"station_callsign": "WXXX-AM",
"ad_type": "candidate|issue|all",
"rate_context": "in_window|out_of_window|all",
"preemption": "non_preemptible|preemptible",
"slot_days": "m-f|m-sat|sat-sun|all",
"slot_start_time": "06:00",
"slot_end_time": "10:00",
"duration_seconds": 60,
"gross_rate": 25.00,
"net_rate": 21.25
}
Document Status Flow
pending → processing → completed
                     → needs_review (confidence < 0.5)
                     → failed (OCR or extraction error)
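The branching at the end of the flow above can be sketched as a small decision function. `next_status` is a hypothetical name, and treating a missing confidence score as a failure is an assumption. The 0.5 threshold and the status names come from the flow itself.

```python
from typing import Optional

REVIEW_THRESHOLD = 0.5  # from the flow above: confidence < 0.5 needs review


def next_status(confidence: Optional[float], error: Optional[str] = None) -> str:
    """Map the outcome of processing one document to its final status."""
    if error is not None or confidence is None:
        return "failed"
    if confidence < REVIEW_THRESHOLD:
        return "needs_review"
    return "completed"
```

Note that a score of exactly 0.5 counts as completed here, since the flow specifies a strict "less than" comparison.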
Next Steps
Immediate Priority
- Complete Batch Processing
  - 4,520 documents remaining (~90 batches of 50)
  - Estimated time: ~22 hours of processing
  - Run command: python3 -u src/workers/document_processor.py --limit 50
- Review Failed Documents
  - 122 failed documents need investigation
  - Common issues: corrupt PDFs, scanned images with poor quality
  - Consider manual data entry for critical stations
- Review Low-Confidence Extractions
  - 59 documents marked as needs_review
  - Build simple review UI or export for manual verification
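The batch-processing estimate above follows from the pending count and the measured throughput. The arithmetic is spelled out below. It assumes strictly sequential batches at the observed rate and ignores retries and failures.

```python
import math

PENDING_DOCS = 4520     # "Documents Pending" in the status table
BATCH_SIZE = 50         # recommended --limit
MINUTES_PER_BATCH = 15  # from "~50 docs / 15 min"

# The last batch is partial, so round the batch count up.
batches = math.ceil(PENDING_DOCS / BATCH_SIZE)
hours = batches * MINUTES_PER_BATCH / 60

print(f"{batches} batches, about {hours:.1f} hours")
```

Rounding up gives 91 batches and 22.75 hours, in line with the rounded figures of ~90 batches and ~22 hours.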
Short-Term Improvements
- Parallel Processing
  - Current: Sequential processing (1 doc at a time)
  - Goal: Process 3-5 documents concurrently
  - Requires: Rate limiting for API calls
- Retry Logic for Failed Documents
  - Add exponential backoff for API timeouts
  - Retry with different OCR strategy (PDF → image conversion)
- Accuracy Validation
  - Compare extracted rates vs Airtable ground truth
  - Calculate precision/recall metrics
  - Identify systematic extraction errors
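The first two improvements above could be combined roughly as follows. This is a sketch under stated assumptions, not a planned design: `with_backoff` and `process_concurrently` are hypothetical names, timeouts are assumed to surface as `TimeoutError`, and a thread pool only caps the number of in-flight documents. It is not a true rate limiter, and it does not address how concurrent workers would share the DuckDB connection.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def with_backoff(
    task: Callable[[], object],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Call `task`, retrying on TimeoutError with delays of 1s, 2s, 4s, ..."""
    for attempt in range(retries + 1):
        try:
            return task()
        except TimeoutError:
            if attempt == retries:
                raise  # out of retries: surface the error to the caller
            sleep(base_delay * 2 ** attempt)


def process_concurrently(
    doc_ids: Iterable[str],
    process: Callable[[str], object],
    max_workers: int = 3,
) -> List[object]:
    """Process documents with at most `max_workers` in flight; keeps input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda d: with_backoff(lambda: process(d)), doc_ids))
```

The `sleep` parameter is injectable so the backoff schedule can be checked without actually waiting.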
Future Enhancements
- Email Ingestion: Process rate cards from email attachments
- Change Detection: Alert when rates differ from previous version
- LUR Calculation: Compute Lowest Unit Rate windows automatically
- Reporting: Generate rate comparison reports by market/state
- Deduplication: Detect and merge duplicate rate entries
Configuration
Environment variables (.env):
# Anthropic Claude
CLAUDE_API_KEY=sk-ant-...
CLAUDE_MODEL=claude-sonnet-4-5-20250929
# Mistral AI
MISTRAL_API_KEY=...
MISTRAL_OCR_MODEL=mistral-ocr-latest
MISTRAL_VISION_MODEL=pixtral-large-latest
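A fail-fast check of these variables might look like the sketch below. It is not the project's `scripts.config` module: `load_settings` is a hypothetical helper, it assumes the variables are already exported into the process environment (it does not parse .env itself), and treating the model names above as defaults is an assumption.

```python
import os
from typing import Mapping

REQUIRED_KEYS = ("CLAUDE_API_KEY", "MISTRAL_API_KEY")
DEFAULTS = {
    "CLAUDE_MODEL": "claude-sonnet-4-5-20250929",
    "MISTRAL_OCR_MODEL": "mistral-ocr-latest",
    "MISTRAL_VISION_MODEL": "pixtral-large-latest",
}


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect pipeline settings, failing fast if an API key is missing."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    settings = {key: env[key] for key in REQUIRED_KEYS}
    for key, default in DEFAULTS.items():
        settings[key] = env.get(key, default)
    return settings
```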
API Documentation
Interactive docs available at:
- Swagger UI: https://ruralamfm.nominate.ai/docs
- ReDoc: https://ruralamfm.nominate.ai/redoc
- OpenAPI JSON: https://ruralamfm.nominate.ai/openapi.json
File Structure
src/
├── api/
│   ├── main.py                 # FastAPI app with full documentation
│   ├── database.py             # DuckDB connection
│   ├── auth.py                 # JWT authentication
│   ├── models/
│   │   ├── station.py          # Station Pydantic models
│   │   ├── rate.py             # Rate + RateCard models (CRUD, adjustments, bulk ops)
│   │   ├── document.py         # Document models
│   │   └── user.py             # User models
│   └── routes/
│       ├── stations.py         # Station endpoints
│       ├── rate_cards.py       # Rate card CRUD + bulk operations
│       ├── rates.py            # Rate CRUD + adjustment endpoints
│       ├── documents.py        # Document endpoints
│       └── auth.py             # Auth endpoints
└── workers/
    ├── ocr_service.py          # Mistral OCR/Pixtral Vision
    ├── rate_extractor.py       # Claude extraction
    ├── document_processor.py   # Pipeline orchestrator
    └── csv_seeder.py           # Airtable data import
scripts/
└── create_schema.py            # DuckDB schema
docs/
├── RATE-INGESTION.md           # Business requirements
└── RATE-INGESTION-STATUS.md    # This file
Running the Pipeline
Prerequisites
# Activate virtual environment
source ~/.pyenv/versions/nominates/bin/activate
# Verify API keys are set
python3 -c "from scripts.config import config; print('Claude:', bool(config.get('CLAUDE_API_KEY'))); print('Mistral:', bool(config.get('MISTRAL_API_KEY')))"
Process Documents
# Process 50 documents (recommended batch size)
python3 -u src/workers/document_processor.py --limit 50
# Monitor progress
watch -n 60 "python3 -c \"import duckdb; c=duckdb.connect('db/cbradio.db'); print('Rates:', c.execute('SELECT COUNT(*) FROM rate').fetchone()[0]); print('Completed:', c.execute(\\\"SELECT COUNT(*) FROM document WHERE extraction_status='completed'\\\").fetchone()[0])\""
Check Status
python3 -c "
import duckdb
conn = duckdb.connect('db/cbradio.db', read_only=True)
print('Total Rates:', conn.execute('SELECT COUNT(*) FROM rate').fetchone()[0])
print('Completed Docs:', conn.execute(\"SELECT COUNT(*) FROM document WHERE extraction_status='completed'\").fetchone()[0])
print('Pending Docs:', conn.execute(\"SELECT COUNT(*) FROM document WHERE extraction_status='pending'\").fetchone()[0])
"