246 lines
7.6 KiB
Python
246 lines
7.6 KiB
Python
![]() |
import discord
|
||
|
from discord.ext import commands, tasks
|
||
|
import json
|
||
|
import markovify
|
||
|
import nltk
|
||
|
from nltk.tokenize import word_tokenize
|
||
|
import random
|
||
|
import os
|
||
|
import time
|
||
|
import re
|
||
|
from dotenv import load_dotenv
|
||
|
# Load variables from a local .env file into the process environment.
load_dotenv()

# Download the NLTK tokenizer data used by word_tokenize.
# "punkt" is the resource older NLTK releases load; newer releases look for
# "punkt_tab" instead, so both are fetched to work with either.
nltk.download('punkt')
nltk.download('punkt_tab')

# JSON file that stores every learned message line.
MEMORY_FILE = "memory.json"
# Not referenced anywhere else in this file; kept for compatibility.
MEMORY_LOADED_FILE = "MEMORY_LOADED"
|
||
|
|
||
|
def load_memory():
    """Load the learned lines from MEMORY_FILE.

    Returns:
        The list stored in MEMORY_FILE, or an empty list when the file does
        not exist or cannot be used.

    A file that is not valid JSON (for example, left empty by an interrupted
    write) or that does not contain a list is moved aside to
    ``MEMORY_FILE + ".corrupt"`` rather than crashing the bot at start-up or
    being silently overwritten by the next save.
    """
    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except json.JSONDecodeError:
        data = None

    if isinstance(data, list):
        return data

    # Callers append to the result, so anything other than a list is unusable.
    backup_path = MEMORY_FILE + ".corrupt"
    os.replace(MEMORY_FILE, backup_path)
    print(f"{MEMORY_FILE} did not contain a JSON list; moved it to {backup_path}")
    return []
|
||
|
|
||
|
# Persist the learned lines to MEMORY_FILE.
def save_memory(memory):
    """Overwrite MEMORY_FILE with *memory* serialized as 4-space-indented JSON."""
    with open(MEMORY_FILE, "w") as handle:
        json.dump(memory, fp=handle, indent=4)
|
||
|
|
||
|
def train_markov_model(memory, additional_data=None):
    """Build a Markov model from the learned lines.

    Args:
        memory: Iterable of learned lines (may be empty or None).
        additional_data: Optional extra lines to train on as well.

    Returns:
        A ``markovify.NewlineText`` model with ``state_size=2``, or ``None``
        when there is no usable text.

    Only non-blank strings are used. ``additional_data`` is honoured even
    when ``memory`` itself is empty.
    """
    corpus = [
        line for line in (memory or [])
        if isinstance(line, str) and line.strip()
    ]
    if additional_data:
        corpus.extend(
            line for line in additional_data
            if isinstance(line, str) and line.strip()
        )
    if not corpus:
        return None

    # One learned line per text line; NewlineText treats each as a sentence.
    return markovify.NewlineText("\n".join(corpus), state_size=2)
|
||
|
|
||
|
# First alternative: existing Discord markup that already carries an ID
# (<@id>, <@!id>, <@&id>, <#id>, <:name:id>, <a:name:id>) -- matched so it can
# be left untouched. Second alternative: a bare, standalone 18-digit number.
_MARKUP_OR_BARE_ID = re.compile(r'<(?:@[!&]?|#|a?:\w+:)\d+>|\b(\d{18})\b')


def append_mentions_to_18digit_integer(message):
    """Wrap every bare 18-digit number in *message* as a user mention.

    ``123456789012345678`` becomes ``<@123456789012345678>``. Numbers that are
    already part of Discord markup are left as they are, so calling this
    function repeatedly on the same text does not nest the markup.

    Args:
        message: The text to process.

    Returns:
        The text with bare 18-digit numbers wrapped in ``<@...>``.
    """
    def _wrap(match):
        bare_id = match.group(1)
        # group(1) is only set when the bare-number alternative matched.
        return f"<@{bare_id}>" if bare_id else match.group(0)

    return _MARKUP_OR_BARE_ID.sub(_wrap, message)
