cleaned up bot.py seperating it into different files, added formatting ignore rules for key_compiler, modified prestart checks

This commit is contained in:
ctih1 2025-07-23 14:51:57 +03:00
parent f186e079da
commit 92dbc06b26
9 changed files with 311 additions and 264 deletions

View file

@ -0,0 +1,174 @@
import os
from typing import Dict, List
import discord
from discord.ext import commands
import discord.ext
import discord.ext.commands
import modules.keys as k
from modules.permission import requires_admin
from modules.sentenceprocessing import send_message
from modules.settings import Settings as SettingsManager
import requests
settings_manager = SettingsManager()
settings = settings_manager.settings
class BaseCommands(commands.Cog):
def __init__(self, bot):
self.bot: discord.ext.commands.Bot = bot
@commands.command()
async def help(self, ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{k.command_help_embed_title()}",
description=f"{k.command_help_embed_desc()}",
color=discord.Colour(0x000000),
)
command_categories = {
f"{k.command_help_categories_general()}": [
"mem",
"talk",
"about",
"ping",
"impact",
"demotivator",
"help",
],
f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"],
}
custom_commands: List[str] = []
for cog_name, cog in self.bot.cogs.items():
for command in cog.get_commands():
if (
command.name
not in command_categories[f"{k.command_help_categories_general()}"]
and command.name
not in command_categories[f"{k.command_help_categories_admin()}"]
):
custom_commands.append(command.name)
if custom_commands:
embed.add_field(
name=f"{k.command_help_categories_custom()}",
value="\n".join(
[
f"{settings["bot"]["prefix"]}{command}"
for command in custom_commands
]
),
inline=False,
)
for category, commands_list in command_categories.items():
commands_in_category: str = "\n".join(
[f"{settings["bot"]["prefix"]}{command}" for command in commands_list]
)
embed.add_field(name=category, value=commands_in_category, inline=False)
await send_message(ctx, embed=embed)
@requires_admin()
@commands.command()
async def setlanguage(self, ctx: commands.Context, locale: str) -> None:
await ctx.defer()
k.change_language(locale)
await ctx.send(":thumbsup:")
@commands.command()
async def ping(self, ctx: commands.Context) -> None:
await ctx.defer()
latency: int = round(self.bot.latency * 1000)
embed: discord.Embed = discord.Embed(
title="Pong!!",
description=(
settings["bot"]["misc"]["ping_line"],
f"`{k.command_ping_embed_desc()}: {latency}ms`\n",
),
color=discord.Colour(0x000000),
)
embed.set_footer(
text=f"{k.command_ping_footer()} {ctx.author.name}",
icon_url=ctx.author.display_avatar.url,
)
await ctx.send(embed=embed)
@commands.command()
async def about(self, ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{k.command_about_embed_title()}",
description="",
color=discord.Colour(0x000000),
)
embed.add_field(
name=k.command_about_embed_field1(),
value=f"{settings["name"]}",
inline=False,
)
embed.add_field(name="Github", value=f"https://github.com/gooberinc/goober")
await send_message(ctx, embed=embed)
@commands.command()
async def stats(self, ctx: commands.Context) -> None:
memory_file: str = "memory.json"
file_size: int = os.path.getsize(memory_file)
with open(memory_file, "r") as file:
line_count: int = sum(1 for _ in file)
embed: discord.Embed = discord.Embed(
title=f"{k.command_stats_embed_title()}",
description=f"{k.command_stats_embed_desc()}",
color=discord.Colour(0x000000),
)
embed.add_field(
name=f"{k.command_stats_embed_field1name()}",
value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}",
inline=False,
)
with open(settings["splash_text_loc"], "r") as f:
splash_text = "".join(f.readlines())
embed.add_field(
name=f"{k.command_stats_embed_field3name()}",
value=f"{k.command_stats_embed_field3value(
NAME=settings["name"], PREFIX=settings["bot"]["prefix"], ownerid=settings["bot"]["owner_ids"][0],
PING_LINE=settings["bot"]["misc"]["ping_line"], showmemenabled=settings["bot"]["allow_show_mem_command"],
USERTRAIN_ENABLED=settings["bot"]["user_training"], song=settings["bot"]["misc"]["active_song"],
splashtext=splash_text
)}",
inline=False,
)
await send_message(ctx, embed=embed)
@commands.command()
async def mem(self, ctx: commands.Context) -> None:
if not settings["bot"]["allow_show_mem_command"]:
return
with open(settings["bot"]["active_memory"], "rb") as f:
data: bytes = f.read()
response = requests.post(
"https://litterbox.catbox.moe/resources/internals/api.php",
data={"reqtype": "fileupload", "time": "1h", "fileToUpload": data},
)
print(response.text)
print(response.status_code)
# await send_message(ctx, memorylitter.stdout.strip())
async def setup(bot: discord.ext.commands.Bot):
print("Setting up base_commands")
bot.remove_command("help")
await bot.add_cog(BaseCommands(bot))

