from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import os
import json
import time
import uuid
import asyncio
from typing import List, Dict, Any, Optional, AsyncGenerator

# --- Configuration ---
# The inference API key is read from the environment. No default is baked in,
# so the secret never lives in source control.
INFERENCE_API_KEY = os.environ.get("INFERENCE_API_KEY", "")
INFERENCE_API_URL = "https://api.inference.net/v1/chat/completions"
SEARCH_API_URL = "https://searchapi.snapzion.com/search"
NEWS_API_URL = "https://searchapi.snapzion.com/news"
IMAGE_API_URL = "https://searchapi.snapzion.com/images"

# Public-facing model name, and the backend model that actually serves requests.
MODEL_NAME = "Binglity-Lite"
BACKEND_MODEL = "meta-llama/llama-3.1-8b-instruct/fp-8"

SYSTEM_PROMPT = """
You are "Binglity-Lite", a highly advanced AI search assistant. Your purpose is to provide users with accurate, comprehensive, and trustworthy answers by synthesizing information from a given set of web, news, and image search results.

**Core Directives:**
1. **Answer Directly**: Immediately address the user's question. **Do not** use introductory phrases like "Based on the search results...". Your tone should be confident, objective, and encyclopedic.
2. **Synthesize, Don't Summarize**: Your primary task is to weave information from multiple sources into a single, cohesive, well-structured answer. Do not simply describe what each source says one by one.
3. **Cite with Inline Markdown Links**: This is your most important instruction. When you present a fact from a source, you **must** cite it immediately with an inline Markdown link.
    * **Format**: `[phrase or sentence containing the fact](URL)`. The URL must come from the `URL:` field of the provided source.
    * **Example**: If a source with URL `https://example.com/science` says "The Earth is the third planet from the Sun", your output should be: "The Earth is the [third planet from the Sun](https://example.com/science)."
    * **Rule**: Every piece of information in your answer must be attributable to a source via these inline links.
4. **Be Fact-Based**: Your entire response must be based **exclusively** on the information provided in the search results. Do not use any outside knowledge.
5. **Interpret Image Results**: For image search results, use the title and context to describe the image if it is relevant to the user's query. Cite the source page URL.
6. **Filter for Relevance**: If a search result is not relevant to the user's query, ignore it completely. Do not mention it in your response.
7. **Handle Ambiguity**: If the search results are contradictory or insufficient to answer the question fully, state this clearly in your response, citing the conflicting sources.

**Final Output Structure:**
Your final response MUST be structured in two parts:
1. **The Synthesized Answer**: A well-written response that directly answers the user's query, with facts and statements cited using inline Markdown links as described above.
2. **Sources Section**: After the answer, add a section header `## Sources`. Under it, provide a bulleted list of the full title and URL of every source you used.
    * **Format**: `- [Title of Source](URL)`
"""

app = FastAPI(
    title="Binglity-Lite API",
    description="A web, news, and image search-powered, streaming-capable chat completions API.",
    version="1.4.0",
)

class ChatMessage(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.7
    stream: Optional[bool] = False
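
# Illustrative request body for ChatCompletionRequest (field names mirror the
# OpenAI chat completions schema; the values below are only examples):
#
# {
#     "model": "Binglity-Lite",
#     "messages": [{"role": "user", "content": "Who first walked on the Moon?"}],
#     "max_tokens": 1024,
#     "temperature": 0.5,
#     "stream": true
# }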

async def perform_search(client: httpx.AsyncClient, url: str, query: str, source_type: str) -> List[Dict[str, Any]]:
    """Generic function to perform a search against a given API."""
    try:
        response = await client.get(url, params={"query": query, "max_results": 10})
        response.raise_for_status()
        results = response.json()
        # Defensive check: the search APIs are expected to return a JSON list;
        # anything else is treated as "no results" rather than crashing below.
        if not isinstance(results, list):
            return []
        # Tag each result so the prompt formatter can label its origin.
        for result in results:
            result['source_type'] = source_type
        return results
    except httpx.HTTPStatusError as e:
        print(f"Error from {source_type} API: {e.response.text}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred during {source_type} search: {str(e)}")
        return []

def format_search_results_for_prompt(results: List[Dict[str, Any]]) -> str:
    """Formats combined search results for the language model prompt."""
    if not results:
        return "No relevant search results were found. Inform the user that you could not find information on their query."

    formatted = "### Search Results ###\n\n"
    for i, result in enumerate(results):
        source_type = result.get('source_type', 'Search')
        formatted += f"Source [{i+1}] ({source_type}):\n"
        formatted += f"Title: {result.get('title', 'N/A')}\n"
        formatted += f"URL: {result.get('url', 'N/A')}\n"

        if source_type == 'Image':
            formatted += f"Content: [Image Result] A picture titled '{result.get('title', 'N/A')}'\n"
            formatted += f"Image URL: {result.get('image', 'N/A')}\n\n"
        else:
            formatted += f"Content: {result.get('description', 'N/A')}\n\n"

    return formatted
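
# For reference, a single web result renders roughly like this (the exact
# fields depend on what the search API returns; 'title', 'url', and
# 'description' are the keys assumed above):
#
# Source [1] (Web):
# Title: Example Page
# URL: https://example.com/page
# Content: A short description of the page contents...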

async def stream_response_generator(payload: Dict[str, Any]) -> AsyncGenerator[str, None]:
    """Generates server-sent events for streaming responses."""
    headers = {
        "Authorization": f"Bearer {INFERENCE_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",
    }
    response_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    async with httpx.AsyncClient(timeout=300.0) as client:
        async with client.stream("POST", INFERENCE_API_URL, json=payload, headers=headers) as response:
            if response.status_code != 200:
                # A 200 status and headers have already been sent by
                # StreamingResponse, so raising HTTPException here would be
                # lost; report the failure in-stream and terminate cleanly.
                error_content = await response.aread()
                error_chunk = {
                    "id": response_id, "object": "chat.completion.chunk", "created": created_time, "model": MODEL_NAME,
                    "choices": [{"index": 0, "delta": {"content": f"[Inference API error: {error_content.decode()}]"}, "finish_reason": "stop"}],
                }
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"
                return

            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    line_data = line[len("data:"):].strip()
                    if line_data == "[DONE]":
                        # Emit a final empty delta with finish_reason, then the
                        # terminator, to match the OpenAI streaming format.
                        yield f"data: {json.dumps({'id': response_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': MODEL_NAME, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
                        yield "data: [DONE]\n\n"
                        break

                    try:
                        chunk = json.loads(line_data)
                        # Re-badge each upstream chunk with our own id and the
                        # public model name before forwarding it to the client.
                        if chunk.get("choices") and len(chunk["choices"]) > 0:
                            formatted_chunk = {
                                "id": response_id,
                                "object": "chat.completion.chunk",
                                "created": created_time,
                                "model": MODEL_NAME,
                                "choices": [{
                                    "index": 0,
                                    "delta": chunk["choices"][0].get("delta", {}),
                                    "finish_reason": chunk["choices"][0].get("finish_reason"),
                                }],
                            }
                            yield f"data: {json.dumps(formatted_chunk)}\n\n"
                    except (json.JSONDecodeError, IndexError):
                        continue

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    if request.model != MODEL_NAME:
        raise HTTPException(status_code=400, detail=f"Model not supported. Please use '{MODEL_NAME}'.")

    user_query = request.messages[-1].content if request.messages else ""
    if not user_query or request.messages[-1].role.lower() != 'user':
        raise HTTPException(status_code=400, detail="The last message must be from the 'user' and contain content.")

    # Run the web, news, and image searches concurrently.
    async with httpx.AsyncClient() as client:
        search_tasks = [
            perform_search(client, SEARCH_API_URL, user_query, "Web"),
            perform_search(client, NEWS_API_URL, user_query, "News"),
            perform_search(client, IMAGE_API_URL, user_query, "Image"),
        ]
        all_results = await asyncio.gather(*search_tasks)

    # Merge the three result lists, dropping duplicate URLs.
    combined_results = []
    seen_urls = set()
    for result_list in all_results:
        for result in result_list:
            url = result.get('url')
            if url and url not in seen_urls:
                combined_results.append(result)
                seen_urls.add(url)

    formatted_results = format_search_results_for_prompt(combined_results)

    final_user_prompt = f"User's question: \"{user_query}\"\n\nUse the web, news, and image search results below to answer the user's question. Follow all rules in your system prompt exactly.\n\n{formatted_results}"

    payload = {
        "model": BACKEND_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_user_prompt},
        ],
        "max_tokens": request.max_tokens,
        "temperature": request.temperature,
        "stream": request.stream,
    }

    if request.stream:
        return StreamingResponse(stream_response_generator(payload), media_type="text/event-stream")

    # Non-streaming path: one POST to the inference API, reshaped into the
    # OpenAI chat completion format under the public model name.
    headers = {"Authorization": f"Bearer {INFERENCE_API_KEY}"}
    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            response = await client.post(INFERENCE_API_URL, json=payload, headers=headers)
            response.raise_for_status()
            model_response = response.json()

            if not model_response.get("choices") or len(model_response["choices"]) == 0:
                raise HTTPException(status_code=500, detail="Invalid response from inference API: 'choices' field is missing or empty.")

            return {
                "id": model_response.get("id", f"chatcmpl-{uuid.uuid4()}"),
                "object": "chat.completion",
                "created": model_response.get("created", int(time.time())),
                "model": MODEL_NAME,
                "choices": [{
                    "index": 0,
                    "message": {"role": "assistant", "content": model_response["choices"][0]["message"]["content"]},
                    "finish_reason": model_response["choices"][0].get("finish_reason", "stop"),
                }],
                "usage": model_response.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}),
            }
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=e.response.status_code, detail=f"Error from inference API: {e.response.text}")
        except httpx.RequestError as e:
            # Network-level failures (DNS, timeouts, refused connections) would
            # otherwise surface as an opaque 500.
            raise HTTPException(status_code=502, detail=f"Could not reach inference API: {str(e)}")

@app.get("/")
def read_root():
    return {"message": "Welcome to the Binglity-Lite API. Use the /v1/chat/completions endpoint."}
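
# To run locally (assuming this file is saved as main.py):
#   uvicorn main:app --host 0.0.0.0 --port 8000
#
# Example non-streaming request (hypothetical query; adjust host/port as needed):
#   curl -s http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "Binglity-Lite", "messages": [{"role": "user", "content": "latest Mars rover news"}]}'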