replaced translations with easier one and added demotivators (not polished)

This commit is contained in:
WhatDidYouExpect 2025-07-05 20:15:54 +02:00
parent d641908a8c
commit 22d454dd42
15 changed files with 304 additions and 244 deletions

BIN
assets/fonts/TNR.ttf Normal file

Binary file not shown.

View file

@ -1,4 +1,5 @@
{ {
"unhandled_exception": "An unhandled exception occurred. Please report this issue on GitHub.",
"active_users:": "Active users:", "active_users:": "Active users:",
"spacy_initialized": "spaCy and spacytextblob are ready.", "spacy_initialized": "spaCy and spacytextblob are ready.",
"spacy_model_not_found": "The spaCy model was not found! Downloading it....`", "spacy_model_not_found": "The spaCy model was not found! Downloading it....`",

View file

@ -1,4 +1,5 @@
{ {
"unhandled_exception": "Käsittelemätön virhe tapahtui. Ilmoita tästä GitHubissa.",
"env_file_not_found": ".env-tiedostoa ei löytnyt! Luo tiedosto jossa on tarvittavat muuttujat", "env_file_not_found": ".env-tiedostoa ei löytnyt! Luo tiedosto jossa on tarvittavat muuttujat",
"already_started": "Olen jo käynnistynyt! Ei päivitetä...", "already_started": "Olen jo käynnistynyt! Ei päivitetä...",
"please_restart": "Käynnistä uudelleen, hölmö!", "please_restart": "Käynnistä uudelleen, hölmö!",

View file

@ -1,4 +1,5 @@
{ {
"unhandled_exception": "Si è verificata un'eccezione non gestita. Segnala questo problema su GitHub, per favore.",
"active_users:": "Utenti attivi:", "active_users:": "Utenti attivi:",
"spacy_initialized": "spaCy e spacytextblob sono pronti.", "spacy_initialized": "spaCy e spacytextblob sono pronti.",
"spacy_model_not_found": "Il modello spaCy non è stato trovato! Lo sto scaricando...", "spacy_model_not_found": "Il modello spaCy non è stato trovato! Lo sto scaricando...",

338
bot.py
View file

@ -9,6 +9,8 @@ import tempfile
import shutil import shutil
import uuid import uuid
import asyncio import asyncio
import sys
from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type
from modules.globalvars import * from modules.globalvars import *
from modules.prestartchecks import start_checks from modules.prestartchecks import start_checks
@ -21,25 +23,55 @@ import requests
import discord import discord
from discord.ext import commands from discord.ext import commands
from discord import Colour from discord import Colour, Embed, File, Interaction, Message
from discord.abc import Messageable
from better_profanity import profanity from better_profanity import profanity
from discord.ext import commands from discord.ext import commands
from modules.central import ping_server from modules.central import ping_server
from modules.translations import * from modules.translations import _
from modules.markovmemory import * from modules.markovmemory import *
from modules.version import * from modules.version import *
from modules.sentenceprocessing import * from modules.sentenceprocessing import *
from modules.unhandledexception import handle_exception from modules.unhandledexception import handle_exception
from modules.image import gen_image from modules.image import gen_meme, gen_demotivator
sys.excepthook = handle_exception sys.excepthook = handle_exception
check_for_update() # Check for updates (from modules/version.py) check_for_update() # Check for updates (from modules/version.py)
# removed since all locales come with goober now # Type aliases
T = TypeVar('T')
MessageContext = Union[commands.Context, discord.Interaction]
MessageReference = Union[Message, discord.WebhookMessage]
# Constants with type hints
positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',')
currenthash: str = ""
launched: bool = False
slash_commands_enabled: bool = False
# Set up Discord bot intents and create bot instance
intents: discord.Intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
bot: commands.Bot = commands.Bot(
command_prefix=PREFIX,
intents=intents,
allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True)
)
# Load memory and Markov model for text generation
memory: List[str] = load_memory()
markov_model: Optional[markovify.Text] = load_markov_model()
if not markov_model:
print(f"{RED}{(_('markov_model_not_found'))}{RESET}")
memory = load_memory()
markov_model = train_markov_model(memory)
generated_sentences: Set[str] = set()
used_words: Set[str] = set()
# Dynamically load all cogs (extensions) from the cogs folder
async def load_cogs_from_folder(bot, folder_name="assets/cogs"): async def load_cogs_from_folder(bot, folder_name="assets/cogs"):
for filename in os.listdir(folder_name): for filename in os.listdir(folder_name):
if filename.endswith(".py") and not filename.startswith("_"): if filename.endswith(".py") and not filename.startswith("_"):
@ -47,72 +79,51 @@ async def load_cogs_from_folder(bot, folder_name="assets/cogs"):
module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}"
try: try:
await bot.load_extension(module_path) await bot.load_extension(module_path)
print(f"{GREEN}{get_translation(LOCALE, 'loaded_cog')} {cog_name}{RESET}") print(f"{GREEN}{(_('loaded_cog'))} {cog_name}{RESET}")
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'cog_fail')} {cog_name} {e}{RESET}") print(f"{RED}{(_('cog_fail'))} {cog_name} {e}{RESET}")
traceback.print_exc() traceback.print_exc()
currenthash = "" async def fetch_active_users() -> str:
# Set up Discord bot intents and create bot instance
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
bot = commands.Bot(
command_prefix=PREFIX,
intents=intents,
allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True)
)
# Load memory and Markov model for text generation
memory = load_memory()
markov_model = load_markov_model()
if not markov_model:
print(f"{get_translation(LOCALE, 'no_model')}")
memory = load_memory()
markov_model = train_markov_model(memory)
generated_sentences = set()
used_words = set()
async def fetch_active_users():
try: try:
response = requests.get(f"{VERSION_URL}/active-users") response: requests.Response = requests.get(f"{VERSION_URL}/active-users")
if response.status_code == 200: if response.status_code == 200:
return response.text.strip() return response.text.strip()
else: else:
return "?" return "?"
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'error_fetching_active_users')}{RESET} {e}") print(f"{RED}{(_('error_fetching_active_users'))}{RESET} {e}")
return "?" return "?"
async def send_alive_ping_periodically(): async def send_alive_ping_periodically() -> None:
while True: while True:
try: try:
requests.post(f"{VERSION_URL}/aliveping", json={"name": NAME}) requests.post(f"{VERSION_URL}/aliveping", json={"name": NAME})
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'error_sending_alive_ping')}{RESET} {e}") print(f"{RED}{(_('error_sending_alive_ping'))}{RESET} {e}")
await asyncio.sleep(60) await asyncio.sleep(60)
# Event: Called when the bot is ready # Event: Called when the bot is ready
@bot.event @bot.event
async def on_ready(): async def on_ready() -> None:
global launched global launched
global slash_commands_enabled global slash_commands_enabled
global NAME global NAME
folder_name = "cogs"
if launched == True: folder_name: str = "cogs"
if launched:
return return
await load_cogs_from_folder(bot) await load_cogs_from_folder(bot)
try: try:
synced = await bot.tree.sync() synced: List[discord.app_commands.AppCommand] = await bot.tree.sync()
print(f"{GREEN}{get_translation(LOCALE, 'synced_commands')} {len(synced)} {get_translation(LOCALE, 'synced_commands2')} {RESET}") print(f"{GREEN}{_('synced_commands')} {len(synced)} {(_('synced_commands2'))} {RESET}")
slash_commands_enabled = True slash_commands_enabled = True
ping_server() # ping_server from modules/central.py ping_server() # ping_server from modules/central.py
# I FORGOT TO REMOVE THE ITALIAN VERSION FUCKKKKKKKKK
active_users = await fetch_active_users() active_users: str = await fetch_active_users()
print(f"{GREEN}{get_translation(LOCALE, 'active_users:')} {active_users}{RESET}") print(f"{GREEN}{(_('active_users:'))} {active_users}{RESET}")
print(f"{GREEN}{get_translation(LOCALE, 'started').format(name=NAME)}{RESET}") print(f"{GREEN}{(_('started')).format(name=NAME)}{RESET}")
bot.loop.create_task(send_alive_ping_periodically()) bot.loop.create_task(send_alive_ping_periodically())
except discord.errors.Forbidden as perm_error: except discord.errors.Forbidden as perm_error:
@ -120,23 +131,21 @@ async def on_ready():
print(f"{RED}Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.{RESET}") print(f"{RED}Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.{RESET}")
quit() quit()
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'fail_commands_sync')} {e}{RESET}") print(f"{RED}{(_('fail_commands_sync'))} {e}{RESET}")
traceback.print_exc() traceback.print_exc()
quit() quit()
if not song: if not song:
return return
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}"))
launched = True launched = True
# Load positive GIF URLs from environment variable
positive_gifs: list[str] = os.getenv("POSITIVE_GIFS", "").split(',')
@bot.event @bot.event
async def on_command_error(ctx, error): async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None:
from modules.unhandledexception import handle_exception from modules.unhandledexception import handle_exception
if isinstance(error, commands.CommandInvokeError): if isinstance(error, commands.CommandInvokeError):
original = error.original original: Exception = error.original
handle_exception( handle_exception(
type(original), original, original.__traceback__, type(original), original, original.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}" context=f"Command: {ctx.command} | User: {ctx.author}"
@ -147,46 +156,47 @@ async def on_command_error(ctx, error):
context=f"Command: {ctx.command} | User: {ctx.author}" context=f"Command: {ctx.command} | User: {ctx.author}"
) )
# Command: Retrain the Markov model from memory # Command: Retrain the Markov model from memory
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_retrain')}") @bot.hybrid_command(description=f"{(_('command_desc_retrain'))}")
async def retrain(ctx): async def retrain(ctx: commands.Context) -> None:
if ctx.author.id != ownerid: if ctx.author.id != ownerid:
return return
message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain')}") # send_message from modules/sentenceprocessing.py message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}")
try: try:
with open(MEMORY_FILE, 'r') as f: with open(MEMORY_FILE, 'r') as f:
memory = json.load(f) memory: List[str] = json.load(f)
except FileNotFoundError: except FileNotFoundError:
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_memory_not_found')}") await send_message(ctx, f"{(_('command_markov_memory_not_found'))}")
return return
except json.JSONDecodeError: except json.JSONDecodeError:
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_memory_is_corrupt')}") await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}")
return return
data_size = len(memory)
processed_data = 0 data_size: int = len(memory)
processing_message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retraining').format(processed_data=processed_data, data_size=data_size)}") processed_data: int = 0
start_time = time.time() processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}")
start_time: float = time.time()
for i, data in enumerate(memory): for i, data in enumerate(memory):
processed_data += 1 processed_data += 1
if processed_data % 1000 == 0 or processed_data == data_size: if processed_data % 1000 == 0 or processed_data == data_size:
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retraining').format(processed_data=processed_data, data_size=data_size)}", edit=True, message_reference=processing_message_ref) await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}", edit=True, message_reference=processing_message_ref)
global markov_model global markov_model
markov_model = train_markov_model(memory) markov_model = train_markov_model(memory)
save_markov_model(markov_model) save_markov_model(markov_model)
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref) await send_message(ctx, f"{(_('command_markov_retrain_successful')).format(data_size=data_size)}", edit=True, message_reference=processing_message_ref)
# Command: Generate a sentence using the Markov model # Command: Generate a sentence using the Markov model
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_talk')}") @bot.hybrid_command(description=f"{(_('command_desc_talk'))}")
async def talk(ctx, sentence_size: int = 5): async def talk(ctx: commands.Context, sentence_size: int = 5) -> None:
if not markov_model: if not markov_model:
await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_insufficent_text')}") await send_message(ctx, f"{(_('command_talk_insufficent_text'))}")
return return
response = None response: Optional[str] = None
for _ in range(20): for _ in range(20):
if sentence_size == 1: if sentence_size == 1:
response = markov_model.make_short_sentence(max_chars=100, tries=100) response = markov_model.make_short_sentence(max_chars=100, tries=100)
@ -202,45 +212,42 @@ async def talk(ctx, sentence_size: int = 5):
break break
if response: if response:
cleaned_response = re.sub(r'[^\w\s]', '', response).lower() cleaned_response: str = re.sub(r'[^\w\s]', '', response).lower()
coherent_response = rephrase_for_coherence(cleaned_response) coherent_response: str = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response): if random.random() < 0.9 and is_positive(coherent_response):
gif_url = random.choice(positive_gifs) gif_url: str = random.choice(positive_gifs)
combined_message = f"{coherent_response}\n[jif]({gif_url})" combined_message: str = f"{coherent_response}\n[jif]({gif_url})"
else: else:
combined_message = coherent_response combined_message: str = coherent_response
print(combined_message) print(combined_message)
os.environ['gooberlatestgen'] = combined_message os.environ['gooberlatestgen'] = combined_message
await send_message(ctx, combined_message) await send_message(ctx, combined_message)
else: else:
await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_generation_fail')}") await send_message(ctx, f"{(_('command_talk_generation_fail'))}")
# Remove default help command to use custom help # Command: Generate an image
bot.help_command = None @bot.hybrid_command(description=f"{(_('command_desc_help'))}")
async def meme(ctx: commands.Context) -> None:
assets_folder: str = "assets/images"
temp_input: Optional[str] = None
# Command: Show help information def get_random_asset_image() -> Optional[str]:
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_help')}") files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
async def image(ctx):
assets_folder = "assets/images"
temp_input = None
def get_random_asset_image():
files = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
if not files: if not files:
return None return None
return os.path.join(assets_folder, random.choice(files)) return os.path.join(assets_folder, random.choice(files))
if ctx.message.attachments: if ctx.message.attachments:
attachment = ctx.message.attachments[0] attachment: discord.Attachment = ctx.message.attachments[0]
if attachment.content_type and attachment.content_type.startswith("image/"): if attachment.content_type and attachment.content_type.startswith("image/"):
ext = os.path.splitext(attachment.filename)[1] ext: str = os.path.splitext(attachment.filename)[1]
temp_input = f"tempy{ext}" temp_input = f"tempy{ext}"
await attachment.save(temp_input) await attachment.save(temp_input)
input_path = temp_input input_path: str = temp_input
else: else:
fallback_image = get_random_asset_image() fallback_image: Optional[str] = get_random_asset_image()
if fallback_image is None: if fallback_image is None:
await ctx.reply(get_translation(LOCALE, "no_image_available")) await ctx.reply((_('no_image_available')))
return return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input) shutil.copy(fallback_image, temp_input)
@ -248,19 +255,18 @@ async def image(ctx):
else: else:
fallback_image = get_random_asset_image() fallback_image = get_random_asset_image()
if fallback_image is None: if fallback_image is None:
await ctx.reply("No image available to process.") await ctx.reply((_('no_image_available')))
return return
# got lazy here
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input) shutil.copy(fallback_image, temp_input)
input_path = temp_input input_path = temp_input
output_path = await gen_image(input_path) output_path: Optional[str] = await gen_meme(input_path)
if output_path is None or not os.path.isfile(output_path): if output_path is None or not os.path.isfile(output_path):
if temp_input and os.path.exists(temp_input): if temp_input and os.path.exists(temp_input):
os.remove(temp_input) os.remove(temp_input)
await ctx.reply(get_translation(LOCALE, "failed_generate_image")) await ctx.reply((_('failed_generate_image')))
return return
await ctx.send(file=discord.File(output_path)) await ctx.send(file=discord.File(output_path))
@ -268,43 +274,88 @@ async def image(ctx):
if temp_input and os.path.exists(temp_input): if temp_input and os.path.exists(temp_input):
os.remove(temp_input) os.remove(temp_input)
# New demotivator command
@bot.hybrid_command(description="Generate a demotivator poster with two lines of text")
async def demotivator(ctx: commands.Context) -> None:
assets_folder: str = "assets/images"
temp_input: Optional[str] = None
def get_random_asset_image() -> Optional[str]:
files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
if not files:
return None
return os.path.join(assets_folder, random.choice(files))
# Remove default help command to use custom help if ctx.message.attachments:
bot.help_command = None attachment: discord.Attachment = ctx.message.attachments[0]
if attachment.content_type and attachment.content_type.startswith("image/"):
ext: str = os.path.splitext(attachment.filename)[1]
temp_input = f"tempy{ext}"
await attachment.save(temp_input)
input_path: str = temp_input
else:
fallback_image: Optional[str] = get_random_asset_image()
if fallback_image is None:
await ctx.reply((_('no_image_available')))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
else:
fallback_image = get_random_asset_image()
if fallback_image is None:
await ctx.reply((_('no_image_available')))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
output_path: Optional[str] = await gen_demotivator(input_path)
if output_path is None or not os.path.isfile(output_path):
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
await ctx.reply("Failed to generate demotivator.")
return
await ctx.send(file=discord.File(output_path))
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
bot.remove_command('help')
# Command: Show help information # Command: Show help information
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_help')}") @bot.hybrid_command(description=f"{(_('command_desc_help'))}")
async def help(ctx): async def help(ctx: commands.Context) -> None:
embed = discord.Embed( embed: discord.Embed = discord.Embed(
title=f"{get_translation(LOCALE, 'command_help_embed_title')}", title=f"{(_('command_help_embed_title'))}",
description=f"{get_translation(LOCALE, 'command_help_embed_desc')}", description=f"{(_('command_help_embed_desc'))}",
color=Colour(0x000000) color=Colour(0x000000)
) )
command_categories = { command_categories: Dict[str, List[str]] = {
f"{get_translation(LOCALE, 'command_help_categories_general')}": ["mem", "talk", "about", "ping", "image"], f"{(_('command_help_categories_general'))}": ["mem", "talk", "about", "ping", "image"],
f"{get_translation(LOCALE, 'command_help_categories_admin')}": ["stats", "retrain"] f"{(_('command_help_categories_admin'))}": ["stats", "retrain"]
} }
custom_commands = [] custom_commands: List[str] = []
for cog_name, cog in bot.cogs.items(): for cog_name, cog in bot.cogs.items():
for command in cog.get_commands(): for command in cog.get_commands():
if command.name not in command_categories[f"{get_translation(LOCALE, 'command_help_categories_general')}"] and command.name not in command_categories[f"{get_translation(LOCALE, 'command_help_categories_admin')}"]: if command.name not in command_categories[f"{(_('command_help_categories_general'))}"] and command.name not in command_categories[f"{(_('command_help_categories_admin'))}"]:
custom_commands.append(command.name) custom_commands.append(command.name)
if custom_commands: if custom_commands:
embed.add_field(name=f"{get_translation(LOCALE, 'command_help_categories_custom')}", value="\n".join([f"{PREFIX}{command}" for command in custom_commands]), inline=False) embed.add_field(name=f"{(_('command_help_categories_custom'))}", value="\n".join([f"{PREFIX}{command}" for command in custom_commands]), inline=False)
for category, commands_list in command_categories.items(): for category, commands_list in command_categories.items():
commands_in_category = "\n".join([f"{PREFIX}{command}" for command in commands_list]) commands_in_category: str = "\n".join([f"{PREFIX}{command}" for command in commands_list])
embed.add_field(name=category, value=commands_in_category, inline=False) embed.add_field(name=category, value=commands_in_category, inline=False)
await send_message(ctx, embed=embed) await send_message(ctx, embed=embed)
# Event: Called on every message # Event: Called on every message
@bot.event @bot.event
async def on_message(message): async def on_message(message: discord.Message) -> None:
global memory, markov_model global memory, markov_model
# Ignore bot messages # Ignore bot messages
@ -317,111 +368,112 @@ async def on_message(message):
# Process commands if message starts with a command prefix # Process commands if message starts with a command prefix
if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")): if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")):
print(f"{get_translation(LOCALE, 'command_ran').format(message=message)}") print(f"{(_('failed_generate_image')).format(message=message)}")
await bot.process_commands(message) await bot.process_commands(message)
return return
# Ignore messages with profanity # Ignore messages with profanity
if profanity.contains_profanity(message.content): # profanity from better_profanity if profanity.contains_profanity(message.content):
return return
# Add user messages to memory for training if enabled # Add user messages to memory for training if enabled
if message.content: if message.content:
if not USERTRAIN_ENABLED: if not USERTRAIN_ENABLED:
return return
formatted_message = append_mentions_to_18digit_integer(message.content) # append_mentions_to_18digit_integer from modules/sentenceprocessing.py formatted_message: str = append_mentions_to_18digit_integer(message.content)
cleaned_message = preprocess_message(formatted_message) # preprocess_message from modules/sentenceprocessing.py cleaned_message: str = preprocess_message(formatted_message)
if cleaned_message: if cleaned_message:
memory.append(cleaned_message) memory.append(cleaned_message)
save_memory(memory) # save_memory from modules/markovmemory.py save_memory(memory)
# Process any commands in the message # Process any commands in the message
await bot.process_commands(message) await bot.process_commands(message)
# Event: Called on every interaction (slash command, etc.) # Event: Called on every interaction (slash command, etc.)
@bot.event @bot.event
async def on_interaction(interaction): async def on_interaction(interaction: discord.Interaction) -> None:
print(f"{get_translation(LOCALE, 'command_ran_s').format(interaction=interaction)}{interaction.data['name']}") print(f"{(_('command_ran_s')).format(interaction=interaction)}{interaction.data['name']}")
# Global check: Block blacklisted users from running commands # Global check: Block blacklisted users from running commands
@bot.check @bot.check
async def block_blacklisted(ctx): async def block_blacklisted(ctx: commands.Context) -> bool:
if str(ctx.author.id) in BLACKLISTED_USERS: if str(ctx.author.id) in BLACKLISTED_USERS:
try: try:
if isinstance(ctx, discord.Interaction): if isinstance(ctx, discord.Interaction):
if not ctx.response.is_done(): if not ctx.response.is_done():
await ctx.response.send_message(get_translation(LOCALE, "blacklisted"), ephemeral=True) await ctx.response.send_message((_('blacklisted')), ephemeral=True)
else: else:
await ctx.followup.send(get_translation(LOCALE, "blacklisted"), ephemeral=True) await ctx.followup.send((_('blacklisted')), ephemeral=True)
else: else:
await ctx.send(get_translation(LOCALE, "blacklisted_user"), ephemeral=True) await ctx.send((_('blacklisted_user')), ephemeral=True)
except: except:
pass pass
return False return False
return True return True
# Command: Show bot latency # Command: Show bot latency
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_ping')}") @bot.hybrid_command(description=f"{(_('command_desc_ping'))}")
async def ping(ctx): async def ping(ctx: commands.Context) -> None:
await ctx.defer() await ctx.defer()
latency = round(bot.latency * 1000) latency: int = round(bot.latency * 1000)
LOLembed = discord.Embed( LOLembed: discord.Embed = discord.Embed(
title="Pong!!", title="Pong!!",
description=( description=(
f"{PING_LINE}\n" f"{PING_LINE}\n"
f"`{get_translation(LOCALE, 'command_ping_embed_desc')}: {latency}ms`\n" f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n"
), ),
color=Colour(0x000000) color=Colour(0x000000)
) )
LOLembed.set_footer(text=f"{get_translation(LOCALE, 'command_ping_footer')} {ctx.author.name}", icon_url=ctx.author.avatar.url) LOLembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url)
await ctx.send(embed=LOLembed) await ctx.send(embed=LOLembed)
# Command: Show about information # Command: Show about information
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_about_desc')}") @bot.hybrid_command(description=f"{(_('command_about_desc'))}")
async def about(ctx): async def about(ctx: commands.Context) -> None:
print("-----------------------------------\n\n") print("-----------------------------------\n\n")
latest_version = check_for_update() # check_for_update from modules/version.py latest_version: str = check_for_update()
print("-----------------------------------") print("-----------------------------------")
embed = discord.Embed(title=f"{get_translation(LOCALE, 'command_about_embed_title')}", description="", color=Colour(0x000000)) embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000))
embed.add_field(name=f"{get_translation(LOCALE, 'command_about_embed_field1')}", value=f"{NAME}", inline=False) embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False)
embed.add_field(name=f"{get_translation(LOCALE, 'command_about_embed_field2name')}", value=f"{get_translation(LOCALE, 'command_about_embed_field2value').format(local_version=local_version, latest_version=latest_version)}", inline=False) embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
await send_message(ctx, embed=embed) await send_message(ctx, embed=embed)
# Command: Show bot statistics (admin only) # Command: Show bot statistics (admin only)
@bot.hybrid_command(description="stats") @bot.hybrid_command(description="stats")
async def stats(ctx): async def stats(ctx: commands.Context) -> None:
if ctx.author.id != ownerid: if ctx.author.id != ownerid:
return return
print("-----------------------------------\n\n") print("-----------------------------------\n\n")
latest_version = check_for_update() # check_for_update from modules/version.py latest_version: str = check_for_update()
print("-----------------------------------") print("-----------------------------------")
memory_file = 'memory.json' memory_file: str = 'memory.json'
file_size = os.path.getsize(memory_file) file_size: int = os.path.getsize(memory_file)
with open(memory_file, 'r') as file: with open(memory_file, 'r') as file:
line_count = sum(1 for _ in file) line_count: int = sum(1 for _ in file)
embed = discord.Embed(title=f"{get_translation(LOCALE, 'command_stats_embed_title')}", description=f"{get_translation(LOCALE, 'command_stats_embed_desc')}", color=Colour(0x000000))
embed.add_field(name=f"{get_translation(LOCALE, 'command_stats_embed_field1name')}", value=f"{get_translation(LOCALE, 'command_stats_embed_field1value').format(file_size=file_size, line_count=line_count)}", inline=False) embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000))
embed.add_field(name=f"{get_translation(LOCALE, 'command_stats_embed_field2name')}", value=f"{get_translation(LOCALE, 'command_stats_embed_field2value').format(local_version=local_version, latest_version=latest_version)}", inline=False) embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False)
embed.add_field(name=f"{get_translation(LOCALE, 'command_stats_embed_field3name')}", value=f"{get_translation(LOCALE, 'command_stats_embed_field3value').format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, cooldown_time=cooldown_time, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False) embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, cooldown_time=cooldown_time, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False)
await send_message(ctx, embed=embed) await send_message(ctx, embed=embed)
# Command: Upload memory.json to litterbox.catbox.moe and return the link # Command: Upload memory.json to litterbox.catbox.moe and return the link
@bot.hybrid_command() @bot.hybrid_command()
async def mem(ctx): async def mem(ctx: commands.Context) -> None:
if showmemenabled != "true": if showmemenabled != "true":
return return
command = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php"""
memorylitter = subprocess.run(command, shell=True, capture_output=True, text=True) memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True)
print(memorylitter) print(memorylitter)
await send_message(ctx, memorylitter.stdout.strip()) await send_message(ctx, memorylitter.stdout.strip())
# Helper: Improve sentence coherence (simple capitalization fix) # Helper: Improve sentence coherence (simple capitalization fix)
def improve_sentence_coherence(sentence): def improve_sentence_coherence(sentence: str) -> str:
# Capitalizes "i" to "I" in the sentence # Capitalizes "i" to "I" in the sentence
sentence = sentence.replace(" i ", " I ") sentence = sentence.replace(" i ", " I ")
return sentence return sentence

View file

@ -1,19 +1,19 @@
import requests import requests
import os import os
import modules.globalvars as gv import modules.globalvars as gv
from modules.translations import * from modules.translations import _
from modules.markovmemory import get_file_info from modules.markovmemory import get_file_info
# Ping the server to check if it's alive and send some info # Ping the server to check if it's alive and send some info
def ping_server(): def ping_server():
if gv.ALIVEPING == "false": if gv.ALIVEPING == "false":
# If pinging is disabled, print message and set environment variable # If pinging is disabled, print message and set environment variable
print(f"{gv.YELLOW}{get_translation(gv.LOCALE, 'pinging_disabled')}{RESET}") print(f"{gv.YELLOW}{(_('pinging_disabled'))}{gv.RESET}")
os.environ['gooberauthenticated'] = 'No' os.environ['gooberauthenticated'] = 'No'
return return
# Get server alert message # Get server alert message
goobres = requests.get(f"{gv.VERSION_URL}/alert") goobres = requests.get(f"{gv.VERSION_URL}/alert")
print(f"{get_translation(gv.LOCALE, 'goober_server_alert')}{goobres.text}") print(f"{(_('goober_server_alert'))}{goobres.text}")
# Gather file info for payload # Gather file info for payload
file_info = get_file_info(gv.MEMORY_FILE) file_info = get_file_info(gv.MEMORY_FILE)
payload = { payload = {
@ -28,15 +28,15 @@ def ping_server():
response = requests.post(gv.VERSION_URL+"/ping", json=payload) response = requests.post(gv.VERSION_URL+"/ping", json=payload)
if response.status_code == 200: if response.status_code == 200:
# Success: print message and set environment variable # Success: print message and set environment variable
print(f"{gv.GREEN}{get_translation(gv.LOCALE, 'goober_ping_success').format(NAME=gv.NAME)}{RESET}") print(f"{gv.GREEN}{(_('goober_ping_success')).format(NAME=gv.NAME)}{gv.RESET}")
os.environ['gooberauthenticated'] = 'Yes' os.environ['gooberauthenticated'] = 'Yes'
else: else:
# Failure: print error and set environment variable # Failure: print error and set environment variable
print(f"{RED}{get_translation(gv.LOCALE, 'goober_ping_fail')} {response.status_code}{RESET}") print(f"{gv.RED}{(_('goober_ping_fail'))} {response.status_code}{gv.RESET}")
os.environ['gooberauthenticated'] = 'No' os.environ['gooberauthenticated'] = 'No'
except Exception as e: except Exception as e:
# Exception: print error and set environment variable # Exception: print error and set environment variable
print(f"{RED}{get_translation(gv.LOCALE, 'goober_ping_fail2')} {str(e)}{RESET}") print(f"{gv.RED}{(_('goober_ping_fail2'))} {str(e)}{gv.RESET}")
os.environ['gooberauthenticated'] = 'No' os.environ['gooberauthenticated'] = 'No'
# Check if a given name is available for registration # Check if a given name is available for registration
@ -52,11 +52,11 @@ def is_name_available(NAME):
return data.get("available", False) return data.get("available", False)
else: else:
# Print error if request failed # Print error if request failed
print(f"{get_translation(gv.LOCALE, 'name_check')}", response.json()) print(f"{(_('name_check'))}", response.json())
return False return False
except Exception as e: except Exception as e:
# Print exception if request failed # Print exception if request failed
print(f"{get_translation(gv.LOCALE, 'name_check2')}", e) print(f"{(_('name_check2'))}", e)
return False return False
# Register a new name with the server # Register a new name with the server
@ -70,7 +70,7 @@ def register_name(NAME):
if os.getenv("gooberTOKEN"): if os.getenv("gooberTOKEN"):
return return
# Name taken: print error and exit # Name taken: print error and exit
print(f"{RED}{get_translation(gv.LOCALE, 'name_taken')}{RESET}") print(f"{RED}{(_('name_taken'))}{gv.RESET}")
quit() quit()
# Register the name # Register the name
response = requests.post(f"{gv.VERSION_URL}/register", json={"name": NAME}, headers={"Content-Type": "application/json"}) response = requests.post(f"{gv.VERSION_URL}/register", json={"name": NAME}, headers={"Content-Type": "application/json"})
@ -79,18 +79,18 @@ def register_name(NAME):
token = data.get("token") token = data.get("token")
if not os.getenv("gooberTOKEN"): if not os.getenv("gooberTOKEN"):
# Print instructions to add token and exit # Print instructions to add token and exit
print(f"{gv.GREEN}{get_translation(gv.LOCALE, 'add_token').format(token=token)} gooberTOKEN=<token>.{gv.RESET}") print(f"{gv.GREEN}{(_('add_token')).format(token=token)} gooberTOKEN=<token>.{gv.gv.RESET}")
quit() quit()
else: else:
print(f"{gv.GREEN}{gv.RESET}") print(f"{gv.GREEN}{gv.gv.RESET}")
return token return token
else: else:
# Print error if registration failed # Print error if registration failed
print(f"{gv.RED}{get_translation(gv.LOCALE, 'token_exists').format()}{RESET}", response.json()) print(f"{gv.RED}{(_('token_exists')).format()}{gv.RESET}", response.json())
return None return None
except Exception as e: except Exception as e:
# Print exception if registration failed # Print exception if registration failed
print(f"{gv.RED}{get_translation(gv.LOCALE, 'registration_error').format()}{RESET}", e) print(f"{gv.RED}{(_('registration_error')).format()}{gv.RESET}", e)
return None return None
# Attempt to register the name at module load # Attempt to register the name at module load

View file

@ -19,6 +19,7 @@ TOKEN = os.getenv("DISCORD_BOT_TOKEN", "0")
PREFIX = os.getenv("BOT_PREFIX", "g.") PREFIX = os.getenv("BOT_PREFIX", "g.")
hourlyspeak = int(os.getenv("hourlyspeak", "0")) hourlyspeak = int(os.getenv("hourlyspeak", "0"))
PING_LINE = os.getenv("PING_LINE") PING_LINE = os.getenv("PING_LINE")
CHECKS_DISABLED = os.getenv("CHECKS_DISABLED")
LOCALE = os.getenv("locale", "en") LOCALE = os.getenv("locale", "en")
gooberTOKEN = os.getenv("gooberTOKEN") gooberTOKEN = os.getenv("gooberTOKEN")
cooldown_time = os.getenv("cooldown") cooldown_time = os.getenv("cooldown")
@ -39,5 +40,6 @@ arch = platform.machine()
slash_commands_enabled = False slash_commands_enabled = False
launched = False launched = False
latest_version = "0.0.0" latest_version = "0.0.0"
local_version = "1.0.8" local_version = "2.0.0"
os.environ['gooberlocal_version'] = local_version os.environ['gooberlocal_version'] = local_version
beta = True

View file

View file

@ -3,7 +3,7 @@ import json
import markovify import markovify
import pickle import pickle
from modules.globalvars import * from modules.globalvars import *
from modules.translations import * from modules.translations import _
# Get file size and line count for a given file path # Get file size and line count for a given file path
def get_file_info(file_path): def get_file_info(file_path):
@ -64,8 +64,8 @@ def load_markov_model(filename='markov_model.pkl'):
try: try:
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
model = pickle.load(f) model = pickle.load(f)
print(f"{GREEN}{get_translation(LOCALE, 'model_loaded')} {filename}.{RESET}") print(f"{GREEN}{_('model_loaded')} {filename}.{RESET}")
return model return model
except FileNotFoundError: except FileNotFoundError:
print(f"{RED}{filename} {get_translation(LOCALE, 'not_found')}{RESET}") print(f"{RED}{filename} {_('not_found')}{RESET}")
return None return None

View file

@ -1,5 +1,5 @@
from modules.globalvars import * from modules.globalvars import *
from modules.translations import get_translation from modules.translations import _
import time import time
import os import os
@ -14,7 +14,7 @@ try:
import psutil import psutil
except ImportError: except ImportError:
psutilavaliable = False psutilavaliable = False
print(RED, get_translation(LOCALE, 'missing_requests_psutil'), RESET) print(RED, _('missing_requests_psutil'), RESET)
import re import re
@ -42,7 +42,7 @@ def check_requirements():
requirements_path = os.path.abspath(requirements_path) requirements_path = os.path.abspath(requirements_path)
if not os.path.exists(requirements_path): if not os.path.exists(requirements_path):
print(f"{RED}{get_translation(LOCALE, 'requirements_not_found').format(path=requirements_path)}{RESET}") print(f"{RED}{(_('requirements_not_found')).format(path=requirements_path)}{RESET}")
return return
with open(requirements_path, 'r') as f: with open(requirements_path, 'r') as f:
@ -74,31 +74,31 @@ def check_requirements():
continue continue
requirements.add(pkg) requirements.add(pkg)
except Exception as e: except Exception as e:
print(f"{YELLOW}{get_translation(LOCALE, 'warning_failed_parse_imports').format(filename=filename, error=e)}{RESET}") print(f"{YELLOW}{(_('warning_failed_parse_imports')).format(filename=filename, error=e)}{RESET}")
else: else:
print(f"{YELLOW}{get_translation(LOCALE, 'cogs_dir_not_found').format(path=cogs_dir)}{RESET}") print(f"{YELLOW}{(_('cogs_dir_not_found')).format(path=cogs_dir)}{RESET}")
installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()} installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()}
missing = [] missing = []
for req in sorted(requirements): for req in sorted(requirements):
if req in STD_LIB_MODULES or req == 'modules': if req in STD_LIB_MODULES or req == 'modules':
print(get_translation(LOCALE, "std_lib_local_skipped").format(package=req)) print((_('std_lib_local_skipped')).format(package=req))
continue continue
check_name = PACKAGE_ALIASES.get(req, req).lower() check_name = PACKAGE_ALIASES.get(req, req).lower()
if check_name in installed_packages: if check_name in installed_packages:
print(f"[ {GREEN}{get_translation(LOCALE, 'ok_installed').format(package=check_name)}{RESET} ] {check_name}") print(f"[ {GREEN}{(_('ok_installed')).format(package=check_name)}{RESET} ] {check_name}")
else: else:
print(f"[ {RED}{get_translation(LOCALE, 'missing_package').format(package=check_name)}{RESET} ] {check_name} {get_translation(LOCALE, 'missing_package2')}") print(f"[ {RED}{(_('missing_package')).format(package=check_name)}{RESET} ] {check_name} {(_('missing_package2'))}")
missing.append(check_name) missing.append(check_name)
if missing: if missing:
print(RED, get_translation(LOCALE, "missing_packages_detected"), RESET) print(RED, _('missing_packages_detected'), RESET)
for pkg in missing: for pkg in missing:
print(f" - {pkg}") print(f" - {pkg}")
print(get_translation(LOCALE, "telling_goober_central").format(url=VERSION_URL)) print((_('telling_goober_central')).format(url=VERSION_URL))
payload = { payload = {
"name": NAME, "name": NAME,
"version": local_version, "version": local_version,
@ -108,10 +108,10 @@ def check_requirements():
try: try:
response = requests.post(VERSION_URL + "/ping", json=payload) response = requests.post(VERSION_URL + "/ping", json=payload)
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'failed_to_contact').format(url=VERSION_URL, error=e)}{RESET}") print(f"{RED}{(_('failed_to_contact')).format(url=VERSION_URL, error=e)}{RESET}")
sys.exit(1) sys.exit(1)
else: else:
print(get_translation(LOCALE, "all_requirements_satisfied")) print(_('all_requirements_satisfied'))
def check_latency(): def check_latency():
host = "1.1.1.1" host = "1.1.1.1"
@ -137,16 +137,16 @@ def check_latency():
match = re.search(latency_pattern, result.stdout) match = re.search(latency_pattern, result.stdout)
if match: if match:
latency_ms = float(match.group(1)) latency_ms = float(match.group(1))
print(get_translation(LOCALE, "ping_to").format(host=host, latency=latency_ms)) print((_('ping_to')).format(host=host, latency=latency_ms))
if latency_ms > 300: if latency_ms > 300:
print(f"{YELLOW}{get_translation(LOCALE, 'high_latency')}{RESET}") print(f"{YELLOW}{(_('high_latency'))}{RESET}")
else: else:
print(f"{YELLOW}{get_translation(LOCALE, 'could_not_parse_latency')}{RESET}") print(f"{YELLOW}{(_('could_not_parse_latency'))}{RESET}")
else: else:
print(result.stderr) print(result.stderr)
print(f"{RED}{get_translation(LOCALE, 'ping_failed').format(host=host)}{RESET}") print(f"{RED}{(_('ping_failed')).format(host=host)}{RESET}")
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'error_running_ping').format(error=e)}{RESET}") print(f"{RED}{(_('error_running_ping')).format(error=e)}{RESET}")
def check_memory(): def check_memory():
if psutilavaliable == False: if psutilavaliable == False:
@ -157,21 +157,21 @@ def check_memory():
used_memory = memory_info.used / (1024 ** 3) used_memory = memory_info.used / (1024 ** 3)
free_memory = memory_info.available / (1024 ** 3) free_memory = memory_info.available / (1024 ** 3)
print(get_translation(LOCALE, "memory_usage").format(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100)) print((_('memory_usage')).format(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100))
if used_memory > total_memory * 0.9: if used_memory > total_memory * 0.9:
print(f"{YELLOW}{get_translation(LOCALE, 'memory_above_90').format(percent=(used_memory / total_memory) * 100)}{RESET}") print(f"{YELLOW}{(_('memory_above_90')).format(percent=(used_memory / total_memory) * 100)}{RESET}")
print(get_translation(LOCALE, "total_memory").format(total=total_memory)) print((_('total_memory')).format(total=total_memory))
print(get_translation(LOCALE, "used_memory").format(used=used_memory)) print((_('used_memory')).format(used=used_memory))
if free_memory < 1: if free_memory < 1:
print(f"{RED}{get_translation(LOCALE, 'low_free_memory').format(free=free_memory)}{RESET}") print(f"{RED}{(_('low_free_memory')).format(free=free_memory)}{RESET}")
sys.exit(1) sys.exit(1)
except ImportError: except ImportError:
print("psutil is not installed. Memory check skipped.") print(_('psutil_not_installed')) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped."
def check_cpu(): def check_cpu():
if psutilavaliable == False: if psutilavaliable == False:
return return
print(get_translation(LOCALE, "measuring_cpu")) print((_('measuring_cpu')))
cpu_per_core = psutil.cpu_percent(interval=1, percpu=True) cpu_per_core = psutil.cpu_percent(interval=1, percpu=True)
for idx, core_usage in enumerate(cpu_per_core): for idx, core_usage in enumerate(cpu_per_core):
bar_length = int(core_usage / 5) bar_length = int(core_usage / 5)
@ -182,33 +182,33 @@ def check_cpu():
color = YELLOW color = YELLOW
else: else:
color = GREEN color = GREEN
print(get_translation(LOCALE, "core_usage").format(idx=idx, bar=bar, usage=core_usage)) print((_('core_usage')).format(idx=idx, bar=bar, usage=core_usage))
total_cpu = sum(cpu_per_core) / len(cpu_per_core) total_cpu = sum(cpu_per_core) / len(cpu_per_core)
print(get_translation(LOCALE, "total_cpu_usage").format(usage=total_cpu)) print((_('total_cpu_usage')).format(usage=total_cpu))
if total_cpu > 85: if total_cpu > 85:
print(f"{YELLOW}{get_translation(LOCALE, 'high_avg_cpu').format(usage=total_cpu)}{RESET}") print(f"{YELLOW}{(_('high_avg_cpu')).format(usage=total_cpu)}{RESET}")
if total_cpu > 95: if total_cpu > 95:
print(f"{RED}{get_translation(LOCALE, 'really_high_cpu')}{RESET}") print(f"{RED}{(_('really_high_cpu'))}{RESET}")
sys.exit(1) sys.exit(1)
def check_memoryjson(): def check_memoryjson():
try: try:
print(get_translation(LOCALE, "memory_file").format(size=os.path.getsize(MEMORY_FILE) / (1024 ** 2))) print((_('memory_file')).format(size=os.path.getsize(MEMORY_FILE) / (1024 ** 2)))
if os.path.getsize(MEMORY_FILE) > 1_073_741_824: if os.path.getsize(MEMORY_FILE) > 1_073_741_824:
print(f"{YELLOW}{get_translation(LOCALE, 'memory_file_large')}{RESET}") print(f"{YELLOW}{(_('memory_file_large'))}{RESET}")
try: try:
with open(MEMORY_FILE, 'r', encoding='utf-8') as f: with open(MEMORY_FILE, 'r', encoding='utf-8') as f:
json.load(f) json.load(f)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print(f"{RED}{get_translation(LOCALE, 'memory_file_corrupted').format(error=e)}{RESET}") print(f"{RED}{(_('memory_file_corrupted')).format(error=e)}{RESET}")
print(f"{YELLOW}{get_translation(LOCALE, 'consider_backup_memory')}{RESET}") print(f"{YELLOW}{(_('consider_backup_memory'))}{RESET}")
except UnicodeDecodeError as e: except UnicodeDecodeError as e:
print(f"{RED}{get_translation(LOCALE, 'memory_file_encoding').format(error=e)}{RESET}") print(f"{RED}{(_('memory_file_encoding')).format(error=e)}{RESET}")
print(f"{YELLOW}{get_translation(LOCALE, 'consider_backup_memory')}{RESET}") print(f"{YELLOW}{(_('consider_backup_memory'))}{RESET}")
except Exception as e: except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'error_reading_memory').format(error=e)}{RESET}") print(f"{RED}{(_('error_reading_memory')).format(error=e)}{RESET}")
except FileNotFoundError: except FileNotFoundError:
print(f"{YELLOW}{get_translation(LOCALE, 'memory_file_not_found')}{RESET}") print(f"{YELLOW}{(_('memory_file_not_found'))}{RESET}")
def presskey2skip(timeout): def presskey2skip(timeout):
if os.name == 'nt': if os.name == 'nt':
@ -241,9 +241,12 @@ def presskey2skip(timeout):
time.sleep(0.1) time.sleep(0.1)
finally: finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
beta = beta
def start_checks(): def start_checks():
print(get_translation(LOCALE, "running_prestart_checks")) if CHECKS_DISABLED == "True":
print(f"{YELLOW}{(_('checks_disabled'))}{RESET}")
return
print(_('running_prestart_checks'))
check_requirements() check_requirements()
check_latency() check_latency()
check_memory() check_memory()
@ -252,9 +255,13 @@ def start_checks():
if os.path.exists(".env"): if os.path.exists(".env"):
pass pass
else: else:
print(f"{YELLOW}{get_translation(LOCALE, 'env_file_not_found')}{RESET}") print(f"{YELLOW}{(_('env_file_not_found'))}{RESET}")
sys.exit(1) sys.exit(1)
print(get_translation(LOCALE, "continuing_in_seconds").format(seconds=5)) if beta == True:
print(f"{YELLOW}this build isnt finished yet, some things might not work as expected{RESET}")
else:
pass
print(_('continuing_in_seconds').format(seconds=5))
presskey2skip(timeout=5) presskey2skip(timeout=5)
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
print(splashtext) print(splashtext)

View file

@ -1,6 +1,6 @@
import re import re
from modules.globalvars import * from modules.globalvars import *
from modules.translations import * from modules.translations import _
import spacy import spacy
from spacy.tokens import Doc from spacy.tokens import Doc
@ -13,12 +13,12 @@ def check_resources():
try: try:
nlp = spacy.load("en_core_web_sm") nlp = spacy.load("en_core_web_sm")
except OSError: except OSError:
print(get_translation(LOCALE, 'spacy_model_not_found')) print((_('spacy_model_not_found')))
spacy.cli.download("en_core_web_sm") spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm") nlp = spacy.load("en_core_web_sm")
if "spacytextblob" not in nlp.pipe_names: if "spacytextblob" not in nlp.pipe_names:
nlp.add_pipe("spacytextblob") nlp.add_pipe("spacytextblob")
print(get_translation(LOCALE, 'spacy_initialized')) print((_('spacy_initialized')))
check_resources() check_resources()
@ -26,7 +26,7 @@ def is_positive(sentence):
doc = nlp(sentence) doc = nlp(sentence)
sentiment_score = doc._.polarity # from spacytextblob sentiment_score = doc._.polarity # from spacytextblob
debug_message = f"{DEBUG}{get_translation(LOCALE, 'sentence_positivity')} {sentiment_score}{RESET}" debug_message = f"{DEBUG}{(_('sentence_positivity'))} {sentiment_score}{RESET}"
print(debug_message) print(debug_message)
return sentiment_score > 0.1 return sentiment_score > 0.1
@ -36,7 +36,7 @@ async def send_message(ctx, message=None, embed=None, file=None, edit=False, mes
try: try:
await message_reference.edit(content=message, embed=embed) await message_reference.edit(content=message, embed=embed)
except Exception as e: except Exception as e:
await ctx.send(f"{RED}{get_translation(LOCALE, 'edit_fail')} {e}{RESET}") await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}")
else: else:
if hasattr(ctx, "respond"): if hasattr(ctx, "respond"):
sent_message = None sent_message = None

View file

@ -1,39 +1,31 @@
import os import os
import json import json
import pathlib import pathlib
from modules.globalvars import RED, RESET from modules.globalvars import RED, RESET, LOCALE
# Load translations at module import
def load_translations(): def load_translations():
"""
Loads all translation JSON files from the 'locales' directory.
Returns a dictionary mapping language codes to their translation dictionaries.
"""
translations = {} translations = {}
# Get the path to the 'locales' directory (one level up from this file)
translations_dir = pathlib.Path(__file__).parent.parent / 'assets' / 'locales' translations_dir = pathlib.Path(__file__).parent.parent / 'assets' / 'locales'
# Iterate over all files in the 'locales' directory
for filename in os.listdir(translations_dir): for filename in os.listdir(translations_dir):
if filename.endswith(".json"): if filename.endswith(".json"):
# Extract language code from filename (e.g., 'en' from 'en.json')
lang_code = filename.replace(".json", "") lang_code = filename.replace(".json", "")
# Open and load the JSON file with open(translations_dir / filename, "r", encoding="utf-8") as f:
with open(os.path.join(translations_dir, filename), "r", encoding="utf-8") as f:
translations[lang_code] = json.load(f) translations[lang_code] = json.load(f)
return translations return translations
# Load all translations at module import
translations = load_translations() translations = load_translations()
def set_language(lang: str):
global LOCALE
LOCALE = lang if lang in translations else "en"
def get_translation(lang: str, key: str): def get_translation(lang: str, key: str):
"""
Retrieves the translation for a given key and language.
Falls back to English if the language is not found.
Prints a warning if the key is missing.
"""
# Get translations for the specified language, or fall back to English
lang_translations = translations.get(lang, translations["en"]) lang_translations = translations.get(lang, translations["en"])
if key not in lang_translations: if key not in lang_translations:
# Print a warning if the key is missing in the selected language
print(f"{RED}Missing key: {key} in language {lang}{RESET}") print(f"{RED}Missing key: {key} in language {lang}{RESET}")
# Return the translation if found, otherwise return the key itself
return lang_translations.get(key, key) return lang_translations.get(key, key)
def _(key: str) -> str:
return get_translation(LOCALE, key)

View file

@ -2,6 +2,7 @@ import sys
import traceback import traceback
import os import os
from modules.globalvars import RED, RESET, splashtext from modules.globalvars import RED, RESET, splashtext
from modules.translations import _
def handle_exception(exc_type, exc_value, exc_traceback, *, context=None): def handle_exception(exc_type, exc_value, exc_traceback, *, context=None):
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
@ -14,7 +15,8 @@ def handle_exception(exc_type, exc_value, exc_traceback, *, context=None):
print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}") print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}")
traceback.print_exception(exc_type, exc_value, exc_traceback) traceback.print_exception(exc_type, exc_value, exc_traceback)
print(f"{RED}========END OF TRACEBACK========{RESET}") print(f"{RED}========END OF TRACEBACK========{RESET}")
print(f"{RED}An unhandled exception occurred. Please report this issue on GitHub.{RESET}") print(f"{RED}{_('unhandled_exception')}{RESET}")
if context: if context:
print(f"{RED}Context: {context}{RESET}") print(f"{RED}Context: {context}{RESET}")

View file

@ -1,4 +1,4 @@
from modules.translations import * from modules.translations import _
from modules.globalvars import * from modules.globalvars import *
import requests import requests
import subprocess import subprocess
@ -18,18 +18,18 @@ def is_remote_ahead(branch='main', remote='origin'):
# Automatically update the local repository if the remote is ahead # Automatically update the local repository if the remote is ahead
def auto_update(branch='main', remote='origin'): def auto_update(branch='main', remote='origin'):
if launched == True: if launched == True:
print(get_translation(LOCALE, "already_started")) print((_('already_started')))
return return
if AUTOUPDATE != "True": if AUTOUPDATE != "True":
pass # Auto-update is disabled pass # Auto-update is disabled
if is_remote_ahead(branch, remote): if is_remote_ahead(branch, remote):
print(get_translation(LOCALE, "remote_ahead").format(remote=remote, branch=branch)) print((_('remote_ahead')).format(remote=remote, branch=branch))
pull_result = run_cmd(f'git pull {remote} {branch}') pull_result = run_cmd(f'git pull {remote} {branch}')
print(pull_result) print(pull_result)
print(get_translation(LOCALE, "please_restart")) print((_('please_restart')))
sys.exit(0) sys.exit(0)
else: else:
print(get_translation(LOCALE, "local_ahead").format(remote=remote, branch=branch)) print((_('local_ahead')).format(remote=remote, branch=branch))
# Fetch the latest version info from the update server # Fetch the latest version info from the update server
def get_latest_version_info(): def get_latest_version_info():
@ -38,21 +38,23 @@ def get_latest_version_info():
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
else: else:
print(f"{RED}{get_translation(LOCALE, 'version_error')} {response.status_code}{RESET}") print(f"{RED}{(_('version_error'))} {response.status_code}{RESET}")
return None return None
except requests.RequestException as e: except requests.RequestException as e:
print(f"{RED}{get_translation(LOCALE, 'version_error')} {e}{RESET}") print(f"{RED}{(_('version_error'))} {e}{RESET}")
return None return None
# Check if an update is available and perform update if needed # Check if an update is available and perform update if needed
def check_for_update(): def check_for_update():
global latest_version, local_version, launched
launched = True
if ALIVEPING != "True": if ALIVEPING != "True":
pass # Update check is disabled return # Update check is disabled
global latest_version, local_version
latest_version_info = get_latest_version_info() latest_version_info = get_latest_version_info()
if not latest_version_info: if not latest_version_info:
print(f"{get_translation(LOCALE, 'fetch_update_fail')}") print(f"{(_('fetch_update_fail'))}")
return None, None return None, None
latest_version = latest_version_info.get("version") latest_version = latest_version_info.get("version")
@ -60,20 +62,20 @@ def check_for_update():
download_url = latest_version_info.get("download_url") download_url = latest_version_info.get("download_url")
if not latest_version or not download_url: if not latest_version or not download_url:
print(f"{RED}{get_translation(LOCALE, 'invalid_server')}{RESET}") print(f"{RED}{(_('invalid_server'))}{RESET}")
return None, None return None, None
# Check if local_version is valid # Check if local_version is valid
if local_version == "0.0.0" or None: if local_version == "0.0.0" or None:
print(f"{RED}{get_translation(LOCALE, 'cant_find_local_version')}{RESET}") print(f"{RED}{(_('cant_find_local_version'))}{RESET}")
return return
# Compare local and latest versions # Compare local and latest versions
if local_version < latest_version: if local_version < latest_version:
print(f"{YELLOW}{get_translation(LOCALE, 'new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}") print(f"{YELLOW}{(_('new_version')).format(latest_version=latest_version, local_version=local_version)}{RESET}")
print(f"{YELLOW}{get_translation(LOCALE, 'changelog').format(VERSION_URL=VERSION_URL)}{RESET}") print(f"{YELLOW}{(_('changelog')).format(VERSION_URL=VERSION_URL)}{RESET}")
auto_update() auto_update()
elif local_version == latest_version: elif local_version == latest_version:
print(f"{GREEN}{get_translation(LOCALE, 'latest_version')} {local_version}{RESET}") print(f"{GREEN}{(_('latest_version'))} {local_version}{RESET}")
print(f"{get_translation(LOCALE, 'latest_version2').format(VERSION_URL=VERSION_URL)}\n\n") print(f"{(_('latest_version2')).format(VERSION_URL=VERSION_URL)}\n\n")
return latest_version return latest_version