acecalisto3 committed on
Commit
ad1c93f
·
verified ·
1 Parent(s): be90db0

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +86 -31
app.py CHANGED
@@ -9,6 +9,8 @@ import datetime
9
  import zipfile
10
  import nltk.data
11
  import nltk
 
 
12
 
13
  # Ensure the 'punkt' tokenizer is downloaded only if missing
14
  try:
@@ -43,6 +45,36 @@ DATASET_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
43
  MAX_TOKENS = 8192
44
 
45
  # Utility Functions
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  def read_pdf(file_path):
47
  try:
48
  reader = PdfReader(file_path)
@@ -52,6 +84,35 @@ def read_pdf(file_path):
52
  log(f"Error reading PDF {file_path}: {e}")
53
  return ""
54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  def fetch_google_doc(url):
56
  if "docs.google.com/document/d/" in url:
57
  # Extract document ID
@@ -79,8 +140,31 @@ def fetch_url(url, max_depth):
79
  continue
80
  if depth < max_depth:
81
  visited.add(current_url)
82
- # Check if it's a Google Doc
83
- if "docs.google.com/document/d/" in current_url:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  doc_content = fetch_google_doc(current_url)
85
  if doc_content:
86
  results.append(doc_content)
@@ -101,35 +185,6 @@ def fetch_url(url, max_depth):
101
  errors.append(f"Error fetching {current_url}: {e}")
102
  return "\n".join(results), "\n".join(errors)
103
 
104
- def read_txt(txt_path):
105
- try:
106
- with open(txt_path, "r", encoding="utf-8") as f:
107
- return f.read()
108
- except Exception as e:
109
- log(f"Error reading TXT file {txt_path}: {e}")
110
- return ""
111
-
112
- def read_zip(zip_path):
113
- try:
114
- extracted_data = []
115
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
116
- for file_info in zip_ref.infolist():
117
- if file_info.filename.endswith((".txt", ".pdf")):
118
- with zip_ref.open(file_info) as file:
119
- content = file.read()
120
- if file_info.filename.endswith(".txt"):
121
- extracted_data.append(content.decode("utf-8"))
122
- elif file_info.filename.endswith(".pdf"):
123
- temp_path = f"/tmp/{uuid.uuid4()}"
124
- with open(temp_path, "wb") as temp_file:
125
- temp_file.write(content)
126
- extracted_data.append(read_pdf(temp_path))
127
- os.remove(temp_path)
128
- return "\n".join(extracted_data)
129
- except Exception as e:
130
- log(f"Error reading ZIP file {zip_path}: {e}")
131
- return ""
132
-
133
  def process_file(file):
134
  try:
135
  if file.name.endswith(".pdf"):
 
9
  import zipfile
10
  import nltk.data
11
  import nltk
12
+ import tempfile
13
+ import shutil
14
 
15
  # Ensure the 'punkt' tokenizer is downloaded only if missing
16
  try:
 
45
  MAX_TOKENS = 8192
46
 
47
  # Utility Functions
48
+
49
def get_file_id_from_google_drive_url(url):
    """Extract the file ID from a Google Drive ``/file/d/<id>`` share URL.

    Args:
        url: Candidate URL string.

    Returns:
        The file ID as a string, or ``None`` when ``url`` is not a string,
        is not a Google Drive ``/file/d/`` link, or carries an empty ID.
    """
    if not isinstance(url, str):
        return None
    if "drive.google.com" not in url:
        return None

    _, marker, tail = url.partition("/file/d/")
    if not marker:
        return None

    # The ID is the first path segment after the marker. Cut at whichever of
    # "/", "?" or "#" comes first so that trailing path parts, query strings
    # and fragments never end up inside the returned ID.
    for separator in ("/", "?", "#"):
        tail = tail.split(separator, 1)[0]

    # An empty ID (e.g. the URL ends right after "/file/d/") is not usable.
    return tail or None
57
+
58
def _filename_from_content_disposition(header_value):
    """Return a safe base filename from a Content-Disposition header, or None."""
    from email.message import Message

    if not header_value:
        return None
    parsed = Message()
    parsed["Content-Disposition"] = header_value
    raw_name = parsed.get_filename()
    if not raw_name:
        return None
    # The header is controlled by the remote server: keep only the final path
    # component so the name cannot point outside the download directory.
    safe_name = os.path.basename(raw_name.replace("\\", "/")).strip()
    if safe_name in ("", ".", ".."):
        return None
    return safe_name


def download_google_drive_file(file_id):
    """Download a Google Drive file into a freshly created temporary directory.

    Args:
        file_id: Google Drive file ID to request from the ``uc`` endpoint.

    Returns:
        ``(file_path, temp_dir)`` on success; the caller is responsible for
        removing ``temp_dir``. ``(None, None)`` on any failure, in which case
        the error is logged and no temporary directory is left behind.
    """
    temp_dir = None
    try:
        # ``params`` URL-encodes the ID; the timeout keeps a stalled server
        # from blocking the caller indefinitely.
        with requests.get(
            "https://drive.google.com/uc",
            params={"id": file_id},
            stream=True,
            timeout=30,
        ) as response:
            response.raise_for_status()
            filename = _filename_from_content_disposition(
                response.headers.get("Content-Disposition")
            )
            if filename is None:
                filename = f"file_{uuid.uuid4()}"

            temp_dir = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir, filename)
            with open(file_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return file_path, temp_dir
    except Exception as e:
        log(f"Error downloading Google Drive file {file_id}: {e}")
        # Do not leak the directory when the failure happened after mkdtemp().
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, None
77
+
78
  def read_pdf(file_path):
79
  try:
80
  reader = PdfReader(file_path)
 
84
  log(f"Error reading PDF {file_path}: {e}")
85
  return ""
86
 
87
def read_txt(txt_path):
    """Read a UTF-8 text file and return its contents.

    The file is decoded with ``utf-8-sig`` so that a leading byte-order mark
    (as written by some editors) is stripped instead of leaking into the
    returned text as ``\\ufeff``. Files without a BOM decode exactly as with
    plain UTF-8.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The file contents, or ``""`` if the file cannot be opened or decoded
        (the error is logged).
    """
    try:
        with open(txt_path, "r", encoding="utf-8-sig") as f:
            return f.read()
    except Exception as e:
        # Best-effort reader: callers expect an empty string, not an exception.
        log(f"Error reading TXT file {txt_path}: {e}")
        return ""
94
+
95
def _read_pdf_member(pdf_bytes):
    """Write PDF bytes to a private scratch directory and extract their text."""
    # A fixed file name is used on purpose: the archive member name is
    # untrusted and must never be joined into a filesystem path. The scratch
    # directory is removed when the ``with`` block exits.
    with tempfile.TemporaryDirectory() as scratch_dir:
        temp_path = os.path.join(scratch_dir, "member.pdf")
        with open(temp_path, "wb") as temp_file:
            temp_file.write(pdf_bytes)
        return read_pdf(temp_path)


def read_zip(zip_path):
    """Extract the text of every ``.txt`` and ``.pdf`` member of a ZIP archive.

    Members are processed in archive order; extensions are matched
    case-insensitively and members in sub-directories are supported. Text
    members are decoded as UTF-8 (a leading BOM is dropped); a member that
    cannot be decoded is logged and skipped rather than discarding the
    whole archive.

    Args:
        zip_path: Path of the ZIP archive.

    Returns:
        The extracted texts joined with newlines, or ``""`` if the archive
        itself cannot be read (the error is logged).
    """
    try:
        extracted_data = []
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            for file_info in zip_ref.infolist():
                lowered_name = file_info.filename.lower()
                if not lowered_name.endswith((".txt", ".pdf")):
                    continue
                content = zip_ref.read(file_info)
                if lowered_name.endswith(".txt"):
                    try:
                        extracted_data.append(content.decode("utf-8-sig"))
                    except UnicodeDecodeError as e:
                        log(
                            f"Error decoding {file_info.filename} "
                            f"in ZIP file {zip_path}: {e}"
                        )
                else:
                    extracted_data.append(_read_pdf_member(content))
        return "\n".join(extracted_data)
    except Exception as e:
        # Best-effort reader: callers expect an empty string, not an exception.
        log(f"Error reading ZIP file {zip_path}: {e}")
        return ""
115
+
116
  def fetch_google_doc(url):
117
  if "docs.google.com/document/d/" in url:
118
  # Extract document ID
 
140
  continue
141
  if depth < max_depth:
142
  visited.add(current_url)
143
+ # Check if it's a Google Drive file URL
144
+ if "drive.google.com/file/d/" in current_url:
145
+ file_id = get_file_id_from_google_drive_url(current_url)
146
+ if file_id:
147
+ file_path, temp_dir = download_google_drive_file(file_id)
148
+ if file_path:
149
+ file_ext = os.path.splitext(file_path)[1].lower()
150
+ if file_ext == ".pdf":
151
+ pdf_text = read_pdf(file_path)
152
+ results.append(pdf_text)
153
+ elif file_ext == ".txt":
154
+ txt_content = read_txt(file_path)
155
+ results.append(txt_content)
156
+ elif file_ext == ".zip":
157
+ zip_content = read_zip(file_path)
158
+ results.append(zip_content)
159
+ else:
160
+ errors.append(f"Unsupported file type for URL: {current_url}")
161
+ shutil.rmtree(temp_dir)
162
+ else:
163
+ errors.append(f"Failed to download file from URL: {current_url}")
164
+ else:
165
+ errors.append(f"Invalid Google Drive URL: {current_url}")
166
+ # Check if it's a Google Doc URL
167
+ elif "docs.google.com/document/d/" in current_url:
168
  doc_content = fetch_google_doc(current_url)
169
  if doc_content:
170
  results.append(doc_content)
 
185
  errors.append(f"Error fetching {current_url}: {e}")
186
  return "\n".join(results), "\n".join(errors)
187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
188
  def process_file(file):
189
  try:
190
  if file.name.endswith(".pdf"):