lyangas's picture
move model downloading to dockerfile
f2adbf5
raw
history blame
24 kB
import json
import os
import gradio as gr
from typing import Optional, Dict, Any, Union
from PIL import Image
from pydantic import BaseModel
import logging
from config import Config
# Try to import llama_cpp with fallback
try:
from llama_cpp import Llama, LlamaGrammar
LLAMA_CPP_AVAILABLE = True
except ImportError as e:
print(f"Warning: llama-cpp-python not available: {e}")
LLAMA_CPP_AVAILABLE = False
Llama = None
LlamaGrammar = None
# Try to import huggingface_hub
try:
from huggingface_hub import hf_hub_download
HUGGINGFACE_HUB_AVAILABLE = True
except ImportError as e:
print(f"Warning: huggingface_hub not available: {e}")
HUGGINGFACE_HUB_AVAILABLE = False
hf_hub_download = None
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StructuredOutputRequest(BaseModel):
prompt: str
image: Optional[str] = None # base64 encoded image
json_schema: Dict[str, Any]
class LLMClient:
def __init__(self):
"""
Initialize client for working with local GGUF model via llama-cpp-python
"""
self.model_path = Config.get_model_path()
logger.info(f"Using model: {self.model_path}")
self.llm = None
self._initialize_model()
def _download_model_if_needed(self) -> str:
"""Download model from Hugging Face if it doesn't exist locally"""
if os.path.exists(self.model_path):
logger.info(f"Model already exists at: {self.model_path}")
return self.model_path
# If model doesn't exist and we're in production (Docker),
# it means the build process failed or model is in wrong location
if os.getenv('DOCKER_CONTAINER', 'false').lower() == 'true':
# Let's check common locations where model might be
alternative_paths = [
f"/app/models/{Config.MODEL_FILENAME}",
f"./models/{Config.MODEL_FILENAME}",
f"/models/{Config.MODEL_FILENAME}",
f"/app/{Config.MODEL_FILENAME}"
]
for alt_path in alternative_paths:
if os.path.exists(alt_path):
logger.info(f"Found model at alternative location: {alt_path}")
return alt_path
# List what's actually in the models directory
models_dir = "/app/models"
if os.path.exists(models_dir):
files = os.listdir(models_dir)
logger.error(f"Contents of {models_dir}: {files}")
else:
logger.error(f"Directory {models_dir} does not exist")
# Try to download as fallback
logger.warning("Model not found in expected locations, attempting download...")
if not HUGGINGFACE_HUB_AVAILABLE:
raise ImportError("huggingface_hub is not available. Please install it to download models.")
logger.info(f"Downloading model {Config.MODEL_REPO}/{Config.MODEL_FILENAME}...")
# Create models directory if it doesn't exist
models_dir = Config.get_models_dir()
os.makedirs(models_dir, exist_ok=True)
try:
# Download model
model_path = hf_hub_download(
repo_id=Config.MODEL_REPO,
filename=Config.MODEL_FILENAME,
local_dir=models_dir,
token=Config.HUGGINGFACE_TOKEN if Config.HUGGINGFACE_TOKEN else None
)
logger.info(f"Model downloaded to: {model_path}")
return model_path
except Exception as e:
logger.error(f"Failed to download model: {e}")
raise
def _initialize_model(self):
"""Initialize local GGUF model"""
try:
if not LLAMA_CPP_AVAILABLE:
raise ImportError("llama-cpp-python is not available. Please check installation.")
logger.info("Loading local model...")
# Download model if needed
model_path = self._download_model_if_needed()
# Verify model file exists and is readable
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
# Check file size to ensure it's not corrupted
file_size = os.path.getsize(model_path)
if file_size < 1024 * 1024: # Less than 1MB is suspicious for GGUF model
raise ValueError(f"Model file seems corrupted or incomplete. Size: {file_size} bytes")
logger.info(f"Model file verified. Size: {file_size / (1024**3):.2f} GB")
# Initialize Llama model with enhanced error handling
logger.info("Initializing Llama model...")
self.llm = Llama(
model_path=model_path,
n_ctx=Config.N_CTX,
n_batch=Config.N_BATCH,
n_gpu_layers=Config.N_GPU_LAYERS,
use_mlock=Config.USE_MLOCK,
use_mmap=Config.USE_MMAP,
vocab_only=False,
f16_kv=Config.F16_KV,
logits_all=False,
embedding=False,
n_threads=Config.N_THREADS,
last_n_tokens_size=64,
lora_base=None,
lora_path=None,
seed=Config.SEED,
verbose=True # Enable verbose for debugging
)
logger.info("Model successfully loaded and initialized")
# Test model with a simple prompt to verify it's working
logger.info("Testing model with simple prompt...")
test_response = self.llm("Hello", max_tokens=1, temperature=0.1)
logger.info("Model test successful")
except Exception as e:
logger.error(f"Error initializing model: {e}")
# Provide more specific error information
if "Failed to load model from file" in str(e):
logger.error("This error usually indicates:")
logger.error("1. Model file is corrupted or incomplete")
logger.error("2. llama-cpp-python version is incompatible with the model")
logger.error("3. Insufficient memory to load the model")
logger.error(f"4. Model path: {self.model_path}")
raise
def _validate_json_schema(self, schema: str) -> Dict[str, Any]:
"""Validate and parse JSON schema"""
try:
parsed_schema = json.loads(schema)
return parsed_schema
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON schema: {e}")
def _format_prompt_with_schema(self, prompt: str, json_schema: Dict[str, Any]) -> str:
"""
Format prompt for structured output generation
"""
schema_str = json.dumps(json_schema, ensure_ascii=False, indent=2)
formatted_prompt = f"""User: {prompt}
Please respond in strict accordance with the following JSON schema:
```json
{schema_str}
```
Return ONLY valid JSON without additional comments or explanations."""
return formatted_prompt
def _json_schema_to_gbnf(schema: Dict[str, Any], root_name: str = "root") -> str:
"""Convert JSON schema to GBNF (Backus-Naur Form) grammar for structured output"""
rules = []
rule_names = set() # Track rule names to avoid duplicates
def add_rule(name: str, definition: str):
if name not in rule_names:
rules.append(f"{name} ::= {definition}")
rule_names.add(name)
def process_type(schema_part: Dict[str, Any], type_name: str = "value") -> str:
if "type" not in schema_part:
# Handle anyOf, oneOf, allOf cases - simplified to string for now
return "string"
schema_type = schema_part["type"]
if schema_type == "object":
# Handle object type
properties = schema_part.get("properties", {})
required = schema_part.get("required", [])
if not properties:
add_rule(type_name, '"{" ws "}"')
return type_name
# Separate required and optional parts
required_parts = []
optional_parts = []
for prop_name, prop_schema in properties.items():
prop_type_name = f"{type_name}_{prop_name}"
prop_type = process_type(prop_schema, prop_type_name)
prop_def = f'"\\"" "{prop_name}" "\\"" ws ":" ws {prop_type}'
if prop_name in required:
required_parts.append(prop_def)
else:
optional_parts.append(prop_def)
# Build object structure - simplified approach
if not required_parts and not optional_parts:
object_def = '"{" ws "}"'
else:
# For simplicity, create a fixed structure based on required fields only
# and treat optional fields as always present but with optional values
if not required_parts:
# Only optional fields - make the whole object optional content
if len(optional_parts) == 1:
object_def = f'"{" ws ({optional_parts[0]})? ws "}"'
else:
comma_separated = ' ws "," ws '.join(optional_parts)
object_def = f'"{" ws ({comma_separated})? ws "}"'
else:
# Has required fields
all_parts = required_parts.copy()
# Add optional parts as truly optional (with optional commas)
for opt_part in optional_parts:
all_parts.append(f'(ws "," ws {opt_part})?')
if len(all_parts) == 1:
object_def = f'"{" ws {all_parts[0]} ws "}"'
else:
# Join required parts with commas, optional parts are already with optional commas
required_with_commas = ' ws "," ws '.join(required_parts)
optional_with_commas = ' '.join([f'(ws "," ws {opt})?' for opt in optional_parts])
if optional_with_commas:
object_def = f'"{{" ws {required_with_commas} {optional_with_commas} ws "}}"'
else:
object_def = f'"{{" ws {required_with_commas} ws "}}"'
add_rule(type_name, object_def)
return type_name
elif schema_type == "array":
# Handle array type
items_schema = schema_part.get("items", {})
items_type_name = f"{type_name}_items"
item_type = process_type(items_schema, f"{type_name}_item")
# Create array items rule
add_rule(items_type_name, f"{item_type} (ws \",\" ws {item_type})*")
add_rule(type_name, f'"[" ws ({items_type_name})? ws "]"')
return type_name
elif schema_type == "string":
# Handle string type with enum support
if "enum" in schema_part:
enum_values = schema_part["enum"]
enum_options = ' | '.join([f'"\\"" "{val}" "\\""' for val in enum_values])
add_rule(type_name, enum_options)
return type_name
else:
return "string"
elif schema_type == "number" or schema_type == "integer":
return "number"
elif schema_type == "boolean":
return "boolean"
else:
return "string" # fallback
# Process root schema
process_type(schema, root_name)
# Basic GBNF rules for primitives
basic_rules = [
'ws ::= [ \\t\\n]*',
'string ::= "\\"" char* "\\""',
'char ::= [^"\\\\] | "\\\\" (["\\\\bfnrt] | "u" hex hex hex hex)',
'hex ::= [0-9a-fA-F]',
'number ::= "-"? ("0" | [1-9] [0-9]*) ("." [0-9]+)? ([eE] [+-]? [0-9]+)?',
'boolean ::= "true" | "false"',
'null ::= "null"'
]
# Add basic rules only if they haven't been added yet
for rule in basic_rules:
rule_name = rule.split(' ::= ')[0]
if rule_name not in rule_names:
rules.append(rule)
rule_names.add(rule_name)
return "\\n".join(rules)
def generate_structured_response(self,
prompt: str,
json_schema: Union[str, Dict[str, Any]],
image: Optional[Image.Image] = None,
use_grammar: bool = True) -> Dict[str, Any]:
"""
Generate structured response from local GGUF model
"""
try:
# Validate and parse JSON schema
if isinstance(json_schema, str):
parsed_schema = self._validate_json_schema(json_schema)
else:
parsed_schema = json_schema
# Format prompt
formatted_prompt = self._format_prompt_with_schema(prompt, parsed_schema)
# Warning about images (not supported in this implementation)
if image is not None:
logger.warning("Image processing is not supported with this local model")
# Generate response
logger.info(f"Generating response... (Grammar: {'Enabled' if use_grammar else 'Disabled'})")
# Create grammar if enabled
grammar = None
if use_grammar and LLAMA_CPP_AVAILABLE and LlamaGrammar is not None:
try:
gbnf_grammar = _json_schema_to_gbnf(parsed_schema, "root")
grammar = LlamaGrammar.from_string(gbnf_grammar)
logger.info("Grammar successfully created from JSON schema")
except Exception as e:
logger.warning(f"Failed to create grammar: {e}. Falling back to non-grammar mode.")
use_grammar = False
# Set generation parameters
generation_params = {
"max_tokens": Config.MAX_NEW_TOKENS,
"temperature": Config.TEMPERATURE,
"echo": False
}
# Add grammar or stop tokens based on mode
if use_grammar and grammar is not None:
generation_params["grammar"] = grammar
# For grammar mode, use a simpler prompt without schema explanation
simple_prompt = f"User: {prompt}\n\nAssistant:"
response = self.llm(simple_prompt, **generation_params)
else:
generation_params["stop"] = ["User:", "\n\n", "Assistant:", "Human:"]
response = self.llm(formatted_prompt, **generation_params)
# Extract generated text
generated_text = response['choices'][0]['text']
# Attempt to parse JSON response
try:
# Find JSON in response
json_start = generated_text.find('{')
json_end = generated_text.rfind('}') + 1
if json_start != -1 and json_end > json_start:
json_str = generated_text[json_start:json_end]
parsed_response = json.loads(json_str)
return {
"success": True,
"data": parsed_response,
"raw_response": generated_text
}
else:
return {
"error": "Could not find JSON in model response",
"raw_response": generated_text
}
except json.JSONDecodeError as e:
return {
"error": f"JSON parsing error: {e}",
"raw_response": generated_text
}
except Exception as e:
logger.error(f"Unexpected error: {e}")
return {
"error": f"Generation error: {str(e)}"
}
def test_grammar_generation(json_schema_str: str) -> Dict[str, Any]:
"""
Test grammar generation without running the full model
"""
try:
parsed_schema = llm_client._validate_json_schema(json_schema_str)
gbnf_grammar = _json_schema_to_gbnf(parsed_schema, "root")
return {
"success": True,
"grammar": gbnf_grammar,
"schema": parsed_schema
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
# Initialize client
logger.info("Initializing LLM client...")
try:
llm_client = LLMClient()
logger.info("LLM client successfully initialized")
except Exception as e:
logger.error(f"Error initializing LLM client: {e}")
llm_client = None
def process_request(prompt: str,
json_schema: str,
image: Optional[Image.Image] = None,
use_grammar: bool = True) -> str:
"""
Process request through Gradio interface
"""
if llm_client is None:
return json.dumps({
"error": "LLM client not initialized",
"details": "Check logs for detailed error information"
}, ensure_ascii=False, indent=2)
if not prompt.strip():
return json.dumps({"error": "Prompt cannot be empty"}, ensure_ascii=False, indent=2)
if not json_schema.strip():
return json.dumps({"error": "JSON schema cannot be empty"}, ensure_ascii=False, indent=2)
result = llm_client.generate_structured_response(prompt, json_schema, image, use_grammar)
return json.dumps(result, ensure_ascii=False, indent=2)
# Examples for demonstration
example_schema = """{
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "Brief summary of the response"
},
"sentiment": {
"type": "string",
"enum": ["positive", "negative", "neutral"],
"description": "Emotional tone"
},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Confidence level in the response"
},
"keywords": {
"type": "array",
"items": {
"type": "string"
},
"description": "Key words"
}
},
"required": ["summary", "sentiment", "confidence"]
}"""
example_prompt = "Analyze the following text and provide a structured assessment: 'The company's new product received enthusiastic user reviews. Sales exceeded all expectations by 150%.'"
def create_gradio_interface():
"""Create Gradio interface"""
with gr.Blocks(title="LLM Structured Output", theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸ€– LLM with Structured Output")
gr.Markdown(f"Application for generating structured responses using model **{Config.MODEL_REPO}/{Config.MODEL_FILENAME}**")
# Show model status
if llm_client is None:
gr.Markdown("⚠️ **Warning**: Model not loaded. Check configuration and restart the application.")
else:
gr.Markdown("βœ… **Status**: Model successfully loaded and ready to work")
with gr.Row():
with gr.Column():
prompt_input = gr.Textbox(
label="Prompt for model",
placeholder="Enter your request...",
lines=5,
value=example_prompt
)
image_input = gr.Image(
label="Image (optional, for multimodal models)",
type="pil"
)
schema_input = gr.Textbox(
label="JSON schema for response structure",
placeholder="Enter JSON schema...",
lines=15,
value=example_schema
)
grammar_checkbox = gr.Checkbox(
label="πŸ”— Use Grammar (GBNF) Mode",
value=True,
info="Enable grammar-based structured output for more precise JSON generation"
)
submit_btn = gr.Button("Generate Response", variant="primary")
with gr.Column():
output = gr.Textbox(
label="Structured Response",
lines=20,
interactive=False
)
submit_btn.click(
fn=process_request,
inputs=[prompt_input, schema_input, image_input, grammar_checkbox],
outputs=output
)
# Examples
gr.Markdown("## πŸ“‹ Usage Examples")
examples = gr.Examples(
examples=[
[
"Describe today's weather in New York",
"""{
"type": "object",
"properties": {
"temperature": {"type": "number"},
"description": {"type": "string"},
"humidity": {"type": "number"}
}
}""",
None
],
[
"Create a Python learning plan for one month",
"""{
"type": "object",
"properties": {
"weeks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"week_number": {"type": "integer"},
"topics": {"type": "array", "items": {"type": "string"}},
"practice_hours": {"type": "number"}
}
}
},
"total_hours": {"type": "number"}
}
}""",
None
]
],
inputs=[prompt_input, schema_input, image_input]
)
# Model information
gr.Markdown(f"""
## ℹ️ Model Information
- **Model**: {Config.MODEL_REPO}/{Config.MODEL_FILENAME}
- **Local path**: {Config.MODEL_PATH}
- **Context window**: {Config.N_CTX} tokens
- **Batch size**: {Config.N_BATCH}
- **GPU layers**: {Config.N_GPU_LAYERS if Config.N_GPU_LAYERS >= 0 else "All"}
- **CPU threads**: {Config.N_THREADS}
- **Maximum response length**: {Config.MAX_NEW_TOKENS} tokens
- **Temperature**: {Config.TEMPERATURE}
- **Memory lock**: {"Enabled" if Config.USE_MLOCK else "Disabled"}
- **Memory mapping**: {"Enabled" if Config.USE_MMAP else "Disabled"}
πŸ’‘ **Tips**:
- Use clear and specific JSON schemas for better results
- Enable Grammar (GBNF) mode for more precise JSON structure enforcement
- Grammar mode uses schema-based constraints to guarantee valid JSON output
- Disable Grammar mode for more flexible text generation with schema guidance
πŸ”— **Grammar Features**:
- Automatic conversion of JSON Schema to GBNF grammar
- Strict enforcement of JSON structure during generation
- Support for objects, arrays, strings, numbers, booleans, and enums
- Improved consistency and reliability of structured outputs
""")
return demo
if __name__ == "__main__":
# Create and launch Gradio interface
demo = create_gradio_interface()
demo.launch(
server_name=Config.HOST,
server_port=Config.GRADIO_PORT,
share=False,
debug=True
)