multimodalart HF Staff commited on
Commit
9b92b0d
·
verified ·
1 Parent(s): 0933a87

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +118 -9
app.py CHANGED
@@ -8,6 +8,79 @@ from PIL import Image
8
  from io import BytesIO
9
  import tempfile
10
  import ffmpeg
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
12
  # --- Google Gemini API Configuration ---
13
  GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
@@ -38,6 +111,27 @@ def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
38
  print(f"Could not verify user's PRO/Enterprise status: {e}")
39
  return False
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  def _extract_image_data_from_response(response) -> Optional[bytes]:
42
  """Helper to extract image data from the model's response."""
43
  if hasattr(response, 'candidates') and response.candidates:
@@ -118,7 +212,17 @@ def _generate_video_segment(input_image_path: str, output_image_path: str, promp
118
  return result[0]["video"]
119
 
120
  def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
121
- if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)): raise gr.Error("Access Denied.")
 
 
 
 
 
 
 
 
 
 
122
  try:
123
  contents = [Image.open(image_path[0]) for image_path in images] if images else []
124
  contents.append(prompt)
@@ -135,7 +239,7 @@ def unified_image_generator(prompt: str, images: Optional[List[str]], previous_v
135
  # The crucial check for continuity
136
  if images[0][0] == last_frame_path:
137
  can_extend_video = True
138
-
139
  return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
140
  except Exception as e:
141
  raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again")
@@ -199,6 +303,7 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
199
  generate_button = gr.Button("Generate", variant="primary")
200
  with gr.Column(scale=1):
201
  output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
 
202
  use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
203
  with gr.Row():
204
  create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
@@ -215,15 +320,15 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
215
  triggers=[generate_button.click, prompt_input.submit],
216
  fn=unified_image_generator,
217
  inputs=[prompt_input, image_input_gallery, previous_video_state, last_frame_of_video_state, manual_token],
218
- outputs=[output_image, create_video_button, extend_video_button, video_group]
219
  )
220
  use_image_button.click(
221
  fn=lambda img: (
222
  [img] if img else None, None, gr.update(visible=False),
223
- gr.update(visible=False), gr.update(visible=False)
224
  ),
225
  inputs=[output_image],
226
- outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group]
227
  )
228
  create_video_button.click(
229
  fn=lambda: gr.update(visible=True), outputs=[video_group]
@@ -241,8 +346,12 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
241
  )
242
 
243
  def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
244
- if not profile: return gr.update(visible=False), gr.update(visible=False)
245
- if verify_pro_status(oauth_token): return gr.update(visible=True), gr.update(visible=False)
 
 
 
 
246
  else:
247
  message = (
248
  "## ✨ Exclusive Access for PRO Users\n\n"
@@ -250,8 +359,8 @@ with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
250
  "To unlock this and many other cool stuff, please consider upgrading your account.\n\n"
251
  "### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)"
252
  )
253
- return gr.update(visible=False), gr.update(visible=True, value=message)
254
- demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])
255
 
256
  if __name__ == "__main__":
257
  demo.queue(max_size=None, default_concurrency_limit=None).launch(show_error=True)
 
8
  from io import BytesIO
9
  import tempfile
10
  import ffmpeg
11
+ import json
12
+ from datetime import datetime, date
13
+ from pathlib import Path
14
+
15
+ # --- Database Setup ---
16
+ DATA_DIR = Path("/data")
17
+ DATA_DIR.mkdir(exist_ok=True)
18
+ USAGE_DB_PATH = DATA_DIR / "usage_limits.json"
19
+
20
+ def load_usage_db() -> dict:
21
+ """Load the usage database from disk."""
22
+ if USAGE_DB_PATH.exists():
23
+ try:
24
+ with open(USAGE_DB_PATH, 'r') as f:
25
+ return json.load(f)
26
+ except Exception as e:
27
+ print(f"Error loading usage database: {e}")
28
+ return {}
29
+ return {}
30
+
31
+ def save_usage_db(db: dict):
32
+ """Save the usage database to disk."""
33
+ try:
34
+ with open(USAGE_DB_PATH, 'w') as f:
35
+ json.dump(db, f, indent=2)
36
+ except Exception as e:
37
+ print(f"Error saving usage database: {e}")
38
+
39
+ def check_and_update_usage(username: str) -> bool:
40
+ """
41
+ Check if user has reached daily limit and update usage.
42
+ Returns True if user can generate, False if limit reached.
43
+ """
44
+ db = load_usage_db()
45
+ today = str(date.today())
46
+
47
+ # Initialize user record if not exists
48
+ if username not in db:
49
+ db[username] = {"date": today, "count": 0}
50
+
51
+ user_data = db[username]
52
+
53
+ # Reset count if it's a new day
54
+ if user_data["date"] != today:
55
+ user_data["date"] = today
56
+ user_data["count"] = 0
57
+
58
+ # Check if limit reached
59
+ if user_data["count"] >= 80:
60
+ return False
61
+
62
+ # Increment count
63
+ user_data["count"] += 1
64
+ db[username] = user_data
65
+ save_usage_db(db)
66
+
67
+ return True
68
+
69
+ def get_remaining_generations(username: str) -> int:
70
+ """Get the number of remaining generations for today."""
71
+ db = load_usage_db()
72
+ today = str(date.today())
73
+
74
+ if username not in db:
75
+ return 80
76
+
77
+ user_data = db[username]
78
+
79
+ # Reset if it's a new day
80
+ if user_data["date"] != today:
81
+ return 80
82
+
83
+ return max(0, 80 - user_data["count"])
84
 
