364 lines
No EOL
17 KiB
Python
364 lines
No EOL
17 KiB
Python
import os
|
|
import io
|
|
import base64
|
|
import logging
|
|
import time
|
|
import html
|
|
import aiohttp
|
|
import asyncssh
|
|
from PIL import Image, PngImagePlugin
|
|
from discord.ext import commands, tasks
|
|
|
|
class PhixxyCom(commands.Cog):
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.SERVER = os.getenv('ftp_server')
|
|
self.USERNAME = os.getenv('ftp_username')
|
|
self.PASSWORD = os.getenv('ftp_password')
|
|
self.working_dir = "tmp/phixxy.com/"
|
|
self.data_dir = "data/phixxy.com/"
|
|
self.folder_setup()
|
|
self.stable_diffusion_log = "data/stable_diffusion/stable_diffusion.log"
|
|
self.logger = logging.getLogger("bot")
|
|
self.phixxy_loop.start()
|
|
self.blog_loop.start()
|
|
self.http_session = self.create_aiohttp_session()
|
|
|
|
def create_aiohttp_session(self):
|
|
return aiohttp.ClientSession()
|
|
|
|
def folder_setup(self):
|
|
try:
|
|
if not os.path.exists(self.working_dir):
|
|
os.mkdir(self.working_dir)
|
|
if not os.path.exists(self.data_dir):
|
|
os.mkdir(self.data_dir)
|
|
except:
|
|
self.logger.exception("PhixxyCom failed to make directories")
|
|
|
|
def find_prompt_from_filename(self, sd_log, filename):
|
|
with open(sd_log, 'r') as f:
|
|
lines = f.readlines()
|
|
for line in reversed(lines):
|
|
if filename in line:
|
|
try:
|
|
prompt = line[line.index("Prompt: ") + 7:line.index("Filename: ")]
|
|
prompt = ''.join(prompt.rsplit(',', 1)) # Remove the last comma
|
|
return html.escape(prompt)
|
|
except:
|
|
self.logger.exception("PhixxyCom failed to find prompt from filename")
|
|
return "Unknown Prompt"
|
|
return "Unknown Prompt"
|
|
|
|
async def upload_sftp(self, local_filename, server_folder, server_filename):
|
|
remotepath = server_folder + server_filename
|
|
async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
|
|
async with conn.start_sftp_client() as sftp:
|
|
await sftp.put(local_filename, remotepath=remotepath)
|
|
|
|
async def delete_local_pngs(self, local_folder):
|
|
for filename in os.listdir(local_folder):
|
|
if ".png" in filename:
|
|
os.remove(local_folder + filename)
|
|
|
|
async def delete_ftp_pngs(self,server_folder):
|
|
async with asyncssh.connect(os.getenv('ftp_server'), username=os.getenv('ftp_username'), password=os.getenv('ftp_password')) as conn:
|
|
async with conn.start_sftp_client() as sftp:
|
|
for filename in (await sftp.listdir(server_folder)):
|
|
if '.png' in filename:
|
|
try:
|
|
self.logger.debug("Deleting", filename)
|
|
await sftp.remove(server_folder+filename)
|
|
except:
|
|
self.logger.exception("Couldn't delete", filename)
|
|
|
|
async def extract_image_tags(self,code):
|
|
count = code.count("<img")
|
|
tags = []
|
|
for x in range(0,count):
|
|
index1 = code.find("<img")
|
|
index2 = code[index1:].find(">") + index1 + 1
|
|
img_tag = code[index1:index2]
|
|
tags.append(img_tag)
|
|
code = code[index2:]
|
|
return tags
|
|
|
|
async def extract_image_alt_text(self,tags):
|
|
alt_texts = []
|
|
for tag in tags:
|
|
index1 = tag.find("alt") + 5
|
|
index2 = tag[index1:].find("\"") + index1
|
|
alt_text = tag[index1:index2]
|
|
alt_texts.append(alt_text)
|
|
return alt_texts
|
|
|
|
async def generate_images(self, local_folder, image_list):
|
|
url = os.getenv('stablediffusion_url')
|
|
if url == "disabled":
|
|
return
|
|
file_list = []
|
|
for image in image_list:
|
|
filename = image.replace(" ", "").lower() + ".png"
|
|
payload = {"prompt": image, "steps": 25}
|
|
response = await self.http_session.post(url=f'{url}/sdapi/v1/txt2img', json=payload)
|
|
r = await response.json()
|
|
for i in r['images']:
|
|
image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))
|
|
png_payload = {"image": "data:image/png;base64," + i}
|
|
response2 = await self.http_session.post(url=f'{url}/sdapi/v1/png-info', json=png_payload)
|
|
pnginfo = PngImagePlugin.PngInfo()
|
|
json_response = await response2.json()
|
|
pnginfo.add_text("parameters", json_response.get("info"))
|
|
image.save(local_folder + filename, pnginfo=pnginfo)
|
|
file_list.append(filename)
|
|
return file_list
|
|
|
|
async def add_image_filenames(self, code, file_list):
|
|
for filename in file_list:
|
|
code = code.replace("src=\"\"", "src=\""+ filename + "\"", 1)
|
|
return code
|
|
|
|
async def upload_html_and_imgs(self, local_folder):
|
|
|
|
for filename in os.listdir(local_folder):
|
|
if ".png" in filename:
|
|
await self.upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
|
|
#explicitly upload html files last!
|
|
for filename in os.listdir(local_folder):
|
|
if ".html" in filename:
|
|
await self.upload_sftp(local_folder + filename, (os.getenv('ftp_public_html') + 'ai-webpage/'), filename)
|
|
|
|
async def delete_derp_files(self, server_folder):
|
|
async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
|
|
async with conn.start_sftp_client() as sftp:
|
|
for filename in (await sftp.listdir(server_folder)):
|
|
if filename == '.' or filename == '..' or filename == 'style.css' or filename == 'myScript.js':
|
|
pass
|
|
else:
|
|
try:
|
|
self.logger.debug("Deleting", filename)
|
|
await sftp.remove(server_folder+filename)
|
|
except:
|
|
self.logger.exception("Couldn't delete", filename)
|
|
|
|
async def meme_handler(self, folder):
|
|
for f in os.listdir(folder):
|
|
filepath = folder + f
|
|
await self.update_meme_webpage(filepath)
|
|
|
|
async def update_meme_webpage(self, filename):
|
|
server_folder = (os.getenv('ftp_public_html') + 'ai-memes/')
|
|
new_file_name = str(time.time_ns()) + ".png"
|
|
await self.upload_sftp(filename, server_folder, new_file_name)
|
|
self.logger.debug(f"Uploaded {new_file_name}")
|
|
with open(f"{self.data_dir}ai-memes/index.html", 'r') as f:
|
|
html_data = f.read()
|
|
html_insert = '<!--ADD IMG HERE-->\n <img src="' + new_file_name + '" loading="lazy">'
|
|
html_data = html_data.replace('<!--ADD IMG HERE-->',html_insert)
|
|
with open(f"{self.data_dir}ai-memes/index.html", "w") as f:
|
|
f.writelines(html_data)
|
|
await self.upload_sftp(f"{self.data_dir}ai-memes/index.html", server_folder, "index.html")
|
|
os.rename(filename, 'tmp/' + new_file_name)
|
|
|
|
async def upload_ftp_ai_images(self, ai_dict):
|
|
try:
|
|
for folder in ai_dict:
|
|
for filename in os.listdir(folder):
|
|
if filename[-4:] == '.png':
|
|
filepath = folder + filename
|
|
self.logger.info(f"Found file = {filename}")
|
|
prompt = self.find_prompt_from_filename(ai_dict[folder], filename)
|
|
self.logger.info(f"Found prompt = {prompt}")
|
|
html_file = f"{self.data_dir}ai-images/index.html"
|
|
html_insert = '''<!--REPLACE THIS COMMENT-->
|
|
<div>
|
|
<img src="<!--filename-->" loading="lazy">
|
|
<p class="image-description"><!--description--></p>
|
|
</div>'''
|
|
server_folder = (os.getenv('ftp_public_html') + 'ai-images/')
|
|
new_filename = str(time.time_ns()) + ".png"
|
|
await self.upload_sftp(filepath, server_folder, new_filename)
|
|
self.logger.info(f"Uploaded {new_filename}")
|
|
with open(html_file, 'r') as f:
|
|
html_data = f.read()
|
|
html_insert = html_insert.replace("<!--filename-->", new_filename)
|
|
html_insert = html_insert.replace("<!--description-->", prompt)
|
|
html_data = html_data.replace("<!--REPLACE THIS COMMENT-->", html_insert)
|
|
with open(html_file, "w") as f:
|
|
f.writelines(html_data)
|
|
await self.upload_sftp(html_file, server_folder, "index.html")
|
|
os.rename(filepath, f"tmp/{new_filename}")
|
|
except:
|
|
self.logger.exception("Something went wrong in upload_ftp_ai_images")
|
|
|
|
async def answer_question(self, topic, model="gpt-4o-mini"):
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
|
}
|
|
|
|
data = {
|
|
"model": model,
|
|
"messages": [{"role": "user", "content": topic}]
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/chat/completions"
|
|
|
|
try:
|
|
async with self.http_session.post(url, headers=headers, json=data) as resp:
|
|
response_data = await resp.json()
|
|
response = response_data['choices'][0]['message']['content']
|
|
return response
|
|
|
|
except:
|
|
self.logger.exception("Error in answer_question")
|
|
return None
|
|
|
|
@commands.command(
|
|
description="Blog",
|
|
help="Adds your topic to the list of possible future blog topics. Usage: !blog (topic)",
|
|
brief="Suggest a blog topic"
|
|
)
|
|
async def blog(self, ctx, *args):
|
|
message = ' '.join(args)
|
|
if '\n' in message:
|
|
await ctx.send("Send only one topic at a time.")
|
|
return
|
|
else:
|
|
blogpost_file = f"{self.data_dir}blog_topics.txt"
|
|
with open(blogpost_file, 'a') as f:
|
|
f.writelines(message+'\n')
|
|
await ctx.send("Saved suggestion!")
|
|
|
|
def get_last_5_messages(self):
|
|
with open(f"data/chatgpt/logs/346102473993355267.log", 'r') as f:
|
|
lines = f.readlines()
|
|
last_5_messages = ""
|
|
for i in range(5,1,-1):
|
|
last_5_messages += lines[-i]
|
|
return last_5_messages
|
|
|
|
async def generate_blog(self, force=False):
|
|
start_time = time.time()
|
|
topic = ''
|
|
#filename = f"{self.data_dir}ai-blog/index.html"
|
|
#filename format year-month-day ie: 2021-01-01.md
|
|
filename = f"{time.strftime('%Y-%m-%d')}.md"
|
|
filepath = f"{self.data_dir}ai-blog/{time.strftime('%Y-%m-%d')}.md"
|
|
if os.path.exists(filepath) and not force:
|
|
return
|
|
date = time.strftime("%B %d, %Y")
|
|
blogpost_file = f"{self.data_dir}blog_topics.txt"
|
|
if os.path.isfile(blogpost_file):
|
|
with open(blogpost_file, 'r') as f:
|
|
blogpost_topics = f.read()
|
|
f.seek(0)
|
|
topic = f.readline()
|
|
blogpost_topics = blogpost_topics.replace(topic, '')
|
|
with open(blogpost_file, 'w') as f:
|
|
f.write(blogpost_topics)
|
|
if topic == '':
|
|
messages = self.get_last_5_messages()
|
|
question = f"you have a blog and you are inspired based on this short text chat interaction:\n{messages}\nwhat will the topic of your next blog be? just tell me the topic and a one sentence description"
|
|
self.logger.info("No topic given for blogpost, generating one.")
|
|
topic = await self.answer_question(question)
|
|
self.logger.info("Writing blogpost")
|
|
title_prompt = 'generate an absurd essay title about ' + topic
|
|
title = await self.answer_question(title_prompt, model="gpt-4o-mini")
|
|
prompt = 'Write a satirical essay with a serious tone titled: "' + title + '". Do not label parts of the essay.'
|
|
content = await self.answer_question(prompt, model="gpt-4o")
|
|
if title in content[:len(title)]:
|
|
content = content.replace(title, '', 1)
|
|
with open(filepath, 'w') as f:
|
|
f.write(f"# {title}\n\n*{date}*\n\n{content}")
|
|
await self.upload_sftp(filepath, (os.getenv('ftp_public_html') + 'ai-blog/content/'), filename)
|
|
run_time = time.time() - start_time
|
|
self.logger.debug("It took " + str(run_time) + " seconds to generate the blog post!")
|
|
output = f"Blog Updated! ({run_time} seconds) {title} https://ai.phixxy.com/ai-blog"
|
|
return output
|
|
|
|
@commands.command()
|
|
async def force_blog(self, ctx):
|
|
await ctx.send("Forcing blog generation")
|
|
await self.generate_blog(force=True)
|
|
|
|
@commands.command(
|
|
description="Website",
|
|
help="Generates a website using gpt 3.5. Usage: !website (topic)",
|
|
brief="Generate a website"
|
|
)
|
|
async def website(self, ctx):
|
|
server_folder = os.getenv('ftp_public_html') + 'ai-webpage/'
|
|
local_folder = f"{self.working_dir}/webpage/"
|
|
working_file = local_folder + "index.html"
|
|
if not os.path.exists(local_folder):
|
|
os.mkdir(local_folder)
|
|
try:
|
|
await ctx.send("Please wait, this will take a long time! You will be able to view the website here: https://ai.phixxy.com/ai-webpage/")
|
|
with open(working_file, "w") as f:
|
|
f.write("<!DOCTYPE html><html><head><script>setTimeout(function(){location.reload();}, 10000);</script><title>Generating Website</title><style>body {font-size: 24px;text-align: center;margin-top: 100px;}</style></head><body><p>This webpage is currently being generated. The page will refresh once it is complete. Please be patient.</p></body></html>")
|
|
await self.upload_sftp(working_file, server_folder, "index.html")
|
|
topic = ctx.message.content.split(" ", maxsplit=1)[1]
|
|
prompt = "Generate a webpage using html and inline css. The webpage topic should be " + topic + ". Feel free to add image tags with alt text. Leave the image source blank. The images will be added later."
|
|
code = await self.answer_question(prompt)
|
|
|
|
|
|
await self.delete_local_pngs(local_folder)
|
|
await self.delete_ftp_pngs(server_folder)
|
|
|
|
tags = await self.extract_image_tags(code)
|
|
alt_texts = await self.extract_image_alt_text(tags)
|
|
file_list = await self.generate_images(local_folder, alt_texts)
|
|
code = await self.add_image_filenames(code, file_list)
|
|
|
|
with open(working_file, 'w') as f:
|
|
f.write(code)
|
|
f.close()
|
|
|
|
await self.upload_html_and_imgs(local_folder)
|
|
|
|
await ctx.send("Finished https://ai.phixxy.com/ai-webpage/")
|
|
except Exception as error:
|
|
#await ctx.send("Failed, Try again.")
|
|
self.logger.exception("Website Error")
|
|
|
|
@tasks.loop(seconds=60)
|
|
async def phixxy_loop(self):
|
|
ai_images_dict = {
|
|
# Folder Path : Log Path
|
|
"tmp/stable_diffusion/sfw/":self.stable_diffusion_log,
|
|
"data/chatgpt/dalle/":"data/chatgpt/logs/dalle3.log",
|
|
"data/chatgpt/dalle2/":"data/chatgpt/logs/dalle2.log"
|
|
}
|
|
await self.upload_ftp_ai_images(ai_images_dict)
|
|
await self.meme_handler('tmp/meme/')
|
|
|
|
|
|
@tasks.loop(hours=1)
|
|
async def blog_loop(self):
|
|
try:
|
|
message = await self.generate_blog()
|
|
bot_stuff_channel = self.bot.get_channel(544408659174883328)
|
|
if message:
|
|
await bot_stuff_channel.send(message)
|
|
except:
|
|
self.logger.exception("Failed to generate blog")
|
|
|
|
@commands.command(
|
|
description="Moderate",
|
|
help="This currently tool works by replacing the filename on the ftp server with a black image. The description will remain the same and may need to be altered.",
|
|
brief="Moderation Tools"
|
|
)
|
|
async def moderate(self, ctx, filename):
|
|
await self.upload_sftp(f"{self.data_dir}blank_image.png", (os.getenv('ftp_public_html') + 'ai-images/'), filename)
|
|
output = "Image " + filename + " replaced"
|
|
await ctx.send(output)
|
|
|
|
async def setup(bot):
|
|
if os.getenv("upload_phixxy").lower() == "true":
|
|
asyncssh.set_log_level(30)
|
|
asyncssh.set_sftp_log_level(30)
|
|
await bot.add_cog(PhixxyCom(bot))
|
|
else:
|
|
pass |