changed extensions to cogs, removed mentions of plugins/extensions

This commit is contained in:
phixxy 2024-02-14 00:07:06 -08:00
parent bdcdd87348
commit 0e5cb9dd0f
17 changed files with 6 additions and 6 deletions

95
cogs/admin.py Normal file
View file

@ -0,0 +1,95 @@
#Adds administrative commands to the bot
import os
import sys
import logging
import subprocess
from discord.ext import commands
class Admin(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.admin_ids = [242018983241318410]
self.log_file = "logs/info.log"
self.logger = logging.getLogger("bot")
@commands.command(
description="Kill",
help="Kills the bot in event of an emergency. Only special users can do this! Usage: !kill",
brief="Kill the bot",
hidden=True
)
async def kill(self, ctx):
"Kills the bot"
if ctx.author.id in self.admin_ids:
self.logger.info(f"Kill command ran by {ctx.author.id}")
exit()
else:
await ctx.channel.send("You don't have permission to do that.")
self.logger.info(f"Kill command attempted by {ctx.author.id}")
@commands.command(
description="Reset",
help="Resets the bot in event of an emergency. Only special users can do this! Usage: !reset",
brief="Reset the bot",
hidden=True
)
async def reset(self, ctx):
if ctx.author.id in self.admin_ids:
self.logger.info(f"Reset command ran by {ctx.author.id}")
python = sys.executable
os.execl(python, python, *sys.argv)
else:
await ctx.channel.send("You don't have permission to do that.")
self.logger.info(f"Reset command attempted by {ctx.author.id}")
@commands.command(
description="Update",
help="This will update sparkytron to the most recent version on github. Only privileged users can run this command! Usage: !update",
brief="Runs git pull",
hidden=True
)
async def update(self, ctx):
if ctx.author.id in self.admin_ids:
self.logger.info(f"Update command ran by {ctx.author.id}")
output = subprocess.run(["git","pull"],capture_output=True)
if output.stderr:
await ctx.send("Update Attempted")
await ctx.send(output.stderr.decode('utf-8'))
else:
await ctx.send(output.stdout.decode('utf-8'))
else:
await ctx.send("You don't have permission to do this.")
self.logger.info(f"Update command attempted by {ctx.author.id}")
def is_error(self, log_line):
if "ERROR" in log_line:
return True
else:
return False
@commands.command(
description="Errors",
help="This will display the last X errors",
brief="Displays errors",
hidden=True
)
async def errors(self, ctx, amount=5):
if ctx.author.id in self.admin_ids:
self.logger.info(f"Errors command ran by {ctx.author.id}")
with open(self.log_file, 'r', encoding="utf-8") as log:
data = log.readlines()
log.close()
errors = []
for line in data[::-1]:
if self.is_error(line):
errors.append(line)
if len(errors) >= amount:
break
await ctx.send("```\n" + "\n".join(errors[::-1]) + "```")
else:
await ctx.send("You don't have permission to do this.")
self.logger.info(f"Update command attempted by {ctx.author.id}")
async def setup(bot):
await bot.add_cog(Admin(bot))

81
cogs/anime.py Normal file
View file

@ -0,0 +1,81 @@
import random
import logging
import aiohttp
from discord.ext import commands
import discord
class Anime(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.url = "https://api.waifu.im/search"
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession()
async def get_waifu(self, tags):
params = {
'included_tags': tags,
'height': '>=1000'
}
async with self.http_session.get(self.url, params=params) as resp:
resp_data = await resp.json()
try:
image = random.choice(resp_data['images'])
return image['url']
except:
if resp_data['detail'] == "No image found matching the criteria given.":
self.logger.info("No image found matching the criteria given.")
return "No image found matching the criteria given."
else:
self.logger.exception("Something went wrong")
return "Something went wrong"
async def get_anime_from_img(self, img_url):
async with self.http_session.get(f"https://api.trace.moe/search?anilistInfo&url={img_url}") as resp:
resp_data = await resp.json()
title = resp_data["result"][0]["anilist"]["title"]
video = resp_data["result"][0]["video"]
return title, video
@commands.command(aliases=["what_anime"])
async def whatanime(self, ctx):
if ctx.message.attachments:
file_url = ctx.message.attachments[0].url
else:
try:
file_url = ctx.message.content.split(" ", maxsplit=2)[1]
except:
await ctx.send("No image linked or attached.")
return
try:
titles, video = await self.get_anime_from_img(file_url)
except:
ctx.send("An error occurred. Try again by saving and uploading the image.")
message = ""
for key in titles:
message += f"{key}: {titles[key]}\n"
message += f"Scene: {video}"
await ctx.send(message)
@commands.command()
async def waifu(self, ctx, nsfw=""):
if nsfw.lower() == "nsfw":
tag = random.choice(["ero", "ass", "hentai", "milf", "oral", "paizuri", "ecchi"])
else:
tag = random.choice(["waifu", "maid", "uniform"])
image_url = await self.get_waifu(tag)
if ctx.channel.type == discord.ChannelType["private"]:
await ctx.send(image_url)
elif not ctx.channel.is_nsfw() and nsfw.lower() == "nsfw":
await ctx.send("Cannot post NSFW images in this channel.")
else:
await ctx.send(image_url)
async def setup(bot):
await bot.add_cog(Anime(bot))

467
cogs/chatgpt.py Normal file
View file

@ -0,0 +1,467 @@
import os
import time
import json
import logging
import random
import asyncio
import aiofiles
import aiohttp
import discord
from discord.ext import commands, tasks
class ChatGPT(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.API_KEY = os.getenv("openai.api_key")
self.working_dir = "tmp/chatgpt/"
self.data_dir = "data/chatgpt/"
self.premium_role = 1200943915579228170
self.folder_setup()
self.remind_me_loop.start()
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession(
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
}
)
def folder_setup(self):
try:
folders = [
self.working_dir,
self.data_dir,
self.data_dir + "config",
self.data_dir + "logs",
self.data_dir + "dalle"
]
for folder in folders:
if not os.path.exists(folder):
os.mkdir(folder)
except Exception as e:
self.logger.exception(f"ChatGPT failed to make directories: {e}")
def create_channel_config(self, filepath):
config_dict = {
"personality":"average",
"channel_topic":"casual",
"chat_enabled":False,
"chat_history_len":5,
"react_to_msgs":False,
}
with open(filepath,"w") as f:
json.dump(config_dict,f)
self.logger.debug("Wrote ChatGPT config variables to file.")
async def get_channel_config(self, channel_id):
filepath = f"{self.data_dir}config/{channel_id}.json"
if not os.path.exists(filepath):
self.create_channel_config(filepath)
with open(filepath, "r") as f:
config_dict = json.loads(f.readline())
return config_dict
def edit_channel_config(self, channel_id, key, value):
config_file = f"{self.data_dir}config/{channel_id}.json"
with open(config_file, 'r') as f:
config_data = json.load(f)
config_data[key] = value
with open(config_file, "w") as f:
json.dump(config_data, f)
def read_db(self,filepath):
with open(filepath,"r") as fileobj:
db_content = json.load(fileobj)
return db_content
def save_to_db(self,filepath,db_content):
with open(filepath,"w") as fileobj:
json.dump(db_content,fileobj,indent=4)
async def remind(self,reminder_dict): #THIS IS THE FUNCTION TO AUTOMATICALLY FULFILL THE RESPONSE WHEN CALLED BY THE REMIND ME LOOP
#this is what the reminder_dict looks like: data[target_time] = {"user_id":user_id,"response":response}
user = self.bot.get_user(reminder_dict["user_id"])
return await user.send(reminder_dict["response"])
async def answer_question(self, topic, model="gpt-3.5-turbo"):
data = {
"model": model,
"messages": [{"role": "user", "content": topic}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, json=data) as resp:
response_data = await resp.json()
response = response_data['choices'][0]['message']['content']
return response
except Exception as error:
self.logger.exception("Error occurred in answer_question")
return "Error occurred in answer_question"
@commands.command(
description="Personality",
help="Set the personality of the bot. Usage: !personality (personality)",
brief="Set the personality"
)
async def personality(self, ctx, personality_type=None):
if personality_type:
self.edit_channel_config(ctx.channel.id, "personality", personality_type)
await ctx.send(f"Personality changed to {personality_type}")
else:
channel_config = await self.get_channel_config(ctx.channel.id)
current_personality = channel_config["personality"]
await ctx.send(f"Current personality is {current_personality}")
@commands.command(
description="Topic",
help="Set the channel topic for the bot. Usage: !topic (topic)",
brief="Set channel topic"
)
async def topic(self, ctx, channel_topic=None):
if channel_topic:
self.edit_channel_config(ctx.channel.id, "channel_topic", channel_topic)
await ctx.send(f"Topic changed to {channel_topic}")
else:
channel_config = await self.get_channel_config(ctx.channel.id)
current_topic = channel_config["channel_topic"]
await ctx.send(f"Current topic is {current_topic}")
@commands.command(
description="Chat",
help="Enable or disable bot chat in this channel. Usage !chat (enable|disable)",
brief="Enable or disable bot chat"
)
async def chat(self, ctx, message):
if "enable" in message:
self.edit_channel_config(ctx.channel.id, "chat_enabled", True)
await ctx.send("Chat Enabled")
elif "disable" in message:
self.edit_channel_config(ctx.channel.id, "chat_enabled", False)
await ctx.send("Chat Disabled")
else:
await ctx.send("Usage: !chat (enable|disable)")
@commands.command(
description="Reactions",
help="Enable or disable bot reactions in this channel. Usage !reactions (enable|disable)",
brief="Enable or disable bot reactions"
)
async def reactions(self, ctx, message):
if "enable" in message:
self.edit_channel_config(ctx.channel.id, "react_to_msgs", True)
await ctx.send("Reactions Enabled")
elif "disable" in message:
self.edit_channel_config(ctx.channel.id, "react_to_msgs", False)
await ctx.send("Reactions Disabled")
else:
await ctx.send("Usage: !reactions (enable|disable)")
@commands.command(
description="Question",
help="Ask a raw chatgpt question. Usage: !question (question)",
brief="Get an answer"
)
async def question(self, ctx):
await ctx.send("One moment, let me think...")
question = ctx.message.content.split(" ", maxsplit=1)[1]
answer = await self.answer_question(question)
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
for chunk in chunks:
await ctx.send(chunk)
@commands.command(
description="Question GPT4",
help="Ask GPT4 a question. Usage: !question_gpt4 (question)",
brief="Get an answer"
)
async def question_gpt4(self, ctx):
if ctx.author.get_role(self.premium_role):
await ctx.send("One moment, let me think...")
question = ctx.message.content.split(" ", maxsplit=1)[1]
answer = await self.answer_question(question, "gpt-4-turbo-preview")
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
for chunk in chunks:
await ctx.send(chunk)
else:
await ctx.send("Sorry you must be a premium member to use this command. (!donate)")
async def dalle_api_call(self, prompt, model="dall-e-2", quality="standard", size="1024x1024"):
data = {
"model": model,
"prompt": prompt,
"size": size,
"quality": quality,
"n":1,
}
url = "https://api.openai.com/v1/images/generations"
try:
async with self.http_session.post(url, json=data) as resp:
response_data = await resp.json()
response = response_data['data'][0]['url']
return response
except Exception as error:
self.logger.exception("Error occurred in dalle")
return "Error occurred in dalle"
async def download_image(self, url, destination):
if url == "Error occurred in dalle":
return
async with self.http_session.get(url) as resp:
if resp.status == 200:
f = await aiofiles.open(destination, mode='wb')
await f.write(await resp.read())
await f.close()
return destination
async def generate_dalle_image(self, ctx, model, quality="standard", size="1024x1024"):
if ctx.author.get_role(self.premium_role):
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
await ctx.send(f"Please be patient this may take some time! Generating: {prompt}.")
image_url = await self.dalle_api_call(prompt, model=model, quality=quality, size=size)
my_filename = str(time.time_ns()) + ".png"
image_filepath = f"{self.data_dir}dalle/{my_filename}"
await self.download_image(image_url, image_filepath)
with open(image_filepath, "rb") as fh:
f = discord.File(fh, filename=image_filepath)
prompt = prompt.replace('\n',' ')
log_data = f'Author: {ctx.author.name}, Prompt: {prompt}, Filename: {my_filename}\n'
with open(f"{self.data_dir}logs/dalle3.log", 'a') as log_filepath:
log_filepath.writelines(log_data)
await ctx.send(f'Generated by: {ctx.author.name}\nPrompt: {prompt}', file=f)
else:
await ctx.send("Sorry you must be a premium member to use this command. (!donate)")
@commands.command(
description="Dalle 2",
help="Generate an image with Dalle 2 Usage: !dalle2 (prompt)",
brief="Generate Image"
)
async def dalle2(self, ctx):
await self.generate_dalle_image(ctx, model="dall-e-2")
@commands.command(
description="Dalle 3",
help="Generate an image with Dalle 3 Usage: !dalle3 (prompt)",
brief="Generate Image",
aliases = ['dalle']
)
async def dalle3(self, ctx):
await self.generate_dalle_image(ctx, model="dall-e-3")
@commands.command(
description="Dalle 3 HD",
help="Generate an HD image with Dalle 3 Usage: !dalle3 (prompt)",
brief="Generate HD Image",
)
async def dalle3hd(self, ctx):
await self.generate_dalle_image(ctx, model="dall-e-3", quality="hd", size="1792x1024")
@commands.command(
description="Image GPT4",
help="Ask GPT4 a question about an image. Usage: !question_gpt4 (link) (question)",
brief="Get an answer"
)
async def looker(self, ctx):
image_link = ctx.message.content.split(" ", maxsplit=2)[1]
question = ctx.message.content.split(" ", maxsplit=2)[2]
data = {
"model": "gpt-4-vision-preview",
"messages": [{"role": "user", "content": [{"type": "text", "text": question},{"type": "image_url","image_url": {"url": image_link}}]}],
"max_tokens": 500
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, json=data) as resp:
response_data = await resp.json()
self.logger.debug(response_data)
answer = response_data['choices'][0]['message']['content']
except Exception as error:
self.logger.exception("error occurred in looker")
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
for chunk in chunks:
await ctx.send(chunk)
@commands.command(
description="Remind Me",
help="Send a request in natural language to ask Sparky to remind you of a task or event by direct message in a specified amount of time. Minimum 1 minute.",
brief="Get a reminder",
aliases=["remindme","remind_me","remind"]
)
async def save_reminder(self,ctx):
#SETUP
reminders_path = self.data_dir + "reminders.txt"
if not os.path.exists(reminders_path):
with open(reminders_path,"w") as file_obj:
file_obj.write("{}")
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
reminder_data = self.read_db(reminders_path)
current_time = int(time.time_ns())
user_id = ctx.author.id
#PARSE PROMPT
duration_s = await self.answer_question(f"You are an automated bot whose only function is to convert a natural language number into an integer. You must determine a number representing after how long, in seconds, the user wishes you to remind them of a task based on the information provided. Please respond using only an integer of the equivalent or approximate time in seconds. You must not use any words in your response other than a single integer representing that time in seconds. You are incapable of using any English words at all. If you are unable to determine a time from the prompt given, return only the integer 0. The prompt is as follows: {prompt}")
response = await self.answer_question(f"You are a reminder bot whose purpose is to help users by reminding them of tasks or events. A user by the name of {ctx.author.name} has asked you to remind them about something after a certain amount of time has passed. That time has now passed. Their original request was as follows: {prompt}")
if duration_s == "0":
await ctx.reply("Sorry! I'm not sure exactly when you need me to remind you based on your wording. Could you phrase the request a bit differently?",mention_author=True)
return
else:
await ctx.reply("Sure thing! I'll remind you when the time comes.", mention_author=False)
#MATHS
duration_ns = int(duration_s) * 1000000000
target_time = current_time + int(duration_ns)
#CREATE FILEDUMP
reminder_data[target_time] = {"user_id":user_id,"response":response}
self.logger.info(f"Reminding user {ctx.author.id} in {duration_s} seconds || Target time (ns): {target_time}")
self.save_to_db(reminders_path,reminder_data)
@tasks.loop(seconds=60) # THIS ONE NEEDS TO POP AND THEN CALL THE REMIND FUNC
async def remind_me_loop(self):
reminders_path = self.data_dir + "reminders.txt"
current_time = int(time.time_ns())
data = self.read_db(reminders_path)
trash = []
#CHECK IF ANY NEED TO BE FULFILLED
for remind_time in data.keys():
if current_time >= int(remind_time):
reminder_dict = data[remind_time]
sent = await self.remind(reminder_dict) #THIS SENDS THE REMINDER DICT TO REMIND FUNC
if sent:
self.logger.info(f"Reminder sent successfully to {reminder_dict['user_id']}")
trash.append(remind_time) #NEED TO POP OR THEY WILL GET REMINDED AD INFINITUM
for key in trash:
if data.pop(key):
self.logger.debug("Fulfilled reminders successfully purged")
self.save_to_db(reminders_path,data)
async def log_chat_and_get_history(self, ctx, logfile, channel_vars):
#todo: ctx is actually a message, make this obv
log_line = ''
log_line += ctx.content
log_line = ctx.author.name + ": " + log_line +"\n"
chat_history = ""
self.logger.debug("Logging: " + log_line, end="")
with open(logfile, "a", encoding="utf-8") as f:
f.write(log_line)
with open(logfile, "r", encoding="utf-8") as f:
for line in (f.readlines() [-int(channel_vars["chat_history_len"]):]):
chat_history += line
return chat_history
async def react_to_msg(self, ctx, react):
#todo ctx is actually a message, make this obv
def is_emoji(string):
if len(string) == 1:
# Range of Unicode codepoints for emojis
if 0x1F300 <= ord(string) <= 0x1F6FF:
return True
return False
if react:
if not random.randint(0,10) and ctx.author.id != 1097302679836971038:
#todo above line is do not react to self, make this work programatically
system_msg = "Send only an emoji as a discord reaction to the following chat message"
message = ctx.content[0]
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "system", "content": system_msg}, {"role": "user", "content": message}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, json=data) as resp:
response_data = await resp.json()
reaction = response_data['choices'][0]['message']['content'].strip()
if is_emoji(reaction):
await ctx.add_reaction(reaction)
else:
await ctx.add_reaction("😓")
except Exception as error:
self.logger.exception("Some error happened while trying to react to a message")
async def chat_response(self, ctx, channel_vars, chat_history_string):
async with ctx.channel.typing():
await asyncio.sleep(1)
prompt = f"You are a {channel_vars['personality']} chat bot named Sparkytron 3000 created by @phixxy.com. Your personality should be {channel_vars['personality']}. You are currently in a {channel_vars['channel_topic']} chatroom. The message history is: {chat_history_string}"
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": prompt}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, json=data) as resp:
response_data = await resp.json()
response = response_data['choices'][0]['message']['content']
if "Sparkytron 3000:" in response[0:17]:
response = response.replace("Sparkytron 3000:", "")
max_len = 1999
if len(response) > max_len:
messages=[response[y-max_len:y] for y in range(max_len, len(response)+max_len,max_len)]
else:
messages=[response]
for message in messages:
await ctx.channel.send(message)
except Exception as error:
self.logger.exception("Problem with chat_response in chatgpt")
@commands.Cog.listener()
async def on_reaction_add(self, reaction, user):
if not random.randint(0,9):
message = reaction.message
emoji = reaction.emoji
await message.add_reaction(emoji)
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
# Log Chat
# Todo, make a logging cog to handle this stuff later
logfile = f"{self.data_dir}logs/{message.channel.id}.log"
channel_vars = await self.get_channel_config(message.channel.id)
chat_history_string = await self.log_chat_and_get_history(message, logfile, channel_vars)
# Emoji Reaction
await self.react_to_msg(message, channel_vars["react_to_msgs"])
# Chat Response
if channel_vars["chat_enabled"] and not message.author.bot:
if message.content and message.content[0] != "!":
await self.chat_response(message, channel_vars, chat_history_string)
elif not message.content:
await self.chat_response(message, channel_vars, chat_history_string)
async def setup(bot):
await bot.add_cog(ChatGPT(bot))

214
cogs/currency.py Normal file
View file

@ -0,0 +1,214 @@
#plugin for sparkytron3000
import time
import random
import json
import math
from discord.ext import commands
@commands.command(
description="Currency",
help="Server currency. You can run !currency claim to get started!", #This needs an overhaul
brief="Server currency tools"
)
async def currency(ctx, arg1=None, arg2=None, arg3=None, arg4=None): # just use *args
def read_db(filepath):
with open(filepath,"r") as fileobj:
db_content = json.load(fileobj)
return db_content
def save_to_db(filepath,db_content):
with open(filepath,"w") as fileobj:
json.dump(db_content,fileobj,indent=4)
def add_currency(filepath,amount):
player_db = read_db(filepath)
player_db["currency"] += amount
save_to_db(filepath,player_db)
return player_db
def calc_level_from_xp(xp):
level = min(100,math.floor(0.262615*xp**0.3220627))
return level
def add_xp(filepath,player_db,time_spent):
activity = player_db["status"]["current_activity"]
starting_xp = player_db["skills"][activity]["xp"]
starting_level = player_db["skills"][activity]["level"]
if activity == "mining":
equipment_level = player_db["equipment"]["pickaxe"]["level"]
xp_gained = (time_spent) * equipment_level # SKILL LEVEL of XP / SEC
player_db["skills"][activity]["xp"] += xp_gained
new_xp = starting_xp + xp_gained
new_level = calc_level_from_xp(new_xp) #calculate with curve here
player_db["skills"][activity]["level"] = new_level
levels_gained = new_level - starting_level
summary = "" #summary should include xp gained, levels gained (only if any were gained), current level (or new level)
summary += "You gained {} {} xp!".format(xp_gained, activity)
if levels_gained > 0: #if levels gained > 0, then include levels gained in summary
summary += "\nYou gained {} {} level(s)! You are now level {}.".format(levels_gained, activity, new_level)
save_to_db(filepath, player_db)
return player_db,summary
def add_resources(filepath,player_db,time_spent):
mining_resources = {
"sapphire": {
"value": 100,
"amount": 0
},
"emerald": {
"value": 250,
"amount": 0
},
"ruby": {
"value": 1000,
"amount": 0
},
"diamond": {
"value": 3000,
"amount": 0
}
}
if player_db["status"]["current_activity"] == "mining":
pick_power = player_db["equipment"]["pickaxe"]["power"]
pick_level = player_db["equipment"]["pickaxe"]["level"]
mining_level = player_db["skills"]["mining"]["level"]
numerator = pick_power + pick_level + mining_level
denominator = 1000
items_gained = []
time_summary = time.strftime("%H:%M:%S", time.gmtime(time_spent))
for second in range(0,time_spent):
roll = random.randint(0,denominator)
if roll <= numerator: #get a resource
roll2 = random.randint(0,100)
if roll2 <= 50:
mining_resources["sapphire"]["amount"] += 1
elif roll2 <=80:
mining_resources["emerald"]["amount"] += 1
elif roll2 <= 95:
mining_resources["ruby"]["amount"] += 1
else:
mining_resources["diamond"]["amount"] += 1
for item in mining_resources:
mined_amount = mining_resources[item]["amount"]
if item in player_db["items"]:
player_db["items"][item]["amount"] += mined_amount
items_gained.append(item.title())
items_gained.append(mined_amount)
else:
player_db["items"][item] = mining_resources[item]
items_gained.append(item.title())
items_gained.append(mined_amount)
save_to_db(filepath, player_db)
summary = "You spent {} mining. You mined {} x{}, {} x{}, {} x{}, and {} x{}.".format(time_summary, *items_gained)
return player_db,summary
async def transfer_currency(filepath, player_db, player_id, amount):
try:
amount = int(amount)
player2_filepath = "data/currency/players/" + str(player_id) + ".json"
player2_db = read_db(player2_filepath)
if player_db["currency"] >= amount:
add_currency(filepath, -amount)
add_currency(player2_filepath,amount)
await ctx.send("Sent " + str(amount) + " sparks to " + str(player_id))
except FileNotFoundError:
await ctx.send("They don't seem to be playing the game.")
async def show_levels(player_db):
output = ''
for skill in player_db["skills"]:
output += skill + ': ' + str(player_db["skills"][skill]["level"]) + '\n'
await ctx.send(output)
async def show_currency(player_db):
output = 'Sparks: ' + str(player_db["currency"])
await ctx.send(output)
async def show_items(player_db):
output = ''
for item in player_db["items"]:
output += item + ': ' + str(player_db["items"][item]["amount"]) + '\n'
await ctx.send(output)
async def stop_activity(filepath,player_db):
if player_db["status"]["current_activity"] == "idle":
await ctx.send("You are currently idle. There is no activity to stop.")
else:
time_spent = int(time.time() - player_db["status"]["start_time"]) #integer in seconds
player_db, xp_summary = add_xp(filepath,player_db,time_spent)
player_db, resources_summary = add_resources(filepath,player_db,time_spent)
await ctx.send(xp_summary)
await ctx.send(resources_summary)
player_db["status"]["current_activity"] = "idle"
save_to_db(filepath,player_db)
async def claim(filepath, player_db):
if time.time() - player_db["status"]["last_claimed"] >= 86400:
player_db = add_currency(filepath, 100)
player_db["status"]["last_claimed"] = time.time()
save_to_db(filepath,player_db)
await ctx.send("You claimed 100 sparks!")
else:
await ctx.send("Sorry, you already claimed your sparks today.")
async def mine(filepath, player_db):
if player_db["status"]["current_activity"] == "idle":
player_db["status"]["current_activity"] = "mining"
player_db["status"]["start_time"] = time.time()
save_to_db(filepath, player_db)
await ctx.send("You start mining.")
elif player_db["status"]["current_activity"] == "mining":
await ctx.send("You are already mining!")
else:
await ctx.send("You must stop " + player_db["status"]["current_activity"] + " before you start mining!")
async def gamble(filepath, player_db):
pass
working_dir = "data/currency/"
players_dir = "players/"
sender_id = str(ctx.author.id)
default_db = read_db("{0}{1}default.json".format(working_dir, players_dir))
filepath = '{0}{1}{2}.json'.format(working_dir, players_dir, sender_id)
try:
player_db = read_db(filepath)
except FileNotFoundError:
save_to_db(filepath,default_db)
player_db = read_db(filepath)
if arg1 == "claim":
await claim(filepath, player_db)
player_db = read_db(filepath)
elif arg1 == "stop":
await stop_activity(filepath, player_db)
player_db = read_db(filepath)
elif arg1 == "mine":
await mine(filepath, player_db)
player_db = read_db(filepath)
elif arg1 == "levels":
await show_levels(player_db)
elif arg1 == "items":
await show_items(player_db)
elif (arg1 == "send" or arg1 == "give") and arg2 and arg3:
await transfer_currency(filepath, player_db, arg2, arg3)
player_db = read_db(filepath)
else:
await show_currency(player_db)
async def setup(bot):
bot.add_command(currency)

51
cogs/donate.py Normal file
View file

@ -0,0 +1,51 @@
import logging
import os
import time
import matplotlib.pyplot as plt
import discord
from discord.ext import commands
class Donate(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.donation_link = "https://patreon.com/soullesssurvival"
self.data_dir = "data/donate/"
self.donor_file = "data/donate/supporters.txt"
self.folder_setup()
self.logger = logging.getLogger("bot")
def folder_setup(self):
try:
if not os.path.exists(self.data_dir):
os.mkdir(self.data_dir)
except:
self.logger.exception("Donate failed to make directories")
@commands.command(
description="Donate",
help="Get information on how to donate to support this bot.",
brief="Donation info",
)
async def donate(self, ctx):
await ctx.send(f"If you would like to support the future of this bot please consider donating here: {self.donation_link}")
@commands.command(
description="Supporters",
help="Get information on who supports this bot.",
brief="Supporter info",
aliases = ["supporter"]
)
async def supporters(self, ctx):
with open(self.donor_file, 'r') as f:
supporters = f.readlines()
message = "Thank you to the following supporters:\n"
for line in supporters:
message += line
await ctx.send(message)
async def setup(bot):
await bot.add_cog(Donate(bot))

118
cogs/highscores.py Normal file
View file

@ -0,0 +1,118 @@
#cog to show message count as a graph
import logging
import os
import time
import matplotlib.pyplot as plt
import discord
from discord.ext import commands
class Highscores(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.logger = logging.getLogger("bot")
@commands.command(
description="Highscores",
help="Shows a bar graph of users in this channel and how many messages they have sent.",
brief="Display chat highscores",
aliases=["highscore"]
)
async def highscores(self, ctx, limit=15):
filename = str(ctx.channel.id) + ".log"
with open("data/chatgpt/logs/" + filename, 'r', encoding="utf-8") as logfile:
data = logfile.readlines()
logfile.close()
def is_username(user):
for character in user:
if character.isupper():
return False
if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'):
return False
return True
user_message_counts = {}
for line in data:
try:
user = line[0:line.find(':')]
if is_username(user):
if user not in user_message_counts and user != "" and len(user) <= 32:
user_message_counts[user] = 1
else:
if user != "" and len(user) <= 32:
user_message_counts[user] += 1
except Exception as error:
self.logger.exception("Error occurred in highscores")
sorted_message_counts = sorted(user_message_counts.items(), key=lambda x:x[1])
sorted_dict = dict(sorted_message_counts[-limit::])
keys = list(sorted_dict.keys())
values = list(sorted_dict.values())
fig, ax = plt.subplots()
bar_container = ax.barh(keys, values)
ax.set_xlabel("Message Count")
ax.set_ylabel("Username")
ax.set_title("Messages Sent in " + ctx.channel.name)
ax.bar_label(bar_container, label_type='center')
filepath = 'tmp/' + str(time.time_ns()) + '.png'
plt.savefig(filepath, dpi=1000, bbox_inches="tight")
with open(filepath, "rb") as fh:
f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png')
await ctx.send(file=f)
@commands.command(
description="Highscores Server",
help="Shows a bar graph of users across all servers I am in and how many messages they have sent.",
brief="Display chat highscores",
aliases=["highscore_server"]
)
async def highscores_server(self, ctx, limit=15):
def is_username(user):
for character in user:
if character.isupper():
return False
if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'):
return False
return True
user_message_counts = {}
data = []
for filename in os.listdir("data/chatgpt/logs/"):
with open("data/chatgpt/logs/" + filename, 'r', encoding="utf-8") as logfile:
data += logfile.readlines()
logfile.close()
user_message_counts = {}
for line in data:
try:
user = line[0:line.find(':')]
if is_username(user):
if user not in user_message_counts and user != "" and len(user) <= 32:
user_message_counts[user] = 1
else:
if user != "" and len(user) <= 32:
user_message_counts[user] += 1
except Exception as error:
self.logger.exception("Error occurred in highscores_server")
sorted_message_counts = sorted(user_message_counts.items(), key=lambda x:x[1])
sorted_dict = dict(sorted_message_counts[-limit::])
keys = list(sorted_dict.keys())
values = list(sorted_dict.values())
fig, ax = plt.subplots()
bar_container = ax.barh(keys, values)
ax.set_xlabel("Message Count")
ax.set_ylabel("Username")
ax.set_title("Messages Sent in all channels I can see")
ax.bar_label(bar_container, label_type='center')
filepath = 'tmp/' + str(time.time_ns()) + '.png'
plt.savefig(filepath, dpi=1000, bbox_inches="tight")
with open(filepath, "rb") as fh:
f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png')
await ctx.send(file=f)
async def setup(bot):
await bot.add_cog(Highscores(bot))

149
cogs/inky_phat.py Normal file
View file

@ -0,0 +1,149 @@
import socket # used to get local IP
import time
import logging
import os
import datetime
import psutil
from PIL import Image, ImageFont, ImageDraw
from discord.ext import commands, tasks
import inky
def is_enabled():
if os.getenv("inky").lower() == "enabled":
return True
else:
return False
class InkyScreen(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.enabled = is_enabled()
self.old_message = None
self.display = self.setup()
self.start_time = time.time()
self.admin_ids = [242018983241318410]
self.font_size = 18
self.logger = logging.getLogger("bot")
self.message_loop.start()
def setup(self):
if self.enabled:
display = inky.auto()
display.set_border(inky.YELLOW)
return display
else:
return None
async def write_to_display(self, text: list):
if text is not self.old_message:
#try:
# image = Image.open("data/inky/bg.png")
#except:
# self.logger.exception("InkyScreen: Failed to load background image.")
image = Image.new("P", (self.display.WIDTH, self.display.HEIGHT), (self.display.BLACK))
draw = ImageDraw.Draw(image)
width = self.display.WIDTH
height = self.display.HEIGHT
lines = len(text)
try:
height_diff = height/lines
except:
self.logger.exception("InkyScreen: Failed to calculate height_diff.")
self.logger.info(f"InkyScreen: Text: {text}")
return
x = 0
y = 0
for line in text:
if y <= width:
draw.text((x, y), line, self.display.YELLOW, font=ImageFont.load_default(size=self.font_size))
y += height_diff
else:
self.logger.warning("InkyScreen: Text too long to fit on image.")
image = image.rotate(180)
self.display.set_image(image)
self.display.show()
self.logger.info("InkyScreen: Text successfully written to image.")
self.old_message = text
else:
self.logger.info("InkyScreen: Text is the same as the previous message, not writing to image.")
def get_ip_address(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
def get_bot_uptime(self):
sparky_uptime = time.time() - self.start_time
return str(datetime.timedelta(seconds=sparky_uptime))[0:-7]
def get_system_uptime(self):
system_uptime = time.time() - psutil.boot_time()
return str(datetime.timedelta(seconds=system_uptime))[0:-7]
def get_memory_usage(self):
memory_info = psutil.virtual_memory()
used_memory = memory_info.used
if used_memory >= 1000000000:
used_memory = round(used_memory/1000000000,1)
used_memory = f"{used_memory}GB"
else:
used_memory = round(used_memory/1000000)
used_memory = f"{used_memory}MB"
total_memory = round(memory_info.total/1000000000,1)
return f"Memory: {used_memory}/{total_memory}GB"
def get_disk_usage(self):
disk_info = psutil.disk_usage('/')
used_disk = disk_info.used
if used_disk >= 1000000000:
used_disk = round(used_disk/1000000000,1)
used_disk = f"{used_disk}GB"
else:
used_disk = round(used_disk/1000000)
used_disk = f"{used_disk}MB"
total_disk = round(disk_info.total/1000000000,1)
return f"Disk: {used_disk}/{total_disk}GB"
def get_cpu_usage(self):
cpu_percent = psutil.cpu_percent()
return f"CPU: {cpu_percent}%"
async def generate_message(self):
message_list = []
try:
message_list.append(f"IP: {self.get_ip_address()}")
message_list.append(f"Sys Uptime: {self.get_system_uptime()}")
message_list.append(f"Bot Uptime: {self.get_bot_uptime()}")
#message_list.append(f"Last Screen Update: {time.strftime('%H:%M:%S')}")
#message_list.append(f"Servers: {len(self.bot.guilds)}")
message_list.append(self.get_cpu_usage())
message_list.append(self.get_memory_usage())
message_list.append(self.get_disk_usage())
except Exception as e:
self.logger.error(f"Error generating InkyScreen message: {e}")
return message_list
@commands.command()
async def inkyscreen_update(self, ctx):
if ctx.author.id in self.admin_ids:
message = await self.generate_message()
await self.write_to_display(message)
await ctx.send("InkyScreen updated.")
else:
await ctx.send("You do not have permission to use this command.")
@tasks.loop(minutes=10)
async def message_loop(self):
if self.enabled:
message = await self.generate_message()
await self.write_to_display(message)
async def setup(bot):
await bot.add_cog(InkyScreen(bot))

123
cogs/meme.py Normal file
View file

@ -0,0 +1,123 @@
#plugin for sparkytron3000
import os
import random
import aiohttp
from discord.ext import commands
import logging
class Meme(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.working_dir = "tmp/meme/"
self.folder_setup()
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession()
def folder_setup(self):
try:
if not os.path.exists(self.working_dir):
os.mkdir(self.working_dir)
except:
self.logger.exception("Meme failed to make directories")
async def answer_question(self, topic, model="gpt-3.5-turbo"):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
}
data = {
"model": model,
"messages": [{"role": "user", "content": topic}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, headers=headers, json=data) as resp:
response_data = await resp.json()
response = response_data['choices'][0]['message']['content']
return response
except:
return "error occurred in meme"
@commands.command(
description="Meme",
help="Generates a meme based on input. Usage: !meme (topic)",
brief="Generate a meme"
)
async def meme(self, ctx):
async def generate_random_meme(topic):
async with self.http_session.get('https://api.imgflip.com/get_memes') as resp:
response_data = await resp.json()
response = response_data['data']['memes']
memepics = [{'name':image['name'],'url':image['url'],'id':image['id']} for image in response]
#Pick a meme format
memenumber = random.randint(1,99)
meme_name = response[memenumber-1]['name']
panel_count = response[memenumber-1]['box_count']
panel_text = await self.answer_question("Create text for a meme. The meme is " + meme_name + ". It has " + str(panel_count) + " panels. Only create one meme. Do not use emojis or hashtags! Use the topic: " + topic + ". Use the output format (DO NOT USE EXTRA NEWLINES AND DO NOT DESCRIBE THE PICTURE IN YOUR OUTPUT): \n1: [panel 1 text]\n2: [panel 2 text]")
id = memenumber
imgflip_username = os.getenv('imgflip_username')
imgflip_password = os.getenv('imgflip_password')
params = {
'username':imgflip_username,
'password':imgflip_password,
'template_id':memepics[id-1]['id']
}
boxes = []
text = panel_text.split('\n')
for x in range(len(text)):
if text[x].strip() != "":
item = text[x][3:]
if len(params)-3 < panel_count:
dictionary = {"text":item, "color": "#ffffff", "outline_color": "#000000"}
boxes.append(dictionary)
for i, box in enumerate(boxes):
params[f"boxes[{i}][text]"] = box["text"]
params[f"boxes[{i}][color]"] = box["color"]
params[f"boxes[{i}][outline_color]"] = box["outline_color"]
URL = 'https://api.imgflip.com/caption_image'
try:
async with self.http_session.post(URL, params=params) as resp:
response = await resp.json()
image_link = response['data']['url']
except:
self.logger.exception("Error occurred in meme")
try:
#------------------------------------Saving Image Using Aiohttp---------------------------------#
filename = memepics[id-1]['name']
async with self.http_session.get(image_link) as response:
folder = "tmp/meme/"
filename = folder + topic + str(len(os.listdir(folder))) + ".jpg"
with open(filename, "wb") as file:
while True:
chunk = await response.content.read(1024) # Read the response in chunks
if not chunk:
break
file.write(chunk)
except:
self.logger.exception("Something's Wrong with the aiohttp in meme So try again")
return image_link, filename
try:
topic = ctx.message.content.split(" ", maxsplit=1)[1]
await ctx.send(f'Generating {topic} meme')
link, filepath = await generate_random_meme(topic)
await ctx.send(link)
except:
self.logger.exception("Error occurred in meme")
await ctx.send('Something went wrong try again. Usage: !meme (topic)')
async def setup(bot):
await bot.add_cog(Meme(bot))

382
cogs/phixxycom.py Normal file
View file

@ -0,0 +1,382 @@
import os
import io
import base64
import logging
import time
import html
import aiohttp
import asyncssh
from PIL import Image, PngImagePlugin
from discord.ext import commands, tasks
class PhixxyCom(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.SERVER = os.getenv('ftp_server')
self.USERNAME = os.getenv('ftp_username')
self.PASSWORD = os.getenv('ftp_password')
self.working_dir = "tmp/phixxy.com/"
self.data_dir = "data/phixxy.com/"
self.folder_setup()
self.stable_diffusion_log = "data/stable_diffusion/stable_diffusion.log"
self.logger = logging.getLogger("bot")
self.phixxy_loop.start()
self.blog_loop.start()
self.http_session = self.create_aiohttp_session()
def create_aiohttp_session(self):
return aiohttp.ClientSession()
def folder_setup(self):
try:
if not os.path.exists(self.working_dir):
os.mkdir(self.working_dir)
if not os.path.exists(self.data_dir):
os.mkdir(self.data_dir)
except:
self.logger.exception("PhixxyCom failed to make directories")
def find_prompt_from_filename(self, sd_log, filename):
with open(sd_log, 'r') as f:
lines = f.readlines()
for line in reversed(lines):
if filename in line:
try:
prompt = line[line.index("Prompt: ") + 7:line.index("Filename: ")]
prompt = ''.join(prompt.rsplit(',', 1)) # Remove the last comma
return html.escape(prompt)
except:
self.logger.exception("PhixxyCom failed to find prompt from filename")
return "Unknown Prompt"
return "Unknown Prompt"
async def upload_sftp(self, local_filename, server_folder, server_filename):
remotepath = server_folder + server_filename
async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
async with conn.start_sftp_client() as sftp:
await sftp.put(local_filename, remotepath=remotepath)
async def delete_local_pngs(self, local_folder):
for filename in os.listdir(local_folder):
if ".png" in filename:
os.remove(local_folder + filename)
async def delete_ftp_pngs(self,server_folder):
async with asyncssh.connect(os.getenv('ftp_server'), username=os.getenv('ftp_username'), password=os.getenv('ftp_password')) as conn:
async with conn.start_sftp_client() as sftp:
for filename in (await sftp.listdir(server_folder)):
if '.png' in filename:
try:
self.logger.debug("Deleting", filename)
await sftp.remove(server_folder+filename)
except:
self.logger.exception("Couldn't delete", filename)
async def extract_image_tags(self,code):
count = code.count("<img")
tags = []
for x in range(0,count):
index1 = code.find("<img")
index2 = code[index1:].find(">") + index1 + 1
img_tag = code[index1:index2]
tags.append(img_tag)
code = code[index2:]
return tags
async def extract_image_alt_text(self,tags):
alt_texts = []
for tag in tags:
index1 = tag.find("alt") + 5
index2 = tag[index1:].find("\"") + index1
alt_text = tag[index1:index2]
alt_texts.append(alt_text)
return alt_texts
async def generate_images(self, local_folder, image_list):
url = os.getenv('stablediffusion_url')
if url == "disabled":
return
file_list = []
for image in image_list:
filename = image.replace(" ", "").lower() + ".png"
payload = {"prompt": image, "steps": 25}
response = await self.http_session.post(url=f'{url}/sdapi/v1/txt2img', json=payload)
r = await response.json()
for i in r['images']:
image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))
png_payload = {"image": "data:image/png;base64," + i}
response2 = await self.bot.http_session.post(url=f'{url}/sdapi/v1/png-info', json=png_payload)
pnginfo = PngImagePlugin.PngInfo()
json_response = await response2.json()
pnginfo.add_text("parameters", json_response.get("info"))
image.save(local_folder + filename, pnginfo=pnginfo)
file_list.append(filename)
return file_list
async def add_image_filenames(self, code, file_list):
for filename in file_list:
code = code.replace("src=\"\"", "src=\""+ filename + "\"", 1)
return code
async def upload_html_and_imgs(self, local_folder):
for filename in os.listdir(local_folder):
if ".png" in filename:
await self.upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
#explicitly upload html files last!
for filename in os.listdir(local_folder):
if ".html" in filename:
await self.upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
async def delete_derp_files(self, server_folder):
async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
async with conn.start_sftp_client() as sftp:
for filename in (await sftp.listdir(server_folder)):
if filename == '.' or filename == '..' or filename == 'style.css' or filename == 'myScript.js':
pass
else:
try:
self.logger.debug("Deleting", filename)
await sftp.remove(server_folder+filename)
except:
self.logger.exception("Couldn't delete", filename)
async def meme_handler(self, folder):
for f in os.listdir(folder):
filepath = folder + f
await self.update_meme_webpage(filepath)
async def update_meme_webpage(self, filename):
server_folder = (os.getenv('ftp_public_html') + 'ai-memes/')
new_file_name = str(time.time_ns()) + ".png"
await self.upload_sftp(filename, server_folder, new_file_name)
self.logger.debug(f"Uploaded {new_file_name}")
with open(f"{self.data_dir}ai-memes/index.html", 'r') as f:
html_data = f.read()
html_insert = '<!--ADD IMG HERE-->\n <img src="' + new_file_name + '" loading="lazy">'
html_data = html_data.replace('<!--ADD IMG HERE-->',html_insert)
with open(f"{self.data_dir}ai-memes/index.html", "w") as f:
f.writelines(html_data)
await self.upload_sftp(f"{self.data_dir}ai-memes/index.html", server_folder, "index.html")
os.rename(filename, 'tmp/' + new_file_name)
async def upload_ftp_ai_images(self, ai_dict):
try:
for folder in ai_dict:
for filename in os.listdir(folder):
if filename[-4:] == '.png':
filepath = folder + filename
self.logger.info(f"Found file = {filename}")
prompt = self.find_prompt_from_filename(ai_dict[folder], filename)
self.logger.info(f"Found prompt = {prompt}")
html_file = f"{self.data_dir}ai-images/index.html"
html_insert = '''<!--REPLACE THIS COMMENT-->
<div>
<img src="<!--filename-->" loading="lazy">
<p class="image-description"><!--description--></p>
</div>'''
server_folder = (os.getenv('ftp_public_html') + 'ai-images/')
new_filename = str(time.time_ns()) + ".png"
await self.upload_sftp(filepath, server_folder, new_filename)
self.logger.info(f"Uploaded {new_filename}")
with open(html_file, 'r') as f:
html_data = f.read()
html_insert = html_insert.replace("<!--filename-->", new_filename)
html_insert = html_insert.replace("<!--description-->", prompt)
html_data = html_data.replace("<!--REPLACE THIS COMMENT-->", html_insert)
with open(html_file, "w") as f:
f.writelines(html_data)
await self.upload_sftp(html_file, server_folder, "index.html")
os.rename(filepath, f"tmp/{new_filename}")
except:
self.logger.exception("Something went wrong in upload_ftp_ai_images")
async def answer_question(self, topic, model="gpt-3.5-turbo"):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
}
data = {
"model": model,
"messages": [{"role": "user", "content": topic}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.bot.http_session.post(url, headers=headers, json=data) as resp:
response_data = await resp.json()
response = response_data['choices'][0]['message']['content']
return response
except Exception as error:
return None
@commands.command(
description="Blog",
help="Adds your topic to the list of possible future blog topics. Usage: !suggest_blog (topic)",
brief="Suggest a blog topic"
)
async def blog(self, ctx, *args):
message = ' '.join(args)
if '\n' in message:
await ctx.send("Send only one topic at a time.")
return
else:
blogpost_file = f"{self.data_dir}blog_topics.txt"
with open(blogpost_file, 'a') as f:
f.writelines(message+'\n')
await ctx.send("Saved suggestion!")
def get_last_5_messages(self):
with open(f"data/chatgpt/logs/346102473993355267.log", 'r') as f:
lines = f.readlines()
last_5_messages = ""
for i in range(5,1,-1):
last_5_messages += lines[-i]
return last_5_messages
async def generate_blog(self):
start_time = time.time()
topic = ''
filename = f"{self.data_dir}ai-blog/index.html"
with open(filename, 'r', encoding="utf-8") as f:
html_data = f.read()
current_time = time.time()
current_struct_time = time.localtime(current_time)
date = time.strftime("%B %d, %Y", current_struct_time)
if date in html_data:
return
blogpost_file = f"{self.data_dir}blog_topics.txt"
if os.path.isfile(blogpost_file):
with open(blogpost_file, 'r') as f:
blogpost_topics = f.read()
f.seek(0)
topic = f.readline()
blogpost_topics = blogpost_topics.replace(topic, '')
with open(blogpost_file, 'w') as f:
f.write(blogpost_topics)
if topic != '':
self.logger.info("Writing blogpost")
else:
messages = self.get_last_5_messages()
question = f"you have a blog and you are inspired based on this short text chat interaction:\n{messages}\nwhat will the topic of your next blog be? just tell me the topic and a one sentence description"
self.logger.info("No topic given for blogpost, generating one.")
topic = await self.answer_question(question)
post_div = '''<!--replace this with a post-->
<div class="post">
<h2 class="post-title"><!--POST_TITLE--></h2>
<p class="post-date"><!--POST_DATE--></p>
<div class="post-content">
<!--POST_CONTENT-->
</div>
</div>'''
title_prompt = 'generate an absurd essay title about ' + topic
title = await self.answer_question(title_prompt, model="gpt-3.5-turbo")
prompt = 'Write a satirical essay with a serious tone titled: "' + title + '". Do not label parts of the essay.'
content = await self.answer_question(prompt, model="gpt-4-turbo-preview")
if title in content[:len(title)]:
content = content.replace(title, '', 1)
content = f"<p>{content}</p>"
content = content.replace('\n\n', "</p><p>")
content = content.replace("<p></p>", '')
post_div = post_div.replace("<!--POST_TITLE-->", title)
post_div = post_div.replace("<!--POST_DATE-->", date)
post_div = post_div.replace("<!--POST_CONTENT-->", content)
html_data = html_data.replace("<!--replace this with a post-->", post_div)
with open(filename, 'w', encoding="utf-8") as f:
f.write(html_data)
await self.upload_sftp(filename, (os.getenv('ftp_public_html') + 'ai-blog/'), "index.html")
run_time = time.time() - start_time
self.logger.debug("It took " + str(run_time) + " seconds to generate the blog post!")
output = "Blog Updated! (" + str(run_time) + " seconds) https://ai.phixxy.com/ai-blog"
return output
@commands.command()
async def force_blog(self, ctx):
await ctx.send("Forcing blog generation")
await self.generate_blog()
@commands.command(
description="Website",
help="Generates a website using gpt 3.5. Usage: !website (topic)",
brief="Generate a website"
)
async def website(self, ctx):
server_folder = os.getenv('ftp_public_html') + 'ai-webpage/'
local_folder = f"{self.working_dir}/webpage/"
working_file = local_folder + "index.html"
if not os.path.exists(local_folder):
os.mkdir(local_folder)
try:
await ctx.send("Please wait, this will take a long time! You will be able to view the website here: https://ai.phixxy.com/ai-webpage/")
with open(working_file, "w") as f:
f.write("<!DOCTYPE html><html><head><script>setTimeout(function(){location.reload();}, 10000);</script><title>Generating Website</title><style>body {font-size: 24px;text-align: center;margin-top: 100px;}</style></head><body><p>This webpage is currently being generated. The page will refresh once it is complete. Please be patient.</p></body></html>")
await self.upload_sftp(working_file, server_folder, "index.html")
topic = ctx.message.content.split(" ", maxsplit=1)[1]
prompt = "Generate a webpage using html and inline css. The webpage topic should be " + topic + ". Feel free to add image tags with alt text. Leave the image source blank. The images will be added later."
code = await self.answer_question(prompt)
await self.delete_local_pngs(local_folder)
await self.delete_ftp_pngs(server_folder)
tags = await self.extract_image_tags(code)
alt_texts = await self.extract_image_alt_text(tags)
file_list = await self.generate_images(local_folder, alt_texts)
code = await self.add_image_filenames(code, file_list)
with open(working_file, 'w') as f:
f.write(code)
f.close()
await self.upload_html_and_imgs(local_folder)
await ctx.send("Finished https://ai.phixxy.com/ai-webpage/")
except Exception as error:
#await ctx.send("Failed, Try again.")
self.logger.exception("Website Error")
@tasks.loop(seconds=60)
async def phixxy_loop(self):
ai_images_dict = {
# Folder Path : Log Path
"tmp/stable_diffusion/sfw/":self.stable_diffusion_log,
"data/chatgpt/dalle/":"data/chatgpt/logs/dalle3.log",
"data/chatgpt/dalle2/":"data/chatgpt/logs/dalle2.log"
}
await self.upload_ftp_ai_images(ai_images_dict)
await self.meme_handler('tmp/meme/')
@tasks.loop(hours=1)
async def blog_loop(self):
try:
message = await self.generate_blog()
bot_stuff_channel = self.bot.get_channel(544408659174883328)
if message:
await bot_stuff_channel.send(message)
except Exception as error:
self.logger.exception("Failed to generate blog")
@commands.command(
description="Moderate",
help="This currently tool works by replacing the filename on the ftp server with a black image. The description will remain the same and may need to be altered.",
brief="Moderation Tools"
)
async def moderate(self, ctx, filename):
await self.upload_sftp(f"{self.data_dir}blank_image.png", (os.getenv('ftp_public_html') + 'ai-images/'), filename)
output = "Image " + filename + " replaced"
await ctx.send(output)
async def setup(bot):
if os.getenv("upload_phixxy") == "true":
asyncssh.set_log_level(30)
asyncssh.set_sftp_log_level(30)
await bot.add_cog(PhixxyCom(bot))
else:
pass

78
cogs/pokedex.py Normal file
View file

@ -0,0 +1,78 @@
import logging
import aiohttp
from discord.ext import commands
import discord
class Pokedex(commands.Cog):
def __init__(self, bot) -> None:
self.bot = bot
self.http_session = self.create_aiohttp_session()
def create_aiohttp_session(self):
return aiohttp.ClientSession()
async def get_json(self, url):
async with self.http_session.get(url) as resp:
json_data = await resp.json()
return json_data
@commands.command(
description="Pokedex",
help="Get information on pokemon",
brief="Pokedex",
aliases=['pdex'],
hidden=False
)
async def pokedex(self, ctx):
pokemon = ctx.message.content.split(" ", maxsplit=1)[1]
try:
shiny = False
if 'shiny ' in pokemon:
shiny = True
pokemon = pokemon.replace('shiny ', '')
url = "https://pokeapi.co/api/v2/pokemon/" + pokemon
dex_url = "https://pokeapi.co/api/v2/pokemon-species/" + pokemon
data = await self.get_json(url)
name = data['name']
height_str = str(int(data['height'])/10) + 'm'
weight_str = str(int(data['weight'])/10) + 'kg'
type1 = data['types'][0]['type']['name']
try:
type2 = data['types'][1]['type']['name']
type_str = type1.capitalize() + ', ' + type2.capitalize()
except:
type2 = "None"
type_str = type1.capitalize()
sprite = data["sprites"]["front_default"]
if shiny:
sprite = data["sprites"]["front_shiny"]
dex_data = await self.get_json(dex_url)
generation = dex_data['generation']['name'].upper().replace("GENERATION","Generation")
for entry in dex_data['flavor_text_entries']:
if entry['language']['name'] == 'en':
dex_desc = entry['flavor_text'].replace("\u000c", '\n')
dex_desc_game = entry['version']['name'].capitalize()
break
for entry in dex_data['genera']:
if entry['language']['name'] == 'en':
genus = entry['genus']
break
footer = generation + ' | Pokédex entry from Pokémon ' + dex_desc_game
dex_num = dex_data['pokedex_numbers'][0]['entry_number']
embed=discord.Embed(title=name.capitalize())
embed.set_image(url=sprite)
embed.add_field(name="Number", value=dex_num, inline=False)
embed.add_field(name=genus, value=dex_desc, inline=False)
embed.add_field(name="Weight", value=weight_str , inline=True)
embed.add_field(name="Height", value=height_str, inline=True)
embed.add_field(name="Types", value=type_str, inline=True)
embed.set_footer(text=footer)
await ctx.send(embed=embed)
except:
self.logger.exception("Something went wrong in pokedex")
message = "No data for " + str(pokemon)
await ctx.channel.send(message)
async def setup(bot):
await bot.add_cog(Pokedex(bot))

267
cogs/pokemon.py Normal file
View file

@ -0,0 +1,267 @@
import random
import logging
import os
import json
import math
import time
import aiohttp
from discord.ext import commands
import discord
class PokemonGame(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.working_dir = "tmp/pokemon/"
self.data_dir = "data/pokemon/"
self.folder_setup()
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession()
async def get_json(self, url):
async with self.http_session.get(url) as resp:
json_data = await resp.json()
return json_data
def folder_setup(self):
try:
if not os.path.exists(self.working_dir):
os.mkdir(self.working_dir)
if not os.path.exists(self.data_dir):
os.mkdir(self.data_dir)
except:
self.logger.exception("PokemonGame failed to make directories")
async def starter_picker(self, id): #id = pokedex number
url = "https://pokeapi.co/api/v2/pokemon-species/" + str(id)
json_data = await self.get_json(url)
if (json_data["evolves_from_species"] == None) and (not json_data['is_mythical']) and (not json_data['is_legendary']):
return True
else:
return False
async def shiny_roll(self):
roll = random.randint(0,2047)
return not roll
async def save_pokemon(self, discord_id, pokemon_dict):
path = self.data_dir+str(discord_id)+".json"
pokemon_dict = json.dumps(pokemon_dict)
with open(path, 'w') as f:
f.writelines(pokemon_dict)
return True
async def load_pokemon(self, discord_id):
if os.path.isfile(self.data_dir+str(discord_id)+".json"):
with open(self.data_dir+str(discord_id)+".json", 'r') as f:
json_data = json.loads(f.readline())
return json_data
else:
return False
async def generate_starter(self, discord_id):
random.seed(discord_id)
json_data = await self.get_json('https://pokeapi.co/api/v2/pokemon-species/')
pokemon_count = json_data['count']
base_pokemon = False
while not base_pokemon:
starter_id = random.randint(1,pokemon_count)
base_pokemon = await self.starter_picker(starter_id)
random.seed()
return starter_id
async def get_pkmn_from_id(self, id):
url = 'https://pokeapi.co/api/v2/pokemon/' + str(id)
json_data = await self.get_json(url)
return json_data
async def give_buddy_food(self, pkmn_data):
try:
last_food = pkmn_data['last_food']
except:
last_food = 0
this_food = time.time()
if (this_food - last_food) >= 1800:
pkmn_data['last_food'] = this_food
level = await self.calc_pkmn_buddy_level(pkmn_data)
pkmn_data['buddy_xp'] += (4*level)
return pkmn_data, True
else:
return pkmn_data, False
async def give_buddy_affection(self, pkmn_data):
try:
last_hug = pkmn_data['last_hug']
except:
last_hug = 0
this_hug = time.time()
if (this_hug - last_hug) >= 600:
pkmn_data['last_hug'] = this_hug
level = await self.calc_pkmn_buddy_level(pkmn_data)
pkmn_data['buddy_xp'] += (3*level)
return pkmn_data, True
else:
return pkmn_data, False
async def calc_pkmn_buddy_level(self, pkmn_json): #this uses the 'fast' xp rate
buddy_xp = pkmn_json['buddy_xp']
return min(math.floor(((5*buddy_xp)/4)**(1/3)),100)
async def make_pmkn_embed(self, pkmn_dict):
if pkmn_dict['nickname']:
title = pkmn_dict['nickname'] + ' (' + pkmn_dict['name'].capitalize() + ')'
else:
title = pkmn_dict['name'].capitalize()
embed=discord.Embed(title=title)
if pkmn_dict['shiny']:
embed.set_image(url=pkmn_dict['sprites']['front_shiny'])
else:
embed.set_image(url=pkmn_dict['sprites']['front_default'])
nature = pkmn_dict['nature']
buddy_level = await self.calc_pkmn_buddy_level(pkmn_dict)
buddy_xp = pkmn_dict['buddy_xp']
types = []
for key in pkmn_dict['types']:
types.append(key['type']['name'].capitalize())
type_str = ', '.join(types)
embed.add_field(name="Nature", value=nature.capitalize(), inline=False)
embed.add_field(name="Buddy Level", value=buddy_level , inline=True)
embed.add_field(name="Buddy XP", value=buddy_xp, inline=True)
embed.add_field(name="Types", value=type_str, inline=False)
return embed
async def get_pokemon_evolve_data(self, pkmn_data):
pkmn_id = pkmn_data['id']
url = f"https://pokeapi.co/api/v2/pokemon-species/{pkmn_id}/"
json_data = await self.get_json(url)
evolution_url = json_data['evolution_chain']['url']
evolution_data = await self.get_json(evolution_url)
evolution_level = evolution_data['chain']['evolves_to'][0]['evolution_details'][0]['min_level']
evolution_type = evolution_data['chain']['evolves_to'][0]['evolution_details'][0]['trigger']['name']
new_pokemon_url = evolution_data['chain']['evolves_to'][0]['species']['url']
new_pokemon_url = new_pokemon_url.replace("pokemon-species/","pokemon/")
return evolution_level, evolution_type, new_pokemon_url
# Some bug in this that doesn't allow middle evolution to evolve
'''@commands.command()
async def pkmn_evolve(self, ctx):
if os.path.isfile(self.data_dir+str(ctx.author.id)+'.json'):
pkmn_data = await self.load_pokemon(ctx.author.id)
evolution_level, evolution_type, new_pokemon_url = await self.get_pokemon_evolve_data(pkmn_data)
buddy_level = await self.calc_pkmn_buddy_level(pkmn_data)
if buddy_level > evolution_level and evolution_type == 'level-up':
new_pokemon_json = await self.get_json(new_pokemon_url)
new_pokemon_json['shiny'] = pkmn_data['shiny']
new_pokemon_json['nickname'] = pkmn_data['nickname']
new_pokemon_json['unique_id'] = pkmn_data['unique_id']
new_pokemon_json['nature'] = pkmn_data['nature']
new_pokemon_json['buddy_level'] = pkmn_data['buddy_level']
new_pokemon_json['buddy_xp'] = pkmn_data['buddy_xp']
new_pokemon_json['last_food'] = pkmn_data['last_food']
new_pokemon_json['last_hug'] = pkmn_data['last_hug']
await self.save_pokemon(ctx.author.id, new_pokemon_json)
embed = await self.make_pmkn_embed(new_pokemon_json)
await ctx.send(embed=embed)
else:
await ctx.send("You don't have enough buddy xp to evolve this pokemon.")'''
@commands.command()
async def pkmn_start(self, ctx):
if not os.path.isfile(self.data_dir+str(ctx.author.id)+'.json'):
uniq_id = time.time()
starter_id = await self.generate_starter(ctx.author.id)
json_data = await self.get_pkmn_from_id(starter_id)
is_shiny = await self.shiny_roll()
nature = random.randint(0,19)
nature_data = await self.get_json('https://pokeapi.co/api/v2/nature/')
nature = nature_data['results'][nature]['name']
json_data['shiny'] = is_shiny
json_data['nickname'] = None
json_data['unique_id'] = uniq_id
json_data['nature'] = nature
json_data['buddy_level'] = 1
json_data['buddy_xp'] = 0
json_data['last_food'] = 0
json_data['last_hug'] = 0
await self.save_pokemon(ctx.author.id, json_data)
embed = await self.make_pmkn_embed(json_data)
await ctx.channel.send(embed=embed)
return
else:
await ctx.channel.send("You already have a pokemon!")
return
@commands.command()
async def pkmn_nick(self, ctx, nickname):
json_data = await self.load_pokemon(ctx.author.id)
json_data['nickname'] = nickname
await self.save_pokemon(ctx.author.id, json_data)
message = "You gave " + nickname + " a new name!"
await ctx.channel.send(message)
@commands.command()
@commands.cooldown(1, 3600, commands.BucketType.user)
async def pkmn_feed(self, ctx):
discord_id = ctx.author.id
json_data = await self.load_pokemon(discord_id)
json_data, fed = await self.give_buddy_food(json_data)
if fed:
await self.save_pokemon(ctx.author.id, json_data)
if json_data['nickname']:
message = "You Feed " + json_data['nickname']
else:
message = "You Feed " + json_data['name']
await ctx.channel.send(message)
return
@commands.command()
@commands.cooldown(1, 1800, commands.BucketType.user)
async def pkmn_hug(self, ctx):
discord_id = ctx.author.id
json_data = await self.load_pokemon(discord_id)
json_data, hugged = await self.give_buddy_affection(json_data)
if hugged:
await self.save_pokemon(discord_id, json_data)
if json_data['nickname']:
message = "You " + "hugged" + ' ' + json_data['nickname']
else:
message = "You " + "hugged" + ' ' + json_data['name']
await ctx.channel.send(message)
@commands.command()
async def pokemon(self, ctx):
discord_id = ctx.author.id
buddy_json = await self.load_pokemon(discord_id)
if not buddy_json:
await ctx.channel.send("You don't have a buddy yet. Type ```!pokemon start``` to start your Pokemon journey!")
else:
embed = await self.make_pmkn_embed(buddy_json)
message = await ctx.channel.send(embed=embed)
return
async def pkmn_msg(self, discord_id):
path = self.data_dir+str(discord_id)+'.json'
if os.path.isfile(path):
with open(path, 'r') as f:
json_data = json.loads(f.readline())
json_data['buddy_xp'] += random.randint(1,5)
json_data = json.dumps(json_data)
with open(path, 'w') as f:
f.writelines(json_data)
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
await self.pkmn_msg(message.author.id)
async def setup(bot):
await bot.add_cog(PokemonGame(bot))

52
cogs/roll_poll.py Normal file
View file

@ -0,0 +1,52 @@
import random
import discord
from discord.ext import commands
@commands.command(
description="Poll",
help='Create a poll with up to 9 options. Usage: !poll "Put question here" "option 1" "option 2"',
brief="Enable or disable bot reactions"
)
async def poll(ctx, question, *options: str):
if len(options) > 9:
await ctx.send("Error: You cannot have more than 9 options")
return
embed = discord.Embed(title=question, colour=discord.Colour(0x283593))
for i, option in enumerate(options):
embed.add_field(name=f"Option {i+1}", value=option, inline=False)
message = await ctx.send(embed=embed)
numbers = {0: "\u0030\ufe0f\u20e3", 1: "\u0031\ufe0f\u20e3", 2: "\u0032\ufe0f\u20e3", 3: "\u0033\ufe0f\u20e3", 4: "\u0034\ufe0f\u20e3", 5: "\u0035\ufe0f\u20e3", 6: "\u0036\ufe0f\u20e3", 7: "\u0037\ufe0f\u20e3", 8: "\u0038\ufe0f\u20e3", 9: "\u0039\ufe0f\u20e3"}
for i in range(len(options)):
await message.add_reaction(numbers.get(i+1))
@commands.command(
description="Roll",
help="Rolls dice mostly for Dungeons and Dragons type games. Usage: !roll 3d6+2",
brief="Simulate rolling dice"
)
async def roll(ctx, dice_string):
dice_parts = dice_string.split('d')
num_dice = int(dice_parts[0])
if '+' in dice_parts[1]:
die_parts = dice_parts[1].split('+')
die_size = int(die_parts[0])
modifier = int(die_parts[1])
elif '-' in dice_parts[1]:
die_parts = dice_parts[1].split('-')
die_size = int(die_parts[0])
modifier = -int(die_parts[1])
else:
die_size = int(dice_parts[1])
modifier = 0
rolls = [random.randint(1, die_size) for i in range(num_dice)]
dice_str = ' + '.join([str(roll) for roll in rolls])
total = sum(rolls) + modifier
await ctx.send(f'{dice_str} + {modifier} = {total}' if modifier != 0 else f'{dice_str} = {total}')
async def setup(bot):
bot.add_command(roll)
bot.add_command(poll)

31
cogs/runescape.py Normal file
View file

@ -0,0 +1,31 @@
#sparkytron3000 plugin
from discord.ext import commands
@commands.command(
description="RSGP",
help="Uses probably outdated information to calculate how much rsgp is worth in usd. Usage: !rsgp (amount)",
brief="Runescape gold to usd"
)
async def rsgp(ctx, amount):
output = ""
cost_per_bil = 21.90 #1b rsgp to usd
cost_per_bil_os = 182
gold_per_bond = 70000000
gold_per_bond_os = 7000000
cost_per_bond = 8 #dollars usd
bondcost = (int(amount)/gold_per_bond) * cost_per_bond
rwtcost = (int(amount) * cost_per_bil / 1000000000)
dollar_gp = (int(amount)*1000000000)/cost_per_bil
osbondcost = (int(amount)/gold_per_bond_os) * cost_per_bond
osrwtcost = (int(amount) * cost_per_bil_os / 1000000000)
osdollar_gp = (int(amount)*1000000000)/cost_per_bil_os
output += str(amount) + ' rs3 gp would cost: $' + str(round(rwtcost,2)) + " (RWT)\n"
output += str(amount) + ' osrs gp would cost: $' + str(round(osrwtcost,2)) + " (RWT)\n"
output += str(amount) + ' rs3 gp would cost: $' + str(round(bondcost,2)) + " (Bonds)\n"
output += str(amount) + ' osrs gp would cost: $' + str(round(osbondcost,2)) + " (Bonds)\n"
output += str(amount) + ' dollars spent on rs3 gp would be: ' + str(round(dollar_gp,2)) + " (RS3 GP)\n"
output += str(amount) + ' dollars spent on osrs gp would be: ' + str(round(osdollar_gp,2)) + " (OSRS GP)\n"
await ctx.send(output)
async def setup(bot):
bot.add_command(rsgp)

533
cogs/stable_diffusion.py Normal file
View file

@ -0,0 +1,533 @@
# This extension enables the ability to generate AI artwork using the AUTOMATIC1111 API
import io
import logging
import base64
import os
import time
import random
from PIL import Image, PngImagePlugin
import aiohttp
import discord
from discord.ext import commands
class StableDiffusion(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.stable_diffusion_url = os.getenv("stablediffusion_url") # Change this to stable_diffusion_url
self.working_dir = "tmp/stable_diffusion/"
self.data_dir = "data/stable_diffusion/"
self.default_neg_prompt = "easynegative, badhandv4, verybadimagenegative_v1.3"
self.folder_setup()
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession()
def folder_setup(self) -> None:
try:
if not os.path.exists(self.working_dir):
os.mkdir(self.working_dir)
os.mkdir(f"{self.working_dir}sfw")
os.mkdir(f"{self.working_dir}nsfw")
if not os.path.exists(self.data_dir):
os.mkdir(self.data_dir)
except:
self.logger.exception("StableDiffusion failed to make directories")
"""
answer_question asynchronously calls the OpenAI API to get a response for the given question/topic using the specified model.
Parameters:
- topic (str): The question or topic to get a response for.
- model (str): The OpenAI model to use. Defaults to "gpt-3.5-turbo".
Returns:
- str: The response from the OpenAI API.
Raises:
- Exception: If an error occurs when calling the API.
"""
async def answer_question(self, topic: str, model: str="gpt-3.5-turbo") -> str: # Only needed for draw command
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
}
data = {
"model": model,
"messages": [{"role": "user", "content": topic}]
}
url = "https://api.openai.com/v1/chat/completions"
try:
async with self.http_session.post(url, headers=headers, json=data) as resp:
response_data = await resp.json()
response = response_data['choices'][0]['message']['content']
return response
except:
return "Error in answer question in stable_diffusion"
"""
Gets key-value pairs from a context message.
Parses the message content to extract key-value pairs separated by '='.
Returns a dict of key-value pairs.
Parameters:
ctx (commands.Context): The context object containing the message.
Returns:
dict: A dict of key-value pairs extracted from the message.
"""
def get_kv_from_ctx(self, ctx: commands.Context) -> dict:
try:
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
kv_strings = list(filter(lambda x: '=' in x,prompt.split(' ')))
key_value_pairs = dict(map(lambda a: a.replace(',','').split('='),kv_strings))
return key_value_pairs
except:
return None
"""
Gets prompt from context message by splitting on spaces and removing key-value pairs.
Splits the context message content on spaces, takes the second part after
the command name. Removes any key-value pairs separated by '=' from the prompt.
Parameters:
ctx (commands.Context): The context object containing the message.
Returns:
str: The prompt text extracted from the context message.
"""
def get_prompt_from_ctx(self, ctx: commands.Context) -> str:
try:
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
prompt = ' '.join(list(filter(lambda x: '=' not in x,prompt.split(' '))))
return prompt
except:
return None
"""
Encodes an image file from the given path into a base64 string.
Opens the image file, encodes it into a base64 string, closes the image,
and returns the encoded string.
Parameters:
path (str): The path to the image file.
Returns:
str: The base64 encoded image data.
"""
async def my_open_img_file(self, path: str) -> str:
img = Image.open(path)
encoded = ""
with io.BytesIO() as output:
img.save(output, format="PNG")
contents = output.getvalue()
encoded = str(base64.b64encode(contents), encoding='utf-8')
img.close()
return encoded
"""
Looks at an image attachment in the given context and returns metadata about it.
If the look parameter is True, this iterates through the attachments
in the context checking for image files. If an image is found, it is
downloaded and encoded to base64. The image is then sent to the
Stable Diffusion API to generate a caption, which is returned in the metadata.
Parameters:
ctx (commands.Context): The context containing the command and attachments.
look (bool): Whether to look at images and generate metadata.
Returns:
str: The metadata string containing any generated image captions.
"""
async def look_at(self, ctx: commands.Context, look: bool=False) -> str:
metadata = ""
if look:
url = self.stable_diffusion_url
if url == "disabled":
return "Stable Diffusion is disabled, could not look at image"
for attachment in ctx.attachments:
if attachment.url.endswith(('.jpg', '.png')):
self.logger.debug("image seen")
async with self.http_session.get(attachment.url) as response:
imageName = self.working_dir + str(time.time_ns()) + '.png'
with open(imageName, 'wb') as out_file:
self.logger.debug('Saving image: ' + imageName)
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out_file.write(chunk)
img_link = await self.my_open_img_file(imageName)
try:
payload = {"image": img_link}
async with self.http_session.post(f'{url}/sdapi/v1/interrogate', json=payload) as response:
data = await response.json()
description = data.get("caption")
description = description.split(',')[0]
metadata += f"<image:{description}>\n"
except aiohttp.ClientError:
self.logger.exception("ERROR: CLIP may not be running. Could not look at image.")
return "ERROR: CLIP may not be running. Could not look at image."
return metadata
"""
Generates a prompt for use with an AI art generator.
Combines randomly selected question prompts with an AI assistant's response,
then optionally removes abstract keywords and adds modifiers like "masterpiece"
to create a prompt that describes a detailed scene or character for the AI art
generator.
"""
async def generate_prompt(self) -> str:
choice1 = "Give me 11 keywords I can use to generate art using AI. They should all be related to one piece of art. Please only respond with the keywords and no other text. Be sure to use keywords that really describe what the art portrays. Keywords should be comma separated with no other text!"
choice2 = "Describe a creative scene, use only one sentence"
choice3 = "Give me comma seperated keywords describing an imaginary piece of art. Only return the keywords and no other text."
choice4 = "Describe a unique character and an environment in one sentence"
choice5 = "Describe a nonhuman character and an environment in one sentence"
prompt = random.choice([choice1,choice2,choice3,choice4,choice5])
prompt = await self.answer_question(prompt)
if random.randint(0,9):
prompt = prompt.replace("abstract, ", "")
prompt = prompt.replace("AI, ", "")
if "." in prompt:
prompt = prompt.replace(".",",")
prompt = prompt + " masterpiece, studio quality"
else:
prompt = prompt + ", masterpiece, studio quality"
return prompt
@commands.command(
description="Change Model",
help="Changes the Stable Diffusion model used by the bot.",
brief="Change stable diffusion model"
)
async def change_model(self, ctx: commands.Context, model_choice: str='0') -> None: # Needs to be a configurable list of models
model_choices = {
'1': ("deliberate_v2.safetensors [9aba26abdf]", "DeliberateV2"),
'2': ("flat2DAnimerge_v30.safetensors [5dd56bfa12]", "Flat2D"),
'3': ("Anything-V3.0.ckpt [8712e20a5d]", "AnythingV3"),
'4': ("aZovyaPhotoreal_v2.safetensors [dde3b17c05]", "PhotorealV2"),
'5': ("Pixel_Art_V1_PublicPrompts.ckpt [0f02127697]", "Pixel Art"),
'6': ("mistoonAnime_v20.safetensors [c35e1054c0]", "Mistoon AnimeV2")
}
url = self.stable_diffusion_url
if url == "disabled":
await ctx.send("This command is currently disabled")
else:
async with self.http_session.get(url=f'{url}/sdapi/v1/options') as response:
config_json = await response.json()
current_model = config_json["sd_model_checkpoint"]
output = 'Current Model: ' + current_model + '\n'
if model_choice in model_choices:
model_id, model_name = model_choices[model_choice]
if current_model != model_id:
payload = {"sd_model_checkpoint": model_id}
async with self.http_session.post(url=f'{url}/sdapi/v1/options', json=payload) as response:
output = "Changed model to: " + model_name
await ctx.send(output)
return
else:
await ctx.send(f"Already set to use {model_name}")
return
else:
output = '\n'.join([f"{choice}: {name}" for choice, name in model_choices.items()])
await ctx.send(output)
@commands.command(
description="Lora",
help="Lists available Stable Diffusion loras and their trigger words.",
brief="List the stable diffusion loras"
)
async def lora(self, ctx: commands.Context) -> None:
lora_choices = {
'0': ("Lora Name", "Trigger Words"),
'1': ("<lora:rebecca:1>", "rebecca (cyberpunk)"),
'2': ("<lora:lucy:1>", "lucy (cyberpunk)"),
'3': ("<lora:dirty:1>", "dirty"),
'4': ("<lora:starcraft:1>", "c0nst3llation")
}
output = ""
lora_options = '\n'.join([f"{choice}: {name}" for choice, name in lora_choices.items()])
output += lora_options
await ctx.send(output)
"""
Gets the image URL from a Discord context.
Checks for an image URL in attachments or message content.
Args:
ctx: Discord context
Returns:
str: Image URL or None
"""
async def get_image_from_ctx(self, ctx: commands.Context) -> str:
if ctx.message.attachments:
file_url = ctx.message.attachments[0].url
return file_url
try:
file_url = ctx.message.content.split(" ", maxsplit=1)[1]
return file_url
except:
self.logger.info("Couldn't find image.")
return None
"""
Sends an image generation request to the Stable Diffusion API.
Args:
ctx: The Discord context.
prompt: The text prompt to generate the image from.
Returns:
None. Sends the generated image back to the user.
"""
async def txt2img(self, ctx: commands.Context, prompt: str) -> None:
url = f"{self.stable_diffusion_url}/sdapi/v1/txt2img"
key_value_pairs = self.get_kv_from_ctx(ctx)
headers = {'Content-Type': 'application/json'}
payload = {
"prompt": prompt,
"steps": 25,
"negative_prompt": self.get_negative_prompt()
}
if key_value_pairs:
payload.update(key_value_pairs)
try:
async with self.http_session.post(url, headers=headers, json=payload) as resp:
if resp.status != 200:
await ctx.send(f"{resp.status} {resp.reason}")
self.logger.exception(f"{resp.status} {resp.reason}")
return
r = await resp.json()
except ConnectionRefusedError:
await ctx.send("Failed to connect to image generation service")
self.logger.exception("Failed to connect to image generation service")
return
except:
await ctx.send("Failed to generate image")
self.logger.exception("Failed to generate image")
return
await self.send_generated_image(ctx, r['images'], prompt)
"""
Saves an image from a URL to disk.
Args:
url: The URL of the image to save.
Returns:
The path to the saved image file.
"""
async def save_image(self, url: str) -> str:
async with self.http_session.get(url) as response:
image_name = self.working_dir + str(time.time_ns()) + ".png"
with open(image_name, 'wb') as out_file:
self.logger.debug(f"Saving image: {image_name}")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out_file.write(chunk)
return image_name
"""
Generates an image by modifying an initial image based on an optional
text prompt.
Sends a request to the Stable Diffusion API to modify the initial image
according to the given prompt. The modified image is then sent back to
the user.
Args:
ctx: The Discord context.
prompt: The text prompt to guide image modification.
Returns:
None. Sends the generated image back to the user.
"""
async def img2img(self, ctx: commands.Context, prompt: str) -> None:
url = f"{self.stable_diffusion_url}/sdapi/v1/img2img"
file_url = await self.get_image_from_ctx(ctx)
image_name = await self.save_image(file_url)
file_url = await self.my_open_img_file(image_name)
key_value_pairs = self.get_kv_from_ctx(ctx)
headers = {'Content-Type': 'application/json'}
payload = {
"init_images": [file_url],
"prompt": prompt,
"steps": 40,
"negative_prompt": self.get_negative_prompt(),
"denoising_strength": 0.5,
}
if key_value_pairs:
payload.update(key_value_pairs)
try:
async with self.http_session.post(url, headers=headers, json=payload) as resp:
if resp.status != 200:
await ctx.send(f"{resp.status} {resp.reason}")
self.logger.error(f"{resp.status} {resp.reason}")
return
r = await resp.json()
except ConnectionRefusedError:
await ctx.send("Failed to connect to image generation service")
self.logger.exception("Failed to connect to image generation service")
return
except:
await ctx.send("Failed to generate image")
self.logger.exception("Failed to generate image")
return
await self.send_generated_image(ctx, r['images'], prompt)
"""
Sends a generated image file to Discord along with the prompt.
Saves the image file locally first, logs the prompt and filename,
then sends the image and prompt to Discord.
Args:
ctx: The Discord context.
images: List of base64 encoded image data.
prompt: The text prompt used to generate the image.
Returns: None.
"""
async def send_generated_image(self, ctx: commands.Context, images: dict, prompt: str) -> None:
for i in images:
image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))
try:
if ctx.channel.is_nsfw():
folder = self.working_dir + "nsfw/"
else:
folder = self.working_dir + "sfw/"
except:
folder = self.working_dir
my_filename = str(time.time_ns()) + ".png"
filepath = folder + my_filename
image.save(filepath)
with open(filepath, "rb") as fh:
f = discord.File(fh, filename=filepath)
prompt = prompt.replace('\n',' ')
log_data = f'Author: {ctx.author.name}, Prompt: {prompt}, Filename: {my_filename}\n'
with open(f"{self.data_dir}stable_diffusion.log", 'a') as log_file:
log_file.writelines(log_data)
await ctx.send(f'Generated by: {ctx.author.name}\nPrompt: {prompt}', file=f)
"""
Gets a negative prompt text from a file.
If the file does not exist, it will be created with
default negative prompt text.
Returns:
str: The negative prompt text loaded from the file.
"""
def get_negative_prompt(self) -> str:
try:
neg_prompt_file = f"{self.data_dir}negative_prompt.txt"
with open(neg_prompt_file, 'r') as f:
negative_prompt = f.readline()
except:
neg_prompt_file = f"{self.data_dir}negative_prompt.txt"
with open(neg_prompt_file, 'w') as f:
f.writelines(self.default_neg_prompt)
negative_prompt = self.default_neg_prompt
return negative_prompt
@commands.command(
description="Imagine",
help="Generate an image using stable diffusion. You can add keyword arguments to your prompt and they will be treated as stable diffusion options. Usage !imagine (topic)",
brief="Generate an image"
)
async def imagine(self, ctx: commands.Context) -> None:
url = self.stable_diffusion_url
if url == "disabled":
await ctx.send("Command is currently disabled")
return
prompt = self.get_prompt_from_ctx(ctx)
if prompt == None:
prompt = await self.generate_prompt()
await ctx.send(f"Please be patient this may take some time! Generating: {prompt}.")
await self.txt2img(ctx, prompt)
@commands.command(
description="Describe",
help="Get better understanding of what the bot \"sees\" when you post an image! (Runs it through CLIP) Usage !describe (image link)",
brief="Describe image"
)
async def describe(self, ctx: commands.Context) -> None:
url = self.stable_diffusion_url
if url == "disabled":
await ctx.send("Command is currently disabled")
return
else:
url=f"{url}/sdapi/v1/interrogate"
file_url = await self.get_image_from_ctx(ctx)
image_name = await self.save_image(file_url)
img_link = await self.my_open_img_file(image_name)
try:
payload = {"image": img_link}
async with self.http_session.post(url, json=payload) as response:
r = await response.json()
await ctx.send(r.get("caption"))
except:
self.logger.exception("error in describe")
await ctx.send("My image generation service may not be running.")
@commands.command(
description="Reimagine",
help="Reimagine an image as something else. One example is reimagining a picture as anime. This command can be hard to use. \nUsage: !reimagine (image link) (topic)\nExample: !reimagine (image link) anime",
brief="Reimagine an image"
)
async def reimagine(self, ctx: commands.Context) -> None:
url = self.stable_diffusion_url
if url == "disabled":
await ctx.send("Command is currently disabled")
return
prompt = self.get_prompt_from_ctx(ctx)
if not prompt:
prompt = ""
await ctx.send(f"Please be patient this may take some time! Generating: {prompt}.")
await self.img2img(ctx, prompt)
@commands.command(
description="Negative Prompt",
help="Changes the negative prompt for imagine across all channels",
brief="Change the negative prompt for imagine"
)
async def negative_prompt(self, ctx: commands.Context, *args: list) -> None:
message = ' '.join(args)
if not message:
message = self.default_neg_prompt
neg_prompt_file = f"{self.data_dir}negative_prompt.txt"
with open(neg_prompt_file, 'w') as f:
f.writelines(message)
await ctx.send("Changed negative prompt to " + message)
async def setup(bot: commands.Bot):
await bot.add_cog(StableDiffusion(bot))

121
cogs/tts.py Normal file
View file

@ -0,0 +1,121 @@
import time
import os
import logging
import aiohttp
import discord
from discord.ext import commands
class TextToSpeech(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.working_dir = "tmp/tts/"
self.data_dir = "data/tts/"
self.folder_setup()
self.http_session = self.create_aiohttp_session()
self.logger = logging.getLogger("bot")
def create_aiohttp_session(self):
return aiohttp.ClientSession()
def folder_setup(self):
try:
if not os.path.exists(self.working_dir):
os.mkdir(self.working_dir)
if not os.path.exists(self.data_dir):
os.mkdir(self.data_dir)
except:
self.logger.exception("TextToSpeech failed to make directories")
async def text_to_speech(self, prompt):
CHUNK_SIZE = 1024
url = "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM"
api_key = os.getenv("eleven_labs")
headers = {
"Accept": "audio/mpeg",
"Content-Type": "application/json",
"xi-api-key": api_key
}
data = {
"text": prompt,
"model_id": "eleven_monolingual_v1",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.5
}
}
filename = f"{time.time_ns()}.mp3"
filepath = f"{self.data_dir}{filename}"
response = await self.http_session.post(url, json=data, headers=headers)
with open(filepath, 'wb') as f:
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if chunk:
f.write(chunk)
return filepath
async def open_text_to_speech(self, prompt):
CHUNK_SIZE = 1024
url = "https://api.openai.com/v1/audio/speech"
api_key = os.getenv("openai.api_key")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"input": prompt,
"model": "tts-1",
"voice": "alloy"
}
filename = f"{time.time_ns()}.mp3"
filepath = f"{self.data_dir}{filename}"
response = await self.http_session.post(url, json=data, headers=headers)
with open(filepath, 'wb') as f:
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if chunk:
f.write(chunk)
return filepath
def get_prompt_from_ctx(self, ctx):
try:
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
prompt = ' '.join(list(filter(lambda x: '=' not in x,prompt.split(' '))))
return prompt
except:
return None
@commands.command()
async def tts(self, ctx):
prompt = self.get_prompt_from_ctx(ctx)
if prompt is None:
await ctx.send("Please provide a prompt")
return
else:
await ctx.send("Generating...")
try:
filepath = await self.text_to_speech(prompt)
await ctx.send(file=discord.File(filepath))
except:
await ctx.send("Error in tts")
self.logger.exception("Error in tts")
@commands.command()
async def opentts(self, ctx):
prompt = self.get_prompt_from_ctx(ctx)
if prompt is None:
await ctx.send("Please provide a prompt")
return
else:
await ctx.send("Generating...")
try:
filepath = await self.open_text_to_speech(prompt)
await ctx.send(file=discord.File(filepath))
except:
await ctx.send("Error in tts")
self.logger.exception("Error in tts")
async def setup(bot):
await bot.add_cog(TextToSpeech(bot))