Spaces:
Running
Running
| import gradio as gr | |
| import time | |
| from datetime import datetime | |
| import pandas as pd | |
| from sentence_transformers import SentenceTransformer | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Filter, FieldCondition, MatchValue | |
| import os | |
| from rapidfuzz import process, fuzz | |
| from pythainlp.tokenize import word_tokenize | |
| from pyairtable import Table | |
| from pyairtable import Api | |
| import pickle | |
| import re | |
| import unicodedata | |
# Setup Qdrant Client
# Connection details are read from the environment; a missing variable
# yields None and is passed through to the client unchanged.
qdrant_client = QdrantClient(
    url=os.getenv("Qdrant_url"),
    api_key=os.getenv("Qdrant_api"),
)

# Airtable Config
# The feedback table is resolved once at import time and reused by
# log_feedback().
AIRTABLE_API_KEY = os.getenv("airtable_api")
BASE_ID = os.getenv("airtable_baseid")
TABLE_NAME = "Feedback_search"
api = Api(AIRTABLE_API_KEY)
table = api.table(BASE_ID, TABLE_NAME)

# Load model
# The embedding model and the Qdrant collection that is queried with it.
collection_name = "product_E5"
model = SentenceTransformer('intfloat/multilingual-e5-small')

# Load whitelist
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# keyword_whitelist.pkl must be a trusted artifact shipped with the app.
with open("keyword_whitelist.pkl", "rb") as f:
    keyword_whitelist = pickle.load(f)
# Utils
def normalize(text: str) -> str:
    """Return *text* in canonical form for keyword matching.

    The text is NFC-normalized, the two-character sequences "เแ" and "เเ"
    are each collapsed to the single vowel "แ" (in that order), and the
    result is stripped of surrounding whitespace and lower-cased.
    """
    cleaned = unicodedata.normalize("NFC", text)
    # Order matters: "เแ" is rewritten before "เเ", exactly one pass each.
    for wrong, right in (("เแ", "แ"), ("เเ", "แ")):
        cleaned = cleaned.replace(wrong, right)
    cleaned = cleaned.strip()
    return cleaned.lower()
def smart_tokenize(text: str) -> list:
    """Split *text* into word tokens, falling back to the whole string.

    The stripped text is tokenized with the "newmm" engine. The token list
    is returned only when it is non-empty and the tokens together contain at
    least half as many characters as the stripped text; otherwise a
    one-element list holding the stripped text is returned.
    """
    stripped = text.strip()
    pieces = word_tokenize(stripped, engine="newmm")
    if not pieces:
        return [stripped]
    # Sanity check: distrust a tokenization that lost most of the input.
    kept_chars = len("".join(pieces))
    if kept_chars >= len(stripped) * 0.5:
        return pieces
    return [stripped]
def correct_query_merge_phrases(query: str, whitelist, threshold=80, max_ngram=3):
    """Fuzzy-correct *query* against *whitelist*, merging adjacent tokens.

    The query is normalized and tokenized, then scanned left to right. At
    each position the longest run of up to ``max_ngram`` tokens is tried
    first: the tokens are concatenated and fuzzy-matched against the
    whitelist. The first run whose best match scores at least ``threshold``
    is replaced by that whitelist entry and the scan skips past it. A token
    with no acceptable match is kept unchanged.

    Args:
        query: Raw user query.
        whitelist: Collection of known keywords to match against.
        threshold: Minimum ``fuzz.token_sort_ratio`` score for a match to be
            accepted.
        max_ngram: Maximum number of consecutive tokens merged into one
            candidate phrase.

    Returns:
        The corrected pieces concatenated without a separator. Pieces of
        length 1 are dropped unless they are themselves in the whitelist.
    """
    tokens = smart_tokenize(normalize(query))
    corrected = []
    i = 0
    while i < len(tokens):
        consumed = 0
        # Longest candidate first so multi-token keywords win over fragments.
        for n in range(min(max_ngram, len(tokens) - i), 0, -1):
            phrase = "".join(tokens[i:i + n])
            best = process.extractOne(phrase, whitelist, scorer=fuzz.token_sort_ratio)
            # extractOne returns None when there is nothing to match against
            # (e.g. an empty whitelist); unpacking it blindly raised TypeError.
            if best is not None and best[1] >= threshold:
                corrected.append(best[0])
                consumed = n
                break
        if not consumed:
            corrected.append(tokens[i])
            consumed = 1
        i += consumed
    return "".join(word for word in corrected if len(word) > 1 or word in whitelist)