View file

@ -0,0 +1,119 @@
import os
import random
import re
import discord
from discord.ext import commands
import discord.ext
import discord.ext.commands
from modules.markovmemory import save_markov_model, train_markov_model
from modules.permission import requires_admin
from modules.sentenceprocessing import (
improve_sentence_coherence,
is_positive,
rephrase_for_coherence,
send_message,
)
import modules.keys as k
import logging
from typing import List, Optional, Set
import json
import time
import markovify
logger = logging.getLogger("goober")
from modules.settings import Settings as SettingsManager
settings_manager = SettingsManager()
settings = settings_manager.settings
class Markov(commands.Cog):
def __init__(self, bot):
self.bot: discord.ext.commands.Bot = bot
self.markov_model: markovify.NewlineText
@requires_admin()
@commands.command()
async def retrain(self, ctx: discord.ext.commands.Context):
message_ref: discord.Message | None = await send_message(
ctx, f"{k.command_markov_retrain()}"
)
if message_ref is None:
logger.error("Failed to send message!")
return
try:
with open(settings["bot"]["active_memory"], "r") as f:
memory: List[str] = json.load(f)
except FileNotFoundError:
await send_message(ctx, f"{k.command_markov_memory_not_found()}")
return
except json.JSONDecodeError:
await send_message(ctx, f"{k.command_markov_memory_is_corrupt()}")
return
data_size: int = len(memory)
processing_message_ref: discord.Message | None = await send_message(
ctx, f"{k.command_markov_retraining(data_size)}"
)
if processing_message_ref is None:
logger.error("Couldnt find message processing message!")
start_time: float = time.time()
model = train_markov_model(memory)
if not model:
logger.error("Failed to train markov model")
await ctx.send("Failed to retrain!")
return False
self.model = model
save_markov_model(self.model)
logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s")
await send_message(
ctx,
f"{k.command_markov_retrain_successful(data_size)}",
edit=True,
message_reference=processing_message_ref,
)
@commands.command()
async def talk(self, ctx: commands.Context, sentence_size: int = 5) -> None:
if not self.model:
await send_message(ctx, f"{k.command_talk_insufficent_text()}")
return
response: str = ""
if sentence_size == 1:
response = (
self.model.make_short_sentence(max_chars=100, tries=100)
or k.command_talk_generation_fail()
)
else:
response = improve_sentence_coherence(
self.model.make_sentence(tries=100, max_words=sentence_size)
or k.command_talk_generation_fail()
)
cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower()
coherent_response: str = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
gif_url: str = random.choice(settings["bot"]["misc"]["positive_gifs"])
coherent_response = f"{coherent_response}\n[jif]({gif_url})"
os.environ["gooberlatestgen"] = coherent_response
await send_message(ctx, coherent_response)
async def setup(bot):
await bot.add_cog(Markov(bot))

