# Binglity / main.py
# Search-augmented, OpenAI-compatible chat completions API.
# Originally published by rkihacker (revision b2806a7); header text from the
# hosting page has been converted to this comment so the module is valid Python.
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import os
import json
import time
import uuid
import asyncio
from typing import List, Dict, Any, Optional, AsyncGenerator
# --- Configuration ---
# SECURITY: the inference API key must be supplied via the environment.
# A previous revision shipped a hard-coded fallback key in source control;
# that credential is compromised and has been removed. If INFERENCE_API_KEY
# is unset, upstream requests will fail with an authentication error.
INFERENCE_API_KEY = os.environ.get("INFERENCE_API_KEY", "")
INFERENCE_API_URL = "https://api.inference.net/v1/chat/completions"
SEARCH_API_URL = "https://searchapi.snapzion.com/search"
NEWS_API_URL = "https://searchapi.snapzion.com/news"
IMAGE_API_URL = "https://searchapi.snapzion.com/images"  # image search endpoint
MODEL_NAME = "Binglity-Lite"  # public model name this service advertises
BACKEND_MODEL = "meta-llama/llama-3.1-8b-instruct/fp-8"  # actual upstream model
# --- Final Advanced System Prompt ---
# Instructs the backend LLM to answer exclusively from the supplied search
# results, cite every fact with inline Markdown links, and append a
# "## Sources" section. This text is sent verbatim as the system message,
# so any edit here changes model behavior.
SYSTEM_PROMPT = """
You are "Binglity-Lite", a highly advanced AI search assistant. Your purpose is to provide users with accurate, comprehensive, and trustworthy answers by synthesizing information from a given set of web, news, and image search results.
**Core Directives:**
1. **Answer Directly**: Immediately address the user's question. **Do not** use introductory phrases like "Based on the search results...". Your tone should be confident, objective, and encyclopedic.
2. **Synthesize, Don't Summarize**: Your primary task is to weave information from multiple sources into a single, cohesive, and well-structured answer. Do not simply describe what each source says one by one.
3. **Cite with Inline Markdown Links**: This is your most important instruction. When you present a fact or a piece of information from a source, you **must** cite it immediately using an inline Markdown link.
* **Format**: The format must be `[phrase or sentence containing the fact](URL)`. The URL must come from the `URL:` field of the provided source.
* **Example**: If a source with URL `https://example.com/science` says "The Earth is the third planet from the Sun", your output should be: "The Earth is the [third planet from the Sun](https://example.com/science)."
* **Rule**: Every piece of information in your answer must be attributable to a source via these inline links.
4. **Be Fact-Based**: Your entire response must be based **exclusively** on the information provided in the search results. Do not use any outside knowledge.
5. **Interpret Image Results**: For image search results, use the title and context to describe the image if it's relevant to the user's query. Cite the source page URL.
6. **Filter for Relevance**: If a search result is not relevant to the user's query, ignore it completely. Do not mention it in your response.
7. **Handle Ambiguity**: If the search results are contradictory or insufficient to answer the question fully, state this clearly in your response, citing the conflicting sources.
**Final Output Structure:**
Your final response MUST be structured in two parts:
1. **The Synthesized Answer**: A well-written response that directly answers the user's query, with facts and statements properly cited using inline Markdown links as described above.
2. **Sources Section**: After the answer, add a section header `## Sources`. Under this header, provide a bulleted list of the full titles and URLs of every source you used.
* **Format**: `- [Title of Source](URL)`
"""
# --- FastAPI App ---
# Application object; routes are registered against it below.
app = FastAPI(
    title="Binglity-Lite API",
    version="1.4.0",
    description="A web, news, and image search-powered, streaming-capable chat completions API.",
)
# --- Pydantic Models for OpenAI Compatibility ---
class ChatMessage(BaseModel):
    """One chat message in the OpenAI chat format."""
    role: str  # e.g. "system" / "user" / "assistant"; not validated here
    content: str  # the message text
class ChatCompletionRequest(BaseModel):
    """Request body for /v1/chat/completions (OpenAI-compatible subset)."""
    model: str  # must equal MODEL_NAME ("Binglity-Lite"); checked in the endpoint
    messages: List[ChatMessage]  # conversation; last entry must be a user message
    max_tokens: Optional[int] = 2048  # forwarded verbatim to the backend model
    temperature: Optional[float] = 0.7  # forwarded verbatim to the backend model
    stream: Optional[bool] = False  # True -> server-sent-events streaming response
