moved all files to extensions folder
This commit is contained in:
parent
65117c42e8
commit
040e0e72fc
10 changed files with 0 additions and 0 deletions
72
extensions/admin.py
Normal file
72
extensions/admin.py
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
#Adds administrative commands to the bot
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
import asyncssh
|
||||
from discord.ext import commands
|
||||
|
||||
async def upload_sftp(local_filename, server_folder, server_filename):
|
||||
remotepath = server_folder + server_filename
|
||||
async with asyncssh.connect(os.getenv('ftp_server'), username=os.getenv('ftp_username'), password=os.getenv('ftp_password')) as conn:
|
||||
async with conn.start_sftp_client() as sftp:
|
||||
await sftp.put(local_filename, remotepath=remotepath)
|
||||
|
||||
@commands.command(
|
||||
description="Kill",
|
||||
help="Kills the bot in event of an emergency. Only special users can do this! Usage: !kill",
|
||||
brief="Kill the bot",
|
||||
hidden=True
|
||||
)
|
||||
async def kill(ctx):
|
||||
"Kills the bot"
|
||||
if ctx.author.id == 242018983241318410:
|
||||
exit()
|
||||
else:
|
||||
await ctx.channel.send("You don't have permission to do that.")
|
||||
|
||||
@commands.command(
|
||||
description="Reset",
|
||||
help="Resets the bot in event of an emergency. Only special users can do this! Usage: !reset",
|
||||
brief="Reset the bot",
|
||||
hidden=True
|
||||
)
|
||||
async def reset(ctx):
|
||||
if ctx.author.id == 242018983241318410:
|
||||
python = sys.executable
|
||||
os.execl(python, python, *sys.argv)
|
||||
else:
|
||||
await ctx.channel.send("You don't have permission to do that.")
|
||||
|
||||
@commands.command(
|
||||
description="Update",
|
||||
help="This will update sparkytron to the most recent version on github. Only privileged users can run this command! Usage: !update",
|
||||
brief="Runs git pull",
|
||||
hidden=True
|
||||
)
|
||||
async def update(ctx):
|
||||
if ctx.author.id == 242018983241318410:
|
||||
output = subprocess.run(["git","pull"],capture_output=True)
|
||||
if output.stderr:
|
||||
await ctx.send("Update Attempted")
|
||||
await ctx.send(output.stderr.decode('utf-8'))
|
||||
else:
|
||||
await ctx.send(output.stdout.decode('utf-8'))
|
||||
else:
|
||||
await ctx.send("You don't have permission to do this.")
|
||||
|
||||
@commands.command(
|
||||
description="Moderate",
|
||||
help="This currently tool works by replacing the filename on the ftp server with a black image. The description will remain the same and may need to be altered.",
|
||||
brief="Moderation Tools"
|
||||
)
|
||||
async def moderate(ctx, filename):
|
||||
await upload_sftp("blank_image.png", (os.getenv('ftp_public_html') + 'ai-images/'), filename)
|
||||
output = "Image " + filename + " replaced"
|
||||
await ctx.send(output)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(update)
|
||||
bot.add_command(reset)
|
||||
bot.add_command(kill)
|
||||
bot.add_command(moderate)
|
||||
215
extensions/currency.py
Normal file
215
extensions/currency.py
Normal file
|
|
@ -0,0 +1,215 @@
|
|||
#plugin for sparkytron3000
|
||||
import time
|
||||
import random
|
||||
import json
|
||||
import math
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
@commands.command(
|
||||
description="Currency",
|
||||
help="Server currency. You can run !currency claim to get started!", #This needs an overhaul
|
||||
brief="Server currency tools"
|
||||
)
|
||||
async def currency(ctx, arg1=None, arg2=None, arg3=None, arg4=None):
|
||||
|
||||
def read_db(filepath):
|
||||
with open(filepath,"r") as fileobj:
|
||||
db_content = json.load(fileobj)
|
||||
#print(db_content,type(db_content))
|
||||
return db_content
|
||||
|
||||
def save_to_db(filepath,db_content):
|
||||
with open(filepath,"w") as fileobj:
|
||||
json.dump(db_content,fileobj,indent=4)
|
||||
|
||||
def add_currency(filepath,amount):
|
||||
player_db = read_db(filepath)
|
||||
player_db["currency"] += amount
|
||||
save_to_db(filepath,player_db)
|
||||
return player_db
|
||||
|
||||
def calc_level_from_xp(xp):
|
||||
level = min(100,math.floor(0.262615*xp**0.3220627))
|
||||
return level
|
||||
|
||||
def add_xp(filepath,player_db,time_spent):
|
||||
activity = player_db["status"]["current_activity"]
|
||||
starting_xp = player_db["skills"][activity]["xp"]
|
||||
starting_level = player_db["skills"][activity]["level"]
|
||||
|
||||
|
||||
if activity == "mining":
|
||||
equipment_level = player_db["equipment"]["pickaxe"]["level"]
|
||||
xp_gained = (time_spent) * equipment_level # SKILL LEVEL of XP / SEC
|
||||
player_db["skills"][activity]["xp"] += xp_gained
|
||||
new_xp = starting_xp + xp_gained
|
||||
new_level = calc_level_from_xp(new_xp) #calculate with curve here
|
||||
player_db["skills"][activity]["level"] = new_level
|
||||
levels_gained = new_level - starting_level
|
||||
|
||||
summary = "" #summary should include xp gained, levels gained (only if any were gained), current level (or new level)
|
||||
summary += "You gained {} {} xp!".format(xp_gained, activity)
|
||||
if levels_gained > 0: #if levels gained > 0, then include levels gained in summary
|
||||
summary += "\nYou gained {} {} level(s)! You are now level {}.".format(levels_gained, activity, new_level)
|
||||
|
||||
save_to_db(filepath, player_db)
|
||||
return player_db,summary
|
||||
|
||||
def add_resources(filepath,player_db,time_spent):
|
||||
|
||||
mining_resources = {
|
||||
"sapphire": {
|
||||
"value": 100,
|
||||
"amount": 0
|
||||
},
|
||||
"emerald": {
|
||||
"value": 250,
|
||||
"amount": 0
|
||||
},
|
||||
"ruby": {
|
||||
"value": 1000,
|
||||
"amount": 0
|
||||
},
|
||||
"diamond": {
|
||||
"value": 3000,
|
||||
"amount": 0
|
||||
}
|
||||
}
|
||||
|
||||
if player_db["status"]["current_activity"] == "mining":
|
||||
pick_power = player_db["equipment"]["pickaxe"]["power"]
|
||||
pick_level = player_db["equipment"]["pickaxe"]["level"]
|
||||
mining_level = player_db["skills"]["mining"]["level"]
|
||||
numerator = pick_power + pick_level + mining_level
|
||||
denominator = 1000
|
||||
items_gained = []
|
||||
time_summary = time.strftime("%H:%M:%S", time.gmtime(time_spent))
|
||||
|
||||
for second in range(0,time_spent):
|
||||
roll = random.randint(0,denominator)
|
||||
if roll <= numerator: #get a resource
|
||||
roll2 = random.randint(0,100)
|
||||
if roll2 <= 50:
|
||||
mining_resources["sapphire"]["amount"] += 1
|
||||
elif roll2 <=80:
|
||||
mining_resources["emerald"]["amount"] += 1
|
||||
elif roll2 <= 95:
|
||||
mining_resources["ruby"]["amount"] += 1
|
||||
else:
|
||||
mining_resources["diamond"]["amount"] += 1
|
||||
for item in mining_resources:
|
||||
mined_amount = mining_resources[item]["amount"]
|
||||
if item in player_db["items"]:
|
||||
player_db["items"][item]["amount"] += mined_amount
|
||||
items_gained.append(item.title())
|
||||
items_gained.append(mined_amount)
|
||||
else:
|
||||
player_db["items"][item] = mining_resources[item]
|
||||
items_gained.append(item.title())
|
||||
items_gained.append(mined_amount)
|
||||
|
||||
save_to_db(filepath, player_db)
|
||||
|
||||
summary = "You spent {} mining. You mined {} x{}, {} x{}, {} x{}, and {} x{}.".format(time_summary, *items_gained)
|
||||
|
||||
return player_db,summary
|
||||
|
||||
async def transfer_currency(filepath, player_db, player_id, amount):
|
||||
try:
|
||||
amount = int(amount)
|
||||
player2_filepath = "currency/players/" + str(player_id) + ".json"
|
||||
player2_db = read_db(player2_filepath)
|
||||
if player_db["currency"] >= amount:
|
||||
add_currency(filepath, -amount)
|
||||
add_currency(player2_filepath,amount)
|
||||
await ctx.send("Sent " + str(amount) + " sparks to " + str(player_id))
|
||||
except FileNotFoundError:
|
||||
await ctx.send("They don't seem to be playing the game.")
|
||||
|
||||
|
||||
async def show_levels(player_db):
|
||||
output = ''
|
||||
for skill in player_db["skills"]:
|
||||
output += skill + ': ' + str(player_db["skills"][skill]["level"]) + '\n'
|
||||
await ctx.send(output)
|
||||
|
||||
async def show_currency(player_db):
|
||||
output = 'Sparks: ' + str(player_db["currency"])
|
||||
await ctx.send(output)
|
||||
|
||||
async def show_items(player_db):
|
||||
output = ''
|
||||
for item in player_db["items"]:
|
||||
output += item + ': ' + str(player_db["items"][item]["amount"]) + '\n'
|
||||
await ctx.send(output)
|
||||
|
||||
|
||||
async def stop_activity(filepath,player_db):
|
||||
if player_db["status"]["current_activity"] == "idle":
|
||||
await ctx.send("You are currently idle. There is no activity to stop.")
|
||||
else:
|
||||
time_spent = int(time.time() - player_db["status"]["start_time"]) #integer in seconds
|
||||
player_db, xp_summary = add_xp(filepath,player_db,time_spent)
|
||||
player_db, resources_summary = add_resources(filepath,player_db,time_spent)
|
||||
await ctx.send(xp_summary)
|
||||
await ctx.send(resources_summary)
|
||||
player_db["status"]["current_activity"] = "idle"
|
||||
save_to_db(filepath,player_db)
|
||||
|
||||
async def claim(filepath, player_db):
|
||||
if time.time() - player_db["status"]["last_claimed"] >= 86400:
|
||||
player_db = add_currency(filepath, 100)
|
||||
player_db["status"]["last_claimed"] = time.time()
|
||||
save_to_db(filepath,player_db)
|
||||
await ctx.send("You claimed 100 sparks!")
|
||||
else:
|
||||
await ctx.send("Sorry, you already claimed your sparks today.")
|
||||
|
||||
async def mine(filepath, player_db):
|
||||
if player_db["status"]["current_activity"] == "idle":
|
||||
player_db["status"]["current_activity"] = "mining"
|
||||
player_db["status"]["start_time"] = time.time()
|
||||
save_to_db(filepath, player_db)
|
||||
await ctx.send("You start mining.")
|
||||
elif player_db["status"]["current_activity"] == "mining":
|
||||
await ctx.send("You are already mining!")
|
||||
else:
|
||||
await ctx.send("You must stop " + player_db["status"]["current_activity"] + " before you start mining!")
|
||||
|
||||
async def gamble(filepath, player_db):
|
||||
pass
|
||||
|
||||
working_dir = "databases/currency/"
|
||||
players_dir = "players/"
|
||||
sender_id = str(ctx.author.id)
|
||||
default_db = read_db("{0}{1}default.json".format(working_dir, players_dir))
|
||||
filepath = '{0}{1}{2}.json'.format(working_dir, players_dir, sender_id)
|
||||
|
||||
try:
|
||||
player_db = read_db(filepath)
|
||||
except FileNotFoundError:
|
||||
save_to_db(filepath,default_db)
|
||||
player_db = read_db(filepath)
|
||||
|
||||
if arg1 == "claim":
|
||||
await claim(filepath, player_db)
|
||||
player_db = read_db(filepath)
|
||||
elif arg1 == "stop":
|
||||
await stop_activity(filepath, player_db)
|
||||
player_db = read_db(filepath)
|
||||
elif arg1 == "mine":
|
||||
await mine(filepath, player_db)
|
||||
player_db = read_db(filepath)
|
||||
elif arg1 == "levels":
|
||||
await show_levels(player_db)
|
||||
elif arg1 == "items":
|
||||
await show_items(player_db)
|
||||
elif (arg1 == "send" or arg1 == "give") and arg2 and arg3:
|
||||
await transfer_currency(filepath, player_db, arg2, arg3)
|
||||
player_db = read_db(filepath)
|
||||
else:
|
||||
await show_currency(player_db)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(currency)
|
||||
185
extensions/essentials.py
Normal file
185
extensions/essentials.py
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
#plugin for sparkytron 3000
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
async def handle_error(error):
|
||||
print(error)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
log_line = current_time + ': ' + str(error) + '\n'
|
||||
with open("databases/error_log.txt", 'a') as f:
|
||||
f.write(log_line)
|
||||
return error
|
||||
|
||||
def create_channel_config(filepath):
|
||||
config_dict = {
|
||||
"personality":"average",
|
||||
"channel_topic":"casual",
|
||||
"chat_enabled":False,
|
||||
"commands_enabled":True,
|
||||
"chat_history_len":5,
|
||||
"look_at_images":False,
|
||||
"react_to_msgs":False,
|
||||
"ftp_enabled":False
|
||||
}
|
||||
|
||||
with open(filepath,"w") as f:
|
||||
json.dump(config_dict,f)
|
||||
print("Wrote config variables to file.")
|
||||
|
||||
async def get_channel_config(channel_id):
|
||||
filepath = "channels/config/{0}.json".format(str(channel_id))
|
||||
if not os.path.exists(filepath):
|
||||
create_channel_config(filepath)
|
||||
with open(filepath, "r") as f:
|
||||
config_dict = json.loads(f.readline())
|
||||
return config_dict
|
||||
|
||||
def edit_channel_config(channel_id, key, value):
|
||||
config_file = "channels/config/" + str(channel_id) + ".json"
|
||||
with open(config_file, 'r') as f:
|
||||
config_data = json.load(f)
|
||||
config_data[key] = value
|
||||
with open(config_file, "w") as f:
|
||||
json.dump(config_data, f)
|
||||
|
||||
@commands.command(
|
||||
description="View Images",
|
||||
help="Enable or disable bot viewing images in this channel. Usage !viewimages (enable|disable)",
|
||||
brief="Enable or disable bot viewing images"
|
||||
)
|
||||
async def view_images(ctx, message):
|
||||
if "enable" in message:
|
||||
edit_channel_config(ctx.channel.id, "look_at_images", True)
|
||||
await ctx.send("Viewing Enabled")
|
||||
elif "disable" in message:
|
||||
edit_channel_config(ctx.channel.id, "look_at_images", False)
|
||||
await ctx.send("Viewing Disabled")
|
||||
else:
|
||||
await ctx.send("Usage: !viewimages (enable|disable)")
|
||||
|
||||
|
||||
@commands.command(
|
||||
description="FTP",
|
||||
help="Enable or disable bot FTP to phixxy.com in this channel. Usage !ftp (enable|disable)",
|
||||
brief="Enable or disable uploading to web"
|
||||
)
|
||||
async def ftp(ctx, message):
|
||||
if "enable" in message:
|
||||
edit_channel_config(ctx.channel.id, "ftp_enabled", True)
|
||||
await ctx.send("FTP Enabled")
|
||||
elif "disable" in message:
|
||||
edit_channel_config(ctx.channel.id, "ftp_enabled", False)
|
||||
await ctx.send("FTP Disabled")
|
||||
else:
|
||||
await ctx.send("Usage: !ftp (enable|disable)")
|
||||
|
||||
@commands.command(
|
||||
description="Personality",
|
||||
help="Set the personality of the bot. Usage: !personality (personality)",
|
||||
brief="Set the personality"
|
||||
)
|
||||
async def personality(ctx):
|
||||
personality_type = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
edit_channel_config(ctx.channel.id, "personality", personality_type)
|
||||
await ctx.send("Personality changed to " + personality_type)
|
||||
|
||||
@commands.command(
|
||||
description="Commands",
|
||||
help="Enable or disable bot commands in this channel. Usage !enable_commands (enable|disable)",
|
||||
brief="Enable or disable bot commands"
|
||||
)
|
||||
async def enable_commands(ctx, message):
|
||||
if "disable" in message or "false" in message:
|
||||
edit_channel_config(ctx.channel.id, "commands_enabled", False)
|
||||
await ctx.send("Commands Disabled")
|
||||
else:
|
||||
edit_channel_config(ctx.channel.id, "commands_enabled", True)
|
||||
await ctx.send("Commands Enabled")
|
||||
|
||||
@commands.command(
|
||||
description="Topic",
|
||||
help="Set the channel topic for the bot. Usage: !topic (topic)",
|
||||
brief="Set channel topic"
|
||||
)
|
||||
async def topic(ctx, channel_topic):
|
||||
edit_channel_config(ctx.channel.id, "channel_topic", channel_topic)
|
||||
await ctx.send("Topic changed to " + channel_topic)
|
||||
|
||||
@commands.command(
|
||||
description="Chat",
|
||||
help="Enable or disable bot chat in this channel. Usage !chat (enable|disable)",
|
||||
brief="Enable or disable bot chat"
|
||||
)
|
||||
async def chat(ctx, message):
|
||||
if "enable" in message:
|
||||
edit_channel_config(ctx.channel.id, "chat_enabled", True)
|
||||
await ctx.send("Chat Enabled")
|
||||
elif "disable" in message:
|
||||
edit_channel_config(ctx.channel.id, "chat_enabled", False)
|
||||
await ctx.send("Chat Disabled")
|
||||
else:
|
||||
await ctx.send("Usage: !chat (enable|disable)")
|
||||
|
||||
@commands.command(
|
||||
description="Reactions",
|
||||
help="Enable or disable bot reactions in this channel. Usage !reactions (enable|disable)",
|
||||
brief="Enable or disable bot reactions"
|
||||
)
|
||||
async def reactions(ctx, message):
|
||||
if "enable" in message:
|
||||
edit_channel_config(ctx.channel.id, "react_to_msgs", True)
|
||||
await ctx.send("Reactions Enabled")
|
||||
elif "disable" in message:
|
||||
edit_channel_config(ctx.channel.id, "react_to_msgs", False)
|
||||
await ctx.send("Reactions Disabled")
|
||||
else:
|
||||
await ctx.send("Usage: !reactions (enable|disable)")
|
||||
|
||||
@commands.command(
|
||||
description="Feature",
|
||||
help="Suggest a feature. Usage: !feature (feature)",
|
||||
brief="Suggest a feature"
|
||||
)
|
||||
async def feature(ctx):
|
||||
try:
|
||||
feature = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
with open("features.txt",'a') as f:
|
||||
f.writelines('\n' + feature)
|
||||
await ctx.send("Added " + feature)
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
|
||||
with open("features.txt",'r') as f:
|
||||
features = f.read()
|
||||
await ctx.send(features)
|
||||
|
||||
@commands.command(
|
||||
description="Errors",
|
||||
help="Shows the last errors that were logged.",
|
||||
brief="Display Errors"
|
||||
)
|
||||
async def errors(ctx, amount="5"):
|
||||
output = ""
|
||||
amount = int(amount)
|
||||
try:
|
||||
with open("databases/error_log.txt", 'r') as f:
|
||||
for line in (f.readlines() [-amount:]):
|
||||
output += line
|
||||
await ctx.send(output)
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(feature)
|
||||
bot.add_command(reactions)
|
||||
bot.add_command(chat)
|
||||
bot.add_command(topic)
|
||||
bot.add_command(enable_commands)
|
||||
bot.add_command(personality)
|
||||
bot.add_command(ftp)
|
||||
bot.add_command(view_images)
|
||||
bot.add_command(errors)
|
||||
129
extensions/highscores.py
Normal file
129
extensions/highscores.py
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
#plugin to show message count as a graph
|
||||
import os
|
||||
import time
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
async def handle_error(error):
|
||||
print(error)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
log_line = current_time + ': ' + str(error) + '\n'
|
||||
with open("databases/error_log.txt", 'a') as f:
|
||||
f.write(log_line)
|
||||
return error
|
||||
|
||||
@commands.command(
|
||||
description="Highscores",
|
||||
help="Shows a bar graph of users in this channel and how many messages they have sent.",
|
||||
brief="Display chat highscores"
|
||||
)
|
||||
async def highscores(ctx, limit=0):
|
||||
filename = str(ctx.channel.id) + ".log"
|
||||
with open("channels/logs/" + filename, 'r', encoding="utf-8") as logfile:
|
||||
data = logfile.readlines()
|
||||
logfile.close()
|
||||
|
||||
def is_username(user):
|
||||
for character in user:
|
||||
if character.isupper():
|
||||
return False
|
||||
if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'):
|
||||
return False
|
||||
return True
|
||||
|
||||
user_message_counts = {}
|
||||
for line in data:
|
||||
try:
|
||||
user = line[0:line.find(':')]
|
||||
if is_username(user):
|
||||
if user not in user_message_counts and user != "" and len(user) <= 32:
|
||||
user_message_counts[user] = 1
|
||||
else:
|
||||
user_message_counts[user] += 1
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
|
||||
def remove_dict_keys_if_less_than_x(dictionary,x):
|
||||
for key in dictionary:
|
||||
if dictionary[key] <= x:
|
||||
dictionary.pop(key)
|
||||
return remove_dict_keys_if_less_than_x(dictionary,x)
|
||||
return dictionary
|
||||
|
||||
print(user_message_counts)
|
||||
remove_dict_keys_if_less_than_x(user_message_counts,limit)
|
||||
keys = list(user_message_counts.keys())
|
||||
values = list(user_message_counts.values())
|
||||
fig, ax = plt.subplots()
|
||||
bar_container = ax.barh(keys, values)
|
||||
ax.set_xlabel("Message Count")
|
||||
ax.set_ylabel("Username")
|
||||
ax.set_title("Messages Sent in " + ctx.channel.name)
|
||||
ax.bar_label(bar_container, label_type='center')
|
||||
plt.savefig(str(ctx.channel.id) + '_hiscores.png', dpi=1000, bbox_inches="tight")
|
||||
with open(str(ctx.channel.id) + '_hiscores.png', "rb") as fh:
|
||||
f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png')
|
||||
await ctx.send(file=f)
|
||||
|
||||
|
||||
@commands.command(
|
||||
description="Highscores Server",
|
||||
help="Shows a bar graph of users across all servers I am in and how many messages they have sent.",
|
||||
brief="Display chat highscores"
|
||||
)
|
||||
async def highscores_server(ctx, limit=0):
|
||||
|
||||
def remove_dict_keys_if_less_than_x(dictionary,x):
|
||||
for key in dictionary:
|
||||
if dictionary[key] <= x:
|
||||
dictionary.pop(key)
|
||||
return remove_dict_keys_if_less_than_x(dictionary,x)
|
||||
return dictionary
|
||||
|
||||
def is_username(user):
|
||||
for character in user:
|
||||
if character.isupper():
|
||||
return False
|
||||
if not (character.isalpha() or character.isdigit() or character == '.' or character == '_'):
|
||||
return False
|
||||
return True
|
||||
|
||||
user_message_counts = {}
|
||||
data = []
|
||||
for filename in os.listdir("channels/logs/"):
|
||||
with open("channels/logs/" + filename, 'r', encoding="utf-8") as logfile:
|
||||
data += logfile.readlines()
|
||||
logfile.close()
|
||||
user_message_counts = {}
|
||||
for line in data:
|
||||
try:
|
||||
user = line[0:line.find(':')]
|
||||
if is_username(user):
|
||||
if user not in user_message_counts and user != "" and len(user) <= 32:
|
||||
user_message_counts[user] = 1
|
||||
else:
|
||||
user_message_counts[user] += 1
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
|
||||
print(user_message_counts)
|
||||
print("printed")
|
||||
user_message_counts = remove_dict_keys_if_less_than_x(user_message_counts,limit)
|
||||
keys = list(user_message_counts.keys())
|
||||
values = list(user_message_counts.values())
|
||||
fig, ax = plt.subplots()
|
||||
bar_container = ax.barh(keys, values)
|
||||
ax.set_xlabel("Message Count")
|
||||
ax.set_ylabel("Username")
|
||||
ax.set_title("Messages Sent in all channels I can see")
|
||||
ax.bar_label(bar_container, label_type='center')
|
||||
plt.savefig(str(ctx.channel.id) + '_hiscores.png', dpi=1000, bbox_inches="tight")
|
||||
with open(str(ctx.channel.id) + '_hiscores.png', "rb") as fh:
|
||||
f = discord.File(fh, filename=str(ctx.channel.id) + '_hiscores.png')
|
||||
await ctx.send(file=f)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(highscores)
|
||||
bot.add_command(highscores_server)
|
||||
122
extensions/meme.py
Normal file
122
extensions/meme.py
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
#plugin for sparkytron3000
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import aiohttp
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
async def answer_question(topic, model="gpt-3.5-turbo"): # Only needed for draw command
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
||||
}
|
||||
|
||||
data = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": topic}]
|
||||
}
|
||||
|
||||
url = "https://api.openai.com/v1/chat/completions"
|
||||
|
||||
try:
|
||||
http_session = aiohttp.ClientSession()
|
||||
async with http_session.post(url, headers=headers, json=data) as resp:
|
||||
response_data = await resp.json()
|
||||
response = response_data['choices'][0]['message']['content']
|
||||
await http_session.close()
|
||||
return response
|
||||
|
||||
except Exception as error:
|
||||
return await handle_error(error)
|
||||
|
||||
async def handle_error(error):
|
||||
print(error)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
log_line = current_time + ': ' + str(error) + '\n'
|
||||
with open("databases/error_log.txt", 'a') as f:
|
||||
f.write(log_line)
|
||||
return error
|
||||
|
||||
@commands.command(
|
||||
description="Meme",
|
||||
help="Generates a meme based on input. Usage: !meme (topic)",
|
||||
brief="Generate a meme"
|
||||
)
|
||||
async def meme(ctx):
|
||||
async def generate_random_meme(topic):
|
||||
http_session = aiohttp.ClientSession()
|
||||
async with http_session.get('https://api.imgflip.com/get_memes') as resp:
|
||||
response_data = await resp.json()
|
||||
response = response_data['data']['memes']
|
||||
memepics = [{'name':image['name'],'url':image['url'],'id':image['id']} for image in response]
|
||||
|
||||
#Pick a meme format
|
||||
memenumber = random.randint(1,99)
|
||||
meme_name = response[memenumber-1]['name']
|
||||
panel_count = response[memenumber-1]['box_count']
|
||||
print("panel_count ",panel_count)
|
||||
panel_text = await answer_question("Create text for a meme. The meme is " + meme_name + ". It has " + str(panel_count) + " panels. Only create one meme. Do not use emojis or hashtags! Use the topic: " + topic + ". Use the output format (DO NOT USE EXTRA NEWLINES AND DO NOT DESCRIBE THE PICTURE IN YOUR OUTPUT): \n1: [panel 1 text]\n2: [panel 2 text]")
|
||||
|
||||
id = memenumber
|
||||
imgflip_username = os.getenv('imgflip_username')
|
||||
imgflip_password = os.getenv('imgflip_password')
|
||||
params = {
|
||||
'username':imgflip_username,
|
||||
'password':imgflip_password,
|
||||
'template_id':memepics[id-1]['id']
|
||||
}
|
||||
boxes = []
|
||||
text = panel_text.split('\n')
|
||||
for x in range(len(text)):
|
||||
if text[x].strip() != "":
|
||||
item = text[x][3:]
|
||||
if len(params)-3 < panel_count:
|
||||
dictionary = {"text":item, "color": "#ffffff", "outline_color": "#000000"}
|
||||
boxes.append(dictionary)
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
params[f"boxes[{i}][text]"] = box["text"]
|
||||
params[f"boxes[{i}][color]"] = box["color"]
|
||||
params[f"boxes[{i}][outline_color]"] = box["outline_color"]
|
||||
|
||||
URL = 'https://api.imgflip.com/caption_image'
|
||||
|
||||
try:
|
||||
#http_session = aiohttp.ClientSession()
|
||||
async with http_session.post(URL, params=params) as resp:
|
||||
response = await resp.json()
|
||||
print(f"Generated Meme = {response['success']}\nImage Link = {response['data']['url']}\nPage Link = {response['data']['page_url']}")
|
||||
image_link = response['data']['url']
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
try:
|
||||
#------------------------------------Saving Image Using Aiohttp---------------------------------#
|
||||
filename = memepics[id-1]['name']
|
||||
async with http_session.get(image_link) as response:
|
||||
folder = "tmp/meme/"
|
||||
filename = folder + topic + str(len(os.listdir(folder))) + ".jpg"
|
||||
|
||||
with open(filename, "wb") as file:
|
||||
while True:
|
||||
chunk = await response.content.read(1024) # Read the response in chunks
|
||||
if not chunk:
|
||||
break
|
||||
file.write(chunk)
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
print("Something's Wrong with the urllib So try again")
|
||||
await http_session.close()
|
||||
return image_link, filename
|
||||
|
||||
try:
|
||||
topic = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
await ctx.send(f'Generating {topic} meme')
|
||||
link, filepath = await generate_random_meme(topic)
|
||||
await ctx.send(link)
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
await ctx.send('Something went wrong try again. Usage: !meme (topic)')
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(meme)
|
||||
274
extensions/pokemon.py
Normal file
274
extensions/pokemon.py
Normal file
|
|
@ -0,0 +1,274 @@
|
|||
#plugin file for sparkytron 3000
|
||||
|
||||
from discord.ext import commands
|
||||
import discord
|
||||
import random
|
||||
import os
|
||||
import json
|
||||
import math
|
||||
import time
|
||||
import aiohttp
|
||||
|
||||
|
||||
|
||||
async def get_json(url):
|
||||
http_session = aiohttp.ClientSession()
|
||||
async with http_session.get(url) as resp:
|
||||
json_data = await resp.json()
|
||||
return json_data
|
||||
|
||||
@commands.command(
|
||||
description="Pokemon",
|
||||
help="Pokemon game",
|
||||
brief="Pokemon Game",
|
||||
aliases=['pkmn'],
|
||||
hidden=True
|
||||
)
|
||||
async def pokemon(ctx, arg1=None, arg2=None, arg3=None, arg4=None):
|
||||
async def starter_picker(id): #id = pokedex number
|
||||
url = "https://pokeapi.co/api/v2/pokemon-species/" + str(id)
|
||||
json_data = await get_json(url)
|
||||
if (json_data["evolves_from_species"] == None) and (not json_data['is_mythical']) and (not json_data['is_legendary']):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
async def shiny_roll():
|
||||
roll = random.randint(0,2047)
|
||||
return not roll
|
||||
|
||||
async def save_pokemon(discord_id, pokemon_dict):
|
||||
if not os.path.isdir("databases/pokemon/"):
|
||||
os.makedirs("databases/pokemon/")
|
||||
|
||||
path = "databases/pokemon/"+str(discord_id)+".json"
|
||||
pokemon_dict = json.dumps(pokemon_dict)
|
||||
with open(path, 'w') as f:
|
||||
f.writelines(pokemon_dict)
|
||||
return True
|
||||
|
||||
async def load_pokemon(discord_id):
|
||||
if not os.path.isdir("databases/pokemon/"):
|
||||
os.makedirs("databases/pokemon/")
|
||||
if os.path.isfile("databases/pokemon/"+str(discord_id)+".json"):
|
||||
with open("databases/pokemon/"+str(discord_id)+".json", 'r') as f:
|
||||
json_data = json.loads(f.readline())
|
||||
return json_data
|
||||
else:
|
||||
return False
|
||||
|
||||
async def generate_starter(discord_id):
|
||||
random.seed(discord_id)
|
||||
json_data = await get_json('https://pokeapi.co/api/v2/pokemon-species/')
|
||||
pokemon_count = json_data['count']
|
||||
base_pokemon = False
|
||||
while not base_pokemon:
|
||||
starter_id = random.randint(1,pokemon_count)
|
||||
base_pokemon = await starter_picker(starter_id)
|
||||
random.seed()
|
||||
return starter_id
|
||||
|
||||
async def get_pkmn_from_id(id):
|
||||
url = 'https://pokeapi.co/api/v2/pokemon/' + str(id)
|
||||
json_data = await get_json(url)
|
||||
return json_data
|
||||
|
||||
async def give_buddy_food(pkmn_data):
|
||||
try:
|
||||
last_food = pkmn_data['last_food']
|
||||
except:
|
||||
last_food = 0
|
||||
this_food = time.time()
|
||||
if (this_food - last_food) >= 1800:
|
||||
pkmn_data['last_food'] = this_food
|
||||
level = await calc_pkmn_buddy_level(pkmn_data)
|
||||
pkmn_data['buddy_xp'] += (4*level)
|
||||
return pkmn_data, True
|
||||
else:
|
||||
return pkmn_data, False
|
||||
|
||||
async def give_buddy_affection(pkmn_data):
|
||||
try:
|
||||
last_hug = pkmn_data['last_hug']
|
||||
except:
|
||||
last_hug = 0
|
||||
this_hug = time.time()
|
||||
if (this_hug - last_hug) >= 600:
|
||||
pkmn_data['last_hug'] = this_hug
|
||||
level = await calc_pkmn_buddy_level(pkmn_data)
|
||||
pkmn_data['buddy_xp'] += (3*level)
|
||||
return pkmn_data, True
|
||||
else:
|
||||
return pkmn_data, False
|
||||
|
||||
async def calc_pkmn_buddy_level(pkmn_json): #this uses the 'fast' xp rate
|
||||
buddy_xp = pkmn_json['buddy_xp']
|
||||
return min(math.floor(((5*buddy_xp)/4)**(1/3)),100)
|
||||
|
||||
async def make_pmkn_embed(pkmn_dict):
|
||||
if pkmn_dict['nickname']:
|
||||
title = pkmn_dict['nickname'] + ' (' + pkmn_dict['name'].capitalize() + ')'
|
||||
else:
|
||||
title = pkmn_dict['name'].capitalize()
|
||||
embed=discord.Embed(title=title)
|
||||
if pkmn_dict['shiny']:
|
||||
embed.set_image(url=pkmn_dict['sprites']['front_shiny'])
|
||||
else:
|
||||
embed.set_image(url=pkmn_dict['sprites']['front_default'])
|
||||
nature = pkmn_dict['nature']
|
||||
buddy_level = await calc_pkmn_buddy_level(pkmn_dict)
|
||||
buddy_xp = pkmn_dict['buddy_xp']
|
||||
types = []
|
||||
for key in pkmn_dict['types']:
|
||||
types.append(key['type']['name'].capitalize())
|
||||
type_str = ', '.join(types)
|
||||
embed.add_field(name="Nature", value=nature.capitalize(), inline=False)
|
||||
embed.add_field(name="Buddy Level", value=buddy_level , inline=True)
|
||||
embed.add_field(name="Buddy XP", value=buddy_xp, inline=True)
|
||||
embed.add_field(name="Types", value=type_str, inline=False)
|
||||
return embed
|
||||
|
||||
if arg1=='start':
|
||||
if not os.path.isdir("databases/pokemon/"):
|
||||
os.makedirs("databases/pokemon/")
|
||||
if not os.path.isfile("databases/pokemon/"+str(ctx.author.id)+'.json'):
|
||||
uniq_id = time.time()
|
||||
starter_id = await generate_starter(ctx.author.id)
|
||||
json_data = await get_pkmn_from_id(starter_id)
|
||||
is_shiny = await shiny_roll()
|
||||
nature = random.randint(0,19)
|
||||
nature_data = await get_json('https://pokeapi.co/api/v2/nature/')
|
||||
nature = nature_data['results'][nature]['name']
|
||||
json_data['shiny'] = is_shiny
|
||||
json_data['nickname'] = None
|
||||
json_data['unique_id'] = uniq_id
|
||||
json_data['nature'] = nature
|
||||
json_data['buddy_level'] = 1
|
||||
json_data['buddy_xp'] = 0
|
||||
json_data['last_food'] = 0
|
||||
json_data['last_hug'] = 0
|
||||
await save_pokemon(ctx.author.id, json_data)
|
||||
embed = await make_pmkn_embed(json_data)
|
||||
await ctx.channel.send(embed=embed)
|
||||
return
|
||||
else:
|
||||
await ctx.channel.send("You already have a pokemon!")
|
||||
return
|
||||
|
||||
elif arg1 == 'nick' or arg1 == 'nickname':
|
||||
nickname = arg2
|
||||
json_data = await load_pokemon(ctx.author.id)
|
||||
json_data['nickname'] = nickname
|
||||
await save_pokemon(ctx.author.id, json_data)
|
||||
message = "You gave " + nickname + ' a new name!'
|
||||
await ctx.channel.send(message)
|
||||
return
|
||||
|
||||
elif arg1 == 'feed':
|
||||
json_data = await load_pokemon(ctx.author.id)
|
||||
json_data, fed = await give_buddy_food(json_data)
|
||||
if fed:
|
||||
await save_pokemon(ctx.author.id, json_data)
|
||||
if json_data['nickname']:
|
||||
message = "You " + arg1 + ' ' + json_data['nickname']
|
||||
else:
|
||||
message = "You " + arg1 + ' ' + json_data['name']
|
||||
await ctx.channel.send(message)
|
||||
return
|
||||
else:
|
||||
if json_data['nickname']:
|
||||
message = "Your " + json_data['nickname'] + " isn't hungry!"
|
||||
else:
|
||||
message = "Your " + json_data['name'] + " isn't hungry!"
|
||||
await ctx.channel.send(message)
|
||||
return
|
||||
|
||||
elif arg1 == 'hug':
|
||||
json_data = await load_pokemon(ctx.author.id)
|
||||
json_data, hugged = await give_buddy_affection(json_data)
|
||||
if hugged:
|
||||
await save_pokemon(ctx.author.id, json_data)
|
||||
if json_data['nickname']:
|
||||
message = "You " + arg1 + ' ' + json_data['nickname']
|
||||
else:
|
||||
message = "You " + arg1 + ' ' + json_data['name']
|
||||
await ctx.channel.send(message)
|
||||
return
|
||||
else:
|
||||
if json_data['nickname']:
|
||||
message = "You hugged " + json_data['nickname'] + " but " + json_data['nickname'] + " has been hugged recently."
|
||||
else:
|
||||
message = "You hugged " + json_data['name'] + " but " + json_data['name'] + " has been hugged recently."
|
||||
await ctx.channel.send(message)
|
||||
return
|
||||
|
||||
#Default !pokemon behavior (Load and show pokemon embed)
|
||||
discord_id = ctx.author.id
|
||||
buddy_json = await load_pokemon(discord_id)
|
||||
if not buddy_json:
|
||||
await ctx.channel.send("You don't have a buddy yet. Type ```!pokemon start``` to start your Pokemon journey!")
|
||||
else:
|
||||
embed = await make_pmkn_embed(buddy_json)
|
||||
message = await ctx.channel.send(embed=embed)
|
||||
return
|
||||
|
||||
@commands.command(
|
||||
description="Pokedex",
|
||||
help="Get information on pokemon",
|
||||
brief="Pokedex",
|
||||
aliases=['pdex'],
|
||||
hidden=False
|
||||
)
|
||||
async def pokedex(ctx):
|
||||
pokemon = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
try:
|
||||
shiny = False
|
||||
if 'shiny ' in pokemon:
|
||||
shiny = True
|
||||
pokemon = pokemon.replace('shiny ', '')
|
||||
url = "https://pokeapi.co/api/v2/pokemon/" + pokemon
|
||||
dex_url = "https://pokeapi.co/api/v2/pokemon-species/" + pokemon
|
||||
#try:
|
||||
data = await get_json(url)
|
||||
name = data['name']
|
||||
height_str = str(int(data['height'])/10) + 'm'
|
||||
weight_str = str(int(data['weight'])/10) + 'kg'
|
||||
type1 = data['types'][0]['type']['name']
|
||||
try:
|
||||
type2 = data['types'][1]['type']['name']
|
||||
type_str = type1.capitalize() + ', ' + type2.capitalize()
|
||||
except:
|
||||
type2 = "None"
|
||||
type_str = type1.capitalize()
|
||||
sprite = data["sprites"]["front_default"]
|
||||
if shiny:
|
||||
sprite = data["sprites"]["front_shiny"]
|
||||
dex_data = await get_json(dex_url)
|
||||
generation = dex_data['generation']['name'].upper().replace("GENERATION","Generation")
|
||||
for entry in dex_data['flavor_text_entries']:
|
||||
if entry['language']['name'] == 'en':
|
||||
dex_desc = entry['flavor_text'].replace("\u000c", '\n')
|
||||
dex_desc_game = entry['version']['name'].capitalize()
|
||||
break
|
||||
for entry in dex_data['genera']:
|
||||
if entry['language']['name'] == 'en':
|
||||
genus = entry['genus']
|
||||
break
|
||||
footer = generation + ' | Pokédex entry from Pokémon ' + dex_desc_game
|
||||
dex_num = dex_data['pokedex_numbers'][0]['entry_number']
|
||||
embed=discord.Embed(title=name.capitalize())
|
||||
embed.set_image(url=sprite)
|
||||
embed.add_field(name="Number", value=dex_num, inline=False)
|
||||
embed.add_field(name=genus, value=dex_desc, inline=False)
|
||||
embed.add_field(name="Weight", value=weight_str , inline=True)
|
||||
embed.add_field(name="Height", value=height_str, inline=True)
|
||||
embed.add_field(name="Types", value=type_str, inline=True)
|
||||
embed.set_footer(text=footer)
|
||||
await ctx.send(embed=embed)
|
||||
except:
|
||||
message = "No data for " + str(pokemon)
|
||||
await ctx.channel.send(message)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(pokedex)
|
||||
bot.add_command(pokemon)
|
||||
52
extensions/roll_poll.py
Normal file
52
extensions/roll_poll.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import random
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
@commands.command(
|
||||
description="Poll",
|
||||
help='Create a poll with up to 9 options. Usage: !poll "Put question here" "option 1" "option 2"',
|
||||
brief="Enable or disable bot reactions"
|
||||
)
|
||||
async def poll(ctx, question, *options: str):
|
||||
if len(options) > 9:
|
||||
await ctx.send("Error: You cannot have more than 9 options")
|
||||
return
|
||||
|
||||
embed = discord.Embed(title=question, colour=discord.Colour(0x283593))
|
||||
for i, option in enumerate(options):
|
||||
embed.add_field(name=f"Option {i+1}", value=option, inline=False)
|
||||
|
||||
message = await ctx.send(embed=embed)
|
||||
numbers = {0: "\u0030\ufe0f\u20e3", 1: "\u0031\ufe0f\u20e3", 2: "\u0032\ufe0f\u20e3", 3: "\u0033\ufe0f\u20e3", 4: "\u0034\ufe0f\u20e3", 5: "\u0035\ufe0f\u20e3", 6: "\u0036\ufe0f\u20e3", 7: "\u0037\ufe0f\u20e3", 8: "\u0038\ufe0f\u20e3", 9: "\u0039\ufe0f\u20e3"}
|
||||
for i in range(len(options)):
|
||||
await message.add_reaction(numbers.get(i+1))
|
||||
|
||||
@commands.command(
|
||||
description="Roll",
|
||||
help="Rolls dice mostly for Dungeons and Dragons type games. Usage: !roll 3d6+2",
|
||||
brief="Simulate rolling dice"
|
||||
)
|
||||
async def roll(ctx, dice_string):
|
||||
dice_parts = dice_string.split('d')
|
||||
num_dice = int(dice_parts[0])
|
||||
if '+' in dice_parts[1]:
|
||||
die_parts = dice_parts[1].split('+')
|
||||
die_size = int(die_parts[0])
|
||||
modifier = int(die_parts[1])
|
||||
elif '-' in dice_parts[1]:
|
||||
die_parts = dice_parts[1].split('-')
|
||||
die_size = int(die_parts[0])
|
||||
modifier = -int(die_parts[1])
|
||||
else:
|
||||
die_size = int(dice_parts[1])
|
||||
modifier = 0
|
||||
|
||||
rolls = [random.randint(1, die_size) for i in range(num_dice)]
|
||||
dice_str = ' + '.join([str(roll) for roll in rolls])
|
||||
total = sum(rolls) + modifier
|
||||
|
||||
await ctx.send(f'{dice_str} + {modifier} = {total}' if modifier != 0 else f'{dice_str} = {total}')
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(roll)
|
||||
bot.add_command(poll)
|
||||
31
extensions/runescape.py
Normal file
31
extensions/runescape.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
#sparkytron3000 plugin
|
||||
from discord.ext import commands
|
||||
|
||||
@commands.command(
|
||||
description="RSGP",
|
||||
help="Uses probably outdated information to calculate how much rsgp is worth in usd. Usage: !rsgp (amount)",
|
||||
brief="Runescape gold to usd"
|
||||
)
|
||||
async def rsgp(ctx, amount):
|
||||
output = ""
|
||||
cost_per_bil = 25.50 #1b rsgp to usd
|
||||
cost_per_bil_os = 210
|
||||
gold_per_bond = 70000000
|
||||
gold_per_bond_os = 7000000
|
||||
cost_per_bond = 8 #dollars usd
|
||||
bondcost = (int(amount)/gold_per_bond) * cost_per_bond
|
||||
rwtcost = (int(amount) * cost_per_bil / 1000000000)
|
||||
dollar_gp = (int(amount)*1000000000)/cost_per_bil
|
||||
osbondcost = (int(amount)/gold_per_bond_os) * cost_per_bond
|
||||
osrwtcost = (int(amount) * cost_per_bil_os / 1000000000)
|
||||
osdollar_gp = (int(amount)*1000000000)/cost_per_bil_os
|
||||
output += str(amount) + ' rs3 gp would cost: $' + str(round(rwtcost,2)) + " (RWT)\n"
|
||||
output += str(amount) + ' osrs gp would cost: $' + str(round(osrwtcost,2)) + " (RWT)\n"
|
||||
output += str(amount) + ' rs3 gp would cost: $' + str(round(bondcost,2)) + " (Bonds)\n"
|
||||
output += str(amount) + ' osrs gp would cost: $' + str(round(osbondcost,2)) + " (Bonds)\n"
|
||||
output += str(amount) + ' dollars spent on rs3 gp would be: ' + str(round(dollar_gp,2)) + " (RS3 GP)\n"
|
||||
output += str(amount) + ' dollars spent on osrs gp would be: ' + str(round(osdollar_gp,2)) + " (OSRS GP)\n"
|
||||
await ctx.send(output)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(rsgp)
|
||||
324
extensions/sparkytron_openai.py
Normal file
324
extensions/sparkytron_openai.py
Normal file
|
|
@ -0,0 +1,324 @@
|
|||
#sparkytron 3000 plugin
|
||||
import os
|
||||
import time
|
||||
from PIL import Image, PngImagePlugin
|
||||
import io
|
||||
import base64
|
||||
|
||||
import aiohttp
|
||||
import asyncssh
|
||||
from discord.ext import commands, tasks
|
||||
|
||||
async def upload_sftp(local_filename, server_folder, server_filename):
|
||||
remotepath = server_folder + server_filename
|
||||
async with asyncssh.connect(os.getenv('ftp_server'), username=os.getenv('ftp_username'), password=os.getenv('ftp_password')) as conn:
|
||||
async with conn.start_sftp_client() as sftp:
|
||||
await sftp.put(local_filename, remotepath=remotepath)
|
||||
|
||||
async def handle_error(error):
|
||||
print(error)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
log_line = current_time + ': ' + str(error) + '\n'
|
||||
with open("databases/error_log.txt", 'a') as f:
|
||||
f.write(log_line)
|
||||
return error
|
||||
|
||||
async def answer_question(topic, model="gpt-3.5-turbo"):
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
||||
}
|
||||
|
||||
data = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": topic}]
|
||||
}
|
||||
|
||||
url = "https://api.openai.com/v1/chat/completions"
|
||||
|
||||
try:
|
||||
http_session = aiohttp.ClientSession()
|
||||
async with http_session.post(url, headers=headers, json=data) as resp:
|
||||
response_data = await resp.json()
|
||||
response = response_data['choices'][0]['message']['content']
|
||||
await http_session.close()
|
||||
return response
|
||||
|
||||
except Exception as error:
|
||||
return await handle_error(error)
|
||||
|
||||
@commands.command(
|
||||
description="Blog",
|
||||
help="Adds your topic to the list of possible future blog topics. Usage: !suggest_blog (topic)",
|
||||
brief="Suggest a blog topic"
|
||||
)
|
||||
async def blog(ctx, *args):
|
||||
message = ' '.join(args)
|
||||
if '\n' in message:
|
||||
await ctx.send("Send only one topic at a time.")
|
||||
return
|
||||
else:
|
||||
blogpost_file = "databases/blog_topics.txt"
|
||||
with open(blogpost_file, 'a') as f:
|
||||
f.writelines(message+'\n')
|
||||
await ctx.send("Saved suggestion!")
|
||||
|
||||
@commands.command()
|
||||
async def generate_blog(ctx):
|
||||
start_time = time.time()
|
||||
topic = ''
|
||||
filename = "phixxy.com/ai-blog/index.html"
|
||||
with open(filename, 'r', encoding="utf-8") as f:
|
||||
html_data = f.read()
|
||||
current_time = time.time()
|
||||
current_struct_time = time.localtime(current_time)
|
||||
date = time.strftime("%B %d, %Y", current_struct_time)
|
||||
if date in html_data:
|
||||
print("I already wrote a blog post today!")
|
||||
return
|
||||
blogpost_file = "databases/blog_topics.txt"
|
||||
#blog_subscribers = ["276197608735637505","242018983241318410"]
|
||||
if os.path.isfile(blogpost_file):
|
||||
with open(blogpost_file, 'r') as f:
|
||||
blogpost_topics = f.read()
|
||||
f.seek(0)
|
||||
topic = f.readline()
|
||||
blogpost_topics = blogpost_topics.replace(topic, '')
|
||||
with open(blogpost_file, 'w') as f:
|
||||
f.write(blogpost_topics)
|
||||
if topic != '':
|
||||
print("Writing blogpost")
|
||||
else:
|
||||
print("No topic given for blogpost, generating one.")
|
||||
topic = await answer_question("Give me one topic for an absurd blogpost.")
|
||||
|
||||
|
||||
post_div = '''<!--replace this with a post-->
|
||||
<div class="post">
|
||||
<h2 class="post-title"><!--POST_TITLE--></h2>
|
||||
<p class="post-date"><!--POST_DATE--></p>
|
||||
<div class="post-content">
|
||||
<!--POST_CONTENT-->
|
||||
</div>
|
||||
</div>'''
|
||||
title_prompt = 'generate an absurd essay title about ' + topic
|
||||
title = await answer_question(title_prompt, model="gpt-3.5-turbo")
|
||||
prompt = 'Write a satirical essay with a serious tone titled: "' + title + '". Do not label parts of the essay.'
|
||||
content = await answer_question(prompt, model="gpt-4")
|
||||
if title in content[:len(title)]:
|
||||
content = content.replace(title, '', 1)
|
||||
content = f"<p>{content}</p>"
|
||||
content = content.replace('\n\n', "</p><p>")
|
||||
content = content.replace("<p></p>", '')
|
||||
|
||||
post_div = post_div.replace("<!--POST_TITLE-->", title)
|
||||
post_div = post_div.replace("<!--POST_DATE-->", date)
|
||||
post_div = post_div.replace("<!--POST_CONTENT-->", content)
|
||||
|
||||
html_data = html_data.replace("<!--replace this with a post-->", post_div)
|
||||
with open(filename, 'w', encoding="utf-8") as f:
|
||||
f.write(html_data)
|
||||
await upload_sftp(filename, (os.getenv('ftp_public_html') + 'ai-blog/'), "index.html")
|
||||
run_time = time.time() - start_time
|
||||
print("It took " + str(run_time) + " seconds to generate the blog post!")
|
||||
output = "Blog Updated! (" + str(run_time) + " seconds) https://ai.phixxy.com/ai-blog"
|
||||
#output += '\nNotifying subscribers: '
|
||||
#for subscriber in blog_subscribers:
|
||||
# output += '<@' + subscriber + '> '
|
||||
print(output)
|
||||
|
||||
@commands.command(
|
||||
description="Question",
|
||||
help="Ask a raw chatgpt question. Usage: !question (question)",
|
||||
brief="Get an answer"
|
||||
)
|
||||
async def question(ctx):
|
||||
question = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
answer = await answer_question(question)
|
||||
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
|
||||
for chunk in chunks:
|
||||
await ctx.send(chunk)
|
||||
|
||||
@commands.command(
|
||||
description="Question GPT4",
|
||||
help="Ask GPT4 a question. Usage: !question_gpt4 (question)",
|
||||
brief="Get an answer"
|
||||
)
|
||||
async def question_gpt4(ctx):
|
||||
question = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
answer = await answer_question(question, "gpt-4-vision-preview")
|
||||
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
|
||||
for chunk in chunks:
|
||||
await ctx.send(chunk)
|
||||
|
||||
@commands.command(
|
||||
description="Image GPT4",
|
||||
help="Ask GPT4 a question about an image. Usage: !question_gpt4 (link) (question)",
|
||||
brief="Get an answer"
|
||||
)
|
||||
async def looker(ctx):
|
||||
image_link = ctx.message.content.split(" ", maxsplit=2)[1]
|
||||
question = ctx.message.content.split(" ", maxsplit=2)[2]
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
||||
}
|
||||
|
||||
data = {
|
||||
"model": "gpt-4-vision-preview",
|
||||
"messages": [{"role": "user", "content": [{"type": "text", "text": question},{"type": "image_url","image_url": {"url": image_link}}]}],
|
||||
"max_tokens": 500
|
||||
}
|
||||
|
||||
url = "https://api.openai.com/v1/chat/completions"
|
||||
|
||||
try:
|
||||
http_session = aiohttp.ClientSession()
|
||||
async with http_session.post(url, headers=headers, json=data) as resp:
|
||||
response_data = await resp.json()
|
||||
print(response_data)
|
||||
answer = response_data['choices'][0]['message']['content']
|
||||
await http_session.close()
|
||||
|
||||
|
||||
except Exception as error:
|
||||
return await handle_error(error)
|
||||
|
||||
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
|
||||
for chunk in chunks:
|
||||
await ctx.send(chunk)
|
||||
|
||||
@commands.command(
|
||||
description="Website",
|
||||
help="Generates a website using gpt 3.5. Usage: !website (topic)",
|
||||
brief="Generate a website"
|
||||
)
|
||||
async def website(ctx):
|
||||
async def delete_local_pngs(local_folder):
|
||||
for filename in os.listdir(local_folder):
|
||||
if ".png" in filename:
|
||||
os.remove(local_folder + filename)
|
||||
|
||||
async def delete_ftp_pngs(server_folder):
|
||||
async with asyncssh.connect(os.getenv('ftp_server'), username=os.getenv('ftp_username'), password=os.getenv('ftp_password')) as conn:
|
||||
async with conn.start_sftp_client() as sftp:
|
||||
for filename in (await sftp.listdir(server_folder)):
|
||||
if '.png' in filename:
|
||||
try:
|
||||
print("Deleting", filename)
|
||||
await sftp.remove(server_folder+filename)
|
||||
except:
|
||||
print("Couldn't delete", filename)
|
||||
|
||||
async def extract_image_tags(code):
|
||||
count = code.count("<img")
|
||||
tags = []
|
||||
for x in range(0,count):
|
||||
index1 = code.find("<img")
|
||||
index2 = code[index1:].find(">") + index1 + 1
|
||||
img_tag = code[index1:index2]
|
||||
tags.append(img_tag)
|
||||
code = code[index2:]
|
||||
return tags
|
||||
|
||||
async def extract_image_alt_text(tags):
|
||||
alt_texts = []
|
||||
for tag in tags:
|
||||
index1 = tag.find("alt") + 5
|
||||
index2 = tag[index1:].find("\"") + index1
|
||||
alt_text = tag[index1:index2]
|
||||
alt_texts.append(alt_text)
|
||||
return alt_texts
|
||||
|
||||
async def generate_images(local_folder, image_list):
|
||||
url = os.getenv('stablediffusion_url')
|
||||
if url == "disabled":
|
||||
return
|
||||
file_list = []
|
||||
for image in image_list:
|
||||
filename = image.replace(" ", "").lower() + ".png"
|
||||
payload = {"prompt": image, "steps": 25}
|
||||
http_session = aiohttp.ClientSession()
|
||||
response = await http_session.post(url=f'{url}/sdapi/v1/txt2img', json=payload)
|
||||
r = await response.json()
|
||||
for i in r['images']:
|
||||
image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))
|
||||
png_payload = {"image": "data:image/png;base64," + i}
|
||||
response2 = await http_session.post(url=f'{url}/sdapi/v1/png-info', json=png_payload)
|
||||
pnginfo = PngImagePlugin.PngInfo()
|
||||
json_response = await response2.json()
|
||||
pnginfo.add_text("parameters", json_response.get("info"))
|
||||
image.save(local_folder + filename, pnginfo=pnginfo)
|
||||
file_list.append(filename)
|
||||
await http_session.close()
|
||||
return file_list
|
||||
|
||||
async def add_image_filenames(code, file_list):
|
||||
for filename in file_list:
|
||||
code = code.replace("src=\"\"", "src=\""+ filename + "\"", 1)
|
||||
return code
|
||||
|
||||
async def upload_html_and_imgs(local_folder, server_folder):
|
||||
|
||||
for filename in os.listdir(local_folder):
|
||||
if ".png" in filename:
|
||||
await upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
|
||||
#explicitly upload html files last!
|
||||
for filename in os.listdir(local_folder):
|
||||
if ".html" in filename:
|
||||
await upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
|
||||
|
||||
|
||||
server_folder = os.getenv('ftp_public_html') + 'ai-webpage/'
|
||||
local_folder = "tmp/webpage/"
|
||||
working_file = local_folder + "index.html"
|
||||
if not os.path.exists(local_folder):
|
||||
os.mkdir(local_folder)
|
||||
|
||||
try:
|
||||
await ctx.send("Please wait, this will take a long time! You will be able to view the website here: https://ai.phixxy.com/ai-webpage/")
|
||||
with open(working_file, "w") as f:
|
||||
f.write("<!DOCTYPE html><html><head><script>setTimeout(function(){location.reload();}, 10000);</script><title>Generating Website</title><style>body {font-size: 24px;text-align: center;margin-top: 100px;}</style></head><body><p>This webpage is currently being generated. The page will refresh once it is complete. Please be patient.</p></body></html>")
|
||||
await upload_sftp(working_file, server_folder, "index.html")
|
||||
topic = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
prompt = "Generate a webpage using html and inline css. The webpage topic should be " + topic + ". Feel free to add image tags with alt text. Leave the image source blank. The images will be added later."
|
||||
code = await answer_question(prompt)
|
||||
|
||||
|
||||
await delete_local_pngs(local_folder)
|
||||
await delete_ftp_pngs(server_folder)
|
||||
|
||||
tags = await extract_image_tags(code)
|
||||
alt_texts = await extract_image_alt_text(tags)
|
||||
file_list = await generate_images(local_folder, alt_texts)
|
||||
code = await add_image_filenames(code, file_list)
|
||||
|
||||
with open(working_file, 'w') as f:
|
||||
f.write(code)
|
||||
f.close()
|
||||
|
||||
await upload_html_and_imgs(local_folder, server_folder)
|
||||
|
||||
await ctx.send("Finished https://ai.phixxy.com/ai-webpage/")
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
await ctx.send("Failed, Try again.")
|
||||
|
||||
@tasks.loop(seconds=1)
|
||||
async def ai_task_loop():
|
||||
current_time = time.localtime()
|
||||
if current_time.tm_hour == 17 and current_time.tm_min == 0 and current_time.tm_sec == 0:
|
||||
try:
|
||||
await generate_blog()
|
||||
except Exception as error:
|
||||
await handle_error(error)
|
||||
|
||||
async def setup(bot):
|
||||
bot.add_command(question)
|
||||
bot.add_command(question_gpt4)
|
||||
bot.add_command(generate_blog)
|
||||
bot.add_command(blog)
|
||||
bot.add_command(website)
|
||||
bot.add_command(looker)
|
||||
#ai_task_loop.start() #I don't know if this will work or not
|
||||
410
extensions/stable_diffusion.py
Normal file
410
extensions/stable_diffusion.py
Normal file
|
|
@ -0,0 +1,410 @@
|
|||
# Extension for sparkytron 3000
|
||||
# This extension enables the ability to generate AI artwork using the AUTOMATIC1111 API
|
||||
import io
|
||||
import base64
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
from PIL import Image, PngImagePlugin
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
|
||||
class StableDiffusion(commands.Cog):
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.stable_diffusion_url = os.getenv("stablediffusion_url") # Change this to stable_diffusion_url
|
||||
self.working_dir = "tmp/stable_diffusion/"
|
||||
self.db_dir = "db/stable_diffusion/"
|
||||
self.folder_setup()
|
||||
|
||||
def folder_setup(self):
|
||||
try:
|
||||
if not os.path.exists(self.working_dir):
|
||||
os.mkdir(self.working_dir)
|
||||
os.mkdir(f"{self.working_dir}sfw")
|
||||
os.mkdir(f"{self.working_dir}nsfw")
|
||||
if not os.path.exists(self.db_dir):
|
||||
os.mkdir(self.db_dir)
|
||||
except:
|
||||
print("StableDiffusion failed to make directories")
|
||||
|
||||
async def answer_question(self, topic, model="gpt-3.5-turbo"): # Only needed for draw command
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
||||
}
|
||||
data = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": topic}]
|
||||
}
|
||||
url = "https://api.openai.com/v1/chat/completions"
|
||||
try:
|
||||
async with self.bot.http_session.post(url, headers=headers, json=data) as resp:
|
||||
response_data = await resp.json()
|
||||
response = response_data['choices'][0]['message']['content']
|
||||
return response
|
||||
except Exception as error:
|
||||
return await self.handle_error(error)
|
||||
|
||||
async def handle_error(self, error): # This needs to be deleted and replaced with logging
|
||||
print(error)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
log_line = current_time + ': ' + str(error) + '\n'
|
||||
file_name = self.db_dir + "error_log.log"
|
||||
with open(file_name, 'a') as f:
|
||||
f.write(log_line)
|
||||
return error
|
||||
|
||||
def get_kv_from_ctx(self, ctx):
|
||||
try:
|
||||
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
kv_strings = list(filter(lambda x: '=' in x,prompt.split(' ')))
|
||||
key_value_pairs = dict(map(lambda a: a.replace(',','').split('='),kv_strings))
|
||||
return key_value_pairs
|
||||
except:
|
||||
return None
|
||||
|
||||
def get_prompt_from_ctx(self, ctx):
|
||||
try:
|
||||
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
prompt = ' '.join(list(filter(lambda x: '=' not in x,prompt.split(' '))))
|
||||
return prompt
|
||||
except:
|
||||
return None
|
||||
|
||||
async def my_open_img_file(self, path):
|
||||
img = Image.open(path)
|
||||
encoded = ""
|
||||
with io.BytesIO() as output:
|
||||
img.save(output, format="PNG")
|
||||
contents = output.getvalue()
|
||||
encoded = str(base64.b64encode(contents), encoding='utf-8')
|
||||
img.close()
|
||||
return encoded
|
||||
|
||||
async def look_at(self, ctx, look=False):
|
||||
metadata = ""
|
||||
if look:
|
||||
url = self.stable_diffusion_url
|
||||
if url == "disabled":
|
||||
return
|
||||
for attachment in ctx.attachments:
|
||||
if attachment.url.endswith(('.jpg', '.png')):
|
||||
print("image seen")
|
||||
async with self.bot.http_session.get(attachment.url) as response:
|
||||
imageName = self.working_dir + str(time.time_ns()) + '.png'
|
||||
|
||||
with open(imageName, 'wb') as out_file:
|
||||
print('Saving image: ' + imageName)
|
||||
while True:
|
||||
chunk = await response.content.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
out_file.write(chunk)
|
||||
|
||||
img_link = self.my_open_img_file(imageName)
|
||||
|
||||
try:
|
||||
payload = {"image": img_link}
|
||||
async with self.bot.http_session.post(f'{url}/sdapi/v1/interrogate', json=payload) as response:
|
||||
data = await response.json()
|
||||
description = data.get("caption")
|
||||
description = description.split(',')[0]
|
||||
metadata += f"<image:{description}>\n"
|
||||
except self.bot.aiohttp.ClientError as error:
|
||||
await self.handle_error(error)
|
||||
return "ERROR: CLIP may not be running. Could not look at image."
|
||||
return metadata
|
||||
|
||||
async def generate_prompt(self):
|
||||
choice1 = "Give me 11 keywords I can use to generate art using AI. They should all be related to one piece of art. Please only respond with the keywords and no other text. Be sure to use keywords that really describe what the art portrays. Keywords should be comma separated with no other text!"
|
||||
choice2 = "Describe a creative scene, use only one sentence"
|
||||
choice3 = "Give me comma seperated keywords describing an imaginary piece of art. Only return the keywords and no other text."
|
||||
choice4 = "Describe a unique character and an environment in one sentence"
|
||||
choice5 = "Describe a nonhuman character and an environment in one sentence"
|
||||
prompt = random.choice([choice1,choice2,choice3,choice4,choice5])
|
||||
prompt = await self.answer_question(prompt)
|
||||
if random.randint(0,9):
|
||||
prompt = prompt.replace("abstract, ", "")
|
||||
prompt = prompt.replace("AI, ", "")
|
||||
if "." in prompt:
|
||||
prompt = prompt.replace(".",",")
|
||||
prompt = prompt + " masterpiece, studio quality"
|
||||
else:
|
||||
prompt = prompt + ", masterpiece, studio quality"
|
||||
return prompt
|
||||
|
||||
|
||||
@commands.command(
|
||||
description="Change Model",
|
||||
help="Choose from a list of stable diffusion models.",
|
||||
brief="Change stable diffusion model"
|
||||
)
|
||||
async def change_model(self, ctx, model_choice='0'): # Needs to be a configurable list of models
|
||||
model_choices = {
|
||||
'1': ("deliberate_v2.safetensors [9aba26abdf]", "DeliberateV2"),
|
||||
'2': ("flat2DAnimerge_v30.safetensors [5dd56bfa12]", "Flat2D"),
|
||||
'3': ("Anything-V3.0.ckpt [8712e20a5d]", "AnythingV3"),
|
||||
'4': ("aZovyaPhotoreal_v2.safetensors [dde3b17c05]", "PhotorealV2"),
|
||||
'5': ("Pixel_Art_V1_PublicPrompts.ckpt [0f02127697]", "Pixel Art"),
|
||||
'6': ("mistoonAnime_v20.safetensors [c35e1054c0]", "Mistoon AnimeV2")
|
||||
}
|
||||
url = self.stable_diffusion_url
|
||||
if url == "disabled":
|
||||
await ctx.send("This command is currently disabled")
|
||||
else:
|
||||
async with self.bot.http_session.get(url=f'{url}/sdapi/v1/options') as response:
|
||||
config_json = await response.json()
|
||||
|
||||
current_model = config_json["sd_model_checkpoint"]
|
||||
output = 'Current Model: ' + current_model + '\n'
|
||||
|
||||
if model_choice in model_choices:
|
||||
model_id, model_name = model_choices[model_choice]
|
||||
if current_model != model_id:
|
||||
payload = {"sd_model_checkpoint": model_id}
|
||||
async with self.bot.http_session.post(url=f'{url}/sdapi/v1/options', json=payload) as response:
|
||||
output = "Changed model to: " + model_name
|
||||
await ctx.send(output)
|
||||
return
|
||||
else:
|
||||
await ctx.send(f"Already set to use {model_name}")
|
||||
return
|
||||
else:
|
||||
output = '\n'.join([f"{choice}: {name}" for choice, name in model_choices.items()])
|
||||
await ctx.send(output)
|
||||
|
||||
@commands.command(
|
||||
description="Lora",
|
||||
help="List the stable diffusion loras.",
|
||||
brief="List the stable diffusion loras"
|
||||
)
|
||||
async def lora(self, ctx):
|
||||
lora_choices = {
|
||||
'0': ("Lora Name", "Trigger Words"),
|
||||
'1': ("<lora:rebecca:1>", "rebecca (cyberpunk)"),
|
||||
'2': ("<lora:lucy:1>", "lucy (cyberpunk)"),
|
||||
'3': ("<lora:dirty:1>", "dirty"),
|
||||
'4': ("<lora:starcraft:1>", "c0nst3llation")
|
||||
}
|
||||
output = ""
|
||||
lora_options = '\n'.join([f"{choice}: {name}" for choice, name in lora_choices.items()])
|
||||
output += lora_options
|
||||
await ctx.send(output)
|
||||
|
||||
@commands.command(
|
||||
description="Imagine",
|
||||
help="Generate an image using stable diffusion. You can add keyword arguments to your prompt and they will be treated as stable diffusion options. Usage !imagine (topic)",
|
||||
brief="Generate an image"
|
||||
)
|
||||
async def imagine(self, ctx):
|
||||
url = self.stable_diffusion_url
|
||||
if url == "disabled":
|
||||
await ctx.send("Command is currently disabled.")
|
||||
return
|
||||
else:
|
||||
url=f"{url}/sdapi/v1/txt2img"
|
||||
prompt = self.get_prompt_from_ctx(ctx)
|
||||
key_value_pairs = self.get_kv_from_ctx(ctx)
|
||||
if prompt == None:
|
||||
prompt = await self.generate_prompt()
|
||||
try:
|
||||
neg_prompt_file = f"{self.db_dir}negative_prompt.txt"
|
||||
with open(neg_prompt_file, 'r') as f:
|
||||
negative_prompt = f.readline()
|
||||
except:
|
||||
neg_prompt_file = f"{self.db_dir}negative_prompt.txt"
|
||||
with open(neg_prompt_file, 'w') as f:
|
||||
f.writelines("")
|
||||
negative_prompt = ""
|
||||
await ctx.send(f"Please be patient this may take some time! Generating: {prompt}.")
|
||||
payload = {
|
||||
"prompt": prompt,
|
||||
"steps": 25,
|
||||
"negative_prompt": negative_prompt
|
||||
}
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
if key_value_pairs:
|
||||
payload.update(key_value_pairs)
|
||||
try:
|
||||
async with self.bot.http_session.post(url, headers=headers, json=payload) as resp:
|
||||
r = await resp.json()
|
||||
except Exception as error:
|
||||
await ctx.send("My image generation service may not be running.")
|
||||
await self.handle_error(error)
|
||||
|
||||
for i in r['images']:
|
||||
image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))
|
||||
png_payload = {"image": "data:image/png;base64," + i}
|
||||
|
||||
try:
|
||||
async with self.bot.http_session.post(url, json=png_payload) as resp:
|
||||
response2 = await resp.json()
|
||||
except Exception as error:
|
||||
await ctx.send("My image generation service may not be running.")
|
||||
await self.handle_error(self, error)
|
||||
|
||||
pnginfo = PngImagePlugin.PngInfo()
|
||||
pnginfo.add_text("parameters", response2.get("info"))
|
||||
try:
|
||||
if ctx.channel.is_nsfw():
|
||||
folder = self.working_dir + "nsfw/"
|
||||
else:
|
||||
folder = self.working_dir + "sfw"
|
||||
except:
|
||||
folder = self.working_dir
|
||||
my_filename = folder + str(time.time_ns()) + ".png"
|
||||
image.save(my_filename, pnginfo=pnginfo)
|
||||
|
||||
with open(my_filename, "rb") as fh:
|
||||
f = discord.File(fh, filename=my_filename)
|
||||
|
||||
log_data = f'Author: {ctx.author.name}, Prompt: {prompt}, Filename: {my_filename}\n'
|
||||
with open(f"{self.db_dir}stable_diffusion.log", 'a') as log_file:
|
||||
log_file.writelines(log_data)
|
||||
|
||||
await ctx.send(f'Generated by: {ctx.author.name}\nPrompt: {prompt}', file=f)
|
||||
|
||||
|
||||
@commands.command(
|
||||
description="Describe",
|
||||
help="Get better understanding of what the bot \"sees\" when you post an image! (Runs it through CLIP) Usage !describe (image link)",
|
||||
brief="Describe image"
|
||||
)
|
||||
async def describe(self, ctx):
|
||||
url = self.stable_diffusion_url
|
||||
if url == "disabled":
|
||||
await ctx.send("Command is currently disabled")
|
||||
return
|
||||
else:
|
||||
url=f"{url}/sdapi/v1/interrogate"
|
||||
try:
|
||||
if ctx.message.content.startswith("!describe "):
|
||||
file_url = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
elif ctx.message.attachments:
|
||||
file_url = ctx.message.attachments[0].url
|
||||
else:
|
||||
print("No image linked or attached.")
|
||||
return
|
||||
except Exception as error:
|
||||
await self.handle_error(error)
|
||||
print("Couldn't find image.")
|
||||
return
|
||||
async with self.bot.http_session.get(file_url) as response:
|
||||
imageName = self.working_dir + str(time.time_ns()) + ".png"
|
||||
with open(imageName, 'wb') as out_file:
|
||||
print(f"Saving image: {imageName}")
|
||||
while True:
|
||||
chunk = await response.content.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
out_file.write(chunk)
|
||||
|
||||
img_link = self.my_open_img_file(imageName)
|
||||
try:
|
||||
payload = {"image": img_link}
|
||||
async with self.bot.http_session.post(url, json=payload) as response:
|
||||
r = await response.json()
|
||||
print(r)
|
||||
await ctx.send(r.get("caption"))
|
||||
except Exception as error:
|
||||
await self.handle_error(error)
|
||||
await ctx.send("My image generation service may not be running.")
|
||||
|
||||
@commands.command(
|
||||
description="Reimagine",
|
||||
help="Reimagine an image as something else. One example is reimagining a picture as anime. This command can be hard to use. \nUsage: !reimagine (image link) (topic)\nExample: !reimagine (image link) anime",
|
||||
brief="Reimagine an image"
|
||||
)
|
||||
async def reimagine(self, ctx):
|
||||
url = self.stable_diffusion_url
|
||||
if url == "disabled":
|
||||
await ctx.send("Command is currently disabled")
|
||||
return
|
||||
try:
|
||||
if ctx.message.attachments:
|
||||
file_url = ctx.message.attachments[0].url
|
||||
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
|
||||
elif ctx.message.content.startswith("!reimagine "):
|
||||
file_url = ctx.message.content.split(" ", maxsplit=2)[1]
|
||||
prompt = ctx.message.content.split(" ", maxsplit=2)[2]
|
||||
else:
|
||||
print("No image linked or attached.")
|
||||
return
|
||||
except Exception as error:
|
||||
await self.handle_error(error)
|
||||
print("Couldn't find image.")
|
||||
return
|
||||
prompt = self.get_prompt_from_ctx(ctx)
|
||||
key_value_pairs = self.get_kv_from_ctx(ctx)
|
||||
try:
|
||||
async with self.bot.http_session.get(file_url) as response:
|
||||
imageName = self.working_dir + str(time.time_ns()) + ".png"
|
||||
with open(imageName, 'wb') as out_file:
|
||||
print(f"Saving image: {imageName}")
|
||||
while True:
|
||||
chunk = await response.content.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
out_file.write(chunk)
|
||||
|
||||
except Exception as error:
|
||||
await ctx.send("My image generation service may not be running.")
|
||||
await self.handle_error(error)
|
||||
|
||||
img_link = self.my_open_img_file(imageName)
|
||||
|
||||
negative_prompt = "badhandsv4, worst quality, lowres, EasyNegative, hermaphrodite, cropped, not in the frame, additional faces, jpeg large artifacts, jpeg small artifacts, ugly, tiling, poorly drawn hands, poorly drawn feet, poorly drawn face, out of frame, extra limbs, disfigured, deformed, body out of frame, blurry, bad anatomy, blurred, watermark, grainy, signature, cut off, draft, not finished drawing, unfinished image, bad eyes, doll, 3d, cartoon, (bad eyes:1.2), (worst quality:1.2), (low quality:1.2), bad-image-v2-39000, (bad_prompt_version2:0.8), nude, badhandv4 By bad artist -neg easynegative ng_deepnegative_v1_75t verybadimagenegative_v1.3, (Worst Quality, Low Quality:1.4), Poorly Made Bad 3D, Lousy Bad Realistic, nsfw,(worst quality, low quality:1.4), (lip, nose, tooth, rouge, lipstick, eyeshadow:1.4), ( jpeg artifacts:1.4), (depth of field, bokeh, blurry, film grain, chromatic aberration, lens flare:1.0), (1boy, abs, muscular, rib:1.0), greyscale, monochrome, dusty sunbeams, trembling, motion lines, motion blur, emphasis lines, text, title, logo, signature, child, childlike, young, easynegative, (bad-hands-5:0.8), plain background, monochrome, poorly drawn face, poorly drawn hands, watermark, censored, (mutated hands and fingers), ugly, worst quality, low quality,, nsfw,(worst quality, low quality:1.4), (lip, nose, tooth, rouge, lipstick, eyeshadow:1.4), ( jpeg artifacts:1.4), (depth of field, bokeh, blurry, film grain, chromatic aberration, lens flare:1.0), (1boy, abs, muscular, rib:1.0), greyscale, monochrome, dusty sunbeams, trembling, motion lines, motion blur, emphasis lines, text, title, logo, signature, child, childlike, young"
|
||||
|
||||
await ctx.send("Please be patient this may take some time! Generating: " + prompt + ".")
|
||||
|
||||
payload = {"init_images": [img_link], "prompt": prompt, "steps": 40, "negative_prompt": negative_prompt, "denoising_strength": 0.5}
|
||||
payload.update(key_value_pairs)
|
||||
|
||||
try:
|
||||
async with self.bot.http_session.post(url=f'{url}/sdapi/v1/img2img', json=payload) as response:
|
||||
data = await response.json()
|
||||
for i in data['images']:
|
||||
if not os.path.isdir(f"{self.working_dir}reimagined/"+ str(ctx.author.id)):
|
||||
os.makedirs(f"{self.working_dir}reimagined/"+ str(ctx.author.id))
|
||||
image = Image.open(io.BytesIO(base64.b64decode(i.split(",",1)[0])))
|
||||
png_payload = {"image": "data:image/png;base64," + i}
|
||||
async with self.bot.http_session.post(url=f'{url}/sdapi/v1/png-info', json=png_payload) as resp2:
|
||||
response2 = await resp2.json()
|
||||
pnginfo = PngImagePlugin.PngInfo()
|
||||
pnginfo.add_text("parameters", response2.get("info"))
|
||||
my_filename = self.working_dir + str(time.time_ns()) + ".png"
|
||||
image.save(my_filename, pnginfo=pnginfo)
|
||||
with open(my_filename, "rb") as fh:
|
||||
f = discord.File(fh, filename=my_filename)
|
||||
await ctx.send(file=f)
|
||||
except Exception as error:
|
||||
await ctx.send("My image generation service may not be running.")
|
||||
await self.handle_error(error)
|
||||
|
||||
@commands.command(
|
||||
description="Negative Prompt",
|
||||
help="Changes the negative prompt for imagine across all channels",
|
||||
brief="Change the negative prompt for imagine"
|
||||
)
|
||||
async def negative_prompt(self, ctx, *args):
|
||||
message = ' '.join(args)
|
||||
if not message:
|
||||
message = "easynegative, badhandv4, verybadimagenegative_v1.3"
|
||||
neg_prompt_file = f"{self.db_dir}negative_prompt.txt"
|
||||
with open(neg_prompt_file, 'w') as f:
|
||||
f.writelines(message)
|
||||
await ctx.send("Changed negative prompt to " + message)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
try:
|
||||
await bot.add_cog(StableDiffusion(bot))
|
||||
print("Successfully added StableDiffusion Cog")
|
||||
except:
|
||||
print("Failed to load StableDiffusion Cog")
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue