Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import discord | |
| import os | |
| import threading | |
| import gradio as gr | |
| import requests | |
| import json | |
| import random | |
| import time | |
| import re | |
| from discord import Embed, Color | |
| from discord.ext import commands | |
| from gradio_client import Client | |
| from PIL import Image | |
| from ratelimiter import RateLimiter | |
| from datetime import datetime # for times | |
| from pytz import timezone # for times | |
| import asyncio # check if used | |
# Timezone in which timestamps are rendered in the log embeds.
zurich_tz = timezone("Europe/Zurich")
def convert_to_timezone(dt, tz):
    """Format *dt* as ``YYYY-MM-DD HH:MM:SS ZONE`` in the timezone *tz*.

    Args:
        dt: The datetime to convert. A naive value is interpreted as UTC
            (callers in this file pass ``datetime.utcnow()``) rather than
            as host-local time, which is what ``astimezone`` assumes for
            naive datetimes.
        tz: A ``tzinfo`` object for the target timezone.

    Returns:
        The formatted timestamp string.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        # Imported locally under an alias: the module-level name
        # ``timezone`` in this file refers to ``pytz.timezone``.
        from datetime import timezone as _stdlib_timezone

        dt = dt.replace(tzinfo=_stdlib_timezone.utc)
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# Bot token comes from the environment; None when the variable is unset.
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN")

# The bot is created with every gateway intent enabled and "!" as prefix.
intents = discord.Intents.all()
bot = commands.Bot(command_prefix="!", intents=intents)
# rate_limiter = RateLimiter(max_calls=10, period=60)  # needs testing

# stats stuff ----------------------------------------------------------------
# Running count of non-bot messages seen by on_message.
number_of_messages = 0
# Per-author anti-spam state: author id -> {'count': int, 'timestamp': datetime}.
user_cooldowns = {}
@bot.event
async def on_message(message):
    """Mirror incoming messages to a fixed user and flag rapid posting.

    For every message not authored by the bot this:

    1. increments the global ``number_of_messages`` counter and DMs a
       one-line summary of the message to user 811235357663297546;
    2. updates the author's entry in ``user_cooldowns`` and, when the author
       posts again within ``cooldown_duration`` seconds of their previous
       message, sends an alert that pings the alerts role to
       ``bot.log_channel``.

    The message is always handed to ``bot.process_commands`` afterwards, so
    a failure in forwarding or in the anti-spam check cannot stop commands
    from being processed.

    Args:
        message: The ``discord.Message`` that was received.
    """
    global number_of_messages

    if message.author != bot.user:
        try:
            number_of_messages += 1
            # DM channels have no ``name`` attribute.
            channel_name = getattr(message.channel, "name", "DM")
            message_link = f"[#{channel_name}]({message.jump_url})"
            recipient = bot.get_user(811235357663297546)
            # get_user() returns None when the user is not in the cache.
            if recipient is not None:
                summary = f"{number_of_messages}| {message_link} |{message.author}: {message.content}"
                # Discord rejects message bodies longer than 2000 characters.
                await recipient.send(summary[:2000])
        except Exception as e:
            print(f"on_message Error: {e}")

        try:
            ### antispam experiment
            cooldown_duration = 1  # 1 message per second
            author_id = message.author.id
            state = user_cooldowns.get(author_id)
            if state is None:
                user_cooldowns[author_id] = {'count': 1, 'timestamp': message.created_at}
            else:
                current = message.created_at
                previous = state['timestamp']
                elapsed = (current - previous).total_seconds()
                if elapsed > cooldown_duration:
                    print(f"seconds since last message by {message.author}: ({current} - {previous}).seconds = {elapsed}")
                    # if we wait longer than cooldown_duration, count will reset
                    user_cooldowns[author_id] = {'count': 1, 'timestamp': current}
                else:
                    # tldr; we posted 2 messages with less than 1s between them
                    state['count'] += 1
                    print(f"seconds since last message by {message.author}: {elapsed}")
                    print(f"count: {state['count']}")
                    # Fall back to the normal role unless TEST_SERVER is
                    # explicitly 'True', so the alert is never left unbound.
                    if os.environ.get('TEST_SERVER') == 'True':
                        alert = "<@&1106995261487710411>"  # test @alerts role
                    else:
                        alert = "<@&1108342563628404747>"  # normal @alerts role
                    # Record the timestamp before sending so that a failed
                    # send does not leave stale anti-spam state behind.
                    state['timestamp'] = current
                    await bot.log_channel.send(
                        f"[EXPERIMENTAL ALERT] {message.author.mention} is posting too quickly! \n"
                        f"Spam count: {state['count']}\n"
                        # Truncated so the whole alert stays under the
                        # 2000-character message limit.
                        f"Message content: {message.content[:1500]}\n"
                        f"[Jump to message!]({message.jump_url})\n"
                        f"{alert}"
                    )
                    # TODO: escalate on sustained spam (ping admins, time
                    # out or kick the user).
        except Exception as e:
            print(f"on_message Error: {e}")

    # Allow other event handlers to process the message
    try:
        await bot.process_commands(message)
    except Exception as e:
        print(f"on_message Error: {e}")
| # moderation stuff----------------------------------------------------------------------------------------------------------------------------------------------------- | |
@bot.event
async def on_message_edit(before, after):
    """Log an edited message to ``bot.log_channel`` as an orange embed.

    Edits by the bot itself, and edit events where the text content did not
    change, are ignored.

    Args:
        before: The message as it was before the edit.
        after: The message after the edit.
    """
    try:
        if before.author == bot.user:
            return
        if before.content == after.content:
            return

        author = before.author
        # Fall back to the bot's avatar when the author has none set.
        icon_url = author.avatar.url if author.avatar else bot.user.avatar.url
        # Each side is capped so the description stays under the
        # 4096-character embed description limit.
        old_text = before.content[:2000] or '*(empty message)*'
        new_text = after.content[:2000] or '*(empty message)*'

        embed = Embed(color=Color.orange())
        embed.set_author(name=f"{author} ID: {author.id}", icon_url=icon_url)
        embed.title = "Message Edited"
        embed.description = f"**Before:** {old_text}\n**After:** {new_text}"
        embed.add_field(name="Author Username", value=author.name, inline=True)
        embed.add_field(name="Channel", value=before.channel.mention, inline=True)
        embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True)
        embed.add_field(name="Message ID", value=before.id, inline=True)
        # jump_url also works for DMs, where before.guild is None.
        embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({before.jump_url})", inline=True)
        if before.attachments:
            attachment_urls = "\n".join(attachment.url for attachment in before.attachments)
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_message_edit Error: {e}")
@bot.event
async def on_message_delete(message):
    """Log a deleted message to ``bot.log_channel`` as a red embed.

    Deletions of the bot's own messages are ignored.

    Args:
        message: The message that was deleted.
    """
    try:
        if message.author == bot.user:
            return

        author = message.author
        # Fall back to the bot's avatar when the author has none set.
        icon_url = author.avatar.url if author.avatar else bot.user.avatar.url

        embed = Embed(color=Color.red())
        embed.set_author(name=f"{author} ID: {author.id}", icon_url=icon_url)
        embed.title = "Message Deleted"
        # Capped at the 4096-character embed description limit.
        embed.description = message.content[:4096] or "*(empty message)*"
        embed.add_field(name="Author Username", value=author.name, inline=True)
        embed.add_field(name="Channel", value=message.channel.mention, inline=True)
        embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True)
        embed.add_field(name="Message ID", value=message.id, inline=True)
        # jump_url also works for DMs, where message.guild is None.
        embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({message.jump_url})", inline=True)
        if message.attachments:
            attachment_urls = "\n".join(attachment.url for attachment in message.attachments)
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_message_delete Error: {e}")
| # nickname stuff --------------------------------------------------------------------------------------------------------------------------- | |
@bot.event
async def on_member_update(before, after):
    """Log nickname changes to ``bot.log_channel`` as an orange embed.

    Other member updates are ignored.

    Args:
        before: The member's state before the update.
        after: The member's state after the update.
    """
    try:
        # Disabled experiment: dump recent audit-log entries on a name change.
        # if before.name != after.name:
        #     async for entry in before.guild.audit_logs(limit=5):
        #         print(f'{entry.user} did {entry.action} to {entry.target}')
        if before.nick == after.nick:
            return

        # Fall back to the bot's avatar when the member has none set.
        icon_url = after.avatar.url if after.avatar else bot.user.avatar.url
        embed = Embed(color=Color.orange())
        embed.set_author(name=f"{after} ID: {after.id}", icon_url=icon_url)
        embed.title = "Nickname Modified"
        embed.add_field(name="Mention", value=after.mention, inline=True)
        embed.add_field(name="Old", value=before.nick, inline=True)
        embed.add_field(name="New", value=after.nick, inline=True)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_member_update Error: {e}")
@bot.event
async def on_member_ban(guild, banned_user):
    """Log a ban to ``bot.log_channel`` and ping the alerts role.

    The ban reason is read from ``guild.fetch_ban`` and the acting moderator
    from the most recent ban entry in the audit log. When that entry does
    not refer to ``banned_user`` the moderator is reported as "Unknown".

    Args:
        guild: The guild the user was banned from.
        banned_user: The banned ``User`` or ``Member``.
    """
    try:
        # NOTE(review): presumably gives the audit log time to record the
        # ban before it is queried -- confirm.
        await asyncio.sleep(5)
        ban_entry = await guild.fetch_ban(banned_user)
        ban_reason = ban_entry.reason
        print(f"ban_reason: {ban_reason}")

        moderator = None
        async for entry in guild.audit_logs(action=discord.AuditLogAction.ban, limit=1):
            # Compare by id: the audit-log target is not guaranteed to be
            # the same object type as banned_user.
            if getattr(entry.target, "id", None) == banned_user.id:
                moderator = entry.user
            if ban_reason:
                print(f'{entry.user} banned {entry.target} for {ban_reason}')
            else:
                print(f'{entry.user} banned {entry.target} (no reason specified)')

        # HF   1108342563628404747
        # test 1106995261487710411
        content = "<@&1108342563628404747>"  # @alerts role
        # Fall back to the bot's avatar when the user has none set.
        icon_url = banned_user.avatar.url if banned_user.avatar else bot.user.avatar.url

        embed = Embed(color=Color.red())
        embed.set_author(name=f"{banned_user} ID: {banned_user.id}", icon_url=icon_url)
        embed.title = "User Banned"
        embed.add_field(name="User", value=banned_user.mention, inline=True)
        # A plain User (someone no longer in the guild) has no ``nick``.
        embed.add_field(name="Nickname", value=getattr(banned_user, "nick", None), inline=True)
        embed.add_field(name="Account Created At", value=banned_user.created_at, inline=True)
        embed.add_field(name="Moderator", value=moderator.mention if moderator else "Unknown", inline=True)
        embed.add_field(name="Nickname", value=getattr(moderator, "nick", None), inline=True)
        embed.add_field(name="Reason", value=ban_reason, inline=False)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(content=content, embed=embed)
    except Exception as e:
        print(f"on_member_ban Error: {e}")
@bot.event
async def on_member_unban(guild, unbanned_user):
    """Log an unban to ``bot.log_channel`` and ping the alerts role.

    The acting moderator is taken from the most recent unban entry in the
    audit log. When that entry does not refer to ``unbanned_user`` the
    moderator is reported as "Unknown" instead of the log being dropped.

    Args:
        guild: The guild the user was unbanned from.
        unbanned_user: The ``User`` that was unbanned.
    """
    try:
        # NOTE(review): presumably gives the audit log time to record the
        # unban before it is queried -- confirm.
        await asyncio.sleep(5)

        moderator = None
        async for entry in guild.audit_logs(action=discord.AuditLogAction.unban, limit=1):
            # verify that unbanned user is in audit log (compared by id, as
            # the target is not guaranteed to be the same object type)
            if getattr(entry.target, "id", None) == unbanned_user.id:
                moderator = entry.user

        created_and_age = f"{unbanned_user.created_at}"
        content = "<@&1108342563628404747>"  # @alerts role
        # Fall back to the bot's avatar when the user has none set.
        icon_url = unbanned_user.avatar.url if unbanned_user.avatar else bot.user.avatar.url

        embed = Embed(color=Color.red())
        embed.set_author(name=f"{unbanned_user} ID: {unbanned_user.id}", icon_url=icon_url)
        embed.title = "User Unbanned"
        embed.add_field(name="User", value=unbanned_user.mention, inline=True)
        embed.add_field(name="Account Created At", value=created_and_age, inline=True)
        embed.add_field(name="Moderator", value=moderator.mention if moderator else "Unknown", inline=True)
        # The audit-log user may be a plain User, which has no ``nick``.
        embed.add_field(name="Nickname", value=getattr(moderator, "nick", None), inline=True)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(content=content, embed=embed)
    except Exception as e:
        print(f"on_member_unban Error: {e}")
| # admin stuff----------------------------------------------------------------------------------------------------------------------- | |
@bot.event
async def on_member_join(member):
    """Log a member joining to ``bot.log_channel`` as a blue embed.

    The embed is sent after a five-second delay.

    Args:
        member: The member that joined.
    """
    try:
        await asyncio.sleep(5)
        # Fall back to the bot's avatar when the member has none set.
        avatar_url = member.avatar.url if member.avatar else bot.user.avatar.url

        embed = Embed(color=Color.blue())
        embed.set_author(name=f"{member} ID: {member.id}", icon_url=avatar_url)
        embed.title = "User Joined"
        embed.add_field(name="Mention", value=member.mention, inline=True)
        embed.add_field(name="Nickname", value=member.nick, inline=True)
        embed.add_field(name="Account Created At", value=member.created_at, inline=True)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_member_join Error: {e}")
@bot.event
async def on_member_remove(member):
    """Log a member leaving to ``bot.log_channel`` as a blue embed.

    Args:
        member: The member that left the guild.
    """
    try:
        # Fall back to the bot's avatar when the member has none set.
        avatar_url = member.avatar.url if member.avatar else bot.user.avatar.url

        embed = Embed(color=Color.blue())
        embed.set_author(name=f"{member} ID: {member.id}", icon_url=avatar_url)
        embed.title = "User Left"
        embed.add_field(name="Mention", value=member.mention, inline=True)
        embed.add_field(name="Nickname", value=member.nick, inline=True)
        embed.add_field(name="Account Created At", value=member.created_at, inline=True)
        # Use an aware "now" so the footer does not depend on the host's
        # local timezone.
        embed.set_footer(text=convert_to_timezone(datetime.now(zurich_tz), zurich_tz))
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_member_remove Error: {e}")
@bot.event
async def on_guild_channel_create(channel):
    """Log a newly created channel to ``bot.log_channel`` as a green embed.

    Args:
        channel: The guild channel that was created.
    """
    try:
        notice = Embed(
            description=f'Channel {channel.mention} was created',
            color=Color.green(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as e:
        print(f"on_guild_channel_create Error: {e}")
@bot.event
async def on_guild_channel_delete(channel):
    """Log a deleted channel to ``bot.log_channel`` as a red embed.

    Args:
        channel: The guild channel that was deleted.
    """
    try:
        # TODO: should ping @alerts for this
        notice = Embed(
            description=f'Channel {channel.name} ({channel.mention}) was deleted',
            color=Color.red(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as e:
        print(f"on_guild_channel_delete Error: {e}")
@bot.event
async def on_guild_role_create(role):
    """Log a newly created role to ``bot.log_channel`` as a green embed.

    Args:
        role: The role that was created.
    """
    try:
        notice = Embed(
            description=f'Role {role.mention} was created',
            color=Color.green(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as e:
        print(f"on_guild_role_create Error: {e}")
@bot.event
async def on_guild_role_delete(role):
    """Log a deleted role to ``bot.log_channel`` as a red embed.

    Args:
        role: The role that was deleted.
    """
    try:
        # TODO: should ping @alerts for this
        notice = Embed(
            description=f'Role {role.name} ({role.mention}) was deleted',
            color=Color.red(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as e:
        print(f"on_guild_role_delete Error: {e}")
@bot.event
async def on_guild_role_update(before, after):
    """Log role renames and administrator-permission changes.

    A rename is logged as an orange embed. A change to the role's
    administrator permission is logged as a red embed and additionally
    pings the alerts role.

    Args:
        before: The role before the update.
        after: The role after the update.
    """
    try:
        # editing roles, could expand this
        if before.name != after.name:
            renamed = Embed(
                description=f'Role {before.mention} was renamed to {after.name}',
                color=Color.orange(),
            )
            await bot.log_channel.send(embed=renamed)

        # Changes involving the administrator permission are sensitive
        # (flagging them can help to prevent mistakes).
        if before.permissions.administrator != after.permissions.administrator:
            content = "<@&1108342563628404747>"  # @alerts role
            state = "enabled" if after.permissions.administrator else "disabled"
            changed = Embed(
                description=f'Role {after.mention} had its administrator permission {state}',
                color=Color.red(),
            )
            await bot.log_channel.send(content=content, embed=changed)
    except Exception as e:
        print(f"on_guild_role_update Error: {e}")
@bot.event
async def on_voice_state_update(member, before, after):
    """Log voice mute and deafen changes to ``bot.log_channel``.

    Only changes of the ``mute`` and ``deaf`` flags are reported; each is
    sent as its own orange embed.

    Args:
        member: The member whose voice state changed.
        before: The voice state before the change.
        after: The voice state after the change.
    """
    try:
        # muting members
        if before.mute != after.mute:
            action = "muted" if after.mute else "unmuted"
            embed = Embed(description=f'{member} was {action} in voice chat', color=Color.orange())
            await bot.log_channel.send(embed=embed)

        # deafening members
        if before.deaf != after.deaf:
            action = "deafened" if after.deaf else "undeafened"
            embed = Embed(description=f'{member} was {action} in voice chat', color=Color.orange())
            await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_voice_state_update Error: {e}")
| # github test stuff ------------------------------------------------------------------------------------------------------------------- | |
| """ | |
| async def check_github(): | |
| url = f'https://api.github.com/repos/{github_repo}/pulls' | |
| response = requests.get(url) | |
| pulls = response.json() | |
| for pull in pulls: | |
| # Check if the pull request was just opened | |
| if pull['state'] == 'open' and pull['created_at'] == pull['updated_at']: | |
| channel = client.get_channel(channel_id) | |
| if channel: | |
| await channel.send(f'New PR opened: {pull["title"]}') | |
| """ | |
| # bot stuff --------------------------------------------------------------------------------------------------------------------------- | |
@bot.event
async def on_ready():
    """Announce the login and cache the admin-logs channel on the bot.

    Stores the channel with id 1036960509586587689 as ``bot.log_channel``,
    which the logging handlers in this file send their embeds to.
    """
    print('Logged on as', bot.user)
    bot.log_channel = bot.get_channel(1036960509586587689)  # admin-logs
    print(bot.log_channel)
def run_bot():
    """Run the Discord bot using ``DISCORD_TOKEN``."""
    bot.run(DISCORD_TOKEN)


# The bot is started on its own thread; the Gradio app is launched from the
# main thread further down.
threading.Thread(target=run_bot).start()
def greet(name):
    """Return the greeting ``"Hello <name>!"`` for the given string."""
    return "".join(("Hello ", name, "!"))
# Minimal Gradio interface: one text input passed to greet(), one text output.
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()