import discord from discord.ext import commands, tasks from discord.utils import get import shutil import json import random import time import os import io import base64 import asyncio import sys import subprocess import math from PIL import Image, PngImagePlugin from dotenv import load_dotenv import matplotlib.pyplot as plt import aiohttp import aioftp import asyncssh #Stable Diffusion #Set this env variable to http://host:port or "disabled" #os.getenv('stablediffusion_url') #env vars START load_dotenv() imgflip_username = os.getenv('imgflip_username') imgflip_password = os.getenv('imgflip_password') discord_token = os.getenv('discord_token') ftp_server = os.getenv('ftp_server') ftp_username = os.getenv('ftp_username') ftp_password = os.getenv('ftp_password') ftp_public_html = os.getenv('ftp_public_html') #env vars END #discord setup START intents = discord.Intents.default() intents.message_content = True bot = commands.Bot(command_prefix='!', intents=intents) #discord setup END @bot.command( description="Moderate", help="This currently tool works by replacing the filename on the ftp server with a black image. The description will remain the same and may need to be altered.", brief="Moderation Tools" ) async def moderate(ctx, filename): await upload_sftp("blank_image.png", (os.getenv('ftp_public_html') + 'ai-images/'), filename) output = "Image " + filename + " replaced" await ctx.send(output) async def upload_ftp(local_filename, server_folder, server_filename): client = aioftp.Client() await client.connect(ftp_server) await client.login(ftp_username, ftp_password) await client.change_directory(server_folder) await client.upload(local_filename, server_folder+server_filename, write_into=True) await client.quit() async def upload_sftp(local_filename, server_folder, server_filename): remotepath = server_folder + server_filename async with asyncssh.connect(ftp_server, username=ftp_username, password=ftp_password) as conn: async with conn.start_sftp_client() as sftp: await sftp.put(local_filename, remotepath=remotepath) async def handle_error(error): print(error) current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) log_line = current_time + ': ' + str(error) + '\n' with open("databases/error_log.txt", 'a') as f: f.write(log_line) return error async def upload_ftp_ai_images(filename, prompt): html_file = "phixxy.com/ai-images/index.html" html_insert = '''
{content}
" content = content.replace('\n\n', "") content = content.replace("
", '') post_div = post_div.replace("", title) post_div = post_div.replace("", date) post_div = post_div.replace("", content) html_data = html_data.replace("", post_div) with open(filename, 'w', encoding="utf-8") as f: f.write(html_data) await upload_sftp(filename, (os.getenv('ftp_public_html') + 'ai-blog/'), "index.html") run_time = time.time() - start_time print("It took " + str(run_time) + " seconds to generate the blog post!") output = "Blog Updated! (" + str(run_time) + " seconds) https://ai.phixxy.com/ai-blog" #output += '\nNotifying subscribers: ' #for subscriber in blog_subscribers: # output += '<@' + subscriber + '> ' await ctx.send(output) @bot.command( description="Question", help="Ask a raw chatgpt question. Usage: !question (question)", brief="Get an answer" ) async def question(ctx): question = ctx.message.content.split(" ", maxsplit=1)[1] answer = await answer_question(question) chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)] for chunk in chunks: await ctx.send(chunk) @bot.command( description="Question GPT4", help="Ask GPT4 a question. Usage: !question_gpt4 (question)", brief="Get an answer" ) async def question_gpt4(ctx): question = ctx.message.content.split(" ", maxsplit=1)[1] answer = await answer_question(question, "gpt-4-vision-preview") chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)] for chunk in chunks: await ctx.send(chunk) @bot.command( description="Image GPT4", help="Ask GPT4 a question about an image. Usage: !question_gpt4 (link) (question)", brief="Get an answer" ) async def looker(ctx): image_link = ctx.message.content.split(" ", maxsplit=2)[1] question = ctx.message.content.split(" ", maxsplit=2)[2] headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {os.getenv("openai.api_key")}', } data = { "model": "gpt-4-vision-preview", "messages": [{"role": "user", "content": [{"type": "text", "text": question},{"type": "image_url","image_url": {"url": image_link}}]}], "max_tokens": 500 } url = "https://api.openai.com/v1/chat/completions" try: async with bot.http_session.post(url, headers=headers, json=data) as resp: response_data = await resp.json() print(response_data) answer = response_data['choices'][0]['message']['content'] except Exception as error: return await handle_error(error) chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)] for chunk in chunks: await ctx.send(chunk) @bot.command( description="Highscores", help="Shows a bar graph of users in this channel and how many messages they have sent.", brief="Display chat highscores" ) async def highscores(ctx, limit=0): filename = str(ctx.channel.id) + ".log" with open("channels/logs/" + filename, 'r', encoding="utf-8") as logfile: data = logfile.readlines() logfile.close() def is_username(user): for character in user: if character.isupper(): return False if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'): return False return True user_message_counts = {} for line in data: try: user = line[0:line.find(':')] if is_username(user): if user not in user_message_counts and user != "" and len(user) <= 32: user_message_counts[user] = 1 else: user_message_counts[user] += 1 except Exception as error: await handle_error(error) def remove_dict_keys_if_less_than_x(dictionary,x): for key in dictionary: if dictionary[key] <= x: dictionary.pop(key) return remove_dict_keys_if_less_than_x(dictionary,x) return dictionary print(user_message_counts) remove_dict_keys_if_less_than_x(user_message_counts,limit) keys = list(user_message_counts.keys()) values = list(user_message_counts.values()) fig, ax = plt.subplots() bar_container = ax.barh(keys, values) ax.set_xlabel("Message Count") ax.set_ylabel("Username") ax.set_title("Messages Sent in " + ctx.channel.name) ax.bar_label(bar_container, label_type='center') plt.savefig(str(ctx.channel.id) + '_hiscores.png', dpi=1000, bbox_inches="tight") with open(str(ctx.channel.id) + '_hiscores.png', "rb") as fh: f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png') await ctx.send(file=f) @bot.command( description="Highscores Server", help="Shows a bar graph of users across all servers I am in and how many messages they have sent.", brief="Display chat highscores" ) async def highscores_server(ctx, limit=0): user_message_counts = {} data = [] for filename in os.listdir("channels/logs/"): with open("channels/logs/" + filename, 'r', encoding="utf-8") as logfile: data += logfile.readlines() logfile.close() def is_username(user): for character in user: if character.isupper(): return False if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'): return False return True user_message_counts = {} for line in data: try: user = line[0:line.find(':')] if is_username(user): if user not in user_message_counts and user != "" and len(user) <= 32: user_message_counts[user] = 1 else: user_message_counts[user] += 1 except Exception as error: await handle_error(error) def remove_dict_keys_if_less_than_x(dictionary,x): for key in dictionary: if dictionary[key] <= x: dictionary.pop(key) return remove_dict_keys_if_less_than_x(dictionary,x) return dictionary print(user_message_counts) print("printed") user_message_counts = remove_dict_keys_if_less_than_x(user_message_counts,limit) keys = list(user_message_counts.keys()) values = list(user_message_counts.values()) fig, ax = plt.subplots() bar_container = ax.barh(keys, values) ax.set_xlabel("Message Count") ax.set_ylabel("Username") ax.set_title("Messages Sent in all channels I can see") ax.bar_label(bar_container, label_type='center') plt.savefig(str(ctx.channel.id) + '_hiscores.png', dpi=1000, bbox_inches="tight") with open(str(ctx.channel.id) + '_hiscores.png', "rb") as fh: f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png') await ctx.send(file=f) @bot.command( description="Website", help="Generates a website using gpt 3.5. Usage: !website (topic)", brief="Generate a website" ) async def website(ctx): async def delete_local_pngs(local_folder): for filename in os.listdir(local_folder): if ".png" in filename: os.remove(local_folder + filename) async def delete_ftp_pngs(server_folder): async with asyncssh.connect(ftp_server, username=ftp_username, password=ftp_password) as conn: async with conn.start_sftp_client() as sftp: for filename in (await sftp.listdir(server_folder)): if '.png' in filename: try: print("Deleting", filename) await sftp.remove(server_folder+filename) except: print("Couldn't delete", filename) async def extract_image_tags(code): count = code.count("This webpage is currently being generated. The page will refresh once it is complete. Please be patient.
") await upload_sftp(working_file, server_folder, "index.html") topic = ctx.message.content.split(" ", maxsplit=1)[1] prompt = "Generate a webpage using html and inline css. The webpage topic should be " + topic + ". Feel free to add image tags with alt text. Leave the image source blank. The images will be added later." code = await answer_question(prompt) await delete_local_pngs(local_folder) await delete_ftp_pngs(server_folder) tags = await extract_image_tags(code) alt_texts = await extract_image_alt_text(tags) file_list = await generate_images(local_folder, alt_texts) code = await add_image_filenames(code, file_list) with open(working_file, 'w') as f: f.write(code) f.close() await upload_html_and_imgs(local_folder, server_folder) await ctx.send("Finished https://ai.phixxy.com/ai-webpage/") except Exception as error: await handle_error(error) await ctx.send("Failed, Try again.") @bot.command( description="Feature", help="Suggest a feature. Usage: !feature (feature)", brief="Suggest a feature" ) async def feature(ctx): try: feature = ctx.message.content.split(" ", maxsplit=1)[1] with open("features.txt",'a') as f: f.writelines('\n' + feature) await ctx.send("Added " + feature) except Exception as error: await handle_error(error) with open("features.txt",'r') as f: features = f.read() await ctx.send(features) @bot.command( description="Chat", help="Enable or disable bot chat in this channel. Usage !chat (enable|disable)", brief="Enable or disable bot chat" ) async def chat(ctx, message): if "enable" in message: edit_channel_config(ctx.channel.id, "chat_enabled", True) await ctx.send("Chat Enabled") elif "disable" in message: edit_channel_config(ctx.channel.id, "chat_enabled", False) await ctx.send("Chat Disabled") else: await ctx.send("Usage: !chat (enable|disable)") @bot.command( description="Reactions", help="Enable or disable bot reactions in this channel. Usage !reactions (enable|disable)", brief="Enable or disable bot reactions" ) async def reactions(ctx, message): if "enable" in message: edit_channel_config(ctx.channel.id, "react_to_msgs", True) await ctx.send("Reactions Enabled") elif "disable" in message: edit_channel_config(ctx.channel.id, "react_to_msgs", False) await ctx.send("Reactions Disabled") else: await ctx.send("Usage: !reactions (enable|disable)") @bot.command( description="View Images", help="Enable or disable bot viewing images in this channel. Usage !viewimages (enable|disable)", brief="Enable or disable bot viewing images" ) async def viewimages(ctx, message): if "enable" in message: edit_channel_config(ctx.channel.id, "look_at_images", True) await ctx.send("Viewing Enabled") elif "disable" in message: edit_channel_config(ctx.channel.id, "look_at_images", False) await ctx.send("Viewing Disabled") else: await ctx.send("Usage: !viewimages (enable|disable)") @bot.command( description="Commands", help="Enable or disable bot commands in this channel. Usage !enable_commands (enable|disable)", brief="Enable or disable bot commands" ) async def enable_commands(ctx, message): if "disable" in message or "false" in message: edit_channel_config(ctx.channel.id, "commands_enabled", False) await ctx.send("Commands Disabled") else: edit_channel_config(ctx.channel.id, "commands_enabled", True) await ctx.send("Commands Enabled") @bot.command( description="Topic", help="Set the channel topic for the bot. Usage: !topic (topic)", brief="Set channel topic" ) async def topic(ctx, channel_topic): edit_channel_config(ctx.channel.id, "channel_topic", channel_topic) await ctx.send("Topic changed to " + channel_topic) '''@bot.command( description="Python", help="Run some python code. Imports are disabled but random is imported for you. Usage !python (codeblock)", brief="Run some python code" ) async def python(ctx): try: code = ctx.message.content print(code.find("```"),code.rfind("```")) code = code[code.find("```")+3:code.rfind("```")] #Finds the code in codeblocks if "import" in code: await ctx.send("Imports not allowed") return 0 if "```" in code: code = code.replace("```", "") code = "import random\nimport math\n" + code if len(code) == 0: await ctx.send('Please provide some code to run') else: folder_path = "tmp/python_temp_scripts/" if not os.path.exists(folder_path): os.makedirs(folder_path) unique_num = str(len(os.listdir(folder_path))) filename = f"{folder_path}{unique_num}.py" with open(filename, "w") as f: f.write(code) try: try: response = subprocess.run(["python", filename], timeout=10, capture_output=True, check=True) except subprocess.TimeoutExpired: await ctx.send("Code took too long to run!") return 0 print("response", response.stdout.decode('utf-8')) if response.stdout.decode('utf-8') == "": await ctx.send("No Output") return 0 await ctx.send(response.stdout.decode('utf-8')) except subprocess.CalledProcessError as error: await ctx.send(error.stderr.decode('utf-8')) except Exception as error: await handle_error(error) await ctx.send("Usage: !python (codeblock)")''' @bot.command( description="FTP", help="Enable or disable bot FTP to phixxy.com in this channel. Usage !ftp (enable|disable)", brief="Enable or disable uploading to web" ) async def ftp(ctx, message): if "enable" in message: edit_channel_config(ctx.channel.id, "ftp_enabled", True) await ctx.send("FTP Enabled") elif "disable" in message: edit_channel_config(ctx.channel.id, "ftp_enabled", False) await ctx.send("FTP Disabled") else: await ctx.send("Usage: !ftp (enable|disable)") @bot.command( description="Personality", help="Set the personality of the bot. Usage: !personality (personality)", brief="Set the personality" ) async def personality(ctx): personality_type = ctx.message.content.split(" ", maxsplit=1)[1] edit_channel_config(ctx.channel.id, "personality", personality_type) await ctx.send("Personality changed to " + personality_type) '''@bot.command( description="Secret Santa Register", help="Register for secret santa!", brief="Register for secret santa!" ) async def ss_register(ctx): try: email = ctx.message.content.split(" ", maxsplit=1)[1] print(ctx.author.name, email) with open("santa.txt", 'a') as f: f.writelines(ctx.author.name + ';' + email + ',') await ctx.send(ctx.author.name + " registered for secret santa!") except: await ctx.send("Usage: !ss_register (email address)")''' @bot.command( description="Poll", help='Create a poll with up to 9 options. Usage: !poll "Put question here" "option 1" "option 2"', brief="Enable or disable bot reactions" ) async def poll(ctx, question, *options: str): if len(options) > 9: await ctx.send("Error: You cannot have more than 9 options") return embed = discord.Embed(title=question, colour=discord.Colour(0x283593)) for i, option in enumerate(options): embed.add_field(name=f"Option {i+1}", value=option, inline=False) message = await ctx.send(embed=embed) numbers = {0: "\u0030\ufe0f\u20e3", 1: "\u0031\ufe0f\u20e3", 2: "\u0032\ufe0f\u20e3", 3: "\u0033\ufe0f\u20e3", 4: "\u0034\ufe0f\u20e3", 5: "\u0035\ufe0f\u20e3", 6: "\u0036\ufe0f\u20e3", 7: "\u0037\ufe0f\u20e3", 8: "\u0038\ufe0f\u20e3", 9: "\u0039\ufe0f\u20e3"} for i in range(len(options)): await message.add_reaction(numbers.get(i+1)) @bot.command( description="Roll", help="Rolls dice mostly for Dungeons and Dragons type games. Usage: !roll 3d6+2", brief="Simulate rolling dice" ) async def roll(ctx, dice_string): dice_parts = dice_string.split('d') num_dice = int(dice_parts[0]) if '+' in dice_parts[1]: die_parts = dice_parts[1].split('+') die_size = int(die_parts[0]) modifier = int(die_parts[1]) elif '-' in dice_parts[1]: die_parts = dice_parts[1].split('-') die_size = int(die_parts[0]) modifier = -int(die_parts[1]) else: die_size = int(dice_parts[1]) modifier = 0 rolls = [random.randint(1, die_size) for i in range(num_dice)] dice_str = ' + '.join([str(roll) for roll in rolls]) total = sum(rolls) + modifier await ctx.send(f'{dice_str} + {modifier} = {total}' if modifier != 0 else f'{dice_str} = {total}') @bot.command( description="Kill", help="Kills the bot in event of an emergency. Only special users can do this! Usage: !kill", brief="Kill the bot", hidden=True ) async def kill(ctx): "Kills the bot" if ctx.author.id == 242018983241318410: exit() else: await ctx.channel.send("You don't have permission to do that.") @bot.command( description="Reset", help="Resets the bot in event of an emergency. Only special users can do this! Usage: !reset", brief="Reset the bot", hidden=True ) async def reset(ctx): if ctx.author.id == 242018983241318410: python = sys.executable os.execl(python, python, *sys.argv) else: await ctx.channel.send("You don't have permission to do that.") @bot.event async def on_reaction_add(reaction, user): if not random.randint(0,9): message = reaction.message emoji = reaction.emoji await message.add_reaction(emoji) async def pkmn_msg(discord_id): path = "databases/pokemon/"+str(discord_id)+'.json' if os.path.isfile(path): with open(path, 'r') as f: json_data = json.loads(f.readline()) json_data['buddy_xp'] += random.randint(1,5) json_data = json.dumps(json_data) with open(path, 'w') as f: f.writelines(json_data) @bot.event async def on_message(ctx): #log stuff logfile = "channels/logs/{0}.log".format(str(ctx.channel.id)) channel_vars = await get_channel_config(ctx.channel.id) chat_history_string = await log_chat_and_get_history(ctx, logfile, channel_vars) #add pokemon xp await pkmn_msg(ctx.author.id) #handle non-text channels (dms, etc) if ctx.channel.type.value != 0 and ctx.author.id != 242018983241318410: #This used to notify the user it cannot respond in this channel, but that spammed threads return await react_to_msg(ctx, channel_vars["react_to_msgs"]) #emoji reactions if channel_vars["commands_enabled"] or (ctx.author.id == 242018983241318410 and ctx.content[0] == "!"): await bot.process_commands(ctx) if not channel_vars["commands_enabled"]: await ctx.channel.send("This command only ran because you set it to allow to run even when commands are disabled") if channel_vars["chat_enabled"] and not ctx.author.bot: if ctx.content and ctx.content[0] != "!": await chat_response(ctx, channel_vars, chat_history_string) elif not ctx.content: await chat_response(ctx, channel_vars, chat_history_string) bot.run(discord_token)