Spaces:
Runtime error
Runtime error
Jeongsoo1975
Initial commit: Gradio text-based speaker separation app for Hugging Face Spaces
ae9ec05
| import whisper | |
| import google.generativeai as genai | |
| import os | |
| import json | |
| from datetime import datetime | |
| import re | |
| def test_speaker_separation(): | |
| """Gemini를 사용한 화자 분리 테스트""" | |
| # API 키 로드 | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
| if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": | |
| print("ERROR: Please set GOOGLE_API_KEY in .env file") | |
| return | |
| print("Loading models...") | |
| try: | |
| # Whisper 모델 로드 | |
| whisper_model = whisper.load_model("base") | |
| print("Whisper model loaded!") | |
| # Gemini 모델 설정 | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| # gemini-2.0-flash: 최신 Gemini 2.0 모델, 빠르고 정확한 화자 분리 | |
| gemini_model = genai.GenerativeModel('gemini-2.0-flash') | |
| print("Gemini 2.0 Flash model configured!") | |
| # WAV 파일 찾기 | |
| wav_files = [] | |
| if os.path.exists("data"): | |
| for file in os.listdir("data"): | |
| if file.endswith(".wav"): | |
| wav_files.append(os.path.join("data", file)) | |
| if not wav_files: | |
| print("No WAV files found in data folder.") | |
| return | |
| print(f"Found {len(wav_files)} WAV file(s)") | |
| for wav_file in wav_files[:1]: # 첫 번째 파일만 테스트 | |
| print(f"\nProcessing: {os.path.basename(wav_file)}") | |
| # 1단계: 음성 인식 | |
| print("Step 1: Speech recognition...") | |
| result = whisper_model.transcribe(wav_file) | |
| full_text = result['text'].strip() | |
| print(f"Language detected: {result['language']}") | |
| print(f"Text length: {len(full_text)} characters") | |
| print(f"Text preview: {full_text[:200]}...") | |
| # 2단계: 화자 분리 | |
| print("\nStep 2: Speaker separation with Gemini...") | |
| prompt = f""" | |
| 당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. | |
| 주어진 텍스트를 분석하여 각 발언을 화자별로 구분해주세요. | |
| 분석 지침: | |
| 1. 대화의 맥락과 내용을 기반으로 화자를 구분하세요 | |
| 2. 말투, 주제 전환, 질문과 답변의 패턴을 활용하세요 | |
| 3. 화자1과 화자2로 구분하여 표시하세요 | |
| 4. 각 발언 앞에 [화자1] 또는 [화자2]를 붙여주세요 | |
| 5. 발언이 너무 길 경우 자연스러운 지점에서 나누어주세요 | |
| 출력 형식: | |
| [화자1] 첫 번째 발언 내용 | |
| [화자2] 두 번째 발언 내용 | |
| [화자1] 세 번째 발언 내용 | |
| ... | |
| 분석할 텍스트: | |
| {full_text} | |
| """ | |
| response = gemini_model.generate_content(prompt) | |
| separated_text = response.text.strip() | |
| print("Speaker separation completed!") | |
| # 3단계: 맞춤법 교정 | |
| print("\nStep 3: Spell checking with Gemini...") | |
| spelling_prompt = f""" | |
| 당신은 한국어 맞춤법 교정 전문가입니다. | |
| 주어진 텍스트에서 맞춤법 오류, 띄어쓰기 오류, 오타를 수정해주세요. | |
| 교정 지침: | |
| 1. 자연스러운 한국어 표현으로 수정하되, 원본의 의미와 말투는 유지하세요 | |
| 2. [화자1], [화자2] 태그는 그대로 유지하세요 | |
| 3. 전문 용어나 고유명사는 가능한 정확하게 수정하세요 | |
| 4. 구어체 특성은 유지하되, 명백한 오타만 수정하세요 | |
| 5. 문맥에 맞는 올바른 단어로 교체하세요 | |
| 교정할 텍스트: | |
| {separated_text} | |
| """ | |
| corrected_response = gemini_model.generate_content(spelling_prompt) | |
| corrected_text = corrected_response.text.strip() | |
| print("Spell checking completed!") | |
| # 4단계: 결과 저장 | |
| print("\nStep 4: Saving results...") | |
| base_name = os.path.splitext(os.path.basename(wav_file))[0] | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # output 폴더 생성 | |
| if not os.path.exists("output"): | |
| os.makedirs("output") | |
| # 전체 결과 저장 (원본 + 분리 + 교정) | |
| result_path = f"output/{base_name}_complete_result_{timestamp}.txt" | |
| with open(result_path, 'w', encoding='utf-8') as f: | |
| f.write(f"Filename: {base_name}\n") | |
| f.write(f"Processing time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
| f.write(f"Language: {result['language']}\n") | |
| f.write("="*50 + "\n\n") | |
| f.write("Original text:\n") | |
| f.write(full_text + "\n\n") | |
| f.write("="*50 + "\n\n") | |
| f.write("Speaker separated text (original):\n") | |
| f.write(separated_text + "\n\n") | |
| f.write("="*50 + "\n\n") | |
| f.write("Speaker separated text (spell corrected):\n") | |
| f.write(corrected_text + "\n") | |
| # 교정된 텍스트에서 화자별 분리 결과 파싱 | |
| corrected_conversations = {"화자1": [], "화자2": []} | |
| pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' | |
| matches = re.findall(pattern, corrected_text, re.DOTALL) | |
| for speaker_num, content in matches: | |
| speaker = f"화자{speaker_num}" | |
| content = content.strip() | |
| if content: | |
| corrected_conversations[speaker].append(content) | |
| # 원본 화자별 분리 결과도 파싱 (비교용) | |
| original_conversations = {"화자1": [], "화자2": []} | |
| matches = re.findall(pattern, separated_text, re.DOTALL) | |
| for speaker_num, content in matches: | |
| speaker = f"화자{speaker_num}" | |
| content = content.strip() | |
| if content: | |
| original_conversations[speaker].append(content) | |
| # 교정된 화자별 개별 파일 저장 | |
| for speaker, utterances in corrected_conversations.items(): | |
| if utterances: | |
| speaker_path = f"output/{base_name}_{speaker}_교정본_{timestamp}.txt" | |
| with open(speaker_path, 'w', encoding='utf-8') as f: | |
| f.write(f"Filename: {base_name}\n") | |
| f.write(f"Speaker: {speaker}\n") | |
| f.write(f"Processing time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
| f.write(f"Number of utterances: {len(utterances)}\n") | |
| f.write("="*50 + "\n\n") | |
| for idx, utterance in enumerate(utterances, 1): | |
| f.write(f"{idx}. {utterance}\n\n") | |
| # JSON 저장 (원본과 교정본 모두 포함) | |
| json_path = f"output/{base_name}_complete_data_{timestamp}.json" | |
| json_data = { | |
| "filename": base_name, | |
| "processed_time": datetime.now().isoformat(), | |
| "language": result['language'], | |
| "original_text": full_text, | |
| "separated_text": separated_text, | |
| "corrected_text": corrected_text, | |
| "conversations_by_speaker_original": original_conversations, | |
| "conversations_by_speaker_corrected": corrected_conversations, | |
| "segments": result.get("segments", []) | |
| } | |
| with open(json_path, 'w', encoding='utf-8') as f: | |
| json.dump(json_data, f, ensure_ascii=False, indent=2) | |
| print(f"Results saved:") | |
| print(f" - Complete result: {result_path}") | |
| print(f" - JSON data: {json_path}") | |
| for speaker in corrected_conversations: | |
| if corrected_conversations[speaker]: | |
| print(f" - {speaker} (교정본): {len(corrected_conversations[speaker])} utterances") | |
| print("\nProcessing completed successfully!") | |
| print("✓ Speech recognition with Whisper") | |
| print("✓ Speaker separation with Gemini 2.0") | |
| print("✓ Spell checking with Gemini 2.0") | |
| print("✓ Results saved (original + corrected versions)") | |
| except Exception as e: | |
| print(f"Error occurred: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| if __name__ == "__main__": | |
| test_speaker_separation() | |