261
bot.py
View file

@ -133,9 +133,11 @@ used_words: Set[str] = set()
async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"):
for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]:
cog_name: str = filename[:-3]
print(cog_name)
if cog_name not in settings["bot"]["enabled_cogs"]:
if (
"internal" not in folder_name
and cog_name not in settings["bot"]["enabled_cogs"]
):
logger.debug(f"Skipping cog {cog_name} (not in enabled cogs)")
continue
@ -153,17 +155,16 @@ async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"):
@bot.event
async def on_ready() -> None:
global launched
global slash_commands_enabled
folder_name: str = "cogs"
if launched:
return
await load_cogs_from_folder(bot)
await load_cogs_from_folder(bot, "assets/cogs/internal")
try:
synced: List[discord.app_commands.AppCommand] = await bot.tree.sync()
logger.info(f"{k.synced_commands()} {len(synced)} {k.synced_commands2()}")
slash_commands_enabled = True
logger.info(k.started(settings["name"]))
except discord.errors.Forbidden as perm_error:
@ -209,90 +210,6 @@ async def on_command_error(ctx: commands.Context, error: commands.CommandError)
)
# Command: Retrain the Markov model from memory
@requires_admin()
@bot.hybrid_command(description=f"{k.command_desc_retrain()}")
async def retrain(ctx: commands.Context) -> None:
global markov_model
message_ref: discord.Message | None = await send_message(
ctx, f"{k.command_markov_retrain()}"
)
if message_ref is None:
logger.error("Failed to send message!")
return
try:
with open(settings["bot"]["active_memory"], "r") as f:
memory: List[str] = json.load(f)
except FileNotFoundError:
await send_message(ctx, f"{k.command_markov_memory_not_found()}")
return
except json.JSONDecodeError:
await send_message(ctx, f"{k.command_markov_memory_is_corrupt()}")
return
data_size: int = len(memory)
processing_message_ref: discord.Message | None = await send_message(
ctx, f"{k.command_markov_retraining(data_size)}"
)
if processing_message_ref is None:
logger.error("Couldnt find message processing message!")
start_time: float = time.time()
markov_model = train_markov_model(memory)
save_markov_model(markov_model)
logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s")
await send_message(
ctx,
f"{k.command_markov_retrain_successful(data_size)}",
edit=True,
message_reference=processing_message_ref,
)
# Command: Generate a sentence using the Markov model
@bot.hybrid_command(description=f"{k.command_desc_talk()}")
async def talk(ctx: commands.Context, sentence_size: int = 5) -> None:
if not markov_model:
await send_message(ctx, f"{k.command_talk_insufficent_text()}")
return
response: Optional[str] = None
for _ in range(20):
if sentence_size == 1:
response = markov_model.make_short_sentence(max_chars=100, tries=100)
if response:
response = response.split()[0]
else:
response = markov_model.make_sentence(tries=100, max_words=sentence_size)
if response and response not in generated_sentences:
if sentence_size > 1:
response = improve_sentence_coherence(response)
generated_sentences.add(response)
break
if response:
cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower()
coherent_response: str = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
gif_url: str = random.choice(positive_gifs)
combined_message: str = f"{coherent_response}\n[jif]({gif_url})"
else:
combined_message: str = coherent_response
logger.info(combined_message)
os.environ["gooberlatestgen"] = combined_message
await send_message(ctx, combined_message)
else:
await send_message(ctx, f"{k.command_talk_generation_fail()}")
# New demotivator command
@bot.hybrid_command(description="Generate a demotivator poster with two lines of text")
async def demotivator(ctx: commands.Context) -> None:
@ -348,69 +265,6 @@ async def demotivator(ctx: commands.Context) -> None:
os.remove(temp_input)
bot.remove_command("help")
# Command: Show help information
@bot.hybrid_command(description=f"{k.command_desc_help()}")
async def help(ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{k.command_help_embed_title()}",
description=f"{k.command_help_embed_desc()}",
color=Colour(0x000000),
)
command_categories: Dict[str, List[str]] = {
f"{k.command_help_categories_general()}": [
"mem",
"talk",
"about",
"ping",
"impact",
"demotivator",
"help",
],
f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"],
}
custom_commands: List[str] = []
for cog_name, cog in bot.cogs.items():
for command in cog.get_commands():
if (
command.name
not in command_categories[f"{k.command_help_categories_general()}"]
and command.name
not in command_categories[f"{k.command_help_categories_admin()}"]
):
custom_commands.append(command.name)
if custom_commands:
embed.add_field(
name=f"{k.command_help_categories_custom()}",
value="\n".join(
[f"{settings["bot"]["prefix"]}{command}" for command in custom_commands]
),
inline=False,
)
for category, commands_list in command_categories.items():
commands_in_category: str = "\n".join(
[f"{settings["bot"]["prefix"]}{command}" for command in commands_list]
)
embed.add_field(name=category, value=commands_in_category, inline=False)
await send_message(ctx, embed=embed)
@requires_admin()
@bot.hybrid_command(description=f"{k.command_desc_setlang()}")
@app_commands.describe(locale="Choose your language")
async def setlanguage(ctx: commands.Context, locale: str) -> None:
await ctx.defer()
k.change_language(locale)
await ctx.send(":thumbsup:")
# Event: Called on every message
@bot.event
async def on_message(message: discord.Message) -> None:
@ -514,106 +368,6 @@ async def block_blacklisted(ctx: commands.Context) -> bool:
return True
# Command: Show bot latency
@bot.hybrid_command(description=f"{k.command_desc_ping()}")
async def ping(ctx: commands.Context) -> None:
await ctx.defer()
latency: int = round(bot.latency * 1000)
embed: discord.Embed = discord.Embed(
title="Pong!!",
description=(
settings["bot"]["misc"]["ping_line"],
f"`{k.command_ping_embed_desc()}: {latency}ms`\n",
),
color=Colour(0x000000),
)
embed.set_footer(
text=f"{k.command_ping_footer()} {ctx.author.name}",
icon_url=ctx.author.display_avatar.url,
)
await ctx.send(embed=embed)
# Command: Show about information
@bot.hybrid_command(description=f"{k.command_about_desc()}")
async def about(ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{k.command_about_embed_title()}", description="", color=Colour(0x000000)
)
embed.add_field(
name=k.command_about_embed_field1(), value=f"{settings["name"]}", inline=False
)
embed.add_field(
name=k.command_about_embed_field2name(),
value=k.command_about_embed_field2value(
local_version=local_version, latest_version=latest_version
),
inline=False,
)
embed.add_field(name="Github", value=f"https://github.com/gooberinc/goober")
await send_message(ctx, embed=embed)
@requires_admin()
@bot.hybrid_command(description="stats")
async def stats(ctx: commands.Context) -> None:
memory_file: str = "memory.json"
file_size: int = os.path.getsize(memory_file)
with open(memory_file, "r") as file:
line_count: int = sum(1 for _ in file)
embed: discord.Embed = discord.Embed(
title=f"{k.command_stats_embed_title()}",
description=f"{k.command_stats_embed_desc()}",
color=Colour(0x000000),
)
embed.add_field(
name=f"{k.command_stats_embed_field1name()}",
value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}",
inline=False,
)
embed.add_field(
name=f"{k.command_stats_embed_field2name()}",
value=f"{k.command_stats_embed_field2value(local_version=local_version, latest_version=latest_version)}",
inline=False,
)
embed.add_field(
name=f"{k.command_stats_embed_field3name()}",
value=f"{k.command_stats_embed_field3value(
NAME=settings["name"], PREFIX=settings["bot"]["prefix"], ownerid=settings["bot"]["owner_ids"][0],
PING_LINE=settings["bot"]["misc"]["ping_line"], showmemenabled=settings["bot"]["allow_show_mem_command"],
USERTRAIN_ENABLED=settings["bot"]["user_training"], song=settings["bot"]["misc"]["active_song"],
splashtext=splash_text
)}",
inline=False,
)
await send_message(ctx, embed=embed)
# Command: Upload memory.json to litterbox.catbox.moe and return the link
@bot.hybrid_command()
async def mem(ctx: commands.Context) -> None:
if not settings["bot"]["allow_show_mem_command"]:
return
command: str = (
"""curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php"""
)
memorylitter: subprocess.CompletedProcess = subprocess.run(
command, shell=True, capture_output=True, text=True
)
logger.debug(memorylitter)
await send_message(ctx, memorylitter.stdout.strip())
# Helper: Improve sentence coherence (simple capitalization fix)
def improve_sentence_coherence(sentence: str) -> str:
# Capitalizes "i" to "I" in the sentence
@ -651,4 +405,5 @@ class Handler(FileSystemEventHandler):
# Start the bot
bot.run(os.environ.get("DISCORD_BOT_TOKEN", ""))
if __name__ == "__main__":
bot.run(os.environ.get("DISCORD_BOT_TOKEN", ""))

