import os import csv import zipfile import shutil import re from datetime import datetime os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' import cv2 import gradio as gr from deepface import DeepFace import numpy as np from PIL import Image import time from pathlib import Path import pandas as pd # Configuration EMOTION_MAP = { "angry": "😠", "disgust": "🤢", "fear": "😨", "happy": "😄", "sad": "😢", "surprise": "😲", "neutral": "😐" } BACKENDS = ['opencv', 'mtcnn', 'ssd', 'dlib'] SAVE_DIR = Path("/tmp/emotion_results") SAVE_DIR.mkdir(exist_ok=True) # Create directories (SAVE_DIR / "faces").mkdir(exist_ok=True) (SAVE_DIR / "annotated").mkdir(exist_ok=True) for emotion in EMOTION_MAP.keys(): (SAVE_DIR / "faces" / emotion).mkdir(exist_ok=True, parents=True) (SAVE_DIR / "annotated" / emotion).mkdir(exist_ok=True, parents=True) # Log file setup LOG_FILE = SAVE_DIR / "emotion_logs.csv" if not LOG_FILE.exists(): with open(LOG_FILE, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"]) def log_emotion(batch_no, emotion, confidence, face_path, annotated_path): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open(LOG_FILE, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([timestamp, batch_no, emotion, confidence, str(face_path), str(annotated_path)]) def validate_batch_no(batch_no): """Validate that batch number contains only digits""" if not batch_no.strip(): return False, "Batch number cannot be empty" if not re.match(r'^\d+$', batch_no): return False, "Batch number must contain only numbers" return True, "" def predict_emotion(batch_no: str, image): if not batch_no.strip(): return None, None, "Please enter a batch number first", gr.Image(visible=False), gr.Textbox(visible=False), gr.Button(visible=False) if image is None: return None, None, "Please capture your face first", gr.Image(visible=False), gr.Textbox(visible=False), gr.Button(visible=False) try: # Convert PIL Image to OpenCV format frame = np.array(image) if frame.ndim == 3: frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # Try different backends for face detection results = None for backend in BACKENDS: try: results = DeepFace.analyze( frame, actions=['emotion'], detector_backend=backend, enforce_detection=True, silent=True ) break except Exception: continue if not results: return None, None, "No face detected. Please try again.", gr.Image(visible=False), gr.Textbox(visible=False), gr.Button(visible=False) # Process the first face found result = results[0] if isinstance(results, list) else results emotion = result['dominant_emotion'] confidence = result['emotion'][emotion] region = result['region'] # Extract face coordinates x, y, w, h = region['x'], region['y'], region['w'], region['h'] # 1. Save raw face crop face_crop = frame[y:y+h, x:x+w] timestamp = int(time.time()) face_dir = SAVE_DIR / "faces" / emotion face_path = face_dir / f"{batch_no}_{timestamp}.jpg" cv2.imwrite(str(face_path), face_crop) # 2. Create and save annotated image annotated_frame = frame.copy() cv2.rectangle(annotated_frame, (x, y), (x+w, y+h), (0, 255, 0), 2) cv2.putText(annotated_frame, f"{emotion} {EMOTION_MAP[emotion]} {confidence:.1f}%", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) annotated_dir = SAVE_DIR / "annotated" / emotion annotated_path = annotated_dir / f"{batch_no}_{timestamp}.jpg" cv2.imwrite(str(annotated_path), annotated_frame) # Log both paths log_emotion(batch_no, emotion, confidence, face_path, annotated_path) # Convert back to PIL format for display output_img = Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)) return output_img, f"Batch {batch_no}: {emotion.title()} ({confidence:.1f}%)", "", gr.Image(visible=True), gr.Textbox(visible=True), gr.Button(visible=True) except Exception as e: return None, None, f"Error processing image: {str(e)}", gr.Image(visible=False), gr.Textbox(visible=False), gr.Button(visible=False) def check_batch_no(batch_no): """Check if batch number is entered and valid""" is_valid, validation_msg = validate_batch_no(batch_no) if not is_valid: return ( gr.Textbox(interactive=True), # Keep batch_no interactive gr.Textbox(value=validation_msg, visible=bool(validation_msg)), # Show validation message gr.Image(visible=False), # Hide webcam gr.Image(visible=False), # Hide result image gr.Textbox(visible=False), # Hide result text gr.Button(visible=False) # Hide done button ) # After validation, disable input and show countdown return ( gr.Textbox(interactive=False), # Disable batch_no gr.Textbox(value="Processing will start in 5 seconds...", visible=True), # Show countdown gr.Image(visible=False), # Keep webcam hidden initially gr.Image(visible=False), # Hide result image gr.Textbox(visible=False), # Hide result text gr.Button(visible=False) # Hide done button ) def activate_webcam(batch_no): """Actually activate the webcam after the delay""" is_valid, _ = validate_batch_no(batch_no) if not is_valid: return ( gr.Textbox(interactive=True), # Re-enable batch_no if invalid gr.Textbox(visible=False), # Hide message gr.Image(visible=False), # Hide webcam gr.Image(visible=False), # Hide result image gr.Textbox(visible=False), # Hide result text gr.Button(visible=False) # Hide done button ) return ( gr.Textbox(interactive=False), # Keep batch_no disabled gr.Textbox(value="Please capture your face now", visible=True), # Show instruction gr.Image(visible=True), # Show webcam gr.Image(visible=False), # Hide result image gr.Textbox(visible=False), # Hide result text gr.Button(visible=False) # Hide done button ) def reset_interface(): """Reset the interface to initial state""" return ( gr.Textbox(value="", interactive=True), # Enable batch_no gr.Textbox(value="", visible=False), # Hide message gr.Image(value=None, visible=False), # Hide webcam gr.Image(visible=False), # Hide result image gr.Textbox(visible=False), # Hide result text gr.Button(visible=False) # Hide done button ) def get_image_gallery(emotion, image_type): """Get image gallery for selected emotion and type""" if emotion == "All Emotions": image_dict = {} for emot in EMOTION_MAP.keys(): folder = SAVE_DIR / image_type / emot image_dict[emot] = [str(f) for f in folder.glob("*.jpg") if f.exists()] else: folder = SAVE_DIR / image_type / emotion image_dict = {emotion: [str(f) for f in folder.glob("*.jpg") if f.exists()]} return image_dict def create_custom_zip(file_paths): """Create zip from selected images and return the file path""" if not file_paths: return None temp_dir = SAVE_DIR / "temp_downloads" temp_dir.mkdir(exist_ok=True) zip_path = temp_dir / f"emotion_images_{int(time.time())}.zip" if zip_path.exists(): try: zip_path.unlink() except Exception as e: print(f"Error deleting old zip: {e}") try: with zipfile.ZipFile(zip_path, 'w') as zipf: for file_path in file_paths: file_path = Path(file_path) if file_path.exists(): zipf.write(file_path, arcname=file_path.name) return str(zip_path) if zip_path.exists() else None except Exception as e: print(f"Error creating zip file: {e}") return None def download_all_emotions_structured(): """Download all emotions in a structured ZIP with folders for each emotion""" temp_dir = SAVE_DIR / "temp_downloads" temp_dir.mkdir(exist_ok=True) zip_path = temp_dir / f"all_emotions_structured_{int(time.time())}.zip" if zip_path.exists(): try: zip_path.unlink() except Exception as e: print(f"Error deleting old zip: {e}") try: with zipfile.ZipFile(zip_path, 'w') as zipf: for emotion in EMOTION_MAP.keys(): # Add faces face_dir = SAVE_DIR / "faces" / emotion for face_file in face_dir.glob("*.jpg"): if face_file.exists(): arcname = f"faces/{emotion}/{face_file.name}" zipf.write(face_file, arcname=arcname) # Add annotated images annotated_dir = SAVE_DIR / "annotated" / emotion for annotated_file in annotated_dir.glob("*.jpg"): if annotated_file.exists(): arcname = f"annotated/{emotion}/{annotated_file.name}" zipf.write(annotated_file, arcname=arcname) return str(zip_path) if zip_path.exists() else None except Exception as e: print(f"Error creating structured zip file: {e}") return None def delete_selected_images(selected_images): """Delete selected images with proper validation""" if not selected_images: return "No images selected for deletion" deleted_count = 0 failed_deletions = [] for img_path in selected_images: img_path = Path(img_path) try: if img_path.exists(): img_path.unlink() deleted_count += 1 else: failed_deletions.append(str(img_path)) except Exception as e: print(f"Error deleting {img_path}: {e}") failed_deletions.append(str(img_path)) if deleted_count > 0 and LOG_FILE.exists(): try: df = pd.read_csv(LOG_FILE) for img_path in selected_images: img_path = str(img_path) if "faces" in img_path: df = df[df.face_path != img_path] else: df = df[df.annotated_path != img_path] df.to_csv(LOG_FILE, index=False) except Exception as e: print(f"Error updating logs: {e}") status_msg = f"Deleted {deleted_count} images" if failed_deletions: status_msg += f"\nFailed to delete {len(failed_deletions)} images" return status_msg def delete_images_in_category(emotion, image_type, confirm=False): """Delete all images in a specific category with confirmation""" if not confirm: return "Please check the confirmation box to delete all images in this category" if emotion == "All Emotions": deleted_count = 0 for emot in EMOTION_MAP.keys(): deleted_count += delete_images_in_category(emot, image_type, confirm=True) return f"Deleted {deleted_count} images across all emotion categories" folder = SAVE_DIR / image_type / emotion deleted_count = 0 failed_deletions = [] for file in folder.glob("*"): if file.is_file(): try: file.unlink() deleted_count += 1 except Exception as e: print(f"Error deleting {file}: {e}") failed_deletions.append(str(file)) if deleted_count > 0 and LOG_FILE.exists(): try: df = pd.read_csv(LOG_FILE) if image_type == "faces": df = df[df.emotion != emotion] else: df = df[~((df.emotion == emotion) & (df.annotated_path.str.contains(str(folder))))] df.to_csv(LOG_FILE, index=False) except Exception as e: print(f"Error updating logs: {e}") status_msg = f"Deleted {deleted_count} images from {emotion}/{image_type}" if failed_deletions: status_msg += f"\nFailed to delete {len(failed_deletions)} images" return status_msg def get_logs(): if LOG_FILE.exists(): return pd.read_csv(LOG_FILE) return pd.DataFrame() def view_logs(): df = get_logs() if not df.empty: try: return df.to_markdown() except ImportError: return df.to_string() return "No logs available yet" def download_logs(): if LOG_FILE.exists(): temp_dir = SAVE_DIR / "temp_downloads" temp_dir.mkdir(exist_ok=True) download_path = temp_dir / "emotion_logs.csv" shutil.copy2(LOG_FILE, download_path) return str(download_path) return None def clear_all_data(): """Clear all images and logs""" deleted_count = 0 for emotion in EMOTION_MAP.keys(): for img_type in ["faces", "annotated"]: folder = SAVE_DIR / img_type / emotion for file in folder.glob("*"): if file.is_file(): try: file.unlink() deleted_count += 1 except Exception as e: print(f"Error deleting {file}: {e}") temp_dir = SAVE_DIR / "temp_downloads" if temp_dir.exists(): try: shutil.rmtree(temp_dir) except Exception as e: print(f"Error deleting temp directory: {e}") if LOG_FILE.exists(): try: LOG_FILE.unlink() except Exception as e: print(f"Error deleting log file: {e}") try: with open(LOG_FILE, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"]) except Exception as e: print(f"Error recreating log file: {e}") empty_df = pd.DataFrame(columns=["timestamp", "batch_no", "emotion", "confidence", "face_path", "annotated_path"]) return f"Deleted {deleted_count} items. All data has been cleared.", empty_df, None # Unified CSS for both interfaces desktop_css = """ :root { --spacing: 0.75rem; --border-radius: 8px; --shadow: 0 2px 6px rgba(0,0,0,0.1); --primary-color: #4f46e5; --danger-color: #ef4444; --success-color: #10b981; --panel-bg: #f8f9fa; } .gradio-container { max-width: 1200px !important; margin: 0 auto; padding: 1.5rem; } h1 { font-size: 1.8rem !important; margin-bottom: 1.2rem !important; } .message { color: red; font-weight: bold; margin: 0.5rem 0; padding: 0.5rem; background: #fff3f3; border-radius: var(--border-radius); } .gallery { grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)) !important; gap: var(--spacing); } .disabled-input { background-color: #f0f0f0; } .processing { color: orange; font-weight: bold; } .success { color: var(--success-color); font-weight: bold; } .tab-nav { margin-bottom: 1.5rem; } .dashboard-panel { background: white; padding: 1.5rem; border-radius: var(--border-radius); box-shadow: var(--shadow); margin-bottom: 1.5rem; } .input-group, .output-group { margin-bottom: 1rem; } button { border-radius: var(--border-radius) !important; padding: 0.6rem 1.2rem !important; font-size: 0.95rem !important; transition: all 0.2s ease !important; } button:hover { transform: translateY(-1px); box-shadow: 0 2px 8px rgba(0,0,0,0.1); } button.primary { background: var(--primary-color) !important; color: white !important; } button.danger { background: var(--danger-color) !important; color: white !important; } .webcam-container { width: 100%; max-width: 800px; margin: 0 auto; border-radius: var(--border-radius); overflow: hidden; box-shadow: var(--shadow); } .result-container { width: 100%; max-width: 800px; margin: 1rem auto; border-radius: var(--border-radius); overflow: hidden; } .instruction-panel { background: var(--panel-bg); padding: 1.2rem; border-radius: var(--border-radius); margin-bottom: 1.5rem; border-left: 4px solid var(--primary-color); } .control-row { display: flex; gap: 1rem; align-items: center; margin-bottom: 1rem; } .management-section { display: grid; grid-template-columns: 1fr 1fr; gap: 1.5rem; margin-top: 1.5rem; } .capture-section { display: grid; grid-template-columns: 1fr; gap: 1.5rem; } @media (max-width: 992px) { .management-section, .capture-section { grid-template-columns: 1fr; } .gradio-container { padding: 1rem; } } @media (max-width: 768px) { .gallery { grid-template-columns: repeat(auto-fill, minmax(150px, 1fr)) !important; } } """ # Capture Interface - Now matches Data Management style with gr.Blocks(title="Emotion Capture", css=desktop_css) as capture_interface: with gr.Column(elem_classes="dashboard-panel"): gr.Markdown(""" # Emotion Capture Interface """) with gr.Column(elem_classes="instruction-panel"): gr.Markdown(""" **Instructions:** 1. Enter/scan your batch number (numbers only) 2. System will automatically proceed after 5 seconds of inactivity 3. Webcam will activate for face capture 4. View your emotion analysis results 5. Click "Done" to reset the interface """) with gr.Row(elem_classes="control-row"): batch_no = gr.Textbox( label="Batch Number", placeholder="Enter or scan numbers only", interactive=True, scale=4 ) message = gr.Textbox( label="Status", interactive=False, elem_classes="message", visible=False ) with gr.Column(elem_classes="capture-section"): webcam = gr.Image( sources=["webcam"], type="pil", label="Live Camera Feed", interactive=True, mirror_webcam=True, visible=False, elem_classes="webcam-container", height=500 ) result_img = gr.Image( label="Analysis Result", interactive=False, visible=False, elem_classes="result-container", height=500 ) with gr.Row(): result_text = gr.Textbox( label="Emotion Result", interactive=False, visible=False, container=False ) with gr.Row(): done_btn = gr.Button( "Done", visible=False, elem_classes="primary" ) # Event handlers batch_no.change( check_batch_no, inputs=batch_no, outputs=[batch_no, message, webcam, result_img, result_text, done_btn], queue=False ).then( lambda: time.sleep(5), None, None, queue=False ).then( activate_webcam, inputs=batch_no, outputs=[batch_no, message, webcam, result_img, result_text, done_btn], queue=False ) webcam.change( predict_emotion, inputs=[batch_no, webcam], outputs=[result_img, result_text, message, result_img, result_text, done_btn] ) done_btn.click( reset_interface, outputs=[batch_no, message, webcam, result_img, result_text, done_btn] ) # Data Management Interface with gr.Blocks(title="Data Management") as data_interface: with gr.Column(): gr.Markdown(""" # Data Management Dashboard """) with gr.Tabs(): with gr.Tab("Image Management", elem_classes="dashboard-panel"): with gr.Column(): gr.Markdown("### Image Gallery Management") with gr.Row(): emotion_selector = gr.Dropdown( choices=["All Emotions"] + list(EMOTION_MAP.keys()), label="Emotion Category", value="All Emotions", scale=3 ) image_type_selector = gr.Dropdown( choices=["faces", "annotated"], label="Image Type", value="faces", scale=2 ) refresh_btn = gr.Button("Refresh", scale=1) current_image_paths = gr.State([]) gallery = gr.Gallery( label="Image Gallery", columns=5, height="auto", preview=True ) selected_images = gr.CheckboxGroup( label="Selected Images", interactive=True, value=[], visible=False ) with gr.Row(elem_classes="management-section"): with gr.Column(): gr.Markdown("#### Download Options") with gr.Row(): download_btn = gr.Button("Download Selected", variant="primary") download_all_btn = gr.Button("Download All in Category") download_structured_btn = gr.Button("Download Full Archive", variant="primary") download_output = gr.File(label="Download Result", visible=False) with gr.Column(): gr.Markdown("#### Delete Options") delete_btn = gr.Button("Delete Selected", variant="stop") with gr.Row(): delete_confirm = gr.Checkbox( label="Confirm deletion of ALL images in this category", value=False, scale=4 ) delete_all_btn = gr.Button( "Delete All in Category", variant="stop", interactive=False, scale=1 ) delete_output = gr.Textbox(label="Operation Status") with gr.Tab("Emotion Logs", elem_classes="dashboard-panel"): with gr.Column(): gr.Markdown("### Emotion Analysis Logs") with gr.Row(): refresh_logs_btn = gr.Button("Refresh Logs") download_logs_btn = gr.Button("Export Logs", variant="primary") clear_all_btn = gr.Button("Clear All Data", variant="stop") logs_display = gr.Markdown() logs_csv = gr.File(label="Logs Download", visible=False) clear_message = gr.Textbox(label="Operation Status") # Event handlers for Data Management def update_gallery_components(emotion, image_type): image_dict = get_image_gallery(emotion, image_type) gallery_items = [] image_paths = [] for emotion, images in image_dict.items(): for img_path in images: gallery_items.append((img_path, f"{emotion}: {Path(img_path).name}")) image_paths.append(img_path) return gallery_items, image_paths initial_gallery, initial_paths = update_gallery_components("All Emotions", "faces") gallery.value = initial_gallery current_image_paths.value = initial_paths selected_images.choices = initial_paths def update_components(emotion, image_type): gallery_items, image_paths = update_gallery_components(emotion, image_type) return { gallery: gallery_items, current_image_paths: image_paths, selected_images: gr.CheckboxGroup(choices=image_paths, value=[]) } emotion_selector.change( update_components, inputs=[emotion_selector, image_type_selector], outputs=[gallery, current_image_paths, selected_images] ) image_type_selector.change( update_components, inputs=[emotion_selector, image_type_selector], outputs=[gallery, current_image_paths, selected_images] ) refresh_btn.click( update_components, inputs=[emotion_selector, image_type_selector], outputs=[gallery, current_image_paths, selected_images] ) download_btn.click( lambda selected: create_custom_zip(selected), inputs=selected_images, outputs=download_output, api_name="download_selected" ).then( lambda x: gr.File(visible=x is not None), inputs=download_output, outputs=download_output ) download_all_btn.click( lambda emotion, img_type: create_custom_zip( [str(f) for f in (SAVE_DIR / img_type / (emotion if emotion != "All Emotions" else "*")).glob("*.jpg") if f.exists()] ), inputs=[emotion_selector, image_type_selector], outputs=download_output, api_name="download_all" ).then( lambda x: gr.File(visible=x is not None), inputs=download_output, outputs=download_output ) download_structured_btn.click( download_all_emotions_structured, outputs=download_output, api_name="download_all_structured" ).then( lambda x: gr.File(visible=x is not None), inputs=download_output, outputs=download_output ) delete_btn.click( lambda selected: { "delete_output": delete_selected_images(selected), **update_components(emotion_selector.value, image_type_selector.value) }, inputs=selected_images, outputs=[delete_output, gallery, current_image_paths, selected_images] ) delete_confirm.change( lambda x: gr.Button(interactive=x), inputs=delete_confirm, outputs=delete_all_btn ) delete_all_btn.click( lambda emotion, img_type, confirm: { "delete_output": delete_images_in_category(emotion, img_type, confirm), **update_components(emotion, img_type) }, inputs=[emotion_selector, image_type_selector, delete_confirm], outputs=[delete_output, gallery, current_image_paths, selected_images] ) refresh_logs_btn.click( view_logs, outputs=logs_display ) download_logs_btn.click( download_logs, outputs=logs_csv, api_name="download_logs" ).then( lambda x: gr.File(visible=x is not None), inputs=logs_csv, outputs=logs_csv ) clear_all_btn.click( clear_all_data, outputs=[clear_message, logs_display, logs_csv] ).then( lambda: update_components("All Emotions", "faces"), outputs=[gallery, current_image_paths] ).then( lambda: gr.CheckboxGroup(choices=[], value=[]), outputs=selected_images ) # Combine interfaces demo = gr.TabbedInterface( [capture_interface, data_interface], ["Emotion Capture", "Data Management"], css=desktop_css ) if __name__ == "__main__": demo.launch()