# -*- coding: utf-8 -*-
from phi.agent import Agent
from phi.model.google import Gemini
from phi.tools.duckduckgo import DuckDuckGo
import google.generativeai as genai
from google.generativeai import upload_file, get_file
import os
import numpy as np
import time
import uuid
import yt_dlp
import cv2
import mediapipe as mp
#==========================================================================================================
# Load a pre-trained face embedding model (OpenCV's FaceNet). This model performs better than the MediaPipe embedder.
face_embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")  # Download the model from OpenCV's GitHub

# Define the MediaPipe embedder (used for object embeddings; not used for face detection due to worse performance).
# Download the model from https://storage.googleapis.com/mediapipe-tasks/image_embedder
BaseOptions = mp.tasks.BaseOptions
ImageEmbedder = mp.tasks.vision.ImageEmbedder
ImageEmbedderOptions = mp.tasks.vision.ImageEmbedderOptions
VisionRunningMode = mp.tasks.vision.RunningMode
options = ImageEmbedderOptions(
    base_options=BaseOptions(model_asset_path='mobilenet_v3_small_075_224_embedder.tflite'),
    quantize=True,
    running_mode=VisionRunningMode.IMAGE)
mp_embedder = ImageEmbedder.create_from_options(options)
#================================================================================================================
def initialize_agent():
    return Agent(
        name="Video AI summarizer",
        model=Gemini(id="gemini-2.0-flash-exp"),
        tools=[DuckDuckGo()],
        show_tool_calls=True,
        markdown=True,
    )
# Based on the cv2 FaceNet embedder
def get_face_embedding(face_image):
    """
    Generate a face embedding using the pre-trained OpenCV FaceNet model.
    """
    # Preprocess the face image with cv2
    blob = cv2.dnn.blobFromImage(face_image, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
    face_embedder.setInput(blob)
    embedding = face_embedder.forward()
    return embedding.flatten()
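
# Illustrative sketch (not part of the app flow): how two face crops can be compared with the
# FaceNet embedding above, using the same cosine-similarity rule as the trackers below.
# The helper name and the 0.5 threshold are assumptions for demonstration only.
def example_same_face(face_image_a, face_image_b, threshold=0.5):
    emb_a = get_face_embedding(face_image_a)
    emb_b = get_face_embedding(face_image_b)
    # Cosine similarity between the two embeddings
    similarity = np.dot(emb_a, emb_b) / (np.linalg.norm(emb_a) * np.linalg.norm(emb_b))
    return similarity > threshold  # True if the two crops likely show the same face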
# Based on the MediaPipe embedder
def get_mp_embedding(face_image):
    """
    Generate an image embedding using the pre-trained MediaPipe model.
    """
    # Load the input image from a numpy array.
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=np.array(face_image))
    embedding_result = mp_embedder.embed(mp_image)
    return embedding_result.embeddings[0]
# Advanced face tracking with MediaPipe face detection and face embeddings
def face_detection_embed(video_path):
    # Initialize MediaPipe Face Detection
    mp_face_detection = mp.solutions.face_detection
    mp_drawing = mp.solutions.drawing_utils
    # The pre-trained face embedding model (OpenCV's FaceNet) is loaded at module level as face_embedder
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    # Dictionary to store face embeddings and their corresponding IDs, number of matches, and normalized images
    face_tracker = {}  # Format: {face_id: {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face, "score": score}}
    face_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two faces the same
    frame_number = 0
    # Define the target size for normalization
    target_width = 100   # Desired width for all faces
    target_height = 100  # Desired height for all faces
    with mp_face_detection.FaceDetection(min_detection_confidence=0.5) as face_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break
            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Process the frame with MediaPipe Face Detection
                results = face_detection.process(rgb_frame)
                if results.detections:
                    for detection in results.detections:
                        # Get the bounding box of the face
                        bboxC = detection.location_data.relative_bounding_box
                        ih, iw, _ = frame.shape
                        x = int(bboxC.xmin * iw)
                        y = int(bboxC.ymin * ih)
                        w = int(bboxC.width * iw)
                        h = int(bboxC.height * ih)
                        score = detection.score[0]
                        # Extract the face region
                        face_image = frame[y:y+h, x:x+w]
                        if face_image.size == 0:
                            continue  # Skip empty face regions
                        #yield face_image  # Yield the frame for streaming
                        # Generate the face embedding
                        face_embedding = get_face_embedding(face_image)  # This model performs better than the MediaPipe embedder
                        #face_embedding = get_mp_embedding(face_image)
                        # Check if this face matches any previously tracked face, and find the face_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        max_face_id = None
                        for face_id, data in face_tracker.items():
                            # Calculate the cosine similarity between embeddings
                            similarity = np.dot(face_embedding, data["embedding"]) / (
                                np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
                            )
                            '''
                            # MediaPipe cosine similarity, commented out because of worse performance
                            similarity = ImageEmbedder.cosine_similarity(
                                face_embedding, data["embedding"])
                            '''
                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_face_id = face_id
                        # Define a larger bounding box for the output face
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + w)
                        ye = int(y * 1.2 + h)
                        if max_similarity > similarity_threshold:
                            matched_id = max_face_id
                            number_matched = face_tracker[matched_id]["number_matched"] + 1
                            face_tracker[matched_id]["number_matched"] = number_matched
                            if score > face_tracker[matched_id]["score"]:  # switch to the higher-score image
                                face_image_b = frame[yb:ye, xb:xe]
                                normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                                face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face, "score": score}
                        # If the face is not matched, assign a new ID
                        if matched_id is None:
                            face_id_counter += 1
                            matched_id = face_id_counter
                            # Update the face tracker with the new embedding and normalized image
                            face_image_b = frame[yb:ye, xb:xe]
                            normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                            face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": 0, "image": normalized_face, "score": score}
                        # Draw a larger bounding box and face ID
                        cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                # Display the resulting frame, for debug purposes
                #yield frame  # Yield the frame for streaming
                #time.sleep(2)  # simulate a delay
            # Increment the frame number
            frame_number += 1
    # Finished reading the video
    if len(face_tracker) == 0:
        return None
    sorted_data = sorted(face_tracker, key=lambda x: face_tracker[x]['number_matched'], reverse=True)
    # Find the top N faces among all detected faces
    number_faces = len(face_tracker)
    if number_faces >= 3:
        center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 takes the center position
    else:
        center_top1 = sorted_data
    images = []
    contents = []
    for face_id in center_top1:
        #yield face_tracker[face_id]["image"]  # Yield the frame for streaming
        #time.sleep(2)  # simulate a delay
        face_image = face_tracker[face_id]["image"]
        images.append(face_image)
    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()
    return images
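
# Usage sketch (the file name is a hypothetical placeholder, not wired into this app):
# face_detection_embed samples every 30th frame and returns up to three 100x100 face crops,
# with the most frequently matched face placed in the center position.
#
#   top_faces = face_detection_embed("downloaded_video.mp4")
#   if top_faces is not None:
#       for i, face in enumerate(top_faces):
#           cv2.imwrite(f"face_{i}.jpg", face)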
# Advanced object tracking with MediaPipe object detection
def object_detection_embed(video_path):
    # Initialize MediaPipe Object Detection
    BaseOptions = mp.tasks.BaseOptions
    ObjectDetector = mp.tasks.vision.ObjectDetector
    ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
    VisionRunningMode = mp.tasks.vision.RunningMode
    options = ObjectDetectorOptions(
        base_options=BaseOptions(model_asset_path='efficientdet_lite0.tflite'),
        max_results=3,
        score_threshold=0.5,
        running_mode=VisionRunningMode.IMAGE,
    )
    mp_drawing = mp.solutions.drawing_utils
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    # Dictionary to store object embeddings and their corresponding IDs, number of matches, and normalized images
    object_tracker = {}  # Format: {object_id: {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score": score, "category": category}}
    object_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two objects the same
    frame_number = 0
    # Define the target size for normalization; only the height is fixed
    #target_width = 100  # Desired width for all objects
    target_height = 100  # Desired height for all objects
    with ObjectDetector.create_from_options(options) as obj_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break
            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Get the frame dimensions
                frame_height, frame_width, _ = rgb_frame.shape
                # Load the input image from a numpy array.
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
                # Process the frame with MediaPipe Object Detection
                results = obj_detection.detect(mp_image)
                if results.detections:
                    for detection in results.detections:
                        #print("detection:", detection)
                        '''
                        Sample output:
                        Detection(bounding_box=BoundingBox(origin_x=84, origin_y=168, width=272, height=448),
                        categories=[Category(index=None, score=0.81640625, display_name=None, category_name='person')], keypoints=[])
                        '''
                        # Get the bounding box of the object; note that x is treated as the height direction (h) in the crops below
                        bboxC = detection.bounding_box
                        x = int(bboxC.origin_x)
                        y = int(bboxC.origin_y)
                        w = int(bboxC.width)
                        h = int(bboxC.height)
                        score = detection.categories[0].score
                        category = detection.categories[0].category_name
                        # Extract the object region
                        obj_image = frame[y:y+w, x:x+h]
                        if obj_image.size == 0:
                            continue  # Skip empty object regions
                        #yield obj_image  # Yield the frame for streaming
                        # Generate the object embedding
                        #obj_embedding = get_face_embedding(obj_image)  # FaceNet embedder performs better for faces, but is not used for general objects
                        obj_embedding = get_mp_embedding(obj_image)
                        # Check if this object matches any previously tracked object, and find the obj_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        max_obj_id = None
                        for obj_id, data in object_tracker.items():
                            '''
                            # Cosine similarity between numpy embeddings (as used in the face tracker)
                            similarity = np.dot(obj_embedding, data["embedding"]) / (
                                np.linalg.norm(obj_embedding) * np.linalg.norm(data["embedding"])
                            )
                            '''
                            # Compute cosine similarity with the MediaPipe embedder utility
                            similarity = ImageEmbedder.cosine_similarity(
                                obj_embedding, data["embedding"])
                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_obj_id = obj_id
                        # Define a larger bounding box for the output object
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + h)
                        ye = int(y * 1.2 + w)
                        scale = target_height / (x * 0.4 + w)
                        target_width = int((y * 0.4 + w) * scale)
                        if max_similarity > similarity_threshold:
                            matched_id = max_obj_id
                            number_matched = object_tracker[matched_id]["number_matched"] + 1
                            object_tracker[matched_id]["number_matched"] = number_matched
                            if score > object_tracker[matched_id]["score"]:  # switch to the higher-score image
                                obj_image_b = frame[yb:ye, xb:xe]
                                normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                                object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score": score, "category": category}
                        # If the object is not matched, assign a new ID
                        if matched_id is None:
                            object_id_counter += 1
                            matched_id = object_id_counter
                            # Update the object tracker with the new embedding and normalized image
                            obj_image_b = frame[yb:ye, xb:xe]
                            normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                            object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": 0, "image": normalized_obj, "score": score, "category": category}
                        # Draw a larger bounding box and object ID, for debug purposes
                        #cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        #cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                # Display the resulting frame, for debug purposes
                #yield frame  # Yield the frame for streaming
                #time.sleep(2)  # simulate a delay
            # Increment the frame number
            frame_number += 1
    # Finished reading the video
    if len(object_tracker) == 0:
        return None
    sorted_data = sorted(object_tracker, key=lambda x: object_tracker[x]['number_matched'], reverse=True)
    # Find the top N objects among all detected objects
    number_objs = len(object_tracker)
    if number_objs >= 3:
        center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 takes the center position
    else:
        center_top1 = sorted_data
    images = []
    contents = []
    for obj_id in center_top1:
        #yield object_tracker[obj_id]["image"]  # Yield the frame for streaming
        #time.sleep(2)  # simulate a delay
        obj_image = object_tracker[obj_id]["image"]
        images.append(obj_image)
    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()
    return images
#=========================================================================================================
# Summarize the video using a phi Agent
def summarize_video(video_path, user_prompt, out_lang='Original'):
    # Upload and process the video
    processed_video = upload_file(video_path)
    # Extract the video info into a dictionary
    video_info = str(processed_video).split('File(')[1]
    video_info = video_info.replace(")", "")
    video_dic = eval(video_info)
    print("display_name, sha256_hash:", video_dic['display_name'], video_dic['sha256_hash'])
    while processed_video.state.name == "PROCESSING":
        time.sleep(1)
        processed_video = get_file(processed_video.name)
    # Detect the language of the video
    lang_prompt = '''Give language name'''
    lang_response = multimodal_Agent.run(lang_prompt, videos=[processed_video]).content
    language = str(lang_response).split(' ')[-1]
    print('Video language is:', language)
    if out_lang == 'Original':
        out_lang = language
    # Analysis prompt
    analysis_prompt = (f'''
        First analyze the video and then answer the following questions using the video analysis, questions:
        {user_prompt}
        Provide a comprehensive response focusing on practical, actionable information with the original questions.
        Answer the questions in {out_lang}. Limit the total output to 30 lines.'''
    )
    # AI agent processing
    response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])
    markdown_text = response.content
    return out_lang, str(markdown_text)
#=======================================================================================
# Initialize the agent
multimodal_Agent = initialize_agent()
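
# Minimal end-to-end sketch, assuming a local video file and a Google API key already
# configured for google.generativeai; the file name below is a hypothetical placeholder.
if __name__ == "__main__":
    sample_video = "sample_video.mp4"
    if os.path.exists(sample_video):
        lang, summary = summarize_video(sample_video, "What is this video about?", out_lang="English")
        print(f"Answered in {lang}:\n{summary}")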