# KC Robot AI V4.1 – Cloud Brain Intelligent Assistant
# Flask server: chat (HF), TTS, STT, Telegram poller, REST API for the ESP32
# Features: bilingual greetings, radar presence detection, OLED display lines, TTS/STT, Telegram, Hugging Face brain
import os, io, time, json, threading, logging, requests
from typing import Optional, List, Tuple
from flask import Flask, request, jsonify, send_file, render_template_string
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4.1")
app = Flask(__name__)
# ====== Config from env / Secrets ======
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "")
HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-large")
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "facebook/tts_transformer-en-ljspeech")
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "")
PORT = int(os.getenv("PORT", 7860))
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
if not HF_API_TOKEN:
    logger.warning("⚠️ HF_API_TOKEN not set. Put HF_API_TOKEN in Secrets.")
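# Example Space secrets (illustrative values, not defaults used by this file):
#   HF_API_TOKEN=hf_xxxxxxxx
#   HF_MODEL=google/flan-t5-large
#   TELEGRAM_TOKEN=123456:ABC-DEF...
#   TELEGRAM_CHATID=987654321   # read later by send_telegram_message()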
# ====== In-memory storage ======
CONV: List[Tuple[str, str]] = []
DISPLAY_LINES: List[str] = []
def push_display(line: str, limit=6):
    """Append a line to the OLED buffer, keeping only the last `limit` lines."""
    global DISPLAY_LINES
    DISPLAY_LINES.append(line)
    if len(DISPLAY_LINES) > limit:
        DISPLAY_LINES = DISPLAY_LINES[-limit:]
# ====== Hugging Face API helpers ======
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256) -> str:
    """Call the Hugging Face Inference API for text generation and return the generated text."""
    model = model or HF_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": max_new_tokens, "temperature": 0.7},
        "options": {"wait_for_model": True},
    }
    r = requests.post(url, headers=HF_HEADERS, json=payload, timeout=120)
    if r.status_code != 200:
        raise RuntimeError(f"HF text generation failed: {r.status_code}: {r.text}")
    data = r.json()
    # The API returns either a list of dicts or a single dict, depending on the model.
    if isinstance(data, list) and len(data) and isinstance(data[0], dict):
        return data[0].get("generated_text", "")
    if isinstance(data, dict) and "generated_text" in data:
        return data["generated_text"]
    return str(data)
def hf_tts_get_mp3(text: str, model: Optional[str] = None) -> bytes:
    """Synthesize speech via the Hugging Face Inference API and return the raw audio bytes."""
    model = model or HF_TTS_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    payload = {"inputs": text}
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/json"
    r = requests.post(url, headers=headers, json=payload, timeout=120)
    if r.status_code != 200:
        raise RuntimeError(f"HF TTS failed: {r.status_code}: {r.text}")
    return r.content
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe raw audio bytes via the Hugging Face Inference API (Whisper by default)."""
    model = model or HF_STT_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/octet-stream"
    r = requests.post(url, headers=headers, data=audio_bytes, timeout=180)
    if r.status_code != 200:
        raise RuntimeError(f"HF STT failed: {r.status_code}: {r.text}")
    j = r.json()
    return j.get("text", str(j))
# ====== REST API for the ESP32 ======
@app.route("/ask", methods=["POST"])
def api_ask():
data = request.get_json(force=True)
text = data.get("text", "").strip()
lang = data.get("lang", "auto")
if not text:
return jsonify({"error": "no text"}), 400
if lang == "vi":
prompt = "Bạn là trợ lý thông minh, trả lời bằng tiếng Việt:\n" + text
elif lang == "en":
prompt = "You are a helpful assistant. Answer in English:\n" + text
else:
prompt = "Bạn là trợ lý song ngữ Việt-Anh, trả lời theo ngôn ngữ người dùng:\n" + text
try:
ans = hf_text_generate(prompt)
except Exception as e:
return jsonify({"error": str(e)}), 500
CONV.append((text, ans))
push_display("YOU: " + text[:40])
push_display("BOT: " + ans[:40])
return jsonify({"answer": ans})
@app.route("/presence", methods=["POST"])
def api_presence():
data = request.get_json(force=True)
note = data.get("note", "Có người tới gần!")
greeting_vi = f"Xin chào! {note}"
greeting_en = "Hello there! Nice to see you."
combined = f"{greeting_vi}\n{greeting_en}"
CONV.append(("__presence__", combined))
push_display("RADAR: " + note[:40])
if TELEGRAM_TOKEN:
try:
send_telegram_message(f"⚠️ Phát hiện có người: {note}")
except Exception as e:
logger.error("Telegram send failed: %s", e)
return jsonify({"greeting": combined})
@app.route("/display", methods=["GET"])
def api_display():
return jsonify({"lines": DISPLAY_LINES, "conv_len": len(CONV)})
# ====== Web UI (simple) ======
@app.route("/")
def index():
return render_template_string("<h3>🤖 KC Robot AI V4.1 - Cloud Brain Running</h3><p>Bilingual & Smart Connected to ESP32</p>")
# ====== Telegram ======
def send_telegram_message(text: str):
    """Send a notification to the Telegram chat configured via TELEGRAM_CHATID."""
    if not TELEGRAM_TOKEN:
        return
    chat_id = os.getenv("TELEGRAM_CHATID", "")
    if not chat_id:
        logger.warning("TELEGRAM_CHATID not set; skipping notification.")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    requests.post(url, json={"chat_id": chat_id, "text": text}, timeout=10)
def telegram_poll_loop():
    """Long-poll Telegram getUpdates; supports '/ask <question>' and '/say <text>'."""
    if not TELEGRAM_TOKEN:
        return
    logger.info("Starting Telegram poller...")
    offset = None
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            j = r.json()
            for u in j.get("result", []):
                offset = u["update_id"] + 1
                msg = u.get("message", {})
                text = msg.get("text", "")
                chat_id = msg.get("chat", {}).get("id")
                if text.lower().startswith("/ask "):
                    q = text[5:].strip()
                    ans = hf_text_generate(q)
                    requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans})
                elif text.lower().startswith("/say "):
                    t = text[5:].strip()
                    mp3 = hf_tts_get_mp3(t)
                    files = {"audio": ("robot.mp3", mp3, "audio/mpeg")}
                    requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id})
        except Exception as e:
            logger.error("TG poll error: %s", e)
            time.sleep(3)
def start_background():
    if TELEGRAM_TOKEN:
        threading.Thread(target=telegram_poll_loop, daemon=True).start()
# @app.before_first_request was removed in Flask 2.3, so start the poller at
# import time instead; this also covers WSGI servers such as gunicorn.
start_background()
if __name__ == "__main__":
    # The Telegram poller is already started above at import time.
    logger.info(f"🚀 KC Robot AI V4.1 running on port {PORT}")
    # Bilingual startup greeting; the Vietnamese line says "Hello master! I am KC Robot, very happy to meet you."
    print("Xin chào chủ nhân! Em là KC Robot — rất vui được gặp bạn.\nHello master! I’m KC Robot, your smart assistant.")
    app.run(host="0.0.0.0", port=PORT)