# final_agent_course / utils /file_tool.py
# tuan3335's picture
# structure code
# 92d2175
# raw
# history blame
# 8.22 kB
"""
File Tool - handles different kinds of files
"""
import os
import tempfile
import requests
import pandas as pd
from typing import Optional, Dict, Any
def get_txt_content_from_url(url: str) -> str:
    """
    Fetch the body of a text resource (e.g. a transcript link or .txt file).

    Args:
        url: Address of the text resource to download.

    Returns:
        The decoded response body on success. On any failure (network error,
        non-2xx status, ...) a string starting with
        "Error downloading text file: " followed by the exception text is
        returned instead; no exception reaches the caller.
    """
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        # requests falls back to ISO-8859-1 for text/* responses that carry no
        # charset parameter, which garbles UTF-8 files. When the server does
        # not declare a charset, try UTF-8 first and only defer to requests'
        # own decoding if the bytes are not valid UTF-8.
        content_type = response.headers.get("content-type", "")
        if "charset" not in content_type.lower():
            try:
                return response.content.decode("utf-8")
            except UnicodeDecodeError:
                pass
        return response.text
    except Exception as e:
        return f"Error downloading text file: {str(e)}"
def download_file_from_api(
    task_id: str,
    api_url: str = "https://agents-course-unit4-scoring.hf.space",
) -> Optional[str]:
    """
    Download the file attached to a task into a temporary file.

    Args:
        task_id: Identifier appended to ``{api_url}/files/`` to build the URL.
        api_url: Base URL of the file service. Defaults to the address that
            was previously hard-coded.

    Returns:
        Path of the temporary file holding the response body, or None when the
        server does not answer with HTTP 200 or any error occurs (the error is
        printed). The temporary file is created with ``delete=False``, so the
        caller is responsible for removing it.
    """
    # Ordered (keywords, suffix) rules: the first rule with a keyword found in
    # the Content-Type wins. Order matters, e.g. the OOXML spreadsheet type
    # also contains "xml", and "application/xhtml+xml" contains "html" too.
    # NOTE(review): text/plain is mapped to '.py' as in the original code;
    # presumably the service serves Python sources as plain text - confirm.
    suffix_rules = (
        (('python', 'text/plain'), '.py'),
        (('excel', 'spreadsheet'), '.xlsx'),
        (('csv',), '.csv'),
        (('json',), '.json'),
        (('xml',), '.xml'),
        (('html',), '.html'),
    )
    try:
        file_url = f"{api_url.rstrip('/')}/files/{task_id}"
        response = requests.get(file_url, timeout=30)
        if response.status_code != 200:
            return None

        # Media types are case-insensitive, so normalise before matching.
        content_type = response.headers.get('content-type', '').lower()
        suffix = next(
            (
                ext
                for keywords, ext in suffix_rules
                if any(keyword in content_type for keyword in keywords)
            ),
            '.txt',  # Default
        )

        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with tmp_file:
                tmp_file.write(response.content)
        except Exception:
            # delete=False means nobody else would clean up a partial file.
            try:
                os.unlink(tmp_file.name)
            except OSError:
                pass
            raise
        return tmp_file.name
    except Exception as e:
        print(f"Error downloading file: {e}")
        return None