# --- Search Functions ---
async def perform_search(client: httpx.AsyncClient, url: str, query: str, source_type: str) -> List[Dict[str, Any]]:
    """Query one search API and tag each result with its source type.

    Args:
        client: shared async HTTP client.
        url: search endpoint to hit (web, news, or image API).
        query: the user's search string.
        source_type: label ("Web", "News", "Image") written into each result
            under the 'source_type' key.

    Returns:
        A list of result dicts, or an empty list on any error — searches are
        best-effort and must never fail the overall request.
    """
    try:
        response = await client.get(url, params={"query": query, "max_results": 10})
        response.raise_for_status()
        results = response.json()
        # Robustness: the API is expected to return a JSON array of objects.
        # Tolerate any other payload shape (dict, string, null) instead of
        # crashing on iteration or item assignment.
        if not isinstance(results, list):
            print(f"Unexpected {source_type} API payload type: {type(results).__name__}")
            return []
        tagged = [item for item in results if isinstance(item, dict)]
        for result in tagged:
            result['source_type'] = source_type
        return tagged
    except httpx.HTTPStatusError as e:
        print(f"Error from {source_type} API: {e.response.text}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred during {source_type} search: {str(e)}")
        return []
def format_search_results_for_prompt(results: List[Dict[str, Any]]) -> str:
    """Render combined search results as the prompt section fed to the LLM.

    Args:
        results: deduplicated result dicts; each may carry 'source_type',
            'title', 'url', and either 'description' (web/news) or
            'image' (image results). Missing fields render as 'N/A'.

    Returns:
        A "### Search Results ###" section listing every source, or an
        instruction telling the model no results were found.
    """
    if not results:
        return "No relevant search results were found. Inform the user that you could not find information on their query."
    # Collect pieces and join once, instead of repeated string concatenation
    # (avoids quadratic behavior on long result lists).
    parts = ["### Search Results ###\n\n"]
    for idx, result in enumerate(results, start=1):
        source_type = result.get('source_type', 'Search')
        parts.append(f"Source [{idx}] ({source_type}):\n")
        parts.append(f"Title: {result.get('title', 'N/A')}\n")
        parts.append(f"URL: {result.get('url', 'N/A')}\n")
        if source_type == 'Image':
            # Image results have no text snippet; describe them by title and
            # include the direct image URL alongside the source page URL.
            parts.append(f"Content: [Image Result] A picture titled '{result.get('title', 'N/A')}'\n")
            parts.append(f"Image URL: {result.get('image', 'N/A')}\n\n")
        else:
            parts.append(f"Content: {result.get('description', 'N/A')}\n\n")
    return "".join(parts)
# --- Streaming Logic ---
async def stream_response_generator(payload: Dict[str, Any]) -> AsyncGenerator[str, None]:
    """Proxy the backend's SSE stream, re-emitting chunks in OpenAI format.

    POSTs `payload` to the inference API and relays each `data:` chunk,
    rewriting the id / model / created fields so clients see this service's
    identity (MODEL_NAME) instead of the backend's.

    Yields:
        Server-sent-event strings ("data: {json}\\n\\n"), ending with a final
        chunk whose finish_reason is "stop" followed by "data: [DONE]\\n\\n".

    Raises:
        HTTPException: if the backend responds with a non-200 status.
            NOTE(review): this generator runs inside a StreamingResponse;
            raising after headers are sent likely surfaces to the client as
            a broken stream rather than this status code — confirm.
    """
    headers = {
        "Authorization": f"Bearer {INFERENCE_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    # One id/timestamp pair is generated up front and reused on every chunk
    # of this completion, matching OpenAI streaming semantics.
    response_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())
    async with httpx.AsyncClient(timeout=300.0) as client:
        async with client.stream("POST", INFERENCE_API_URL, json=payload, headers=headers) as response:
            if response.status_code != 200:
                # Must read the body before the stream context closes.
                error_content = await response.aread()
                raise HTTPException(status_code=response.status_code, detail=f"Error from inference API: {error_content.decode()}")
            async for line in response.aiter_lines():
                # SSE protocol: only "data:" lines carry payloads; ignore
                # comments, blank keep-alive lines, and other fields.
                if line.startswith("data:"):
                    line_data = line[len("data:"):].strip()
                    if line_data == "[DONE]":
                        # Emit an explicit final chunk (empty delta,
                        # finish_reason "stop") before the [DONE] sentinel.
                        yield f"data: {json.dumps({'id': response_id, 'model': MODEL_NAME, 'object': 'chat.completion.chunk', 'created': created_time, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
                        yield "data: [DONE]\n\n"
                        break
                    try:
                        chunk = json.loads(line_data)
                        # Guard: some backend chunks may omit 'choices'
                        # entirely; forward only chunks that carry content.
                        if chunk.get("choices") and len(chunk["choices"]) > 0:
                            formatted_chunk = {
                                "id": response_id, "object": "chat.completion.chunk", "created": created_time, "model": MODEL_NAME,
                                "choices": [{
                                    "index": 0,
                                    "delta": chunk["choices"][0].get("delta", {}),
                                    "finish_reason": chunk["choices"][0].get("finish_reason")
                                }]
                            }
                            yield f"data: {json.dumps(formatted_chunk)}\n\n"
                    except (json.JSONDecodeError, IndexError):
                        # Malformed chunk from the backend: drop it and keep
                        # streaming rather than aborting the whole response.
                        continue
