diff --git a/.gitignore b/.gitignore index 3bd6415..c5b1ad0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,10 @@ __pycache__ current_version.txt MEMORY_LOADED memory.json -*.pkl \ No newline at end of file +*.pkl +memories/ +models/ +log.txt +translationcompleteness.py +translation_report.txt +output.png diff --git a/README.md b/README.md index e4bc23e..26a4aa0 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ Please see the [wiki](https://wiki.goober.whatdidyouexpect.eu) keep in mind that most of the bot was written at 2am +special thanks to +[Charlies Computers](https://github.com/PowerPCFan) for being the only one i know of thats actually hosting goober 24/7 + [Goober Central](https://github.com/whatdidyouexpect/goober-central) ![the goober](https://goober.whatdidyouexpect.eu/imgs/goobs/goobs.png) diff --git a/bot.py b/bot.py index ffb0623..7315154 100644 --- a/bot.py +++ b/bot.py @@ -20,12 +20,17 @@ from better_profanity import profanity from config import * import traceback import shutil +from volta import * from nltk.sentiment.vader import SentimentIntensityAnalyzer analyzer = SentimentIntensityAnalyzer() print(splashtext) # you can use https://patorjk.com/software/taag/ for 3d text or just remove this entirely +os.makedirs("memories", exist_ok=True) +os.makedirs("models", exist_ok=True) + + def download_json(): locales_dir = "locales" response = requests.get(f"{VERSION_URL}/goob/locales/{LOCALE}.json") @@ -62,79 +67,20 @@ def load_translations(): translations = load_translations() -def get_translation(lang: str, key: str): - lang_translations = translations.get(lang, translations["en"]) - if key not in lang_translations: - print(f"{RED}Missing key: {key} in language {lang}{RESET}") - return lang_translations.get(key, key) - - - -def is_name_available(NAME): - if os.getenv("gooberTOKEN"): - return - try: - response = requests.post(f"{VERSION_URL}/check-if-available", json={"name": NAME}, headers={"Content-Type": "application/json"}) - - if response.status_code == 200: - data = response.json() - return data.get("available", False) - else: - print(f"{get_translation(LOCALE, 'name_check')}", response.json()) - return False - except Exception as e: - print(f"{get_translation(LOCALE, 'name_check2')}", e) - return False - -def register_name(NAME): - try: - if ALIVEPING == False: - return - # check if the name is avaliable - if not is_name_available(NAME): - if os.getenv("gooberTOKEN"): - return - print(f"{RED}{get_translation(LOCALE, 'name_taken')}{RESET}") - quit() - - # if it is register it - response = requests.post(f"{VERSION_URL}/register", json={"name": NAME}, headers={"Content-Type": "application/json"}) - - if response.status_code == 200: - data = response.json() - token = data.get("token") - - if not os.getenv("gooberTOKEN"): - print(f"{GREEN}{get_translation(LOCALE, 'add_token').format(token=token)} gooberTOKEN=.{RESET}") - quit() - else: - print(f"{GREEN}{RESET}") - - return token - else: - print(f"{RED}{get_translation(LOCALE, 'token_exists').format()}{RESET}", response.json()) - return None - except Exception as e: - print(f"{RED}{get_translation(LOCALE, 'registration_error').format()}{RESET}", e) - return None - -register_name(NAME) def save_markov_model(model, filename='markov_model.pkl'): - with open(filename, 'wb') as f: + model_file = f"models/{filename}" + with open(model_file, "wb") as f: pickle.dump(model, f) - print(f"Markov model saved to {filename}.") + print(f"{GREEN}Markov model saved to {model_file}{RESET}") -def load_markov_model(filename='markov_model.pkl'): - - try: - with open(filename, 'rb') as f: - model = pickle.load(f) - print(f"{GREEN}{get_translation(LOCALE, 'model_loaded')} {filename}.{RESET}") - return model - except FileNotFoundError: - print(f"{RED}{filename} {get_translation(LOCALE, 'not_found')}{RESET}") - return None +def load_markov_model(server_id=None): + if server_id: + filename = f"markov_model_{server_id}.pkl" + else: + filename = "markov_model.pkl" + + model_file = f"models/{filename}" def get_latest_version_info(): @@ -173,47 +119,12 @@ def generate_sha256_of_current_file(): latest_version = "0.0.0" -local_version = "0.14.8.3" +local_version = "rewrite/seperate-memories" os.environ['gooberlocal_version'] = local_version def check_for_update(): - if ALIVEPING == "false": - return - global latest_version, local_version - - latest_version_info = get_latest_version_info() - if not latest_version_info: - print(f"{get_translation(LOCALE, 'fetch_update_fail')}") - return None, None - - latest_version = latest_version_info.get("version") - os.environ['gooberlatest_version'] = latest_version - download_url = latest_version_info.get("download_url") - - if not latest_version or not download_url: - print(f"{RED}{get_translation(LOCALE, 'invalid_server')}{RESET}") - return None, None - - if local_version == "0.0.0": - with open(LOCAL_VERSION_FILE, "w") as f: - f.write(latest_version) - generate_sha256_of_current_file() - gooberhash = latest_version_info.get("hash") - if gooberhash == currenthash: - if local_version < latest_version: - print(f"{YELLOW}{get_translation(LOCALE, 'new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}") - print(f"{YELLOW}{get_translation(LOCALE, 'changelog').format(VERSION_URL=VERSION_URL)}{RESET}") - else: - print(f"{GREEN}{get_translation(LOCALE, 'latest_version')} {local_version}{RESET}") - print(f"{get_translation(LOCALE, 'latest_version2').format(VERSION_URL=VERSION_URL)}\n\n") - else: - print(f"{YELLOW}{get_translation(LOCALE, 'modification_warning')}") - print(f"{YELLOW}{get_translation(LOCALE, 'reported_version')} {local_version}{RESET}") - print(f"{DEBUG}{get_translation(LOCALE, 'current_hash')} {currenthash}{RESET}") - - -check_for_update() + return def get_file_info(file_path): try: @@ -226,39 +137,50 @@ def get_file_info(file_path): nltk.download('punkt') -def load_memory(): +def load_memory(server_id=None): + if server_id: + memory_file = f"memories/memory_{server_id}.json" + else: + memory_file = "memories/memory.json" + data = [] - - # load data from MEMORY_FILE try: - with open(MEMORY_FILE, "r") as f: + with open(memory_file, "r") as f: data = json.load(f) except FileNotFoundError: pass - - if not os.path.exists(MEMORY_LOADED_FILE): - try: - with open(DEFAULT_DATASET_FILE, "r") as f: - default_data = json.load(f) - data.extend(default_data) - except FileNotFoundError: - pass - with open(MEMORY_LOADED_FILE, "w") as f: - f.write("Data loaded") + except json.JSONDecodeError: + print(f"{RED}Error decoding memory file {memory_file}{RESET}") + return data -def save_memory(memory): - with open(MEMORY_FILE, "w") as f: +def save_memory(memory, server_id=None): + if server_id: + memory_file = f"memories/memory_{server_id}.json" + else: + memory_file = "memories/memory.json" + + with open(memory_file, "w") as f: json.dump(memory, f, indent=4) -def train_markov_model(memory, additional_data=None): +def train_markov_model(memory, additional_data=None, server_id=None): if not memory: return None + text = "\n".join(memory) if additional_data: text += "\n" + "\n".join(additional_data) - model = markovify.NewlineText(text, state_size=2) - return model + + try: + model = markovify.NewlineText(text, state_size=2) + if server_id: + model_filename = f"markov_model_{server_id}.pkl" + save_markov_model(model, model_filename) + return model + except Exception as e: + print(f"{RED}Error training model: {e}{RESET}") + return None + #this doesnt work and im extremely pissed and mad def append_mentions_to_18digit_integer(message): pattern = r'\b\d{18}\b' @@ -304,7 +226,6 @@ async def on_ready(): synced = await bot.tree.sync() print(f"{GREEN}{get_translation(LOCALE, 'synced_commands')} {len(synced)} {get_translation(LOCALE, 'synced_commands2')} {RESET}") slash_commands_enabled = True - ping_server() print(f"{GREEN}{get_translation(LOCALE, 'started').format()}{RESET}") except Exception as e: print(f"{RED}{get_translation(LOCALE, 'fail_commands_sync')} {e}{RESET}") @@ -314,33 +235,7 @@ async def on_ready(): return await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) -def ping_server(): - if ALIVEPING == "false": - print(f"{YELLOW}{get_translation(LOCALE, 'pinging_disabled')}{RESET}") - os.environ['gooberauthenticated'] = 'No' - return - file_info = get_file_info(MEMORY_FILE) - payload = { - "name": NAME, - "memory_file_info": file_info, - "version": local_version, - "slash_commands": slash_commands_enabled, - "token": gooberTOKEN - } - try: - response = requests.post(VERSION_URL+"/ping", json=payload) - if response.status_code == 200: - print(f"{GREEN}{get_translation(LOCALE, 'goober_ping_success').format(NAME=NAME)}{RESET}") - os.environ['gooberauthenticated'] = 'Yes' - else: - print(f"{RED}{get_translation(LOCALE, 'goober_ping_fail')} {response.status_code}{RESET}") - os.environ['gooberauthenticated'] = 'No' - except Exception as e: - print(f"{RED}{get_translation(LOCALE, 'goober_ping_fail2')} {str(e)}{RESET}") - os.environ['gooberauthenticated'] = 'No' - - -positive_gifs = os.getenv("POSITIVE_GIFS").split(',') +positive_gifs = os.getenv("POSITIVEGIFS").split(',') def is_positive(sentence): scores = analyzer.polarity_scores(sentence) @@ -381,42 +276,158 @@ async def send_message(ctx, message=None, embed=None, file=None, edit=False, mes sent_message = await ctx.send(file=file) return sent_message -@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_retrain')}") -async def retrain(ctx): - if ctx.author.id != ownerid: - return +@bot.hybrid_command(description="Retrain Markov models for servers") +@app_commands.choices(option=[ + app_commands.Choice(name="Retrain current server", value="current"), + app_commands.Choice(name="Retrain all servers", value="all"), + app_commands.Choice(name="Select servers to retrain", value="select") +]) +async def retrain_models(ctx, option: str = None): + if option is None and not ctx.interaction: + options_text = "all, current, select" + return await ctx.send(f"Please specify an option:\n{options_text}") + + if isinstance(option, app_commands.Choice): + option = option.value + if ctx.author.id == ownerid: + if option == "current": + server_id = ctx.guild.id if ctx.guild else "DM" + await retrain_single_server(ctx, server_id) + elif option == "all": + await retrain_all_servers(ctx) + elif option == "select": + await show_server_selection(ctx) + else: + if ctx.guild and (ctx.author.guild_permissions.administrator or any(role.permissions.administrator for role in ctx.author.roles)): + if option == "current": + server_id = ctx.guild.id + await retrain_single_server(ctx, server_id) + else: + await ctx.send("You can only retrain the current server.", ephemeral=True) + else: + await ctx.send("You don't have permission to use this command.", ephemeral=True) - message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain')}") +async def retrain_single_server(ctx, server_id): + memory_file = f"memories/memory_{server_id}.json" + model_file = f"models/markov_model_{server_id}.pkl" + try: - with open(MEMORY_FILE, 'r') as f: + with open(memory_file, 'r') as f: memory = json.load(f) except FileNotFoundError: - await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_memory_not_found')}") - return - except json.JSONDecodeError: - await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_memory_is_corrupt')}") - return - data_size = len(memory) - processed_data = 0 - processing_message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retraining').format(processed_data=processed_data, data_size=data_size)}") - start_time = time.time() - for i, data in enumerate(memory): - processed_data += 1 - if processed_data % 1000 == 0 or processed_data == data_size: - await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retraining').format(processed_data=processed_data, data_size=data_size)}", edit=True, message_reference=processing_message_ref) - - global markov_model + return await ctx.send(f"No memory data found for server {server_id}", ephemeral=True) - markov_model = train_markov_model(memory) - save_markov_model(markov_model) + processing_msg = await ctx.send(f"Retraining model for server {server_id}...") + + model = train_markov_model(memory, server_id=server_id) + + if model: + await processing_msg.edit(content=f"Successfully retrained model for server {server_id}") + else: + await processing_msg.edit(content=f"Failed to retrain model for server {server_id}") - await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref) +async def retrain_all_servers(ctx): + memory_files = [f for f in os.listdir("memories/") if f.startswith("memory_") and f.endswith(".json")] + + if not memory_files: + return await ctx.send("No server memory files found to retrain.", ephemeral=True) + + progress_msg = await ctx.send(f"Retraining models for {len(memory_files)} servers...") + success_count = 0 + + for mem_file in memory_files: + try: + server_id = mem_file.replace("memory_", "").replace(".json", "") + with open(f"memories/{mem_file}", 'r') as f: + memory = json.load(f) + + model = train_markov_model(memory, server_id=server_id) + if model: + success_count += 1 + + if success_count % 5 == 0: + await progress_msg.edit(content=f"Retraining in progress... {success_count}/{len(memory_files)} completed") + + except Exception as e: + print(f"Error retraining {mem_file}: {e}") + + await progress_msg.edit(content=f"Retraining complete successfully retrained {success_count}/{len(memory_files)} servers") + +async def show_server_selection(ctx): + memory_files = [f for f in os.listdir("memories/") if f.startswith("memory_") and f.endswith(".json")] + + if not memory_files: + return await ctx.send("No server memory files found.", ephemeral=True) + options = [] + for mem_file in memory_files: + server_id = mem_file.replace("memory_", "").replace(".json", "") + server_name = f"Server {server_id}" + if server_id != "DM": + guild = bot.get_guild(int(server_id)) + if guild: + server_name = guild.name + + options.append(discord.SelectOption(label=server_name, value=server_id)) + select_menus = [] + for i in range(0, len(options), 25): + chunk = options[i:i+25] + + select = discord.ui.Select( + placeholder=f"Select servers to retrain ({i+1}-{min(i+25, len(options))})", + min_values=1, + max_values=len(chunk), + options=chunk + ) + + select_menus.append(select) + view = discord.ui.View() + for menu in select_menus: + menu.callback = lambda interaction, m=menu: handle_server_selection(interaction, m) + view.add_item(menu) + + if ctx.interaction: + await ctx.send("Select which servers to retrain:", view=view) + else: + await ctx.reply("Select which servers to retrain:", view=view) + +async def handle_server_selection(interaction, select_menu): + await interaction.response.defer() + + selected_servers = select_menu.values + if not selected_servers: + return await interaction.followup.send("No servers selected.", ephemeral=True) + + progress_msg = await interaction.followup.send(f"Retraining {len(selected_servers)} selected servers...") + success_count = 0 + + for server_id in selected_servers: + try: + memory_file = f"memories/memory_{server_id}.json" + with open(memory_file, 'r') as f: + memory = json.load(f) + + model = train_markov_model(memory, server_id=server_id) + if model: + success_count += 1 + if success_count % 5 == 0: + await progress_msg.edit(content=f"Retraining in progress... {success_count}/{len(selected_servers)} completed") + + except Exception as e: + print(f"Error retraining {server_id}: {e}") + + await progress_msg.edit(content=f"Retraining complete Successfully retrained {success_count}/{len(selected_servers)} selected servers") @bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_talk')}") async def talk(ctx, sentence_size: int = 5): + server_id = ctx.guild.id if ctx.guild else "DM" + markov_model = load_markov_model(server_id) + if not markov_model: - await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_insufficent_text')}") - return + memory = load_memory(server_id) + markov_model = train_markov_model(memory, server_id=server_id) + if not markov_model: + await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_insufficent_text')}") + return response = None for _ in range(20): @@ -447,6 +458,7 @@ async def talk(ctx, sentence_size: int = 5): else: await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_generation_fail')}") + def improve_sentence_coherence(sentence): sentence = sentence.replace(" i ", " I ") @@ -492,8 +504,6 @@ async def help(ctx): @bot.event async def on_message(message): - global memory, markov_model, last_random_talk_time - if message.author.bot: return @@ -508,22 +518,29 @@ async def on_message(message): if profanity.contains_profanity(message.content): return - if message.content: - if not USERTRAIN_ENABLED: - return + if message.content and USERTRAIN_ENABLED: + server_id = message.guild.id if message.guild else "DM" + memory = load_memory(server_id) + formatted_message = append_mentions_to_18digit_integer(message.content) cleaned_message = preprocess_message(formatted_message) + if cleaned_message: memory.append(cleaned_message) - save_memory(memory) + save_memory(memory, server_id) - # process any commands in the message await bot.process_commands(message) @bot.event async def on_interaction(interaction): - print(f"{get_translation(LOCALE, 'command_ran_s').format(interaction=interaction)}{interaction.data['name']}") + try: + if interaction.type == discord.InteractionType.application_command: + command_name = interaction.data.get('name', 'unknown') + print(f"{get_translation(LOCALE, 'command_ran_s').format(interaction=interaction)}{command_name}") + except Exception as e: + print(f"{RED}Error handling interaction: {e}{RESET}") + traceback.print_exc() @bot.check async def block_blacklisted(ctx): @@ -560,15 +577,9 @@ async def ping(ctx): @bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_about_desc')}") async def about(ctx): - print("-----------------------------------\n\n") - try: - check_for_update() - except Exception as e: - pass - print("-----------------------------------") embed = discord.Embed(title=f"{get_translation(LOCALE, 'command_about_embed_title')}", description="", color=discord.Color.blue()) embed.add_field(name=f"{get_translation(LOCALE, 'command_about_embed_field1')}", value=f"{NAME}", inline=False) - embed.add_field(name=f"{get_translation(LOCALE, 'command_about_embed_field2name')}", value=f"{get_translation(LOCALE, 'command_about_embed_field2value').format(local_version=local_version, latest_version=latest_version)}", inline=False) + embed.add_field(name=f"{get_translation(LOCALE, 'command_about_embed_field2name')}", value=f"{get_translation(LOCALE, 'command_about_embed_field2value').format(local_version=local_version, latest_version='I have not been updated with the new version checking!')}", inline=False) await send_message(ctx, embed=embed) @@ -598,10 +609,13 @@ async def stats(ctx): async def mem(ctx): if showmemenabled != "true": return - memory = load_memory() - memory_text = json.dumps(memory, indent=4) - with open(MEMORY_FILE, "r") as f: - await send_message(ctx, file=discord.File(f, MEMORY_FILE)) + server_id = ctx.guild.id if ctx.guild else "DM" + memory_file = f"memories/memory_{server_id}.json" if server_id else "memories/memory.json" + try: + with open(memory_file, "r") as f: + await send_message(ctx, file=discord.File(f, memory_file)) + except FileNotFoundError: + await send_message(ctx, f"No memory file found at {memory_file}") def improve_sentence_coherence(sentence): sentence = sentence.replace(" i ", " I ") diff --git a/config.py b/config.py index d4e6b0b..f2b413b 100644 --- a/config.py +++ b/config.py @@ -7,20 +7,16 @@ load_dotenv() VERSION_URL = "https://goober.expect.ovh" UPDATE_URL = VERSION_URL+"/latest_version.json" LOCAL_VERSION_FILE = "current_version.txt" -TOKEN = os.getenv("DISCORD_BOT_TOKEN") -PREFIX = os.getenv("BOT_PREFIX") -hourlyspeak = int(os.getenv("hourlyspeak")) -PING_LINE = os.getenv("PING_LINE") -random_talk_channel_id1 = int(os.getenv("rnd_talk_channel1")) -LOCALE = os.getenv("locale") -gooberTOKEN = os.getenv("gooberTOKEN") -random_talk_channel_id2 = int(os.getenv("rnd_talk_channel2")) -cooldown_time = os.getenv("cooldown") -splashtext = os.getenv("splashtext") -ownerid = int(os.getenv("ownerid")) -showmemenabled = os.getenv("showmemenabled") -BLACKLISTED_USERS = os.getenv("BLACKLISTED_USERS", "").split(",") -USERTRAIN_ENABLED = os.getenv("USERTRAIN_ENABLED", "true").lower() == "true" +TOKEN = os.getenv("DISCORDBOTTOKEN") +PREFIX = os.getenv("BOTPREFIX") +PING_LINE = os.getenv("PINGLINE") +LOCALE = os.getenv("LOCALE") +gooberTOKEN = os.getenv("GOOBERTOKEN") +splashtext = os.getenv("SPLASHTEXT") +ownerid = int(os.getenv("OWNERID")) +showmemenabled = os.getenv("SHOWMEMENABLED") +BLACKLISTED_USERS = os.getenv("BLACKLISTEDUSERS", "").split(",") +USERTRAIN_ENABLED = os.getenv("USERTRAINENABLED", "true").lower() == "true" NAME = os.getenv("NAME") last_random_talk_time = 0 MEMORY_FILE = "memory.json" @@ -28,7 +24,7 @@ DEFAULT_DATASET_FILE = "defaultdataset.json" MEMORY_LOADED_FILE = "MEMORY_LOADED" ALIVEPING = os.getenv("ALIVEPING") IGNOREWARNING = False -song = os.getenv("song") +song = os.getenv("SONG") arch = platform.machine() RED = "\033[31m" GREEN = "\033[32m" diff --git a/modules/volta b/modules/volta new file mode 160000 index 0000000..628de5f --- /dev/null +++ b/modules/volta @@ -0,0 +1 @@ +Subproject commit 628de5f76ea2a258448d2ddfc081c2ce58f509fd diff --git a/todo.txt b/todo.txt new file mode 100644 index 0000000..bd9bd03 --- /dev/null +++ b/todo.txt @@ -0,0 +1 @@ +- update to latest version of goober diff --git a/update.py b/update.py deleted file mode 100644 index bd85c1d..0000000 --- a/update.py +++ /dev/null @@ -1,72 +0,0 @@ -import requests -import os -import argparse - -# config -DEFAULT_VERSION_URL = "https://goober.whatdidyouexpect.eu/latest_version.json" -LOCAL_VERSION_FILE = "current_version.txt" -SCRIPT_FILE = "bot.py" -CONFIG_FILE = "config.py" - -def get_latest_version_info(version_url): - """Fetch the latest version information from the server.""" - try: - response = requests.get(version_url, timeout=5) - response.raise_for_status() # Will raise HTTPError for bad responses - return response.json() - except requests.RequestException as e: - print(f"Error: Unable to connect to the update server. {e}") - return None - -def get_local_version(): - """Read the current version from the local file.""" - if os.path.exists(LOCAL_VERSION_FILE): - with open(LOCAL_VERSION_FILE, "r") as f: - return f.read().strip() - return "0.0.0" # Default version if file doesn't exist - -def save_local_version(version): - """Save the current version to the local file.""" - with open(LOCAL_VERSION_FILE, "w") as f: - f.write(version) - -def download_file(url, destination): - """Download a file from the given URL and save it to the destination.""" - try: - response = requests.get(url, timeout=10) - response.raise_for_status() # Will raise HTTPError for bad responses - with open(destination, "wb") as f: - f.write(response.content) - print(f"Successfully downloaded the file to {destination}.") - except requests.RequestException as e: - print(f"Error: Failed to download the file from {url}. {e}") - -def check_for_update(version_url): - """Check if a new version is available and update the script if needed.""" - latest_version_info = get_latest_version_info(version_url) - if not latest_version_info: - print("Could not fetch update information.") - return - latest_version = latest_version_info.get("version") - download_url = latest_version_info.get("download_url") - config_url = latest_version_info.get("config_url") - if not latest_version or not download_url: - print("Error: Invalid version information received from server.") - return - local_version = get_local_version() - if local_version != latest_version: - print(f"New version available: {latest_version} (Current: {local_version})") - print("Downloading the new version...") - download_file(download_url, SCRIPT_FILE) - download_file(config_url, CONFIG_FILE) - save_local_version(latest_version) - print("Update complete! Restart the bot to use the new version.") - else: - print(f"You're using the latest version: {local_version}") - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Check for updates and download the latest version of the bot.") - parser.add_argument("--host", type=str, help="Custom version URL", default=DEFAULT_VERSION_URL) - - args = parser.parse_args() - check_for_update(args.host) \ No newline at end of file diff --git a/volta.py b/volta.py new file mode 100644 index 0000000..c308b93 --- /dev/null +++ b/volta.py @@ -0,0 +1,208 @@ +# If you're seeing this after cloning the Goober repo, note that this is a standalone module for translations. +# While it's used by Goober Core, it lives in its own repository and should not be modified here. +# For updates or contributions, visit: https://github.com/gooberinc/volta +# Also, Note to self: Add more comments it needs more love +import os +import locale +import json +import pathlib +import threading +import time +from dotenv import load_dotenv + +ANSI = "\033[" +RED = f"{ANSI}31m" +GREEN = f"{ANSI}32m" +YELLOW = f"{ANSI}33m" +DEBUG = f"{ANSI}1;30m" +RESET = f"{ANSI}0m" + +LOCALE = os.getenv("LOCALE") +module_dir = pathlib.Path(__file__).parent.parent +working_dir = pathlib.Path.cwd() +EXCLUDE_DIRS = {'.git', '__pycache__'} + +locales_dirs = [] +ENGLISH_MISSING = False +FALLBACK_LOCALE = "en" +if os.getenv("fallback_locale"): + FALLBACK_LOCALE = os.getenv("fallback_locale") +def find_locales_dirs(base_path): + found = [] + for root, dirs, files in os.walk(base_path): + dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS] + + if 'locales' in dirs: + locales_path = pathlib.Path(root) / 'locales' + found.append(locales_path) + dirs.remove('locales') + return found + +def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None: + current = start_path.resolve() + while current != current.parent: + candidate = current / ".env" + if candidate.exists(): + return candidate + current = current.parent + return None + +env_path = find_dotenv(pathlib.Path(__file__).parent) +if env_path: + load_dotenv(dotenv_path=env_path) + print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") +else: + print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") + +locales_dirs.extend(find_locales_dirs(module_dir)) +if working_dir != module_dir: + locales_dirs.extend(find_locales_dirs(working_dir)) + +translations = {} +_file_mod_times = {} + +import locale +import platform +import os +import sys + +def get_system_locale(): + system = platform.system() # fallback incase locale isnt set + if system == "Windows": + lang, _ = locale.getdefaultlocale() + return lang or os.getenv("LANG") + elif system == "Darwin": + try: + import subprocess + result = subprocess.run( + ["defaults", "read", "-g", "AppleLocale"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True + ) + return result.stdout.strip() or locale.getdefaultlocale()[0] + except Exception: + return locale.getdefaultlocale()[0] + elif system == "Linux": + return ( + os.getenv("LC_ALL") or + os.getenv("LANG") or + locale.getdefaultlocale()[0] + ) + return locale.getdefaultlocale()[0] + + +def load_translations(): + global translations, _file_mod_times + translations.clear() + _file_mod_times.clear() + + for locales_dir in locales_dirs: + for filename in os.listdir(locales_dir): + if filename.endswith(".json"): + lang_code = filename[:-5] + file_path = locales_dir / filename + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + if lang_code not in translations: + translations[lang_code] = {} + translations[lang_code].update(data) + _file_mod_times[(lang_code, file_path)] = file_path.stat().st_mtime + except Exception as e: + print(f"[VOLTA] {RED}Failed loading {file_path}: {e}{RESET}") + +def reload_if_changed(): + while True: + for (lang_code, file_path), last_mtime in list(_file_mod_times.items()): + try: + current_mtime = file_path.stat().st_mtime + if current_mtime != last_mtime: + print(f"[VOLTA] {RED}Translation file changed: {file_path}, reloading...{RESET}") + load_translations() + break + except FileNotFoundError: + print(f"[VOLTA] {RED}Translation file removed: {file_path}{RESET}") + _file_mod_times.pop((lang_code, file_path), None) + if lang_code in translations: + translations.pop(lang_code, None) + +def set_language(lang: str): + global LOCALE, ENGLISH_MISSING + if not LOCALE: + LOCALE = get_system_locale() + elif lang in translations: + LOCALE = lang + else: + print(f"[VOLTA] {RED}Language '{lang}' not found, defaulting to 'en'{RESET}") + if FALLBACK_LOCALE in translations: + LOCALE = FALLBACK_LOCALE + else: + print(f"[VOLTA] {RED}The fallback translations cannot be found! No fallback available.{RESET}") + ENGLISH_MISSING = True + +def check_missing_translations(): + global LOCALE, ENGLISH_MISSING + load_translations() + if FALLBACK_LOCALE not in translations: + print(f"[VOLTA] {RED}Fallback translations ({FALLBACK_LOCALE}.json) missing from assets/locales.{RESET}") + ENGLISH_MISSING = True + return + if LOCALE == "en": + print("[VOLTA] Locale is English, skipping missing key check.") + return + + + en_keys = set(translations.get("en", {}).keys()) + locale_keys = set(translations.get(LOCALE, {}).keys()) + + missing_keys = en_keys - locale_keys + total_keys = len(en_keys) + missing_count = len(missing_keys) + + if missing_count > 0: + percent_missing = (missing_count / total_keys) * 100 + if percent_missing == 100: + print(f"[VOLTA] {YELLOW}Warning: All keys are missing in locale '{LOCALE}'! Defaulting back to {FALLBACK_LOCALE}{RESET}") + set_language(FALLBACK_LOCALE) + elif percent_missing > 0: + print(f"[VOLTA] {YELLOW}Warning: {missing_count}/{total_keys} keys missing in locale '{LOCALE}' ({percent_missing:.1f}%)!{RESET}") + for key in sorted(missing_keys): + print(f" - {key}") + time.sleep(2) + else: + print(f"[VOLTA] All translation keys present for locale: {LOCALE}") + +printedsystemfallback = False + +def get_translation(lang: str, key: str): + global printedsystemfallback + if ENGLISH_MISSING: + return f"[VOLTA] {RED}No fallback available!{RESET}" + fallback_translations = translations.get(FALLBACK_LOCALE, {}) + sys_lang = get_system_locale().split("_")[0] if get_system_locale() else None + sys_translations = translations.get(sys_lang, {}) if sys_lang else {} + lang_translations = translations.get(lang, {}) + if key in lang_translations: + return lang_translations[key] + if sys_lang and sys_lang != lang and key in sys_translations: + if not printedsystemfallback: + print(f"[VOLTA] {YELLOW}Falling back to system language {sys_lang}!{RESET}") + printedsystemfallback = True + return sys_translations[key] + if key in fallback_translations: + print(f"[VOLTA] {YELLOW}Missing key: '{key}' in '{lang}', falling back to fallback locale '{FALLBACK_LOCALE}'{RESET}") + return fallback_translations[key] + return f"[VOLTA] {YELLOW}Missing key: '{key}' in all locales!{RESET}" + +def _(key: str) -> str: + return get_translation(LOCALE, key) + +load_translations() + +watchdog_thread = threading.Thread(target=reload_if_changed, daemon=True) +watchdog_thread.start() + +if __name__ == '__main__': + print("Volta should not be run directly! Please use it as a module..") +