def read_python_file(file_path: str) -> str:
    """
    Read a Python source file and return a line-count report plus its text.

    Args:
        file_path: Path of the file; it is read as UTF-8.

    Returns:
        A report containing the file name, the total number of lines, the
        number of "code lines" (lines that are neither blank nor start with
        ``#`` after stripping - docstring lines therefore count as code) and
        the full file content. On any error a string starting with
        "Error reading Python file: " is returned instead.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        lines = content.split('\n')
        # A trailing newline (or an empty file) leaves a phantom empty element
        # at the end of the split; drop it so it is not counted as a line.
        if lines[-1] == '':
            lines.pop()

        stripped_lines = (line.strip() for line in lines)
        code_line_count = sum(
            1 for text in stripped_lines if text and not text.startswith('#')
        )

        return (
            "Python Code Analysis:\n"
            f"Filename: {os.path.basename(file_path)}\n"
            f"Total lines: {len(lines)}\n"
            f"Code lines: {code_line_count}\n"
            "Content:\n"
            f"{content}"
        )
    except Exception as e:
        return f"Error reading Python file: {str(e)}"
def read_excel_file(file_path: str, max_sheets: int = 3) -> str:
    """
    Read an Excel workbook and return a textual summary of its sheets.

    Args:
        file_path: Path of the workbook.
        max_sheets: Maximum number of sheets to describe (default 3), to keep
            the report short for large workbooks.

    Returns:
        A report listing all sheet names, followed by shape, column names and
        a ``head()`` preview for each of the first ``max_sheets`` sheets. A
        trailing note is added when further sheets were left out. On any error
        a string starting with "Error reading Excel file: " is returned.
    """
    try:
        # Open the workbook once; the context manager closes the file handle.
        with pd.ExcelFile(file_path) as excel_file:
            sheet_names = excel_file.sheet_names
            parts = [
                f"Excel File Analysis:\nFilename: {os.path.basename(file_path)}\nSheets: {sheet_names}\n\n"
            ]

            # Slice before looping so that exactly the first `max_sheets`
            # sheets are described (the limit used to be checked inside the
            # loop, which stopped after the very first sheet).
            for sheet_name in sheet_names[:max_sheets]:
                df = excel_file.parse(sheet_name=sheet_name)
                parts.append(f"Sheet '{sheet_name}':\n")
                parts.append(f"Shape: {df.shape}\n")
                parts.append(f"Columns: {list(df.columns)}\n")
                parts.append(f"Data preview:\n{df.head().to_string()}\n\n")

            if len(sheet_names) > max_sheets:
                parts.append(f"... (showing first {max_sheets} sheets)\n")

        return "".join(parts)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
def read_csv_file(file_path: str) -> str:
    """
    Load a CSV file with pandas and summarise it as text.

    Args:
        file_path: Path of the CSV file.

    Returns:
        A report with the file name, the frame's shape, its column names, a
        ``head()`` preview and the column dtypes. On any error a string
        starting with "Error reading CSV file: " is returned instead.
    """
    try:
        frame = pd.read_csv(file_path)
        report_lines = [
            "CSV File Analysis:",
            f"Filename: {os.path.basename(file_path)}",
            f"Shape: {frame.shape}",
            f"Columns: {list(frame.columns)}",
            "Data preview:",
            frame.head().to_string(),
            "Data types:",
            frame.dtypes.to_string(),
        ]
        return "\n".join(report_lines)
    except Exception as err:
        return f"Error reading CSV file: {str(err)}"
def read_text_file(file_path: str, max_chars: int = 5000) -> str:
    """
    Read a plain text file and return it with a short header.

    Args:
        file_path: Path of the file; it is read as UTF-8.
        max_chars: Maximum number of characters of content to include
            (default 5000). Longer content is cut and a truncation marker is
            appended.

    Returns:
        A report with the file name, the size of the full file content in
        characters, and the (possibly truncated) content. On any error a
        string starting with "Error reading text file: " is returned instead.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Measure before truncating so "Size" describes the file itself, not
        # the shortened preview plus the truncation marker.
        total_chars = len(content)
        if total_chars > max_chars:
            content = content[:max_chars] + "\n... (content truncated)"

        return (
            "Text File Content:\n"
            f"Filename: {os.path.basename(file_path)}\n"
            f"Size: {total_chars} characters\n"
            "Content:\n"
            f"{content}"
        )
    except Exception as e:
        return f"Error reading text file: {str(e)}"
