Spaces:
Running
Running
| """ | |
| File Tool - Handles different kinds of files | |
| """ | |
| import os | |
| import tempfile | |
| import requests | |
| import pandas as pd | |
| from typing import Optional, Dict, Any | |
def get_txt_content_from_url(url: str) -> str:
    """
    Fetch the text body of a URL (e.g. a transcript link or a plain-text file).

    Args:
        url: Address of the resource to download.

    Returns:
        The response body as text. If the request fails or the server answers
        with an error status, a message starting with
        "Error downloading text file: " is returned instead of raising.
    """
    try:
        reply = requests.get(url, timeout=30)
        # Turn 4xx/5xx answers into an exception so they share the error path.
        reply.raise_for_status()
        body = reply.text
        return body
    except Exception as exc:
        return "Error downloading text file: " + str(exc)
def download_file_from_api(task_id: str) -> Optional[str]:
    """
    Download the file attached to a task and save it to a temporary file.

    The file is requested from ``<api_url>/files/<task_id>``. The temporary
    file's extension is derived from the response's Content-Type header so that
    callers can dispatch on the suffix.

    Args:
        task_id: Identifier of the task whose file should be downloaded.

    Returns:
        Path of the temporary file (created with ``delete=False``, so the
        caller must remove it), or None when the server does not answer with
        status 200 or any error occurs.
    """
    api_url = "https://agents-course-unit4-scoring.hf.space"
    # Ordered (keywords, suffix) table: the first entry with a keyword found in
    # the content type wins. Note that 'text/plain' is treated as Python source.
    suffix_rules = (
        (('python', 'text/plain'), '.py'),
        (('excel', 'spreadsheet'), '.xlsx'),
        (('csv',), '.csv'),
        (('json',), '.json'),
        (('xml',), '.xml'),
        (('html',), '.html'),
    )
    tmp_path = None
    try:
        response = requests.get(f"{api_url}/files/{task_id}", timeout=30)
        if response.status_code != 200:
            return None

        # Media types are case-insensitive, so normalise before matching.
        content_type = response.headers.get('content-type', '').lower()
        suffix = '.txt'  # Default when nothing matches
        for keywords, candidate in suffix_rules:
            if any(keyword in content_type for keyword in keywords):
                suffix = candidate
                break

        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(response.content)
        return tmp_path
    except Exception as e:
        print(f"Error downloading file: {e}")
        # Do not leave a partially written temp file behind.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        return None
def read_python_file(file_path: str) -> str:
    """
    Read a Python source file and return a short report plus its content.

    Args:
        file_path: Path of the file to read; it is decoded as UTF-8.

    Returns:
        A report containing the file name, the total number of lines, the
        number of code lines (non-blank lines whose first non-space character
        is not ``#``) and the full content. On any failure a message starting
        with "Error reading Python file: " is returned instead of raising.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        lines = content.split('\n')
        # A trailing newline terminates the last line; it does not start a new
        # one. Drop the empty tail so the total is not off by one (and an
        # empty file counts as 0 lines).
        if lines[-1] == '':
            lines.pop()

        code_line_count = 0
        for line in lines:
            stripped = line.strip()
            if stripped and not stripped.startswith('#'):
                code_line_count += 1

        return (
            "Python Code Analysis:\n"
            f"Filename: {os.path.basename(file_path)}\n"
            f"Total lines: {len(lines)}\n"
            f"Code lines: {code_line_count}\n"
            "Content:\n"
            f"{content}"
        )
    except Exception as e:
        return f"Error reading Python file: {str(e)}"
def read_excel_file(file_path: str, max_sheets: int = 3) -> str:
    """
    Read an Excel workbook and return a summary of its first sheets.

    Args:
        file_path: Path of the workbook to read.
        max_sheets: Maximum number of sheets to summarise (default 3). When the
            workbook has more sheets, a note is appended after the summaries.

    Returns:
        A report with the file name, the list of all sheet names and, for each
        summarised sheet, its shape, column names and a preview of the first
        rows. On any failure a message starting with
        "Error reading Excel file: " is returned instead of raising.
    """
    try:
        # Open the workbook once and close it deterministically.
        with pd.ExcelFile(file_path) as excel_file:
            sheet_names = excel_file.sheet_names
            parts = [
                f"Excel File Analysis:\nFilename: {os.path.basename(file_path)}\nSheets: {sheet_names}\n\n"
            ]

            # Only the first `max_sheets` sheets are parsed, so the note below
            # matches what is actually shown.
            for sheet_name in sheet_names[:max_sheets]:
                df = pd.read_excel(excel_file, sheet_name=sheet_name)
                parts.append(f"Sheet '{sheet_name}':\n")
                parts.append(f"Shape: {df.shape}\n")
                parts.append(f"Columns: {list(df.columns)}\n")
                parts.append(f"Data preview:\n{df.head().to_string()}\n\n")

            if len(sheet_names) > max_sheets:
                parts.append(f"... (showing first {max_sheets} sheets)\n")

        return "".join(parts)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
def read_csv_file(file_path: str) -> str:
    """
    Load a CSV file with pandas and describe it.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A report with the file name, the frame's shape, its column names, a
        preview of the first rows and the dtype of every column. On any
        failure a message starting with "Error reading CSV file: " is
        returned instead of raising.
    """
    try:
        frame = pd.read_csv(file_path)
        report = [
            "CSV File Analysis:",
            "Filename: " + os.path.basename(file_path),
            f"Shape: {frame.shape}",
            f"Columns: {list(frame.columns)}",
            "Data preview:",
            frame.head().to_string(),
            "Data types:",
            frame.dtypes.to_string(),
        ]
        return "\n".join(report)
    except Exception as err:
        return "Error reading CSV file: " + str(err)
def read_text_file(file_path: str, max_chars: int = 5000) -> str:
    """
    Read a plain-text file and return its (possibly truncated) content.

    Args:
        file_path: Path of the file to read; it is decoded as UTF-8.
        max_chars: Maximum number of characters of content to include
            (default 5000). Longer content is cut and followed by a
            "... (content truncated)" marker.

    Returns:
        A report with the file name, the size of the full content in
        characters and the content itself. On any failure a message starting
        with "Error reading text file: " is returned instead of raising.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Measure before truncating so "Size" reflects the real file content,
        # not the shortened text plus the truncation marker.
        total_chars = len(content)

        # Limit the amount of text shown
        if total_chars > max_chars:
            content = content[:max_chars] + "\n... (content truncated)"

        return (
            "Text File Content:\n"
            f"Filename: {os.path.basename(file_path)}\n"
            f"Size: {total_chars} characters\n"
            "Content:\n"
            f"{content}"
        )
    except Exception as e:
        return f"Error reading text file: {str(e)}"
