|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os, io, time, json, threading, logging, requests |
|
|
from typing import Optional, List, Tuple |
|
|
from flask import Flask, request, jsonify, send_file, render_template_string |
|
|
|
|
|
# --- Logging and Flask application ------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4.1")

app = Flask(__name__)

# --- Settings read from the environment --------------------------------------
# Hugging Face Inference API credentials and the default model for each task.
HF_API_TOKEN = os.environ.get("HF_API_TOKEN", "")
HF_MODEL = os.environ.get("HF_MODEL", "google/flan-t5-large")
HF_TTS_MODEL = os.environ.get("HF_TTS_MODEL", "facebook/tts_transformer-en-ljspeech")
HF_STT_MODEL = os.environ.get("HF_STT_MODEL", "openai/whisper-small")

# Telegram bot token; the Telegram features are skipped when this is empty.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "")

# TCP port the Flask server binds to when the module is run as a script.
PORT = int(os.environ.get("PORT", 7860))

# Headers sent with every Hugging Face request. Without a token the dict stays
# empty, so requests go out unauthenticated and a warning is logged once here.
HF_HEADERS = {}
if HF_API_TOKEN:
    HF_HEADERS["Authorization"] = f"Bearer {HF_API_TOKEN}"
else:
    logger.warning("⚠️ HF_API_TOKEN not set. Put HF_API_TOKEN in Secrets.")
|
|
|
|
|
|
|
|
# In-memory conversation log: (user_text, bot_answer) pairs appended by the
# HTTP handlers in this file.
CONV: List[Tuple[str, str]] = []

# Most recent short status lines, served by the /display endpoint.
DISPLAY_LINES: List[str] = []


def push_display(line: str, limit=6):
    """Append *line* to ``DISPLAY_LINES`` and keep only the newest *limit* lines.

    The list is trimmed in place, so every reference to ``DISPLAY_LINES``
    (including ones obtained via ``from module import DISPLAY_LINES``) keeps
    seeing the current contents.

    Args:
        line: Text to add to the display buffer.
        limit: Maximum number of lines to retain. Zero or a negative value
            empties the buffer.
    """
    DISPLAY_LINES.append(line)
    # A slice such as ``lst[-0:]`` returns the whole list, so compute the
    # number of surplus entries explicitly instead of slicing with ``-limit``.
    overflow = len(DISPLAY_LINES) - max(limit, 0)
    if overflow > 0:
        del DISPLAY_LINES[:overflow]
|
|
|
|
|
|
|
|
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256) -> str:
    """Generate text for *prompt* via the Hugging Face Inference API.

    Args:
        prompt: Input text sent as the ``inputs`` field.
        model: Model id to query; ``HF_MODEL`` is used when falsy.
        max_new_tokens: Forwarded as the ``max_new_tokens`` parameter.

    Returns:
        The ``generated_text`` field of the response when present (from the
        first list element or from a top-level object); otherwise the whole
        decoded response rendered with ``str``.

    Raises:
        RuntimeError: If the API answers with a status other than 200.
    """
    target = model if model else HF_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    generation_params = {"max_new_tokens": max_new_tokens, "temperature": 0.7}
    request_body = {
        "inputs": prompt,
        "parameters": generation_params,
        # Ask the API to wait for the model instead of failing while it loads.
        "options": {"wait_for_model": True},
    }

    resp = requests.post(endpoint, headers=HF_HEADERS, json=request_body, timeout=120)
    if resp.status_code != 200:
        raise RuntimeError(f"HF text generation failed: {resp.status_code}: {resp.text}")

    body = resp.json()
    if isinstance(body, list):
        if body and isinstance(body[0], dict):
            return body[0].get("generated_text", "")
    elif isinstance(body, dict) and "generated_text" in body:
        return body["generated_text"]
    # Unrecognised response shape: hand back its string form.
    return str(body)
|
|
|
|
|
def hf_tts_get_mp3(text: str, model: Optional[str] = None) -> bytes:
    """Synthesize speech for *text* via the Hugging Face Inference API.

    Args:
        text: Text sent as the ``inputs`` field.
        model: Model id to query; ``HF_TTS_MODEL`` is used when falsy.

    Returns:
        The raw response body as bytes.
        NOTE(review): the name implies MP3, but the audio format is whatever
        the selected model returns -- confirm.

    Raises:
        RuntimeError: If the API answers with a status other than 200.
    """
    target = model if model else HF_TTS_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    # Copy the shared headers so the module-level dict is never mutated.
    request_headers = {**HF_HEADERS, "Content-Type": "application/json"}

    resp = requests.post(
        endpoint,
        headers=request_headers,
        json={"inputs": text},
        stream=True,
        timeout=120,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"HF TTS failed: {resp.status_code}: {resp.text}")
    return resp.content