def read_file_content(task_id: str = "", file_path: str = "") -> str:
    """
    Main entry point: read and analyse a file given a task_id or a local path.

    An existing ``file_path`` takes precedence. Otherwise, when ``task_id`` is
    given, the file is downloaded via ``download_file_from_api`` and the
    temporary download is removed again before returning.

    Args:
        task_id: ID used to download the file from the API.
        file_path: Path of a local file (if any).

    Returns:
        The analysed file content as produced by the reader matching the file
        extension ('.py', '.xlsx'/'.xls', '.csv', anything else as text), or
        an error string ("Error: ..." / "File processing error: ...").
    """
    target_file_path = None
    downloaded = False  # True only for a temp file this call must remove
    try:
        # Resolve which file to read.
        if file_path and os.path.exists(file_path):
            target_file_path = file_path
        elif task_id:
            target_file_path = download_file_from_api(task_id)
            if not target_file_path:
                return "Error: Could not download file"
            downloaded = True
        elif file_path:
            # A path was supplied but nothing exists there and there is no
            # task_id to fall back on.
            return "Error: File not found"
        else:
            return "Error: No file path or task_id provided"

        if not os.path.exists(target_file_path):
            return "Error: File not found"

        # Pick the reader from the file extension.
        file_ext = os.path.splitext(target_file_path)[1].lower()
        if file_ext == '.py':
            return read_python_file(target_file_path)
        if file_ext in ('.xlsx', '.xls'):
            return read_excel_file(target_file_path)
        if file_ext == '.csv':
            return read_csv_file(target_file_path)
        return read_text_file(target_file_path)
    except Exception as e:
        return f"File processing error: {str(e)}"
    finally:
        # Single cleanup point for both the success and the error path.
        if downloaded:
            try:
                os.unlink(target_file_path)
            except OSError:
                # Best effort: a leftover temp file must not mask the result.
                pass
def get_file_info(task_id: str = "", file_path: str = "") -> Dict[str, Any]:
    """
    Return metadata about a file given a task_id or a local path.

    An existing ``file_path`` takes precedence. Otherwise, when ``task_id`` is
    given, the file is downloaded via ``download_file_from_api``, inspected,
    and the temporary download is removed again. In that case "filename" and
    "extension" describe the temporary file.

    Args:
        task_id: ID used to download the file from the API.
        file_path: Path of a local file (if any).

    Returns:
        A dict with the keys "filename", "extension" (lower-cased),
        "size_bytes", "size_kb" (rounded to 2 decimals) and "exists", or a
        dict with a single "error" key describing what went wrong.
    """
    target_file_path = None
    downloaded = False  # True only for a temp file this call must remove
    try:
        if file_path and os.path.exists(file_path):
            target_file_path = file_path
        elif task_id:
            target_file_path = download_file_from_api(task_id)
            if not target_file_path:
                return {"error": "Could not download file"}
            downloaded = True
        elif file_path:
            # A path was supplied but nothing exists there and there is no
            # task_id to fall back on.
            return {"error": "File not found"}
        else:
            return {"error": "No file path or task_id provided"}

        file_stat = os.stat(target_file_path)
        file_ext = os.path.splitext(target_file_path)[1].lower()
        return {
            "filename": os.path.basename(target_file_path),
            "extension": file_ext,
            "size_bytes": file_stat.st_size,
            "size_kb": round(file_stat.st_size / 1024, 2),
            "exists": True,
        }
    except Exception as e:
        return {"error": f"File info error: {str(e)}"}
    finally:
        # Runs on success and on error, so a download is never left behind.
        if downloaded:
            try:
                os.unlink(target_file_path)
            except OSError:
                # Best effort: a leftover temp file must not mask the result.
                pass
# Test function
if __name__ == "__main__":
    # Manual smoke test 1: analyse a local file when one is available.
    sample_path = "/path/to/test/file.py"
    if not os.path.exists(sample_path):
        print("No test file found")
    else:
        file_report = read_file_content(file_path=sample_path)
        print("File Content:", file_report[:200])

    # Manual smoke test 2: fetch text content from a URL.
    sample_url = "https://example.com/file.txt"
    url_text = get_txt_content_from_url(sample_url)
    print("URL Content:", url_text[:100])