|
||
|
|
||
|
# A user mention such as <@123> or <@!123>; the capture group makes re.split
# return the mentions alongside the text between them.
_USER_MENTION = re.compile(r'(<@!?\d+>)')


def preprocess_message(message):
    """Normalize a chat message into a space-separated line for training.

    Bare 18-digit IDs are first turned into mentions. The text is then
    tokenized with NLTK and only alphanumeric tokens are kept. User mentions
    are passed through untouched: running them through the tokenizer and the
    ``isalnum`` filter would strip the ``<@`` / ``>`` markup and leave only
    the digits.

    Args:
        message: Raw message text.

    Returns:
        The cleaned line, or an empty string when nothing usable remains.
    """
    message = append_mentions_to_18digit_integer(message)

    kept = []
    for piece in _USER_MENTION.split(message):
        if _USER_MENTION.fullmatch(piece):
            kept.append(piece)
        else:
            kept.extend(token for token in word_tokenize(piece) if token.isalnum())
    return " ".join(kept)
|
||
|
|
||
|
|
||
|
# Gateway intents: start from the library defaults, then switch on message
# events and access to message text, which on_message reads.
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True

# All commands are invoked with the "g!" prefix.
bot = commands.Bot(intents=intents, command_prefix="g!")

# Learned lines from disk and the model trained on them at start-up.
memory = load_memory()
markov_model = train_markov_model(memory)

# Sentences already posted, used to avoid repeating them.
generated_sentences = set()
# Not referenced anywhere else in this file.
used_words = set()
|
||
|
|
||
|
@bot.event
async def on_ready():
    """Log the login and make sure the hourly posting task is running."""
    print(f"Logged in as {bot.user}")
    # on_ready can fire more than once (e.g. after a reconnect); starting a
    # task loop that is already running raises RuntimeError.
    if not post_message.is_running():
        post_message.start()
|
||
|
|
||
|
# Substrings that mark a generated sentence as "positive" (see is_positive).
positive_keywords = [
    "happy",
    "good",
    "great",
    "amazing",
    "awesome",
    "joy",
    "love",
    "fantastic",
    "positive",
    "cheerful",
    "victory",
    "favorite",
    "lmao",
    "lol",
    "xd",
    "XD",
    "xD",
    "Xd",
]

# GIF links, one of which may be attached to a positive reply.
positive_gifs = [
    "https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272",
    "https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988",
]
|
||
|
|
||
|
def is_positive(sentence):
    """Return True when the lower-cased sentence contains any positive keyword as a substring."""
    lowered = sentence.lower()
    for keyword in positive_keywords:
        if keyword in lowered:
            return True
    return False
|
||
|
|
||
|
# g!ask -- placeholder command: it only replies with a fixed notice.
# (Deliberately documented with comments, not a docstring, so the text the
# library exposes as the command's help stays unchanged.)
@bot.command()
async def ask(ctx):
    placeholder_reply = "Command undergoing fixes!"
    await ctx.send(placeholder_reply)
