|
|
import json |
|
|
import os |
|
|
import gradio as gr |
|
|
from typing import Optional, Dict, Any, Union |
|
|
from PIL import Image |
|
|
from pydantic import BaseModel |
|
|
import logging |
|
|
from config import Config |
|
|
|
|
|
|
|
|
try: |
|
|
from llama_cpp import Llama, LlamaGrammar |
|
|
LLAMA_CPP_AVAILABLE = True |
|
|
except ImportError as e: |
|
|
print(f"Warning: llama-cpp-python not available: {e}") |
|
|
LLAMA_CPP_AVAILABLE = False |
|
|
Llama = None |
|
|
LlamaGrammar = None |
|
|
|
|
|
|
|
|
try: |
|
|
from huggingface_hub import hf_hub_download |
|
|
HUGGINGFACE_HUB_AVAILABLE = True |
|
|
except ImportError as e: |
|
|
print(f"Warning: huggingface_hub not available: {e}") |
|
|
HUGGINGFACE_HUB_AVAILABLE = False |
|
|
hf_hub_download = None |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class StructuredOutputRequest(BaseModel): |
|
|
prompt: str |
|
|
image: Optional[str] = None |
|
|
json_schema: Dict[str, Any] |
|
|
|
|
|
class LLMClient: |
|
|
def __init__(self): |
|
|
""" |
|
|
Initialize a client for a local GGUF model loaded via llama-cpp-python
|
|
""" |
|
|
self.model_path = Config.get_model_path() |
|
|
logger.info(f"Using model: {self.model_path}") |
|
|
|
|
|
self.llm = None |
|
|
|
|
|
self._initialize_model() |
|
|
|
|
|
def _download_model_if_needed(self) -> str: |
|
|
"""Download model from Hugging Face if it doesn't exist locally""" |
|
|
if os.path.exists(self.model_path): |
|
|
logger.info(f"Model already exists at: {self.model_path}") |
|
|
return self.model_path |
|
|
|
|
|
|
|
|
|
|
|
if os.getenv('DOCKER_CONTAINER', 'false').lower() == 'true': |
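# Inside a container the model may have been baked into the image or mounted at a
# path other than Config.get_model_path(), so probe a few common locations first.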
|
|
|
|
|
alternative_paths = [ |
|
|
f"/app/models/{Config.MODEL_FILENAME}", |
|
|
f"./models/{Config.MODEL_FILENAME}", |
|
|
f"/models/{Config.MODEL_FILENAME}", |
|
|
f"/app/{Config.MODEL_FILENAME}" |
|
|
] |
|
|
|
|
|
for alt_path in alternative_paths: |
|
|
if os.path.exists(alt_path): |
|
|
logger.info(f"Found model at alternative location: {alt_path}") |
|
|
return alt_path |
|
|
|
|
|
|
|
|
models_dir = "/app/models" |
|
|
if os.path.exists(models_dir): |
|
|
files = os.listdir(models_dir) |
|
|
logger.error(f"Contents of {models_dir}: {files}") |
|
|
else: |
|
|
logger.error(f"Directory {models_dir} does not exist") |
|
|
|
|
|
|
|
|
logger.warning("Model not found in expected locations, attempting download...") |
|
|
|
|
|
if not HUGGINGFACE_HUB_AVAILABLE: |
|
|
raise ImportError("huggingface_hub is not available. Please install it to download models.") |
|
|
|
|
|
logger.info(f"Downloading model {Config.MODEL_REPO}/{Config.MODEL_FILENAME}...") |
|
|
|
|
|
|
|
|
models_dir = Config.get_models_dir() |
|
|
os.makedirs(models_dir, exist_ok=True) |
|
|
|
|
|
try: |
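# hf_hub_download fetches the file from the Hub (into local_dir here) and returns
# the local path it was saved to.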
|
|
|
|
|
model_path = hf_hub_download( |
|
|
repo_id=Config.MODEL_REPO, |
|
|
filename=Config.MODEL_FILENAME, |
|
|
local_dir=models_dir, |
|
|
token=Config.HUGGINGFACE_TOKEN if Config.HUGGINGFACE_TOKEN else None |
|
|
) |
|
|
|
|
|
logger.info(f"Model downloaded to: {model_path}") |
|
|
return model_path |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to download model: {e}") |
|
|
raise |
|
|
|
|
|
def _initialize_model(self): |
|
|
"""Initialize local GGUF model""" |
|
|
try: |
|
|
if not LLAMA_CPP_AVAILABLE: |
|
|
raise ImportError("llama-cpp-python is not available. Please check installation.") |
|
|
|
|
|
logger.info("Loading local model...") |
|
|
|
|
|
|
|
|
model_path = self._download_model_if_needed() |
|
|
|
|
|
|
|
|
if not os.path.exists(model_path): |
|
|
raise FileNotFoundError(f"Model file not found: {model_path}") |
|
|
|
|
|
|
|
|
file_size = os.path.getsize(model_path) |
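# Sanity check: a real GGUF model is far larger than 1 MB, so anything smaller
# almost certainly indicates a failed or partial download.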
|
|
if file_size < 1024 * 1024: |
|
|
raise ValueError(f"Model file seems corrupted or incomplete. Size: {file_size} bytes") |
|
|
|
|
|
logger.info(f"Model file verified. Size: {file_size / (1024**3):.2f} GB") |
|
|
|
|
|
|
|
|
logger.info("Initializing Llama model...") |
|
|
self.llm = Llama( |
|
|
model_path=model_path, |
|
|
n_ctx=Config.N_CTX, |
|
|
n_batch=Config.N_BATCH, |
|
|
n_gpu_layers=Config.N_GPU_LAYERS, |
|
|
use_mlock=Config.USE_MLOCK, |
|
|
use_mmap=Config.USE_MMAP, |
|
|
vocab_only=False, |
|
|
f16_kv=Config.F16_KV, |
|
|
logits_all=False, |
|
|
embedding=False, |
|
|
n_threads=Config.N_THREADS, |
|
|
last_n_tokens_size=64, |
|
|
lora_base=None, |
|
|
lora_path=None, |
|
|
seed=Config.SEED, |
|
|
verbose=True |
|
|
) |
|
|
|
|
|
logger.info("Model successfully loaded and initialized") |
|
|
|
|
|
|
|
|
logger.info("Testing model with simple prompt...") |
|
|
test_response = self.llm("Hello", max_tokens=1, temperature=0.1) |
|
|
logger.info("Model test successful") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error initializing model: {e}") |
|
|
|
|
|
if "Failed to load model from file" in str(e): |
|
|
logger.error("This error usually indicates:") |
|
|
logger.error("1. Model file is corrupted or incomplete") |
|
|
logger.error("2. llama-cpp-python version is incompatible with the model") |
|
|
logger.error("3. Insufficient memory to load the model") |
|
|
logger.error(f"4. Model path: {self.model_path}") |
|
|
raise |
|
|
|
|
|
def _validate_json_schema(self, schema: str) -> Dict[str, Any]: |
|
|
"""Validate and parse JSON schema""" |
|
|
try: |
|
|
parsed_schema = json.loads(schema) |
|
|
return parsed_schema |
|
|
except json.JSONDecodeError as e: |
|
|
raise ValueError(f"Invalid JSON schema: {e}") |
|
|
|
|
|
def _format_prompt_with_schema(self, prompt: str, json_schema: Dict[str, Any]) -> str: |
|
|
""" |
|
|
Format prompt for structured output generation |
|
|
""" |
|
|
schema_str = json.dumps(json_schema, ensure_ascii=False, indent=2) |
|
|
|
|
|
formatted_prompt = f"""User: {prompt} |
|
|
|
|
|
Please respond in strict accordance with the following JSON schema: |
|
|
|
|
|
```json |
|
|
{schema_str} |
|
|
``` |
|
|
|
|
|
Return ONLY valid JSON without additional comments or explanations.""" |
|
|
|
|
|
return formatted_prompt |
|
|
|
|
|
@staticmethod
def _json_schema_to_gbnf(schema: Dict[str, Any], root_name: str = "root") -> str:
|
|
"""Convert JSON schema to GBNF (Backus-Naur Form) grammar for structured output""" |
|
|
rules = [] |
|
|
rule_names = set() |
|
|
|
|
|
def add_rule(name: str, definition: str): |
|
|
if name not in rule_names: |
|
|
rules.append(f"{name} ::= {definition}") |
|
|
rule_names.add(name) |
|
|
|
|
|
def process_type(schema_part: Dict[str, Any], type_name: str = "value") -> str: |
|
|
if "type" not in schema_part: |
|
|
|
|
|
return "string" |
|
|
|
|
|
schema_type = schema_part["type"] |
|
|
|
|
|
if schema_type == "object": |
|
|
|
|
|
properties = schema_part.get("properties", {}) |
|
|
required = schema_part.get("required", []) |
|
|
|
|
|
if not properties: |
|
|
add_rule(type_name, '"{" ws "}"') |
|
|
return type_name |
|
|
|
|
|
|
|
|
required_parts = [] |
|
|
optional_parts = [] |
|
|
|
|
|
for prop_name, prop_schema in properties.items(): |
|
|
prop_type_name = f"{type_name}_{prop_name}" |
|
|
prop_type = process_type(prop_schema, prop_type_name) |
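# Each property is emitted as a quoted key literal, a colon, then the rule for its value type.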
|
|
prop_def = f'"\\"" "{prop_name}" "\\"" ws ":" ws {prop_type}' |
|
|
|
|
|
if prop_name in required: |
|
|
required_parts.append(prop_def) |
|
|
else: |
|
|
optional_parts.append(prop_def) |
|
|
|
|
|
|
|
|
if not required_parts and not optional_parts: |
|
|
object_def = '"{" ws "}"' |
|
|
else: |
|
|
|
|
|
|
|
|
if not required_parts: |
|
|
|
|
|
if len(optional_parts) == 1: |
|
|
object_def = f'"{" ws ({optional_parts[0]})? ws "}"' |
|
|
else: |
|
|
comma_separated = ' ws "," ws '.join(optional_parts) |
|
|
object_def = f'"{" ws ({comma_separated})? ws "}"' |
|
|
else: |
|
|
|
|
|
all_parts = required_parts.copy() |
|
|
|
|
|
|
|
|
for opt_part in optional_parts: |
|
|
all_parts.append(f'(ws "," ws {opt_part})?') |
|
|
|
|
|
if len(all_parts) == 1: |
|
|
object_def = f'"{" ws {all_parts[0]} ws "}"' |
|
|
else: |
|
|
|
|
|
required_with_commas = ' ws "," ws '.join(required_parts) |
|
|
optional_with_commas = ' '.join([f'(ws "," ws {opt})?' for opt in optional_parts]) |
|
|
|
|
|
if optional_with_commas: |
|
|
object_def = f'"{{" ws {required_with_commas} {optional_with_commas} ws "}}"' |
|
|
else: |
|
|
object_def = f'"{{" ws {required_with_commas} ws "}}"' |
|
|
|
|
|
add_rule(type_name, object_def) |
|
|
return type_name |
|
|
|
|
|
elif schema_type == "array": |
|
|
|
|
|
items_schema = schema_part.get("items", {}) |
|
|
items_type_name = f"{type_name}_items" |
|
|
item_type = process_type(items_schema, f"{type_name}_item") |
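# Arrays become "[" followed by an optional comma-separated list of item rules and a closing "]".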
|
|
|
|
|
|
|
|
add_rule(items_type_name, f"{item_type} (ws \",\" ws {item_type})*") |
|
|
add_rule(type_name, f'"[" ws ({items_type_name})? ws "]"') |
|
|
return type_name |
|
|
|
|
|
elif schema_type == "string": |
|
|
|
|
|
if "enum" in schema_part: |
|
|
enum_values = schema_part["enum"] |
|
|
enum_options = ' | '.join([f'"\\"" "{val}" "\\""' for val in enum_values]) |
|
|
add_rule(type_name, enum_options) |
|
|
return type_name |
|
|
else: |
|
|
return "string" |
|
|
|
|
|
elif schema_type == "number" or schema_type == "integer": |
|
|
return "number" |
|
|
|
|
|
elif schema_type == "boolean": |
|
|
return "boolean" |
|
|
|
|
|
else: |
|
|
return "string" |
|
|
|
|
|
|
|
|
process_type(schema, root_name) |
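# Append the primitive JSON building blocks (whitespace, string, number, boolean, null)
# that the schema-specific rules above refer to.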
|
|
|
|
|
|
|
|
basic_rules = [ |
|
|
'ws ::= [ \\t\\n]*', |
|
|
'string ::= "\\"" char* "\\""', |
|
|
'char ::= [^"\\\\] | "\\\\" (["\\\\bfnrt] | "u" hex hex hex hex)', |
|
|
'hex ::= [0-9a-fA-F]', |
|
|
'number ::= "-"? ("0" | [1-9] [0-9]*) ("." [0-9]+)? ([eE] [+-]? [0-9]+)?', |
|
|
'boolean ::= "true" | "false"', |
|
|
'null ::= "null"' |
|
|
] |
|
|
|
|
|
|
|
|
for rule in basic_rules: |
|
|
rule_name = rule.split(' ::= ')[0] |
|
|
if rule_name not in rule_names: |
|
|
rules.append(rule) |
|
|
rule_names.add(rule_name) |
|
|
|
|
|
return "\\n".join(rules) |
|
|
|
|
|
def generate_structured_response(self, |
|
|
prompt: str, |
|
|
json_schema: Union[str, Dict[str, Any]], |
|
|
image: Optional[Image.Image] = None, |
|
|
use_grammar: bool = True) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate structured response from local GGUF model |
|
|
""" |
|
|
try: |
|
|
|
|
|
if isinstance(json_schema, str): |
|
|
parsed_schema = self._validate_json_schema(json_schema) |
|
|
else: |
|
|
parsed_schema = json_schema |
|
|
|
|
|
|
|
|
formatted_prompt = self._format_prompt_with_schema(prompt, parsed_schema) |
|
|
|
|
|
|
|
|
if image is not None: |
|
|
logger.warning("Image processing is not supported with this local model") |
|
|
|
|
|
|
|
|
logger.info(f"Generating response... (Grammar: {'Enabled' if use_grammar else 'Disabled'})") |
|
|
|
|
|
|
|
|
grammar = None |
|
|
if use_grammar and LLAMA_CPP_AVAILABLE and LlamaGrammar is not None: |
|
|
try: |
|
|
gbnf_grammar = self._json_schema_to_gbnf(parsed_schema, "root")
|
|
grammar = LlamaGrammar.from_string(gbnf_grammar) |
|
|
logger.info("Grammar successfully created from JSON schema") |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to create grammar: {e}. Falling back to non-grammar mode.") |
|
|
use_grammar = False |
|
|
|
|
|
|
|
|
generation_params = { |
|
|
"max_tokens": Config.MAX_NEW_TOKENS, |
|
|
"temperature": Config.TEMPERATURE, |
|
|
"echo": False |
|
|
} |
|
|
|
|
|
|
|
|
if use_grammar and grammar is not None: |
|
|
generation_params["grammar"] = grammar |
|
|
|
|
|
simple_prompt = f"User: {prompt}\n\nAssistant:" |
|
|
response = self.llm(simple_prompt, **generation_params) |
|
|
else: |
|
|
generation_params["stop"] = ["User:", "\n\n", "Assistant:", "Human:"] |
|
|
response = self.llm(formatted_prompt, **generation_params) |
|
|
|
|
|
|
|
|
generated_text = response['choices'][0]['text'] |
|
|
|
|
|
|
|
|
try: |
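# Without a strict grammar the model may wrap the JSON in extra prose, so extract the
# outermost {...} span before parsing.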
|
|
|
|
|
json_start = generated_text.find('{') |
|
|
json_end = generated_text.rfind('}') + 1 |
|
|
|
|
|
if json_start != -1 and json_end > json_start: |
|
|
json_str = generated_text[json_start:json_end] |
|
|
parsed_response = json.loads(json_str) |
|
|
return { |
|
|
"success": True, |
|
|
"data": parsed_response, |
|
|
"raw_response": generated_text |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"error": "Could not find JSON in model response", |
|
|
"raw_response": generated_text |
|
|
} |
|
|
|
|
|
except json.JSONDecodeError as e: |
|
|
return { |
|
|
"error": f"JSON parsing error: {e}", |
|
|
"raw_response": generated_text |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected error: {e}") |
|
|
return { |
|
|
"error": f"Generation error: {str(e)}" |
|
|
} |
|
|
|
|
|
def test_grammar_generation(json_schema_str: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Test grammar generation without running the full model |
|
|
""" |
|
|
try: |
|
|
parsed_schema = llm_client._validate_json_schema(json_schema_str) |
|
|
gbnf_grammar = llm_client._json_schema_to_gbnf(parsed_schema, "root")
|
|
return { |
|
|
"success": True, |
|
|
"grammar": gbnf_grammar, |
|
|
"schema": parsed_schema |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
logger.info("Initializing LLM client...") |
|
|
try: |
|
|
llm_client = LLMClient() |
|
|
logger.info("LLM client successfully initialized") |
|
|
except Exception as e: |
|
|
logger.error(f"Error initializing LLM client: {e}") |
|
|
llm_client = None |
|
|
|
|
|
def process_request(prompt: str, |
|
|
json_schema: str, |
|
|
image: Optional[Image.Image] = None, |
|
|
use_grammar: bool = True) -> str: |
|
|
""" |
|
|
Process request through Gradio interface |
|
|
""" |
|
|
if llm_client is None: |
|
|
return json.dumps({ |
|
|
"error": "LLM client not initialized", |
|
|
"details": "Check logs for detailed error information" |
|
|
}, ensure_ascii=False, indent=2) |
|
|
|
|
|
if not prompt.strip(): |
|
|
return json.dumps({"error": "Prompt cannot be empty"}, ensure_ascii=False, indent=2) |
|
|
|
|
|
if not json_schema.strip(): |
|
|
return json.dumps({"error": "JSON schema cannot be empty"}, ensure_ascii=False, indent=2) |
|
|
|
|
|
result = llm_client.generate_structured_response(prompt, json_schema, image, use_grammar) |
|
|
return json.dumps(result, ensure_ascii=False, indent=2) |
|
|
|
|
|
|
|
|
example_schema = """{ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"summary": { |
|
|
"type": "string", |
|
|
"description": "Brief summary of the response" |
|
|
}, |
|
|
"sentiment": { |
|
|
"type": "string", |
|
|
"enum": ["positive", "negative", "neutral"], |
|
|
"description": "Emotional tone" |
|
|
}, |
|
|
"confidence": { |
|
|
"type": "number", |
|
|
"minimum": 0, |
|
|
"maximum": 1, |
|
|
"description": "Confidence level in the response" |
|
|
}, |
|
|
"keywords": { |
|
|
"type": "array", |
|
|
"items": { |
|
|
"type": "string" |
|
|
}, |
|
|
"description": "Key words" |
|
|
} |
|
|
}, |
|
|
"required": ["summary", "sentiment", "confidence"] |
|
|
}""" |
|
|
|
|
|
example_prompt = "Analyze the following text and provide a structured assessment: 'The company's new product received enthusiastic user reviews. Sales exceeded all expectations by 150%.'" |
|
|
|
|
|
def create_gradio_interface(): |
|
|
"""Create Gradio interface""" |
|
|
|
|
|
with gr.Blocks(title="LLM Structured Output", theme=gr.themes.Soft()) as demo: |
|
|
gr.Markdown("# π€ LLM with Structured Output") |
|
|
gr.Markdown(f"Application for generating structured responses using model **{Config.MODEL_REPO}/{Config.MODEL_FILENAME}**") |
|
|
|
|
|
|
|
|
if llm_client is None: |
|
|
gr.Markdown("β οΈ **Warning**: Model not loaded. Check configuration and restart the application.") |
|
|
else: |
|
|
gr.Markdown("β
**Status**: Model successfully loaded and ready to work") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
prompt_input = gr.Textbox( |
|
|
label="Prompt for model", |
|
|
placeholder="Enter your request...", |
|
|
lines=5, |
|
|
value=example_prompt |
|
|
) |
|
|
|
|
|
image_input = gr.Image( |
|
|
label="Image (optional, for multimodal models)", |
|
|
type="pil" |
|
|
) |
|
|
|
|
|
schema_input = gr.Textbox( |
|
|
label="JSON schema for response structure", |
|
|
placeholder="Enter JSON schema...", |
|
|
lines=15, |
|
|
value=example_schema |
|
|
) |
|
|
|
|
|
grammar_checkbox = gr.Checkbox( |
|
|
label="π Use Grammar (GBNF) Mode", |
|
|
value=True, |
|
|
info="Enable grammar-based structured output for more precise JSON generation" |
|
|
) |
|
|
|
|
|
submit_btn = gr.Button("Generate Response", variant="primary") |
|
|
|
|
|
with gr.Column(): |
|
|
output = gr.Textbox( |
|
|
label="Structured Response", |
|
|
lines=20, |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
submit_btn.click( |
|
|
fn=process_request, |
|
|
inputs=[prompt_input, schema_input, image_input, grammar_checkbox], |
|
|
outputs=output |
|
|
) |
|
|
|
|
|
|
|
|
gr.Markdown("## π Usage Examples") |
|
|
|
|
|
examples = gr.Examples( |
|
|
examples=[ |
|
|
[ |
|
|
"Describe today's weather in New York", |
|
|
"""{ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"temperature": {"type": "number"}, |
|
|
"description": {"type": "string"}, |
|
|
"humidity": {"type": "number"} |
|
|
} |
|
|
}""", |
|
|
None |
|
|
], |
|
|
[ |
|
|
"Create a Python learning plan for one month", |
|
|
"""{ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"weeks": { |
|
|
"type": "array", |
|
|
"items": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"week_number": {"type": "integer"}, |
|
|
"topics": {"type": "array", "items": {"type": "string"}}, |
|
|
"practice_hours": {"type": "number"} |
|
|
} |
|
|
} |
|
|
}, |
|
|
"total_hours": {"type": "number"} |
|
|
} |
|
|
}""", |
|
|
None |
|
|
] |
|
|
], |
|
|
inputs=[prompt_input, schema_input, image_input] |
|
|
) |
|
|
|
|
|
|
|
|
gr.Markdown(f""" |
|
|
## ℹ️ Model Information
|
|
|
|
|
- **Model**: {Config.MODEL_REPO}/{Config.MODEL_FILENAME} |
|
|
- **Local path**: {Config.MODEL_PATH} |
|
|
- **Context window**: {Config.N_CTX} tokens |
|
|
- **Batch size**: {Config.N_BATCH} |
|
|
- **GPU layers**: {Config.N_GPU_LAYERS if Config.N_GPU_LAYERS >= 0 else "All"} |
|
|
- **CPU threads**: {Config.N_THREADS} |
|
|
- **Maximum response length**: {Config.MAX_NEW_TOKENS} tokens |
|
|
- **Temperature**: {Config.TEMPERATURE} |
|
|
- **Memory lock**: {"Enabled" if Config.USE_MLOCK else "Disabled"} |
|
|
- **Memory mapping**: {"Enabled" if Config.USE_MMAP else "Disabled"} |
|
|
|
|
|
**Tips**:
|
|
- Use clear and specific JSON schemas for better results |
|
|
- Enable Grammar (GBNF) mode for more precise JSON structure enforcement |
|
|
- Grammar mode uses schema-based constraints to guarantee valid JSON output |
|
|
- Disable Grammar mode for more flexible text generation with schema guidance |
|
|
|
|
|
**Grammar Features**:
|
|
- Automatic conversion of JSON Schema to GBNF grammar |
|
|
- Strict enforcement of JSON structure during generation |
|
|
- Support for objects, arrays, strings, numbers, booleans, and enums |
|
|
- Improved consistency and reliability of structured outputs |
|
|
""") |
|
|
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
demo = create_gradio_interface() |
|
|
demo.launch( |
|
|
server_name=Config.HOST, |
|
|
server_port=Config.GRADIO_PORT, |
|
|
share=False, |
|
|
debug=True |
|
|
) |
|
|
|