724 lines
No EOL
32 KiB
Python
724 lines
No EOL
32 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import random
|
|
import asyncio
|
|
import aiofiles
|
|
import aiohttp
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
import matplotlib.pyplot as plt
|
|
|
|
class ChatGPT(commands.Cog):
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.bot_id = 1097302679836971038
|
|
self.admin_id = 242018983241318410
|
|
self.API_KEY = os.getenv("openai.api_key")
|
|
self.default_budget = 20
|
|
self.working_dir = "tmp/chatgpt/"
|
|
self.data_dir = "data/chatgpt/"
|
|
self.folder_setup()
|
|
self.remind_me_loop.start()
|
|
self.http_session = self.create_aiohttp_session()
|
|
self.logger = logging.getLogger("bot")
|
|
self.headers = {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': f'Bearer {os.getenv("openai.api_key")}',
|
|
}
|
|
|
|
def create_aiohttp_session(self):
|
|
return aiohttp.ClientSession()
|
|
|
|
def folder_setup(self):
|
|
try:
|
|
folders = [
|
|
self.working_dir,
|
|
self.data_dir,
|
|
self.data_dir + "config",
|
|
self.data_dir + "logs",
|
|
self.data_dir + "dalle",
|
|
self.data_dir + "costs"
|
|
]
|
|
|
|
for folder in folders:
|
|
if not os.path.exists(folder):
|
|
os.mkdir(folder)
|
|
|
|
except Exception as e:
|
|
self.logger.exception(f"ChatGPT failed to make directories: {e}")
|
|
|
|
def text_cost_calc(self, model, input_tokens, output_tokens):
|
|
cost_table = {"gpt-4o-mini":{"input_tokens":0.00000015,"output_tokens":0.0000006},
|
|
"gpt-4-turbo-preview":{"input_tokens":0.00001,"output_tokens":0.00003},
|
|
"gpt-4-vision-preview":{"input_tokens":0.00001,"output_tokens":0.00003},
|
|
"gpt-4o":{"input_tokens":0.000005,"output_tokens":0.000015}
|
|
}
|
|
input_cost = cost_table[model]["input_tokens"] * input_tokens
|
|
output_cost = cost_table[model]["output_tokens"] * output_tokens
|
|
cost = input_cost + output_cost
|
|
return cost
|
|
|
|
def add_cost(self, category: str, cost: float):
|
|
day = time.strftime("%d")
|
|
month = time.strftime("%B")
|
|
year = time.strftime("%Y")
|
|
filepath = f"{self.data_dir}costs/{month}_{day}_{year}.json"
|
|
if not os.path.exists(filepath):
|
|
with open(filepath, "w") as f:
|
|
json.dump({},f)
|
|
with open(filepath, "r") as f:
|
|
costs_dict = json.loads(f.readline())
|
|
if category not in costs_dict:
|
|
costs_dict[category] = cost
|
|
else:
|
|
costs_dict[category] += cost
|
|
with open(filepath, "w") as f:
|
|
json.dump(costs_dict,f)
|
|
|
|
async def graph_cost(self, graph_title, costs_per_day):
|
|
plt.plot(costs_per_day.values())
|
|
plt.title(f"{graph_title} Costs")
|
|
plt.xlabel("Day")
|
|
plt.ylabel("Cost")
|
|
plt.savefig(f"{self.data_dir}costs/{graph_title}_costs.png")
|
|
plt.close()
|
|
return f"{self.data_dir}costs/{graph_title}_costs.png"
|
|
|
|
async def graph_category(self, graph_title, cost_per_category):
|
|
bar_container = plt.barh(list(cost_per_category.keys()), cost_per_category.values())
|
|
plt.title(f"{graph_title} Costs")
|
|
plt.xlabel("Category")
|
|
plt.ylabel("Cost")
|
|
plt.bar_label(bar_container)
|
|
plt.savefig(f"{self.data_dir}costs/{graph_title}_categories.png")
|
|
plt.close()
|
|
return f"{self.data_dir}costs/{graph_title}_categories.png"
|
|
|
|
async def get_monthly_cost(self, month=time.strftime("%B"), year=time.strftime("%Y")):
|
|
total_cost = 0
|
|
for x in range(1,32):
|
|
if x < 10:
|
|
x = f"0{x}"
|
|
filepath = f"{self.data_dir}costs/{month}_{x}_{year}.json"
|
|
if os.path.exists(filepath):
|
|
with open(filepath, "r") as f:
|
|
costs_dict = json.loads(f.readline())
|
|
for category, cost in costs_dict.items():
|
|
total_cost += cost
|
|
return total_cost
|
|
|
|
async def moderation_check(self, prompt):
|
|
data = {
|
|
"input": prompt
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/moderations"
|
|
|
|
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
flagged = response_data['results'][0]['flagged']
|
|
categories = response_data['results'][0]['categories']
|
|
category_scores = response_data['results'][0]['category_scores']
|
|
return (flagged, categories, category_scores)
|
|
|
|
@commands.command(
|
|
name="costs",
|
|
brief="Get the costs for a given month and year.",
|
|
help="Get the costs for a given month and year. If no month or year is given, it will default to the current month and year.",
|
|
aliases=["cost"],
|
|
usage="costs [month] [year]",
|
|
)
|
|
async def costs(self, ctx, month=time.strftime("%B"), year=time.strftime("%Y")):
|
|
total_cost = 0
|
|
cost_per_day = {}
|
|
for x in range(1,32):
|
|
daily_cost = 0
|
|
if x < 10:
|
|
x = f"0{x}"
|
|
filepath = f"{self.data_dir}costs/{month}_{x}_{year}.json"
|
|
if os.path.exists(filepath):
|
|
with open(filepath, "r") as f:
|
|
costs_dict = json.loads(f.readline())
|
|
for category, cost in costs_dict.items():
|
|
total_cost += cost
|
|
daily_cost += cost
|
|
cost_per_day[x] = daily_cost
|
|
rounded_total = round(total_cost, 2)
|
|
if rounded_total == 0:
|
|
await ctx.send(f"No data for {month} {year}")
|
|
else:
|
|
graph_title = f"{month} {year}"
|
|
await ctx.send(f"Total cost for {month} {year}: ${rounded_total}", file=discord.File(await self.graph_cost(graph_title, cost_per_day)))
|
|
|
|
@commands.command(
|
|
name="category_costs",
|
|
brief="Get the costs per category for a given month and year.",
|
|
help="Get the costs per category for a given month and year. If no month or year is given, it will default to the current month and year.",
|
|
aliases=["category_cost"],
|
|
usage="category_costs [month] [year] [day]",
|
|
)
|
|
async def category_costs(self, ctx, month=time.strftime("%B"), year=time.strftime("%Y"), day=None):
|
|
total_cost = 0
|
|
cost_per_category = {}
|
|
if day == None:
|
|
day_range = range(1,32)
|
|
else:
|
|
day_range = [int(day)]
|
|
for x in day_range:
|
|
daily_cost = 0
|
|
if x < 10:
|
|
x = f"0{x}"
|
|
filepath = f"{self.data_dir}costs/{month}_{x}_{year}.json"
|
|
if os.path.exists(filepath):
|
|
with open(filepath, "r") as f:
|
|
costs_dict = json.loads(f.readline())
|
|
for category, cost in costs_dict.items():
|
|
if category not in cost_per_category:
|
|
cost_per_category[category] = cost
|
|
else:
|
|
cost_per_category[category] += cost
|
|
total_cost += cost
|
|
graph_title = f"{month} {year}"
|
|
rounded_total = round(total_cost, 2)
|
|
if rounded_total == 0:
|
|
await ctx.send(f"No data for {month} {year}")
|
|
else:
|
|
await ctx.send(f"Total cost for {month} {year}: ${rounded_total}", file=discord.File(await self.graph_category(graph_title, cost_per_category)))
|
|
|
|
def create_channel_config(self, filepath):
|
|
config_dict = {
|
|
"personality":"average",
|
|
"channel_topic":"casual",
|
|
"chat_enabled":False,
|
|
"chat_history_len":5,
|
|
"react_to_msgs":False,
|
|
"log_images":False,
|
|
"llama_enabled":False
|
|
}
|
|
|
|
with open(filepath,"w") as f:
|
|
json.dump(config_dict,f)
|
|
self.logger.debug("Wrote ChatGPT config variables to file.")
|
|
|
|
async def get_channel_config(self, channel_id):
|
|
filepath = f"{self.data_dir}config/{channel_id}.json"
|
|
if not os.path.exists(filepath):
|
|
self.create_channel_config(filepath)
|
|
with open(filepath, "r") as f:
|
|
config_dict = json.loads(f.readline())
|
|
return config_dict
|
|
|
|
def edit_channel_config(self, channel_id, key, value):
|
|
config_file = f"{self.data_dir}config/{channel_id}.json"
|
|
with open(config_file, 'r') as f:
|
|
config_data = json.load(f)
|
|
config_data[key] = value
|
|
with open(config_file, "w") as f:
|
|
json.dump(config_data, f)
|
|
|
|
def read_db(self,filepath):
|
|
with open(filepath,"r") as fileobj:
|
|
db_content = json.load(fileobj)
|
|
return db_content
|
|
|
|
def save_to_db(self,filepath,db_content):
|
|
with open(filepath,"w") as fileobj:
|
|
json.dump(db_content,fileobj,indent=4)
|
|
|
|
async def remind(self,reminder_dict): #THIS IS THE FUNCTION TO AUTOMATICALLY FULFILL THE RESPONSE WHEN CALLED BY THE REMIND ME LOOP
|
|
#this is what the reminder_dict looks like: data[target_time] = {"user_id":user_id,"response":response}
|
|
user = self.bot.get_user(reminder_dict["user_id"])
|
|
return await user.send(reminder_dict["response"])
|
|
|
|
async def answer_question(self, topic, model="gpt-4o-mini"):
|
|
data = {
|
|
"model": model,
|
|
"messages": [{"role": "user", "content": topic}]
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/chat/completions"
|
|
|
|
try:
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
if resp.status != 200:
|
|
self.logger.error(f"Error occurred in answer_question: {response_data}")
|
|
return "Error occurred in answer_question"
|
|
response = response_data['choices'][0]['message']['content']
|
|
input_tokens = response_data['usage']['prompt_tokens']
|
|
output_tokens = response_data['usage']['completion_tokens']
|
|
cost = self.text_cost_calc(model, input_tokens, output_tokens)
|
|
self.add_cost(model, cost)
|
|
return response
|
|
|
|
except Exception as error:
|
|
self.logger.exception("Error occurred in answer_question")
|
|
return "Error occurred in answer_question"
|
|
|
|
@commands.command()
|
|
async def translate(self, ctx):
|
|
if ctx.message.attachments:
|
|
attachment = ctx.message.attachments[0] # assuming only one attachment
|
|
await attachment.save(self.working_dir + '/' + attachment.filename)
|
|
|
|
with open(self.working_dir + '/' + attachment.filename, 'r') as file:
|
|
text = file.read()
|
|
question = f"Translate the following text to english: {text}"
|
|
# Now text contains the content of the downloaded file
|
|
translated_text = await self.answer_question(question)
|
|
# Save the translated text to a new file
|
|
with open(f'{self.working_dir}/translated_text.txt', 'w') as new_file:
|
|
new_file.write(translated_text)
|
|
|
|
# Send the text file as an attachment
|
|
file = discord.File(f'{self.working_dir}/translated_text.txt')
|
|
await ctx.send(file=file)
|
|
|
|
|
|
@commands.command(
|
|
description="Personality",
|
|
help="Set the personality of the bot. Usage: !personality (personality)",
|
|
brief="Set the personality"
|
|
)
|
|
async def personality(self, ctx, *personality_type):
|
|
if personality_type:
|
|
personality_type = ' '.join(personality_type)
|
|
self.edit_channel_config(ctx.channel.id, "personality", personality_type)
|
|
await ctx.send(f"Personality changed to {personality_type}")
|
|
else:
|
|
channel_config = await self.get_channel_config(ctx.channel.id)
|
|
current_personality = channel_config["personality"]
|
|
await ctx.send(f"Current personality is {current_personality}")
|
|
|
|
|
|
@commands.command(
|
|
description="Topic",
|
|
help="Set the channel topic for the bot. Usage: !topic (topic)",
|
|
brief="Set channel topic"
|
|
)
|
|
async def topic(self, ctx, *channel_topic):
|
|
if channel_topic:
|
|
channel_topic = ' '.join(channel_topic)
|
|
self.edit_channel_config(ctx.channel.id, "channel_topic", channel_topic)
|
|
await ctx.send(f"Topic changed to {channel_topic}")
|
|
else:
|
|
channel_config = await self.get_channel_config(ctx.channel.id)
|
|
current_topic = channel_config["channel_topic"]
|
|
await ctx.send(f"Current topic is {current_topic}")
|
|
|
|
|
|
@commands.command(
|
|
description="Chat",
|
|
help="Enable or disable bot chat in this channel. Usage !chat (enable|disable)",
|
|
brief="Enable or disable bot chat"
|
|
)
|
|
async def chat(self, ctx, message):
|
|
if "enable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "chat_enabled", True)
|
|
await ctx.send("Chat Enabled")
|
|
elif "disable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "chat_enabled", False)
|
|
await ctx.send("Chat Disabled")
|
|
else:
|
|
await ctx.send("Usage: !chat (enable|disable)")
|
|
|
|
@commands.command(
|
|
description="Log Images",
|
|
help="Enable or disable logging images in this channel. Usage !log_images (enable|disable)",
|
|
brief="Enable or disable bot logging images"
|
|
)
|
|
async def log_images(self, ctx, message):
|
|
if "enable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "log_images", True)
|
|
await ctx.send("Image Viewing Enabled")
|
|
elif "disable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "log_images", False)
|
|
await ctx.send("Image Viewing Disabled")
|
|
else:
|
|
await ctx.send("Usage: !log_images (enable|disable)")
|
|
|
|
@commands.command(
|
|
description="Reactions",
|
|
help="Enable or disable bot reactions in this channel. Usage !reactions (enable|disable)",
|
|
brief="Enable or disable bot reactions"
|
|
)
|
|
async def reactions(self, ctx, message):
|
|
if "enable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "react_to_msgs", True)
|
|
await ctx.send("Reactions Enabled")
|
|
elif "disable" in message:
|
|
self.edit_channel_config(ctx.channel.id, "react_to_msgs", False)
|
|
await ctx.send("Reactions Disabled")
|
|
else:
|
|
await ctx.send("Usage: !reactions (enable|disable)")
|
|
|
|
@commands.command(
|
|
description="Question",
|
|
help="Ask a raw chatgpt question. Usage: !question (question)",
|
|
brief="Get an answer"
|
|
)
|
|
async def question(self, ctx):
|
|
await ctx.send("One moment, let me think...")
|
|
question = ctx.message.content.split(" ", maxsplit=1)[1]
|
|
answer = await self.answer_question(question)
|
|
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
|
|
for chunk in chunks:
|
|
await ctx.send(chunk)
|
|
|
|
async def dalle_api_call(self, prompt: str, model: str="dall-e-2", quality: str="standard", size: str="1024x1024") -> tuple:
|
|
if self.dalle_budget <= await self.get_monthly_cost():
|
|
self.logger.info("DALL-E API call failed due to budget")
|
|
return (1337, "DALL-E API call failed due to budget. Consider using !donate to fund the bot.")
|
|
data = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"size": size,
|
|
"quality": quality,
|
|
"n":1,
|
|
}
|
|
url = "https://api.openai.com/v1/images/generations"
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
if resp.status == 200:
|
|
response = response_data['data'][0]['url']
|
|
else:
|
|
response = response_data["error"]["message"]
|
|
self.logger.info(f"Error occurred in dalle: {resp.status} | {response}")
|
|
return (resp.status, response)
|
|
|
|
async def download_image(self, url, destination) -> int:
|
|
async with self.http_session.get(url) as resp:
|
|
if resp.status == 200:
|
|
f = await aiofiles.open(destination, mode='wb')
|
|
await f.write(await resp.read())
|
|
await f.close()
|
|
return resp.status
|
|
|
|
async def send_moderation_message(self, command, user_id, username, prompt, categories, category_scores):
|
|
categories = [k for k, v in categories.items() if v]
|
|
category_scores = {k: v for k, v in category_scores.items() if v > 0.5}
|
|
embed = discord.Embed(title="Moderation", description=f"Command: {command}\nUsername: {username}\nUser ID: {user_id}\nPrompt: {prompt}\nCategories: {categories}\nCategory Scores: {category_scores}", color=0x00ff00)
|
|
embed.set_footer(text="Moderation")
|
|
user = self.bot.get_user(self.admin_id)
|
|
await user.send(embed=embed)
|
|
|
|
async def generate_dalle_image(self, ctx, model, quality="standard", size="1024x1024") -> None:
|
|
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
|
|
await ctx.send(f"Please be patient this may take some time! Generating: {prompt}.")
|
|
flagged, categories, category_scores = await self.moderation_check(prompt)
|
|
if flagged:
|
|
self.logger.info(f"Prompt {prompt} was flagged for inappropriate content.")
|
|
await ctx.send(f"This prompt {prompt} was flagged for inappropriate content. This has been reported.")
|
|
await self.send_moderation_message("dalle", ctx.author.id, ctx.author.name, prompt, categories, category_scores)
|
|
return
|
|
resp_status, resp = await self.dalle_api_call(prompt, model=model, quality=quality, size=size)
|
|
if resp_status != 200:
|
|
await ctx.send(f"Error generating image: {resp_status}: {resp}")
|
|
return
|
|
my_filename = str(time.time_ns()) + ".png"
|
|
image_filepath = f"{self.data_dir}dalle/{my_filename}"
|
|
await self.download_image(resp, image_filepath)
|
|
with open(image_filepath, "rb") as fh:
|
|
f = discord.File(fh, filename=image_filepath)
|
|
prompt = prompt.replace('\n',' ')
|
|
log_data = f'Author: {ctx.author.name}, Prompt: {prompt}, Filename: {my_filename}\n'
|
|
with open(f"{self.data_dir}logs/dalle3.log", 'a') as log_filepath:
|
|
log_filepath.writelines(log_data)
|
|
await ctx.send(f'Generated by: {ctx.author.name}\nPrompt: {prompt}', file=f)
|
|
|
|
@commands.command(
|
|
description="Big Spenders",
|
|
help="Generate a list of the biggest spenders. Usage: !bigspenders",
|
|
brief="Generate list of big spenders"
|
|
)
|
|
async def bigspenders(self, ctx):
|
|
filenames = os.listdir(self.data_dir + "logs/")
|
|
user_cost_dict = {}
|
|
for filename in filenames:
|
|
if ".log" in filename:
|
|
with open(f"{self.data_dir}logs/{filename}", 'r', encoding="utf-8") as f:
|
|
for line in f:
|
|
try:
|
|
if "!dalle3hd" in line:
|
|
cost = 0.08
|
|
username = line[0:line.index(':')]
|
|
if " " in username:
|
|
break
|
|
if username not in user_cost_dict:
|
|
user_cost_dict[username] = 0
|
|
user_cost_dict[username] += cost
|
|
elif "!dalle2" in line:
|
|
cost = 0.02
|
|
username = line[0:line.index(':')]
|
|
if " " in username:
|
|
break
|
|
if username not in user_cost_dict:
|
|
user_cost_dict[username] = 0
|
|
user_cost_dict[username] += cost
|
|
if "!dalle" in line or "!dalle3" in line:
|
|
cost = 0.04
|
|
username = line[0:line.index(':')]
|
|
if " " in username:
|
|
break
|
|
if username not in user_cost_dict:
|
|
user_cost_dict[username] = 0
|
|
user_cost_dict[username] += cost
|
|
else:
|
|
pass
|
|
except:
|
|
pass
|
|
message = "Big Spenders:\n"
|
|
sorted_dictionary = sorted(user_cost_dict.items(), key=lambda x: x[1], reverse=True)
|
|
for user in sorted_dictionary:
|
|
message += f"{user[0]}: ${user[1]:.2f}\n"
|
|
await ctx.send(message)
|
|
|
|
|
|
@commands.command(
|
|
description="Dalle 2",
|
|
help="Generate an image with Dalle 2 Usage: !dalle2 (prompt)",
|
|
brief="Generate Image"
|
|
)
|
|
async def dalle2(self, ctx):
|
|
self.add_cost("dalle2", 0.02)
|
|
await self.generate_dalle_image(ctx, model="dall-e-2")
|
|
|
|
|
|
@commands.command(
|
|
description="Dalle 3",
|
|
help="Generate an image with Dalle 3 Usage: !dalle3 (prompt)",
|
|
brief="Generate Image",
|
|
aliases = ['dalle']
|
|
)
|
|
async def dalle3(self, ctx):
|
|
self.add_cost("dalle3", 0.04)
|
|
await self.generate_dalle_image(ctx, model="dall-e-3")
|
|
|
|
|
|
@commands.command(
|
|
description="Dalle 3 HD",
|
|
help="Generate an HD image with Dalle 3 Usage: !dalle3 (prompt)",
|
|
brief="Generate HD Image",
|
|
)
|
|
async def dalle3hd(self, ctx):
|
|
self.add_cost("dalle3hd", 0.08)
|
|
await self.generate_dalle_image(ctx, model="dall-e-3", quality="hd", size="1792x1024")
|
|
|
|
@commands.command(
|
|
description="Looker",
|
|
help="Ask GPT4 a question about an image. Usage: !looker (link) (question)",
|
|
brief="Get an answer"
|
|
)
|
|
async def looker(self, ctx):
|
|
if len(ctx.message.attachments) > 0:
|
|
image_link = ctx.message.attachments[0].url
|
|
question = ctx.message.content
|
|
else:
|
|
image_link = ctx.message.content.split(" ", maxsplit=2)[1]
|
|
question = ctx.message.content.split(" ", maxsplit=2)[2]
|
|
|
|
data = {
|
|
"model": "gpt-4o",
|
|
"messages": [{"role": "user", "content": [{"type": "text", "text": question},{"type": "image_url","image_url": {"url": image_link}}]}],
|
|
"max_tokens": 500
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/chat/completions"
|
|
|
|
try:
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
self.logger.debug(response_data)
|
|
answer = response_data['choices'][0]['message']['content']
|
|
|
|
|
|
except Exception as error:
|
|
self.logger.exception("error occurred in looker")
|
|
|
|
chunks = [answer[i:i+1999] for i in range(0, len(answer), 1999)]
|
|
for chunk in chunks:
|
|
await ctx.send(chunk)
|
|
|
|
@commands.command(
|
|
description="Remind Me",
|
|
help="Send a request in natural language to ask Sparky to remind you of a task or event by direct message in a specified amount of time. Minimum 1 minute.",
|
|
brief="Get a reminder",
|
|
aliases=["remindme","remind_me","remind"]
|
|
)
|
|
async def save_reminder(self,ctx):
|
|
#SETUP
|
|
reminders_path = self.data_dir + "reminders.txt"
|
|
if not os.path.exists(reminders_path):
|
|
with open(reminders_path,"w") as file_obj:
|
|
file_obj.write("{}")
|
|
prompt = ctx.message.content.split(" ", maxsplit=1)[1]
|
|
reminder_data = self.read_db(reminders_path)
|
|
current_time = int(time.time_ns())
|
|
user_id = ctx.author.id
|
|
|
|
#PARSE PROMPT
|
|
duration_s = await self.answer_question(f"You are an automated bot whose only function is to convert a natural language number into an integer. You must determine a number representing after how long, in seconds, the user wishes you to remind them of a task based on the information provided. Please respond using only an integer of the equivalent or approximate time in seconds. You must not use any words in your response other than a single integer representing that time in seconds. You are incapable of using any English words at all. If you are unable to determine a time from the prompt given, return only the integer 0. The prompt is as follows: {prompt}")
|
|
response = await self.answer_question(f"You are a reminder bot whose purpose is to help users by reminding them of tasks or events. A user by the name of {ctx.author.name} has asked you to remind them about something after a certain amount of time has passed. That time has now passed. Their original request was as follows: {prompt}")
|
|
if duration_s == "0":
|
|
await ctx.reply("Sorry! I'm not sure exactly when you need me to remind you based on your wording. Could you phrase the request a bit differently?",mention_author=True)
|
|
return
|
|
else:
|
|
await ctx.reply("Sure thing! I'll remind you when the time comes.", mention_author=False)
|
|
#MATHS
|
|
duration_ns = int(duration_s) * 1000000000
|
|
target_time = current_time + int(duration_ns)
|
|
|
|
#CREATE FILEDUMP
|
|
reminder_data[target_time] = {"user_id":user_id,"response":response}
|
|
|
|
self.logger.info(f"Reminding user {ctx.author.id} in {duration_s} seconds || Target time (ns): {target_time}")
|
|
self.save_to_db(reminders_path,reminder_data)
|
|
|
|
@tasks.loop(seconds=60) # THIS ONE NEEDS TO POP AND THEN CALL THE REMIND FUNC
|
|
async def remind_me_loop(self):
|
|
reminders_path = self.data_dir + "reminders.txt"
|
|
current_time = int(time.time_ns())
|
|
data = self.read_db(reminders_path)
|
|
trash = []
|
|
|
|
#CHECK IF ANY NEED TO BE FULFILLED
|
|
for remind_time in data.keys():
|
|
if current_time >= int(remind_time):
|
|
reminder_dict = data[remind_time]
|
|
sent = await self.remind(reminder_dict) #THIS SENDS THE REMINDER DICT TO REMIND FUNC
|
|
if sent:
|
|
self.logger.info(f"Reminder sent successfully to {reminder_dict['user_id']}")
|
|
trash.append(remind_time) #NEED TO POP OR THEY WILL GET REMINDED AD INFINITUM
|
|
|
|
for key in trash:
|
|
if data.pop(key):
|
|
self.logger.debug("Fulfilled reminders successfully purged")
|
|
|
|
self.save_to_db(reminders_path,data)
|
|
|
|
async def log_chat_and_get_history(self, message, logfile, channel_vars):
|
|
log_line = ''
|
|
if message.attachments and channel_vars.get("log_images", False) and not message.author.bot:
|
|
#log_image MUST BE ADDED TO THE JSON FILES
|
|
for attachment in message.attachments:
|
|
image_description = await self.view_image(attachment.url)
|
|
image_description = image_description.replace("\n"," ")
|
|
log_line += attachment.url + " <image description>" + image_description + "</image description> "
|
|
log_line += message.content
|
|
log_line = message.author.name + ": " + log_line +"\n"
|
|
chat_history = ""
|
|
self.logger.debug("Logging: " + log_line, end="")
|
|
with open(logfile, "a", encoding="utf-8") as f:
|
|
f.write(log_line)
|
|
with open(logfile, "r", encoding="utf-8") as f:
|
|
for line in (f.readlines() [-int(channel_vars["chat_history_len"]):]):
|
|
chat_history += line
|
|
return chat_history
|
|
|
|
async def react_to_msg(self, ctx, react):
|
|
#todo ctx is actually a message, make this obv
|
|
def is_emoji(string):
|
|
if len(string) == 1:
|
|
# Range of Unicode codepoints for emojis
|
|
if 0x1F300 <= ord(string) <= 0x1F6FF:
|
|
return True
|
|
return False
|
|
|
|
if react:
|
|
if not random.randint(0,10) and ctx.author.id != 1097302679836971038:
|
|
#todo above line is do not react to self, make this work programatically
|
|
system_msg = "Send only an emoji as a discord reaction to the following chat message"
|
|
message = ctx.content[0]
|
|
|
|
data = {
|
|
"model": "gpt-4o-mini",
|
|
"messages": [{"role": "system", "content": system_msg}, {"role": "user", "content": message}]
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/chat/completions"
|
|
|
|
try:
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
reaction = response_data['choices'][0]['message']['content'].strip()
|
|
if is_emoji(reaction):
|
|
await ctx.add_reaction(reaction)
|
|
else:
|
|
await ctx.add_reaction("😓")
|
|
except Exception as error:
|
|
self.logger.exception("Some error happened while trying to react to a message")
|
|
|
|
async def view_image(self, image_link):
|
|
if self.dalle_budget <= await self.get_monthly_cost():
|
|
self.logger.info("View_image failed due to budget")
|
|
return (1337, "View_image API call failed due to budget. Consider using !donate to fund the bot.")
|
|
model = "gpt-4o"
|
|
data = {
|
|
"model": model,
|
|
"messages": [{"role": "user", "content": [{"type": "text", "text": "Describe this"},{"type": "image_url","image_url": {"url": image_link}}]}],
|
|
"max_tokens": 500
|
|
}
|
|
|
|
url = "https://api.openai.com/v1/chat/completions"
|
|
|
|
try:
|
|
async with self.http_session.post(url, json=data, headers=self.headers) as resp:
|
|
response_data = await resp.json()
|
|
self.logger.debug(response_data)
|
|
answer = response_data['choices'][0]['message']['content']
|
|
input_tokens = response_data['usage']['prompt_tokens']
|
|
output_tokens = response_data['usage']['completion_tokens']
|
|
cost = self.text_cost_calc(model, input_tokens, output_tokens)
|
|
self.add_cost(model, cost)
|
|
|
|
except Exception as error:
|
|
self.logger.exception("error occurred in view_image")
|
|
|
|
return answer
|
|
|
|
async def chat_response(self, message, channel_vars, chat_history_string):
|
|
async with message.channel.typing():
|
|
await asyncio.sleep(1)
|
|
prompt = f"You are a {channel_vars['personality']} chat bot named Sparkytron 3000 created by @phixxy.com. Your personality should be {channel_vars['personality']}. You are currently in a {channel_vars['channel_topic']} chatroom. The message history is: {chat_history_string}\nSparkytron 3000: "
|
|
response = await self.answer_question(prompt)
|
|
if "Sparkytron 3000:" in response[0:17]:
|
|
response = response.replace("Sparkytron 3000:", "")
|
|
max_len = 1999
|
|
if len(response) > max_len:
|
|
messages=[response[y-max_len:y] for y in range(max_len, len(response)+max_len,max_len)]
|
|
else:
|
|
messages=[response]
|
|
for response_message in messages:
|
|
await message.channel.send(response_message)
|
|
|
|
|
|
@commands.Cog.listener()
|
|
async def on_reaction_add(self, reaction, user):
|
|
if not random.randint(0,9):
|
|
message = reaction.message
|
|
emoji = reaction.emoji
|
|
await message.add_reaction(emoji)
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message(self, message: discord.Message):
|
|
# Log Chat
|
|
# Todo, make a logging cog to handle this stuff later
|
|
logfile = f"{self.data_dir}logs/{message.channel.id}.log"
|
|
channel_vars = await self.get_channel_config(message.channel.id)
|
|
chat_history_string = await self.log_chat_and_get_history(message, logfile, channel_vars)
|
|
# Emoji Reaction
|
|
await self.react_to_msg(message, channel_vars["react_to_msgs"])
|
|
# Chat Response
|
|
if channel_vars["chat_enabled"] and not message.author.bot or self.bot_id in [x.id for x in message.mentions]:
|
|
if message.content and message.content[0] != "!":
|
|
await self.chat_response(message, channel_vars, chat_history_string)
|
|
elif not message.content:
|
|
await self.chat_response(message, channel_vars, chat_history_string)
|
|
|
|
|
|
async def setup(bot):
|
|
await bot.add_cog(ChatGPT(bot)) |