APRIL 2026

Autonomous Narrative Tracing Pipeline

A planned end-to-end system that detects narrative spikes, traces their provenance, and publishes investigation reports, built on a Kafka-centered microservice architecture.

Overview

The repo defines an event-driven architecture where five ingestion services (Reddit, NewsAPI, RSS, GDELT, C-SPAN) publish normalized raw documents to Kafka topics. A Flink processor consumes raw.* topics, deduplicates and cleans content, extracts entities, generates 384-dim embeddings, and emits both processed documents and anomaly alerts.
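
To make the ingestion contract concrete, here is a minimal sketch of a scraper publishing one normalized document, assuming the kafka-python client; the field names are illustrative, since the actual message schemas live in the repo docs:

import json
import time
from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

doc = {
    "doc_id": "reddit_abc123",   # hypothetical ID scheme
    "source": "reddit",
    "title": "Example submission title",
    "text": "Example body text",
    "url": "https://reddit.com/r/politics/example",
    "published_at": int(time.time()),
}

producer.send("raw.reddit", doc)  # each scraper writes to its own raw.* topic
producer.flush()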

A storage worker design writes processed data into PostgreSQL/pgvector (semantic search), Elasticsearch (full-text search), Neo4j (spread graph), and Redis (cache/state). A LangChain ReAct agent is specified to consume anomalies, run semantic + graph + text retrieval tools, and synthesize markdown investigation reports with GPT-4o.
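
For the pgvector leg, a semantic-search query might look like the sketch below; the documents table and embedding column are assumptions about the schema, and psycopg2 stands in for whatever driver the storage worker would use:

from sentence_transformers import SentenceTransformer
import psycopg2

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # 384-dim
query_vec = model.encode("climate lockdowns").tolist()

conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/rhetoriq")
with conn.cursor() as cur:
    # pgvector's <=> operator sorts by cosine distance (smaller = closer).
    cur.execute(
        "SELECT doc_id, title FROM documents "
        "ORDER BY embedding <=> %s::vector LIMIT 10",
        (str(query_vec),),
    )
    for doc_id, title in cur.fetchall():
        print(doc_id, title)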

The backend API is documented as a FastAPI layer with JWT auth, paginated investigation endpoints, search endpoints, graph endpoints, and WebSocket broadcast events for anomaly/investigation lifecycle updates. The frontend is documented as a React + TypeScript SPA using React Query, Zustand, Sigma.js, and Recharts for live feed, investigation detail, and graph views.
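
A compressed sketch of that API surface; only the /api/v1 prefix and /ws path are documented, so the route names, query parameters, and event payloads here are assumptions:

from fastapi import FastAPI, Query, WebSocket

app = FastAPI()

@app.get("/api/v1/investigations")
def list_investigations(page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100)):
    # The documented design pages through Postgres with Redis caching; stubbed here.
    return {"page": page, "size": size, "items": []}

@app.websocket("/ws")
async def events(ws: WebSocket):
    await ws.accept()
    # Broadcasts anomaly/investigation lifecycle updates; one illustrative event here.
    await ws.send_json({"event": "anomaly.detected", "anomaly_id": "a1b2c3d4"})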

The project includes extensive operational documentation for Kafka partitioning/retention, Kubernetes deployment patterns, Terraform modules, ArgoCD GitOps flow, observability dashboards, and a phased 5-6 month implementation roadmap.

Current repo state is documentation-first: architecture, schemas, runbooks, and roadmap are present; implementation files are largely not checked in.

How It Works

1) Scrapers poll/stream Reddit, RSS, NewsAPI, GDELT, and C-SPAN.
2) Each scraper publishes normalized raw events to Kafka (raw.reddit, raw.news, raw.gdelt, raw.speeches).
3) Flink consumes raw streams, deduplicates, cleans text, extracts entities, creates embeddings, and computes 10-minute anomaly windows vs 7-day baselines.
4) Flink emits documents.processed and anomalies.detected.
5) Storage worker consumes documents.processed and writes to Postgres/pgvector, Elasticsearch, Neo4j, and Redis.
6) Agent consumes anomalies.detected, executes retrieval tools (semantic_search, graph_trace, full_text_search, get_source_profile), then calls synthesize_report (a minimal consumer-loop sketch follows this list).
7) Agent publishes completed reports to investigations.complete.
8) FastAPI consumes/serves investigation data via REST + WebSocket.
9) React frontend renders live feed, timeline, investigation report, and spread graph.
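
A minimal consumer-loop sketch for steps 6-7, with the agent's tool calls stubbed out; kafka-python is an assumption, and the real design specifies a LangChain ReAct agent under a 10-iteration budget:

import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "anomalies.detected",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for msg in consumer:
    anomaly = msg.value
    # Real agent: bounded ReAct loop over semantic_search, graph_trace,
    # full_text_search, get_source_profile, then synthesize_report.
    report = {"investigation_id": anomaly["anomaly_id"], "phrase": anomaly["phrase"]}
    producer.send("investigations.complete", report)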

Architecture

rhetoriq/
  services/
    ingestion/
      reddit-scraper         -> Kafka: raw.reddit
      newsapi-scraper        -> Kafka: raw.news
      rss-scraper            -> Kafka: raw.news
      gdelt-scraper          -> Kafka: raw.gdelt
      cspan-scraper          -> Kafka: raw.speeches
    processing/
      flink-processor        <- raw.* ; -> documents.processed, anomalies.detected
    storage/
      storage-worker         <- documents.processed
                                -> PostgreSQL/pgvector
                                -> Elasticsearch
                                -> Neo4j
                                -> Redis
    analysis/
      investigation-agent    <- anomalies.detected
                                -> investigations.complete
    api/
      fastapi-backend        <- investigations.complete
                                -> REST + WebSocket
    frontend/
      react-dashboard        <- FastAPI REST/WS

Datasets

Reddit API (PRAW)

Streams submissions from monitored political subreddits into raw.reddit for early narrative detection.

GDELT GKG/Event updates

15-minute global news event feed used for high-volume narrative and theme signals in raw.gdelt.

NewsAPI

Supplemental mainstream political article feed for raw.news.

Outlet RSS feeds (NYT, WaPo, Fox, Reuters, BBC, Breitbart, The Hill, Politico)

Direct outlet feeds for politically diverse article ingestion into raw.news.

C-SPAN API transcripts

Speech/hearing transcript source for formal political adoption signals in raw.speeches.

Setup

Prerequisites

  • Docker Desktop
  • Python 3.11+
  • Node.js 18+
  • kubectl
  • Terraform CLI

Installation

git clone https://github.com/yourusername/rhetoriq.git
cd rhetoriq
cp .env.example .env
# fill required keys
docker-compose up -d
# create Kafka topics per KAFKA.md

Environment

Configure the keys and URLs documented across README.md, SERVICES.md, and BACKEND.md, including:
OPENAI_API_KEY
REDDIT_CLIENT_ID
REDDIT_CLIENT_SECRET
NEWS_API_KEY
CSPAN_API_KEY
KAFKA_BOOTSTRAP_SERVERS
POSTGRES_URL
ELASTICSEARCH_URL
NEO4J_URI
NEO4J_PASSWORD
REDIS_URL
VITE_API_URL
VITE_WS_URL

Connect Services

Local service endpoints documented:
API: http://localhost:8000/api/v1
WebSocket: ws://localhost:8000/ws
Frontend: http://localhost:3000
Kafka UI: http://localhost:8080
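
A quick way to sanity-check the WebSocket feed locally, assuming the third-party websockets package (the payload shape is illustrative):

import asyncio
import json
import websockets

async def listen():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        async for raw in ws:
            print(json.loads(raw))  # anomaly/investigation lifecycle events

asyncio.run(listen())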

Model Setup

HuggingFace models referenced for local setup:
- sentence-transformers/all-MiniLM-L6-v2
- dslim/bert-base-NER
The roadmap calls for pre-downloading these before running the Flink processor.
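
A warm-up script along these lines caches both models locally (at the libraries' default paths), so the Flink job starts without a network fetch:

from sentence_transformers import SentenceTransformer
from transformers import pipeline

embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
print(embedder.encode("warm-up").shape)  # (384,) -- the pipeline's 384-dim embeddings

ner = pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple")
print(ner("Senator Jane Doe spoke in Washington."))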

Run Services

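# run each long-lived service in its own terminal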
cd backend/scrapers && python run_all.py
cd backend/processors && python flink_job.py
cd backend/processors && python storage_worker.py
cd backend/agent && python agent.py
cd backend/api && uvicorn main:app --reload --port 8000
cd frontend && npm install && npm run dev

Decision Engine

Flink runs anomaly detection in 10-minute tumbling windows, comparing phrase frequency against a rolling 7-day baseline; when a phrase's frequency exceeds the threshold (default 3.0x), it emits an event to anomalies.detected. The agent then runs a bounded ReAct loop (max 10 iterations/tool calls) over its retrieval tools before synthesizing a report.
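
The trigger itself reduces to a ratio check; a sketch using the documented default and the numbers from the snapshot below, assuming the baseline is already normalized to a 10-minute window:

ANOMALY_SPIKE_THRESHOLD = 3.0  # documented default

def spike_magnitude(window_freq: int, baseline_freq: int) -> float:
    # Guard against a zero baseline for brand-new phrases.
    return window_freq / max(baseline_freq, 1)

mag = spike_magnitude(847, 180)                # ~4.7, as in the example below
emit_anomaly = mag >= ANOMALY_SPIKE_THRESHOLD  # True -> publish to anomalies.detected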

State Snapshot (Input)

{
  "anomaly_id": "a1b2c3d4",
  "phrase": "climate lockdowns",
  "spike_magnitude": 4.7,
  "window_frequency": 847,
  "baseline_frequency": 180,
  "top_sources": [
    {"source": "reddit", "subreddit": "conspiracy", "count": 312}
  ]
}

Structured Action (Output)

{
  "investigation_id": "a1b2c3d4",
  "phrase": "climate lockdowns",
  "duration_seconds": 47,
  "origin": {
    "source": "reddit",
    "outlet_or_subreddit": "r/conspiracy",
    "confidence": 0.87
  },
  "spread_path": [
    {"stage": 1, "source": "reddit", "outlet_or_subreddit": "r/conspiracy"},
    {"stage": 2, "source": "rss", "outlet_or_subreddit": "Breitbart"}
  ],
  "pattern_classification": "grassroots",
  "report": "## Narrative: climate lockdowns ..."
}

Decision Triggers

  • Phrase frequency in current 10-minute window exceeds configured spike threshold vs rolling 7-day baseline (ANOMALY_SPIKE_THRESHOLD, default 3.0).
  • Agent invocation is triggered by new messages on Kafka topic anomalies.detected.

Opponent Modeling

  • Source profiling enriches nodes with type, political lean, audience size estimate, and document counts to characterize amplification roles.
  • Spread-path and key-amplifier ranking model narrative propagation through AMPLIFIED graph relationships in Neo4j (a query sketch follows this list).
  • Investigation state/status is tracked in Redis and Postgres (detected, investigating, complete, failed).
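
A key-amplifier query sketch using the official neo4j Python driver; only the AMPLIFIED relationship type is documented, so the :Source label and property names are assumptions about the graph schema:

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

query = """
MATCH (s:Source)-[r:AMPLIFIED]->(t:Source)
WHERE r.phrase = $phrase
RETURN s.name AS amplifier, count(r) AS amplifications
ORDER BY amplifications DESC LIMIT 5
"""

with driver.session() as session:
    for rec in session.run(query, phrase="climate lockdowns"):
        print(rec["amplifier"], rec["amplifications"])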

Metrics

Investigation duration

Documented example: 47 seconds for one completed investigation.

Anomaly threshold

Default 3.0x baseline over 10-minute window.

Agent tool-call budget

Max 10 iterations/tool calls per investigation.

API cache latency goal

Documented target: keep common API queries under 100ms via Redis caching.
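
A cache-aside sketch consistent with that target, assuming redis-py, an illustrative key scheme, and a stand-in DB read:

import json
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

def fetch_from_postgres(inv_id: str) -> dict:
    return {"investigation_id": inv_id}  # stand-in for the real Postgres query

def get_investigation(inv_id: str) -> dict:
    key = f"investigation:{inv_id}"      # hypothetical key scheme
    cached = r.get(key)
    if cached:
        return json.loads(cached)        # Redis hit keeps the request well under 100ms
    data = fetch_from_postgres(inv_id)
    r.setex(key, 300, json.dumps(data))  # 5-minute TTL is an assumption
    return data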

Per-investigation LLM cost estimate

$0.035 per investigation (doc estimate).

Infrastructure cost estimate

$500/month on AWS (doc estimate).

Disclaimer

This project page reflects a documentation-defined system architecture and phased implementation roadmap. The current repository snapshot is primarily design, schema, and operations documentation; most production service code is planned but not fully checked in.
