import base64
import html
import io
import logging
import os
import re
import time

import aiohttp
import asyncssh
from discord.ext import commands, tasks
from PIL import Image, PngImagePlugin
class PhixxyCom(commands.Cog):
    """Cog that publishes AI-generated images, memes, blog posts and web pages
    to the phixxy.com web host over SFTP.

    Connection details are read from the environment variables ``ftp_server``,
    ``ftp_username``, ``ftp_password`` and ``ftp_public_html``.
    """

    # Channel that receives the "Blog Updated!" announcement.
    BLOG_CHANNEL_ID = 544408659174883328

    def __init__(self, bot):
        self.bot = bot
        # The logger must exist before folder_setup() runs, because
        # folder_setup() logs when directory creation fails.
        self.logger = logging.getLogger("bot")
        self.SERVER = os.getenv('ftp_server')
        self.USERNAME = os.getenv('ftp_username')
        self.PASSWORD = os.getenv('ftp_password')
        self.working_dir = "tmp/phixxy.com/"
        self.data_dir = "data/phixxy.com/"
        self.stable_diffusion_log = "data/stable_diffusion/stable_diffusion.log"
        self.folder_setup()
        # Create the HTTP session before the loops are started so that no
        # iteration can ever observe a missing attribute.
        self.http_session = self.create_aiohttp_session()
        self.phixxy_loop.start()
        self.blog_loop.start()

    async def cog_unload(self):
        """Stop the background loops and close the shared HTTP session."""
        self.phixxy_loop.cancel()
        self.blog_loop.cancel()
        await self.http_session.close()

    def create_aiohttp_session(self):
        """Return a new aiohttp session shared by all HTTP calls of the cog."""
        return aiohttp.ClientSession()

    def folder_setup(self):
        """Create the working and data directories if they are missing."""
        for directory in (self.working_dir, self.data_dir):
            try:
                # makedirs also creates missing parents ("tmp/", "data/").
                os.makedirs(directory, exist_ok=True)
            except OSError:
                self.logger.exception("PhixxyCom failed to make directories")

    def _public_folder(self, name):
        """Return the remote folder ``name`` below the ``ftp_public_html`` root."""
        return os.getenv('ftp_public_html') + name

    def find_prompt_from_filename(self, sd_log, filename):
        """Look up the prompt that produced ``filename`` in a generation log.

        The log is scanned from newest to oldest line; the text between the
        ``Prompt:`` and ``Filename:`` markers of the first line mentioning
        ``filename`` is returned HTML-escaped.

        Returns ``"Unknown Prompt"`` when the log cannot be read, no line
        matches, or the matching line lacks the markers.
        """
        try:
            with open(sd_log, 'r') as f:
                lines = f.readlines()
        except OSError:
            self.logger.exception("PhixxyCom failed to find prompt from filename")
            return "Unknown Prompt"

        marker = "Prompt:"
        for line in reversed(lines):
            if filename not in line:
                continue
            try:
                prompt = line[line.index(marker) + len(marker):line.index("Filename:")]
            except ValueError:
                self.logger.exception("PhixxyCom failed to find prompt from filename")
                return "Unknown Prompt"
            prompt = ''.join(prompt.rsplit(',', 1))  # Remove the last comma
            return html.escape(prompt)
        return "Unknown Prompt"

    async def upload_sftp(self, local_filename, server_folder, server_filename):
        """Upload ``local_filename`` to ``server_folder + server_filename``."""
        remotepath = server_folder + server_filename
        async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
            async with conn.start_sftp_client() as sftp:
                await sftp.put(local_filename, remotepath=remotepath)

    async def delete_local_pngs(self, local_folder):
        """Delete every ``.png`` file in ``local_folder`` (path ends with ``/``)."""
        for filename in os.listdir(local_folder):
            if filename.endswith(".png"):
                os.remove(local_folder + filename)

    async def delete_ftp_pngs(self, server_folder):
        """Delete every ``.png`` file in the remote ``server_folder`` (best effort)."""
        async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
            async with conn.start_sftp_client() as sftp:
                for filename in (await sftp.listdir(server_folder)):
                    if not filename.endswith('.png'):
                        continue
                    try:
                        self.logger.debug("Deleting %s", filename)
                        await sftp.remove(server_folder + filename)
                    except Exception:
                        # One file that cannot be removed must not stop the rest.
                        self.logger.exception("Couldn't delete %s", filename)

    async def extract_image_tags(self, code):
        """Return every ``<img ...>`` tag found in ``code``, in document order."""
        return re.findall(r"<img[^>]*>", code)

    async def extract_image_alt_text(self, tags):
        """Return the ``alt`` text of each tag in ``tags``.

        The result has exactly one entry per tag so it stays aligned with the
        tag list; a tag without an ``alt`` attribute yields ``""``.
        """
        alt_texts = []
        for tag in tags:
            match = re.search(r"""\balt\s*=\s*(["'])(.*?)\1""", tag, re.DOTALL)
            alt_texts.append(match.group(2) if match else "")
        return alt_texts

    @staticmethod
    def _image_filename(prompt):
        """Build a safe ``.png`` filename from an image prompt.

        Only lowercase letters, digits, ``_`` and ``-`` are kept because the
        prompt is model-generated text that ends up in a file path and in HTML.
        """
        stem = re.sub(r"[^a-z0-9_-]", "", prompt.replace(" ", "").lower())
        return (stem or "image") + ".png"

    async def generate_images(self, local_folder, image_list):
        """Render one image per prompt with the Stable Diffusion web API.

        ``image_list`` holds the prompts; images are saved as PNG files into
        ``local_folder`` with the generation parameters embedded as PNG text.

        Returns the list of written filenames, or an empty list when
        ``stablediffusion_url`` is unset or set to ``"disabled"``.
        """
        url = os.getenv('stablediffusion_url')
        if not url or url == "disabled":
            # Always hand back a list so callers can iterate the result.
            return []

        file_list = []
        for prompt in image_list:
            filename = self._image_filename(prompt)
            payload = {"prompt": prompt, "steps": 25}
            async with self.http_session.post(url=f'{url}/sdapi/v1/txt2img', json=payload) as response:
                result = await response.json()
            for encoded in result['images']:
                image = Image.open(io.BytesIO(base64.b64decode(encoded.split(",", 1)[0])))
                png_payload = {"image": "data:image/png;base64," + encoded}
                async with self.http_session.post(url=f'{url}/sdapi/v1/png-info', json=png_payload) as info_response:
                    json_response = await info_response.json()
                pnginfo = PngImagePlugin.PngInfo()
                pnginfo.add_text("parameters", json_response.get("info") or "")
                image.save(local_folder + filename, pnginfo=pnginfo)
                file_list.append(filename)
        return file_list

    async def add_image_filenames(self, code, file_list):
        """Fill the empty ``src=""`` attributes in ``code`` with ``file_list``, in order."""
        for filename in file_list or []:
            code = code.replace("src=\"\"", "src=\"" + filename + "\"", 1)
        return code

    async def upload_html_and_imgs(self, local_folder):
        """Upload the generated web page: all PNG files first, HTML files last."""
        server_folder = self._public_folder('ai-webpage/')
        filenames = os.listdir(local_folder)
        # HTML goes last so the page never references images not yet uploaded.
        for extension in (".png", ".html"):
            for filename in filenames:
                if filename.endswith(extension):
                    await self.upload_sftp(local_folder + filename, server_folder, filename)

    async def delete_derp_files(self, server_folder):
        """Delete everything in the remote folder except the static assets."""
        keep = {'.', '..', 'style.css', 'myScript.js'}
        async with asyncssh.connect(self.SERVER, username=self.USERNAME, password=self.PASSWORD) as conn:
            async with conn.start_sftp_client() as sftp:
                for filename in (await sftp.listdir(server_folder)):
                    if filename in keep:
                        continue
                    try:
                        self.logger.debug("Deleting %s", filename)
                        await sftp.remove(server_folder + filename)
                    except Exception:
                        self.logger.exception("Couldn't delete %s", filename)

    def _insert_into_index(self, html_file, marker, html_insert):
        """Replace ``marker`` in ``html_file`` with ``html_insert`` and save the file."""
        with open(html_file, 'r') as f:
            html_data = f.read()
        html_data = html_data.replace(marker, html_insert)
        with open(html_file, "w") as f:
            f.write(html_data)

    async def meme_handler(self, folder):
        """Publish every file waiting in ``folder`` to the meme page.

        Failures are logged per file: an exception escaping from here would
        permanently stop the background loop that calls this method.
        """
        if not os.path.isdir(folder):
            return
        for name in os.listdir(folder):
            try:
                await self.update_meme_webpage(folder + name)
            except Exception:
                self.logger.exception("Failed to publish meme %s", name)

    async def update_meme_webpage(self, filename):
        """Upload one meme image, add it to the meme index page and archive it.

        The image is uploaded under a timestamp-based name, an ``<img>`` tag is
        inserted at the ``<!--ADD IMG HERE-->`` marker of the local index page,
        the index is re-uploaded and the local image is moved to ``tmp/``.
        """
        server_folder = self._public_folder('ai-memes/')
        index_file = f"{self.data_dir}ai-memes/index.html"
        new_file_name = str(time.time_ns()) + ".png"
        await self.upload_sftp(filename, server_folder, new_file_name)
        self.logger.debug(f"Uploaded {new_file_name}")
        # The marker is re-emitted so the next image can be inserted as well.
        html_insert = '<!--ADD IMG HERE-->\n<img src="' + new_file_name + '" loading="lazy">'
        self._insert_into_index(index_file, '<!--ADD IMG HERE-->', html_insert)
        await self.upload_sftp(index_file, server_folder, "index.html")
        os.rename(filename, 'tmp/' + new_file_name)

    async def _publish_ai_image(self, filepath, prompt):
        """Upload one AI image, add it with its prompt to the gallery and archive it."""
        html_file = f"{self.data_dir}ai-images/index.html"
        server_folder = self._public_folder('ai-images/')
        new_filename = str(time.time_ns()) + ".png"
        await self.upload_sftp(filepath, server_folder, new_filename)
        self.logger.info(f"Uploaded {new_filename}")
        # The marker is kept at the top so newer images are inserted above.
        html_insert = '''<!--REPLACE THIS COMMENT-->
<div>
<img src="<!--filename-->" loading="lazy">
<p class="image-description"><!--description--></p>
</div>'''
        html_insert = html_insert.replace("<!--filename-->", new_filename)
        html_insert = html_insert.replace("<!--description-->", prompt)
        self._insert_into_index(html_file, "<!--REPLACE THIS COMMENT-->", html_insert)
        await self.upload_sftp(html_file, server_folder, "index.html")
        os.rename(filepath, f"tmp/{new_filename}")

    async def upload_ftp_ai_images(self, ai_dict):
        """Publish every pending ``.png`` of the given folders to the image gallery.

        ``ai_dict`` maps a local folder path (ending with ``/``) to the log file
        in which the prompts for that folder's images are recorded.  Missing
        folders are skipped and failures are logged per file, so one bad file
        does not block the others.
        """
        for folder, log_path in ai_dict.items():
            if not os.path.isdir(folder):
                continue
            for filename in os.listdir(folder):
                if not filename.endswith('.png'):
                    continue
                try:
                    self.logger.info(f"Found file = {filename}")
                    prompt = self.find_prompt_from_filename(log_path, filename)
                    self.logger.info(f"Found prompt = {prompt}")
                    await self._publish_ai_image(folder + filename, prompt)
                except Exception:
                    self.logger.exception("Something went wrong in upload_ftp_ai_images")

    async def answer_question(self, topic, model="gpt-3.5-turbo"):
        """Send ``topic`` as a single user message to the OpenAI chat API.

        Returns the text of the first choice, or ``None`` when the request
        fails or the response has an unexpected shape (the error is logged).
        """
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {os.getenv("openai.api_key")}',
        }
        data = {
            "model": model,
            "messages": [{"role": "user", "content": topic}],
        }
        url = "https://api.openai.com/v1/chat/completions"
        try:
            async with self.http_session.post(url, headers=headers, json=data) as resp:
                response_data = await resp.json()
                return response_data['choices'][0]['message']['content']
        except Exception:
            self.logger.exception("Error in answer_question")
            return None

    @commands.command(
        description="Blog",
        help="Adds your topic to the list of possible future blog topics. Usage: !blog (topic)",
        brief="Suggest a blog topic"
    )
    async def blog(self, ctx, *args):
        """Append the suggested topic as one line to the blog topic queue file."""
        message = ' '.join(args)
        if '\n' in message:
            await ctx.send("Send only one topic at a time.")
            return
        if not message.strip():
            # An empty line in the queue would later be picked up as a topic.
            await ctx.send("Usage: !blog (topic)")
            return
        blogpost_file = f"{self.data_dir}blog_topics.txt"
        with open(blogpost_file, 'a') as f:
            f.write(message + '\n')
        await ctx.send("Saved suggestion!")

    def get_last_5_messages(self, log_path="data/chatgpt/logs/346102473993355267.log", count=5):
        """Return the last ``count`` lines of the chat log as one string.

        Fewer lines are returned when the log is shorter, and ``""`` when the
        log cannot be read.
        """
        try:
            with open(log_path, 'r') as f:
                lines = f.readlines()
        except OSError:
            self.logger.exception("Could not read chat log %s", log_path)
            return ""
        return "".join(lines[-count:]) if count > 0 else ""

    def _pop_blog_topic(self):
        """Remove and return the first non-blank queued blog topic, or ``''``."""
        blogpost_file = f"{self.data_dir}blog_topics.txt"
        if not os.path.isfile(blogpost_file):
            return ''
        with open(blogpost_file, 'r') as f:
            lines = f.readlines()
        topic = ''
        while lines and topic == '':
            topic = lines.pop(0).strip()
        # Only the consumed line(s) are dropped; other suggestions stay
        # untouched even if they contain the same text.
        with open(blogpost_file, 'w') as f:
            f.writelines(lines)
        return topic

    async def generate_blog(self, force=False):
        """Write today's blog post and upload it.

        The post is stored as ``<data_dir>ai-blog/YYYY-MM-DD.md``.  Nothing is
        done when that file already exists unless ``force`` is true.  The topic
        is taken from the suggestion queue, or generated from recent chat
        messages when the queue is empty.

        Returns a status message for the announcement channel, or ``None``
        when no post was written.
        """
        start_time = time.time()
        blog_dir = f"{self.data_dir}ai-blog/"
        # filename format year-month-day ie: 2021-01-01.md
        filename = f"{blog_dir}{time.strftime('%Y-%m-%d')}.md"
        if os.path.exists(filename) and not force:
            return None
        date = time.strftime("%B %d, %Y")

        topic = self._pop_blog_topic()
        if topic == '':
            messages = self.get_last_5_messages()
            question = f"you have a blog and you are inspired based on this short text chat interaction:\n{messages}\nwhat will the topic of your next blog be? just tell me the topic and a one sentence description"
            self.logger.info("No topic given for blogpost, generating one.")
            topic = await self.answer_question(question)
        if not topic:
            self.logger.error("No blog topic available, skipping blogpost")
            return None

        self.logger.info("Writing blogpost")
        title_prompt = 'generate an absurd essay title about ' + topic
        title = await self.answer_question(title_prompt, model="gpt-3.5-turbo")
        if not title:
            self.logger.error("No blog title generated, skipping blogpost")
            return None
        prompt = 'Write a satirical essay with a serious tone titled: "' + title + '". Do not label parts of the essay.'
        content = await self.answer_question(prompt, model="gpt-4-turbo-preview")
        if not content:
            self.logger.error("No blog content generated, skipping blogpost")
            return None
        if content.startswith(title):
            # The title is written separately as the heading below.
            content = content.replace(title, '', 1)

        os.makedirs(blog_dir, exist_ok=True)
        with open(filename, 'w') as f:
            f.write(f"# {title}\n\n*{date}*\n\n{content}")
        # Only the bare file name belongs in the remote path; the local
        # directory prefix does not exist on the server.
        await self.upload_sftp(filename, self._public_folder('ai-blog/'), os.path.basename(filename))

        run_time = time.time() - start_time
        self.logger.debug("It took " + str(run_time) + " seconds to generate the blog post!")
        return f"Blog Updated! ({run_time} seconds) {title} https://ai.phixxy.com/ai-blog"

    @commands.command()
    async def force_blog(self, ctx):
        """Generate a blog post now, even if today's post already exists."""
        await ctx.send("Forcing blog generation")
        output = await self.generate_blog(force=True)
        if output:
            await ctx.send(output)

    @commands.command(
        description="Website",
        help="Generates a website using gpt 3.5. Usage: !website (topic)",
        brief="Generate a website"
    )
    async def website(self, ctx):
        """Generate a web page (HTML plus images) about a topic and publish it."""
        # Validate the topic before touching the published page; otherwise a
        # missing topic would leave the "being generated" placeholder online.
        parts = ctx.message.content.split(" ", maxsplit=1)
        if len(parts) < 2 or not parts[1].strip():
            await ctx.send("Usage: !website (topic)")
            return
        topic = parts[1]

        server_folder = self._public_folder('ai-webpage/')
        local_folder = f"{self.working_dir}webpage/"
        working_file = local_folder + "index.html"
        try:
            os.makedirs(local_folder, exist_ok=True)
            await ctx.send("Please wait, this will take a long time! You will be able to view the website here: https://ai.phixxy.com/ai-webpage/")
            with open(working_file, "w") as f:
                f.write("<!DOCTYPE html><html><head><script>setTimeout(function() { location.reload();}, 10000);</script><title>Generating Website</title><style>body { font-size: 24px;text-align: center;margin-top: 100px;}</style></head><body><p>This webpage is currently being generated. The page will refresh once it is complete. Please be patient.</p></body></html>")
            await self.upload_sftp(working_file, server_folder, "index.html")

            prompt = "Generate a webpage using html and inline css. The webpage topic should be " + topic + ". Feel free to add image tags with alt text. Leave the image source blank. The images will be added later."
            code = await self.answer_question(prompt)
            if code is None:
                self.logger.error("Website Error: no page was generated")
                return

            await self.delete_local_pngs(local_folder)
            await self.delete_ftp_pngs(server_folder)
            tags = await self.extract_image_tags(code)
            alt_texts = await self.extract_image_alt_text(tags)
            file_list = await self.generate_images(local_folder, alt_texts)
            code = await self.add_image_filenames(code, file_list)
            with open(working_file, 'w') as f:
                f.write(code)
            await self.upload_html_and_imgs(local_folder)
            await ctx.send("Finished https://ai.phixxy.com/ai-webpage/")
        except Exception:
            # Failures are only logged; the user is not notified.
            self.logger.exception("Website Error")

    @tasks.loop(seconds=60)
    async def phixxy_loop(self):
        """Every minute: publish newly generated AI images and memes."""
        ai_images_dict = {
            # Folder Path : Log Path
            "tmp/stable_diffusion/sfw/": self.stable_diffusion_log,
            "data/chatgpt/dalle/": "data/chatgpt/logs/dalle3.log",
            "data/chatgpt/dalle2/": "data/chatgpt/logs/dalle2.log",
        }
        await self.upload_ftp_ai_images(ai_images_dict)
        await self.meme_handler('tmp/meme/')

    @tasks.loop(hours=1)
    async def blog_loop(self):
        """Every hour: write the daily blog post if needed and announce it."""
        try:
            message = await self.generate_blog()
            if not message:
                return
            bot_stuff_channel = self.bot.get_channel(self.BLOG_CHANNEL_ID)
            if bot_stuff_channel is None:
                self.logger.warning("Blog channel %s not found, skipping announcement", self.BLOG_CHANNEL_ID)
                return
            await bot_stuff_channel.send(message)
        except Exception:
            self.logger.exception("Failed to generate blog")

    @blog_loop.before_loop
    async def _before_blog_loop(self):
        """Delay the first run until the bot is ready, so the channel lookup can succeed."""
        await self.bot.wait_until_ready()

    @commands.command(
        description="Moderate",
        help="This currently tool works by replacing the filename on the ftp server with a black image. The description will remain the same and may need to be altered.",
        brief="Moderation Tools"
    )
    async def moderate(self, ctx, filename):
        """Overwrite a published gallery image with the blank placeholder image."""
        # NOTE(review): this command has no permission check — confirm that
        # every user is meant to be able to call it.
        # The name comes straight from chat and is used in a remote path, so
        # anything that could leave the ai-images folder is rejected.
        if filename in ('', '.', '..') or '/' in filename or '\\' in filename:
            await ctx.send("Invalid filename")
            return
        await self.upload_sftp(f"{self.data_dir}blank_image.png", self._public_folder('ai-images/'), filename)
        output = "Image " + filename + " replaced"
        await ctx.send(output)

async def setup ( bot ) :
2024-02-15 23:04:09 -08:00
if os . getenv ( " upload_phixxy " ) . lower ( ) == " true " :
2024-02-11 14:27:50 -08:00
asyncssh . set_log_level ( 30 )
asyncssh . set_sftp_log_level ( 30 )
await bot . add_cog ( PhixxyCom ( bot ) )
else :
pass