85
  # --- Google Gemini API Configuration ---
86
  GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
 
111
  print(f"Could not verify user's PRO/Enterprise status: {e}")
112
  return False
113
 
114
+ def get_username(token: Optional[Union[gr.OAuthToken, str]]) -> Optional[str]:
115
+ """Get the username from the token."""
116
+ if not token:
117
+ return None
118
+
119
+ if isinstance(token, gr.OAuthToken):
120
+ token_str = token.token
121
+ elif isinstance(token, str):
122
+ token_str = token
123
+ else:
124
+ return None
125
+
126
+ try:
127
+ user_info = whoami(token=token_str)
128
+ username = user_info.get("name", None)
129
+ print(f"Username: {username}")
130
+ return username
131
+ except Exception as e:
132
+ print(f"Could not get username: {e}")
133
+ return None
134
+
135
  def _extract_image_data_from_response(response) -> Optional[bytes]:
136
  """Helper to extract image data from the model's response."""
137
  if hasattr(response, 'candidates') and response.candidates:
 
212
  return result[0]["video"]
213
 
214
  def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
215
+ if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)):
216
+ raise gr.Error("Access Denied.")
217
+
218
+ # Check rate limit
219
+ username = get_username(oauth_token) or get_username(manual_token)
220
+ if not username:
221
+ raise gr.Error("Could not identify user.")
222
+
223
+ if not check_and_update_usage(username):
224
+ raise gr.Error("This demo is made for interactive generations, not automated workflows. You have generated 80 images today, come back tomorrow for more.")
225
+
226
  try:
227
  contents = [Image.open(image_path[0]) for image_path in images] if images else []
228
  contents.append(prompt)
 
239
  # The crucial check for continuity
240
  if images[0][0] == last_frame_path:
241
  can_extend_video = True
242
+
243
  return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
244
  except Exception as e:
245
  raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again")
 
303
  generate_button = gr.Button("Generate", variant="primary")
304
  with gr.Column(scale=1):
305
  output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
306
+ status_message = gr.Markdown("", visible=True)
307
  use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
308
  with gr.Row():
309
  create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
 
320
  triggers=[generate_button.click, prompt_input.submit],
321
  fn=unified_image_generator,
322
  inputs=[prompt_input, image_input_gallery, previous_video_state, last_frame_of_video_state, manual_token],
323
+ outputs=[output_image, create_video_button, extend_video_button, video_group, status_message]
324
  )
325
  use_image_button.click(
326
  fn=lambda img: (
327
  [img] if img else None, None, gr.update(visible=False),
328
+ gr.update(visible=False), gr.update(visible=False), ""
329
  ),
330
  inputs=[output_image],
331
+ outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group, status_message]
332
  )
333
  create_video_button.click(
334
  fn=lambda: gr.update(visible=True), outputs=[video_group]
 
346
  )
347
 
348
  def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
349
+ if not profile: return gr.update(visible=False), gr.update(visible=False), ""
350
+ if verify_pro_status(oauth_token):
351
+ username = get_username(oauth_token)
352
+ remaining = get_remaining_generations(username) if username else 80
353
+ status = f"Welcome! You have {remaining} generations remaining today."
354
+ return gr.update(visible=True), gr.update(visible=False), status
355
  else:
356
  message = (
357
  "## ✨ Exclusive Access for PRO Users\n\n"
 
359
  "To unlock this and many other cool stuff, please consider upgrading your account.\n\n"
360
  "### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)"
361
  )
362
+ return gr.update(visible=False), gr.update(visible=True, value=message), ""
363
+ demo.load(control_access, inputs=None, outputs=[main_interface, pro_message, status_message])
364
 
365
  if __name__ == "__main__":
366
  demo.queue(max_size=None, default_concurrency_limit=None).launch(show_error=True)