|
|
import io |
|
|
from fastapi import FastAPI, File, UploadFile |
|
|
import subprocess |
|
|
import os |
|
|
import requests |
|
|
import random |
|
|
from datetime import datetime |
|
|
from datetime import date |
|
|
import json |
|
|
from pydantic import BaseModel |
|
|
from typing import Annotated |
|
|
import random |
|
|
from fastapi import FastAPI, Response |
|
|
import string |
|
|
import time |
|
|
from huggingface_hub import InferenceClient |
|
|
|
|
|
from fastapi import Form |
|
|
|
|
|
class Query(BaseModel): |
|
|
text: str |
|
|
code:str |
|
|
host:str |
|
|
|
|
|
class Query2(BaseModel): |
|
|
text: str |
|
|
code:str |
|
|
filename:str |
|
|
host:str |
|
|
|
|
|
class QueryM(BaseModel): |
|
|
text: str |
|
|
tokens:int |
|
|
temp:float |
|
|
topp:float |
|
|
topk:float |
|
|
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, Request, Depends, UploadFile, File |
|
|
from fastapi.exceptions import HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=['*'], |
|
|
allow_credentials=True, |
|
|
allow_methods=['*'], |
|
|
allow_headers=['*'], |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
print("on startup") |
|
|
|
|
|
|
|
|
audio_space="https://audiospace-1-u9912847.deta.app/uphoto" |
|
|
|
|
|
import threading |
|
|
from huggingface_hub.inference_api import InferenceApi |
|
|
client = InferenceClient() |
|
|
|
|
|
|
|
|
@app.post("/image") |
|
|
async def get_answer(q: Query ): |
|
|
text = q.text |
|
|
try: |
|
|
global client |
|
|
imagei = client.text_to_image(text) |
|
|
byte_array = io.BytesIO() |
|
|
imagei.save(byte_array, format='JPEG') |
|
|
response = Response(content=byte_array.getvalue(), media_type="image/png") |
|
|
return response |
|
|
|
|
|
except: |
|
|
return JSONResponse({"status":False}) |
|
|
|
|
|
|
|
|
@app.post("/mistral") |
|
|
async def get_answer(q: QueryM ): |
|
|
text = q.text |
|
|
try: |
|
|
client = InferenceClient() |
|
|
generate_kwargs = dict( |
|
|
max_new_tokens= int(q.tokens), |
|
|
do_sample=True, |
|
|
top_p= q.topp, |
|
|
top_k=int(q.topk), |
|
|
temperature=q.temp, |
|
|
) |
|
|
inputs= text |
|
|
response = client.post(json={"inputs": inputs, "parameters": generate_kwargs}, model="mistralai/Mistral-7B-Instruct-v0.1") |
|
|
json_string = response.decode('utf-8') |
|
|
list_of_dicts = json.loads(json_string) |
|
|
result_dict = list_of_dicts[0] |
|
|
x=(result_dict['generated_text']) |
|
|
x=x.replace(inputs,'') |
|
|
return JSONResponse({"result":x,"status":True}) |
|
|
except Exception as e: |
|
|
print(e) |
|
|
return JSONResponse({"status":False}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
''' to be removed when main code is updated ''' |
|
|
|
|
|
@app.post("/") |
|
|
async def get_answer(q: Query ): |
|
|
|
|
|
text = q.text |
|
|
code= q.code |
|
|
host= q.host |
|
|
|
|
|
|
|
|
N = 20 |
|
|
res = ''.join(random.choices(string.ascii_uppercase + |
|
|
string.digits, k=N)) |
|
|
|
|
|
|
|
|
|
|
|
res= res+ str(time.time()) |
|
|
|
|
|
filename= res |
|
|
|
|
|
t = threading.Thread(target=do_ML, args=(filename,text,code,host)) |
|
|
t.start() |
|
|
|
|
|
return JSONResponse({"id": filename}) |
|
|
|
|
|
return "hello" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/error") |
|
|
async def get_answer(q: Query2 ): |
|
|
|
|
|
text = q.text |
|
|
code= q.code |
|
|
filename= q.filename |
|
|
host= q.host |
|
|
|
|
|
|
|
|
t = threading.Thread(target=do_ML, args=(filename,text,code,host)) |
|
|
t.start() |
|
|
|
|
|
return JSONResponse({"id": filename}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import requests |
|
|
import io |
|
|
import io |
|
|
from PIL import Image |
|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def do_ML(filename:str,text:str,code:str,host:str): |
|
|
try: |
|
|
global client |
|
|
|
|
|
imagei = client.text_to_image(text) |
|
|
|
|
|
byte_array = io.BytesIO() |
|
|
imagei.save(byte_array, format='JPEG') |
|
|
image_bytes = byte_array.getvalue() |
|
|
|
|
|
|
|
|
files = {'file': image_bytes} |
|
|
|
|
|
global audio_space |
|
|
url = audio_space+code |
|
|
|
|
|
data = {"filename": filename} |
|
|
response = requests.post(url, files=files,data= data) |
|
|
|
|
|
print(response.text) |
|
|
|
|
|
if response.status_code == 200: |
|
|
print("File uploaded successfully.") |
|
|
|
|
|
else: |
|
|
print("File upload failed.") |
|
|
|
|
|
except: |
|
|
data={"text":text,"filename":filename} |
|
|
requests.post(host+"texttoimage2handleerror",data=data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|