# app.py — emotion capture and data-management Gradio app.
# (Removed stray web-page paste artifacts — repo breadcrumb, author avatar
# caption, commit message — that were not valid Python.)
# Standard-library imports needed before environment configuration.
import os
import csv
import zipfile
import shutil
import re
from datetime import datetime

# These environment variables MUST be set before any TensorFlow-backed
# import (DeepFace pulls in TensorFlow): disable oneDNN optimizations and
# silence TF's C++ log spam.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Third-party and remaining stdlib imports (order preserved so the env
# vars above take effect before deepface/TF load).
import cv2
import gradio as gr
from deepface import DeepFace
import numpy as np
from PIL import Image
import time
from pathlib import Path
import pandas as pd
# Configuration
# Maps each DeepFace emotion label to the emoji drawn on annotated frames.
EMOTION_MAP = {
    "angry": "😠", "disgust": "🤢", "fear": "😨",
    "happy": "😄", "sad": "😢", "surprise": "😲",
    "neutral": "😐"
}
# Face-detector backends tried in this order until one finds a face.
BACKENDS = ['opencv', 'mtcnn', 'ssd', 'dlib']
# All output (face crops, annotated frames, CSV log) lives under /tmp.
SAVE_DIR = Path("/tmp/emotion_results")
SAVE_DIR.mkdir(exist_ok=True)

# Create directories: one sub-folder per emotion for raw face crops and
# one per emotion for annotated full frames.
(SAVE_DIR / "faces").mkdir(exist_ok=True)
(SAVE_DIR / "annotated").mkdir(exist_ok=True)
for emotion in EMOTION_MAP.keys():
    (SAVE_DIR / "faces" / emotion).mkdir(exist_ok=True, parents=True)
    (SAVE_DIR / "annotated" / emotion).mkdir(exist_ok=True, parents=True)