|
||
|
|
||
|
# g!talk -- reply with a freshly generated Markov sentence.
# (Documented with comments rather than a docstring so the command's help
# text stays unchanged.)
@bot.command()
async def talk(ctx):
    if not markov_model:
        await ctx.send("I need to learn more from messages before I can talk.")
        return

    # Try up to 10 times to draw a sentence that has not been posted before.
    # `response` is only set for an accepted candidate, so exhausting the
    # attempts never falls through with a repeated sentence.
    response = None
    for _ in range(10):
        candidate = markov_model.make_sentence(tries=100)
        if candidate and candidate not in generated_sentences:
            # Record the raw sentence: that is the form checked above.
            generated_sentences.add(candidate)
            response = improve_sentence_coherence(candidate)
            break

    if not response:
        await ctx.send("I have nothing to say right now!")
        return

    async with ctx.typing():
        # Strip punctuation, lower-case, and collapse whitespace.
        cleaned_response = re.sub(r'[^\w\s]', '', response).lower()
        coherent_response = rephrase_for_coherence(cleaned_response)

        if not coherent_response:
            # Nothing left after cleaning; an empty message cannot be sent.
            await ctx.send("I have nothing to say right now!")
            return

        # 90% of the time a positive sentence gets a GIF link appended.
        if random.random() < 0.9 and is_positive(coherent_response):
            gif_url = random.choice(positive_gifs)
            await ctx.send(f"{coherent_response}\n[jif]({gif_url})")
        else:
            await ctx.send(coherent_response)
|
||
|
|
||
|
# Standalone lower-case "i": bounded by whitespace or the ends of the string.
_LONE_I = re.compile(r"(?<!\S)i(?!\S)")


def improve_sentence_coherence(sentence):
    """Capitalize the pronoun "i" wherever it stands alone as a word.

    Unlike a plain ``" i "`` replacement, this also handles an "i" at the
    start or end of the sentence and several in a row.

    NOTE: this function is defined a second time further down the file; the
    later definition is the one in effect at runtime.

    Args:
        sentence: The sentence to fix.

    Returns:
        The sentence with every standalone "i" replaced by "I".
    """
    return _LONE_I.sub("I", sentence)
|
||
|
|
||
|
def rephrase_for_coherence(sentence):
    """Return *sentence* with whitespace runs collapsed to single spaces and the ends trimmed."""
    return " ".join(sentence.split())
|
||
|
|
||
|
# Drop the library's built-in help command so the custom "help" command
# below can be registered under that name.
bot.help_command = None


# g!help [command] -- with an argument, show that command's description;
# without one, list the known commands grouped by category.
# (Documented with comments rather than a docstring so the command's own
# help text stays unchanged.)
@bot.command()
async def help(ctx, *args):
    if args:
        command_name = args[0]
        command = bot.get_command(command_name)
        if command is None:
            await ctx.send(f"Command `{command_name}` not found.")
            return

        # command.help is None when the command has no help text; avoid
        # showing the literal word "None" to the user.
        description = command.help or "No description available."
        embed = discord.Embed(
            title=f"Help: g!{command_name}",
            description=f"**Description:** {description}",
            color=discord.Color.blue()
        )
        await ctx.send(embed=embed)
        return

    embed = discord.Embed(
        title="Bot Help",
        description="List of commands grouped by category.",
        color=discord.Color.blue()
    )

    command_categories = {
        "General": ["show_memory", "talk", "ask", "ping"],
        "Debug": ["word_usage"]
    }

    for category, commands_list in command_categories.items():
        # Only advertise commands that are actually registered on the bot.
        registered = [name for name in commands_list if bot.get_command(name)]
        if not registered:
            # An embed field needs a non-empty value, so skip empty categories.
            continue
        commands_in_category = "\n".join(f"g!{name}" for name in registered)
        embed.add_field(name=category, value=commands_in_category, inline=False)

    await ctx.send(embed=embed)
|
||
|
|
||
|
@bot.event
async def on_message(message):
    """Learn from ordinary chat messages and dispatch bot commands.

    Messages from bots are ignored. Messages starting with the ``g!`` prefix
    are treated as commands and are not learned from. Any other non-empty
    message is cleaned, appended to memory, saved to disk, and the Markov
    model is retrained on the updated memory.
    """
    global markov_model

    if message.author.bot:
        return

    content = message.content
    if content and not content.startswith("g!"):
        # preprocess_message already converts bare IDs to mentions, so the
        # raw content is passed straight in rather than converting it twice.
        cleaned_message = preprocess_message(content)
        if cleaned_message:
            memory.append(cleaned_message)
            save_memory(memory)
            markov_model = train_markov_model(memory)

    # Overriding on_message replaces the default handler, so commands must be
    # dispatched explicitly.
    await bot.process_commands(message)