View file

@ -27,7 +27,7 @@ RED = f"{ANSI}31m"
GREEN = f"{ANSI}32m"
YELLOW = f"{ANSI}33m"
PURPLE = f"{ANSI}35m"
DEBUG = f"{ANSI}1;30m"
DEBUG = f"{ANSI}90m"
RESET = f"{ANSI}0m"
VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main"

View file

@ -35,13 +35,6 @@ NOTICE = """
"""
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s: [%(filename)s:%(funcName)s] %(message)s",
datefmt="%d/%m/%Y %H.%M.%S",
stream=sys.stdout,
)
logger = logging.getLogger("kaannos")
@ -61,7 +54,6 @@ class LanguageCollector:
keys: Dict[str, str] = json.load(f)
self.languages[locale] = keys
print(self.languages)
self.find_missing_keys()
def find_missing_keys(self) -> None:

View file

@ -45,16 +45,19 @@ def save_memory(memory):
json.dump(memory, f, indent=4)
def train_markov_model(memory, additional_data=None):
def train_markov_model(memory, additional_data=None) -> markovify.NewlineText | None:
if not memory:
return None
filtered_memory = [line for line in memory if isinstance(line, str)]
if additional_data:
filtered_memory.extend(
line for line in additional_data if isinstance(line, str)
)
if not filtered_memory:
return None
text = "\n".join(filtered_memory)
model = markovify.NewlineText(text, state_size=2)
return model

View file

@ -192,8 +192,10 @@ def check_cpu():
cpu_per_core = psutil.cpu_percent(interval=1, percpu=True) # type: ignore
total_cpu = sum(cpu_per_core) / len(cpu_per_core)
logger.info(k.total_cpu_usage(usage=total_cpu))
if total_cpu > 85:
logger.warning(f"{k.high_avg_cpu(usage=total_cpu)}")
if total_cpu > 95:
logger.error(k.really_high_cpu())
sys.exit(1)
@ -211,12 +213,15 @@ def check_memoryjson():
try:
with open(settings["bot"]["active_memory"], "r", encoding="utf-8") as f:
json.load(f)
except json.JSONDecodeError as e:
logger.error(f"{k.memory_file_corrupted(error=e)}")
logger.warning(f"{k.consider_backup_memory()}")
except UnicodeDecodeError as e:
logger.error(f"{k.memory_file_encoding(error=e)}")
logger.warning(f"{k.consider_backup_memory()}")
except Exception as e:
logger.error(f"{k.error_reading_memory(error=e)}")
except FileNotFoundError:

View file

@ -16,7 +16,6 @@
},
"active_memory": "memory.json",
"enabled_cogs": [
"cogmanager"
]
},
"locale": "fi",