# Log file setup: write the CSV header once, on first run only.
LOG_FILE = SAVE_DIR / "emotion_logs.csv"
if not LOG_FILE.exists():
    with open(LOG_FILE, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"])
def log_emotion(batch_no, emotion, confidence, face_path, annotated_path):
    """Append one analysis result as a row to the CSV log file."""
    row = [
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        batch_no,
        emotion,
        confidence,
        str(face_path),
        str(annotated_path),
    ]
    with open(LOG_FILE, 'a', newline='') as log_fh:
        csv.writer(log_fh).writerow(row)
def validate_batch_no(batch_no):
    """Validate that batch number contains only digits.

    Returns a ``(is_valid, message)`` pair; message is empty when valid.
    Note the digit check runs on the *unstripped* text, so surrounding
    whitespace is rejected rather than ignored.
    """
    stripped = batch_no.strip()
    if not stripped:
        return False, "Batch number cannot be empty"
    if re.match(r'^\d+$', batch_no) is None:
        return False, "Batch number must contain only numbers"
    return True, ""
def predict_emotion(batch_no: str, image):
    """Analyze a captured frame, save the crop + annotated image, log it.

    Parameters:
        batch_no: operator-entered batch number (validated upstream).
        image: PIL image from the webcam component, or None.

    Returns a 6-tuple matching the Gradio outputs:
        (annotated PIL image, result text, status message,
         result-image update, result-text update, done-button update).
    On any failure the first two slots are None, the status carries the
    error text, and the result widgets stay hidden.
    """
    hidden = (gr.Image(visible=False), gr.Textbox(visible=False), gr.Button(visible=False))
    if not batch_no.strip():
        return (None, None, "Please enter a batch number first") + hidden
    if image is None:
        return (None, None, "Please capture your face first") + hidden
    try:
        # Convert PIL Image (RGB) to OpenCV's BGR channel order.
        frame = np.array(image)
        if frame.ndim == 3:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Try detector backends in order until one finds a face.
        results = None
        for backend in BACKENDS:
            try:
                results = DeepFace.analyze(
                    frame,
                    actions=['emotion'],
                    detector_backend=backend,
                    enforce_detection=True,
                    silent=True
                )
                break
            except Exception:
                continue
        if not results:
            return (None, None, "No face detected. Please try again.") + hidden
        # Process the first face found (analyze may return a list).
        result = results[0] if isinstance(results, list) else results
        emotion = result['dominant_emotion']
        confidence = result['emotion'][emotion]
        region = result['region']
        # BUGFIX: clamp the reported region to the frame. Some detectors
        # (notably mtcnn) can report negative or out-of-bounds coordinates;
        # negative indices would silently wrap around when slicing,
        # producing a corrupt crop.
        frame_h, frame_w = frame.shape[:2]
        x = max(0, region['x'])
        y = max(0, region['y'])
        w = min(region['w'], frame_w - x)
        h = min(region['h'], frame_h - y)
        if w <= 0 or h <= 0:
            return (None, None, "No face detected. Please try again.") + hidden
        # 1. Save the raw face crop under faces/<emotion>/.
        face_crop = frame[y:y+h, x:x+w]
        timestamp = int(time.time())
        face_dir = SAVE_DIR / "faces" / emotion
        face_path = face_dir / f"{batch_no}_{timestamp}.jpg"
        cv2.imwrite(str(face_path), face_crop)
        # 2. Create and save the annotated full frame under annotated/<emotion>/.
        annotated_frame = frame.copy()
        cv2.rectangle(annotated_frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
        cv2.putText(annotated_frame, f"{emotion} {EMOTION_MAP[emotion]} {confidence:.1f}%",
                    (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        annotated_dir = SAVE_DIR / "annotated" / emotion
        annotated_path = annotated_dir / f"{batch_no}_{timestamp}.jpg"
        cv2.imwrite(str(annotated_path), annotated_frame)
        # Record both saved paths in the CSV log.
        log_emotion(batch_no, emotion, confidence, face_path, annotated_path)
        # Convert back to RGB for PIL/Gradio display.
        output_img = Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
        return (output_img,
                f"Batch {batch_no}: {emotion.title()} ({confidence:.1f}%)",
                "",
                gr.Image(visible=True), gr.Textbox(visible=True), gr.Button(visible=True))
    except Exception as e:
        return (None, None, f"Error processing image: {str(e)}") + hidden
def check_batch_no(batch_no):
    """Check if batch number is entered and valid.

    Returns updates for (batch_no, message, webcam, result_img,
    result_text, done_btn).
    """
    is_valid, validation_msg = validate_batch_no(batch_no)
    if is_valid:
        # Lock the batch field and announce the countdown; the webcam is
        # revealed later by the chained activate_webcam step.
        return (
            gr.Textbox(interactive=False),
            gr.Textbox(value="Processing will start in 5 seconds...", visible=True),
            gr.Image(visible=False),
            gr.Image(visible=False),
            gr.Textbox(visible=False),
            gr.Button(visible=False)
        )
    # Invalid input: keep the field editable and surface the message.
    return (
        gr.Textbox(interactive=True),
        gr.Textbox(value=validation_msg, visible=bool(validation_msg)),
        gr.Image(visible=False),
        gr.Image(visible=False),
        gr.Textbox(visible=False),
        gr.Button(visible=False)
    )
def activate_webcam(batch_no):
    """Actually activate the webcam after the delay.

    Re-validates the batch number in case it changed during the wait.
    """
    is_valid, _ = validate_batch_no(batch_no)
    if is_valid:
        return (
            gr.Textbox(interactive=False),                                  # keep batch field locked
            gr.Textbox(value="Please capture your face now", visible=True),  # instruction
            gr.Image(visible=True),                                          # reveal webcam
            gr.Image(visible=False),
            gr.Textbox(visible=False),
            gr.Button(visible=False)
        )
    # Validation failed in the meantime: unlock the field, hide everything.
    return (
        gr.Textbox(interactive=True),
        gr.Textbox(visible=False),
        gr.Image(visible=False),
        gr.Image(visible=False),
        gr.Textbox(visible=False),
        gr.Button(visible=False)
    )
def reset_interface():
    """Reset the interface to its initial state (only batch entry visible)."""
    cleared_batch = gr.Textbox(value="", interactive=True)
    hidden_message = gr.Textbox(value="", visible=False)
    cleared_webcam = gr.Image(value=None, visible=False)
    return (
        cleared_batch,
        hidden_message,
        cleared_webcam,
        gr.Image(visible=False),
        gr.Textbox(visible=False),
        gr.Button(visible=False)
    )
def get_image_gallery(emotion, image_type):
    """Get image gallery for selected emotion and type.

    Returns ``{emotion_name: [jpg paths]}`` — one key per emotion when
    "All Emotions" is selected, otherwise a single key.
    """
    def jpgs_in(category):
        # List existing .jpg files for one emotion category.
        folder = SAVE_DIR / image_type / category
        return [str(path) for path in folder.glob("*.jpg") if path.exists()]

    if emotion == "All Emotions":
        return {emot: jpgs_in(emot) for emot in EMOTION_MAP.keys()}
    return {emotion: jpgs_in(emotion)}
def create_custom_zip(file_paths):
    """Create zip from selected images and return the file path.

    Returns the archive's path as a string, or None when there is
    nothing to zip or archiving fails.
    """
    if not file_paths:
        return None
    temp_dir = SAVE_DIR / "temp_downloads"
    temp_dir.mkdir(exist_ok=True)
    zip_path = temp_dir / f"emotion_images_{int(time.time())}.zip"
    # Remove any stale archive carrying the same timestamped name.
    if zip_path.exists():
        try:
            zip_path.unlink()
        except Exception as e:
            print(f"Error deleting old zip: {e}")
    try:
        with zipfile.ZipFile(zip_path, 'w') as archive:
            for raw_path in file_paths:
                source = Path(raw_path)
                if source.exists():
                    archive.write(source, arcname=source.name)
        return str(zip_path) if zip_path.exists() else None
    except Exception as e:
        print(f"Error creating zip file: {e}")
        return None
def download_all_emotions_structured():
    """Download all emotions in a structured ZIP with folders for each emotion.

    The archive mirrors the on-disk layout: ``<type>/<emotion>/<file>``.
    Returns the archive path as a string, or None on failure.
    """
    temp_dir = SAVE_DIR / "temp_downloads"
    temp_dir.mkdir(exist_ok=True)
    zip_path = temp_dir / f"all_emotions_structured_{int(time.time())}.zip"
    if zip_path.exists():
        try:
            zip_path.unlink()
        except Exception as e:
            print(f"Error deleting old zip: {e}")
    try:
        with zipfile.ZipFile(zip_path, 'w') as archive:
            for emotion in EMOTION_MAP.keys():
                # Faces first, then annotated frames, per emotion.
                for image_type in ("faces", "annotated"):
                    source_dir = SAVE_DIR / image_type / emotion
                    for image_file in source_dir.glob("*.jpg"):
                        if image_file.exists():
                            archive.write(
                                image_file,
                                arcname=f"{image_type}/{emotion}/{image_file.name}"
                            )
        return str(zip_path) if zip_path.exists() else None
    except Exception as e:
        print(f"Error creating structured zip file: {e}")
        return None
def delete_selected_images(selected_images):
    """Delete selected images with proper validation.

    Removes the files from disk, prunes matching rows from the CSV log,
    and returns a human-readable status string.
    """
    if not selected_images:
        return "No images selected for deletion"

    deleted_count = 0
    failed_deletions = []
    for raw_path in selected_images:
        target = Path(raw_path)
        try:
            if target.exists():
                target.unlink()
                deleted_count += 1
            else:
                failed_deletions.append(str(target))
        except Exception as e:
            print(f"Error deleting {target}: {e}")
            failed_deletions.append(str(target))

    # Keep the CSV log in sync with what was actually removed from disk.
    if deleted_count > 0 and LOG_FILE.exists():
        try:
            df = pd.read_csv(LOG_FILE)
            for raw_path in selected_images:
                path_text = str(raw_path)
                if "faces" in path_text:
                    df = df[df.face_path != path_text]
                else:
                    df = df[df.annotated_path != path_text]
            df.to_csv(LOG_FILE, index=False)
        except Exception as e:
            print(f"Error updating logs: {e}")

    status_msg = f"Deleted {deleted_count} images"
    if failed_deletions:
        status_msg += f"\nFailed to delete {len(failed_deletions)} images"
    return status_msg
def _delete_category_files(emotion, image_type):
    """Delete every file under SAVE_DIR/image_type/emotion and sync the log.

    Returns ``(deleted_count, failed_paths)`` so callers can aggregate
    numeric totals.
    """
    folder = SAVE_DIR / image_type / emotion
    deleted = 0
    failed = []
    for file in folder.glob("*"):
        if file.is_file():
            try:
                file.unlink()
                deleted += 1
            except Exception as e:
                print(f"Error deleting {file}: {e}")
                failed.append(str(file))
    # Prune the CSV log rows that referenced the removed files.
    if deleted > 0 and LOG_FILE.exists():
        try:
            df = pd.read_csv(LOG_FILE)
            if image_type == "faces":
                df = df[df.emotion != emotion]
            else:
                df = df[~((df.emotion == emotion) & (df.annotated_path.str.contains(str(folder))))]
            df.to_csv(LOG_FILE, index=False)
        except Exception as e:
            print(f"Error updating logs: {e}")
    return deleted, failed


def delete_images_in_category(emotion, image_type, confirm=False):
    """Delete all images in a specific category with confirmation.

    BUGFIX: the original "All Emotions" branch did
    ``deleted_count += delete_images_in_category(...)`` — adding the
    recursive call's *string* return value to an int, which raised
    TypeError on every confirmed bulk delete. Deletion now goes through
    a count-returning helper.

    Returns a human-readable status string.
    """
    if not confirm:
        return "Please check the confirmation box to delete all images in this category"
    if emotion == "All Emotions":
        total_deleted = 0
        for emot in EMOTION_MAP.keys():
            deleted, _ = _delete_category_files(emot, image_type)
            total_deleted += deleted
        return f"Deleted {total_deleted} images across all emotion categories"
    deleted_count, failed_deletions = _delete_category_files(emotion, image_type)
    status_msg = f"Deleted {deleted_count} images from {emotion}/{image_type}"
    if failed_deletions:
        status_msg += f"\nFailed to delete {len(failed_deletions)} images"
    return status_msg
def get_logs():
    """Return the emotion log as a DataFrame (empty if no log exists)."""
    if not LOG_FILE.exists():
        return pd.DataFrame()
    return pd.read_csv(LOG_FILE)
def view_logs():
    """Render the log for display: markdown table, plain-text fallback."""
    df = get_logs()
    if df.empty:
        return "No logs available yet"
    try:
        # to_markdown requires the optional 'tabulate' package.
        return df.to_markdown()
    except ImportError:
        return df.to_string()
def download_logs():
    """Copy the CSV log into the downloads folder and return its path.

    Returns None when no log file exists yet.
    """
    if not LOG_FILE.exists():
        return None
    temp_dir = SAVE_DIR / "temp_downloads"
    temp_dir.mkdir(exist_ok=True)
    download_path = temp_dir / "emotion_logs.csv"
    shutil.copy2(LOG_FILE, download_path)
    return str(download_path)
def clear_all_data():
    """Clear all images and logs.

    Returns ``(status message, empty log DataFrame, None)`` matching the
    (clear_message, logs_display, logs_csv) outputs.
    """
    deleted_count = 0
    # Remove every stored image across all emotions and both image types.
    for emotion in EMOTION_MAP.keys():
        for img_type in ("faces", "annotated"):
            for file in (SAVE_DIR / img_type / emotion).glob("*"):
                if not file.is_file():
                    continue
                try:
                    file.unlink()
                    deleted_count += 1
                except Exception as e:
                    print(f"Error deleting {file}: {e}")
    # Drop any previously prepared download archives.
    temp_dir = SAVE_DIR / "temp_downloads"
    if temp_dir.exists():
        try:
            shutil.rmtree(temp_dir)
        except Exception as e:
            print(f"Error deleting temp directory: {e}")
    # Replace the log file with a fresh one containing only the header.
    if LOG_FILE.exists():
        try:
            LOG_FILE.unlink()
        except Exception as e:
            print(f"Error deleting log file: {e}")
    try:
        with open(LOG_FILE, 'w', newline='') as f:
            csv.writer(f).writerow(["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"])
    except Exception as e:
        print(f"Error recreating log file: {e}")
    empty_df = pd.DataFrame(columns=["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"])
    return f"Deleted {deleted_count} items. All data has been cleared.", empty_df, None
# Unified CSS for both interfaces. Injected into each gr.Blocks and the
# top-level TabbedInterface; shared theme values are CSS variables on
# :root, with responsive overrides for narrow viewports at the bottom.
desktop_css = """
:root {
--spacing: 0.75rem;
--border-radius: 8px;
--shadow: 0 2px 6px rgba(0,0,0,0.1);
--primary-color: #4f46e5;
--danger-color: #ef4444;
--success-color: #10b981;
--panel-bg: #f8f9fa;
}
.gradio-container {
max-width: 1200px !important;
margin: 0 auto;
padding: 1.5rem;
}
h1 {
font-size: 1.8rem !important;
margin-bottom: 1.2rem !important;
}
.message {
color: red;
font-weight: bold;
margin: 0.5rem 0;
padding: 0.5rem;
background: #fff3f3;
border-radius: var(--border-radius);
}
.gallery {
grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)) !important;
gap: var(--spacing);
}
.disabled-input {
background-color: #f0f0f0;
}
.processing {
color: orange;
font-weight: bold;
}
.success {
color: var(--success-color);
font-weight: bold;
}
.tab-nav {
margin-bottom: 1.5rem;
}
.dashboard-panel {
background: white;
padding: 1.5rem;
border-radius: var(--border-radius);
box-shadow: var(--shadow);
margin-bottom: 1.5rem;
}
.input-group, .output-group {
margin-bottom: 1rem;
}
button {
border-radius: var(--border-radius) !important;
padding: 0.6rem 1.2rem !important;
font-size: 0.95rem !important;
transition: all 0.2s ease !important;
}
button:hover {
transform: translateY(-1px);
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
button.primary {
background: var(--primary-color) !important;
color: white !important;
}
button.danger {
background: var(--danger-color) !important;
color: white !important;
}
.webcam-container {
width: 100%;
max-width: 800px;
margin: 0 auto;
border-radius: var(--border-radius);
overflow: hidden;
box-shadow: var(--shadow);
}
.result-container {
width: 100%;
max-width: 800px;
margin: 1rem auto;
border-radius: var(--border-radius);
overflow: hidden;
}
.instruction-panel {
background: var(--panel-bg);
padding: 1.2rem;
border-radius: var(--border-radius);
margin-bottom: 1.5rem;
border-left: 4px solid var(--primary-color);
}
.control-row {
display: flex;
gap: 1rem;
align-items: center;
margin-bottom: 1rem;
}
.management-section {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1.5rem;
margin-top: 1.5rem;
}
.capture-section {
display: grid;
grid-template-columns: 1fr;
gap: 1.5rem;
}
@media (max-width: 992px) {
.management-section, .capture-section {
grid-template-columns: 1fr;
}
.gradio-container {
padding: 1rem;
}
}
@media (max-width: 768px) {
.gallery {
grid-template-columns: repeat(auto-fill, minmax(150px, 1fr)) !important;
}
}
"""
# Capture Interface - Now matches Data Management style
# NOTE(review): leading indentation was lost in this file; the component
# nesting below was reconstructed from the layout calls — confirm it
# matches the intended design.
with gr.Blocks(title="Emotion Capture", css=desktop_css) as capture_interface:
    with gr.Column(elem_classes="dashboard-panel"):
        gr.Markdown("""
# Emotion Capture Interface
""")
        with gr.Column(elem_classes="instruction-panel"):
            gr.Markdown("""
**Instructions:**
1. Enter/scan your batch number (numbers only)
2. System will automatically proceed after 5 seconds of inactivity
3. Webcam will activate for face capture
4. View your emotion analysis results
5. Click "Done" to reset the interface
""")
        with gr.Row(elem_classes="control-row"):
            # Batch-number entry; validated on every change by check_batch_no.
            batch_no = gr.Textbox(
                label="Batch Number",
                placeholder="Enter or scan numbers only",
                interactive=True,
                scale=4
            )
            # Status/error banner; hidden until there is something to show.
            message = gr.Textbox(
                label="Status",
                interactive=False,
                elem_classes="message",
                visible=False
            )
        with gr.Column(elem_classes="capture-section"):
            # Live webcam feed; revealed only after batch validation + delay.
            webcam = gr.Image(
                sources=["webcam"],
                type="pil",
                label="Live Camera Feed",
                interactive=True,
                mirror_webcam=True,
                visible=False,
                elem_classes="webcam-container",
                height=500
            )
            # Annotated frame shown after analysis.
            result_img = gr.Image(
                label="Analysis Result",
                interactive=False,
                visible=False,
                elem_classes="result-container",
                height=500
            )
        with gr.Row():
            result_text = gr.Textbox(
                label="Emotion Result",
                interactive=False,
                visible=False,
                container=False
            )
        with gr.Row():
            done_btn = gr.Button(
                "Done",
                visible=False,
                elem_classes="primary"
            )

    # Event handlers
    # Validate on every change, wait 5 s, then reveal the webcam.
    # NOTE(review): the sleep lambda occupies a worker for 5 s per change
    # event, and rapid edits queue several such chains — confirm this is
    # acceptable for the expected usage.
    batch_no.change(
        check_batch_no,
        inputs=batch_no,
        outputs=[batch_no, message, webcam, result_img, result_text, done_btn],
        queue=False
    ).then(
        lambda: time.sleep(5),
        None,
        None,
        queue=False
    ).then(
        activate_webcam,
        inputs=batch_no,
        outputs=[batch_no, message, webcam, result_img, result_text, done_btn],
        queue=False
    )
    # Run the emotion analysis whenever a frame is captured.
    # NOTE(review): result_img and result_text each appear twice in
    # outputs (once for the value, once for the visibility update) —
    # verify the installed Gradio version accepts duplicate output
    # components.
    webcam.change(
        predict_emotion,
        inputs=[batch_no, webcam],
        outputs=[result_img, result_text, message, result_img, result_text, done_btn]
    )
    # "Done" returns the UI to its initial state.
    done_btn.click(
        reset_interface,
        outputs=[batch_no, message, webcam, result_img, result_text, done_btn]
    )
# Data Management Interface
# NOTE(review): leading indentation was lost in this file; the nesting
# below was reconstructed from the layout calls — confirm it matches the
# intended design.
with gr.Blocks(title="Data Management") as data_interface:
    with gr.Column():
        gr.Markdown("""
# Data Management Dashboard
""")
        with gr.Tabs():
            with gr.Tab("Image Management", elem_classes="dashboard-panel"):
                with gr.Column():
                    gr.Markdown("### Image Gallery Management")
                    with gr.Row():
                        # Category filters for the gallery below.
                        emotion_selector = gr.Dropdown(
                            choices=["All Emotions"] + list(EMOTION_MAP.keys()),
                            label="Emotion Category",
                            value="All Emotions",
                            scale=3
                        )
                        image_type_selector = gr.Dropdown(
                            choices=["faces", "annotated"],
                            label="Image Type",
                            value="faces",
                            scale=2
                        )
                        refresh_btn = gr.Button("Refresh", scale=1)
                    # Flat list of paths currently shown in the gallery.
                    current_image_paths = gr.State([])
                    gallery = gr.Gallery(
                        label="Image Gallery",
                        columns=5,
                        height="auto",
                        preview=True
                    )
                    # Hidden checkbox group acting as the selection model
                    # for per-file download/delete.
                    selected_images = gr.CheckboxGroup(
                        label="Selected Images",
                        interactive=True,
                        value=[],
                        visible=False
                    )
                    with gr.Row(elem_classes="management-section"):
                        with gr.Column():
                            gr.Markdown("#### Download Options")
                            with gr.Row():
                                download_btn = gr.Button("Download Selected", variant="primary")
                                download_all_btn = gr.Button("Download All in Category")
                            download_structured_btn = gr.Button("Download Full Archive", variant="primary")
                            download_output = gr.File(label="Download Result", visible=False)
                        with gr.Column():
                            gr.Markdown("#### Delete Options")
                            delete_btn = gr.Button("Delete Selected", variant="stop")
                            with gr.Row():
                                delete_confirm = gr.Checkbox(
                                    label="Confirm deletion of ALL images in this category",
                                    value=False,
                                    scale=4
                                )
                                delete_all_btn = gr.Button(
                                    "Delete All in Category",
                                    variant="stop",
                                    interactive=False,
                                    scale=1
                                )
                            delete_output = gr.Textbox(label="Operation Status")
            with gr.Tab("Emotion Logs", elem_classes="dashboard-panel"):
                with gr.Column():
                    gr.Markdown("### Emotion Analysis Logs")
                    with gr.Row():
                        refresh_logs_btn = gr.Button("Refresh Logs")
                        download_logs_btn = gr.Button("Export Logs", variant="primary")
                        clear_all_btn = gr.Button("Clear All Data", variant="stop")
                    logs_display = gr.Markdown()
                    logs_csv = gr.File(label="Logs Download", visible=False)
                    clear_message = gr.Textbox(label="Operation Status")

    # Event handlers for Data Management
    def update_gallery_components(emotion, image_type):
        """Build (gallery items, flat path list) for the chosen category."""
        image_dict = get_image_gallery(emotion, image_type)
        gallery_items = []
        image_paths = []
        for emotion, images in image_dict.items():
            for img_path in images:
                gallery_items.append((img_path, f"{emotion}: {Path(img_path).name}"))
                image_paths.append(img_path)
        return gallery_items, image_paths

    # Seed the gallery with everything on disk at build time.
    # NOTE(review): mutating component attributes after construction may
    # not propagate in newer Gradio versions — prefer passing value=/
    # choices= at construction; verify on the installed version.
    initial_gallery, initial_paths = update_gallery_components("All Emotions", "faces")
    gallery.value = initial_gallery
    current_image_paths.value = initial_paths
    selected_images.choices = initial_paths

    def update_components(emotion, image_type):
        """Return component updates refreshing gallery, state and selection."""
        gallery_items, image_paths = update_gallery_components(emotion, image_type)
        return {
            gallery: gallery_items,
            current_image_paths: image_paths,
            selected_images: gr.CheckboxGroup(choices=image_paths, value=[])
        }

    # Refresh the gallery whenever a filter changes or Refresh is pressed.
    emotion_selector.change(
        update_components,
        inputs=[emotion_selector, image_type_selector],
        outputs=[gallery, current_image_paths, selected_images]
    )
    image_type_selector.change(
        update_components,
        inputs=[emotion_selector, image_type_selector],
        outputs=[gallery, current_image_paths, selected_images]
    )
    refresh_btn.click(
        update_components,
        inputs=[emotion_selector, image_type_selector],
        outputs=[gallery, current_image_paths, selected_images]
    )
    # Zip the checked files, then unhide the download widget if one was made.
    download_btn.click(
        lambda selected: create_custom_zip(selected),
        inputs=selected_images,
        outputs=download_output,
        api_name="download_selected"
    ).then(
        lambda x: gr.File(visible=x is not None),
        inputs=download_output,
        outputs=download_output
    )
    # NOTE(review): for "All Emotions" this builds a path with a literal
    # "*" directory component and then globs inside that non-existent
    # directory, which matches nothing — likely a bug; confirm intended
    # behavior.
    download_all_btn.click(
        lambda emotion, img_type: create_custom_zip(
            [str(f) for f in (SAVE_DIR / img_type / (emotion if emotion != "All Emotions" else "*")).glob("*.jpg") if f.exists()]
        ),
        inputs=[emotion_selector, image_type_selector],
        outputs=download_output,
        api_name="download_all"
    ).then(
        lambda x: gr.File(visible=x is not None),
        inputs=download_output,
        outputs=download_output
    )
    download_structured_btn.click(
        download_all_emotions_structured,
        outputs=download_output,
        api_name="download_all_structured"
    ).then(
        lambda x: gr.File(visible=x is not None),
        inputs=download_output,
        outputs=download_output
    )
    # NOTE(review): this lambda returns a dict mixing the *string* key
    # "delete_output" with component keys, and reads .value off the
    # selector components at event time (build-time defaults, not the
    # live UI values) — Gradio expects all dict keys to be components;
    # verify this handler actually updates the UI.
    delete_btn.click(
        lambda selected: {
            "delete_output": delete_selected_images(selected),
            **update_components(emotion_selector.value, image_type_selector.value)
        },
        inputs=selected_images,
        outputs=[delete_output, gallery, current_image_paths, selected_images]
    )
    # "Delete All" is enabled only while the confirmation box is ticked.
    delete_confirm.change(
        lambda x: gr.Button(interactive=x),
        inputs=delete_confirm,
        outputs=delete_all_btn
    )
    # NOTE(review): same mixed string/component dict-key pattern as
    # delete_btn above — verify against the installed Gradio version.
    delete_all_btn.click(
        lambda emotion, img_type, confirm: {
            "delete_output": delete_images_in_category(emotion, img_type, confirm),
            **update_components(emotion, img_type)
        },
        inputs=[emotion_selector, image_type_selector, delete_confirm],
        outputs=[delete_output, gallery, current_image_paths, selected_images]
    )
    refresh_logs_btn.click(
        view_logs,
        outputs=logs_display
    )
    download_logs_btn.click(
        download_logs,
        outputs=logs_csv,
        api_name="download_logs"
    ).then(
        lambda x: gr.File(visible=x is not None),
        inputs=logs_csv,
        outputs=logs_csv
    )
    # NOTE(review): update_components returns a 3-entry dict keyed by
    # components, but this .then lists only 2 outputs — likely an
    # output-count mismatch; confirm.
    clear_all_btn.click(
        clear_all_data,
        outputs=[clear_message, logs_display, logs_csv]
    ).then(
        lambda: update_components("All Emotions", "faces"),
        outputs=[gallery, current_image_paths]
    ).then(
        lambda: gr.CheckboxGroup(choices=[], value=[]),
        outputs=selected_images
    )
# Combine interfaces into a single two-tab app; the shared CSS is applied
# at the top level as well as inside each Blocks.
demo = gr.TabbedInterface(
    [capture_interface, data_interface],
    ["Emotion Capture", "Data Management"],
    css=desktop_css
)

if __name__ == "__main__":
    demo.launch()