|
|
|
|
|
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe audio via the Hugging Face Inference API.

    Args:
        audio_bytes: Raw audio payload, posted as ``application/octet-stream``.
        model: Model id to query; ``HF_STT_MODEL`` is used when falsy.

    Returns:
        The ``text`` field of the JSON response when the response is an object
        containing it; otherwise the whole decoded response rendered with
        ``str``.

    Raises:
        RuntimeError: If the API answers with a status other than 200.
    """
    model = model or HF_STT_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    # Copy the shared headers so the module-level dict is never mutated.
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/octet-stream"

    r = requests.post(url, headers=headers, data=audio_bytes, timeout=180)
    if r.status_code != 200:
        raise RuntimeError(f"HF STT failed: {r.status_code}: {r.text}")

    j = r.json()
    # Only a JSON object has ``.get``; a list or scalar response would raise
    # AttributeError, so fall back to its string form like hf_text_generate.
    if isinstance(j, dict):
        return j.get("text", str(j))
    return str(j)
|
|
|
|
|
|
|
|
@app.route("/ask", methods=["POST"])
def api_ask():
    """Answer a question posted as JSON.

    Request body: a JSON object with ``text`` (required, non-blank string) and
    optional ``lang`` (``"vi"``, ``"en"``; anything else selects the bilingual
    prompt).

    Returns:
        ``{"answer": ...}`` on success; ``{"error": ...}`` with status 400 for
        a malformed body or missing text, or status 500 when text generation
        fails.
    """
    # silent=True yields None instead of raising on unparsable input, so every
    # bad request gets the same JSON-shaped 400 response.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "invalid JSON body"}), 400

    raw_text = data.get("text", "")
    # A null or non-string "text" would otherwise crash on .strip().
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return jsonify({"error": "no text"}), 400

    lang = data.get("lang", "auto")
    if lang == "vi":
        prompt = "Bạn là trợ lý thông minh, trả lời bằng tiếng Việt:\n" + text
    elif lang == "en":
        prompt = "You are a helpful assistant. Answer in English:\n" + text
    else:
        prompt = "Bạn là trợ lý song ngữ Việt-Anh, trả lời theo ngôn ngữ người dùng:\n" + text

    try:
        ans = hf_text_generate(prompt)
    except Exception as e:
        # Boundary handler: report the failure to the client and keep a
        # server-side record of it.
        logger.error("Text generation failed: %s", e)
        return jsonify({"error": str(e)}), 500

    # NOTE(review): CONV is only ever appended to in the visible source, so it
    # grows for the lifetime of the process -- confirm this is acceptable.
    CONV.append((text, ans))
    push_display("YOU: " + text[:40])
    push_display("BOT: " + ans[:40])
    return jsonify({"answer": ans})
|
|
|
|
|
@app.route("/presence", methods=["POST"])
def api_presence():
    """Record a presence event and return a bilingual greeting.

    Request body: optionally a JSON object with a ``note`` string. A missing,
    empty or non-object body is treated as having no note, so a sensor can
    post without a payload.

    Side effects: appends to ``CONV``, pushes a ``RADAR:`` line to the display
    buffer and, when ``TELEGRAM_TOKEN`` is set, sends a Telegram alert (a
    failure there is logged and does not fail the request).

    Returns:
        ``{"greeting": ...}`` containing the Vietnamese and English greeting.
    """
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}

    note = data.get("note")
    if note is None:
        note = "Có người tới gần!"
    elif not isinstance(note, str):
        # Slicing below requires a string; keep whatever value was sent.
        note = str(note)

    greeting_vi = f"Xin chào! {note}"
    greeting_en = "Hello there! Nice to see you."
    combined = f"{greeting_vi}\n{greeting_en}"

    CONV.append(("__presence__", combined))
    push_display("RADAR: " + note[:40])

    if TELEGRAM_TOKEN:
        try:
            send_telegram_message(f"⚠️ Phát hiện có người: {note}")
        except Exception as e:
            # The Telegram URL embeds the bot token and may appear in the
            # exception text; never write it to the log.
            logger.error("Telegram send failed: %s", str(e).replace(TELEGRAM_TOKEN, "<token>"))

    return jsonify({"greeting": combined})
|
|
|
|
|
@app.route("/display", methods=["GET"])
def api_display():
    """Return the current display lines and the number of logged exchanges."""
    exchanges_logged = len(CONV)
    return jsonify(lines=DISPLAY_LINES, conv_len=exchanges_logged)
|
|
|
|
|
|
|
|
@app.route("/")
def index():
    """Serve the static landing page showing that the service is running."""
    page = "<h3>🤖 KC Robot AI V4.1 - Cloud Brain Running</h3><p>Bilingual & Smart Connected to ESP32</p>"
    return render_template_string(page)
|
|
|
|
|
|
|
|
def send_telegram_message(text: str):
    """Send *text* to the chat configured in the ``TELEGRAM_CHATID`` env var.

    Does nothing when ``TELEGRAM_TOKEN`` is empty. When the chat id is not
    configured the message is skipped with a warning instead of issuing a
    request that cannot succeed. A non-OK HTTP response is logged.

    Args:
        text: Message body to send.

    Raises:
        requests.RequestException: If the HTTP request itself fails
            (connection error, timeout).
    """
    if not TELEGRAM_TOKEN:
        return

    chat_id = os.getenv("TELEGRAM_CHATID", "")
    if not chat_id:
        logger.warning("TELEGRAM_CHATID not set; Telegram message not sent.")
        return

    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    resp = requests.post(url, json={"chat_id": chat_id, "text": text}, timeout=10)
    if not resp.ok:
        # Log status and body only: the URL contains the bot token.
        logger.warning("Telegram sendMessage failed: %s: %s", resp.status_code, resp.text)
|
|
|
|
|
def _handle_telegram_update(base: str, update: dict) -> None:
    """Process one Telegram update: answer ``/ask`` or speak ``/say`` commands."""
    msg = update.get("message") or {}
    text = msg.get("text") or ""
    chat_id = (msg.get("chat") or {}).get("id")
    if chat_id is None:
        # Nothing to reply to (e.g. an update without a message).
        return

    command = text.lower()
    if command.startswith("/ask "):
        q = text[5:].strip()
        ans = hf_text_generate(q)
        requests.post(
            base + "/sendMessage",
            json={"chat_id": chat_id, "text": ans},
            timeout=30,
        )
    elif command.startswith("/say "):
        t = text[5:].strip()
        mp3 = hf_tts_get_mp3(t)
        files = {"audio": ("robot.mp3", mp3, "audio/mpeg")}
        requests.post(
            base + "/sendAudio",
            files=files,
            data={"chat_id": chat_id},
            timeout=60,
        )


def telegram_poll_loop():
    """Long-poll Telegram for updates forever and serve bot commands.

    Returns immediately when ``TELEGRAM_TOKEN`` is empty. Otherwise loops
    without end, calling ``getUpdates`` with a 30 second long-poll and handing
    each update to ``_handle_telegram_update``. Any failure is logged and
    followed by a 3 second pause so an error condition cannot turn into a
    tight request loop.
    """
    if not TELEGRAM_TOKEN:
        return

    logger.info("Starting Telegram poller...")
    offset = None
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"

    def _redact(err) -> str:
        # Request errors can include the URL, which embeds the bot token.
        return str(err).replace(TELEGRAM_TOKEN, "<token>")

    while True:
        try:
            params = {"timeout": 30}
            if offset is not None:
                params["offset"] = offset
            # HTTP timeout is slightly longer than the long-poll timeout.
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            j = r.json()

            if not j.get("ok"):
                # An API-level rejection returns at once with no results;
                # back off instead of re-requesting immediately.
                logger.error("TG getUpdates rejected: %s", j.get("description", j))
                time.sleep(3)
                continue

            for u in j.get("result", []):
                # Advance first so a failing update is not fetched again.
                offset = u["update_id"] + 1
                try:
                    _handle_telegram_update(base, u)
                except Exception as e:
                    # Keep processing the rest of the batch.
                    logger.error("TG update %s failed: %s", u.get("update_id"), _redact(e))
        except Exception as e:
            logger.error("TG poll error: %s", _redact(e))
            time.sleep(3)
|
|
|
|
|
# Guards against launching the Telegram poller more than once per process.
_background_lock = threading.Lock()
_background_started = False


def start_background():
    """Start the Telegram poller thread once, if a bot token is configured.

    Safe to call repeatedly and from several threads: only the first call
    starts the daemon thread running ``telegram_poll_loop``; later calls
    return without doing anything.
    """
    global _background_started
    if not TELEGRAM_TOKEN:
        return
    with _background_lock:
        if _background_started:
            return
        _background_started = True
    threading.Thread(
        target=telegram_poll_loop,
        name="telegram-poller",
        daemon=True,
    ).start()


def _startup():
    """Request hook that makes sure the background poller is running."""
    start_background()


# ``Flask.before_first_request`` was removed in Flask 2.3. Use it when it is
# available; otherwise register a ``before_request`` hook, which runs on every
# request and relies on start_background() being a no-op after the first call.
if hasattr(app, "before_first_request"):
    app.before_first_request(_startup)
else:
    app.before_request(_startup)
|
|
|
|
|
# Script entry point: start the background poller, announce startup, then
# serve on all interfaces at the configured port.
if __name__ == "__main__":
    start_background()
    logger.info(f"🚀 KC Robot AI V4.1 running on port {PORT}")
    print(
        "Xin chào chủ nhân! Em là KC Robot — rất vui được gặp bạn.\n"
        "Hello master! I’m KC Robot, your smart assistant."
    )
    app.run(host="0.0.0.0", port=PORT)
|
|
|