# Global state
# Record of the most recent search; search_product() fills it in and
# log_feedback() reads it back when a feedback button is pressed.
latest_query_result = dict(query="", result="", raw_query="", time="")
# Main Search
def search_product(query):
    """Run a vector search for *query* and return the results as HTML.

    The query is spell-corrected against ``keyword_whitelist``, embedded with
    the "query: " prefix, and sent to the Qdrant collection filtered to
    points whose ``type`` is "product" (at most 50 hits). Hits scoring above
    0.8 are rendered as cards in a CSS grid; if none qualify, a "not found"
    banner is appended instead.

    Side effects:
        ``latest_query_result`` is rewritten on every call, including the
        error and no-hit paths, so feedback logged afterwards always
        describes this search rather than a mix of this one and the last.

    Args:
        query: Raw text typed by the user.

    Returns:
        An HTML fragment. On a Qdrant failure this is a single paragraph
        carrying the error message.
    """
    # Local import keeps this block self-contained.
    from html import escape

    start_time = time.time()
    # Reset the whole record first: previously only raw_query was set here
    # and the early returns left the prior search's query/result/time behind.
    # NOTE(review): this dict is module-level, so it is shared by every
    # caller in the process; per-session state may be what is wanted.
    latest_query_result.update(
        {"raw_query": query, "query": "", "result": "", "time": ""}
    )

    corrected_query = correct_query_merge_phrases(query, keyword_whitelist)
    latest_query_result["query"] = corrected_query
    query_embed = model.encode("query: " + corrected_query)

    try:
        result = qdrant_client.query_points(
            collection_name=collection_name,
            query=query_embed.tolist(),
            with_payload=True,
            query_filter=Filter(must=[FieldCondition(key="type", match=MatchValue(value="product"))]),
            limit=50
        ).points
    except Exception as e:
        latest_query_result["time"] = time.time() - start_time
        # The exception text is escaped too; it may echo parts of the request.
        return f"<p>❌ Qdrant error: {escape(str(e))}</p>"

    elapsed = time.time() - start_time
    latest_query_result["time"] = elapsed

    html_parts = [f"<p>⏱ <strong>{elapsed:.2f} วินาที</strong></p>"]
    if corrected_query != query:
        # The query is untrusted user input: escape before embedding in HTML.
        html_parts.append(
            f"<p>🔧 แก้คำค้นจาก: <code>{escape(query)}</code> → <code>{escape(corrected_query)}</code></p>"
        )
    html_parts.append(
        '<div style="display: grid; grid-template-columns: repeat(auto-fill, minmax(220px, 1fr)); gap: 20px;">'
    )

    summary_parts = []
    for res in result:
        if res.score > 0.8:
            name = res.payload.get("name", "ไม่ทราบชื่อสินค้า")
            score = f"{res.score:.4f}"
            img_url = res.payload.get("imageUrl", "")
            price = res.payload.get("price", "ไม่ระบุ")
            brand = res.payload.get("brand", "")
            # Payload values come from the index and may be non-strings;
            # stringify, then escape for both text and attribute contexts.
            html_parts.append(f"""
<div style="border: 1px solid #ddd; border-radius: 8px; padding: 10px; text-align: center; box-shadow: 1px 1px 5px rgba(0,0,0,0.1); background: #fff;">
<img src="{escape(str(img_url))}" style="width: 100%; max-height: 150px; object-fit: contain; border-radius: 4px;">
<div style="margin-top: 10px;">
<div style="font-weight: bold; font-size: 14px;">{escape(str(name))}</div>
<div style="color: gray; font-size: 12px;">{escape(str(brand))}</div>
<div style="color: green; margin: 4px 0;">฿{escape(str(price))}</div>
<div style="font-size: 12px; color: #555;">score: {score}</div>
</div>
</div>
""")
            # The summary goes to Airtable, not to HTML, so it stays raw.
            summary_parts.append(f"{name} (score: {score}) | ")
    html_parts.append("</div>")

    if not summary_parts:
        html_parts.append(
            '<div style="text-align: center; font-size: 18px; color: #a00; padding: 30px;">❌ ไม่พบสินค้าที่เกี่ยวข้องกับคำค้นนี้</div>'
        )

    latest_query_result["result"] = "".join(summary_parts).strip()
    return "".join(html_parts)
# Feedback logging
def log_feedback(feedback):
    """Store *feedback* for the most recent search as a new Airtable row.

    The row combines the model label, today's date (``YYYY-MM-DD``), the
    fields held in ``latest_query_result`` and the given feedback value.

    Args:
        feedback: Label to record; the UI passes "match" or "not_match".

    Returns:
        A status message: a success note, or a failure note that includes
        the text of whatever exception was raised.
    """
    try:
        record = {
            "model": "E5 (intfloat/multilingual-e5-small)",
            "timestamp": datetime.now().strftime("%Y-%m-%d"),
        }
        for field in ("raw_query", "query", "result"):
            record[field] = latest_query_result[field]
        record["time(second)"] = latest_query_result["time"]
        record["feedback"] = feedback
        table.create(record)
    except Exception as e:
        # Best effort: report the failure to the UI instead of raising.
        return f"❌ Failed to save feedback: {e}"
    return "✅ Feedback saved to Airtable!"
# Gradio UI
# Layout: a query box, the HTML results pane, a row with the two feedback
# buttons, and a status box showing the outcome of the feedback write.
with gr.Blocks() as demo:
    gr.Markdown("## 🔎 Product Semantic Search (Vector Search + Qdrant)")
    query_input = gr.Textbox(label="พิมพ์คำค้นหา")
    result_output = gr.HTML(label="📋 ผลลัพธ์")
    with gr.Row():
        match_btn = gr.Button("✅ ตรง")
        not_match_btn = gr.Button("❌ ไม่ตรง")
    feedback_status = gr.Textbox(label="📬 สถานะ Feedback")

    # Submitting the query box runs the search.
    query_input.submit(
        search_product,
        inputs=[query_input],
        outputs=result_output,
    )
    # Each button logs a fixed label for whatever search ran last.
    match_btn.click(
        lambda: log_feedback("match"),
        outputs=feedback_status,
    )
    not_match_btn.click(
        lambda: log_feedback("not_match"),
        outputs=feedback_status,
    )

# Run
demo.launch(share=True)