import asyncio
import base64
import datetime as dt
from datetime import datetime, timezone
import json
import os
import pathlib
import threading
import uuid
import wave
from typing import AsyncGenerator, Literal
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
AsyncStreamHandler,
get_cloudflare_turn_credentials_async,
wait_for_item,
)
from google import genai
from google.genai.types import (
LiveConnectConfig,
PrebuiltVoiceConfig,
SpeechConfig,
VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
from fastrtc import Stream
from interview_agents import run_interview_crew, run_evaluation_crew
# --- MONKEYPATCH FOR HF SPACES GRADIO COMPATIBILITY ---
# Fixes: TypeError: BlockContext.__init__() got an unexpected keyword argument 'css' / 'theme'
original_blocks_init = gr.Blocks.__init__
def patched_blocks_init(self, *args, **kwargs):
import inspect
sig = inspect.signature(original_blocks_init)
# Remove unsupported arguments for HF Spaces Gradio version
problem_args = ["css", "theme"]
for arg_name in problem_args:
if arg_name in kwargs and arg_name not in sig.parameters:
print(f"WARNING: '{arg_name}' argument removed from gr.Blocks call for HF Spaces compatibility.")
kwargs.pop(arg_name)
original_blocks_init(self, *args, **kwargs)
gr.Blocks.__init__ = patched_blocks_init
# ---------------------------------------------
# --- UTILS (Inline for single-file convenience) ---
def extract_text_from_file(file_obj):
"""Helper to extract text from PDF or TXT files."""
if not file_obj:
return ""
try:
if file_obj.name.endswith('.pdf'):
# Basic PDF extraction (requires pypdf)
try:
from pypdf import PdfReader
reader = PdfReader(file_obj.name)
text = ""
for page in reader.pages:
text += page.extract_text() or ""
return text
except ImportError:
return "[Error: pypdf not installed. Please pip install pypdf for PDF support]"
else:
# Text file
with open(file_obj.name, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
return f"Error reading file: {e}"
current_dir = pathlib.Path(__file__).parent
recordings_root = current_dir / "recordings"
recordings_root.mkdir(exist_ok=True)
LATEST_TRANSCRIPT_DATA = {
"text": "Transcript will appear here after you stop the call and press Generate Transcript.",
"session_dir": None,
"timestamp": None,
}
LATEST_TRANSCRIPT_LOCK = threading.Lock()
LATEST_RECORDING_INFO = {
"session_dir": None,
"combined": None,
"api_key": None,
}
LATEST_RECORDING_LOCK = threading.Lock()
def _set_latest_transcript(text: str, session_dir: pathlib.Path | None) -> None:
with LATEST_TRANSCRIPT_LOCK:
LATEST_TRANSCRIPT_DATA["text"] = text
LATEST_TRANSCRIPT_DATA["session_dir"] = str(session_dir) if session_dir else None
LATEST_TRANSCRIPT_DATA["timestamp"] = datetime.now(timezone.utc).isoformat()
def _set_latest_recording(
session_dir: pathlib.Path | None,
combined: pathlib.Path | None,
api_key: str | None,
) -> None:
with LATEST_RECORDING_LOCK:
LATEST_RECORDING_INFO["session_dir"] = str(session_dir) if session_dir else None
LATEST_RECORDING_INFO["combined"] = str(combined) if combined else None
LATEST_RECORDING_INFO["api_key"] = api_key
def _get_latest_recording() -> dict[str, str | None]:
with LATEST_RECORDING_LOCK:
return LATEST_RECORDING_INFO.copy()
class InputData(BaseModel):
webrtc_id: str
voice_name: str
api_key: str
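# Sends a recorded WAV file to Gemini for offline transcription: the audio is
# base64-encoded as inline_data and passed, together with a text prompt, to
# models.generate_content. Returns an empty string on any failure so callers
# can fall back gracefully.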
def _transcribe_with_client(
client: genai.Client,
audio_path: pathlib.Path | None,
prompt: str,
) -> str:
if not audio_path:
return ""
try:
audio_bytes = audio_path.read_bytes()
except (FileNotFoundError, OSError):
return ""
if not audio_bytes:
return ""
inline_data = {
"mime_type": "audio/wav",
"data": base64.b64encode(audio_bytes).decode("utf-8"),
}
try:
response = client.models.generate_content(
model="gemini-2.5-flash",
contents=[
{
"role": "user",
"parts": [
{"text": prompt},
{"inline_data": inline_data},
],
}
],
)
return response.text or ""
except Exception as transcribe_err:
print(f"Transcription error: {transcribe_err}")
return ""
def generate_transcript_from_recording() -> str:
info = _get_latest_recording()
combined_path_str = info.get("combined")
session_dir_str = info.get("session_dir")
if os.path.exists(".env"):
load_dotenv()
api_key = info.get("api_key") or os.getenv("API_KEY")
session_dir = pathlib.Path(session_dir_str) if session_dir_str else None
if not combined_path_str:
message = "⚠️ No completed recording found. Stop the call first."
_set_latest_transcript(message, session_dir)
return message
if not api_key:
message = "⚠️ Missing API key for transcription."
_set_latest_transcript(message, session_dir)
return message
combined_path = pathlib.Path(combined_path_str)
if not combined_path.exists():
message = "⚠️ Session recording not found yet. Wait a moment and try again."
_set_latest_transcript(message, session_dir)
return message
client = genai.Client(api_key=api_key)
combined_prompt = (
"You are transcribing a stereo interview recording. "
"Channel 1 contains the candidate's microphone audio, "
"Channel 2 contains the AI interviewer. "
"Produce a clean diarized transcript with speaker labels."
)
combined_text = _transcribe_with_client(client, combined_path, combined_prompt)
if combined_text:
final_text = combined_text.strip()
else:
final_text = "Transcript could not be generated from the recording."
if session_dir:
transcript_path = session_dir / "transcript.md"
try:
transcript_path.write_text(final_text, encoding="utf-8")
except Exception as file_err:
print(f"Error writing transcript: {file_err}")
_set_latest_transcript(final_text, session_dir)
return final_text
# load_dotenv()
def encode_audio(data: np.ndarray) -> str:
"""Encode Audio data to send to the server"""
return base64.b64encode(data.tobytes()).decode("UTF-8")
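# Per-connection audio handler. fastrtc calls copy() for each new WebRTC
# connection; receive() pushes microphone audio into input_queue, start_up()
# streams it to the Gemini Live API and queues the model's audio into
# output_queue, and emit() hands those frames back to the browser. Both
# directions are also written to a stereo session recording for transcription.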
class GeminiHandler(AsyncStreamHandler):
"""Handler for the Gemini API"""
def __init__(
self,
expected_layout: Literal["mono"] = "mono",
output_sample_rate: int = 24000,
system_instruction: str = "You are a helpful interviewer.",
) -> None:
super().__init__(
expected_layout,
output_sample_rate,
input_sample_rate=16000,
)
self.input_queue: asyncio.Queue = asyncio.Queue()
self.output_queue: asyncio.Queue = asyncio.Queue()
self.quit: asyncio.Event = asyncio.Event()
self.system_instruction = system_instruction
# Recording state (only combined stereo)
self.session_dir: pathlib.Path | None = None
self.combined_wave: wave.Wave_write | None = None
self.combined_recording_path: pathlib.Path | None = None
self.session_id: str | None = None
self.session_api_key: str | None = None
def copy(self) -> "GeminiHandler":
return GeminiHandler(
expected_layout="mono",
output_sample_rate=self.output_sample_rate,
system_instruction=self.system_instruction,
)
async def start_up(self):
# Wait for API key and voice from client (sent via /input_hook)
# Fallback to environment variable
if os.path.exists(".env"):
load_dotenv()
api_key = os.getenv("API_KEY") or os.getenv("GEMINI_API_KEY")
if not self.phone_mode:
# await self.wait_for_args()
api_key, voice_name = api_key, "Puck" #self.latest_args[1:]
else:
api_key, voice_name = api_key, "Puck"
if not api_key:
print("❌ ERROR: No API Key provided")
return
print(f"βœ… Starting Gemini session with voice: {voice_name}")
self.session_api_key = api_key
# Start recording session
self._start_recording_session()
client = genai.Client(
api_key=api_key,
http_options={"api_version": "v1alpha"},
)
config = LiveConnectConfig(
response_modalities=["AUDIO"], # type: ignore
system_instruction=self.system_instruction,
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config=PrebuiltVoiceConfig(
voice_name=voice_name,
)
)
),
)
try:
async with client.aio.live.connect(
model="gemini-2.5-flash-native-audio-preview-09-2025", config=config
) as session:
async for audio in session.start_stream(
stream=self.stream(), mime_type="audio/pcm"
):
if audio.data:
array = np.frombuffer(audio.data, dtype=np.int16)
self.output_queue.put_nowait((self.output_sample_rate, array))
self._write_output_audio(self.output_sample_rate, array)
except Exception as e:
print(f"❌ Error in Gemini session: {e}")
import traceback
traceback.print_exc()
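# Async generator fed to the Gemini Live session: polls the input queue in
# 0.1 s intervals so the loop can exit promptly once quit is set.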
async def stream(self) -> AsyncGenerator[bytes, None]:
while not self.quit.is_set():
try:
audio = await asyncio.wait_for(self.input_queue.get(), 0.1)
yield audio
except (asyncio.TimeoutError, TimeoutError):
pass
async def receive(self, frame: tuple[int, np.ndarray]) -> None:
_, array = frame
array = array.squeeze()
self._write_input_audio(array)
audio_message = encode_audio(array)
self.input_queue.put_nowait(audio_message)
async def emit(self) -> tuple[int, np.ndarray] | None:
frame = await wait_for_item(self.output_queue)
return frame
def shutdown(self) -> None:
self._close_recording_session()
self.quit.set()
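# Recording helpers: each session is captured as a single stereo WAV at the
# 16 kHz input rate, with the candidate's microphone on the left channel and
# the AI interviewer's audio on the right channel.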
def _start_recording_session(self) -> None:
try:
self.session_id = uuid.uuid4().hex[:8]
self.session_dir = recordings_root / f"session_{self.session_id}"
self.session_dir.mkdir(parents=True, exist_ok=True)
self.combined_recording_path = self.session_dir / "session_stereo.wav"
# Only create combined stereo recording
self.combined_wave = wave.open(str(self.combined_recording_path), "wb")
self.combined_wave.setnchannels(2) # Stereo: left=input, right=output
self.combined_wave.setsampwidth(2)
self.combined_wave.setframerate(self.input_sample_rate)
except Exception as recorder_err:
print(f"Recorder init error: {recorder_err}")
self._close_recording_session()
def _close_recording_session(self) -> None:
# Close combined wave file
if self.combined_wave:
try:
self.combined_wave.close()
except Exception:
pass
self.combined_wave = None
# Update latest recording info
_set_latest_recording(
self.session_dir,
self.combined_recording_path,
self.session_api_key,
)
def _write_input_audio(self, array: np.ndarray) -> None:
"""Write input audio to combined stereo (left channel)"""
if not self.combined_wave:
return
data_int16 = np.asarray(array, dtype=np.int16)
self._write_combined_chunk(left=data_int16, right=None)
def _write_output_audio(self, sample_rate: int, array: np.ndarray) -> None:
"""Write output audio to combined stereo (right channel)"""
if not self.combined_wave:
return
resampled = self._resample_audio(array, sample_rate, self.input_sample_rate)
self._write_combined_chunk(left=None, right=resampled)
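# Simple linear-interpolation resampler (np.interp) used to bring the 24 kHz
# model output down to the 16 kHz rate of the recording file.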
def _resample_audio(
self, array: np.ndarray, original_rate: int, target_rate: int
) -> np.ndarray:
if original_rate == target_rate:
return np.asarray(array, dtype=np.int16)
if len(array) == 0 or original_rate == 0 or target_rate == 0:
return np.asarray(array, dtype=np.int16)
duration = len(array) / float(original_rate)
target_length = max(1, int(duration * target_rate))
current_times = np.linspace(0, duration, num=len(array), endpoint=False)
target_times = np.linspace(0, duration, num=target_length, endpoint=False)
resampled = np.interp(target_times, current_times, array.astype(np.float32))
return resampled.astype(np.int16)
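# Writes one stereo chunk: the missing channel is zero-filled, both channels
# are padded to equal length, then interleaved left/right as int16 samples
# before being appended to the WAV file.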
def _write_combined_chunk(
self,
left: np.ndarray | None,
right: np.ndarray | None,
) -> None:
if self.combined_wave is None:
return
if left is None and right is None:
return
left_arr = np.asarray(left, dtype=np.int16) if left is not None else None
right_arr = np.asarray(right, dtype=np.int16) if right is not None else None
if left_arr is None and right_arr is not None:
left_arr = np.zeros_like(right_arr, dtype=np.int16)
if right_arr is None and left_arr is not None:
right_arr = np.zeros_like(left_arr, dtype=np.int16)
if left_arr is None or right_arr is None:
return
max_len = max(len(left_arr), len(right_arr))
if len(left_arr) < max_len:
left_arr = np.pad(left_arr, (0, max_len - len(left_arr)), "constant")
if len(right_arr) < max_len:
right_arr = np.pad(right_arr, (0, max_len - len(right_arr)), "constant")
interleaved = np.empty(max_len * 2, dtype=np.int16)
interleaved[0::2] = left_arr
interleaved[1::2] = right_arr
try:
self.combined_wave.writeframes(interleaved.tobytes())
except Exception as combine_err:
print(f"Combined recording error: {combine_err}")
async def generate_feedback(jd_context="", questions_text=""):
"""
Orchestrates transcript retrieval and scorecard generation using CrewAI.
Yields logs for progress tracking.
"""
logs = []
# 1. Get Transcript
logs.append("πŸ“„ Step 1: Retrieving interview transcript...")
yield "\n".join(logs), "Waiting for transcript...", "Waiting for scorecard..."
transcript = generate_transcript_from_recording()
if not transcript or "Error" in transcript or transcript.startswith("⚠️"):
logs.append("⚠️ Error: Transcript missing or could not be generated.")
yield "\n".join(logs), transcript, "⚠️ Scorecard generation skipped."
return
logs.append("βœ… Transcript retrieved successfully.")
logs.append(f"πŸ“ Transcript length: {len(transcript)} characters")
yield "\n".join(logs), transcript, "Waiting for scorecard..."
# 2. Generate Scorecard using CrewAI
logs.append("\n--- Interview Multi-Agent Evaluation ---")
logs.append("πŸ€– Agents: Starting Multi-Agent Evaluation (CrewAI)...")
logs.append(" - Technical Evaluator: Analyzing technical skills...")
logs.append(" - Behavioral Evaluator: Assessing soft skills and culture fit...")
logs.append(" - Evaluation Director: Compiling final scorecard...")
yield "\n".join(logs), transcript, "Waiting for scorecard..."
try:
if os.path.exists(".env"):
load_dotenv()
active_api_key = os.getenv("API_KEY")
if not active_api_key:
logs.append("❌ Error: API Key not found.")
yield "\n".join(logs), transcript, "⚠️ Scorecard generation failed: Missing API key."
return
# Capture CrewAI output
import sys
from io import StringIO
# Capture stdout to show CrewAI's verbose output
old_stdout = sys.stdout
sys.stdout = captured_output = StringIO()
try:
# Run CrewAI evaluation in a separate thread
evaluation_result = await asyncio.to_thread(
run_evaluation_crew,
transcript,
jd_context or "No job description available.",
questions_text or "No questions list available.",
active_api_key
)
# Get captured output
crew_output = captured_output.getvalue()
finally:
# Restore stdout
sys.stdout = old_stdout
# Add CrewAI output to logs (strip ANSI escape codes)
if crew_output:
cleaned_output = strip_ansi_codes(crew_output)
logs.append(cleaned_output)
yield "\n".join(logs), transcript, "Waiting for scorecard..."
logs.append("\n--- Processing Results ---")
yield "\n".join(logs), transcript, "Waiting for scorecard..."
# Extract scorecard from result
raw_output = str(evaluation_result)
# Clean up markdown code blocks if present
cleaned_output = raw_output.replace("```markdown", "").replace("```", "").strip()
scorecard = cleaned_output
logs.append("βœ… Scorecard generated successfully.")
yield "\n".join(logs), transcript, scorecard
except Exception as e:
logs.append(f"❌ Error during evaluation: {str(e)}")
import traceback
traceback.print_exc()
yield "\n".join(logs), transcript, f"⚠️ Error generating scorecard: {str(e)}"
# --- Backend Logic (Analysis) ---
def strip_ansi_codes(text):
"""
Remove ANSI escape codes from text (e.g., [36m, [0m, [1;36m, etc.)
"""
import re
# Pattern to match ANSI escape sequences
ansi_escape = re.compile(r'\x1b\[[0-9;]*m|\x1b\[[0-9;]*[A-Za-z]|\[[0-9;]*m')
return ansi_escape.sub('', text)
async def analyze_inputs(jd_file, jd_text, cv_file, num_questions, user_api_key):
"""
Analyzes JD and CV to generate interview questions and system instructions using CrewAI.
"""
if os.path.exists(".env"):
load_dotenv()
active_api_key = user_api_key or os.getenv("API_KEY")
if not active_api_key:
yield "Error: API Key not found.", "", "", None, "⚠️ Missing API key."
return
# 1. Prepare Context
jd_content = jd_text
if jd_file:
jd_content += "\n" + extract_text_from_file(jd_file)
cv_content = ""
if cv_file:
cv_content = extract_text_from_file(cv_file)
logs = []
question_markdown = "Waiting for generated questions..."
logs.append(f"πŸ“Š Configuration: {num_questions} Questions")
yield "\n".join(logs), "", "", None, question_markdown
# 2. Run CrewAI
logs.append("πŸ€– Agents: Starting Multi-Agent Analysis (CrewAI)...")
logs.append(" - Technical Interviewer: Analyzing hard skills...")
logs.append(" - Personality Specialist: Searching for behavioral questions...")
logs.append(" - Director: Compiling final plan...")
yield "\n".join(logs), "", "", None, question_markdown
try:
# Capture CrewAI output
import sys
from io import StringIO
logs.append("\n--- CrewAI Multi-Agent Analysis ---\n")
yield "\n".join(logs), "", "", None, question_markdown
# Capture stdout to show CrewAI's verbose output
old_stdout = sys.stdout
sys.stdout = captured_output = StringIO()
try:
# Run CrewAI in a separate thread
crew_result = await asyncio.to_thread(
run_interview_crew,
jd_content,
cv_content,
num_questions,
active_api_key
)
# Get captured output
crew_output = captured_output.getvalue()
finally:
# Restore stdout
sys.stdout = old_stdout
# Add CrewAI output to logs (strip ANSI escape codes)
if crew_output:
cleaned_output = strip_ansi_codes(crew_output)
logs.append(cleaned_output)
yield "\n".join(logs), "", "", None, question_markdown
logs.append("\n--- Processing Results ---\n")
yield "\n".join(logs), "", "", None, question_markdown
raw_output = str(crew_result)
# Attempt to parse JSON
import re
import json
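# The crew is expected to return a JSON object shaped like
# {"questions_markdown": "...", "system_instruction": "..."};
# if parsing fails, the raw output is used for both fields below.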
# Clean up markdown code blocks if present
cleaned_output = raw_output.replace("```json", "").replace("```", "").strip()
# Try to find the JSON object
json_match = re.search(r'\{.*\}', cleaned_output, re.DOTALL)
questions_md = raw_output
sys_instr = raw_output
if json_match:
try:
data = json.loads(json_match.group(0))
questions_md = data.get("questions_markdown", raw_output)
sys_instr = data.get("system_instruction", raw_output)
except Exception as json_err:
print(f"JSON Parse Error: {json_err}")
logs.append(f"⚠️ JSON parsing issue, using raw output")
yield "\n".join(logs), "", "", None, question_markdown
# Save questions
with open("generated_questions.md", "w", encoding="utf-8") as f:
f.write(questions_md)
logs.append("βœ… Questions generated and saved.")
yield "\n".join(logs), "", "", None, question_markdown
# UPDATE THE HANDLER
gemini_handler.system_instruction = sys_instr
gemini_handler.api_key = active_api_key
logs.append("βœ… Analysis Complete. Ready to interview.")
yield "\n".join(logs), sys_instr, jd_content, [], questions_md
except Exception as e:
logs.append(f"❌ Error: {str(e)}")
import traceback
traceback.print_exc()
yield "\n".join(logs), "", "", None, question_markdown
async def get_rtc_config_with_fallback():
"""Get RTC configuration with Cloudflare TURN (using HF token) and fallback STUN servers."""
# Check if running on HF Spaces by looking for SPACE_ID env var
is_space = os.getenv("SPACE_ID") is not None or get_space() is not None
if not is_space:
# Local development - use public STUN servers
print("🏠 Local development mode - using public STUN servers")
return {
"iceServers": [
{"urls": "stun:stun.l.google.com:19302"},
{"urls": "stun:stun1.l.google.com:19302"},
]
}
# HF Spaces - Try to get Cloudflare TURN credentials with HF token
print("🌐 HF Spaces detected - configuring TURN servers")
try:
hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
if hf_token:
print(f"πŸ”‘ HF_TOKEN found (length: {len(hf_token)})")
print("🌐 Requesting Cloudflare TURN credentials...")
config = await get_cloudflare_turn_credentials_async(hf_token=hf_token)
print(f"βœ… Cloudflare TURN credentials received: {len(config.get('iceServers', []))} servers")
# Log TURN server details (without credentials)
for server in config.get('iceServers', []):
urls = server.get('urls', [])
if isinstance(urls, str):
urls = [urls]
for url in urls:
if 'turn:' in url:
print(f" πŸ”„ TURN: {url.split('?')[0]}")
elif 'stun:' in url:
print(f" πŸ“‘ STUN: {url}")
# Add fallback STUN servers
if "iceServers" in config:
config["iceServers"].extend([
{"urls": "stun:stun.l.google.com:19302"},
{"urls": "stun:stun1.l.google.com:19302"},
])
print(f" βž• Added 2 fallback STUN servers")
return config
else:
print("⚠️ WARNING: HF_TOKEN not found in environment variables")
print(" Set HF_TOKEN in Space settings for TURN support")
except Exception as e:
print(f"❌ ERROR: Failed to get Cloudflare TURN credentials: {e}")
import traceback
traceback.print_exc()
# Fallback to public STUN servers
print("πŸ“‘ Using fallback STUN servers only (no TURN relay)")
return {
"iceServers": [
{"urls": "stun:stun.l.google.com:19302"},
{"urls": "stun:stun1.l.google.com:19302"},
{"urls": "stun:stun2.l.google.com:19302"},
{"urls": "stun:stun.services.mozilla.com"},
]
}
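# Template handler: fastrtc copies this instance for every connection, so
# updating gemini_handler.system_instruction after the analysis step changes
# the persona used by subsequent interview sessions.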
gemini_handler = GeminiHandler()
stream = Stream(
modality="audio",
mode="send-receive",
handler=gemini_handler,
rtc_configuration=get_rtc_config_with_fallback if (os.getenv("SPACE_ID") or get_space()) else None,
concurrency_limit=5 if (os.getenv("SPACE_ID") or get_space()) else None,
time_limit=120 if (os.getenv("SPACE_ID") or get_space()) else None,
)
app = FastAPI()
stream.mount(app)
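# Mounting the stream registers fastrtc's WebRTC signalling endpoints on the
# FastAPI app; the /interview_ui iframe uses them, together with the RTC
# configuration injected below, to negotiate the audio connection.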
@app.post("/input_hook")
async def _(body: InputData):
if os.path.exists(".env"):
load_dotenv()
api_key = body.api_key or os.getenv("API_KEY")
# Use the API key from the client (sent from browser)
stream.set_input(body.webrtc_id, api_key, body.voice_name)
return {"status": "ok"}
@app.get("/debug/rtc_config")
async def debug_rtc_config():
"""Diagnostic endpoint to check RTC configuration"""
config = await get_rtc_config_with_fallback()
# Remove credentials for security
safe_config = {"iceServers": []}
for server in config.get("iceServers", []):
safe_server = {"urls": server.get("urls")}
if "username" in server:
safe_server["username"] = server["username"][:10] + "..."
safe_config["iceServers"].append(safe_server)
return safe_config
@app.get("/interview_ui")
async def interview_ui():
# Get RTC configuration
rtc_config = await get_rtc_config_with_fallback()
html_content = (current_dir / "index.html").read_text()
html_content = html_content.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
return HTMLResponse(content=html_content)
# Custom CSS - Enhanced visibility with vibrant orange accents
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;600;700&display=swap');
body {
background: linear-gradient(135deg, #1a1f2e 0%, #0f1419 100%);
background-attachment: fixed;
font-family: 'Inter', sans-serif !important;
}
.gradio-container {
background: #242b3d !important;
backdrop-filter: blur(10px);
border: 1px solid rgba(255, 120, 73, 0.2);
border-radius: 12px !important;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.4), 0 0 0 1px rgba(255, 120, 73, 0.1);
max-width: 1400px !important;
padding: 2rem !important;
}
h1 {
color: #ffffff !important;
font-family: 'Inter', sans-serif !important;
font-size: 2rem !important;
margin-bottom: 1.5rem !important;
font-weight: 700;
text-shadow: 0 0 30px rgba(255, 120, 73, 0.3);
}
h2, h3 {
color: #e8eaed !important;
font-family: 'Inter', sans-serif !important;
font-size: 1.1rem !important;
margin-bottom: 0.75rem !important;
font-weight: 600;
}
h4, h5, h6 {
color: #b8bcc8 !important;
font-family: 'Inter', sans-serif !important;
}
/* Improved spacing */
.gr-form, .gr-box {
gap: 1rem !important;
}
.gr-group {
padding: 1.25rem !important;
background: rgba(255, 120, 73, 0.05) !important;
border: 1px solid rgba(255, 120, 73, 0.2) !important;
border-radius: 8px !important;
box-shadow: 0 0 20px rgba(255, 120, 73, 0.05);
}
/* Vibrant orange accent buttons */
.primary-btn {
background: linear-gradient(135deg, #ff7849 0%, #ff5722 100%) !important;
border: none !important;
color: #ffffff !important;
font-weight: 600 !important;
padding: 0.75rem 1.75rem !important;
transition: all 0.3s ease;
box-shadow: 0 4px 16px rgba(255, 120, 73, 0.4), 0 0 0 1px rgba(255, 120, 73, 0.3);
}
.primary-btn:hover {
transform: translateY(-2px);
box-shadow: 0 8px 24px rgba(255, 120, 73, 0.5), 0 0 0 1px #ff7849;
filter: brightness(1.15);
}
/* Input Fields - brighter with better contrast */
label {
margin-bottom: 0.5rem !important;
}
label span {
color: #b8bcc8 !important;
font-weight: 500;
font-size: 0.95rem !important;
}
input, textarea, .gr-box {
background-color: #1a1f2e !important;
}
/* Scorecard Table Styles */
.scorecard-table table {
width: 100% !important;
border-collapse: collapse !important;
margin-top: 1rem !important;
font-size: 0.9rem !important;
table-layout: fixed !important; /* Fixes column widths */
}
.scorecard-table th, .scorecard-table td {
padding: 0.75rem !important;
text-align: left !important;
border-bottom: 1px solid #3a3a3a !important;
word-wrap: break-word !important; /* Ensures long text wraps */
overflow-wrap: break-word !important;
}
.scorecard-table th {
background-color: #2a2a2a !important;
color: #ff6b35 !important;
font-weight: 600 !important;
}
.scorecard-table tr:hover {
background-color: #2a2a2a !important;
}
/* Make it scrollable horizontally on small screens */
.scorecard-table {
overflow-x: auto !important;
display: block !important;
width: 100% !important;
}
input, textarea, .gr-box {
border: 1px solid rgba(255, 120, 73, 0.2) !important;
color: #e8eaed !important;
border-radius: 8px !important;
padding: 0.75rem !important;
font-size: 0.95rem !important;
}
input:focus, textarea:focus {
border-color: #ff7849 !important;
box-shadow: 0 0 0 3px rgba(255, 120, 73, 0.2);
outline: none;
background-color: #242b3d !important;
}
/* Tabs - more visible */
.tab-nav {
border-bottom: 2px solid rgba(255, 120, 73, 0.3) !important;
margin-bottom: 1.5rem !important;
}
.tab-nav button {
font-weight: 500;
color: #9ca3af;
padding: 0.75rem 1.25rem !important;
font-size: 0.95rem !important;
transition: all 0.2s ease;
}
.tab-nav button.selected {
color: #ff7849 !important;
border-bottom: 3px solid #ff7849 !important;
font-weight: 600;
}
.tab-nav button:hover {
color: #ffab91 !important;
}
/* Links - brighter */
a {
color: #ff7849 !important;
text-decoration: none;
font-weight: 500;
}
a:hover {
color: #ffab91 !important;
text-decoration: underline;
}
/* Markdown content - light text for dark background */
.prose, .markdown-text, .gr-prose {
font-size: 0.95rem !important;
line-height: 1.6 !important;
color: #e8eaed !important;
}
.prose p, .markdown-text p, .gr-prose p {
color: #e8eaed !important;
}
.prose strong, .markdown-text strong, .gr-prose strong {
color: #ffffff !important;
font-weight: 700;
}
.prose em, .markdown-text em, .gr-prose em {
color: #b8bcc8 !important;
}
.prose ul, .prose ol {
color: #e8eaed !important;
}
.prose li {
color: #e8eaed !important;
}
/* File upload - more visible */
.gr-file {
min-height: 100px !important;
background: rgba(255, 120, 73, 0.05) !important;
border: 2px dashed rgba(255, 120, 73, 0.3) !important;
border-radius: 8px !important;
}
.gr-file:hover {
border-color: #ff7849 !important;
background: rgba(255, 120, 73, 0.08) !important;
}
/* Better padding */
.gr-padded {
padding: 1rem !important;
}
/* Button improvements */
button {
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
button:hover {
transform: translateY(-1px) !important;
}
/* Ensure all text is visible on dark background */
p, div, span, .gr-text {
color: #e8eaed !important;
}
/* Code blocks */
code, pre {
background: #1a1f2e !important;
color: #ffab91 !important;
border: 1px solid rgba(255, 120, 73, 0.2);
padding: 0.2rem 0.4rem;
border-radius: 4px;
}
pre code {
display: block;
padding: 1rem;
}
/* Tables */
table {
color: #e8eaed !important;
}
th {
background: rgba(255, 120, 73, 0.1) !important;
color: #ffffff !important;
}
td {
color: #e8eaed !important;
}
/* Markdown headings */
.prose h1, .prose h2, .prose h3, .prose h4, .prose h5, .prose h6,
.markdown-text h1, .markdown-text h2, .markdown-text h3, .markdown-text h4, .markdown-text h5, .markdown-text h6 {
color: #ffffff !important;
}
/* Blockquotes */
blockquote {
border-left: 3px solid #ff7849;
padding-left: 1rem;
color: #b8bcc8 !important;
}
"""
with gr.Blocks(
title="Agentic Interviewer",
theme=gr.themes.Base(
primary_hue="orange",
secondary_hue="gray",
neutral_hue="slate",
font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"],
radius_size="md",
spacing_size="md",
).set(
body_background_fill="linear-gradient(135deg, #1a1f2e 0%, #0f1419 100%)",
body_text_color="#e8eaed",
button_primary_background_fill="#ff7849",
button_primary_background_fill_hover="#ff5722",
button_primary_text_color="#ffffff",
block_background_fill="#242b3d",
block_border_color="rgba(255, 120, 73, 0.2)",
input_background_fill="#1a1f2e",
input_border_color="rgba(255, 120, 73, 0.2)",
),
css=custom_css,
js="""
function() {
window.addEventListener("message", (event) => {
if (event.data === "stop_interview") {
const btn = document.getElementById("hidden-stop-btn");
if (btn) btn.click();
}
});
}
"""
) as demo:
# State Variables
system_instruction_state = gr.State("You are a helpful interviewer.")
jd_context_state = gr.State("")
chat_history_state = gr.State([])
questions_state = gr.State("Waiting for generated questions...")
gr.Markdown("# πŸŽ™οΈ Agentic Interviewer (Live)")
tabs = gr.Tabs(elem_id="app-tabs")
with tabs:
# TAB 1: SETUP
with gr.Tab("1. Setup", id="tab-setup") as setup_tab:
# Hero Section
gr.Markdown(
"""
# AI-Powered Interview System
Upload your job description and candidate CV to generate tailored interview questions powered by AI agents.
"""
)
# Main Configuration Row
with gr.Row():
with gr.Column(scale=1):
with gr.Group():
gr.Markdown("### βš™οΈ **Settings**")
num_questions_input = gr.Slider(
minimum=1,
maximum=10,
value=3,
step=1,
label="Number of Questions",
#info="Total questions to generate"
)
api_key_input = gr.State("")
with gr.Group():
gr.Markdown("### πŸ“‚ **Quick Start**")
load_examples_btn = gr.Button(
"⚑ Load Example Files",
variant="primary",
elem_classes=["primary-btn"],
size="lg"
)
gr.Markdown("*Try the demo with pre-loaded CV/JD pdf files*")
with gr.Column(scale=2):
with gr.Group():
gr.Markdown("### πŸ“„ **Job Description**")
gr.Markdown("*Upload the JD in Pdf or paste the text below*")
jd_file_input = gr.File(
label="πŸ“Ž Upload PDF",
file_types=[".pdf"],
height=120
)
jd_text_input = gr.Textbox(
label="✍️ or Paste JD Text",
lines=3,
placeholder="Paste the job description here...",
#info="Supports plain text or formatted content"
)
with gr.Column(scale=2):
with gr.Group():
gr.Markdown("### πŸ‘€ **Candidate Profile**")
gr.Markdown("*Upload the candidate's resume/CV in PDF format*")
cv_file_input = gr.File(
label="πŸ“Ž Upload CV/Resume",
file_types=[".pdf"],
height=180
)
# Action Button
with gr.Row():
with gr.Column(scale=1):
pass
with gr.Column(scale=2):
analyze_btn = gr.Button(
"🚀 Generate Interview Questions",
variant="primary",
size="lg",
elem_classes=["primary-btn"]
)
with gr.Column(scale=1):
pass
# Logs Section
with gr.Group():
gr.Markdown("### πŸ“Š **Analysis Progress**")
log_output = gr.Textbox(
show_label=False,
interactive=False,
lines=10,
max_lines=20,
autoscroll=True,
placeholder="Analysis logs will appear here..."
)
gr.HTML(
"""
<div style="background: linear-gradient(135deg, #2a2a2a 0%, #1a1a1a 100%); padding: 2rem; border-radius: 8px; border: 1px solid #3a3a3a; margin-top: 2rem;">
<h3 style="color: #ff6b35; font-size: 1.25rem; margin-bottom: 1rem; border-bottom: 2px solid #ff6b35; padding-bottom: 0.5rem;">
Agentic Interview MVP – Automated Voice Interview & Candidate Scoring System
</h3>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 1rem; margin-bottom: 1.5rem;">
<div style="background: #0f0f0f; padding: 0.75rem; border-radius: 6px; border-left: 3px solid #ff6b35;">
<strong style="color: #ff6b35;">Category:</strong><br>
<span style="color: #e5e5e5;">Enterprise Applications</span>
</div>
<div style="background: #0f0f0f; padding: 0.75rem; border-radius: 6px; border-left: 3px solid #f7931e;">
<strong style="color: #f7931e;">Tag:</strong><br>
<span style="color: #e5e5e5;">mcp-in-action-track-enterprise</span>
</div>
</div>
<p style="color: #e5e5e5; line-height: 1.6; margin-bottom: 1.5rem;">
This Space showcases an enterprise-grade agentic interview system (technical + behavioural questions) built using <strong style="color: #ffffff;">Gradio + CrewAI + Gemini + FastRTC + Serper Tool + MCP Tool (sentiment analysis) + Vibe Coding</strong> automation. The application enables organizations to streamline and scale their hiring workflow by leveraging intelligent multi-agent collaboration.
</p>
<h4 style="color: #ff6b35; font-size: 1.1rem; margin: 1.5rem 0 1rem 0;">🚀 What the Application Does</h4>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 1rem; margin-bottom: 1.5rem;">
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<strong style="color: #ff6b35;">1️⃣ Upload CV + Job Description</strong>
<p style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem;">
The enterprise (or candidate) provides a resume/CV and the job description
</p>
</div>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<strong style="color: #ff6b35;">2️⃣ Agent Generates Questions</strong>
<p style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem;">
AI analyzes both documents and creates a custom interview questionnaire aligned with the role
</p>
</div>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<strong style="color: #ff6b35;">3️⃣ Voice Interview Tab</strong>
<p style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem;">
AI interview agent conducts a live voice interview simulating a real interview experience
</p>
</div>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<strong style="color: #ff6b35;">4️⃣ Agentic Evaluation</strong>
<p style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem;">
AI evaluates responses and generates competency scores, strengths/weaknesses, and hire recommendations
</p>
</div>
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 1.5rem; margin-top: 2rem;">
<div>
<h4 style="color: #f7931e; font-size: 1rem; margin-bottom: 0.75rem;">Why It's Useful</h4>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px; margin-bottom: 1rem;">
<strong style="color: #ffffff;">For Enterprises:</strong>
<ul style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem; padding-left: 1.25rem;">
<li>Automates early-stage screening</li>
<li>Standardizes evaluation</li>
<li>Reduces manual workload</li>
<li>Cuts time-to-hire</li>
<li>Creates scalable, unbiased pipelines</li>
</ul>
</div>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<strong style="color: #ffffff;">For Candidates:</strong>
<ul style="color: #a3a3a3; font-size: 0.875rem; margin-top: 0.5rem; padding-left: 1.25rem;">
<li>Realistic AI-driven mock interviews</li>
<li>Practice role-specific questions</li>
<li>Receive actionable feedback</li>
</ul>
</div>
</div>
<div>
<h4 style="color: #f7931e; font-size: 1rem; margin-bottom: 0.75rem;">Who Should Use It</h4>
<div style="background: #0f0f0f; padding: 1rem; border-radius: 6px;">
<ul style="color: #a3a3a3; font-size: 0.875rem; padding-left: 1.25rem;">
<li>HR teams</li>
<li>Talent acquisition departments</li>
<li>Startups building recruitment tools</li>
<li>Candidates preparing for interviews</li>
<li>Organizations adopting AI-driven automation</li>
</ul>
</div>
</div>
</div>
</div>
"""
)
# TAB 2: INTERVIEW
with gr.Tab("2. Interview", id="tab-interview") as interview_tab:
gr.Markdown(
"""
### ⚠️ INSTRUCTIONS
1. Click **Start** to begin speaking, then greet the interviewer with **Hello** so the interviewer can start the interview.
2. When the interview is done, click **Stop** on the widget.
3. Then go to the **Feedback** tab to get your results.
"""
)
questions_display = gr.Markdown(
"### πŸ“‹ Interview Question Plan\nWaiting for generation...",
elem_id="questions-display",
)
# WebRTC Component for live interview
# webrtc = WebRTC(
# label="Interview Session",
# modality="audio",
# mode="send-receive",
# rtc_configuration=get_cloudflare_turn_credentials_async if get_space() else None,
# )
# webrtc.stream(
# gemini_handler,
# inputs=[webrtc],
# outputs=[webrtc],
# time_limit=90 if get_space() else None,
# concurrency_limit=5 if get_space() else None,
# )
gr.HTML(
"""
<iframe
src="/interview_ui"
allow="microphone; camera; autoplay"
style="width: 100%; height: 400px; border: none; border-radius: 8px; background-color: transparent;"
></iframe>
"""
)
# Logs Section
# with gr.Group():
# gr.Markdown("### πŸ“Š **Analysis Progress**")
# log_output = gr.Textbox(
# show_label=False,
# interactive=False,
# lines=15,
# max_lines=20,
# autoscroll=True,
# placeholder="Analysis logs will appear here..."
# )
# gr.Markdown("### πŸ“ Once the Interview Finished then move to Feedback Tab")
# with gr.Row():
# # prev_btn = gr.Button("<< Setup", variant="secondary")
# next_btn = gr.Button("Feedback >>", variant="secondary")
# TAB 3: FEEDBACK
with gr.Tab("3. Feedback", id="tab-feedback") as feedback_tab:
gr.Markdown("### πŸ“ Interview Feedback")
# Logs Section
with gr.Group():
gr.Markdown("### πŸ“Š **Evaluation Progress**")
feedback_log_output = gr.Textbox(
show_label=False,
interactive=False,
lines=10,
max_lines=15,
autoscroll=True,
placeholder="Evaluation logs will appear here..."
)
gr.Markdown("### πŸ“Š Scorecard")
scorecard_output = gr.Markdown("Waiting for scorecard...", elem_classes=["scorecard-table"])
gr.Markdown("################")
gr.Markdown("### πŸ“„ Transcript")
transcript_output = gr.Markdown("Waiting for transcript...")
# --- Stop button used for auto-feedback (clicked via the postMessage listener in the page-level JS) ---
hidden_stop_btn = gr.Button(visible=True, elem_id="hidden-stop-btn")
# --- Events ---
# Load Example Data
load_examples_btn.click(
fn=lambda: ("JD.pdf", "Anonymized_CV.pdf"),
inputs=[],
outputs=[jd_file_input, cv_file_input]
)
analyze_btn.click(
fn=analyze_inputs,
inputs=[jd_file_input, jd_text_input, cv_file_input, num_questions_input, api_key_input],
outputs=[log_output, system_instruction_state, jd_context_state, chat_history_state, questions_state]
).then(
fn=lambda x: x,
inputs=[questions_state],
outputs=[questions_display]
).then(
fn=lambda: gr.Tabs(selected="tab-interview"),
outputs=[tabs]
)
# Automated feedback generation when interview stops
hidden_stop_btn.click(
fn=lambda: gr.Tabs(selected="tab-feedback"),
outputs=[tabs]
).then(
fn=generate_feedback,
inputs=[jd_context_state, questions_state],
outputs=[feedback_log_output, transcript_output, scorecard_output]
)
# prev_btn.click(
# fn=lambda: gr.Tabs(selected="tab-setup"),
# outputs=[tabs]
# )
# next_btn.click(
# fn=lambda: gr.Tabs(selected="tab-feedback"),
# outputs=[tabs]
# )
async def check_recording_and_generate(jd_context, questions_text):
"""Check if recording exists and generate feedback if it does"""
info = _get_latest_recording()
combined_path = info.get("combined")
if combined_path and os.path.exists(combined_path):
async for logs, transcript, scorecard in generate_feedback(jd_context or "", questions_text or ""):
yield logs, transcript, scorecard
else:
yield "No Interview tookplace yet.", "No Interview tookplace yet.", "No Interview tookplace yet."
# Auto-generate feedback when switching to feedback tab
feedback_tab.select(
fn=check_recording_and_generate,
inputs=[jd_context_state, questions_state],
outputs=[feedback_log_output, transcript_output, scorecard_output]
)
# Mount Gradio app to FastAPI (required for HF Spaces)
app = gr.mount_gradio_app(app, demo, path="/")
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 7860)) # Use 7860 for HF Spaces compatibility
# port = 7890
print(f"\n\nπŸš€ App running! Please open: http://127.0.0.1:{port}\n\n")
uvicorn.run(app, host="0.0.0.0", port=port)