# Lazarus PipelineV2 Migration Plan
## Overview
Migrate the cbintel lazarus worker from the slow Python `cdx_toolkit` implementation to the faster PipelineV2 pipeline (bash/C).
## Problem Statement
Current lazarus jobs time out because:

1. **gau (URL discovery)** - queries the Wayback Machine API, takes 60-300s
2. **cdx_toolkit queries** - per-URL queries to archive APIs, 30-60s each
3. **Sequential processing** - no parallelization

**Result:** 0% success rate in stress tests (all jobs time out at 60s)
## Solution: PipelineV2
Located at: `extern/lazarus/pipeline-v2/`
### Components
```
pipeline-v2/
├── process.sh      # Main orchestrator (bash)
├── retriever.py    # Bulk CDX/WARC fetcher
├── warc_parser     # C binary - fast WARC extraction
├── soup.py         # HTML to markdown converter
├── linkclean.sh    # Link extraction
└── data/           # Output sessions
```
### Adapter Already Exists
```python
# extern/lazarus/src/lazarus/pipeline_v2_adapter.py
class PipelineV2Adapter:
    def run_pipeline(self, url_query, max_urls, timeout) -> Path: ...
    def iter_records(self, session_dir) -> Iterator[PipelineV2Record]: ...
    def retrieve_and_process(self, url_query, max_urls, timeout) -> Iterator[PipelineV2Record]: ...
```
### Output Format (PipelineV2Record)
```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List

@dataclass
class PipelineV2Record:
    record_id: str
    url: str
    timestamp: str
    markdown_content: str  # Extracted text as markdown
    links: List[str]       # Outbound links
    headers: Dict          # WARC headers
    raw_file: Path         # Original content
    content_file: Path     # Processed content
```
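Together, the adapter and record support a simple streaming loop. A minimal sketch, assuming `extern/lazarus/src` is on `sys.path` so the module imports as `lazarus.pipeline_v2_adapter` (the summary printing is illustrative):

```python
from lazarus.pipeline_v2_adapter import PipelineV2Adapter

# Stream up to 10 archived pages for a domain and summarize each record.
adapter = PipelineV2Adapter("extern/lazarus/pipeline-v2")
for record in adapter.retrieve_and_process("example.com/*", max_urls=10, timeout=300):
    print(f"{record.timestamp} {record.url}: "
          f"{len(record.markdown_content)} chars, {len(record.links)} links")
```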
## Implementation Plan
### Phase 1: Copy Adapter to cbintel
```bash
# Copy the adapter
cp extern/lazarus/src/lazarus/pipeline_v2_adapter.py \
   src/cbintel/lazarus/pipeline_v2.py
```
Update imports and paths for the cbintel package structure; a sketch of the adapted module header follows.
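What the adapted header might look like — a sketch only; the `CBINTEL_PIPELINE_V2_DIR` override is a hypothetical convenience, not an existing setting:

```python
# src/cbintel/lazarus/pipeline_v2.py
import os
from pathlib import Path

# Default to the vendored pipeline under extern/, resolved to an absolute
# path; the env var override is hypothetical, for non-standard checkouts.
DEFAULT_PIPELINE_DIR = Path(
    os.environ.get("CBINTEL_PIPELINE_V2_DIR", "extern/lazarus/pipeline-v2")
).resolve()
```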
### Phase 2: Update LazarusWorker
File: `src/cbintel/jobs/workers/lazarus.py`
```python
# Add imports
import asyncio
from pathlib import Path

from cbintel.lazarus.pipeline_v2 import PipelineV2Adapter, PipelineV2Record


class LazarusWorker(BaseWorker):
    async def process(self, job_data, progress) -> JobResult:
        request = job_data["request"]
        use_v2 = request.get("use_pipeline_v2", True)  # Default to V2

        if use_v2:
            return await self._process_v2(job_data, progress)
        else:
            return await self._process_legacy(job_data, progress)

    async def _process_v2(self, job_data, progress) -> JobResult:
        """Fast pipeline using PipelineV2."""
        domain = job_data["request"]["domain"]
        max_urls = job_data["request"].get("sample_size", 100)
        timeout = job_data["request"].get("timeout", 300)

        # Initialize adapter (pipeline-v2 lives under extern/, five levels up)
        pipeline_dir = (
            Path(__file__).parent.parent.parent.parent.parent
            / "extern/lazarus/pipeline-v2"
        )
        adapter = PipelineV2Adapter(str(pipeline_dir))

        await progress.update("retrieving", message=f"Running PipelineV2 for {domain}...")

        # Run in a thread pool (the subprocess call is blocking)
        loop = asyncio.get_running_loop()
        session_dir = await loop.run_in_executor(
            None,
            lambda: adapter.run_pipeline(f"{domain}/*", max_urls, timeout),
        )

        # Process records
        records = []
        for record in adapter.iter_records(session_dir):
            records.append({
                "url": record.url,
                "timestamp": record.timestamp,
                "content_preview": record.markdown_content[:500],
                "links_count": len(record.links),
            })

        # Save and upload results
        # ... (similar to current implementation)
```
### Phase 3: Update Request Model
File: `src/cbintel/jobs/models.py` (or wherever `LazarusRequest` is defined)
```python
class LazarusRequest(BaseModel):
    domain: str
    providers: list[str] = ["wayback"]
    sample_size: int = 100
    use_pipeline_v2: bool = True  # NEW: default to fast pipeline
    timeout: int = 300            # NEW: configurable timeout
```
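Existing payloads keep working because both new fields default. A quick check (assumes Pydantic v2, where `model_validate` replaces v1's `parse_obj`):

```python
req = LazarusRequest.model_validate({"domain": "example.com", "sample_size": 10})
assert req.use_pipeline_v2 is True  # V2 is the default path
assert req.timeout == 300           # seconds
```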
### Phase 4: Ensure Dependencies
The PipelineV2 requires:

- `warc_parser` binary (already compiled at `extern/lazarus/pipeline-v2/warc_parser`)
- `lynx` for link extraction
- `html2markdown` for conversion

A fail-fast dependency check is sketched after this list.
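Something along these lines could run before dispatching a job, instead of failing mid-pipeline (a sketch; the function name is illustrative):

```python
import shutil
from pathlib import Path

def check_pipeline_v2_deps(pipeline_dir: Path) -> list[str]:
    """Return the names of missing PipelineV2 dependencies (empty if all present)."""
    missing = []
    # Compiled C binary shipped alongside the pipeline scripts
    if not (pipeline_dir / "warc_parser").is_file():
        missing.append("warc_parser")
    # External CLI tools resolved via PATH
    for tool in ("lynx", "html2markdown"):
        if shutil.which(tool) is None:
            missing.append(tool)
    return missing
```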
### Phase 5: Test
```bash
# Direct test
cd extern/lazarus/pipeline-v2
./process.sh "example.com/*" 10

# Via cbintel
curl -X POST "https://intel.nominate.ai/api/v1/jobs/lazarus" \
  -H "Content-Type: application/json" \
  -d '{"domain": "example.com", "sample_size": 10, "use_pipeline_v2": true}'
```
## Output Mapping
Map PipelineV2 output to the current lazarus output format; a per-record mapping sketch follows the table:
| Current Output   | PipelineV2 Source         |
|------------------|---------------------------|
| `discovery.json` | Session stats             |
| `snapshots.json` | `iter_records()` data     |
| `url`            | `record.url`              |
| `timestamp`      | `record.timestamp`        |
| `content`        | `record.markdown_content` |
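The per-record part of the mapping is mechanical. A sketch (the helper name is illustrative; the output keys follow the table above):

```python
def record_to_snapshot(record: PipelineV2Record) -> dict:
    """Map one PipelineV2Record onto the current snapshots.json row format."""
    return {
        "url": record.url,
        "timestamp": record.timestamp,
        "content": record.markdown_content,
    }

snapshots = [record_to_snapshot(r) for r in adapter.iter_records(session_dir)]
```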
Risks & Mitigations¶
- **Path dependencies** - PipelineV2 uses relative paths
  - Mitigation: Use absolute paths in the adapter (see the sketch after this list)
- **Missing binaries** - `warc_parser` might not be compiled
  - Mitigation: Add a build step or include a pre-compiled binary
- **Output format differences** - V2 outputs markdown, not HTML
  - Mitigation: This is actually better for downstream processing
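The path mitigation fits naturally in the adapter's constructor. A sketch, assuming an `__init__` along these lines (the existing adapter's constructor may differ):

```python
from pathlib import Path

class PipelineV2Adapter:
    def __init__(self, pipeline_dir: str):
        # Resolve to an absolute path so process.sh behaves the same
        # regardless of the caller's working directory.
        self.pipeline_dir = Path(pipeline_dir).resolve()
        if not (self.pipeline_dir / "process.sh").is_file():
            raise FileNotFoundError(f"PipelineV2 not found at {self.pipeline_dir}")
```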
## Success Criteria
- Lazarus jobs complete in <60s for small domains
- >80% success rate in stress tests
- Output contains URL, timestamp, and content
## Files to Modify
- `src/cbintel/lazarus/pipeline_v2.py` (NEW - copy from extern)
- `src/cbintel/lazarus/__init__.py` (add export)
- `src/cbintel/jobs/workers/lazarus.py` (add V2 processing)
- `src/cbintel/jobs/models.py` (add request fields)
## Session Notes (2026-01-06)
### What We Learned
- Current lazarus uses `gau` + `cdx_toolkit`, which is slow
- PipelineV2 exists in `extern/lazarus/pipeline-v2/`
- Adapter exists at `extern/lazarus/src/lazarus/pipeline_v2_adapter.py`
- Timeouts were reduced to 60s but that is still not enough for web archive APIs
### Implementation Completed ✓
Commit: `4920878`

Changes made:

1. Created `src/cbintel/lazarus/pipeline_v2.py` - adapted from the extern adapter
2. Updated `src/cbintel/lazarus/__init__.py` - exports `PipelineV2Adapter`
3. Updated `src/cbintel/jobs/models.py` - added `use_pipeline_v2` and `timeout` fields
4. Updated `src/cbintel/jobs/workers/lazarus.py` - dual-mode processing (V2 default)
Dependencies installed:

- `html2text` - for markdown conversion in the pipeline (see the example below)
- Updated `extern/lazarus/pipeline-v2/process.sh` to use `html2text` instead of `html2markdown`
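`process.sh` now calls `html2text`, presumably via its CLI; the same conversion through the package's Python API, for reference (a minimal example, not the pipeline's exact invocation):

```python
import html2text

converter = html2text.HTML2Text()
converter.ignore_links = False  # keep links so the link-extraction step still sees them
markdown = converter.handle("<h1>Title</h1><p>See <a href='/docs'>the docs</a>.</p>")
print(markdown)
```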
### Results
Before PipelineV2:

- Lazarus: 0% success rate (all jobs timing out at 300s+)

After PipelineV2:

- Lazarus: 100% success rate
- Average job time: 10-26s
- Min job time: 8s

Full stress test results (5 min, 3 workers):

- fetch: 100% (7/7)
- screenshots: 100% (4/4)
- lazarus: 100% (4/4)
- vectl: 75% (3/4)
- crawl: 67% (4/6)
- Overall: 88% success rate