import logging
from groq import Client
import requests
import os
from weaviate.classes.init import Auth
import pypdf
import docx
from clients import get_weaviate_client
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
from configs import load_yaml_config
load_dotenv()
# Load SiliconFlow API credentials (key and embedding endpoint URL)
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY")
SILICONFLOW_EMBEDDING_URL = os.getenv("SILICONFLOW_EMBEDDING_URL")
# Load config from YAML
config = load_yaml_config("config.yaml")
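# For reference, a minimal config.yaml layout matching the keys this module reads.
# The values and model name below are illustrative assumptions, not the project's
# actual configuration:
#
#   chunking:
#     chunk_size: 1000
#     chunk_overlap: 200
#     separators: ["\n\n", "\n", " ", ""]
#   apis_models:
#     silicon_flow:
#       qwen:
#         embed: "Qwen/Qwen3-Embedding-8B"
#   rag:
#     weavaite_collection: "Documents"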
try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("rag_steps")
except ImportError:
    # Fall back to standard logging if the custom logger is not available
    logger = logging.getLogger("rag_steps")
# ─── Utility: Extract raw text ──────────────────────────────────────────────
def extract_text(file_path: str) -> str:
    """Extract raw text from a PDF, DOCX, or TXT file."""
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pdf":
        text = ""
        with open(file_path, "rb") as f:
            reader = pypdf.PdfReader(f)
            for page in reader.pages:
                page_text = page.extract_text() or ""
                text += page_text + "\n"
    elif ext == ".docx":
        doc = docx.Document(file_path)
        text = "\n".join(p.text for p in doc.paragraphs)
    elif ext == ".txt":
        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()
    else:
        raise ValueError("Unsupported file format. Use PDF, DOCX, or TXT.")
    return text
# ─── Chunker & Embed ────────────────────────────────────────────────────────
# chunk_overlap and separators are assumed keys in the chunking section of config.yaml
splitter = RecursiveCharacterTextSplitter(
    chunk_size=config["chunking"]["chunk_size"],
    chunk_overlap=config["chunking"]["chunk_overlap"],
    separators=config["chunking"]["separators"],
)
def embed_texts(texts: list[str], batch_size: int = 50) -> list[list[float]]:
    """Embed texts in batches via the SiliconFlow embeddings endpoint."""
    model = config["apis_models"]["silicon_flow"]["qwen"]["embed"]
    logger.info(f"Embedding Model: {model}")
    all_embeddings = []
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"}
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        payload = {
            "model": model,
            "input": batch}
        try:
            response = requests.post(
                SILICONFLOW_EMBEDDING_URL, json=payload, headers=headers)
            response.raise_for_status()
            data = response.json()
            if "data" not in data:
                raise ValueError(f"Invalid response format: {data}")
            all_embeddings.extend(item["embedding"] for item in data["data"])
        except Exception as e:
            logger.error(f"Embedding failed for batch starting at index {i}: {e}")
            # Keep the output aligned with the input by filling failed batches with empty vectors
            all_embeddings.extend([[] for _ in batch])
    return all_embeddings
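# Usage sketch (assumes SILICONFLOW_API_KEY and SILICONFLOW_EMBEDDING_URL are set;
# the vector dimensionality depends on the configured embedding model):
#   vectors = embed_texts(["sensory processing", "early intervention strategies"])
#   assert len(vectors) == 2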
# ─── Ingest & Index ─────────────────────────────────────────────────────────
def ingest_file(file_path: str) -> str:
    """Extract, chunk, embed, and index a file into the configured Weaviate collection."""
    raw = extract_text(file_path)
    texts = splitter.split_text(raw)
    vectors = embed_texts(texts)
    client = get_weaviate_client()
    if client is None:
        raise RuntimeError(
            "Weaviate client not connected. Please check your WEAVIATE_URL and WEAVIATE_API_KEY.")
    logger.info("Weaviate client connected (startup checks skipped).")
    collection_name = config["rag"]["weavaite_collection"]
    try:
        # Client-level batching expects the collection name, not the collection object
        with client.batch.dynamic() as batch:
            for txt, vec in zip(texts, vectors):
                batch.add_object(
                    collection=collection_name,
                    properties={"text": txt},
                    vector=vec)
    finally:
        client.close()
    return f"Ingested {len(texts)} chunks from {os.path.basename(file_path)}"
if __name__ == "__main__":
    logger.info("Test PDF:")
    pdf_text = extract_text("tests/Computational Requirements for Embed.pdf")
    logger.info(f"Extracted text from PDF: {pdf_text}")

    logger.info("Test TXT:")
    txt_text = extract_text("assets/RAG_Documents/Autism_Books_1.txt")
    logger.info(f"Extracted text from TXT file: {txt_text}")

    logger.info("Test DOCX:")
    docs_text = extract_text("tests/Computational Requirements for Embed.docx")
    logger.info(f"Extracted text from DOCX: {docs_text}")
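    # End-to-end ingestion sketch (left commented out; assumes a reachable Weaviate
    # instance, valid SiliconFlow credentials, and an existing target collection):
    # status = ingest_file("assets/RAG_Documents/Autism_Books_1.txt")
    # logger.info(status)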