From 92dbc06b2620b2cb0f3512cb0bcafe0c7614efc9 Mon Sep 17 00:00:00 2001 From: ctih1 Date: Wed, 23 Jul 2025 14:51:57 +0300 Subject: [PATCH] cleaned up bot.py seperating it into different files, added formatting ignore rules for key_compiler, modified prestart checks --- assets/cogs/internal/base_commands.py | 174 +++++++++++++++ assets/cogs/{ => internal}/cogmanager.py | 0 assets/cogs/internal/markov.py | 119 +++++++++++ bot.py | 261 +---------------------- modules/globalvars.py | 2 +- modules/key_compiler.py | 8 - modules/markovmemory.py | 5 +- modules/prestartchecks.py | 5 + settings/settings.json | 1 - 9 files changed, 311 insertions(+), 264 deletions(-) create mode 100644 assets/cogs/internal/base_commands.py rename assets/cogs/{ => internal}/cogmanager.py (100%) create mode 100644 assets/cogs/internal/markov.py diff --git a/assets/cogs/internal/base_commands.py b/assets/cogs/internal/base_commands.py new file mode 100644 index 0000000..ad6980d --- /dev/null +++ b/assets/cogs/internal/base_commands.py @@ -0,0 +1,174 @@ +import os +from typing import Dict, List +import discord +from discord.ext import commands +import discord.ext +import discord.ext.commands +import modules.keys as k +from modules.permission import requires_admin +from modules.sentenceprocessing import send_message +from modules.settings import Settings as SettingsManager +import requests + +settings_manager = SettingsManager() +settings = settings_manager.settings + + +class BaseCommands(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + + @commands.command() + async def help(self, ctx: commands.Context) -> None: + embed: discord.Embed = discord.Embed( + title=f"{k.command_help_embed_title()}", + description=f"{k.command_help_embed_desc()}", + color=discord.Colour(0x000000), + ) + + command_categories = { + f"{k.command_help_categories_general()}": [ + "mem", + "talk", + "about", + "ping", + "impact", + "demotivator", + "help", + ], + f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"], + } + + custom_commands: List[str] = [] + for cog_name, cog in self.bot.cogs.items(): + for command in cog.get_commands(): + if ( + command.name + not in command_categories[f"{k.command_help_categories_general()}"] + and command.name + not in command_categories[f"{k.command_help_categories_admin()}"] + ): + custom_commands.append(command.name) + + if custom_commands: + embed.add_field( + name=f"{k.command_help_categories_custom()}", + value="\n".join( + [ + f"{settings["bot"]["prefix"]}{command}" + for command in custom_commands + ] + ), + inline=False, + ) + + for category, commands_list in command_categories.items(): + commands_in_category: str = "\n".join( + [f"{settings["bot"]["prefix"]}{command}" for command in commands_list] + ) + embed.add_field(name=category, value=commands_in_category, inline=False) + + await send_message(ctx, embed=embed) + + @requires_admin() + @commands.command() + async def setlanguage(self, ctx: commands.Context, locale: str) -> None: + await ctx.defer() + k.change_language(locale) + await ctx.send(":thumbsup:") + + @commands.command() + async def ping(self, ctx: commands.Context) -> None: + await ctx.defer() + latency: int = round(self.bot.latency * 1000) + + embed: discord.Embed = discord.Embed( + title="Pong!!", + description=( + settings["bot"]["misc"]["ping_line"], + f"`{k.command_ping_embed_desc()}: {latency}ms`\n", + ), + color=discord.Colour(0x000000), + ) + embed.set_footer( + text=f"{k.command_ping_footer()} {ctx.author.name}", + icon_url=ctx.author.display_avatar.url, + ) + + await ctx.send(embed=embed) + + @commands.command() + async def about(self, ctx: commands.Context) -> None: + embed: discord.Embed = discord.Embed( + title=f"{k.command_about_embed_title()}", + description="", + color=discord.Colour(0x000000), + ) + + embed.add_field( + name=k.command_about_embed_field1(), + value=f"{settings["name"]}", + inline=False, + ) + + embed.add_field(name="Github", value=f"https://github.com/gooberinc/goober") + await send_message(ctx, embed=embed) + + @commands.command() + async def stats(self, ctx: commands.Context) -> None: + memory_file: str = "memory.json" + file_size: int = os.path.getsize(memory_file) + + with open(memory_file, "r") as file: + line_count: int = sum(1 for _ in file) + + embed: discord.Embed = discord.Embed( + title=f"{k.command_stats_embed_title()}", + description=f"{k.command_stats_embed_desc()}", + color=discord.Colour(0x000000), + ) + embed.add_field( + name=f"{k.command_stats_embed_field1name()}", + value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}", + inline=False, + ) + + with open(settings["splash_text_loc"], "r") as f: + splash_text = "".join(f.readlines()) + + embed.add_field( + name=f"{k.command_stats_embed_field3name()}", + value=f"{k.command_stats_embed_field3value( + NAME=settings["name"], PREFIX=settings["bot"]["prefix"], ownerid=settings["bot"]["owner_ids"][0], + PING_LINE=settings["bot"]["misc"]["ping_line"], showmemenabled=settings["bot"]["allow_show_mem_command"], + USERTRAIN_ENABLED=settings["bot"]["user_training"], song=settings["bot"]["misc"]["active_song"], + splashtext=splash_text + )}", + inline=False, + ) + + await send_message(ctx, embed=embed) + + @commands.command() + async def mem(self, ctx: commands.Context) -> None: + if not settings["bot"]["allow_show_mem_command"]: + return + + with open(settings["bot"]["active_memory"], "rb") as f: + data: bytes = f.read() + + response = requests.post( + "https://litterbox.catbox.moe/resources/internals/api.php", + data={"reqtype": "fileupload", "time": "1h", "fileToUpload": data}, + ) + + print(response.text) + print(response.status_code) + + # await send_message(ctx, memorylitter.stdout.strip()) + + +async def setup(bot: discord.ext.commands.Bot): + print("Setting up base_commands") + bot.remove_command("help") + await bot.add_cog(BaseCommands(bot)) diff --git a/assets/cogs/cogmanager.py b/assets/cogs/internal/cogmanager.py similarity index 100% rename from assets/cogs/cogmanager.py rename to assets/cogs/internal/cogmanager.py diff --git a/assets/cogs/internal/markov.py b/assets/cogs/internal/markov.py new file mode 100644 index 0000000..0c63813 --- /dev/null +++ b/assets/cogs/internal/markov.py @@ -0,0 +1,119 @@ +import os +import random +import re +import discord +from discord.ext import commands + +import discord.ext +import discord.ext.commands + +from modules.markovmemory import save_markov_model, train_markov_model +from modules.permission import requires_admin +from modules.sentenceprocessing import ( + improve_sentence_coherence, + is_positive, + rephrase_for_coherence, + send_message, +) +import modules.keys as k +import logging +from typing import List, Optional, Set +import json +import time +import markovify + + +logger = logging.getLogger("goober") +from modules.settings import Settings as SettingsManager + +settings_manager = SettingsManager() +settings = settings_manager.settings + + +class Markov(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + self.markov_model: markovify.NewlineText + + @requires_admin() + @commands.command() + async def retrain(self, ctx: discord.ext.commands.Context): + message_ref: discord.Message | None = await send_message( + ctx, f"{k.command_markov_retrain()}" + ) + + if message_ref is None: + logger.error("Failed to send message!") + return + + try: + with open(settings["bot"]["active_memory"], "r") as f: + memory: List[str] = json.load(f) + except FileNotFoundError: + await send_message(ctx, f"{k.command_markov_memory_not_found()}") + return + except json.JSONDecodeError: + await send_message(ctx, f"{k.command_markov_memory_is_corrupt()}") + return + + data_size: int = len(memory) + + processing_message_ref: discord.Message | None = await send_message( + ctx, f"{k.command_markov_retraining(data_size)}" + ) + if processing_message_ref is None: + logger.error("Couldnt find message processing message!") + + start_time: float = time.time() + + model = train_markov_model(memory) + if not model: + logger.error("Failed to train markov model") + await ctx.send("Failed to retrain!") + return False + + self.model = model + save_markov_model(self.model) + + logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s") + + await send_message( + ctx, + f"{k.command_markov_retrain_successful(data_size)}", + edit=True, + message_reference=processing_message_ref, + ) + + @commands.command() + async def talk(self, ctx: commands.Context, sentence_size: int = 5) -> None: + if not self.model: + await send_message(ctx, f"{k.command_talk_insufficent_text()}") + return + + response: str = "" + if sentence_size == 1: + response = ( + self.model.make_short_sentence(max_chars=100, tries=100) + or k.command_talk_generation_fail() + ) + + else: + response = improve_sentence_coherence( + self.model.make_sentence(tries=100, max_words=sentence_size) + or k.command_talk_generation_fail() + ) + + cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower() + coherent_response: str = rephrase_for_coherence(cleaned_response) + + if random.random() < 0.9 and is_positive(coherent_response): + gif_url: str = random.choice(settings["bot"]["misc"]["positive_gifs"]) + + coherent_response = f"{coherent_response}\n[jif]({gif_url})" + + os.environ["gooberlatestgen"] = coherent_response + await send_message(ctx, coherent_response) + + +async def setup(bot): + await bot.add_cog(Markov(bot)) diff --git a/bot.py b/bot.py index 79e040e..751a0e1 100644 --- a/bot.py +++ b/bot.py @@ -133,9 +133,11 @@ used_words: Set[str] = set() async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]: cog_name: str = filename[:-3] - print(cog_name) - if cog_name not in settings["bot"]["enabled_cogs"]: + if ( + "internal" not in folder_name + and cog_name not in settings["bot"]["enabled_cogs"] + ): logger.debug(f"Skipping cog {cog_name} (not in enabled cogs)") continue @@ -153,17 +155,16 @@ async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): @bot.event async def on_ready() -> None: global launched - global slash_commands_enabled folder_name: str = "cogs" if launched: return - await load_cogs_from_folder(bot) + await load_cogs_from_folder(bot, "assets/cogs/internal") try: synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() + logger.info(f"{k.synced_commands()} {len(synced)} {k.synced_commands2()}") - slash_commands_enabled = True logger.info(k.started(settings["name"])) except discord.errors.Forbidden as perm_error: @@ -209,90 +210,6 @@ async def on_command_error(ctx: commands.Context, error: commands.CommandError) ) -# Command: Retrain the Markov model from memory -@requires_admin() -@bot.hybrid_command(description=f"{k.command_desc_retrain()}") -async def retrain(ctx: commands.Context) -> None: - global markov_model - - message_ref: discord.Message | None = await send_message( - ctx, f"{k.command_markov_retrain()}" - ) - if message_ref is None: - logger.error("Failed to send message!") - return - - try: - with open(settings["bot"]["active_memory"], "r") as f: - memory: List[str] = json.load(f) - except FileNotFoundError: - await send_message(ctx, f"{k.command_markov_memory_not_found()}") - return - except json.JSONDecodeError: - await send_message(ctx, f"{k.command_markov_memory_is_corrupt()}") - return - - data_size: int = len(memory) - - processing_message_ref: discord.Message | None = await send_message( - ctx, f"{k.command_markov_retraining(data_size)}" - ) - if processing_message_ref is None: - logger.error("Couldnt find message processing message!") - - start_time: float = time.time() - - markov_model = train_markov_model(memory) - save_markov_model(markov_model) - - logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s") - - await send_message( - ctx, - f"{k.command_markov_retrain_successful(data_size)}", - edit=True, - message_reference=processing_message_ref, - ) - - -# Command: Generate a sentence using the Markov model -@bot.hybrid_command(description=f"{k.command_desc_talk()}") -async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: - if not markov_model: - await send_message(ctx, f"{k.command_talk_insufficent_text()}") - return - - response: Optional[str] = None - for _ in range(20): - if sentence_size == 1: - response = markov_model.make_short_sentence(max_chars=100, tries=100) - if response: - response = response.split()[0] - else: - response = markov_model.make_sentence(tries=100, max_words=sentence_size) - - if response and response not in generated_sentences: - if sentence_size > 1: - response = improve_sentence_coherence(response) - generated_sentences.add(response) - break - - if response: - cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower() - coherent_response: str = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9 and is_positive(coherent_response): - gif_url: str = random.choice(positive_gifs) - combined_message: str = f"{coherent_response}\n[jif]({gif_url})" - else: - combined_message: str = coherent_response - logger.info(combined_message) - - os.environ["gooberlatestgen"] = combined_message - await send_message(ctx, combined_message) - else: - await send_message(ctx, f"{k.command_talk_generation_fail()}") - - # New demotivator command @bot.hybrid_command(description="Generate a demotivator poster with two lines of text") async def demotivator(ctx: commands.Context) -> None: @@ -348,69 +265,6 @@ async def demotivator(ctx: commands.Context) -> None: os.remove(temp_input) -bot.remove_command("help") - - -# Command: Show help information -@bot.hybrid_command(description=f"{k.command_desc_help()}") -async def help(ctx: commands.Context) -> None: - embed: discord.Embed = discord.Embed( - title=f"{k.command_help_embed_title()}", - description=f"{k.command_help_embed_desc()}", - color=Colour(0x000000), - ) - - command_categories: Dict[str, List[str]] = { - f"{k.command_help_categories_general()}": [ - "mem", - "talk", - "about", - "ping", - "impact", - "demotivator", - "help", - ], - f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"], - } - - custom_commands: List[str] = [] - for cog_name, cog in bot.cogs.items(): - for command in cog.get_commands(): - if ( - command.name - not in command_categories[f"{k.command_help_categories_general()}"] - and command.name - not in command_categories[f"{k.command_help_categories_admin()}"] - ): - custom_commands.append(command.name) - - if custom_commands: - embed.add_field( - name=f"{k.command_help_categories_custom()}", - value="\n".join( - [f"{settings["bot"]["prefix"]}{command}" for command in custom_commands] - ), - inline=False, - ) - - for category, commands_list in command_categories.items(): - commands_in_category: str = "\n".join( - [f"{settings["bot"]["prefix"]}{command}" for command in commands_list] - ) - embed.add_field(name=category, value=commands_in_category, inline=False) - - await send_message(ctx, embed=embed) - - -@requires_admin() -@bot.hybrid_command(description=f"{k.command_desc_setlang()}") -@app_commands.describe(locale="Choose your language") -async def setlanguage(ctx: commands.Context, locale: str) -> None: - await ctx.defer() - k.change_language(locale) - await ctx.send(":thumbsup:") - - # Event: Called on every message @bot.event async def on_message(message: discord.Message) -> None: @@ -514,106 +368,6 @@ async def block_blacklisted(ctx: commands.Context) -> bool: return True -# Command: Show bot latency -@bot.hybrid_command(description=f"{k.command_desc_ping()}") -async def ping(ctx: commands.Context) -> None: - await ctx.defer() - latency: int = round(bot.latency * 1000) - - embed: discord.Embed = discord.Embed( - title="Pong!!", - description=( - settings["bot"]["misc"]["ping_line"], - f"`{k.command_ping_embed_desc()}: {latency}ms`\n", - ), - color=Colour(0x000000), - ) - embed.set_footer( - text=f"{k.command_ping_footer()} {ctx.author.name}", - icon_url=ctx.author.display_avatar.url, - ) - - await ctx.send(embed=embed) - - -# Command: Show about information -@bot.hybrid_command(description=f"{k.command_about_desc()}") -async def about(ctx: commands.Context) -> None: - embed: discord.Embed = discord.Embed( - title=f"{k.command_about_embed_title()}", description="", color=Colour(0x000000) - ) - - embed.add_field( - name=k.command_about_embed_field1(), value=f"{settings["name"]}", inline=False - ) - - embed.add_field( - name=k.command_about_embed_field2name(), - value=k.command_about_embed_field2value( - local_version=local_version, latest_version=latest_version - ), - inline=False, - ) - - embed.add_field(name="Github", value=f"https://github.com/gooberinc/goober") - - await send_message(ctx, embed=embed) - - -@requires_admin() -@bot.hybrid_command(description="stats") -async def stats(ctx: commands.Context) -> None: - memory_file: str = "memory.json" - file_size: int = os.path.getsize(memory_file) - - with open(memory_file, "r") as file: - line_count: int = sum(1 for _ in file) - - embed: discord.Embed = discord.Embed( - title=f"{k.command_stats_embed_title()}", - description=f"{k.command_stats_embed_desc()}", - color=Colour(0x000000), - ) - embed.add_field( - name=f"{k.command_stats_embed_field1name()}", - value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}", - inline=False, - ) - embed.add_field( - name=f"{k.command_stats_embed_field2name()}", - value=f"{k.command_stats_embed_field2value(local_version=local_version, latest_version=latest_version)}", - inline=False, - ) - embed.add_field( - name=f"{k.command_stats_embed_field3name()}", - value=f"{k.command_stats_embed_field3value( - NAME=settings["name"], PREFIX=settings["bot"]["prefix"], ownerid=settings["bot"]["owner_ids"][0], - PING_LINE=settings["bot"]["misc"]["ping_line"], showmemenabled=settings["bot"]["allow_show_mem_command"], - USERTRAIN_ENABLED=settings["bot"]["user_training"], song=settings["bot"]["misc"]["active_song"], - splashtext=splash_text - )}", - inline=False, - ) - - await send_message(ctx, embed=embed) - - -# Command: Upload memory.json to litterbox.catbox.moe and return the link -@bot.hybrid_command() -async def mem(ctx: commands.Context) -> None: - if not settings["bot"]["allow_show_mem_command"]: - return - - command: str = ( - """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" - ) - memorylitter: subprocess.CompletedProcess = subprocess.run( - command, shell=True, capture_output=True, text=True - ) - logger.debug(memorylitter) - await send_message(ctx, memorylitter.stdout.strip()) - - # Helper: Improve sentence coherence (simple capitalization fix) def improve_sentence_coherence(sentence: str) -> str: # Capitalizes "i" to "I" in the sentence @@ -651,4 +405,5 @@ class Handler(FileSystemEventHandler): # Start the bot -bot.run(os.environ.get("DISCORD_BOT_TOKEN", "")) +if __name__ == "__main__": + bot.run(os.environ.get("DISCORD_BOT_TOKEN", "")) diff --git a/modules/globalvars.py b/modules/globalvars.py index fc0c2a1..dbbbbeb 100644 --- a/modules/globalvars.py +++ b/modules/globalvars.py @@ -27,7 +27,7 @@ RED = f"{ANSI}31m" GREEN = f"{ANSI}32m" YELLOW = f"{ANSI}33m" PURPLE = f"{ANSI}35m" -DEBUG = f"{ANSI}1;30m" +DEBUG = f"{ANSI}90m" RESET = f"{ANSI}0m" VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main" diff --git a/modules/key_compiler.py b/modules/key_compiler.py index 0fc69e5..5340bce 100644 --- a/modules/key_compiler.py +++ b/modules/key_compiler.py @@ -35,13 +35,6 @@ NOTICE = """ """ -logging.basicConfig( - level=logging.DEBUG, - format="%(levelname)s: [%(filename)s:%(funcName)s] %(message)s", - datefmt="%d/%m/%Y %H.%M.%S", - stream=sys.stdout, -) - logger = logging.getLogger("kaannos") @@ -61,7 +54,6 @@ class LanguageCollector: keys: Dict[str, str] = json.load(f) self.languages[locale] = keys - print(self.languages) self.find_missing_keys() def find_missing_keys(self) -> None: diff --git a/modules/markovmemory.py b/modules/markovmemory.py index a90524e..41fcbbb 100644 --- a/modules/markovmemory.py +++ b/modules/markovmemory.py @@ -45,16 +45,19 @@ def save_memory(memory): json.dump(memory, f, indent=4) -def train_markov_model(memory, additional_data=None): +def train_markov_model(memory, additional_data=None) -> markovify.NewlineText | None: if not memory: return None + filtered_memory = [line for line in memory if isinstance(line, str)] if additional_data: filtered_memory.extend( line for line in additional_data if isinstance(line, str) ) + if not filtered_memory: return None + text = "\n".join(filtered_memory) model = markovify.NewlineText(text, state_size=2) return model diff --git a/modules/prestartchecks.py b/modules/prestartchecks.py index d100cb4..b6636ff 100644 --- a/modules/prestartchecks.py +++ b/modules/prestartchecks.py @@ -192,8 +192,10 @@ def check_cpu(): cpu_per_core = psutil.cpu_percent(interval=1, percpu=True) # type: ignore total_cpu = sum(cpu_per_core) / len(cpu_per_core) logger.info(k.total_cpu_usage(usage=total_cpu)) + if total_cpu > 85: logger.warning(f"{k.high_avg_cpu(usage=total_cpu)}") + if total_cpu > 95: logger.error(k.really_high_cpu()) sys.exit(1) @@ -211,12 +213,15 @@ def check_memoryjson(): try: with open(settings["bot"]["active_memory"], "r", encoding="utf-8") as f: json.load(f) + except json.JSONDecodeError as e: logger.error(f"{k.memory_file_corrupted(error=e)}") logger.warning(f"{k.consider_backup_memory()}") + except UnicodeDecodeError as e: logger.error(f"{k.memory_file_encoding(error=e)}") logger.warning(f"{k.consider_backup_memory()}") + except Exception as e: logger.error(f"{k.error_reading_memory(error=e)}") except FileNotFoundError: diff --git a/settings/settings.json b/settings/settings.json index 36e55cd..aa11e29 100644 --- a/settings/settings.json +++ b/settings/settings.json @@ -16,7 +16,6 @@ }, "active_memory": "memory.json", "enabled_cogs": [ - "cogmanager" ] }, "locale": "fi",