Spaces:
Runtime error
Runtime error
| import tkinter as tk | |
| from tkinter import scrolledtext, messagebox, ttk | |
| import threading | |
| import os | |
| import whisper | |
| from dotenv import load_dotenv | |
| import logging | |
| import glob | |
| from datetime import datetime | |
| from stt_processor import TextProcessor | |
| # 환경 변수 로드 | |
| load_dotenv() | |
| # --- 설정: .env 파일에서 API 키를 읽어옵니다 --- | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
| # 폴더 생성 | |
| for folder in ["logs", "output", "data"]: | |
| if not os.path.exists(folder): | |
| os.makedirs(folder) | |
| # 로깅 설정 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('logs/stt_processor.log', encoding='utf-8'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class STTProcessorApp: | |
| def __init__(self, root): | |
| self.root = root | |
| self.root.title("2인 대화 STT 처리기 (AI 화자 분리)") | |
| self.root.geometry("1000x750") | |
| # 모델 초기화 | |
| self.whisper_model = None | |
| self.text_processor = None | |
| # UI 구성 요소 | |
| self.setup_ui() | |
| # 상태 추적 | |
| self.is_processing = False | |
| logger.info("STT 처리기 앱이 시작되었습니다.") | |
| def setup_ui(self): | |
| """UI 컴포넌트를 설정합니다.""" | |
| # 상단 프레임 - 상태 정보 | |
| status_frame = ttk.Frame(self.root) | |
| status_frame.pack(fill=tk.X, padx=10, pady=5) | |
| ttk.Label(status_frame, text="상태:").pack(side=tk.LEFT) | |
| self.status_label = ttk.Label(status_frame, text="준비", foreground="green") | |
| self.status_label.pack(side=tk.LEFT, padx=(5, 0)) | |
| # 중앙 프레임 - 로그 출력 | |
| log_frame = ttk.LabelFrame(self.root, text="처리 로그") | |
| log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) | |
| self.log_text = scrolledtext.ScrolledText(log_frame, height=25, wrap=tk.WORD) | |
| self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) | |
| # 하단 프레임 - 컨트롤 | |
| control_frame = ttk.Frame(self.root) | |
| control_frame.pack(fill=tk.X, padx=10, pady=5) | |
| # 왼쪽: 모델 로딩 버튼 | |
| self.load_models_btn = ttk.Button( | |
| control_frame, | |
| text="모델 로딩", | |
| command=self.load_models_threaded | |
| ) | |
| self.load_models_btn.pack(side=tk.LEFT, padx=(0, 10)) | |
| # 중앙: 처리 버튼 | |
| self.process_btn = ttk.Button( | |
| control_frame, | |
| text="오디오 파일 처리 시작", | |
| command=self.process_files_threaded, | |
| state=tk.DISABLED | |
| ) | |
| self.process_btn.pack(side=tk.LEFT, padx=(0, 10)) | |
| # 오른쪽: 종료 버튼 | |
| ttk.Button( | |
| control_frame, | |
| text="종료", | |
| command=self.root.quit | |
| ).pack(side=tk.RIGHT) | |
| # 진행률 표시 | |
| self.progress = ttk.Progressbar( | |
| control_frame, | |
| mode='indeterminate' | |
| ) | |
| self.progress.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10)) | |
| def log(self, message): | |
| """로그 메시지를 UI에 출력합니다.""" | |
| def append_log(): | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| self.log_text.insert(tk.END, f"[{timestamp}] {message}\n") | |
| self.log_text.see(tk.END) | |
| self.root.after(0, append_log) | |
| logger.info(message) | |
| def update_status(self, status, color="black"): | |
| """상태 라벨을 업데이트합니다.""" | |
| def update(): | |
| self.status_label.config(text=status, foreground=color) | |
| self.root.after(0, update) | |
| def load_models_threaded(self): | |
| """별도 스레드에서 모델을 로딩합니다.""" | |
| threading.Thread(target=self.load_models, daemon=True).start() | |
| def load_models(self): | |
| """AI 모델들을 로딩합니다.""" | |
| try: | |
| self.update_status("모델 로딩 중...", "orange") | |
| self.log("AI 모델 로딩을 시작합니다...") | |
| # API 키 검증 | |
| if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": | |
| raise ValueError("Google API 키가 설정되지 않았습니다. .env 파일을 확인하세요.") | |
| # Whisper 모델 로딩 | |
| self.log("Whisper 모델을 로딩합니다...") | |
| self.whisper_model = whisper.load_model("base") | |
| self.log("Whisper 모델 로딩 완료!") | |
| # TextProcessor 초기화 | |
| self.log("Gemini 텍스트 프로세서를 초기화합니다...") | |
| self.text_processor = TextProcessor(GOOGLE_API_KEY) | |
| self.text_processor.load_models() | |
| self.log("모든 모델이 성공적으로 로딩되었습니다!") | |
| self.update_status("준비 완료", "green") | |
| # 처리 버튼 활성화 | |
| def enable_button(): | |
| self.process_btn.config(state=tk.NORMAL) | |
| self.root.after(0, enable_button) | |
| except Exception as e: | |
| error_msg = f"모델 로딩 실패: {str(e)}" | |
| self.log(error_msg) | |
| self.update_status("모델 로딩 실패", "red") | |
| messagebox.showerror("오류", error_msg) | |
| def process_files_threaded(self): | |
| """별도 스레드에서 파일을 처리합니다.""" | |
| if self.is_processing: | |
| messagebox.showwarning("경고", "이미 처리 중입니다.") | |
| return | |
| threading.Thread(target=self.process_files, daemon=True).start() | |
| def process_files(self): | |
| """data 폴더의 모든 WAV 파일을 처리합니다.""" | |
| try: | |
| self.is_processing = True | |
| self.update_status("처리 중...", "orange") | |
| # 진행률 표시 시작 | |
| def start_progress(): | |
| self.progress.start(10) | |
| self.root.after(0, start_progress) | |
| # WAV 파일 찾기 | |
| wav_files = glob.glob("data/*.wav") | |
| if not wav_files: | |
| self.log("data 폴더에 WAV 파일이 없습니다.") | |
| return | |
| self.log(f"{len(wav_files)}개의 WAV 파일을 발견했습니다.") | |
| # 각 파일 처리 | |
| for i, wav_file in enumerate(wav_files): | |
| self.log(f"\n=== 파일 처리 ({i+1}/{len(wav_files)}) ===") | |
| success = self.process_single_audio_file(wav_file) | |
| if success: | |
| self.log(f"✅ {os.path.basename(wav_file)} 처리 완료") | |
| else: | |
| self.log(f"❌ {os.path.basename(wav_file)} 처리 실패") | |
| self.log(f"\n모든 파일 처리 완료! 총 {len(wav_files)}개 파일") | |
| self.update_status("처리 완료", "green") | |
| except Exception as e: | |
| error_msg = f"파일 처리 중 오류: {str(e)}" | |
| self.log(error_msg) | |
| self.update_status("처리 실패", "red") | |
| finally: | |
| self.is_processing = False | |
| # 진행률 표시 중지 | |
| def stop_progress(): | |
| self.progress.stop() | |
| self.root.after(0, stop_progress) | |
| def process_single_audio_file(self, file_path): | |
| """단일 오디오 파일을 처리합니다.""" | |
| try: | |
| filename = os.path.basename(file_path) | |
| base_name = os.path.splitext(filename)[0] | |
| self.log(f"파일 처리 시작: {filename}") | |
| # 1단계: Whisper로 음성 인식 | |
| self.log("1/3: 음성 인식 진행 중...") | |
| result = self.whisper_model.transcribe(file_path) | |
| full_text = result['text'].strip() | |
| if not full_text: | |
| self.log(f"❌ 파일 {filename}에서 텍스트를 추출할 수 없습니다.") | |
| return False | |
| language = result.get('language', 'unknown') | |
| self.log(f"음성 인식 완료 (언어: {language}, 길이: {len(full_text)}자)") | |
| # 2단계: TextProcessor로 화자 분리 및 맞춤법 교정 | |
| self.log("2/3: AI 화자 분리 및 맞춤법 교정 진행 중...") | |
| def progress_callback(status, current, total): | |
| self.log(f" → {status} ({current}/{total})") | |
| text_result = self.text_processor.process_text( | |
| full_text, | |
| text_name=base_name, | |
| progress_callback=progress_callback | |
| ) | |
| if not text_result.get("success", False): | |
| self.log(f"❌ 텍스트 처리 실패: {text_result.get('error', 'Unknown error')}") | |
| return False | |
| # 3단계: 결과 저장 | |
| self.log("3/3: 결과 저장 중...") | |
| # 기존 결과에 Whisper 정보 추가 | |
| enhanced_result = text_result.copy() | |
| enhanced_result.update({ | |
| "base_name": base_name, | |
| "language": language, | |
| "whisper_segments": result.get("segments", []) | |
| }) | |
| # 파일 저장 | |
| saved = self.text_processor.save_results_to_files(enhanced_result) | |
| if saved: | |
| self.log("결과 파일 저장 완료!") | |
| else: | |
| self.log("⚠️ 결과 파일 저장 중 일부 오류 발생") | |
| return True | |
| except Exception as e: | |
| self.log(f"❌ 파일 {filename} 처리 중 오류: {str(e)}") | |
| return False | |
| def main(): | |
| """메인 함수""" | |
| root = tk.Tk() | |
| app = STTProcessorApp(root) | |
| try: | |
| root.mainloop() | |
| except KeyboardInterrupt: | |
| logger.info("사용자에 의해 프로그램이 종료되었습니다.") | |
| except Exception as e: | |
| logger.error(f"예상치 못한 오류: {e}") | |
| messagebox.showerror("오류", f"예상치 못한 오류가 발생했습니다: {str(e)}") | |
| if __name__ == "__main__": | |
| main() | |