Remostart's picture
FARMLINGUA AI CONVERSATIONAL initial commit
f825332 verified
# farmlingua/app/agents/crew_pipeline.pymemorysection
import os
import sys
import re
import uuid
import requests
import joblib
import faiss
import numpy as np
import torch
import fasttext
from huggingface_hub import hf_hub_download
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sentence_transformers import SentenceTransformer
from app.utils import config
from app.utils.memory import memory_store # memory module
from typing import List
hf_cache = "/models/huggingface"
os.environ["HF_HOME"] = hf_cache
os.environ["TRANSFORMERS_CACHE"] = hf_cache
os.environ["HUGGINGFACE_HUB_CACHE"] = hf_cache
os.makedirs(hf_cache, exist_ok=True)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if BASE_DIR not in sys.path:
sys.path.insert(0, BASE_DIR)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
try:
classifier = joblib.load(config.CLASSIFIER_PATH)
except Exception:
classifier = None
print(f"Loading expert model ({config.EXPERT_MODEL_NAME})...")
tokenizer = AutoTokenizer.from_pretrained(config.EXPERT_MODEL_NAME, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(
config.EXPERT_MODEL_NAME,
torch_dtype="auto",
device_map="auto"
)
embedder = SentenceTransformer(config.EMBEDDING_MODEL)
# language detector
print(f"Loading FastText language identifier ({config.LANG_ID_MODEL_REPO})...")
lang_model_path = hf_hub_download(
repo_id=config.LANG_ID_MODEL_REPO,
filename=getattr(config, "LANG_ID_MODEL_FILE", "model.bin")
)
lang_identifier = fasttext.load_model(lang_model_path)
def detect_language(text: str, top_k: int = 1):
if not text or not text.strip():
return [("eng_Latn", 1.0)]
clean_text = text.replace("\n", " ").strip()
labels, probs = lang_identifier.predict(clean_text, k=top_k)
return [(l.replace("__label__", ""), float(p)) for l, p in zip(labels, probs)]
# Translation model
print(f"Loading translation model ({config.TRANSLATION_MODEL_NAME})...")
translation_pipeline = pipeline(
"translation",
model=config.TRANSLATION_MODEL_NAME,
device=0 if DEVICE == "cuda" else -1,
max_new_tokens=400,
)
SUPPORTED_LANGS = {
"eng_Latn": "English",
"ibo_Latn": "Igbo",
"yor_Latn": "Yoruba",
"hau_Latn": "Hausa",
"swh_Latn": "Swahili",
"amh_Latn": "Amharic",
}
# Text chunking
_SENTENCE_SPLIT_RE = re.compile(r'(?<=[.!?])\s+')
def chunk_text(text: str, max_len: int = 400) -> List[str]:
if not text:
return []
sentences = _SENTENCE_SPLIT_RE.split(text)
chunks, current = [], ""
for s in sentences:
if not s:
continue
if len(current) + len(s) + 1 <= max_len:
current = (current + " " + s).strip()
else:
if current:
chunks.append(current.strip())
current = s.strip()
if current:
chunks.append(current.strip())
return chunks
def translate_text(text: str, src_lang: str, tgt_lang: str, max_chunk_len: int = 400) -> str:
if not text.strip():
return text
chunks = chunk_text(text, max_len=max_chunk_len)
translated_parts = []
for chunk in chunks:
res = translation_pipeline(chunk, src_lang=src_lang, tgt_lang=tgt_lang)
translated_parts.append(res[0]["translation_text"])
return " ".join(translated_parts).strip()
# RAG retrieval
def retrieve_docs(query: str, vs_path: str):
if not vs_path or not os.path.exists(vs_path):
return None
try:
index = faiss.read_index(str(vs_path))
except Exception:
return None
query_vec = np.array([embedder.encode(query)], dtype=np.float32)
D, I = index.search(query_vec, k=3)
if D[0][0] == 0:
return None
meta_path = str(vs_path) + "_meta.npy"
if os.path.exists(meta_path):
metadata = np.load(meta_path, allow_pickle=True).item()
docs = [metadata.get(str(idx), "") for idx in I[0] if str(idx) in metadata]
docs = [d for d in docs if d]
return "\n\n".join(docs) if docs else None
return None
def get_weather(state_name: str) -> str:
url = "http://api.weatherapi.com/v1/current.json"
params = {"key": config.WEATHER_API_KEY, "q": f"{state_name}, Nigeria", "aqi": "no"}
r = requests.get(url, params=params, timeout=10)
if r.status_code != 200:
return f"Unable to retrieve weather for {state_name}."
data = r.json()
return (
f"Weather in {state_name}:\n"
f"- Condition: {data['current']['condition']['text']}\n"
f"- Temperature: {data['current']['temp_c']}°C\n"
f"- Humidity: {data['current']['humidity']}%\n"
f"- Wind: {data['current']['wind_kph']} kph"
)
def detect_intent(query: str):
q_lower = (query or "").lower()
if any(word in q_lower for word in ["weather", "temperature", "rain", "forecast"]):
for state in getattr(config, "STATES", []):
if state.lower() in q_lower:
return "weather", state
return "weather", None
if any(word in q_lower for word in ["latest", "update", "breaking", "news", "current", "predict"]):
return "live_update", None
if hasattr(classifier, "predict") and hasattr(classifier, "predict_proba"):
try:
predicted_intent = classifier.predict([query])[0]
confidence = max(classifier.predict_proba([query])[0])
if confidence < getattr(config, "CLASSIFIER_CONFIDENCE_THRESHOLD", 0.6):
return "low_confidence", None
return predicted_intent, None
except Exception:
pass
return "normal", None
# expert runner
def run_qwen(messages: List[dict], max_new_tokens: int = 1300) -> str:
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = tokenizer([text], return_tensors="pt").to(model.device)
generated_ids = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=0.4,
repetition_penalty=1.1
)
output_ids = generated_ids[0][len(inputs.input_ids[0]):].tolist()
return tokenizer.decode(output_ids, skip_special_tokens=True).strip()
# Memory
MAX_HISTORY_MESSAGES = getattr(config, "MAX_HISTORY_MESSAGES", 30)
def build_messages_from_history(history: List[dict], system_prompt: str) -> List[dict]:
msgs = [{"role": "system", "content": system_prompt}]
msgs.extend(history)
return msgs
# Main pipeline
def run_pipeline(user_query: str, session_id: str = None):
"""
Run FarmLingua pipeline with per-session memory.
Each session_id keeps its own history.
"""
if session_id is None:
session_id = str(uuid.uuid4()) # fallback unique session
# Language detection
lang_label, prob = detect_language(user_query, top_k=1)[0]
if lang_label not in SUPPORTED_LANGS:
lang_label = "eng_Latn"
translated_query = (
translate_text(user_query, src_lang=lang_label, tgt_lang="eng_Latn")
if lang_label != "eng_Latn"
else user_query
)
intent, extra = detect_intent(translated_query)
# Load conversation history
history = memory_store.get_history(session_id) or []
if len(history) > MAX_HISTORY_MESSAGES:
history = history[-MAX_HISTORY_MESSAGES:]
history.append({"role": "user", "content": translated_query})
system_prompt = (
"You are FarmLingua, an AI assistant for Nigerian farmers. "
"Answer directly without repeating the question. "
"Use clear farmer-friendly English with emojis . "
"Avoid jargon and irrelevant details. "
"If asked who built you, say: 'KawaFarm LTD developed me to help farmers.'"
)
if intent == "weather" and extra:
weather_text = get_weather(extra)
history.append({"role": "user", "content": f"Rewrite this weather update simply for farmers:\n{weather_text}"})
messages_for_qwen = build_messages_from_history(history, system_prompt)
english_answer = run_qwen(messages_for_qwen, max_new_tokens=256)
else:
if intent == "live_update":
context = retrieve_docs(translated_query, config.LIVE_VS_PATH)
if context:
history.append({"role": "user", "content": f"Latest agricultural updates:\n{context}"})
if intent == "low_confidence":
context = retrieve_docs(translated_query, config.STATIC_VS_PATH)
if context:
history.append({"role": "user", "content": f"Reference information:\n{context}"})
messages_for_qwen = build_messages_from_history(history, system_prompt)
english_answer = run_qwen(messages_for_qwen, max_new_tokens=700)
# Save assistant reply
history.append({"role": "assistant", "content": english_answer})
if len(history) > MAX_HISTORY_MESSAGES:
history = history[-MAX_HISTORY_MESSAGES:]
memory_store.save_history(session_id, history)
# Translate back if needed
final_answer = (
translate_text(english_answer, src_lang="eng_Latn", tgt_lang=lang_label)
if lang_label != "eng_Latn"
else english_answer
)
return {
"session_id": session_id,
"detected_language": SUPPORTED_LANGS.get(lang_label, "Unknown"),
"answer": final_answer
}