2024-01-19 17:26:51 -08:00
# Extension for sparkytron 3000
# This extension enables the ability to generate AI artwork using the AUTOMATIC1111 API
2024-01-17 18:27:17 -08:00
import io
import base64
import os
import time
2024-01-17 18:49:45 -08:00
import random
2024-01-17 18:27:17 -08:00
from PIL import Image , PngImagePlugin
import discord
2024-01-19 17:26:51 -08:00
from discord . ext import commands
2024-01-17 18:27:17 -08:00
2024-01-17 18:49:45 -08:00
2024-01-19 17:26:51 -08:00
class StableDiffusion ( commands . Cog ) :
2024-01-17 18:49:45 -08:00
2024-01-19 17:26:51 -08:00
def __init__ ( self , bot ) :
self . bot = bot
2024-01-19 18:38:24 -08:00
self . stable_diffusion_url = os . getenv ( " stablediffusion_url " ) # Change this to stable_diffusion_url
2024-01-19 17:26:51 -08:00
self . working_dir = " tmp/stable_diffusion/ "
2024-01-20 00:34:54 -08:00
self . data_dir = " data/stable_diffusion/ "
2024-02-11 15:00:23 -08:00
self . default_neg_prompt = " easynegative, badhandv4, verybadimagenegative_v1.3 "
2024-01-19 18:38:24 -08:00
self . folder_setup ( )
def folder_setup ( self ) :
try :
if not os . path . exists ( self . working_dir ) :
os . mkdir ( self . working_dir )
2024-01-19 18:49:07 -08:00
os . mkdir ( f " { self . working_dir } sfw " )
os . mkdir ( f " { self . working_dir } nsfw " )
2024-01-20 00:34:54 -08:00
if not os . path . exists ( self . data_dir ) :
os . mkdir ( self . data_dir )
2024-01-19 18:38:24 -08:00
except :
2024-01-25 02:42:23 -08:00
self . bot . logger . exception ( " StableDiffusion failed to make directories " )
2024-01-17 18:49:45 -08:00
2024-01-19 17:26:51 -08:00
async def answer_question ( self , topic , model = " gpt-3.5-turbo " ) : # Only needed for draw command
headers = {
' Content-Type ' : ' application/json ' ,
' Authorization ' : f ' Bearer { os . getenv ( " openai.api_key " ) } ' ,
}
data = {
" model " : model ,
" messages " : [ { " role " : " user " , " content " : topic } ]
}
url = " https://api.openai.com/v1/chat/completions "
try :
async with self . bot . http_session . post ( url , headers = headers , json = data ) as resp :
response_data = await resp . json ( )
response = response_data [ ' choices ' ] [ 0 ] [ ' message ' ] [ ' content ' ]
return response
except Exception as error :
2024-01-24 23:54:17 -08:00
return " Error in answer question in stable_diffusion "
2024-01-19 17:26:51 -08:00
2024-01-19 20:51:40 -08:00
def get_kv_from_ctx ( self , ctx ) :
2024-01-20 00:24:36 -08:00
try :
prompt = ctx . message . content . split ( " " , maxsplit = 1 ) [ 1 ]
kv_strings = list ( filter ( lambda x : ' = ' in x , prompt . split ( ' ' ) ) )
key_value_pairs = dict ( map ( lambda a : a . replace ( ' , ' , ' ' ) . split ( ' = ' ) , kv_strings ) )
return key_value_pairs
except :
return None
2024-01-19 20:51:40 -08:00
def get_prompt_from_ctx ( self , ctx ) :
2024-01-20 00:24:36 -08:00
try :
prompt = ctx . message . content . split ( " " , maxsplit = 1 ) [ 1 ]
prompt = ' ' . join ( list ( filter ( lambda x : ' = ' not in x , prompt . split ( ' ' ) ) ) )
return prompt
except :
return None
2024-01-19 20:51:40 -08:00
2024-01-19 18:38:24 -08:00
async def my_open_img_file ( self , path ) :
2024-01-19 17:26:51 -08:00
img = Image . open ( path )
encoded = " "
with io . BytesIO ( ) as output :
img . save ( output , format = " PNG " )
contents = output . getvalue ( )
encoded = str ( base64 . b64encode ( contents ) , encoding = ' utf-8 ' )
img . close ( )
return encoded
async def look_at ( self , ctx , look = False ) :
metadata = " "
if look :
2024-01-19 18:38:24 -08:00
url = self . stable_diffusion_url
2024-01-19 17:26:51 -08:00
if url == " disabled " :
return
for attachment in ctx . attachments :
if attachment . url . endswith ( ( ' .jpg ' , ' .png ' ) ) :
2024-01-25 02:42:23 -08:00
self . bot . logger . debug ( " image seen " )
2024-01-19 18:38:24 -08:00
async with self . bot . http_session . get ( attachment . url ) as response :
imageName = self . working_dir + str ( time . time_ns ( ) ) + ' .png '
2024-01-19 17:26:51 -08:00
with open ( imageName , ' wb ' ) as out_file :
2024-01-25 02:42:23 -08:00
self . bot . logger . debug ( ' Saving image: ' + imageName )
2024-01-19 17:26:51 -08:00
while True :
chunk = await response . content . read ( 1024 )
if not chunk :
break
out_file . write ( chunk )
2024-01-21 13:46:25 -08:00
img_link = await self . my_open_img_file ( imageName )
2024-01-19 17:26:51 -08:00
try :
payload = { " image " : img_link }
2024-01-19 18:38:24 -08:00
async with self . bot . http_session . post ( f ' { url } /sdapi/v1/interrogate ' , json = payload ) as response :
2024-01-19 17:26:51 -08:00
data = await response . json ( )
description = data . get ( " caption " )
description = description . split ( ' , ' ) [ 0 ]
metadata + = f " <image: { description } > \n "
2024-01-19 20:51:40 -08:00
except self . bot . aiohttp . ClientError as error :
2024-01-25 02:42:23 -08:00
self . bot . logger . exception ( " ERROR: CLIP may not be running. Could not look at image. " )
2024-01-19 17:26:51 -08:00
return " ERROR: CLIP may not be running. Could not look at image. "
return metadata
2024-01-20 00:24:36 -08:00
async def generate_prompt ( self ) :
choice1 = " Give me 11 keywords I can use to generate art using AI. They should all be related to one piece of art. Please only respond with the keywords and no other text. Be sure to use keywords that really describe what the art portrays. Keywords should be comma separated with no other text! "
choice2 = " Describe a creative scene, use only one sentence "
choice3 = " Give me comma seperated keywords describing an imaginary piece of art. Only return the keywords and no other text. "
choice4 = " Describe a unique character and an environment in one sentence "
choice5 = " Describe a nonhuman character and an environment in one sentence "
prompt = random . choice ( [ choice1 , choice2 , choice3 , choice4 , choice5 ] )
prompt = await self . answer_question ( prompt )
if random . randint ( 0 , 9 ) :
prompt = prompt . replace ( " abstract, " , " " )
prompt = prompt . replace ( " AI, " , " " )
if " . " in prompt :
prompt = prompt . replace ( " . " , " , " )
prompt = prompt + " masterpiece, studio quality "
else :
prompt = prompt + " , masterpiece, studio quality "
return prompt
2024-01-19 17:26:51 -08:00
@commands.command (
description = " Change Model " ,
help = " Choose from a list of stable diffusion models. " ,
brief = " Change stable diffusion model "
)
2024-01-19 18:38:24 -08:00
async def change_model ( self , ctx , model_choice = ' 0 ' ) : # Needs to be a configurable list of models
2024-01-19 17:26:51 -08:00
model_choices = {
' 1 ' : ( " deliberate_v2.safetensors [9aba26abdf] " , " DeliberateV2 " ) ,
' 2 ' : ( " flat2DAnimerge_v30.safetensors [5dd56bfa12] " , " Flat2D " ) ,
' 3 ' : ( " Anything-V3.0.ckpt [8712e20a5d] " , " AnythingV3 " ) ,
' 4 ' : ( " aZovyaPhotoreal_v2.safetensors [dde3b17c05] " , " PhotorealV2 " ) ,
' 5 ' : ( " Pixel_Art_V1_PublicPrompts.ckpt [0f02127697] " , " Pixel Art " ) ,
' 6 ' : ( " mistoonAnime_v20.safetensors [c35e1054c0] " , " Mistoon AnimeV2 " )
}
2024-01-19 18:38:24 -08:00
url = self . stable_diffusion_url
2024-01-19 17:26:51 -08:00
if url == " disabled " :
await ctx . send ( " This command is currently disabled " )
2024-01-19 18:38:24 -08:00
else :
async with self . bot . http_session . get ( url = f ' { url } /sdapi/v1/options ' ) as response :
config_json = await response . json ( )
current_model = config_json [ " sd_model_checkpoint " ]
output = ' Current Model: ' + current_model + ' \n '
if model_choice in model_choices :
model_id , model_name = model_choices [ model_choice ]
if current_model != model_id :
payload = { " sd_model_checkpoint " : model_id }
async with self . bot . http_session . post ( url = f ' { url } /sdapi/v1/options ' , json = payload ) as response :
output = " Changed model to: " + model_name
await ctx . send ( output )
return
else :
await ctx . send ( f " Already set to use { model_name } " )
2024-01-19 17:26:51 -08:00
return
else :
2024-01-19 18:38:24 -08:00
output = ' \n ' . join ( [ f " { choice } : { name } " for choice , name in model_choices . items ( ) ] )
await ctx . send ( output )
2024-01-17 18:27:17 -08:00
2024-01-19 17:26:51 -08:00
@commands.command (
description = " Lora " ,
help = " List the stable diffusion loras. " ,
brief = " List the stable diffusion loras "
)
async def lora ( self , ctx ) :
lora_choices = {
' 0 ' : ( " Lora Name " , " Trigger Words " ) ,
' 1 ' : ( " <lora:rebecca:1> " , " rebecca (cyberpunk) " ) ,
' 2 ' : ( " <lora:lucy:1> " , " lucy (cyberpunk) " ) ,
' 3 ' : ( " <lora:dirty:1> " , " dirty " ) ,
' 4 ' : ( " <lora:starcraft:1> " , " c0nst3llation " )
}
output = " "
lora_options = ' \n ' . join ( [ f " { choice } : { name } " for choice , name in lora_choices . items ( ) ] )
output + = lora_options
await ctx . send ( output )
2024-02-11 22:33:50 -08:00
async def get_image_from_ctx ( self , ctx ) :
if ctx . message . attachments :
file_url = ctx . message . attachments [ 0 ] . url
return file_url
2024-01-19 18:38:24 -08:00
try :
2024-02-11 22:33:50 -08:00
file_url = ctx . message . content . split ( " " , maxsplit = 1 ) [ 1 ]
return file_url
2024-01-19 18:38:24 -08:00
except :
2024-02-11 22:33:50 -08:00
self . bot . logger . info ( " Couldn ' t find image. " )
return None
async def txt2img ( self , ctx , prompt ) :
url = f " { self . stable_diffusion_url } /sdapi/v1/txt2img "
key_value_pairs = self . get_kv_from_ctx ( ctx )
headers = { ' Content-Type ' : ' application/json ' }
2024-01-19 17:26:51 -08:00
payload = {
" prompt " : prompt ,
" steps " : 25 ,
2024-02-11 22:33:50 -08:00
" negative_prompt " : self . get_negative_prompt ( )
2024-01-19 17:26:51 -08:00
}
2024-02-11 22:33:50 -08:00
if key_value_pairs :
payload . update ( key_value_pairs )
try :
async with self . bot . http_session . post ( url , headers = headers , json = payload ) as resp :
if resp . status != 200 :
await ctx . send ( f " { resp . status } { resp . reason } " )
self . bot . logger . exception ( f " { resp . status } { resp . reason } " )
return
r = await resp . json ( )
except ConnectionRefusedError :
await ctx . send ( " Failed to connect to image generation service " )
self . bot . logger . exception ( " Failed to connect to image generation service " )
return
except :
await ctx . send ( " Failed to generate image " )
self . bot . logger . exception ( " Failed to generate image " )
return
await self . send_generated_image ( ctx , r [ ' images ' ] , prompt )
async def save_image ( self , url ) :
async with self . bot . http_session . get ( url ) as response :
image_name = self . working_dir + str ( time . time_ns ( ) ) + " .png "
with open ( image_name , ' wb ' ) as out_file :
self . bot . logger . debug ( f " Saving image: { image_name } " )
while True :
chunk = await response . content . read ( 1024 )
if not chunk :
break
out_file . write ( chunk )
return image_name
async def img2img ( self , ctx , prompt ) :
url = f " { self . stable_diffusion_url } /sdapi/v1/img2img "
file_url = await self . get_image_from_ctx ( ctx )
image_name = await self . save_image ( file_url )
file_url = await self . my_open_img_file ( image_name )
key_value_pairs = self . get_kv_from_ctx ( ctx )
headers = { ' Content-Type ' : ' application/json ' }
payload = {
" init_images " : [ file_url ] ,
" prompt " : prompt ,
" steps " : 40 ,
" negative_prompt " : self . get_negative_prompt ( ) ,
" denoising_strength " : 0.5 ,
2024-01-19 17:26:51 -08:00
}
2024-01-20 00:24:36 -08:00
if key_value_pairs :
payload . update ( key_value_pairs )
2024-01-17 18:27:17 -08:00
try :
2024-01-19 18:38:24 -08:00
async with self . bot . http_session . post ( url , headers = headers , json = payload ) as resp :
2024-02-11 22:33:50 -08:00
if resp . status != 200 :
await ctx . send ( f " { resp . status } { resp . reason } " )
self . bot . logger . error ( f " { resp . status } { resp . reason } " )
return
2024-01-19 17:26:51 -08:00
r = await resp . json ( )
2024-02-11 15:00:23 -08:00
except ConnectionRefusedError :
2024-02-11 22:33:50 -08:00
await ctx . send ( " Failed to connect to image generation service " )
2024-02-11 15:00:23 -08:00
self . bot . logger . exception ( " Failed to connect to image generation service " )
return
except :
await ctx . send ( " Failed to generate image " )
self . bot . logger . exception ( " Failed to generate image " )
2024-02-11 14:34:28 -08:00
return
2024-02-11 22:33:50 -08:00
await self . send_generated_image ( ctx , r [ ' images ' ] , prompt )
async def send_generated_image ( self , ctx , images , prompt ) :
for i in images :
2024-01-19 17:26:51 -08:00
image = Image . open ( io . BytesIO ( base64 . b64decode ( i . split ( " , " , 1 ) [ 0 ] ) ) )
try :
if ctx . channel . is_nsfw ( ) :
2024-01-19 18:49:07 -08:00
folder = self . working_dir + " nsfw/ "
2024-01-19 17:26:51 -08:00
else :
2024-01-20 01:45:49 -08:00
folder = self . working_dir + " sfw/ "
2024-01-19 17:26:51 -08:00
except :
2024-01-20 00:24:36 -08:00
folder = self . working_dir
2024-01-20 21:55:19 -08:00
my_filename = str ( time . time_ns ( ) ) + " .png "
filepath = folder + my_filename
2024-02-11 15:00:23 -08:00
image . save ( filepath )
2024-02-11 22:33:50 -08:00
2024-01-20 21:55:19 -08:00
with open ( filepath , " rb " ) as fh :
f = discord . File ( fh , filename = filepath )
2024-02-11 22:33:50 -08:00
2024-02-06 01:52:03 -08:00
prompt = prompt . replace ( ' \n ' , ' ' )
log_data = f ' Author: { ctx . author . name } , Prompt: { prompt } , Filename: { my_filename } \n '
2024-01-20 00:34:54 -08:00
with open ( f " { self . data_dir } stable_diffusion.log " , ' a ' ) as log_file :
2024-01-19 17:26:51 -08:00
log_file . writelines ( log_data )
2024-01-17 23:54:47 -08:00
2024-01-19 17:26:51 -08:00
await ctx . send ( f ' Generated by: { ctx . author . name } \n Prompt: { prompt } ' , file = f )
2024-02-11 22:33:50 -08:00
def get_negative_prompt ( self ) :
try :
neg_prompt_file = f " { self . data_dir } negative_prompt.txt "
with open ( neg_prompt_file , ' r ' ) as f :
negative_prompt = f . readline ( )
except :
neg_prompt_file = f " { self . data_dir } negative_prompt.txt "
with open ( neg_prompt_file , ' w ' ) as f :
f . writelines ( self . default_neg_prompt )
negative_prompt = self . default_neg_prompt
return negative_prompt
@commands.command (
description = " Imagine " ,
help = " Generate an image using stable diffusion. You can add keyword arguments to your prompt and they will be treated as stable diffusion options. Usage !imagine (topic) " ,
brief = " Generate an image "
)
async def imagine ( self , ctx ) :
url = self . stable_diffusion_url
if url == " disabled " :
await ctx . send ( " Command is currently disabled " )
return
prompt = self . get_prompt_from_ctx ( ctx )
if prompt == None :
prompt = await self . generate_prompt ( )
await ctx . send ( f " Please be patient this may take some time! Generating: { prompt } . " )
await self . txt2img ( ctx , prompt )
2024-01-17 18:27:17 -08:00
2024-01-19 17:26:51 -08:00
@commands.command (
description = " Describe " ,
help = " Get better understanding of what the bot \" sees \" when you post an image! (Runs it through CLIP) Usage !describe (image link) " ,
brief = " Describe image "
)
async def describe ( self , ctx ) :
2024-01-19 18:38:24 -08:00
url = self . stable_diffusion_url
2024-01-19 17:26:51 -08:00
if url == " disabled " :
await ctx . send ( " Command is currently disabled " )
return
2024-01-17 18:27:17 -08:00
else :
2024-01-19 17:26:51 -08:00
url = f " { url } /sdapi/v1/interrogate "
2024-02-11 22:33:50 -08:00
file_url = await self . get_image_from_ctx ( ctx )
image_name = await self . save_image ( file_url )
img_link = await self . my_open_img_file ( image_name )
2024-01-19 17:26:51 -08:00
try :
payload = { " image " : img_link }
2024-01-19 18:38:24 -08:00
async with self . bot . http_session . post ( url , json = payload ) as response :
2024-01-19 17:26:51 -08:00
r = await response . json ( )
await ctx . send ( r . get ( " caption " ) )
2024-02-11 15:00:23 -08:00
except :
2024-01-25 02:42:23 -08:00
self . bot . logger . exception ( " error in describe " )
2024-01-19 17:26:51 -08:00
await ctx . send ( " My image generation service may not be running. " )
2024-02-11 22:33:50 -08:00
2024-01-19 17:26:51 -08:00
@commands.command (
description = " Reimagine " ,
help = " Reimagine an image as something else. One example is reimagining a picture as anime. This command can be hard to use. \n Usage: !reimagine (image link) (topic) \n Example: !reimagine (image link) anime " ,
brief = " Reimagine an image "
)
async def reimagine ( self , ctx ) :
2024-01-19 18:38:24 -08:00
url = self . stable_diffusion_url
2024-01-19 17:26:51 -08:00
if url == " disabled " :
await ctx . send ( " Command is currently disabled " )
return
2024-01-20 00:24:36 -08:00
prompt = self . get_prompt_from_ctx ( ctx )
2024-01-21 13:57:21 -08:00
if not prompt :
prompt = " "
2024-02-11 22:33:50 -08:00
await ctx . send ( f " Please be patient this may take some time! Generating: { prompt } . " )
await self . img2img ( ctx , prompt )
2024-01-19 17:26:51 -08:00
@commands.command (
description = " Negative Prompt " ,
help = " Changes the negative prompt for imagine across all channels " ,
brief = " Change the negative prompt for imagine "
)
async def negative_prompt ( self , ctx , * args ) :
message = ' ' . join ( args )
if not message :
2024-02-11 15:00:23 -08:00
message = self . default_neg_prompt
2024-01-20 00:34:54 -08:00
neg_prompt_file = f " { self . data_dir } negative_prompt.txt "
2024-01-19 17:26:51 -08:00
with open ( neg_prompt_file , ' w ' ) as f :
f . writelines ( message )
await ctx . send ( " Changed negative prompt to " + message )
2024-01-17 18:27:17 -08:00
async def setup ( bot ) :
2024-02-04 00:21:35 -08:00
await bot . add_cog ( StableDiffusion ( bot ) )
2024-01-19 18:38:24 -08:00