|
||
|
|
||
|
# g!ping -- reply with an embed showing the bot's current latency.
# (Documented with comments rather than a docstring so the command's help
# text stays unchanged.)
@bot.command()
async def ping(ctx):
    await ctx.defer()
    # bot.latency is in seconds; show whole milliseconds.
    latency = round(bot.latency * 1000)

    LOLembed = discord.Embed(
        title="Pong!!",
        description=(
            "The Beretta fires fast and won't make you feel any better!\n"
            f"`Bot Latency: {latency}ms`\n"
        ),
        color=discord.Color.blue()
    )
    # display_avatar always resolves to an asset; `avatar` is None for users
    # who never set a custom avatar, which made `.avatar.url` raise.
    LOLembed.set_footer(
        text=f"Requested by {ctx.author.name}",
        icon_url=ctx.author.display_avatar.url,
    )

    # Prefix command, so there is no interaction to respond to: use send.
    await ctx.send(embed=LOLembed)
|
||
|
|
||
|
# g!show_memory -- show the stored memory, inline when it is small enough
# and as a file attachment otherwise.
# (Documented with comments rather than a docstring so the command's help
# text stays unchanged.)
@bot.command()
async def show_memory(ctx):
    memory_text = json.dumps(load_memory(), indent=4)
    field_value = f"```json\n{memory_text}\n```"

    # An embed field value is limited to 1024 characters, and the code-fence
    # wrapper counts towards that limit, so measure the full value.
    if len(field_value) > 1024:
        # Pass the path so discord.File opens the file itself in binary
        # mode; a text-mode handle is not what it expects.
        await ctx.send(" ", file=discord.File(MEMORY_FILE))
        return

    embed = discord.Embed(
        title="Memory Contents",
        description="The bot's memory.",
        color=discord.Color.blue(),
    )
    embed.add_field(name="Memory Data", value=field_value, inline=False)
    await ctx.send(embed=embed)
|
||
|
|
||
|
# Standalone lower-case "i": bounded by whitespace or the ends of the string.
_STANDALONE_I = re.compile(r"(?<!\S)i(?!\S)")


def improve_sentence_coherence(sentence):
    """Capitalize the pronoun "i" wherever it stands alone as a word.

    Unlike a plain ``" i "`` replacement, this also handles an "i" at the
    start or end of the sentence and several in a row.

    NOTE: this redefines a function of the same name declared earlier in the
    file; being the later definition, this is the one in effect at runtime.

    Args:
        sentence: The sentence to fix.

    Returns:
        The sentence with every standalone "i" replaced by "I".
    """
    return _STANDALONE_I.sub("I", sentence)
|
||
|
|
||
|
@tasks.loop(minutes=60)
async def post_message():
    """Post one new generated sentence to the configured channel every hour.

    Does nothing when the channel cannot be found, when no model has been
    trained yet, or when no unseen sentence could be generated.
    """
    channel_id = 1296141985253691433
    channel = bot.get_channel(channel_id)
    if not (channel and markov_model):
        return

    # Try up to 10 times to draw a sentence that has not been posted before.
    # `response` is only set for an accepted candidate, so exhausting the
    # attempts never falls through with a repeated sentence.
    response = None
    for _ in range(10):
        candidate = markov_model.make_sentence(tries=100)
        if candidate and candidate not in generated_sentences:
            generated_sentences.add(candidate)
            response = candidate
            break

    if response:
        await channel.send(response)
|
||
|
|
||
|
# Start the bot. The token comes from the DISCORDBOTTOKEN environment
# variable; "0" is the placeholder used when it is not set.
TOKEN = os.environ.get("DISCORDBOTTOKEN", "0")
bot.run(TOKEN)
|