ChaBo_README / orchestrator.py
ppsingh's picture
adding other components
944aab6
orchestratorText = """
# Chabo Orchestrator Documentation
### Table of Contents
1. Overview
2. System Architecture
3. Components
4. Configuration
5. Deployment Guide
6. API Reference
7. Usage Examples
8. Troubleshooting
## Overview
The Chabo Orchestrator is the central coordination module of the Chabo RAG system. \
It orchestrates the flow between multiple microservices to provide intelligent \
document processing and question-answering capabilities. The system is designed for deployment on Huggingface Spaces.
### Key Features:
- **Workflow Orchestration**: Uses LangGraph to manage complex processing pipelines
- **Multi-Modal Support**: Handles files dependent on ChatUI and Ingestor config (e.g. PDF, DOCX, GeoJSON, and JSON )
- **Streaming Responses**: Real-time response generation with Server-Sent Events (SSE)
- **Dual Processing Modes**:
- **Direct Output Mode**: Returns ingestor results immediately (e.g. EUDR use case)
- **Standard RAG Mode**: Full retrieval-augmented generation pipeline
- **Intelligent Caching**: Prevents redundant file processing (e.g. EUDR use case)
- **Multiple Interfaces**: FastAPI endpoints for modules; LangServe endpoints for ChatUI; Gradio UI for testing
## System Architecture
### High-Level Architecture
```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ ChatUI β”‚
β”‚ Frontend β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚ HTTP/SSE
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Chabo Orchestrator β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ LangGraph Workflow β”‚ β”‚
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚
β”‚ β”‚ β”‚ Detect File β”‚ β”‚ β”‚
β”‚ β”‚ β”‚ Type β”‚ β”‚ β”‚
β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚
β”‚ β”‚ β”‚ Ingest File β”‚ β”‚ β”‚
β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β” β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ β”Œβ”€β”€β–Όβ”€β”€β”€β” β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β” β”‚ β”‚
β”‚ β”‚ β”‚Directβ”‚ β”‚Retrieveβ”‚ β”‚ β”‚
β”‚ β”‚ β”‚Outputβ”‚ β”‚Context β”‚ β”‚ β”‚
β”‚ β”‚ β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”˜ β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ β”‚ β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β” β”‚ β”‚
β”‚ β”‚ β”‚ β”‚Generateβ”‚ β”‚ β”‚
β”‚ β”‚ β”‚ β”‚Responseβ”‚ β”‚ β”‚
β”‚ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”˜
β”‚ β”‚ β”‚
β”Œβ”€β”€β”€β–Όβ”€β”€β” β”Œβ”€β”€β”€β–Όβ”€β”€β”€β” β”Œβ”€β”€β–Όβ”€β”€β”€β”€β”
β”‚Ingestβ”‚ β”‚Retrie-β”‚ β”‚Genera-β”‚
β”‚or β”‚ β”‚ver β”‚ β”‚tor β”‚
β””β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”˜
```
### Component Communication
All communication between modules happens over HTTP:
- **Orchestrator ↔ Ingestor**: Gradio Client (file upload, processing)
- **Orchestrator ↔ Retriever**: Gradio Client (semantic search)
- **Orchestrator ↔ Generator**: HTTP streaming (SSE for real-time responses)
- **ChatUI ↔ Orchestrator**: LangServe streaming endpoints
### Workflow Logic
The orchestrator implements two distinct workflows:
**Direct Output Workflow** (when `DIRECT_OUTPUT=True` and file is new):
```
File Upload β†’ Detect Type β†’ Ingest β†’ Direct Output β†’ Return Results
```
**Standard RAG Workflow** (default or cached files):
```
Query β†’ Retrieve Context β†’ Generate Response β†’ Stream to User
```
## Components
### 1. Main Application (`main.py`)
- LangServe endpoints for ChatUI integration
- Gradio web interface for testing
- FastAPI endpoints for diagnostics and future use (e.g. /health)
- Cache management endpoint (for direct output use cases)
**Key Functions:**
- `chatui_adapter()`: Handles text-only queries
- `chatui_file_adapter()`: Handles file uploads with queries
- `create_gradio_interface()`: Test UI
### 2. Workflow Nodes (`nodes.py`)
LangGraph nodes that implement the processing pipeline:
**Node Functions:**
- `detect_file_type_node()`: Identifies file type and determines routing
- `ingest_node()`: Processes files through appropriate ingestor
- `direct_output_node()`: Returns raw ingestor results
- `retrieve_node()`: Fetches relevant context from vector store
- `generate_node_streaming()`: Streams LLM responses
- `route_workflow()`: Conditional routing logic
**Helper Functions:**
- `process_query_streaming()`: Unified streaming interface
- `compute_file_hash()`: SHA256 hashing for deduplication
- `clear_direct_output_cache()`: Cache management
### 3. Data Models (`models.py`)
Pydantic models for type validation
### 4. Retriever Adapter (`retriever_adapter.py`)
Abstraction layer for managing different retriever configurations:
- Handles authentication
- Formats queries and filters
### 5. Utilities (`utils.py`)
Helper functions
#### Conversation Context Management
The `build_conversation_context()` function manages conversation history to provide relevant context to the generator while respecting token limits and conversation flow.
**Key Features:**
- **Context Selection**: Always includes the first user and assistant messages to maintain conversation context
- **Recent Turn Limiting**: Includes only the last N complete turns (user + assistant pairs) to focus on recent conversation (default: 3)
- **Character Limit Management**: Truncates to maximum character limits to prevent context overflow
**Function Parameters:**
```python
def build_conversation_context(
messages, # List of Message objects from conversation
max_turns: int = 3, # Maximum number of recent turns to include
max_chars: int = 8000 # Maximum total characters in context
) -> str
```
## Configuration
### Configuration File (`params.cfg`)
```ini
[file_processing]
# Enable direct output mode: when True, ingestor results are returned directly
# without going through the generator. When False, all files go through full RAG pipeline.
# This also prevents ChatUI from resending the file in the conversation history with each turn
# Note: File type validation is handled by the ChatUI frontend
DIRECT_OUTPUT = True
[conversation_history]
# Limit the context window for the conversation history
MAX_TURNS = 3
MAX_CHARS = 12000
[retriever]
RETRIEVER = https://giz-chatfed-retriever0-3.hf.space/
# Optional
COLLECTION_NAME = EUDR
[generator]
GENERATOR = https://giz-eudr-chabo-generator.hf.space
[ingestor]
INGESTOR = https://giz-eudr-chabo-ingestor.hf.space
[general]
# need to include this for HF inference endpoint limits
MAX_CONTEXT_CHARS = 15000
```
### Environment Variables
Create a `.env` file with:
```bash
# Required for private HuggingFace Spaces
HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxx
```
### ChatUI Configuration
ChatUI `DOTENV_LOCAL` example deployment configuration:
```javascript
MODELS=`[
{
"name": "asistente_eudr",
"displayName": "Asistente EUDR",
"description": "Retrieval-augmented generation on EUDR Whisp API powered by ChatFed modules.",
"instructions": {
"title": "EUDR Asistente: Instructiones",
"content": "Hola, soy Asistente EUDR, un asistente conversacional basado en inteligencia artificial diseΓ±ado para ayudarle a comprender el cumplimiento y el anΓ‘lisis del Reglamento de la UE sobre la deforestaciΓ³n. ResponderΓ© a sus preguntas utilizando los informes EUDR y los archivos GeoJSON cargados.\\n\\nπŸ’‘ **CΓ³mo utilizarlo (panel a la derecha)**\\n\\n**Modo de uso:** elija entre subir un archivo GeoJSON para su anΓ‘lisis o consultar los informes EUDR filtrados por paΓ­s.\\n\\n**Ejemplos:** seleccione entre preguntas de ejemplo seleccionadas de diferentes categorΓ­as.\\n\\n**Referencias:** consulte las fuentes de contenido utilizadas para la verificaciΓ³n de datos.\\n\\n⚠️ Para conocer las limitaciones y la informaciΓ³n sobre la recopilaciΓ³n de datos, consulte la pestaΓ±a Β«ExenciΓ³n de responsabilidadΒ»\\n\\n⚠️ Al utilizar esta aplicaciΓ³n, usted acepta que recopilemos estadΓ­sticas de uso (como preguntas formuladas, comentarios realizados, duraciΓ³n de la sesiΓ³n, tipo de dispositivo e informaciΓ³n geogrΓ‘fica anΓ³nima) para comprender el rendimiento y mejorar continuamente la herramienta, basΓ‘ndonos en nuestro interΓ©s legΓ­timo por mejorar nuestros servicios."
},
"multimodal": true,
"multimodalAcceptedMimetypes": [
"application/geojson"
],
"chatPromptTemplate": "{{#each messages}}{{#ifUser}}{{content}}{{/ifUser}}{{#ifAssistant}}{{content}}{{/ifAssistant}}{{/each}}",
"parameters": {
"temperature": 0.0,
"max_new_tokens": 2048
},
"endpoints": [{
"type": "langserve-streaming",
"url": "[https://giz-eudr-chabo-orchestrator.hf.space/chatfed-ui-stream](https://giz-eudr-chabo-orchestrator.hf.space/chatfed-ui-stream)",
"streamingFileUploadUrl": "[https://giz-eudr-chabo-orchestrator.hf.space/chatfed-with-file-stream](https://giz-eudr-chabo-orchestrator.hf.space/chatfed-with-file-stream)",
"inputKey": "text",
"fileInputKey": "files"
}]
}
]`
PUBLIC_ANNOUNCEMENT_BANNERS=`[
{
"title": "This is Chat Prototype for DSC users",
"linkTitle": "Keep it Clean"
}
]`
PUBLIC_APP_DISCLAIMER_MESSAGE="Disclaimer: AI is an area of active research with known problems such as biased generation and misinformation. Do not use this application for high-stakes decisions or advice. Do not insert your personal data, especially sensitive, like health data."
PUBLIC_APP_DESCRIPTION="Internal Chat-tool for DSC users for testing"
PUBLIC_APP_NAME="EUDR ChatUI"
ENABLE_ASSISTANTS=false
ENABLE_ASSISTANTS_RAG=false
COMMUNITY_TOOLS=false
MONGODB_URL=mongodb://localhost:27017
# Disable LLM-based title generation to prevent template queries
LLM_SUMMARIZATION=false
```
Key things to ensure here:
- multimodalAcceptedMimetypes: file types to accept for upload via ChatUI
- endpoints: orchestrator url + endpoints
## Deployment Guide
### Local Development
**Prerequisites:**
- Python 3.10+
- pip
**Steps:**
1. Clone the repository:
```bash
git clone <your-repo-url>
cd chabo-orchestrator
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure the system:
```bash
# Create .env file
echo "HF_TOKEN=your_token_here" > .env
# Edit params.cfg with your service URLs
nano params.cfg
```
4. Run the application:
```bash
python app/main.py
```
5. Access interfaces:
- Gradio UI: http://localhost:7860/gradio
- API Docs: http://localhost:7860/docs
- Health Check: http://localhost:7860/health
### Docker Deployment
**Build the image:**
```bash
docker build -t chabo-orchestrator .
```
**Run the container:**
```bash
docker run -d --name chabo-orchestrator -p 7860:7860 chabo-orchestrator
```
### HuggingFace Spaces Deployment
**Repository Structure:**
```
your-space/
β”œβ”€β”€ app/
β”‚ β”œβ”€β”€ main.py
β”‚ β”œβ”€β”€ nodes.py
β”‚ β”œβ”€β”€ models.py
β”‚ β”œβ”€β”€ retriever_adapter.py
β”‚ └── utils.py
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ requirements.txt
β”œβ”€β”€ params.cfg
└── README.md
```
**Steps:**
1. Create a new Space on HuggingFace
2. Select "Docker" as the SDK
3. Push your code to the Space repository
4. Add secrets in Space settings:
- `HF_TOKEN`: Your HuggingFace token
5. The Space will automatically build and deploy
**Important:** Ensure all service URLs in `params.cfg` are publicly accessible.
### Docker Compose (Multi-Service)
Example orchestrated deployment for the entire Chabo stack (*NOTE - docker-compose will not run on Huggingface spaces*)
```yaml
version: '3.8'
services:
orchestrator:
build: ./orchestrator
ports:
- "7860:7860"
environment:
- HF_TOKEN=${HF_TOKEN}
- RETRIEVER=http://retriever:7861
- GENERATOR=http://generator:7862
- INGESTOR=http://ingestor:7863
depends_on:
- retriever
- generator
- ingestor
retriever:
build: ./retriever
ports:
- "7861:7861"
environment:
- QDRANT_API_KEY=${QDRANT_API_KEY}
generator:
build: ./generator
ports:
- "7862:7862"
environment:
- HF_TOKEN=${HF_TOKEN}
ingestor:
build: ./ingestor
ports:
- "7863:7863"
```
## API Reference
### Endpoints
#### Health Check
```
GET /health
```
Returns service health status.
**Response:**
```json
{
"status": "healthy"
}
```
#### Root Information
```
GET /
```
Returns API metadata and available endpoints.
#### Text Query (Streaming)
```
POST /chatfed-ui-stream/stream
Content-Type: application/json
```
**Request Body:**
```json
{
"input": {
"text": "What are EUDR requirements?"
}
}
```
**Response:** Server-Sent Events stream
```
event: data
data: "The EUDR requires..."
event: sources
data: {"sources": [...]}
event: end
data: ""
```
#### File Upload Query (Streaming)
```
POST /chatfed-with-file-stream/stream
Content-Type: application/json
```
**Request Body:**
```json
{
"input": {
"text": "Analyze this GeoJSON",
"files": [
{
"name": "boundaries.geojson",
"type": "base64",
"content": "base64_encoded_content"
}
]
}
}
```
#### Clear Cache
```
POST /clear-cache
```
Clears the direct output file cache.
**Response:**
```json
{
"status": "cache cleared"
}
```
### Gradio Interface
#### Interactive Query
Gradio's default API endpoint for UI interactions. If running on huggingface spaces, access via: https://[ORG_NAME]-[SPACE_NAME].hf.space/gradio/
## Troubleshooting
### Common Issues
#### 1. File Upload Fails
**Symptoms:** "Error reading file" or "Failed to decode uploaded file"
**Solutions:**
- Verify file is properly base64 encoded
- Check file size limits (default: varies by deployment)
- Ensure MIME type is in `multimodalAcceptedMimetypes`
#### 2. Slow Responses
**Symptoms:** Long wait times for responses
**Solutions:**
- Check network latency to external services
- Verify `MAX_CONTEXT_CHARS` isn't too high
- Consider enabling `DIRECT_OUTPUT` for suitable file types
- Check logs for retrieval/generation bottlenecks
#### 3. Cache Not Clearing
**Symptoms:** Same file shows cached results when it shouldn't
**Solutions:**
- Call `/clear-cache` endpoint
- Restart the service (clears in-memory cache)
- Check if `DIRECT_OUTPUT=True` in config
#### 4. Service Connection Errors
**Symptoms:** "Connection refused" or timeout errors
**Solutions:**
- Verify all service URLs in `params.cfg` are accessible
- Check HF_TOKEN is valid and has access to private spaces (*NOTE - THE ORCHESTRATOR CURRENTLY MUST BE PUBLIC*)
- Test each service independently with health checks
- Review firewall/network policies
### Version History
- **v1.0.0**: Initial release with LangGraph orchestration
- Current implementation supports streaming, caching, and dual-mode processing
---
**Documentation Last Updated:** 2025-10-01
**Compatible With:** Python 3.10+, LangGraph 0.2+, FastAPI 0.100+
"""