Autonomous Narrative Tracing Pipeline
A planned end-to-end system that detects narrative spikes, traces provenance, and publishes investigation reports from a Kafka-centered microservice architecture.
Overview
The repo defines an event-driven architecture where five ingestion services (Reddit, NewsAPI, RSS, GDELT, C-SPAN) publish normalized raw documents to Kafka topics. A Flink processor consumes raw.* topics, deduplicates and cleans content, extracts entities, generates 384-dim embeddings, and emits both processed documents and anomaly alerts.
A storage worker design writes processed data into PostgreSQL/pgvector (semantic search), Elasticsearch (full-text search), Neo4j (spread graph), and Redis (cache/state). A LangChain ReAct agent is specified to consume anomalies, run semantic + graph + text retrieval tools, and synthesize markdown investigation reports with GPT-4o.
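The multi-sink write pattern described above can be sketched as a small fan-out dispatcher. This is an illustrative stand-in, not the repo's storage worker: the sink names are hypothetical, and real implementations would wrap the Postgres, Elasticsearch, Neo4j, and Redis clients.

```python
from typing import Callable, Dict, List


def make_fanout(sinks: Dict[str, Callable[[dict], None]]) -> Callable[[dict], List[str]]:
    """Return a dispatcher that writes one processed document to every sink.

    Sinks are hypothetical callables (e.g. wrappers around DB clients).
    A failing sink is recorded by name instead of aborting the whole write,
    so one slow/broken store does not block the others.
    """
    def dispatch(doc: dict) -> List[str]:
        failed = []
        for name, write in sinks.items():
            try:
                write(doc)
            except Exception:
                failed.append(name)
        return failed
    return dispatch
```

A real worker would likely also retry or dead-letter the failed sink writes; this sketch only shows the fan-out shape.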
The backend API is documented as a FastAPI layer with JWT auth, paginated investigation endpoints, search endpoints, graph endpoints, and WebSocket broadcast events for anomaly/investigation lifecycle updates. The frontend is documented as a React + TypeScript SPA using React Query, Zustand, Sigma.js, and Recharts for live feed, investigation detail, and graph views.
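The paginated investigation endpoints could use a simple offset/limit envelope. A minimal sketch, assuming hypothetical field names (the actual response schema is defined in the backend docs):

```python
import math
from typing import Sequence


def paginate(items: Sequence, page: int = 1, page_size: int = 20) -> dict:
    """Hypothetical pagination envelope for list endpoints.

    Slices the full result set and wraps it with paging metadata;
    field names here are illustrative, not the documented schema.
    """
    total = len(items)
    start = (page - 1) * page_size
    return {
        "page": page,
        "page_size": page_size,
        "total": total,
        "total_pages": max(1, math.ceil(total / page_size)),
        "items": list(items[start:start + page_size]),
    }
```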
The project includes extensive operational documentation for Kafka partitioning/retention, Kubernetes deployment patterns, Terraform modules, ArgoCD GitOps flow, observability dashboards, and a phased 5-6 month implementation roadmap.
Current repo state is documentation-first: architecture, schemas, runbooks, and roadmap are present; implementation files are largely not checked in.
How It Works
1) Scrapers poll/stream Reddit, RSS, NewsAPI, GDELT, and C-SPAN.
2) Each scraper publishes normalized raw events to Kafka (raw.reddit, raw.news, raw.gdelt, raw.speeches).
3) Flink consumes raw streams, deduplicates, cleans text, extracts entities, creates embeddings, and computes 10-minute anomaly windows vs 7-day baselines.
4) Flink emits documents.processed and anomalies.detected.
5) Storage worker consumes documents.processed and writes to Postgres/pgvector, Elasticsearch, Neo4j, and Redis.
6) Agent consumes anomalies.detected, executes retrieval tools (semantic_search, graph_trace, full_text_search, get_source_profile), then calls synthesize_report.
7) Agent publishes completed reports to investigations.complete.
8) FastAPI consumes/serves investigation data via REST + WebSocket.
9) React frontend renders live feed, timeline, investigation report, and spread graph.
Architecture
rhetoriq/
  services/
    ingestion/
      reddit-scraper -> Kafka: raw.reddit
      newsapi-scraper -> Kafka: raw.news
      rss-scraper -> Kafka: raw.news
      gdelt-scraper -> Kafka: raw.gdelt
      cspan-scraper -> Kafka: raw.speeches
    processing/
      flink-processor <- raw.* ; -> documents.processed, anomalies.detected
    storage/
      storage-worker <- documents.processed
        -> PostgreSQL/pgvector
        -> Elasticsearch
        -> Neo4j
        -> Redis
    analysis/
      investigation-agent <- anomalies.detected
        -> investigations.complete
    api/
      fastapi-backend <- investigations.complete
        -> REST + WebSocket
  frontend/
    react-dashboard <- FastAPI REST/WS
Datasets
Reddit API (PRAW)
Streams monitored political subreddit submissions into raw.reddit for early narrative detection.
GDELT GKG/Event updates
15-minute global news event feed used for high-volume narrative and theme signals in raw.gdelt.
NewsAPI
Supplemental mainstream political article feed for raw.news.
Outlet RSS feeds (NYT, WaPo, Fox, Reuters, BBC, Breitbart, The Hill, Politico)
Direct outlet feeds for politically diverse article ingestion into raw.news.
C-SPAN API transcripts
Speech/hearing transcript source for formal political adoption signals in raw.speeches.
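Each scraper publishes a normalized raw event to its raw.* topic. A minimal normalization sketch, with illustrative field names (the repo's actual schema lives in its docs), plus the Kafka publish call a producer might make:

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Optional


def normalize_raw_event(source: str, doc_id: str, text: str,
                        url: Optional[str] = None) -> dict:
    """Build the kind of normalized raw document a scraper would emit.

    The content hash supports downstream deduplication in the Flink job;
    field names are assumptions for illustration, not the documented schema.
    """
    return {
        "source": source,
        "doc_id": doc_id,
        "content_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        "text": text,
        "url": url,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    }


# A producer would then publish, e.g.:
# producer.send(f"raw.{event['source']}", json.dumps(event).encode("utf-8"))
```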
Setup
Prerequisites
- Docker Desktop
- Python 3.11+
- Node.js 18+
- kubectl
- Terraform CLI
Installation
git clone https://github.com/yourusername/rhetoriq.git
cd rhetoriq
cp .env.example .env
# fill required keys
docker-compose up -d
# create Kafka topics per KAFKA.md
Environment
Configure keys and URLs documented across README (3).md, SERVICES.md, and BACKEND (1).md, including:
OPENAI_API_KEY
REDDIT_CLIENT_ID
REDDIT_CLIENT_SECRET
NEWS_API_KEY
CSPAN_API_KEY
KAFKA_BOOTSTRAP_SERVERS
POSTGRES_URL
ELASTICSEARCH_URL
NEO4J_URI
NEO4J_PASSWORD
REDIS_URL
VITE_API_URL
VITE_WS_URL
Connect Services
Local service endpoints documented:
API: http://localhost:8000/api/v1
WebSocket: ws://localhost:8000/ws
Frontend: http://localhost:3000
Kafka UI: http://localhost:8080
Model Setup
HuggingFace models referenced for local setup:
- sentence-transformers/all-MiniLM-L6-v2
- dslim/bert-base-NER
The roadmap indicates pre-downloading these models before running the Flink processor.
Run Services
cd backend/scrapers && python run_all.py
cd backend/processors && python flink_job.py
cd backend/processors && python storage_worker.py
cd backend/agent && python agent.py
cd backend/api && uvicorn main:app --reload --port 8000
cd frontend && npm install && npm run dev
Decision Engine
Flink runs anomaly detection in 10-minute tumbling windows, comparing phrase frequency against a rolling 7-day baseline; if frequency exceeds the threshold (default 3.0x baseline), it emits an event to anomalies.detected. The agent then runs a bounded ReAct loop (max 10 iterations/tool calls) over its retrieval tools before synthesizing a report.
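The spike check described above reduces to a ratio against the baseline. A minimal sketch, consistent with the example payload below (847 vs a baseline of 180 gives a 4.7x spike); the min_count guard is an assumption to suppress low-volume noise, not a documented parameter:

```python
from typing import Tuple


def is_spike(window_freq: int, baseline_freq: float,
             threshold: float = 3.0, min_count: int = 10) -> Tuple[bool, float]:
    """Compare a 10-minute window count against the rolling 7-day baseline
    (normalized to the same 10-minute granularity).

    Returns (spike?, magnitude). min_count is an illustrative guard so that
    tiny counts over a near-zero baseline do not fire anomalies.
    """
    if baseline_freq <= 0:
        # No baseline: treat any sufficiently large count as a spike.
        return (window_freq >= min_count,
                float("inf") if window_freq else 0.0)
    magnitude = window_freq / baseline_freq
    return (magnitude >= threshold and window_freq >= min_count, magnitude)
```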
State Snapshot (Input)
{
"anomaly_id": "a1b2c3d4",
"phrase": "climate lockdowns",
"spike_magnitude": 4.7,
"window_frequency": 847,
"baseline_frequency": 180,
"top_sources": [
{"source": "reddit", "subreddit": "conspiracy", "count": 312}
]
}
Structured Action (Output)
{
"investigation_id": "a1b2c3d4",
"phrase": "climate lockdowns",
"duration_seconds": 47,
"origin": {
"source": "reddit",
"outlet_or_subreddit": "r/conspiracy",
"confidence": 0.87
},
"spread_path": [
{"stage": 1, "source": "reddit", "outlet_or_subreddit": "r/conspiracy"},
{"stage": 2, "source": "rss", "outlet_or_subreddit": "Breitbart"}
],
"pattern_classification": "grassroots",
"report": "## Narrative: climate lockdowns ..."
}
Investigation decisions and outputs are logged to keep system behavior inspectable and explainable.
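Before publishing to investigations.complete, the agent could sanity-check the structured action. A sketch mirroring the output fields above; the specific checks are illustrative, not the repo's validation logic:

```python
REQUIRED_KEYS = {
    "investigation_id", "phrase", "origin",
    "spread_path", "pattern_classification", "report",
}


def validate_investigation(payload: dict) -> list:
    """Return a list of problems; an empty list means safe to publish.

    Required keys mirror the structured action schema above; the
    confidence-range check is an assumed, illustrative rule.
    """
    problems = [f"missing: {k}" for k in sorted(REQUIRED_KEYS - payload.keys())]
    conf = payload.get("origin", {}).get("confidence")
    if conf is not None and not (0.0 <= conf <= 1.0):
        problems.append("origin.confidence out of [0, 1]")
    return problems
```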
Decision Triggers
- Phrase frequency in current 10-minute window exceeds configured spike threshold vs rolling 7-day baseline (ANOMALY_SPIKE_THRESHOLD, default 3.0).
- Agent invocation is triggered by new messages on Kafka topic anomalies.detected.
Opponent Modeling
- Source profiling enriches nodes with type, political lean, audience size estimate, and document counts to characterize amplification roles.
- Spread-path and key-amplifier ranking model narrative propagation through AMPLIFIED graph relationships in Neo4j.
- Investigation state/status is tracked in Redis and Postgres (detected, investigating, complete, failed).
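The key-amplifier ranking over AMPLIFIED relationships would be a Cypher aggregation in Neo4j; as a pure-Python stand-in, it reduces to counting outgoing AMPLIFIED edges per node. The edge direction convention here (amplifier -> upstream source) is an assumption for illustration:

```python
from collections import Counter
from typing import List, Tuple


def rank_amplifiers(edges: List[Tuple[str, str]], top_n: int = 5) -> List[Tuple[str, int]]:
    """Toy proxy for the documented key-amplifier ranking.

    edges are (amplifier, upstream_source) pairs standing in for AMPLIFIED
    relationships; a node's outgoing-edge count approximates its
    amplification role in the spread graph.
    """
    return Counter(src for src, _dst in edges).most_common(top_n)
```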
Metrics
Investigation duration
Documented example: 47 seconds for one completed investigation.
Anomaly threshold
Default 3.0x baseline over 10-minute window.
Agent tool-call budget
Max 10 iterations/tool calls per investigation.
API cache latency goal
Documented target: keep common API queries under 100ms via Redis caching.
Per-investigation LLM cost estimate
$0.035 per investigation (doc estimate).
Infrastructure cost estimate
$500/month on AWS (doc estimate).
Disclaimer
This project page reflects a documentation-defined system architecture and phased implementation roadmap. The current repository snapshot is primarily design, schema, and operations documentation; most production service code is planned but not fully checked in.