# --- API Endpoint ---
def _dedupe_by_url(result_lists: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten lists of search results, keeping only the first hit per URL.

    Results without a 'url' key are dropped (they cannot be cited).
    """
    combined: List[Dict[str, Any]] = []
    seen_urls = set()
    for result_list in result_lists:
        for result in result_list:
            url = result.get('url')
            if url and url not in seen_urls:
                combined.append(result)
                seen_urls.add(url)
    return combined

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint.

    Runs web, news, and image searches for the user's last message, feeds
    the deduplicated results plus the system prompt to the backend model,
    and returns (or streams) the answer in OpenAI response format.

    Raises:
        HTTPException 400: wrong model name, or the last message is not a
            non-empty user message.
        HTTPException (backend status / 500): inference API errors or a
            malformed backend reply.
    """
    if request.model != MODEL_NAME:
        raise HTTPException(status_code=400, detail=f"Model not supported. Please use '{MODEL_NAME}'.")
    user_query = request.messages[-1].content if request.messages else ""
    if not user_query or request.messages[-1].role.lower() != 'user':
        raise HTTPException(status_code=400, detail="The last message must be from the 'user' and contain content.")
    # Fire all three searches concurrently; each returns [] on failure, so
    # one unavailable search API cannot fail the whole request.
    async with httpx.AsyncClient() as client:
        search_tasks = [
            perform_search(client, SEARCH_API_URL, user_query, "Web"),
            perform_search(client, NEWS_API_URL, user_query, "News"),
            perform_search(client, IMAGE_API_URL, user_query, "Image"),
        ]
        all_results = await asyncio.gather(*search_tasks)
    combined_results = _dedupe_by_url(all_results)
    formatted_results = format_search_results_for_prompt(combined_results)
    final_user_prompt = f"User's question: \"{user_query}\"\n\nUse the web, news, and image search results below to answer the user's question. Follow all rules in your system prompt exactly.\n\n{formatted_results}"
    payload = {
        "model": BACKEND_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_user_prompt},
        ],
        "max_tokens": request.max_tokens,
        "temperature": request.temperature,
        "stream": request.stream,
    }
    if request.stream:
        return StreamingResponse(stream_response_generator(payload), media_type="text/event-stream")
    headers = {"Authorization": f"Bearer {INFERENCE_API_KEY}"}
    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            response = await client.post(INFERENCE_API_URL, json=payload, headers=headers)
            response.raise_for_status()
            model_response = response.json()
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=e.response.status_code, detail=f"Error from inference API: {e.response.text}")
    # Validate the backend reply before indexing into it, so a malformed
    # response surfaces as a clean 500 instead of an unhandled KeyError.
    choices = model_response.get("choices") or []
    if not choices:
        raise HTTPException(status_code=500, detail="Invalid response from inference API: 'choices' field is missing or empty.")
    content = (choices[0].get("message") or {}).get("content")
    if content is None:
        raise HTTPException(status_code=500, detail="Invalid response from inference API: message content is missing.")
    return {
        "id": model_response.get("id", f"chatcmpl-{uuid.uuid4()}"),
        "object": "chat.completion",
        "created": model_response.get("created", int(time.time())),
        "model": MODEL_NAME,
        "choices": [{"index": 0, "message": {"role": "assistant", "content": content}, "finish_reason": "stop"}],
        "usage": model_response.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}),
    }
@app.get("/")
def read_root():
    """Landing route: points callers at the chat completions endpoint."""
    welcome = "Welcome to the Binglity-Lite API. Use the /v1/chat/completions endpoint."
    return {"message": welcome}