def read_file_content(task_id: str = "", file_path: str = "") -> str:
    """
    Main entry point: read and analyse a file given a task_id or a local path.

    An existing ``file_path`` takes precedence. Otherwise, if ``task_id`` is
    given, the file is downloaded via ``download_file_from_api`` and the
    temporary copy is removed again before returning.

    Args:
        task_id: ID used to download the file from the API.
        file_path: Path of a local file (if available).

    Returns:
        The analysis produced by the reader matching the file extension
        (``.py``, ``.xlsx``/``.xls``, ``.csv``, anything else as plain text),
        or an error message string; this function does not raise for ordinary
        failures.
    """
    target_file_path = None
    downloaded = False  # True only when we created a temp file we must delete
    try:
        # Work out which file to read
        if file_path and os.path.exists(file_path):
            target_file_path = file_path
        elif task_id:
            target_file_path = download_file_from_api(task_id)
            if not target_file_path:
                return "Error: Could not download file"
            downloaded = True
        else:
            return "Error: No file path or task_id provided"

        # Make sure the file is really there
        if not os.path.exists(target_file_path):
            return "Error: File not found"

        # Pick the reader from the extension
        file_ext = os.path.splitext(target_file_path)[1].lower()
        if file_ext == '.py':
            return read_python_file(target_file_path)
        if file_ext in ('.xlsx', '.xls'):
            return read_excel_file(target_file_path)
        if file_ext == '.csv':
            return read_csv_file(target_file_path)
        return read_text_file(target_file_path)
    except Exception as e:
        return f"File processing error: {str(e)}"
    finally:
        # Single cleanup point for both the success and the error path.
        # Removal is best effort: a file that is already gone is not an error.
        if downloaded:
            try:
                os.unlink(target_file_path)
            except OSError:
                pass
def get_file_info(task_id: str = "", file_path: str = "") -> Dict[str, Any]:
    """
    Return metadata about a file given a task_id or a local path.

    An existing ``file_path`` takes precedence. Otherwise, if ``task_id`` is
    given, the file is downloaded via ``download_file_from_api`` and the
    temporary copy is removed again before returning, including when
    collecting the metadata fails.

    Args:
        task_id: ID used to download the file from the API.
        file_path: Path of a local file (if available).

    Returns:
        A dict with the keys ``filename``, ``extension`` (lower-cased),
        ``size_bytes``, ``size_kb`` (rounded to 2 decimals) and ``exists``;
        or a dict with a single ``error`` key describing what went wrong.
    """
    target_file_path = None
    downloaded = False  # True only when we created a temp file we must delete
    try:
        if file_path and os.path.exists(file_path):
            target_file_path = file_path
        elif task_id:
            target_file_path = download_file_from_api(task_id)
            if not target_file_path:
                return {"error": "Could not download file"}
            downloaded = True
        else:
            return {"error": "No file path or task_id provided"}

        # Collect the file metadata
        file_stat = os.stat(target_file_path)
        file_ext = os.path.splitext(target_file_path)[1].lower()
        return {
            "filename": os.path.basename(target_file_path),
            "extension": file_ext,
            "size_bytes": file_stat.st_size,
            "size_kb": round(file_stat.st_size / 1024, 2),
            "exists": True,
        }
    except Exception as e:
        return {"error": f"File info error: {str(e)}"}
    finally:
        # Runs on success and on failure, so a downloaded temp file never
        # leaks. Removal is best effort.
        if downloaded:
            try:
                os.unlink(target_file_path)
            except OSError:
                pass
# Manual smoke test, run only when the module is executed as a script
if __name__ == "__main__":
    # Try a local file first (only if it exists)
    sample_path = "/path/to/test/file.py"
    if not os.path.exists(sample_path):
        print("No test file found")
    else:
        analysis = read_file_content(file_path=sample_path)
        print("File Content:", analysis[:200])

    # Then fetch text content from a URL
    sample_url = "https://example.com/file.txt"
    fetched = get_txt_content_from_url(sample_url)
    print("URL Content:", fetched[:100])