diff --git a/.gitignore b/.gitignore index 704d466..8e0b517 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,6 @@ received_memory.json translation_report.txt translationcompleteness.py modules/volta -log.txt \ No newline at end of file +log.txt +settings/settings.json +settings/splash.txt diff --git a/assets/cogs/internal/base_commands.py b/assets/cogs/internal/base_commands.py new file mode 100644 index 0000000..6be6405 --- /dev/null +++ b/assets/cogs/internal/base_commands.py @@ -0,0 +1,189 @@ +import os +import subprocess +from typing import Dict, List +import discord +from discord.ext import commands +import discord.ext +import discord.ext.commands +from modules.volta.main import _ , set_language +from modules.permission import requires_admin +from modules.sentenceprocessing import send_message +from modules.settings import instance as settings_manager +import requests + +settings = settings_manager.settings + +def get_git_origin_raw(): + try: + result = subprocess.run( + ["git", "config", "--get", "remote.origin.url"], + capture_output=True, text=True, check=True + ) + return result.stdout.strip() + except Exception: + return "http://forgejo.expect.ovh/gooberinc/goober" + +class BaseCommands(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + + + + @commands.command() + async def help(self, ctx: commands.Context) -> None: + embed: discord.Embed = discord.Embed( + title=f"{_('command_help_embed_title')}", + description=f"{_('command_help_embed_desc')}", + color=discord.Colour(0x000000), + ) + + command_categories = { + f"{_('command_help_categories_general')}": [ + "mem", + "talk", + "about", + "ping", + "impact", + "demotivator", + "help", + ], + f"{_('command_help_categories_admin')}": ["stats", "retrain", "setlanguage"], + } + + custom_commands: List[str] = [] + for cog_name, cog in self.bot.cogs.items(): + for command in cog.get_commands(): + if ( + command.name + not in command_categories[f"{_('command_help_categories_general')}"] + and command.name + not in command_categories[f"{_('command_help_categories_admin')}"] + ): + custom_commands.append(command.name) + + if custom_commands: + embed.add_field( + name=_('command_help_categories_custom'), + value="\n".join( + [ + f"{settings['bot']['prefix']}{command}" + for command in custom_commands + ] + ), + inline=False, + ) + + for category, commands_list in command_categories.items(): + commands_in_category: str = "\n".join( + [f"{settings['bot']['prefix']}{command}" for command in commands_list] + ) + embed.add_field(name=category, value=commands_in_category, inline=False) + + + await send_message(ctx, embed=embed) + + @requires_admin() + @commands.command() + async def setlanguage(self, ctx: commands.Context, locale: str) -> None: + await ctx.defer() + set_language(locale) + await ctx.send(":thumbsup:") + + @commands.command() + async def ping(self, ctx: commands.Context) -> None: + await ctx.defer() + latency: int = round(self.bot.latency * 1000) + + embed: discord.Embed = discord.Embed( + title="Pong!!", + description=( + settings["bot"]["misc"]["ping_line"], + f"`{_('command_ping_embed_desc')}: {latency}ms`\n", + ), + color=discord.Colour(0x000000), + ) + embed.set_footer( + text=f"{_('command_ping_footer')} {ctx.author.name}", + icon_url=ctx.author.display_avatar.url, + ) + + await ctx.send(embed=embed) + + @commands.command() + async def about(self, ctx: commands.Context) -> None: + embed: discord.Embed = discord.Embed( + title=f"{_('command_about_embed_title')}", + description="", + color=discord.Colour(0x000000), + ) + + embed.add_field( + name=_('command_about_embed_field1'), + value=settings['name'], + inline=False, + ) + + embed.add_field(name="Git", value=get_git_origin_raw()) + await send_message(ctx, embed=embed) + + @commands.command() + async def stats(self, ctx: commands.Context) -> None: + memory_file: str = "memory.json" + file_size: int = os.path.getsize(memory_file) + + with open(memory_file, "r") as file: + line_count: int = sum(1 for _ in file) + + embed: discord.Embed = discord.Embed( + title=f"{_('command_stats_embed_title')}", + description=f"{_('command_stats_embed_desc')}", + color=discord.Colour(0x000000), + ) + embed.add_field( + name=f"{_('command_stats_embed_field1name')}", + value=f"placeholder", + inline=False, + ) + + with open(settings["splash_text_loc"], "r") as f: + splash_text = "".join(f.readlines()) + + embed.add_field( + name=_('command_stats_embed_field3name'), + value=_('command_stats_embed_field3value').format( + NAME=settings["name"], + PREFIX=settings["bot"]["prefix"], + ownerid=settings["bot"]["owner_ids"][0], + PING_LINE=settings["bot"]["misc"]["ping_line"], + showmemenabled=settings["bot"]["allow_show_mem_command"], + USERTRAIN_ENABLED=settings["bot"]["user_training"], + song=settings["bot"]["misc"]["active_song"], + splashtext=splash_text + ), + inline=False, + ) + + + await send_message(ctx, embed=embed) + + @commands.command() + async def mem(self, ctx: commands.Context) -> None: + if not settings["bot"]["allow_show_mem_command"]: + return + + with open(settings["bot"]["active_memory"], "rb") as f: + data: bytes = f.read() + + response = requests.post( + "https://litterbox.catbox.moe/resources/internals/api.php", + data={"reqtype": "fileupload", "time": "1h"}, + files={"fileToUpload": data}, + ) + + await send_message(ctx, response.text) + + +async def setup(bot: discord.ext.commands.Bot): + print("Setting up base_commands") + bot.remove_command("help") + await bot.add_cog(BaseCommands(bot)) \ No newline at end of file diff --git a/assets/cogs/internal/markov.py b/assets/cogs/internal/markov.py new file mode 100644 index 0000000..56f319a --- /dev/null +++ b/assets/cogs/internal/markov.py @@ -0,0 +1,119 @@ +import os +import random +import re +import discord +from discord.ext import commands + +import discord.ext +import discord.ext.commands + +from modules.markovmemory import save_markov_model, train_markov_model, load_markov_model +from modules.permission import requires_admin +from modules.sentenceprocessing import ( + improve_sentence_coherence, + is_positive, + rephrase_for_coherence, + send_message, +) +from modules.volta.main import _ +import logging +from typing import List, Optional, Set +import json +import time +import markovify + + +logger = logging.getLogger("goober") +from modules.settings import instance as settings_manager + +settings = settings_manager.settings + + +class Markov(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + self.model: markovify.NewlineText + + @requires_admin() + @commands.command() + async def retrain(self, ctx: discord.ext.commands.Context): + message_ref: discord.Message | None = await send_message( + ctx, f"{_('command_markov_retrain')}" + ) + + if message_ref is None: + logger.error("Failed to send message!") + return + + try: + with open(settings["bot"]["active_memory"], "r") as f: + memory: List[str] = json.load(f) + except FileNotFoundError: + await send_message(ctx, f"{_('command_markov_memory_not_found')}") + return + except json.JSONDecodeError: + await send_message(ctx, f"{_('command_markov_memory_is_corrupt')}") + return + + data_size: int = len(memory) + + processing_message_ref: discord.Message | None = await send_message( + ctx, f"{(_('command_markov_retraining').format(data_size=data_size))}" + ) + if processing_message_ref is None: + logger.error("Couldnt find message processing message!") + + start_time: float = time.time() + + model = train_markov_model(memory) + if not model: + logger.error("Failed to train markov model") + await ctx.send("Failed to retrain!") + return False + + self.model = model + save_markov_model(self.model) + + logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s") + + await send_message( + ctx, + _('command_markov_retraining').format(data_size=data_size), + edit=True, + message_reference=processing_message_ref, + ) + + + @commands.command() + async def talk(self, ctx: commands.Context, sentence_size: int = 5) -> None: + if not self.model: + await send_message(ctx, f"{_('command_markovcommand_talk_insufficent_text')}") + return + + response: str = "" + if sentence_size == 1: + response = ( + self.model.make_short_sentence(max_chars=100, tries=100) + or _('command_markovcommand_talk_generation_fail') + ) + + else: + response = improve_sentence_coherence( + self.model.make_sentence(tries=100, max_words=sentence_size) + or _('command_talk_generation_fail') + ) + + cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower() + coherent_response: str = rephrase_for_coherence(cleaned_response) + + if random.random() < 0.9 and is_positive(coherent_response): + gif_url: str = random.choice(settings["bot"]["misc"]["positive_gifs"]) + + coherent_response = f"{coherent_response}\n[jif]({gif_url})" + + os.environ["gooberlatestgen"] = coherent_response + await send_message(ctx, coherent_response) + + +async def setup(bot): + await bot.add_cog(Markov(bot)) \ No newline at end of file diff --git a/assets/cogs/internal/permission.py b/assets/cogs/internal/permission.py new file mode 100644 index 0000000..bdb7f42 --- /dev/null +++ b/assets/cogs/internal/permission.py @@ -0,0 +1,118 @@ +import discord +from discord.ext import commands + +from modules.permission import requires_admin +from modules.settings import instance as settings_manager + +settings = settings_manager.settings + + +class PermissionManager(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @requires_admin() + @commands.command() + async def add_owner(self, ctx: commands.Context, member: discord.Member): + settings["bot"]["owner_ids"].append(member.id) + settings_manager.add_admin_log_event( + { + "action": "add", + "author": ctx.author.id, + "change": "owner_ids", + "messageId": ctx.message.id, + "target": member.id, + } + ) + + settings_manager.commit() + + embed = discord.Embed( + title="Permissions", + description=f"Set {member.name} as an owner", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def remove_owner(self, ctx: commands.Context, member: discord.Member): + try: + settings["bot"]["owner_ids"].remove(member.id) + settings_manager.add_admin_log_event( + { + "action": "del", + "author": ctx.author.id, + "change": "owner_ids", + "messageId": ctx.message.id, + "target": member.id, + } + ) + settings_manager.commit() + except ValueError: + await ctx.send("User is not an owner!") + return + + embed = discord.Embed( + title="Permissions", + description=f"Removed {member.name} from being an owner", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def blacklist_user(self, ctx: commands.Context, member: discord.Member): + settings["bot"]["blacklisted_users"].append(member.id) + settings_manager.add_admin_log_event( + { + "action": "add", + "author": ctx.author.id, + "change": "blacklisted_users", + "messageId": ctx.message.id, + "target": member.id, + } + ) + settings_manager.commit() + + embed = discord.Embed( + title="Blacklist", + description=f"Added {member.name} to the blacklist", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def unblacklist_user(self, ctx: commands.Context, member: discord.Member): + try: + settings["bot"]["blacklisted_users"].remove(member.id) + settings_manager.add_admin_log_event( + { + "action": "del", + "author": ctx.author.id, + "change": "blacklisted_users", + "messageId": ctx.message.id, + "target": member.id, + } + ) + settings_manager.commit() + + except ValueError: + await ctx.send("User is not on the blacklist!") + return + + embed = discord.Embed( + title="Blacklist", + description=f"Removed {member.name} from blacklist", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + +async def setup(bot): + await bot.add_cog(PermissionManager(bot)) \ No newline at end of file diff --git a/example.env b/example.env index 45cfd9b..30dfe93 100644 --- a/example.env +++ b/example.env @@ -1,29 +1 @@ -DISCORDBOTTOKEN= -BOTPREFIX="g." -PINGLINE="The Beretta fires fast and won't make you feel any better!" -BLACKLISTEDUSERS= -OWNERID= -USERTRAINENABLED="true" -SHOWMEMENABLED="true" -LOCALE=fi -NAME=goober -AUTOUPDATE="True" -SONG="Basket Case - Green Day" -CHECKSDISABLED="Frue" -REACT="True" -STATUS="idle" -POSITIVEGIFS="https://media.discordapp.net/attachments/821047460151427135/1181371808566493184/jjpQGeno.gif, https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272,https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988" -SPLASHTEXT=" - - SS\ - SS | - SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\ -SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ -SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__| -SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS | -\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS | - \____SS | \______/ \______/ \_______/ \_______|\__| -SS\ SS | -\SSSSSS | - \______/ -" \ No newline at end of file +DISCORDBOTTOKEN= \ No newline at end of file diff --git a/main.py b/main.py index 3a9f617..4437ffb 100644 --- a/main.py +++ b/main.py @@ -7,16 +7,30 @@ import traceback import subprocess import tempfile import shutil -import psutil -import asyncio -import platform import sys -from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type +from typing import ( + List, + Dict, + Literal, + Set, + Optional, + Tuple, + Any, + TypedDict, + Union, + Callable, + Coroutine, + TypeVar, + Type, +) import logging -from modules.globalvars import * from modules.prestartchecks import start_checks from modules.logger import GooberFormatter +from modules.volta.main import * import logging +from modules.settings import Settings as SettingsManager +from modules.permission import requires_admin +from modules.volta.main import _ logger = logging.getLogger("goober") logger.setLevel(logging.DEBUG) @@ -32,37 +46,64 @@ file_handler.setFormatter(GooberFormatter(colors=False)) logger.addHandler(console_handler) logger.addHandler(file_handler) -# Print splash text and check for updates -print(splashtext) # Print splash text (from modules/globalvars.py) +settings_manager = SettingsManager() +settings = settings_manager.settings + +splash_text: str = "" + +with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f: + splash_text = "".join(f.readlines()) + print(splash_text) + start_checks() -import requests import discord from discord.ext import commands from discord import app_commands -from discord import Colour, Embed, File, Interaction, Message -from discord.abc import Messageable +from discord import Colour, Message + +from better_profanity import profanity from discord.ext import commands -from modules.volta.main import _, set_language from modules.markovmemory import * -from modules.version import * from modules.sentenceprocessing import * from modules.unhandledexception import handle_exception + sys.excepthook = handle_exception -check_for_update() -T = TypeVar('T') -MessageContext = Union[commands.Context, discord.Interaction] -MessageReference = Union[Message, discord.WebhookMessage] + + +class MessageMetadata(TypedDict): + user_id: str + user_name: str + guild_id: str | Literal["DM"] + guild_name: str | Literal["DM"] + channel_id: str + channel_name: str + message: str + timestamp: float + # Constants with type hints -positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',') +positive_gifs: List[str] = settings["bot"]["misc"]["positive_gifs"] currenthash: str = "" launched: bool = False slash_commands_enabled: bool = False +# Set up Discord bot intents and create bot instance +intents: discord.Intents = discord.Intents.default() +intents.messages = True +intents.message_content = True + +bot: commands.Bot = commands.Bot( + command_prefix=settings["bot"]["prefix"], + intents=intents, + allowed_mentions=discord.AllowedMentions( + everyone=False, roles=False, users=False, replied_user=True + ), +) + # Load memory and Markov model for text generation -memory: List[str] = load_memory() +memory: List[str | Dict[Literal["_meta"], MessageMetadata]] = load_memory() markov_model: Optional[markovify.Text] = load_markov_model() if not markov_model: logger.error(_('markov_model_not_found')) @@ -72,148 +113,65 @@ if not markov_model: generated_sentences: Set[str] = set() used_words: Set[str] = set() -def get_git_remote_url(): - try: - url = subprocess.check_output( - ["git", "config", "--get", "remote.origin.url"], - text=True, - stderr=subprocess.DEVNULL, - ).strip() - return url - except subprocess.CalledProcessError: - return "Unknown" -async def load_cogs_from_folder(bot, folder_name="assets/cogs"): - for filename in os.listdir(folder_name): - if filename.endswith(".py") and not filename.startswith("_"): - cog_name = filename[:-3] - module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" - try: - await bot.load_extension(module_path) - logger.info(f"{(_('loaded_cog'))} {cog_name}") - except Exception as e: - logger.error(f"{(_('cog_fail'))} {cog_name} {e}") - traceback.print_exc() +async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): + for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]: + cog_name: str = filename[:-3] + + if ( + "internal" not in folder_name + and cog_name not in settings["bot"]["enabled_cogs"] + ): + logger.debug(f"Skipping cog {cog_name} (not in enabled cogs)") + continue + + module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" + + try: + await bot.load_extension(module_path) + logger.info(f"{_('loaded_cog')} {cog_name}") + except Exception as e: + logger.error(f"{_('cog_fail')} {cog_name} {e}") + traceback.print_exc() + # Event: Called when the bot is ready @bot.event async def on_ready() -> None: global launched - global slash_commands_enabled - global NAME - global status - + folder_name: str = "cogs" if launched: return - + await load_cogs_from_folder(bot) + await load_cogs_from_folder(bot, "assets/cogs/internal") try: synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() - logger.info(f"{_('synced_commands')} {len(synced)} {(_('synced_commands2'))}") - slash_commands_enabled = True - logger.info(f"{(_('started')).format(name=NAME)}") + + logger.info(f"{_('synced_commands')} {len(synced)} {_('synced_commands2')}") + logger.info(_('started')) + except discord.errors.Forbidden as perm_error: - logger.error(f"Permission error while syncing commands: {perm_error}") - logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.") + logger.error(f"Permission error while syncing commands: {perm_error}") + logger.error( + "Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions." + ) quit() except Exception as e: logger.error(f"{_('fail_commands_sync')} {e}") traceback.print_exc() quit() - - if not song: + + if not settings["bot"]["misc"]["active_song"]: return - - status = { - "idle": discord.Status.idle, - "dnd": discord.Status.dnd, - "invisible": discord.Status.invisible, - "online": discord.Status.online - }.get(status.lower(), discord.Status.online) - await bot.change_presence(status=status, activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) + await bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, + name=settings["bot"]["misc"]["active_song"], + ) + ) launched = True -@bot.event -async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: - from modules.unhandledexception import handle_exception - - if isinstance(error, commands.CommandInvokeError): - original: Exception = error.original - handle_exception( - type(original), original, original.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" - ) - else: - handle_exception( - type(error), error, error.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" - ) - -@bot.hybrid_command(description=f"{(_('command_desc_retrain'))}") -async def retrain(ctx: commands.Context) -> None: - if ctx.author.id != ownerid: - return - global markov_model - - message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}") - try: - with open(MEMORY_FILE, 'r') as f: - memory: List[str] = json.load(f) - except FileNotFoundError: - await send_message(ctx, f"{(_('command_markov_memory_not_found'))}") - return - except json.JSONDecodeError: - await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}") - return - data_size: int = len(memory) - processed_data: int = 0 - processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}") - markov_model = train_markov_model(memory) - save_markov_model(markov_model) - await send_message(ctx, f"{_('command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref) - -@bot.hybrid_command(description=f"{(_('command_desc_talk'))}") -async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: - if not markov_model: - await send_message(ctx, f"{(_('command_talk_insufficent_text'))}") - return - - response = None - for _ in range(20): - if sentence_size == 1: - sentence = markov_model.make_short_sentence(max_chars=100, tries=100) - response = sentence.split()[0] if sentence else None - else: - response = markov_model.make_sentence(tries=100, max_words=sentence_size) - - if response and response not in generated_sentences: - if sentence_size > 1: - response = improve_sentence_coherence(response) - generated_sentences.add(response) - break - else: - await send_message(ctx, f"{(_('command_talk_generation_fail'))}") - return - - cleaned = re.sub(r'[^\w\s]', '', response).lower() - coherent = rephrase_for_coherence(cleaned) - - if random.random() < 0.9 and is_positive(coherent): - gif_url = random.choice(positive_gifs) - message = f"{coherent}\n[jif]({gif_url})" - else: - message = coherent - - logger.info(message) - os.environ['gooberlatestgen'] = message - await send_message(ctx, message) - - -@bot.hybrid_command(description=f"RAM") -async def ramusage(ctx): - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss - await send_message(ctx, f"{mem / 1024 / 1024:.2f} MB") bot.remove_command('help') # Command: Show help information @@ -245,130 +203,135 @@ async def help(ctx: commands.Context) -> None: await send_message(ctx, embed=embed) -@bot.hybrid_command(description=f"{(_('command_desc_setlang'))}") -@app_commands.describe(locale="Choose your language") -async def setlanguage(ctx: commands.Context, locale: str) -> None: - if ctx.author.id != ownerid: - await ctx.send(":thumbsdown:") - return - await ctx.defer() - set_language(locale) - await ctx.send(":thumbsup:") +@bot.event +async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: + from modules.unhandledexception import handle_exception + if isinstance(error, commands.CommandInvokeError): + original: Exception = error.original + handle_exception( + type(original), + original, + original.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", + ) + else: + handle_exception( + type(error), + error, + error.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", + ) + +# Event: Called on every message @bot.event async def on_message(message: discord.Message) -> None: global memory, markov_model + EMOJIS = [ + "\U0001f604", + "\U0001f44d", + "\U0001f525", + "\U0001f4af", + "\U0001f389", + "\U0001f60e", + ] # originally was emojis but it would probably shit itself on systems without unicode so.... if message.author.bot: return - if str(message.author.id) in BLACKLISTED_USERS: + if str(message.author.id) in settings["bot"]["blacklisted_users"]: return - - if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")): + + commands = [ + settings["bot"]["prefix"] + command.name for command in bot.tree.get_commands() + ] + + if message.content.startswith(tuple(commands)): logger.info(f"{(_('command_ran')).format(message=message)}") await bot.process_commands(message) return + if ( + profanity.contains_profanity(message.content) + and settings["bot"]["misc"]["block_profanity"] + ): + return + if message.content: - if not USERTRAIN_ENABLED: + if not settings["bot"]["user_training"]: return + formatted_message: str = message.content - cleaned_message: str = formatted_message - save_memory(memory) + cleaned_message: str = preprocess_message(formatted_message) + if cleaned_message: + memory.append(cleaned_message) + + message_metadata: MessageMetadata = { + "user_id": str(message.author.id), + "user_name": str(message.author), + "guild_id": str(message.guild.id) if message.guild else "DM", + "guild_name": str(message.guild.name) if message.guild else "DM", + "channel_id": str(message.channel.id), + "channel_name": str(message.channel), + "message": message.content, + "timestamp": time.time(), + } + try: + if isinstance(memory, list): + memory.append({"_meta": message_metadata}) + else: + logger.warning("Memory is not a list; can't append metadata") + except Exception as e: + logger.warning(f"Failed to append metadata to memory: {e}") + + save_memory(memory) + + sentiment_score = is_positive( + message.content + ) # doesnt work but im scared to change the logic now please ignore + if sentiment_score > 0.8: + if not settings["bot"]["react_to_messages"]: + return + emoji = random.choice(EMOJIS) + try: + await message.add_reaction(emoji) + except Exception as e: + logger.info(f"Failed to react with emoji: {e}") + await bot.process_commands(message) + +# Event: Called on every interaction (slash command, etc.) @bot.event async def on_interaction(interaction: discord.Interaction) -> None: - name = None - if interaction.data.get('name') is None: - name = "Unknown" - else: - name = interaction.data['name'] logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}") + + +# Global check: Block blacklisted users from running commands @bot.check async def block_blacklisted(ctx: commands.Context) -> bool: - if str(ctx.author.id) in BLACKLISTED_USERS: - try: - if isinstance(ctx, discord.Interaction): - if not ctx.response.is_done(): - await ctx.response.send_message(_('blacklisted'), ephemeral=True) - else: - await ctx.followup.send(_('blacklisted'), ephemeral=True) + if ctx.author.id not in settings["bot"]["blacklisted_users"]: + return True + + try: + if isinstance(ctx, discord.Interaction): + if not ctx.response.is_done(): + await ctx.response.send_message(_('blacklisted'), ephemeral=True) else: - await ctx.send(_('blacklisted_user'), ephemeral=True) - except: - pass + await ctx.followup.send(_('blacklisted'), ephemeral=True) + else: + await ctx.send(_('blacklisted_user'), ephemeral=True) + except: return False + return True -@bot.hybrid_command(description=f"{(_('command_desc_ping'))}") -async def ping(ctx: commands.Context) -> None: - await ctx.defer() - latency: int = round(bot.latency * 1000) - pingembed: discord.Embed = discord.Embed( - title="Pong!!", - description=( - f"{PING_LINE}\n" - f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n" - ), - color=Colour(0x000000) - ) - pingembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url) - - await ctx.send(embed=pingembed) - -@bot.hybrid_command(description=f"{(_('command_about_desc'))}") -async def about(ctx: commands.Context) -> None: - latest_version: str = check_for_update(slient=True) - embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000)) - embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False) - embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) - embed.add_field(name=f"Git", value=get_git_remote_url()) - embed.add_field(name=f"OS", value=platform.platform()) - - await send_message(ctx, embed=embed) - -@bot.hybrid_command(description="stats") -async def stats(ctx: commands.Context) -> None: - if ctx.author.id != ownerid: - return - latest_version: str = check_for_update() - memory_file: str = 'memory.json' - file_size: int = os.path.getsize(memory_file) - with open(memory_file, 'r') as file: - line_count: int = sum(1 for _ in file) - embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000)) - embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False) - embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) - embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False) - embed.add_field(name=f"OS", value=platform.platform()) - embed.add_field(name="Python Version", value=platform.python_version()) - await send_message(ctx, embed=embed) - -@bot.hybrid_command() -async def mem(ctx: commands.Context) -> None: - if showmemenabled != "true": - return - - with open("memory.json", "rb") as file: - files = { - "fileToUpload": file - } - data = { - "reqtype": "fileupload", - "time": "1h" - } - - try: - response = requests.post("https://litterbox.catbox.moe/resources/internals/api.php", files=files, data=data) - response.raise_for_status() - await send_message(ctx, response.text.strip()) - except requests.RequestException as e: - logger.error(f"Upload failed: {e}") - await send_message(ctx, "Upload failed.") +# Helper: Improve sentence coherence (simple capitalization fix) def improve_sentence_coherence(sentence: str) -> str: + # Capitalizes "i" to "I" in the sentence + sentence = sentence.replace(" i ", " I ") return sentence # Start the bot -bot.run(TOKEN) \ No newline at end of file +if __name__ == "__main__": + bot.run(os.environ.get("DISCORDBOTTOKEN", "")) \ No newline at end of file diff --git a/modules/permission.py b/modules/permission.py new file mode 100644 index 0000000..3d87d07 --- /dev/null +++ b/modules/permission.py @@ -0,0 +1,37 @@ +from functools import wraps +import discord + +import discord.ext +import discord.ext.commands + +from modules.settings import Settings as SettingsManager +import logging + +logger = logging.getLogger("goober") + +settings_manager = SettingsManager() +settings = settings_manager.settings + + +class PermissionError(Exception): + pass + + +def requires_admin(): + async def wrapper(ctx: discord.ext.commands.Context): + if ctx.author.id not in settings["bot"]["owner_ids"]: + await ctx.send( + "You don't have the necessary permissions to run this command!" + ) + return False + + command = ctx.command + if not command: + logger.info(f"Unknown command ran {ctx.message}") + else: + logger.info( + f'Command {settings["bot"]["prefix"]}{command.name} @{ctx.author.name}' + ) + return True + + return discord.ext.commands.check(wrapper) \ No newline at end of file diff --git a/modules/sentenceprocessing.py b/modules/sentenceprocessing.py index 95a703b..f7a933a 100644 --- a/modules/sentenceprocessing.py +++ b/modules/sentenceprocessing.py @@ -37,29 +37,27 @@ def is_positive(sentence): return sentiment_score > 0.6 # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them async def send_message(ctx, message=None, embed=None, file=None, edit=False, message_reference=None): - if edit and message_reference: - try: + try: + if edit and message_reference: await message_reference.edit(content=message, embed=embed) - except Exception as e: - await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}") - else: + return message_reference + + send_kwargs = {} + if message: + send_kwargs['content'] = message + if embed: + send_kwargs['embed'] = embed + if file: + send_kwargs['file'] = file + if hasattr(ctx, "respond"): - sent_message = None - if embed: - sent_message = await ctx.respond(embed=embed, ephemeral=False) - elif message: - sent_message = await ctx.respond(message, ephemeral=False) - if file: - sent_message = await ctx.respond(file=file, ephemeral=False) + return await ctx.respond(**send_kwargs, ephemeral=False) else: - sent_message = None - if embed: - sent_message = await ctx.send(embed=embed) - elif message: - sent_message = await ctx.send(message) - if file: - sent_message = await ctx.send(file=file) - return sent_message + return await ctx.send(**send_kwargs) + + except Exception as e: + await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}") + def preprocess_message(message): message = message diff --git a/modules/settings.py b/modules/settings.py new file mode 100644 index 0000000..1dfc377 --- /dev/null +++ b/modules/settings.py @@ -0,0 +1,58 @@ +import json +import os +from typing import List, TypedDict +import copy + +class MiscBotOptions(TypedDict): + ping_line: str + active_song: str + positive_gifs: List[str] + block_profanity: bool + +class BotSettings(TypedDict): + prefix: str + owner_ids: List[int] + blacklisted_users: List[int] + user_training: bool + allow_show_mem_command: bool + react_to_messages: bool + misc: MiscBotOptions + enabled_cogs: List[str] + active_memory: str + +class SettingsType(TypedDict): + bot: BotSettings + locale: str + name: str + auto_update: bool + disable_checks: bool + splash_text_loc: str + +class Settings: + def __init__(self) -> None: + self.path = os.path.join(".", "settings", "settings.json") + if not os.path.exists(self.path): + raise FileNotFoundError("settings.json file does not exist!") + + with open(self.path, "r") as f: + self._kv_store = json.load(f) + + self.settings: SettingsType = self._kv_store # type: ignore + self.original_settings = copy.deepcopy(self.settings) + + def get_locale(self) -> str: + # Return locale or None if missing + return self.settings.get("locale", None) + + def commit(self) -> None: + with open(self.path, "w") as f: + json.dump(self.settings, f, indent=4) + self.original_settings = copy.deepcopy(self.settings) + + def discard(self) -> None: + self.settings = copy.deepcopy(self.original_settings) + +# Usage +instance = Settings() +locale = instance.get_locale() +print("Locale:", locale) diff --git a/modules/volta/main.py b/modules/volta/main.py index 4b62144..ec1edad 100644 --- a/modules/volta/main.py +++ b/modules/volta/main.py @@ -50,12 +50,50 @@ def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None: current = current.parent return None -env_path = find_dotenv(pathlib.Path(__file__).parent) -if env_path: - load_dotenv(dotenv_path=env_path) - print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") +def load_settings_json() -> dict | None: + start_path = working_dir.resolve() + current = start_path.resolve() + while current != current.parent: + candidate = current / "settings.json" + if candidate.exists(): + try: + with open(candidate, "r", encoding="utf-8") as f: + data = json.load(f) + print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}") + return data + except Exception as e: + print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}") + return None + current = current.parent + + for root, dirs, files in os.walk(start_path): + if "settings.json" in files: + candidate = pathlib.Path(root) / "settings.json" + try: + with open(candidate, "r", encoding="utf-8") as f: + data = json.load(f) + print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}") + return data + except Exception as e: + print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}") + return None + + print(f"[VOLTA] {YELLOW}No settings.json found scanning up or down from {start_path}{RESET}") + return None + +settings = load_settings_json() +if settings and "locale" in settings: + LOCALE = settings["locale"] else: - print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") + env_path = find_dotenv(pathlib.Path(__file__).parent) + if env_path: + load_dotenv(dotenv_path=env_path) + print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") + else: + print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") + + LOCALE = os.getenv("LOCALE") or None + locales_dirs.extend(find_locales_dirs(module_dir)) if working_dir != module_dir: diff --git a/settings/settingsexample.json b/settings/settingsexample.json new file mode 100644 index 0000000..dc89081 --- /dev/null +++ b/settings/settingsexample.json @@ -0,0 +1,26 @@ +{ + "bot": { + "prefix": "g.", + "owner_ids": [ + + ], + "blacklisted_users": [], + "user_training": true, + "allow_show_mem_command": true, + "react_to_messages": true, + "misc": { + "ping_line": "The Beretta fires fast and won't make you feel any better!", + "active_song": "Basket Case - Green Day", + "positive_gifs": [], + "block_profanity": false + }, + "active_memory": "memory.json", + "enabled_cogs": [ + ] + }, + "locale": "fi", + "name": "goober", + "auto_update": true, + "disable_checks": false, + "splash_text_loc": "settings/splash.txt" +} \ No newline at end of file diff --git a/settings/splashexample.txt b/settings/splashexample.txt new file mode 100644 index 0000000..cc6b951 --- /dev/null +++ b/settings/splashexample.txt @@ -0,0 +1,11 @@ + SS\ + SS | + SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\ +SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ +SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__| +SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS | +\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS | + \____SS | \______/ \______/ \_______/ \_______|\__| +SS\ SS | +\SSSSSS | + \______/ \ No newline at end of file