diff --git a/README.md b/README.md index a6ed482..bb820eb 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -[moved here lol](https://forgejo.expect.ovh/gooberinc/goober) +Real repo: https://forgejo.expect.ovh/gooberinc/goober +This is just a fork that was made the upstream diff --git a/assets/cogs/cogmanager.py b/assets/cogs/cogmanager.py index 8c13e36..e21ba60 100644 --- a/assets/cogs/cogmanager.py +++ b/assets/cogs/cogmanager.py @@ -2,23 +2,23 @@ import discord from discord.ext import commands import discord.ext import discord.ext.commands +from modules.permission import requires_admin from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings COG_PREFIX = "assets.cogs." + class CogManager(commands.Cog): def __init__(self, bot): self.bot = bot + @requires_admin() @commands.command() async def enable(self, ctx, cog_name: str): - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return - try: await self.bot.load_extension(COG_PREFIX + cog_name) await ctx.send(f"Loaded cog `{cog_name}` successfully.") @@ -28,17 +28,13 @@ class CogManager(commands.Cog): except Exception as e: await ctx.send(f"Error enabling cog `{cog_name}`: {e}") - + @requires_admin() @commands.command() async def load(self, ctx, cog_name: str | None = None): - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return - if cog_name is None: await ctx.send("Give cog_name") return - + if cog_name[:-3] not in settings["bot"]["enabled_cogs"]: await ctx.send("Please enable the cog first!") return @@ -51,11 +47,9 @@ class CogManager(commands.Cog): except Exception as e: await ctx.send(f"Error loading cog `{cog_name}`: {e}") + @requires_admin() @commands.command() async def unload(self, ctx, cog_name: str | None = None): - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to unload.") return @@ -65,11 +59,9 @@ class CogManager(commands.Cog): except Exception as e: await ctx.send(f"Error unloading cog `{cog_name}`: {e}") + @requires_admin() @commands.command() async def disable(self, ctx, cog_name: str | None = None): - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to disable.") return @@ -81,12 +73,9 @@ class CogManager(commands.Cog): except Exception as e: await ctx.send(f"Error unloading cog `{cog_name}`: {e}") - + @requires_admin() @commands.command() async def reload(self, ctx, cog_name: str | None = None): - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to reload.") return @@ -109,9 +98,13 @@ class CogManager(commands.Cog): await ctx.send("No cogs are currently loaded.") return - embed = discord.Embed(title="Loaded Cogs", description="Here is a list of all currently loaded cogs:") + embed = discord.Embed( + title="Loaded Cogs", + description="Here is a list of all currently loaded cogs:", + ) embed.add_field(name="Cogs", value="\n".join(cogs), inline=False) await ctx.send(embed=embed) + async def setup(bot): await bot.add_cog(CogManager(bot)) diff --git a/assets/cogs/eightball.py b/assets/cogs/eightball.py index 9f96901..39eb42c 100644 --- a/assets/cogs/eightball.py +++ b/assets/cogs/eightball.py @@ -2,36 +2,40 @@ import random import discord from discord.ext import commands + class eightball(commands.Cog): def __init__(self, bot): self.bot = bot @commands.command() async def eightball(self, ctx): - answer = random.choice([ - "It is certain.", - "It is decidedly so.", - "Without a doubt.", - "Yes definitely.", - "You may rely on it.", - "As I see it, yes.", - "Most likely.", - "Outlook good.", - "Yes.", - "Signs point to yes.", - "Reply hazy, try again.", - "Ask again later.", - "Better not tell you now.", - "Cannot predict now.", - "Concentrate and ask again.", - "Don't count on it.", - "My reply is no.", - "My sources say no.", - "Outlook not so good.", - "Very doubtful." - ]) + answer = random.choice( + [ + "It is certain.", + "It is decidedly so.", + "Without a doubt.", + "Yes definitely.", + "You may rely on it.", + "As I see it, yes.", + "Most likely.", + "Outlook good.", + "Yes.", + "Signs point to yes.", + "Reply hazy, try again.", + "Ask again later.", + "Better not tell you now.", + "Cannot predict now.", + "Concentrate and ask again.", + "Don't count on it.", + "My reply is no.", + "My sources say no.", + "Outlook not so good.", + "Very doubtful.", + ] + ) await ctx.send(answer) + async def setup(bot): await bot.add_cog(eightball(bot)) diff --git a/assets/cogs/filesharing.py b/assets/cogs/filesharing.py index fe7803c..47f59a3 100644 --- a/assets/cogs/filesharing.py +++ b/assets/cogs/filesharing.py @@ -3,7 +3,9 @@ import discord.context_managers from discord.ext import commands import logging from typing import Literal, get_args, cast +from modules.permission import requires_admin from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings @@ -12,6 +14,7 @@ logger = logging.getLogger("goober") AvailableModes = Literal["r", "s"] + class FileSync(commands.Cog): def __init__(self, bot): self.bot: discord.Client = bot @@ -19,24 +22,21 @@ class FileSync(commands.Cog): self.peer_id = None self.awaiting_file = False + @requires_admin() @commands.command() async def syncfile(self, ctx: commands.Context, mode: str, peer: discord.User): if self.mode not in get_args(AvailableModes): await ctx.send("Invalid mode, use 's' or 'r'.") return - + self.mode = cast(AvailableModes, mode.lower()) self.peer_id = peer.id - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You don't have permission to execute this command.") - return - if self.mode == "s": await ctx.send(f"<@{self.peer_id}> FILE_TRANSFER_REQUEST") await ctx.send(file=discord.File("memory.json")) await ctx.send("File sent in this channel.") - + elif self.mode == "r": await ctx.send("Waiting for incoming file...") self.awaiting_file = True @@ -53,12 +53,11 @@ class FileSync(commands.Cog): logger.info("Ping received. Awaiting file...") if not message.attachments: return - for attachment in message.attachments: if not attachment.filename.endswith(".json"): continue - + filename = "received_memory.json" with open(filename, "wb") as f: await attachment.save(f) @@ -67,5 +66,6 @@ class FileSync(commands.Cog): await message.channel.send("File received and saved.") self.awaiting_file = False + async def setup(bot): await bot.add_cog(FileSync(bot)) diff --git a/assets/cogs/fuckup.py b/assets/cogs/fuckup.py index 0f79066..89a3867 100644 --- a/assets/cogs/fuckup.py +++ b/assets/cogs/fuckup.py @@ -5,6 +5,7 @@ from PIL import Image, ImageEnhance, ImageFilter, ImageOps, ImageChops, ImageCol import os, random, shutil, tempfile import modules.keys as k + async def deepfryimage(path): with Image.open(path).convert("RGB") as im: # make it burn @@ -44,14 +45,17 @@ class whami(commands.Cog): def __init__(self, bot): self.bot = bot - @commands.command() async def fuckup(self, ctx): assets_folder = "assets/images" temp_input = None def get_random_asset_image(): - files = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] + files = [ + f + for f in os.listdir(assets_folder) + if f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) + ] if not files: return None return os.path.join(assets_folder, random.choice(files)) @@ -94,5 +98,6 @@ class whami(commands.Cog): if temp_input and os.path.exists(temp_input): os.remove(temp_input) + async def setup(bot): await bot.add_cog(whami(bot)) diff --git a/assets/cogs/lastfm.py b/assets/cogs/lastfm.py index 822305a..2ae3290 100644 --- a/assets/cogs/lastfm.py +++ b/assets/cogs/lastfm.py @@ -6,11 +6,12 @@ from dotenv import load_dotenv load_dotenv() -#stole most of this code from my old expect bot so dont be suprised if its poorly made +# stole most of this code from my old expect bot so dont be suprised if its poorly made LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") LASTFM_USERNAME = os.getenv("LASTFM_USERNAME") + class LastFmCog(commands.Cog): def __init__(self, bot): self.bot = bot @@ -34,7 +35,11 @@ class LastFmCog(commands.Cog): self.current_track = track artist, song = track activity_name = f"{artist} - {song}" - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=activity_name)) + await self.bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, name=activity_name + ) + ) print(f"Updated song to {artist} - {song}") else: print("LastFM gave me the same track! not updating...") @@ -52,7 +57,11 @@ class LastFmCog(commands.Cog): self.current_track = track artist, song = track activity_name = f"{artist} - {song}" - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=activity_name)) + await self.bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, name=activity_name + ) + ) await ctx.send(f"Updated presence to: Listening to {activity_name}") async def fetch_current_track(self): @@ -71,12 +80,13 @@ class LastFmCog(commands.Cog): return None track = recenttracks[0] - if '@attr' in track and track['@attr'].get('nowplaying') == 'true': - artist = track.get('artist', {}).get('#text', 'Unknown Artist') - song = track.get('name', 'Unknown Song') + if "@attr" in track and track["@attr"].get("nowplaying") == "true": + artist = track.get("artist", {}).get("#text", "Unknown Artist") + song = track.get("name", "Unknown Song") return artist, song return None + async def setup(bot): if not LASTFM_API_KEY or not LASTFM_USERNAME: return diff --git a/assets/cogs/pulse.py b/assets/cogs/pulse.py index a41b902..f249693 100644 --- a/assets/cogs/pulse.py +++ b/assets/cogs/pulse.py @@ -2,10 +2,13 @@ from discord.ext import commands import discord from collections import defaultdict, Counter import datetime +from modules.permission import requires_admin from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings + class StatsCog(commands.Cog): def __init__(self, bot): self.bot = bot @@ -32,38 +35,54 @@ class StatsCog(commands.Cog): async def on_command(self, ctx): self.command_usage[ctx.command.qualified_name] += 1 + @requires_admin() @commands.command() async def spyware(self, ctx): - if ctx.author.id not in settings["bot"]["owner_ids"]: - return uptime = datetime.datetime.utcnow() - self.start_time hours_elapsed = max((uptime.total_seconds() / 3600), 1) avg_per_hour = self.total_messages / hours_elapsed if self.messages_per_hour: - peak_hour, peak_count = max(self.messages_per_hour.items(), key=lambda x: x[1]) + peak_hour, peak_count = max( + self.messages_per_hour.items(), key=lambda x: x[1] + ) else: peak_hour, peak_count = "N/A", 0 top_users = self.user_message_counts.most_common(5) embed = discord.Embed(title="Community Stats", color=discord.Color.blue()) - embed.add_field(name="Uptime", value=str(uptime).split('.')[0], inline=False) - embed.add_field(name="Total Messages", value=str(self.total_messages), inline=True) - embed.add_field(name="Active Users", value=str(len(self.active_users)), inline=True) - embed.add_field(name="Avg Messages/Hour", value=f"{avg_per_hour:.2f}", inline=True) - embed.add_field(name="Peak Hour (UTC)", value=f"{peak_hour}: {peak_count} messages", inline=True) + embed.add_field(name="Uptime", value=str(uptime).split(".")[0], inline=False) + embed.add_field( + name="Total Messages", value=str(self.total_messages), inline=True + ) + embed.add_field( + name="Active Users", value=str(len(self.active_users)), inline=True + ) + embed.add_field( + name="Avg Messages/Hour", value=f"{avg_per_hour:.2f}", inline=True + ) + embed.add_field( + name="Peak Hour (UTC)", + value=f"{peak_hour}: {peak_count} messages", + inline=True, + ) - top_str = "\n".join( - f"<@{user_id}>: {count} messages" for user_id, count in top_users - ) or "No data" + top_str = ( + "\n".join(f"<@{user_id}>: {count} messages" for user_id, count in top_users) + or "No data" + ) embed.add_field(name="Top Chatters", value=top_str, inline=False) - cmd_str = "\n".join( - f"{cmd}: {count}" for cmd, count in self.command_usage.most_common(5) - ) or "No commands used yet" + cmd_str = ( + "\n".join( + f"{cmd}: {count}" for cmd, count in self.command_usage.most_common(5) + ) + or "No commands used yet" + ) embed.add_field(name="Top Commands", value=cmd_str, inline=False) await ctx.send(embed=embed) + async def setup(bot): await bot.add_cog(StatsCog(bot)) diff --git a/assets/cogs/slashcomandexample.py b/assets/cogs/slashcomandexample.py index 63b45f1..8ccdf27 100644 --- a/assets/cogs/slashcomandexample.py +++ b/assets/cogs/slashcomandexample.py @@ -2,6 +2,7 @@ import discord from discord.ext import commands from discord import app_commands + class Ping(commands.Cog): def __init__(self, bot): self.bot = bot @@ -12,11 +13,15 @@ class Ping(commands.Cog): exampleembed = discord.Embed( title="Pong!!", description="The Beretta fires fast and won't make you feel any better!", - color=discord.Color.blue() + color=discord.Color.blue(), + ) + exampleembed.set_footer( + text=f"Requested by {interaction.user.name}", + icon_url=interaction.user.avatar.url, ) - exampleembed.set_footer(text=f"Requested by {interaction.user.name}", icon_url=interaction.user.avatar.url) await interaction.followup.send(embed=exampleembed) + async def setup(bot): await bot.add_cog(Ping(bot)) diff --git a/assets/cogs/songchanger.py b/assets/cogs/songchanger.py index 36fde47..3ae29eb 100644 --- a/assets/cogs/songchanger.py +++ b/assets/cogs/songchanger.py @@ -3,6 +3,7 @@ from discord.ext import commands from modules.globalvars import RED, GREEN, RESET, LOCAL_VERSION_FILE import os + class songchange(commands.Cog): def __init__(self, bot): self.bot = bot @@ -19,15 +20,22 @@ class songchange(commands.Cog): @commands.command() async def changesong(self, ctx): if LOCAL_VERSION_FILE > "0.11.8": - await ctx.send(f"Goober is too old! you must have version 0.11.8 you have {local_version}") + await ctx.send( + f"Goober is too old! you must have version 0.11.8 you have {local_version}" + ) return await ctx.send("Check the terminal! (this does not persist across restarts)") song = input("\nEnter a song:\n") try: - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) + await self.bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, name=f"{song}" + ) + ) print(f"{GREEN}Changed song to {song}{RESET}") except Exception as e: print(f"{RED}An error occurred while changing songs..: {str(e)}{RESET}") + async def setup(bot): await bot.add_cog(songchange(bot)) diff --git a/assets/cogs/tf.py b/assets/cogs/tf.py index 0fbee78..0b31a91 100644 --- a/assets/cogs/tf.py +++ b/assets/cogs/tf.py @@ -13,20 +13,22 @@ ready = True MODEL_MATCH_STRING = r"[0-9]{2}_[0-9]{2}_[0-9]{4}-[0-9]{2}_[0-9]{2}" try: - import tensorflow as tf + import tensorflow as tf import keras from keras.preprocessing.text import Tokenizer from keras.preprocessing.sequence import pad_sequences from keras.models import Sequential, load_model from keras.layers import Embedding, LSTM, Dense from keras.backend import clear_session - - if tf.config.list_physical_devices('GPU'): + + if tf.config.list_physical_devices("GPU"): print("Using GPU acceleration") - elif tf.config.list_physical_devices('Metal'): + elif tf.config.list_physical_devices("Metal"): print("Using Metal for macOS acceleration") except ImportError: - print("ERROR: Failed to import TensorFlow. Ensure you have the correct dependencies:") + print( + "ERROR: Failed to import TensorFlow. Ensure you have the correct dependencies:" + ) print("tensorflow>=2.15.0") print("For macOS (Apple Silicon): tensorflow-metal") ready = False @@ -38,24 +40,39 @@ class TFCallback(keras.callbacks.Callback): self.bot = bot self.message = message self.times = [time.time()] - + async def send_message(self, message: str, description: str, **kwargs): if "epoch" in kwargs: self.times.append(time.time()) avg_epoch_time = np.mean(np.diff(self.times)) description = f"ETA: {round(avg_epoch_time)}s" - self.embed.add_field(name=f" - {message}", value=description, inline=False) + self.embed.add_field( + name=f" - {message}", + value=description, + inline=False, + ) await self.message.edit(embed=self.embed) - + def on_train_end(self, logs=None): - self.bot.loop.create_task(self.send_message("Training stopped", "Training has been stopped.")) - + self.bot.loop.create_task( + self.send_message("Training stopped", "Training has been stopped.") + ) + def on_epoch_begin(self, epoch, logs=None): - self.bot.loop.create_task(self.send_message(f"Starting epoch {epoch}", "This might take a while", epoch=True)) - + self.bot.loop.create_task( + self.send_message( + f"Starting epoch {epoch}", "This might take a while", epoch=True + ) + ) + def on_epoch_end(self, epoch, logs=None): - self.bot.loop.create_task(self.send_message(f"Epoch {epoch} ended", f"Accuracy: {round(logs.get('accuracy', 0.0), 4)}")) - + self.bot.loop.create_task( + self.send_message( + f"Epoch {epoch} ended", + f"Accuracy: {round(logs.get('accuracy', 0.0), 4)}", + ) + ) + class Ai: def __init__(self): @@ -63,11 +80,11 @@ class Ai: if model_path: self.__load_model(model_path) self.is_loaded = model_path is not None - self.batch_size = 64 - + self.batch_size = 64 + def generate_model_name(self): - return time.strftime('%d_%m_%Y-%H_%M', time.localtime()) - + return time.strftime("%d_%m_%Y-%H_%M", time.localtime()) + def __load_model(self, model_path): clear_session() self.model = load_model(os.path.join(model_path, "model.h5")) @@ -81,7 +98,7 @@ class Ai: with open("memory.json", "r") as f: self.tokenizer.fit_on_texts(json.load(f)) self.is_loaded = True - + def reload_model(self): clear_session() model_path = settings.get("model_path") @@ -90,9 +107,11 @@ class Ai: self.is_loaded = True async def run_async(self, func, bot, *args, **kwargs): - return await bot.loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) + return await bot.loop.run_in_executor( + None, functools.partial(func, *args, **kwargs) + ) + - class Learning(Ai): def create_model(self, memory, epochs=2): memory = memory[:2000] @@ -107,41 +126,58 @@ class Learning(Ai): maxlen = max(map(len, X)) X = pad_sequences(X, maxlen=maxlen, padding="pre") y = np.array(y) - - model = Sequential([ - Embedding(input_dim=VOCAB_SIZE, output_dim=128, input_length=maxlen), - LSTM(64), - Dense(VOCAB_SIZE, activation="softmax") - ]) - - model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) + + model = Sequential( + [ + Embedding(input_dim=VOCAB_SIZE, output_dim=128, input_length=maxlen), + LSTM(64), + Dense(VOCAB_SIZE, activation="softmax"), + ] + ) + + model.compile( + optimizer="adam", + loss="sparse_categorical_crossentropy", + metrics=["accuracy"], + ) history = model.fit(X, y, epochs=epochs, batch_size=64, callbacks=[tf_callback]) self.save_model(model, tokenizer, history) - + def save_model(self, model, tokenizer, history, name=None): name = name or self.generate_model_name() model_dir = os.path.join("models", name) os.makedirs(model_dir, exist_ok=True) - + with open(os.path.join(model_dir, "info.json"), "w") as f: json.dump(history.history, f) with open(os.path.join(model_dir, "tokenizer.pkl"), "wb") as f: pickle.dump(tokenizer, f) model.save(os.path.join(model_dir, "model.h5")) - + class Generation(Ai): def generate_sentence(self, word_amount, seed): if not self.is_loaded: return False for _ in range(word_amount): token_list = self.tokenizer.texts_to_sequences([seed])[0] - token_list = pad_sequences([token_list], maxlen=self.model.input_shape[1], padding="pre") - predicted_word_index = np.argmax(self.model.predict(token_list, verbose=0), axis=-1)[0] - output_word = next((w for w, i in self.tokenizer.word_index.items() if i == predicted_word_index), "") + token_list = pad_sequences( + [token_list], maxlen=self.model.input_shape[1], padding="pre" + ) + predicted_word_index = np.argmax( + self.model.predict(token_list, verbose=0), axis=-1 + )[0] + output_word = next( + ( + w + for w, i in self.tokenizer.word_index.items() + if i == predicted_word_index + ), + "", + ) seed += " " + output_word return seed - + VOCAB_SIZE = 100_000 settings = {} @@ -152,4 +188,4 @@ tf_callback = None async def setup(bot): - await bot.add_cog(Tf(bot)) \ No newline at end of file + await bot.add_cog(Tf(bot)) diff --git a/assets/cogs/webscraper.py b/assets/cogs/webscraper.py index 9547790..e344fea 100644 --- a/assets/cogs/webscraper.py +++ b/assets/cogs/webscraper.py @@ -5,10 +5,13 @@ from bs4 import BeautifulSoup import json import asyncio from urllib.parse import urljoin +from modules.permission import requires_admin from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings + class WebScraper(commands.Cog): def __init__(self, bot): self.bot = bot @@ -25,7 +28,7 @@ class WebScraper(commands.Cog): def extract_sentences(self, text): """Extract sentences from text.""" - sentences = text.split('.') + sentences = text.split(".") return [sentence.strip() for sentence in sentences if sentence.strip()] def save_to_json(self, sentences): @@ -52,7 +55,6 @@ class WebScraper(commands.Cog): print("No data to undo.") return False - data = data[:-1] with open("memory.json", "w") as file: @@ -76,18 +78,14 @@ class WebScraper(commands.Cog): soup = BeautifulSoup(html, "html.parser") - for paragraph in soup.find_all('p'): + for paragraph in soup.find_all("p"): sentences = self.extract_sentences(paragraph.get_text()) self.save_to_json(sentences) - + @requires_admin() @commands.command() async def start_scrape(self, ctx, start_url: str): """Command to start the scraping process.""" - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return - if not start_url.startswith("http"): await ctx.send("Please provide a valid URL.") return @@ -99,18 +97,16 @@ class WebScraper(commands.Cog): await ctx.send("Scraping complete! Sentences saved to memory.json.") + @requires_admin() @commands.command() async def undo_scrape(self, ctx): """Command to undo the last scrape.""" - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send("You do not have permission to use this command.") - return - success = self.undo_last_scrape() if success: await ctx.send("Last scrape undone successfully.") else: await ctx.send("No data to undo or an error occurred.") + async def setup(bot): await bot.add_cog(WebScraper(bot)) diff --git a/assets/cogs/webserver.py b/assets/cogs/webserver.py index 70d8296..d367d75 100644 --- a/assets/cogs/webserver.py +++ b/assets/cogs/webserver.py @@ -14,6 +14,7 @@ from modules.globalvars import VERSION_URL import sys import subprocess + class GooberWeb(commands.Cog): def __init__(self, bot): self.bot = bot @@ -24,18 +25,20 @@ class GooberWeb(commands.Cog): self.last_command_time = "Never" self.start_time = time.time() self.websockets = set() - - self.app.add_routes([ - web.get('/', self.handle_index), - web.get('/changesong', self.handle_changesong), - web.get('/stats', self.handle_stats), - web.get('/data', self.handle_json_data), - web.get('/ws', self.handle_websocket), - web.get('/styles.css', self.handle_css), - web.get('/settings', self.handle_settings), - web.post('/update_settings', self.handle_update_settings), - web.post('/restart_bot', self.handle_restart_bot), - ]) + + self.app.add_routes( + [ + web.get("/", self.handle_index), + web.get("/changesong", self.handle_changesong), + web.get("/stats", self.handle_stats), + web.get("/data", self.handle_json_data), + web.get("/ws", self.handle_websocket), + web.get("/styles.css", self.handle_css), + web.get("/settings", self.handle_settings), + web.post("/update_settings", self.handle_update_settings), + web.post("/restart_bot", self.handle_restart_bot), + ] + ) self.bot.loop.create_task(self.start_web_server()) self.update_clients.start() @@ -52,72 +55,82 @@ class GooberWeb(commands.Cog): async def get_blacklisted_users(self): blacklisted_ids = os.getenv("BLACKLISTED_USERS", "").split(",") blacklisted_users = [] - + for user_id in blacklisted_ids: if not user_id.strip(): continue - + try: user = await self.bot.fetch_user(int(user_id)) - blacklisted_users.append({ - "name": f"{user.name}", - "avatar_url": str(user.avatar.url) if user.avatar else str(user.default_avatar.url), - "id": user.id - }) + blacklisted_users.append( + { + "name": f"{user.name}", + "avatar_url": ( + str(user.avatar.url) + if user.avatar + else str(user.default_avatar.url) + ), + "id": user.id, + } + ) except discord.NotFound: - blacklisted_users.append({ - "name": f"Unknown User ({user_id})", - "avatar_url": "", - "id": user_id - }) + blacklisted_users.append( + { + "name": f"Unknown User ({user_id})", + "avatar_url": "", + "id": user_id, + } + ) except discord.HTTPException as e: print(f"Error fetching user {user_id}: {e}") continue - + return blacklisted_users - + async def get_enhanced_guild_info(self): guilds = sorted(self.bot.guilds, key=lambda g: g.member_count, reverse=True) guild_info = [] - + for guild in guilds: icon_url = str(guild.icon.url) if guild.icon else "" - guild_info.append({ - "name": guild.name, - "member_count": guild.member_count, - "icon_url": icon_url, - "id": guild.id - }) - + guild_info.append( + { + "name": guild.name, + "member_count": guild.member_count, + "icon_url": icon_url, + "id": guild.id, + } + ) + return guild_info - + async def start_web_server(self): self.runner = web.AppRunner(self.app) await self.runner.setup() - self.site = web.TCPSite(self.runner, '0.0.0.0', 8080) + self.site = web.TCPSite(self.runner, "0.0.0.0", 8080) await self.site.start() print("Goober web server started on port 8080") - + async def stop_web_server(self): if self.site is None or self.runner is None: return - + await self.site.stop() await self.runner.cleanup() print("Web server stopped") - + def cog_unload(self): self.update_clients.cancel() self.bot.loop.create_task(self.stop_web_server()) - + @tasks.loop(seconds=5) async def update_clients(self): if not self.websockets: return - + stats = await self.get_bot_stats() message = json.dumps(stats) - + for ws in set(self.websockets): try: await ws.send_str(message) @@ -126,62 +139,62 @@ class GooberWeb(commands.Cog): except Exception as e: print(f"Error sending to websocket: {e}") self.websockets.remove(ws) - + async def handle_websocket(self, request): ws = web.WebSocketResponse() await ws.prepare(request) self.websockets.add(ws) - + try: async for msg in ws: if msg.type == WSMsgType.ERROR: print(f"WebSocket error: {ws.exception()}") finally: self.websockets.remove(ws) - + return ws - + async def handle_css(self, request): - css_path = os.path.join(os.path.dirname(__file__), 'styles.css') + css_path = os.path.join(os.path.dirname(__file__), "styles.css") if os.path.exists(css_path): return web.FileResponse(css_path) return web.Response(text="CSS file not found", status=404) - + @commands.Cog.listener() async def on_message(self, message): if message.author.bot: return - + ctx = await self.bot.get_context(message) if ctx.valid and ctx.command: self._update_command_stats(ctx.command.name, ctx.author) - + @commands.Cog.listener() async def on_app_command_completion(self, interaction, command): self._update_command_stats(command.name, interaction.user) - + def _update_command_stats(self, command_name, user): self.last_command = f"{command_name} (by {user.name})" self.last_command_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if self.websockets: asyncio.create_task(self.update_clients()) - + async def get_bot_stats(self): process = psutil.Process(os.getpid()) mem_info = process.memory_full_info() cpu_percent = psutil.cpu_percent() process_cpu = process.cpu_percent() - + memory_json_size = "N/A" if os.path.exists("memory.json"): memory_json_size = f"{os.path.getsize('memory.json') / 1024:.2f} KB" - + guild_info = await self.get_enhanced_guild_info() blacklisted_users = await self.get_blacklisted_users() - + uptime_seconds = int(time.time() - self.start_time) uptime_str = f"{uptime_seconds // 86400}d {(uptime_seconds % 86400) // 3600}h {(uptime_seconds % 3600) // 60}m {uptime_seconds % 60}s" - + return { "ram_usage": f"{mem_info.rss / 1024 / 1024:.2f} MB", "cpu_usage": f"{process_cpu}%", @@ -196,23 +209,29 @@ class GooberWeb(commands.Cog): "bot_uptime": uptime_str, "latency": f"{self.bot.latency * 1000:.2f} ms", "bot_name": self.bot.user.name, - "bot_avatar_url": str(self.bot.user.avatar.url) if self.bot.user.avatar else "", + "bot_avatar_url": ( + str(self.bot.user.avatar.url) if self.bot.user.avatar else "" + ), "authenticated": os.getenv("gooberauthenticated"), "lastmsg": os.getenv("gooberlatestgen"), "localversion": os.getenv("gooberlocal_version"), "latestversion": os.getenv("gooberlatest_version"), - "owner": os.getenv("ownerid") + "owner": os.getenv("ownerid"), } - + async def handle_update(self, request): if os.path.exists("goob/update.py"): return web.FileResponse("goob/update.py") return web.Response(text="Update file not found", status=404) async def handle_changesong(self, request): - song = request.query.get('song', '') + song = request.query.get("song", "") if song: - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=song)) + await self.bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, name=song + ) + ) return web.Response(text=f"Changed song to: {song}") return web.Response(text="Please provide a song parameter", status=400) @@ -224,36 +243,37 @@ class GooberWeb(commands.Cog): async def read_env_file(self): env_vars = {} try: - with open('.env', 'r') as f: + with open(".env", "r") as f: for line in f: line = line.strip() - if not line or line.startswith('#') or '=' not in line: + if not line or line.startswith("#") or "=" not in line: continue - key, value = line.split('=', 1) + key, value = line.split("=", 1) key = key.strip() - if key in ['splashtext', 'DISCORD_BOT_TOKEN']: + if key in ["splashtext", "DISCORD_BOT_TOKEN"]: continue - env_vars[key] = value.strip('"\'') + env_vars[key] = value.strip("\"'") except FileNotFoundError: print(".env file not found") return env_vars - async def handle_settings(self, request): env_vars = await self.read_env_file() - + # Get config.py variables config_vars = {} try: - with open('config.py', 'r') as f: + with open("config.py", "r") as f: for line in f: - if line.startswith('VERSION_URL'): - config_vars['VERSION_URL'] = line.split('=', 1)[1].strip().strip('"') + if line.startswith("VERSION_URL"): + config_vars["VERSION_URL"] = ( + line.split("=", 1)[1].strip().strip('"') + ) except FileNotFoundError: pass - + settings_html = """ @@ -275,7 +295,7 @@ class GooberWeb(commands.Cog):

Goober Settings

""" - + for key, value in env_vars.items(): settings_html += f"""
@@ -283,7 +303,7 @@ class GooberWeb(commands.Cog):
""" - + for key, value in config_vars.items(): settings_html += f"""
@@ -291,7 +311,7 @@ class GooberWeb(commands.Cog):
""" - + settings_html += """
@@ -302,15 +322,15 @@ class GooberWeb(commands.Cog): """ - - return web.Response(text=settings_html, content_type='text/html') - + + return web.Response(text=settings_html, content_type="text/html") + async def handle_update_settings(self, request): data = await request.post() env_text = "" try: - with open('.env', 'r') as f: + with open(".env", "r") as f: env_text = f.read() except FileNotFoundError: pass @@ -318,32 +338,42 @@ class GooberWeb(commands.Cog): def replace_match(match): key = match.group(1) value = match.group(2) - if key in ['splashtext', 'DISCORD_BOT_TOKEN']: + if key in ["splashtext", "DISCORD_BOT_TOKEN"]: return match.group(0) if key in data: new_value = data[key] if not (new_value.startswith('"') and new_value.endswith('"')): new_value = f'"{new_value}"' - return f'{key}={new_value}' + return f"{key}={new_value}" return match.group(0) - env_text = re.sub(r'^(\w+)=([\s\S]+?)(?=\n\w+=|\Z)', replace_match, env_text, flags=re.MULTILINE) + env_text = re.sub( + r"^(\w+)=([\s\S]+?)(?=\n\w+=|\Z)", + replace_match, + env_text, + flags=re.MULTILINE, + ) - with open('.env', 'w') as f: - f.write(env_text.strip() + '\n') + with open(".env", "w") as f: + f.write(env_text.strip() + "\n") - if 'VERSION_URL' in data: + if "VERSION_URL" in data: config_text = "" try: - with open('config.py', 'r') as f: + with open("config.py", "r") as f: config_text = f.read() except FileNotFoundError: pass - config_text = re.sub(r'^(VERSION_URL\s*=\s*").+?"', f'\\1{data["VERSION_URL"]}"', config_text, flags=re.MULTILINE) + config_text = re.sub( + r'^(VERSION_URL\s*=\s*").+?"', + f'\\1{data["VERSION_URL"]}"', + config_text, + flags=re.MULTILINE, + ) - with open('config.py', 'w') as f: - f.write(config_text.strip() + '\n') + with open("config.py", "w") as f: + f.write(config_text.strip() + "\n") return aiohttp.web.Response(text="Settings updated successfully!") @@ -351,8 +381,12 @@ class GooberWeb(commands.Cog): stats = await self.get_bot_stats() guild_list_html = "" - for guild in stats['guilds']: - icon_html = f'guild icon' if guild["icon_url"] else '
' + for guild in stats["guilds"]: + icon_html = ( + f'guild icon' + if guild["icon_url"] + else '
' + ) guild_list_html += f"""
{icon_html} @@ -363,8 +397,12 @@ class GooberWeb(commands.Cog):
""" blacklisted_users_html = "" - for user in stats['blacklisted_users']: - avatar_html = f'user avatar' if user["avatar_url"] else '
' + for user in stats["blacklisted_users"]: + avatar_html = ( + f'user avatar' + if user["avatar_url"] + else '
' + ) blacklisted_users_html += f"""
{avatar_html} @@ -375,11 +413,11 @@ class GooberWeb(commands.Cog):
""" - owner_id = stats.get('owner') + owner_id = stats.get("owner") owner = None owner_username = "Owner" owner_pfp = "" - + if owner_id: try: owner = await self.bot.fetch_user(int(owner_id)) @@ -388,7 +426,6 @@ class GooberWeb(commands.Cog): except: pass - html_content = f""" @@ -869,15 +906,16 @@ class GooberWeb(commands.Cog): """ - - return web.Response(text=html_content, content_type='text/html') - + + return web.Response(text=html_content, content_type="text/html") + async def handle_stats(self, request): return await self.handle_index(request) - + async def handle_json_data(self, request): stats = await self.get_bot_stats() return web.json_response(stats) + async def setup(bot): await bot.add_cog(GooberWeb(bot)) diff --git a/assets/cogs/whoami.py b/assets/cogs/whoami.py index a60633f..55379ec 100644 --- a/assets/cogs/whoami.py +++ b/assets/cogs/whoami.py @@ -1,6 +1,7 @@ import discord from discord.ext import commands + class whoami(commands.Cog): def __init__(self, bot): self.bot = bot @@ -13,12 +14,13 @@ class whoami(commands.Cog): embed = discord.Embed( title="User Information", description=f"Your User ID is: {user_id}\n" - f"Your username is: {username}\n" - f"Your nickname in this server is: <@{user_id}>", - color=discord.Color.blue() + f"Your username is: {username}\n" + f"Your nickname in this server is: <@{user_id}>", + color=discord.Color.blue(), ) await ctx.send(embed=embed) + async def setup(bot): await bot.add_cog(whoami(bot)) diff --git a/bot.py b/bot.py index f1a0a73..79e040e 100644 --- a/bot.py +++ b/bot.py @@ -7,12 +7,23 @@ import traceback import subprocess import tempfile import shutil -import uuid -import asyncio import sys -from typing import List, Dict, Literal, Set, Optional, Tuple, Any, TypedDict, Union, Callable, Coroutine, TypeVar, Type +from typing import ( + List, + Dict, + Literal, + Set, + Optional, + Tuple, + Any, + TypedDict, + Union, + Callable, + Coroutine, + TypeVar, + Type, +) import logging -from modules.globalvars import * from modules.prestartchecks import start_checks from modules.logger import GooberFormatter import modules.keys as k @@ -21,9 +32,18 @@ import logging from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from modules.settings import Settings as SettingsManager +from modules.permission import requires_admin + def build_keys(): - key_compiler.build_result("en", "assets/locales", types=True, output_path="modules/keys.py", generate_comments=True) + key_compiler.build_result( + "en", + "assets/locales", + types=True, + output_path="modules/keys.py", + generate_comments=True, + ) + build_keys() @@ -52,29 +72,21 @@ with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f: start_checks() -import requests import discord from discord.ext import commands from discord import app_commands -from discord import Colour, Embed, File, Interaction, Message -from discord.abc import Messageable +from discord import Colour, Message from better_profanity import profanity from discord.ext import commands from modules.markovmemory import * -from modules.version import * from modules.sentenceprocessing import * from modules.unhandledexception import handle_exception -from modules.image import gen_meme, gen_demotivator +from modules.image import gen_demotivator sys.excepthook = handle_exception -check_for_update() # Check for updates (from modules/version.py) -# Type aliases -T = TypeVar('T') -MessageContext = Union[commands.Context, discord.Interaction] -MessageReference = Union[Message, discord.WebhookMessage] class MessageMetadata(TypedDict): user_id: str @@ -86,6 +98,7 @@ class MessageMetadata(TypedDict): message: str timestamp: float + # Constants with type hints positive_gifs: List[str] = settings["bot"]["misc"]["positive_gifs"] currenthash: str = "" @@ -100,7 +113,9 @@ intents.message_content = True bot: commands.Bot = commands.Bot( command_prefix=settings["bot"]["prefix"], intents=intents, - allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True) + allowed_mentions=discord.AllowedMentions( + everyone=False, roles=False, users=False, replied_user=True + ), ) # Load memory and Markov model for text generation @@ -114,9 +129,10 @@ if not markov_model: generated_sentences: Set[str] = set() used_words: Set[str] = set() + async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]: - cog_name:str = filename[:-3] + cog_name: str = filename[:-3] print(cog_name) if cog_name not in settings["bot"]["enabled_cogs"]: @@ -132,25 +148,17 @@ async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): logger.error(f"{k.cog_fail()} {cog_name} {e}") traceback.print_exc() -async def send_alive_ping_periodically() -> None: - while True: - try: - requests.post(f"{VERSION_URL}/aliveping", json={"name": settings["name"]}) - except Exception as e: - logger.error(f"{k.error_sending_alive_ping(e)}{RESET} {e}") - await asyncio.sleep(60) # Event: Called when the bot is ready @bot.event async def on_ready() -> None: global launched global slash_commands_enabled - global NAME - + folder_name: str = "cogs" if launched: return - + await load_cogs_from_folder(bot) try: synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() @@ -158,52 +166,64 @@ async def on_ready() -> None: slash_commands_enabled = True logger.info(k.started(settings["name"])) - bot.loop.create_task(send_alive_ping_periodically()) except discord.errors.Forbidden as perm_error: - logger.error(f"Permission error while syncing commands: {perm_error}") - logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.") + logger.error(f"Permission error while syncing commands: {perm_error}") + logger.error( + "Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions." + ) quit() except Exception as e: logger.error(f"{k.fail_commands_sync()} {e}") traceback.print_exc() quit() - + if not settings["bot"]["misc"]["active_song"]: - return - await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=settings["bot"]["misc"]["active_song"])) + return + await bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, + name=settings["bot"]["misc"]["active_song"], + ) + ) launched = True + @bot.event async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: from modules.unhandledexception import handle_exception - + if isinstance(error, commands.CommandInvokeError): original: Exception = error.original handle_exception( - type(original), original, original.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" + type(original), + original, + original.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", ) else: handle_exception( - type(error), error, error.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" + type(error), + error, + error.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", ) + # Command: Retrain the Markov model from memory +@requires_admin() @bot.hybrid_command(description=f"{k.command_desc_retrain()}") async def retrain(ctx: commands.Context) -> None: global markov_model - if ctx.author.id not in settings["bot"]["owner_ids"]: - return - - message_ref: discord.Message | None = await send_message(ctx, f"{k.command_markov_retrain()}") + message_ref: discord.Message | None = await send_message( + ctx, f"{k.command_markov_retrain()}" + ) if message_ref is None: logger.error("Failed to send message!") return try: - with open(settings["bot"]["active_memory"], 'r') as f: + with open(settings["bot"]["active_memory"], "r") as f: memory: List[str] = json.load(f) except FileNotFoundError: await send_message(ctx, f"{k.command_markov_memory_not_found()}") @@ -214,7 +234,9 @@ async def retrain(ctx: commands.Context) -> None: data_size: int = len(memory) - processing_message_ref: discord.Message | None = await send_message(ctx, f"{k.command_markov_retraining(data_size)}") + processing_message_ref: discord.Message | None = await send_message( + ctx, f"{k.command_markov_retraining(data_size)}" + ) if processing_message_ref is None: logger.error("Couldnt find message processing message!") @@ -225,7 +247,13 @@ async def retrain(ctx: commands.Context) -> None: logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s") - await send_message(ctx, f"{k.command_markov_retrain_successful(data_size)}", edit=True, message_reference=processing_message_ref) + await send_message( + ctx, + f"{k.command_markov_retrain_successful(data_size)}", + edit=True, + message_reference=processing_message_ref, + ) + # Command: Generate a sentence using the Markov model @bot.hybrid_command(description=f"{k.command_desc_talk()}") @@ -250,16 +278,16 @@ async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: break if response: - cleaned_response: str = re.sub(r'[^\w\s]', '', response).lower() - coherent_response: str = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9 and is_positive(coherent_response): + cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower() + coherent_response: str = rephrase_for_coherence(cleaned_response) + if random.random() < 0.9 and is_positive(coherent_response): gif_url: str = random.choice(positive_gifs) combined_message: str = f"{coherent_response}\n[jif]({gif_url})" else: combined_message: str = coherent_response logger.info(combined_message) - os.environ['gooberlatestgen'] = combined_message + os.environ["gooberlatestgen"] = combined_message await send_message(ctx, combined_message) else: await send_message(ctx, f"{k.command_talk_generation_fail()}") @@ -272,7 +300,11 @@ async def demotivator(ctx: commands.Context) -> None: temp_input: str | None = None def get_random_asset_image() -> Optional[str]: - files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] + files: List[str] = [ + f + for f in os.listdir(assets_folder) + if f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) + ] if not files: return None return os.path.join(assets_folder, random.choice(files)) @@ -315,79 +347,111 @@ async def demotivator(ctx: commands.Context) -> None: if temp_input and os.path.exists(temp_input): os.remove(temp_input) -bot.remove_command('help') + +bot.remove_command("help") + + # Command: Show help information @bot.hybrid_command(description=f"{k.command_desc_help()}") async def help(ctx: commands.Context) -> None: embed: discord.Embed = discord.Embed( title=f"{k.command_help_embed_title()}", description=f"{k.command_help_embed_desc()}", - color=Colour(0x000000) + color=Colour(0x000000), ) command_categories: Dict[str, List[str]] = { - f"{k.command_help_categories_general()}": ["mem", "talk", "about", "ping", "impact", "demotivator", "help"], - f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"] + f"{k.command_help_categories_general()}": [ + "mem", + "talk", + "about", + "ping", + "impact", + "demotivator", + "help", + ], + f"{k.command_help_categories_admin()}": ["stats", "retrain", "setlanguage"], } custom_commands: List[str] = [] for cog_name, cog in bot.cogs.items(): for command in cog.get_commands(): - if command.name not in command_categories[f"{k.command_help_categories_general()}"] and command.name not in command_categories[f"{k.command_help_categories_admin()}"]: + if ( + command.name + not in command_categories[f"{k.command_help_categories_general()}"] + and command.name + not in command_categories[f"{k.command_help_categories_admin()}"] + ): custom_commands.append(command.name) if custom_commands: - embed.add_field(name=f"{k.command_help_categories_custom()}", value="\n".join([f"{settings["bot"]["prefix"]}{command}" for command in custom_commands]), inline=False) + embed.add_field( + name=f"{k.command_help_categories_custom()}", + value="\n".join( + [f"{settings["bot"]["prefix"]}{command}" for command in custom_commands] + ), + inline=False, + ) for category, commands_list in command_categories.items(): - commands_in_category: str = "\n".join([f"{settings["bot"]["prefix"]}{command}" for command in commands_list]) + commands_in_category: str = "\n".join( + [f"{settings["bot"]["prefix"]}{command}" for command in commands_list] + ) embed.add_field(name=category, value=commands_in_category, inline=False) await send_message(ctx, embed=embed) + +@requires_admin() @bot.hybrid_command(description=f"{k.command_desc_setlang()}") @app_commands.describe(locale="Choose your language") async def setlanguage(ctx: commands.Context, locale: str) -> None: - if ctx.author.id not in settings["bot"]["owner_ids"]: - await ctx.send(":thumbsdown:") - return - await ctx.defer() k.change_language(locale) await ctx.send(":thumbsup:") + # Event: Called on every message @bot.event async def on_message(message: discord.Message) -> None: global memory, markov_model - EMOJIS = ["\U0001F604", "\U0001F44D", "\U0001F525", "\U0001F4AF", "\U0001F389", "\U0001F60E"] # originally was emojis but it would probably shit itself on systems without unicode so.... + EMOJIS = [ + "\U0001f604", + "\U0001f44d", + "\U0001f525", + "\U0001f4af", + "\U0001f389", + "\U0001f60e", + ] # originally was emojis but it would probably shit itself on systems without unicode so.... if message.author.bot: return if str(message.author.id) in settings["bot"]["blacklisted_users"]: return + commands = [ + settings["bot"]["prefix"] + command.name for command in bot.tree.get_commands() + ] - commands = [settings["bot"]["prefix"] + command.name for command in bot.tree.get_commands()] - if message.content.startswith(tuple(commands)): logger.info(f"{k.command_ran(message.author.name, message.content)}") await bot.process_commands(message) return - if profanity.contains_profanity(message.content) and settings["bot"]["misc"]["block_profanity"]: + if ( + profanity.contains_profanity(message.content) + and settings["bot"]["misc"]["block_profanity"] + ): return if message.content: if not settings["bot"]["user_training"]: return - + formatted_message: str = append_mentions_to_18digit_integer(message.content) cleaned_message: str = preprocess_message(formatted_message) if cleaned_message: memory.append(cleaned_message) - - message_metadata: MessageMetadata = { "user_id": str(message.author.id), @@ -397,7 +461,7 @@ async def on_message(message: discord.Message) -> None: "channel_id": str(message.channel.id), "channel_name": str(message.channel), "message": message.content, - "timestamp": time.time() + "timestamp": time.time(), } try: if isinstance(memory, list): @@ -409,7 +473,9 @@ async def on_message(message: discord.Message) -> None: save_memory(memory) - sentiment_score = is_positive(message.content) # doesnt work but im scared to change the logic now please ignore + sentiment_score = is_positive( + message.content + ) # doesnt work but im scared to change the logic now please ignore if sentiment_score > 0.8: if not settings["bot"]["react_to_messages"]: return @@ -421,105 +487,133 @@ async def on_message(message: discord.Message) -> None: await bot.process_commands(message) + # Event: Called on every interaction (slash command, etc.) @bot.event async def on_interaction(interaction: discord.Interaction) -> None: logger.info(f"{k.command_ran_s(interaction.user.name)} {interaction.user.name}") + # Global check: Block blacklisted users from running commands @bot.check async def block_blacklisted(ctx: commands.Context) -> bool: - if str(ctx.author.id) in settings["bot"]["blacklisted_users"]: - try: - if isinstance(ctx, discord.Interaction): - if not ctx.response.is_done(): - await ctx.response.send_message(k.blacklisted(), ephemeral=True) - else: - await ctx.followup.send(k.blacklisted(), ephemeral=True) + if ctx.author.id not in settings["bot"]["blacklisted_users"]: + return True + + try: + if isinstance(ctx, discord.Interaction): + if not ctx.response.is_done(): + await ctx.response.send_message(k.blacklisted(), ephemeral=True) else: - await ctx.send(k.blacklisted_user(), ephemeral=True) - except: - pass + await ctx.followup.send(k.blacklisted(), ephemeral=True) + else: + await ctx.send(k.blacklisted_user(), ephemeral=True) + except: return False + return True + # Command: Show bot latency @bot.hybrid_command(description=f"{k.command_desc_ping()}") async def ping(ctx: commands.Context) -> None: await ctx.defer() latency: int = round(bot.latency * 1000) - LOLembed: discord.Embed = discord.Embed( + embed: discord.Embed = discord.Embed( title="Pong!!", description=( settings["bot"]["misc"]["ping_line"], - f"`{k.command_ping_embed_desc()}: {latency}ms`\n" + f"`{k.command_ping_embed_desc()}: {latency}ms`\n", ), - color=Colour(0x000000) + color=Colour(0x000000), + ) + embed.set_footer( + text=f"{k.command_ping_footer()} {ctx.author.name}", + icon_url=ctx.author.display_avatar.url, ) - LOLembed.set_footer(text=f"{k.command_ping_footer()} {ctx.author.name}", icon_url=ctx.author.display_avatar.url) - await ctx.send(embed=LOLembed) + await ctx.send(embed=embed) + # Command: Show about information @bot.hybrid_command(description=f"{k.command_about_desc()}") async def about(ctx: commands.Context) -> None: - print("-----------------------------------\n\n") - latest_version: str = str(check_for_update()) - print("-----------------------------------") - embed: discord.Embed = discord.Embed(title=f"{k.command_about_embed_title()}", description="", color=Colour(0x000000)) + embed: discord.Embed = discord.Embed( + title=f"{k.command_about_embed_title()}", description="", color=Colour(0x000000) + ) - embed.add_field(name=k.command_about_embed_field1(), value=f"{settings["name"]}", inline=False) + embed.add_field( + name=k.command_about_embed_field1(), value=f"{settings["name"]}", inline=False + ) embed.add_field( name=k.command_about_embed_field2name(), value=k.command_about_embed_field2value( local_version=local_version, latest_version=latest_version ), - inline=False + inline=False, ) embed.add_field(name="Github", value=f"https://github.com/gooberinc/goober") - + await send_message(ctx, embed=embed) -# Command: Show bot statistics (admin only) + +@requires_admin() @bot.hybrid_command(description="stats") async def stats(ctx: commands.Context) -> None: - if ctx.author.id not in settings["bot"]["owner_ids"]: - return - print("-----------------------------------\n\n") - latest_version: str = str(check_for_update()) - print("-----------------------------------") - memory_file: str = 'memory.json' + memory_file: str = "memory.json" file_size: int = os.path.getsize(memory_file) - - with open(memory_file, 'r') as file: + + with open(memory_file, "r") as file: line_count: int = sum(1 for _ in file) - embed: discord.Embed = discord.Embed(title=f"{k.command_stats_embed_title()}", description=f"{k.command_stats_embed_desc()}", color=Colour(0x000000)) - embed.add_field(name=f"{k.command_stats_embed_field1name()}", value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}", inline=False) - embed.add_field(name=f"{k.command_stats_embed_field2name()}", value=f"{k.command_stats_embed_field2value(local_version=local_version, latest_version=latest_version)}", inline=False) - embed.add_field(name=f"{k.command_stats_embed_field3name()}", value=f"{k.command_stats_embed_field3value( + embed: discord.Embed = discord.Embed( + title=f"{k.command_stats_embed_title()}", + description=f"{k.command_stats_embed_desc()}", + color=Colour(0x000000), + ) + embed.add_field( + name=f"{k.command_stats_embed_field1name()}", + value=f"{k.command_stats_embed_field1value(file_size=file_size, line_count=line_count)}", + inline=False, + ) + embed.add_field( + name=f"{k.command_stats_embed_field2name()}", + value=f"{k.command_stats_embed_field2value(local_version=local_version, latest_version=latest_version)}", + inline=False, + ) + embed.add_field( + name=f"{k.command_stats_embed_field3name()}", + value=f"{k.command_stats_embed_field3value( NAME=settings["name"], PREFIX=settings["bot"]["prefix"], ownerid=settings["bot"]["owner_ids"][0], PING_LINE=settings["bot"]["misc"]["ping_line"], showmemenabled=settings["bot"]["allow_show_mem_command"], USERTRAIN_ENABLED=settings["bot"]["user_training"], song=settings["bot"]["misc"]["active_song"], splashtext=splash_text - )}", inline=False) + )}", + inline=False, + ) await send_message(ctx, embed=embed) + # Command: Upload memory.json to litterbox.catbox.moe and return the link @bot.hybrid_command() async def mem(ctx: commands.Context) -> None: if not settings["bot"]["allow_show_mem_command"]: return - - command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" - memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True) + + command: str = ( + """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" + ) + memorylitter: subprocess.CompletedProcess = subprocess.run( + command, shell=True, capture_output=True, text=True + ) logger.debug(memorylitter) await send_message(ctx, memorylitter.stdout.strip()) + # Helper: Improve sentence coherence (simple capitalization fix) def improve_sentence_coherence(sentence: str) -> str: # Capitalizes "i" to "I" in the sentence @@ -535,7 +629,7 @@ class OnMyWatch: def run(self): event_handler = Handler() - self.observer.schedule(event_handler, self.watchDirectory, recursive = True) + self.observer.schedule(event_handler, self.watchDirectory, recursive=True) self.observer.start() try: while True: @@ -552,10 +646,9 @@ class Handler(FileSystemEventHandler): if event.is_directory: return None - elif event.event_type == 'modified': + elif event.event_type == "modified": build_keys() - # Start the bot -bot.run(os.environ.get("DISCORD_BOT_TOKEN", "")) \ No newline at end of file +bot.run(os.environ.get("DISCORD_BOT_TOKEN", "")) diff --git a/modules/globalvars.py b/modules/globalvars.py index 5748d6c..fc0c2a1 100644 --- a/modules/globalvars.py +++ b/modules/globalvars.py @@ -3,17 +3,23 @@ import platform from dotenv import load_dotenv import pathlib import subprocess + + def get_git_branch(): try: - branch = subprocess.check_output( - ["git", "rev-parse", "--abbrev-ref", "HEAD"], - stderr=subprocess.DEVNULL - ).decode('utf-8').strip() + branch = ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL + ) + .decode("utf-8") + .strip() + ) return branch except subprocess.CalledProcessError: return None -env_path = pathlib.Path(__file__).parent.parent / '.env' + +env_path = pathlib.Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) ANSI = "\033[" @@ -25,9 +31,9 @@ DEBUG = f"{ANSI}1;30m" RESET = f"{ANSI}0m" VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main" -UPDATE_URL = VERSION_URL+"/latest_version.json" +UPDATE_URL = VERSION_URL + "/latest_version.json" print(UPDATE_URL) -LOCAL_VERSION_FILE = "current_version.txt" +LOCAL_VERSION_FILE = "current_version.txt" # TOKEN = os.getenv("DISCORDBOTTOKEN", "0") # PREFIX = os.getenv("BOTPREFIX", "g.") @@ -52,9 +58,9 @@ LOCAL_VERSION_FILE = "current_version.txt" # IGNOREWARNING = False # is this either??? i don't think so? # song = os.getenv("song") arch = platform.machine() -slash_commands_enabled = True # 100% broken, its a newer enough version so its probably enabled by default.... fix this at somepoint or hard code it in goober central code +slash_commands_enabled = True # 100% broken, its a newer enough version so its probably enabled by default.... fix this at somepoint or hard code it in goober central code launched = False latest_version = "0.0.0" local_version = "2.3.3" -os.environ['gooberlocal_version'] = local_version +os.environ["gooberlocal_version"] = local_version beta = get_git_branch() == "dev" diff --git a/modules/image.py b/modules/image.py index 58180d5..e5a8e2a 100644 --- a/modules/image.py +++ b/modules/image.py @@ -6,37 +6,57 @@ import tempfile from typing import Optional, List from PIL import Image, ImageDraw, ImageFont, ImageOps from modules.markovmemory import load_markov_model -from modules.sentenceprocessing import improve_sentence_coherence, rephrase_for_coherence +from modules.sentenceprocessing import ( + improve_sentence_coherence, + rephrase_for_coherence, +) generated_sentences = set() + def load_font(size): return ImageFont.truetype("assets/fonts/Impact.ttf", size=size) + def load_tnr(size): return ImageFont.truetype("assets/fonts/TNR.ttf", size=size) + def draw_text_with_outline(draw, text, x, y, font): - outline_offsets = [(-2, -2), (-2, 2), (2, -2), (2, 2), (0, -2), (0, 2), (-2, 0), (2, 0)] + outline_offsets = [ + (-2, -2), + (-2, 2), + (2, -2), + (2, 2), + (0, -2), + (0, 2), + (-2, 0), + (2, 0), + ] for ox, oy in outline_offsets: draw.text((x + ox, y + oy), text, font=font, fill="black") draw.text((x, y), text, font=font, fill="white") + def fits_in_width(text, font, max_width, draw): bbox = draw.textbbox((0, 0), text, font=font) text_width = bbox[2] - bbox[0] return text_width <= max_width + def split_text_to_fit(text, font, max_width, draw): words = text.split() for i in range(len(words), 0, -1): top_text = " ".join(words[:i]) bottom_text = " ".join(words[i:]) - if fits_in_width(top_text, font, max_width, draw) and fits_in_width(bottom_text, font, max_width, draw): + if fits_in_width(top_text, font, max_width, draw) and fits_in_width( + bottom_text, font, max_width, draw + ): return top_text, bottom_text midpoint = len(words) // 2 return " ".join(words[:midpoint]), " ".join(words[midpoint:]) + async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): markov_model = load_markov_model() if not markov_model or not os.path.isfile(input_image_path): @@ -54,11 +74,15 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): response = None for _ in range(20): if sentence_size == 1: - candidate = markov_model.make_short_sentence(max_chars=100, tries=100) + candidate = markov_model.make_short_sentence( + max_chars=100, tries=100 + ) if candidate: candidate = candidate.split()[0] else: - candidate = markov_model.make_sentence(tries=100, max_words=sentence_size) + candidate = markov_model.make_sentence( + tries=100, max_words=sentence_size + ) if candidate and candidate not in generated_sentences: if sentence_size > 1: @@ -70,7 +94,7 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): if not response: response = "NO TEXT GENERATED" - cleaned_response = re.sub(r'[^\w\s]', '', response).lower() + cleaned_response = re.sub(r"[^\w\s]", "", response).lower() coherent_response = rephrase_for_coherence(cleaned_response).upper() bbox = draw.textbbox((0, 0), coherent_response, font=font) @@ -79,11 +103,15 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): max_text_height = height // 4 if text_width <= width and text_height_px <= max_text_height: - draw_text_with_outline(draw, coherent_response, (width - text_width) / 2, 0, font) + draw_text_with_outline( + draw, coherent_response, (width - text_width) / 2, 0, font + ) img.save(input_image_path) return input_image_path else: - top_text, bottom_text = split_text_to_fit(coherent_response, font, width, draw) + top_text, bottom_text = split_text_to_fit( + coherent_response, font, width, draw + ) top_bbox = draw.textbbox((0, 0), top_text, font=font) bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font) @@ -92,9 +120,21 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): bottom_height = bottom_bbox[3] - bottom_bbox[1] if top_height <= max_text_height and bottom_height <= max_text_height: - draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font) + draw_text_with_outline( + draw, + top_text, + (width - (top_bbox[2] - top_bbox[0])) / 2, + 0, + font, + ) y_bottom = height - bottom_height - int(height * 0.04) - draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font) + draw_text_with_outline( + draw, + bottom_text, + (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, + y_bottom, + font, + ) img.save(input_image_path) return input_image_path @@ -113,6 +153,7 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10): img.save(input_image_path) return input_image_path + async def gen_demotivator(input_image_path, max_attempts=5): markov_model = load_markov_model() if not markov_model or not os.path.isfile(input_image_path): @@ -124,7 +165,7 @@ async def gen_demotivator(input_image_path, max_attempts=5): size = max(img.width, img.height) frame_thick = int(size * 0.0054) inner_size = size - 2 * frame_thick - resized_img = img.resize((inner_size, inner_size), Image.LANCZOS) + resized_img = img.resize((inner_size, inner_size), Image.LANCZOS) framed = Image.new("RGB", (size, size), "white") framed.paste(resized_img, (frame_thick, frame_thick)) landscape_w = int(size * 1.5) @@ -145,8 +186,10 @@ async def gen_demotivator(input_image_path, max_attempts=5): title = t.upper() subtitle = s.capitalize() break - if not title: title = "DEMOTIVATOR" - if not subtitle: subtitle = "no text generated" + if not title: + title = "DEMOTIVATOR" + if not subtitle: + subtitle = "no text generated" title_sz = int(caption_h * 0.4) sub_sz = int(caption_h * 0.25) diff --git a/modules/key_compiler.py b/modules/key_compiler.py index 6abac8f..0fc69e5 100644 --- a/modules/key_compiler.py +++ b/modules/key_compiler.py @@ -30,6 +30,8 @@ import time NOTICE = """ # This file was automatically created from localization JSON files. # DO NOT EDIT THIS FILE DIRECTLY. If you want to edit a translation, please use the language's JSON file. + +#fmt: off """ @@ -42,10 +44,11 @@ logging.basicConfig( logger = logging.getLogger("kaannos") + class LanguageCollector: def __init__(self, language_dir: str) -> None: self.path: str = language_dir - self.languages: Dict[str, Dict[str,str]] = {} + self.languages: Dict[str, Dict[str, str]] = {} for file in os.listdir(self.path): if not file.endswith(".json") or len(file) > 7: @@ -55,13 +58,12 @@ class LanguageCollector: locale: str = file.split(".json")[0] logger.info(f"Discovered {file}") with open(os.path.join(self.path, file), "r", encoding="UTF-8") as f: - keys: Dict[str,str] = json.load(f) + keys: Dict[str, str] = json.load(f) self.languages[locale] = keys print(self.languages) self.find_missing_keys() - def find_missing_keys(self) -> None: primary_language_keys: Dict[str, str] = self.languages["en"] @@ -69,21 +71,24 @@ class LanguageCollector: for language in self.languages: if key not in self.languages[language]: logger.warning(f"Key {key} missing from {language}") - + for language in self.languages: for key in self.languages[language]: if key not in primary_language_keys: logger.warning(f"Leftover key {key} found from {language}") + class Script: def __init__(self) -> None: self.script: str = "" - def add_line(self, content, indent: int=0, newline: bool = True) -> None: + def add_line(self, content, indent: int = 0, newline: bool = True) -> None: self.script += f"{'\t' * indent}{content}{'\n' if newline else ''}" + def process_name(key: str) -> str: - return key.replace(" ", "_").replace(":","").lower() + return key.replace(" ", "_").replace(":", "").lower() + def find_args(string: str) -> List[str]: variable_open: bool = False @@ -97,7 +102,7 @@ def find_args(string: str) -> List[str]: variables.append(temp_content) temp_content = "" continue - + if char == "{": raise SyntaxError("Variable already open!") @@ -106,17 +111,17 @@ def find_args(string: str) -> List[str]: else: if char == "}": raise SyntaxError("Trying to close a nonexistant variable") - + if char == "{": variable_open = True - + return variables -def convert_args(inp: str, vars: List[str], mode: Literal["brackets", "none"] = "brackets") -> str: - replacements = { - ".": "_", - ",": "_" - } + +def convert_args( + inp: str, vars: List[str], mode: Literal["brackets", "none"] = "brackets" +) -> str: + replacements = {".": "_", ",": "_"} for var in vars: cleaned_var = var @@ -131,9 +136,15 @@ def convert_args(inp: str, vars: List[str], mode: Literal["brackets", "none"] = return inp - class GenerateScript: - def __init__(self, primary_lang:str, language_data: Dict[str, Dict[str,str]], use_typing: bool = True, output_path: str = "out.py", generate_comments: bool = True): + def __init__( + self, + primary_lang: str, + language_data: Dict[str, Dict[str, str]], + use_typing: bool = True, + output_path: str = "out.py", + generate_comments: bool = True, + ): self.data = language_data self.primary = primary_lang self.script = Script() @@ -142,31 +153,41 @@ class GenerateScript: self.generate_comments = generate_comments def create(self): - # I really don't like this implementation but also it works + # I really don't like this implementation but also it works self.script.add_line(NOTICE) if self.uses_typing: self.script.add_line("from typing import Literal, List") self.script.add_line(f"Language=Literal{list(self.data.keys())}") - self.script.add_line(f"languages: List[Language] = {list(self.data.keys())}") + self.script.add_line( + f"languages: List[Language] = {list(self.data.keys())}" + ) self.script.add_line(f"default_lang: Language | str='{self.primary}'") - self.script.add_line("def change_language(new_lang: Language | str) -> None: global default_lang; default_lang = new_lang") + self.script.add_line( + "def change_language(new_lang: Language | str) -> None: global default_lang; default_lang = new_lang" + ) else: self.script.add_line(f"languages = {list(self.data.keys())}") self.script.add_line(f"default_lang='{self.primary}'") - self.script.add_line("def change_language(new_lang): global default_lang; default_lang = new_lang") - + self.script.add_line( + "def change_language(new_lang): global default_lang; default_lang = new_lang" + ) self.primary_data = self.data[self.primary] for key in self.primary_data: args = find_args(self.primary_data[key]) - self.script.add_line(f"def {process_name(key)}({convert_args(','.join([*args, "lang:str|None=None" if self.uses_typing else "lang"]), args, "none")}):") + self.script.add_line( + f"def {process_name(key)}({convert_args(','.join([*args, "lang:str|None=None" if self.uses_typing else "lang"]), args, "none")}):" + ) if self.generate_comments: self.script.add_line('"""', 1) self.script.add_line("### Locales", 1) for language in self.data: - self.script.add_line(f'- {language.capitalize()}: **{self.data[language].get(key, self.primary_data[key])}**', 1) + self.script.add_line( + f"- {language.capitalize()}: **{self.data[language].get(key, self.primary_data[key])}**", + 1, + ) self.script.add_line('"""', 1) self.script.add_line("if not lang: lang=default_lang", 1) for language in self.data: @@ -174,18 +195,31 @@ class GenerateScript: for arg in args: formatted_map += f'"{convert_args(arg, args, "none")}": {convert_args(arg, args, "none")},' formatted_map = formatted_map[:-1] + "}" - self.script.add_line(f"""if lang == '{language}': return {convert_args(json.dumps( + self.script.add_line( + f"""if lang == '{language}': return {convert_args(json.dumps( self.data[language].get(key,self.primary_data[key]), ensure_ascii=False - ), args)}{f'.format_map({formatted_map})' if len(args) > 0 else ''}""", 1) + ), args)}{f'.format_map({formatted_map})' if len(args) > 0 else ''}""", + 1, + ) - self.script.add_line("else: raise ValueError(f'Invalid language {lang}')", 1) + self.script.add_line( + "else: raise ValueError(f'Invalid language {lang}')", 1 + ) with open(self.output, "w", encoding="UTF-8") as f: f.write(self.script.script) -def build_result(primary_lang: str, locale_dir: str, types: bool, output_path: str, generate_comments: bool = True): +def build_result( + primary_lang: str, + locale_dir: str, + types: bool, + output_path: str, + generate_comments: bool = True, +): start = time.time() lc = LanguageCollector(locale_dir) - GenerateScript(primary_lang, lc.languages, types, output_path, generate_comments).create() - logger.info(f"Done in {time.time() - start}s") \ No newline at end of file + GenerateScript( + primary_lang, lc.languages, types, output_path, generate_comments + ).create() + logger.info(f"Done in {time.time() - start}s") diff --git a/modules/keys.py b/modules/keys.py index a95d41c..3b585c3 100644 --- a/modules/keys.py +++ b/modules/keys.py @@ -2,6 +2,8 @@ # This file was automatically created from localization JSON files. # DO NOT EDIT THIS FILE DIRECTLY. If you want to edit a translation, please use the language's JSON file. +#fmt: off + from typing import Literal, List Language=Literal['en', 'es', 'fi', 'fr', 'it'] languages: List[Language] = ['en', 'es', 'fi', 'fr', 'it'] diff --git a/modules/logger.py b/modules/logger.py index 76f5f10..3b9d361 100644 --- a/modules/logger.py +++ b/modules/logger.py @@ -1,25 +1,28 @@ import logging from modules.globalvars import * + class GooberFormatter(logging.Formatter): - def __init__(self, colors: bool = True): # Disable colors for TXT output + def __init__(self, colors: bool = True): # Disable colors for TXT output self.colors = colors self._format = f"[ %(levelname)-8s ]: %(message)s {DEBUG} [%(asctime)s.%(msecs)03d] (%(filename)s:%(funcName)s) {RESET}" self.FORMATS = { logging.DEBUG: DEBUG + self._format + RESET, - logging.INFO: self._format.replace("%(levelname)-8s", f"{GREEN}%(levelname)-8s{RESET}"), + logging.INFO: self._format.replace( + "%(levelname)-8s", f"{GREEN}%(levelname)-8s{RESET}" + ), logging.WARNING: YELLOW + self._format + RESET, logging.ERROR: RED + self._format + RESET, - logging.CRITICAL: PURPLE + self._format + RESET + logging.CRITICAL: PURPLE + self._format + RESET, } def format(self, record: logging.LogRecord): if self.colors: - log_fmt = self.FORMATS.get(record.levelno) # Add colors + log_fmt = self.FORMATS.get(record.levelno) # Add colors else: - log_fmt = self._format # Just use the default format - + log_fmt = self._format # Just use the default format + formatter = logging.Formatter(log_fmt, datefmt="%m/%d/%y %H:%M:%S") return formatter.format(record) diff --git a/modules/markovmemory.py b/modules/markovmemory.py index 3221b9d..a90524e 100644 --- a/modules/markovmemory.py +++ b/modules/markovmemory.py @@ -6,11 +6,14 @@ from modules.globalvars import * import logging import modules.keys as k from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings logger = logging.getLogger("goober") + + # Get file size and line count for a given file path def get_file_info(file_path): try: @@ -21,6 +24,7 @@ def get_file_info(file_path): except Exception as e: return {"error": str(e)} + # Load memory data from file, or use default dataset if not loaded yet def load_memory(): data = [] @@ -34,36 +38,42 @@ def load_memory(): return data + # Save memory data to MEMORY_FILE def save_memory(memory): with open(settings["bot"]["active_memory"], "w") as f: json.dump(memory, f, indent=4) + def train_markov_model(memory, additional_data=None): if not memory: return None filtered_memory = [line for line in memory if isinstance(line, str)] if additional_data: - filtered_memory.extend(line for line in additional_data if isinstance(line, str)) + filtered_memory.extend( + line for line in additional_data if isinstance(line, str) + ) if not filtered_memory: return None text = "\n".join(filtered_memory) model = markovify.NewlineText(text, state_size=2) return model + # Save the Markov model to a pickle file -def save_markov_model(model, filename='markov_model.pkl'): - with open(filename, 'wb') as f: +def save_markov_model(model, filename="markov_model.pkl"): + with open(filename, "wb") as f: pickle.dump(model, f) logger.info(f"Markov model saved to {filename}.") + # Load the Markov model from a pickle file -def load_markov_model(filename='markov_model.pkl'): +def load_markov_model(filename="markov_model.pkl"): try: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: model = pickle.load(f) logger.info(f"{k.model_loaded()} {filename}.{RESET}") return model except FileNotFoundError: logger.error(f"{filename} {k.not_found()}{RESET}") - return None \ No newline at end of file + return None diff --git a/modules/permission.py b/modules/permission.py new file mode 100644 index 0000000..e6dd9f0 --- /dev/null +++ b/modules/permission.py @@ -0,0 +1,37 @@ +from functools import wraps +import discord + +import discord.ext +import discord.ext.commands + +from modules.settings import Settings as SettingsManager +import logging + +logger = logging.getLogger("goober") + +settings_manager = SettingsManager() +settings = settings_manager.settings + + +class PermissionError(Exception): + pass + + +def requires_admin(): + async def wrapper(ctx: discord.ext.commands.Context): + if ctx.author.id not in settings["bot"]["owner_ids"]: + await ctx.send( + "You don't have the necessary permissions to run this command!" + ) + return False + + command = ctx.command + if not command: + logger.info(f"Unknown command ran {ctx.message}") + else: + logger.info( + f'Command {settings["bot"]["prefix"]}{command.name} @{ctx.author.name}' + ) + return True + + return discord.ext.commands.check(wrapper) diff --git a/modules/prestartchecks.py b/modules/prestartchecks.py index 3948309..d100cb4 100644 --- a/modules/prestartchecks.py +++ b/modules/prestartchecks.py @@ -12,6 +12,7 @@ import importlib.metadata import logging import modules.keys as k from modules.settings import Settings as SettingsManager + settings_manager = SettingsManager() settings = settings_manager.settings @@ -27,68 +28,75 @@ except ImportError: psutilavaliable = False logger.error(k.missing_requests_psutil()) + def check_for_model(): if is_package("en_core_web_sm"): logger.info("Model is installed.") else: logger.info("Model is not installed.") - + def iscloned(): if os.path.exists(".git"): return True else: - logger.error(f"{k.not_cloned()}") + logger.error(f"{k.not_cloned()}") sys.exit(1) + def get_stdlib_modules(): - stdlib_path = pathlib.Path(sysconfig.get_paths()['stdlib']) + stdlib_path = pathlib.Path(sysconfig.get_paths()["stdlib"]) modules = set() - if hasattr(sys, 'builtin_module_names'): + if hasattr(sys, "builtin_module_names"): modules.update(sys.builtin_module_names) - for file in stdlib_path.glob('*.py'): - if file.stem != '__init__': + for file in stdlib_path.glob("*.py"): + if file.stem != "__init__": modules.add(file.stem) for folder in stdlib_path.iterdir(): - if folder.is_dir() and (folder / '__init__.py').exists(): + if folder.is_dir() and (folder / "__init__.py").exists(): modules.add(folder.name) - for file in stdlib_path.glob('*.*'): - if file.suffix in ('.so', '.pyd'): + for file in stdlib_path.glob("*.*"): + if file.suffix in (".so", ".pyd"): modules.add(file.stem) return modules + def check_requirements(): STD_LIB_MODULES = get_stdlib_modules() PACKAGE_ALIASES = { "discord": "discord.py", "better_profanity": "better-profanity", "dotenv": "python-dotenv", - "pil": "pillow" + "pil": "pillow", } parent_dir = os.path.dirname(os.path.abspath(__file__)) - requirements_path = os.path.abspath(os.path.join(parent_dir, '..', 'requirements.txt')) + requirements_path = os.path.abspath( + os.path.join(parent_dir, "..", "requirements.txt") + ) if not os.path.exists(requirements_path): logger.error(f"{k.requirements_not_found(path=requirements_path)}") return - with open(requirements_path, 'r') as f: + with open(requirements_path, "r") as f: lines = f.readlines() requirements = set() for line in lines: line = line.strip() - if line and not line.startswith('#'): - base_pkg = line.split('==')[0].lower() + if line and not line.startswith("#"): + base_pkg = line.split("==")[0].lower() aliased_pkg = PACKAGE_ALIASES.get(base_pkg, base_pkg) requirements.add(aliased_pkg) - installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()} + installed_packages = { + dist.metadata["Name"].lower() for dist in importlib.metadata.distributions() + } missing = [] for req in sorted(requirements): - if req in STD_LIB_MODULES or req == 'modules': + if req in STD_LIB_MODULES or req == "modules": print(k.std_lib_local_skipped(package=req)) continue @@ -108,6 +116,7 @@ def check_requirements(): else: logger.info(k.all_requirements_satisfied()) + def check_latency(): host = "1.1.1.1" system = platform.system() @@ -126,10 +135,7 @@ def check_latency(): try: result = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if result.returncode == 0: @@ -147,25 +153,37 @@ def check_latency(): except Exception as e: logger.error(k.error_running_ping(error=e)) + def check_memory(): if psutilavaliable == False: return try: memory_info = psutil.virtual_memory() # type: ignore - total_memory = memory_info.total / (1024 ** 3) - used_memory = memory_info.used / (1024 ** 3) - free_memory = memory_info.available / (1024 ** 3) + total_memory = memory_info.total / (1024**3) + used_memory = memory_info.used / (1024**3) + free_memory = memory_info.available / (1024**3) - logger.info(k.memory_usage(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100)) + logger.info( + k.memory_usage( + used=used_memory, + total=total_memory, + percent=(used_memory / total_memory) * 100, + ) + ) if used_memory > total_memory * 0.9: - print(f"{YELLOW}{k.memory_above_90(percent=(used_memory / total_memory) * 100)}{RESET}") + print( + f"{YELLOW}{k.memory_above_90(percent=(used_memory / total_memory) * 100)}{RESET}" + ) logger.info(k.total_memory(total=total_memory)) logger.info(k.used_memory(used=used_memory)) if free_memory < 1: logger.warning(f"{k.low_free_memory(free=free_memory)}") sys.exit(1) except ImportError: - logger.error(k.psutil_not_installed()) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped." + logger.error( + k.psutil_not_installed() + ) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped." + def check_cpu(): if psutilavaliable == False: @@ -180,13 +198,18 @@ def check_cpu(): logger.error(k.really_high_cpu()) sys.exit(1) + def check_memoryjson(): try: - logger.info(k.memory_file(size=os.path.getsize(settings["bot"]["active_memory"]) / (1024 ** 2))) + logger.info( + k.memory_file( + size=os.path.getsize(settings["bot"]["active_memory"]) / (1024**2) + ) + ) if os.path.getsize(settings["bot"]["active_memory"]) > 1_073_741_824: logger.warning(f"{k.memory_file_large()}") try: - with open(settings["bot"]["active_memory"], 'r', encoding='utf-8') as f: + with open(settings["bot"]["active_memory"], "r", encoding="utf-8") as f: json.load(f) except json.JSONDecodeError as e: logger.error(f"{k.memory_file_corrupted(error=e)}") @@ -199,9 +222,11 @@ def check_memoryjson(): except FileNotFoundError: logger.info(f"{k.memory_file_not_found()}") + def presskey2skip(timeout): - if os.name == 'nt': + if os.name == "nt": import msvcrt + start_time = time.time() while True: if msvcrt.kbhit(): @@ -230,12 +255,16 @@ def presskey2skip(timeout): time.sleep(0.1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + beta = beta + + def start_checks(): if settings["disable_checks"]: logger.warning(f"{k.checks_disabled()}") return - + logger.info(k.running_prestart_checks()) check_for_model() iscloned() @@ -250,12 +279,14 @@ def start_checks(): logger.warning(f"{k.env_file_not_found()}") sys.exit(1) if beta == True: - logger.warning(f"this build isnt finished yet, some things might not work as expected") + logger.warning( + f"this build isnt finished yet, some things might not work as expected" + ) else: pass logger.info(k.continuing_in_seconds(seconds=5)) presskey2skip(timeout=5) - os.system('cls' if os.name == 'nt' else 'clear') + os.system("cls" if os.name == "nt" else "clear") - with open(settings ["splash_text_loc"], "r") as f: + with open(settings["splash_text_loc"], "r") as f: print("".join(f.readlines())) diff --git a/modules/sentenceprocessing.py b/modules/sentenceprocessing.py index 4955dc9..839c36a 100644 --- a/modules/sentenceprocessing.py +++ b/modules/sentenceprocessing.py @@ -9,6 +9,7 @@ import discord import modules.keys as k import logging + logger = logging.getLogger("goober") @@ -17,18 +18,20 @@ def check_resources(): nlp = spacy.load("en_core_web_sm") except OSError: logging.critical(k.spacy_model_not_found()) - spacy.cli.download("en_core_web_sm") # type: ignore + spacy.cli.download("en_core_web_sm") # type: ignore nlp = spacy.load("en_core_web_sm") if "spacytextblob" not in nlp.pipe_names: nlp.add_pipe("spacytextblob") logger.info(k.spacy_initialized()) + check_resources() nlp = spacy.load("en_core_web_sm") nlp.add_pipe("spacytextblob") Doc.set_extension("polarity", getter=lambda doc: doc._.blob.polarity) + def is_positive(sentence): doc = nlp(sentence) sentiment_score = doc._.polarity # from spacytextblob @@ -36,18 +39,22 @@ def is_positive(sentence): debug_message = f"{k.sentence_positivity()} {sentiment_score}{RESET}" logger.debug(debug_message) - return sentiment_score > 0.6 # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them + return ( + sentiment_score > 0.6 + ) # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them -async def send_message(ctx: discord.ext.commands.Context, - message: str | None = None, - embed: discord.Embed | None = None, - file: discord.File | None = None, - edit: bool = False, - message_reference: discord.Message | None = None - ) -> discord.Message | None: + +async def send_message( + ctx: discord.ext.commands.Context, + message: str | None = None, + embed: discord.Embed | None = None, + file: discord.File | None = None, + edit: bool = False, + message_reference: discord.Message | None = None, +) -> discord.Message | None: sent_message: discord.Message | None = None - + if edit and message_reference: try: await message_reference.edit(content=message, embed=embed) @@ -62,21 +69,25 @@ async def send_message(ctx: discord.ext.commands.Context, sent_message = await ctx.send(file=file, content=message) else: sent_message = await ctx.send(content=message) - + return sent_message + def append_mentions_to_18digit_integer(message): - pattern = r'\b\d{18}\b' + pattern = r"\b\d{18}\b" return re.sub(pattern, lambda match: "", message) + def preprocess_message(message): message = append_mentions_to_18digit_integer(message) doc = nlp(message) tokens = [token.text for token in doc if token.is_alpha or token.is_digit] return " ".join(tokens) + def improve_sentence_coherence(sentence): - return re.sub(r'\bi\b', 'I', sentence) + return re.sub(r"\bi\b", "I", sentence) + def rephrase_for_coherence(sentence): words = sentence.split() diff --git a/modules/settings.py b/modules/settings.py index ecad665..2d94038 100644 --- a/modules/settings.py +++ b/modules/settings.py @@ -2,17 +2,19 @@ import json import os from typing import List, Mapping, Any, TypedDict from modules.keys import Language -import logging +import logging import copy logger = logging.getLogger("goober") + class MiscBotOptions(TypedDict): ping_line: str active_song: str positive_gifs: List[str] block_profanity: bool + class BotSettings(TypedDict): prefix: str owner_ids: List[int] @@ -24,6 +26,7 @@ class BotSettings(TypedDict): enabled_cogs: List[str] active_memory: str + class SettingsType(TypedDict): bot: BotSettings locale: Language @@ -32,20 +35,21 @@ class SettingsType(TypedDict): disable_checks: bool splash_text_loc: str + class Settings: def __init__(self) -> None: self.path: str = os.path.join(".", "settings", "settings.json") if not os.path.exists(self.path): raise ValueError("settings.json file does not exist!") - + self.settings: SettingsType self.original_settings: SettingsType - + with open(self.path, "r") as f: self.__kv_store: dict = json.load(f) - - self.settings = SettingsType(self.__kv_store) # type: ignore + + self.settings = SettingsType(self.__kv_store) # type: ignore self.original_settings = copy.deepcopy(self.settings) def commit(self) -> None: @@ -53,6 +57,6 @@ class Settings: json.dump(self.settings, f, indent=4) self.original_settings = self.settings - + def discard(self) -> None: - self.settings = self.original_settings \ No newline at end of file + self.settings = self.original_settings diff --git a/modules/unhandledexception.py b/modules/unhandledexception.py index 93ee70e..8247bac 100644 --- a/modules/unhandledexception.py +++ b/modules/unhandledexception.py @@ -10,14 +10,15 @@ settings_manager = SettingsManager() settings = settings_manager.settings logger = logging.getLogger("goober") + def handle_exception(exc_type, exc_value, exc_traceback, *, context=None): - os.system('cls' if os.name == 'nt' else 'clear') - + os.system("cls" if os.name == "nt" else "clear") + if issubclass(exc_type, KeyboardInterrupt): sys.__excepthook__(exc_type, exc_value, exc_traceback) return - with open(settings['splash_text_loc'], "r") as f: + with open(settings["splash_text_loc"], "r") as f: print("".join(f.readlines())) print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}") @@ -25,9 +26,5 @@ def handle_exception(exc_type, exc_value, exc_traceback, *, context=None): print(f"{RED}========END OF TRACEBACK========{RESET}") print(f"{RED}{k.unhandled_exception()}{RESET}") - if context: print(f"{RED}Context: {context}{RESET}") - - - diff --git a/modules/version.py b/modules/version.py deleted file mode 100644 index 3b09cca..0000000 --- a/modules/version.py +++ /dev/null @@ -1,113 +0,0 @@ -import requests -import subprocess -import sys -import logging -import json -import time -import random -import modules.keys as k -from modules.globalvars import * -from modules.settings import Settings as SettingsManager - -settings_manager = SettingsManager() -settings = settings_manager.settings - -logger = logging.getLogger("goober") -launched = False - -# Run a shell command and return its output -def run_cmd(cmd): - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) - return result.stdout.strip() - -# Check if the remote branch is ahead of the local branch -def is_remote_ahead(branch='main', remote='origin'): - run_cmd(f'git fetch {remote}') - count = run_cmd(f'git rev-list --count HEAD..{remote}/{branch}') - return int(count) > 0 - -# Automatically update the local repository if the remote is ahead -def auto_update(branch='main', remote='origin'): - if launched == True: - print(k.already_started()) - return - if settings["auto_update"] != "True": - pass # Auto-update is disabled - if is_remote_ahead(branch, remote): - logger.info(k.remote_ahead(remote, branch)) - pull_result = run_cmd(f'git pull {remote} {branch}') - logger.info(pull_result) - logger.info(k.please_restart()) - sys.exit(0) - else: - logger.info(k.local_ahead(remote, branch)) - -def get_latest_version_info(): - try: - unique_suffix = f"{int(time.time())}_{random.randint(0, 9999)}" - url = f"{UPDATE_URL}?_={unique_suffix}" - - curl_cmd = [ - "curl", - "-s", - "-H", "Cache-Control: no-cache", - "-H", "Pragma: no-cache", - url - ] - - result = subprocess.run(curl_cmd, capture_output=True, text=True, timeout=5) - content = result.stdout - - if result.returncode != 0: - logger.error(f"curl failed with return code {result.returncode}") - return None - - try: - data = json.loads(content) - return data - except json.JSONDecodeError: - logger.error("JSON decode failed") - logger.error(content[:500]) - return None - - except Exception as e: - logger.error(f"Exception in get_latest_version_info: {e}") - return None - -# Check if an update is available and perform update if needed -def check_for_update(): - global latest_version, local_version, launched - - latest_version_info = get_latest_version_info() - if not latest_version_info: - logger.error(f"{k.fetch_update_fail()}") - return None - - latest_version = latest_version_info.get("version") - os.environ['gooberlatest_version'] = latest_version - download_url = latest_version_info.get("download_url") - - if not latest_version or not download_url: - logger.error(k.invalid_server()) - return None - - # Check if local_version is valid - if local_version == "0.0.0" or None: - logger.error(k.cant_find_local_version()) - return - # Compare local and latest versions - - if local_version < latest_version: - logger.warning(k.new_version(latest_version=latest_version, local_version=local_version)) - logger.warning(k.changelog(VERSION_URL=VERSION_URL)) - auto_update() - - elif beta == True: - logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}") - elif local_version > latest_version: - logger.warning(f"{k.modification_warning()}") - elif local_version == latest_version: - logger.info(f"{k.latest_version()} {local_version}") - logger.info(f"{k.latest_version2(VERSION_URL=VERSION_URL)}\n\n") - launched = True - return latest_version \ No newline at end of file diff --git a/modules/volta/main.py b/modules/volta/main.py deleted file mode 100644 index fb6a080..0000000 --- a/modules/volta/main.py +++ /dev/null @@ -1,207 +0,0 @@ -# If you're seeing this after cloning the Goober repo, note that this is a standalone module for translations. -# While it's used by Goober Core, it lives in its own repository and should not be modified here. -# For updates or contributions, visit: https://github.com/gooberinc/volta -# Also, Note to self: Add more comments it needs more love -import os -import locale -import json -import pathlib -import threading -import time -from dotenv import load_dotenv - -ANSI = "\033[" -RED = f"{ANSI}31m" -GREEN = f"{ANSI}32m" -YELLOW = f"{ANSI}33m" -DEBUG = f"{ANSI}1;30m" -RESET = f"{ANSI}0m" - -LOCALE = os.getenv("LOCALE") -module_dir = pathlib.Path(__file__).parent.parent -working_dir = pathlib.Path.cwd() -EXCLUDE_DIRS = {'.git', '__pycache__'} - -locales_dirs = [] -ENGLISH_MISSING = False -FALLBACK_LOCALE = "en" -if os.getenv("fallback_locale"): - FALLBACK_LOCALE = os.getenv("fallback_locale") -def find_locales_dirs(base_path): - found = [] - for root, dirs, files in os.walk(base_path): - dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS] - - if 'locales' in dirs: - locales_path = pathlib.Path(root) / 'locales' - found.append(locales_path) - dirs.remove('locales') - return found - -def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None: - current = start_path.resolve() - while current != current.parent: - candidate = current / ".env" - if candidate.exists(): - return candidate - current = current.parent - return None - -env_path = find_dotenv(pathlib.Path(__file__).parent) -if env_path: - load_dotenv(dotenv_path=env_path) - print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") -else: - print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") - -locales_dirs.extend(find_locales_dirs(module_dir)) -if working_dir != module_dir: - locales_dirs.extend(find_locales_dirs(working_dir)) - -translations = {} -_file_mod_times = {} - -import locale -import platform -import os -import sys - -def get_system_locale(): - system = platform.system() # fallback incase locale isnt set - if system == "Windows": - lang, _ = locale.getdefaultlocale() - return lang or os.getenv("LANG") - elif system == "Darwin": - try: - import subprocess - result = subprocess.run( - ["defaults", "read", "-g", "AppleLocale"], - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - text=True - ) - return result.stdout.strip() or locale.getdefaultlocale()[0] - except Exception: - return locale.getdefaultlocale()[0] - elif system == "Linux": - return ( - os.getenv("LC_ALL") or - os.getenv("LANG") or - locale.getdefaultlocale()[0] - ) - return locale.getdefaultlocale()[0] - - -def load_translations(): - global translations, _file_mod_times - translations.clear() - _file_mod_times.clear() - - for locales_dir in locales_dirs: - for filename in os.listdir(locales_dir): - if filename.endswith(".json"): - lang_code = filename[:-5] - file_path = locales_dir / filename - try: - with open(file_path, "r", encoding="utf-8") as f: - data = json.load(f) - if lang_code not in translations: - translations[lang_code] = {} - translations[lang_code].update(data) - _file_mod_times[(lang_code, file_path)] = file_path.stat().st_mtime - except Exception as e: - print(f"[VOLTA] {RED}Failed loading {file_path}: {e}{RESET}") - -def reload_if_changed(): - while True: - for (lang_code, file_path), last_mtime in list(_file_mod_times.items()): - try: - current_mtime = file_path.stat().st_mtime - if current_mtime != last_mtime: - print(f"[VOLTA] {RED}Translation file changed: {file_path}, reloading...{RESET}") - load_translations() - break - except FileNotFoundError: - print(f"[VOLTA] {RED}Translation file removed: {file_path}{RESET}") - _file_mod_times.pop((lang_code, file_path), None) - if lang_code in translations: - translations.pop(lang_code, None) - -def set_language(lang: str): - global LOCALE, ENGLISH_MISSING - if not LOCALE: - LOCALE = get_system_locale() - elif lang in translations: - LOCALE = lang - else: - print(f"[VOLTA] {RED}Language '{lang}' not found, defaulting to 'en'{RESET}") - if FALLBACK_LOCALE in translations: - LOCALE = FALLBACK_LOCALE - else: - print(f"[VOLTA] {RED}The fallback translations cannot be found! No fallback available.{RESET}") - ENGLISH_MISSING = True - -def check_missing_translations(): - global LOCALE, ENGLISH_MISSING - load_translations() - if FALLBACK_LOCALE not in translations: - print(f"[VOLTA] {RED}Fallback translations ({FALLBACK_LOCALE}.json) missing from assets/locales.{RESET}") - ENGLISH_MISSING = True - return - if LOCALE == "en": - print("[VOLTA] Locale is English, skipping missing key check.") - return - - - en_keys = set(translations.get("en", {}).keys()) - locale_keys = set(translations.get(LOCALE, {}).keys()) - - missing_keys = en_keys - locale_keys - total_keys = len(en_keys) - missing_count = len(missing_keys) - - if missing_count > 0: - percent_missing = (missing_count / total_keys) * 100 - if percent_missing == 100: - print(f"[VOLTA] {YELLOW}Warning: All keys are missing in locale '{LOCALE}'! Defaulting back to {FALLBACK_LOCALE}{RESET}") - set_language(FALLBACK_LOCALE) - elif percent_missing > 0: - print(f"[VOLTA] {YELLOW}Warning: {missing_count}/{total_keys} keys missing in locale '{LOCALE}' ({percent_missing:.1f}%)!{RESET}") - for key in sorted(missing_keys): - print(f" - {key}") - time.sleep(2) - else: - print(f"[VOLTA] All translation keys present for locale: {LOCALE}") - -printedsystemfallback = False - -def get_translation(lang: str, key: str): - global printedsystemfallback - if ENGLISH_MISSING: - return f"[VOLTA] {RED}No fallback available!{RESET}" - fallback_translations = translations.get(FALLBACK_LOCALE, {}) - sys_lang = get_system_locale().split("_")[0] if get_system_locale() else None - sys_translations = translations.get(sys_lang, {}) if sys_lang else {} - lang_translations = translations.get(lang, {}) - if key in lang_translations: - return lang_translations[key] - if sys_lang and sys_lang != lang and key in sys_translations: - if not printedsystemfallback: - print(f"[VOLTA] {YELLOW}Falling back to system language {sys_lang}!{RESET}") - printedsystemfallback = True - return sys_translations[key] - if key in fallback_translations: - print(f"[VOLTA] {YELLOW}Missing key: '{key}' in '{lang}', falling back to fallback locale '{FALLBACK_LOCALE}'{RESET}") - return fallback_translations[key] - return f"[VOLTA] {YELLOW}Missing key: '{key}' in all locales!{RESET}" - -def _(key: str) -> str: - return get_translation(LOCALE, key) - -load_translations() - -watchdog_thread = threading.Thread(target=reload_if_changed, daemon=True) -watchdog_thread.start() - -if __name__ == '__main__': - print("Volta should not be run directly! Please use it as a module..") diff --git a/replace_volta.py b/replace_volta.py index 2de6584..3c7b86f 100644 --- a/replace_volta.py +++ b/replace_volta.py @@ -15,6 +15,7 @@ pattern = re.compile( re.VERBOSE, ) + def fix_content(content): def repl(match): key = match.group(1) @@ -26,6 +27,7 @@ def fix_content(content): return pattern.sub(repl, content) + # File types we sweepin 🧹 file_exts = [".py", ".html", ".txt", ".js"] @@ -44,4 +46,6 @@ for subdir, _, files in os.walk(folder_path): with open(path, "w", encoding="utf-8") as f: f.write(updated) -print("πŸš€πŸ’₯ ALL cleaned. No `_('...')` left on road β€” now it’s k.dot or nothin fam πŸ˜ŽπŸ”«") +print( + "πŸš€πŸ’₯ ALL cleaned. No `_('...')` left on road β€” now it’s k.dot or nothin fam πŸ˜ŽπŸ”«" +) diff --git a/settings/settings.json b/settings/settings.json index 326902f..36e55cd 100644 --- a/settings/settings.json +++ b/settings/settings.json @@ -2,7 +2,7 @@ "bot": { "prefix": "\u00e4.", "owner_ids": [ - 642441889181728810 + ], "blacklisted_users": [], "user_training": true,