import os import re import json import time import random import traceback import subprocess import tempfile import shutil import psutil import asyncio import platform import sys from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type import logging from modules.globalvars import * from modules.prestartchecks import start_checks from modules.logger import GooberFormatter import logging logger = logging.getLogger("goober") logger.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(GooberFormatter()) file_handler = logging.FileHandler("log.txt", mode="w+", encoding="UTF-8") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(GooberFormatter(colors=False)) logger.addHandler(console_handler) logger.addHandler(file_handler) # Print splash text and check for updates print(splashtext) # Print splash text (from modules/globalvars.py) start_checks() import requests import discord from discord.ext import commands from discord import app_commands from discord import Colour, Embed, File, Interaction, Message from discord.abc import Messageable from discord.ext import commands from modules.volta.main import _, set_language from modules.markovmemory import * from modules.version import * from modules.sentenceprocessing import * from modules.unhandledexception import handle_exception from modules.image import gen_meme sys.excepthook = handle_exception check_for_update() T = TypeVar('T') MessageContext = Union[commands.Context, discord.Interaction] MessageReference = Union[Message, discord.WebhookMessage] # Constants with type hints positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',') currenthash: str = "" launched: bool = False slash_commands_enabled: bool = False # Load memory and Markov model for text generation memory: List[str] = load_memory() markov_model: Optional[markovify.Text] = load_markov_model() if not markov_model: logger.error(_('markov_model_not_found')) memory = load_memory() markov_model = train_markov_model(memory) generated_sentences: Set[str] = set() used_words: Set[str] = set() def get_git_remote_url(): try: url = subprocess.check_output( ["git", "config", "--get", "remote.origin.url"], text=True, stderr=subprocess.DEVNULL, ).strip() return url except subprocess.CalledProcessError: return "Unknown" async def load_cogs_from_folder(bot, folder_name="assets/cogs"): for filename in os.listdir(folder_name): if filename.endswith(".py") and not filename.startswith("_"): cog_name = filename[:-3] module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" try: await bot.load_extension(module_path) logger.info(f"{(_('loaded_cog'))} {cog_name}") except Exception as e: logger.error(f"{(_('cog_fail'))} {cog_name} {e}") traceback.print_exc() # Event: Called when the bot is ready @bot.event async def on_ready() -> None: global launched global slash_commands_enabled global NAME global status folder_name: str = "cogs" if launched: return await load_cogs_from_folder(bot) try: synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() logger.info(f"{_('synced_commands')} {len(synced)} {(_('synced_commands2'))}") slash_commands_enabled = True logger.info(f"{(_('started')).format(name=NAME)}") except discord.errors.Forbidden as perm_error: logger.error(f"Permission error while syncing commands: {perm_error}") logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.") quit() except Exception as e: logger.error(f"{_('fail_commands_sync')} {e}") traceback.print_exc() quit() if not song: return status = { "idle": discord.Status.idle, "dnd": discord.Status.dnd, "invisible": discord.Status.invisible, "online": discord.Status.online }.get(status.lower(), discord.Status.online) await bot.change_presence(status=status, activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) launched = True @bot.event async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: from modules.unhandledexception import handle_exception if isinstance(error, commands.CommandInvokeError): original: Exception = error.original handle_exception( type(original), original, original.__traceback__, context=f"Command: {ctx.command} | User: {ctx.author}" ) else: handle_exception( type(error), error, error.__traceback__, context=f"Command: {ctx.command} | User: {ctx.author}" ) @bot.hybrid_command(description=f"{(_('command_desc_retrain'))}") async def retrain(ctx: commands.Context) -> None: if ctx.author.id != ownerid: return global markov_model message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}") try: with open(MEMORY_FILE, 'r') as f: memory: List[str] = json.load(f) except FileNotFoundError: await send_message(ctx, f"{(_('command_markov_memory_not_found'))}") return except json.JSONDecodeError: await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}") return data_size: int = len(memory) processed_data: int = 0 processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}") markov_model = train_markov_model(memory) save_markov_model(markov_model) await send_message(ctx, f"{_('command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref) @bot.hybrid_command(description=f"{(_('command_desc_talk'))}") async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: if not markov_model: await send_message(ctx, f"{(_('command_talk_insufficent_text'))}") return response = None for _ in range(20): if sentence_size == 1: sentence = markov_model.make_short_sentence(max_chars=100, tries=100) response = sentence.split()[0] if sentence else None else: response = markov_model.make_sentence(tries=100, max_words=sentence_size) if response and response not in generated_sentences: if sentence_size > 1: response = improve_sentence_coherence(response) generated_sentences.add(response) break else: await send_message(ctx, f"{(_('command_talk_generation_fail'))}") return cleaned = re.sub(r'[^\w\s]', '', response).lower() coherent = rephrase_for_coherence(cleaned) if random.random() < 0.9 and is_positive(coherent): gif_url = random.choice(positive_gifs) message = f"{coherent}\n[jif]({gif_url})" else: message = coherent logger.info(message) os.environ['gooberlatestgen'] = message await send_message(ctx, message) @bot.hybrid_command(description=f"RAM") async def ramusage(ctx): process = psutil.Process(os.getpid()) mem = process.memory_info().rss await send_message(ctx, f"{mem / 1024 / 1024:.2f} MB") @bot.hybrid_command(description=f"{(_('command_desc_help'))}") async def impact(ctx: commands.Context, text: Optional[str] = None) -> None: assets_folder = "assets/images" def get_random_asset_image() -> Optional[str]: images = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] if not images: return None return os.path.join(assets_folder, random.choice(images)) temp_input = None input_path = None # Determine input image path if ctx.message.attachments: attachment = ctx.message.attachments[0] if attachment.content_type and attachment.content_type.startswith("image/"): ext = os.path.splitext(attachment.filename)[1] temp_input = f"tempy{ext}" await attachment.save(temp_input) input_path = temp_input else: fallback_image = get_random_asset_image() if fallback_image is None: await ctx.reply(_('no_image_available')) return temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) shutil.copy(fallback_image, temp_input) input_path = temp_input else: fallback_image = get_random_asset_image() if fallback_image is None: await ctx.reply(_('no_image_available')) return temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) shutil.copy(fallback_image, temp_input) input_path = temp_input # Generate meme image with one-shot text generation output_path = await gen_meme(input_path, custom_text=text) if output_path is None or not os.path.isfile(output_path): if temp_input and os.path.exists(temp_input): os.remove(temp_input) await ctx.reply(_('failed_generate_image')) return await ctx.send(file=discord.File(output_path)) if temp_input and os.path.exists(temp_input): os.remove(temp_input) bot.remove_command('help') # Command: Show help information @bot.hybrid_command(description=f"{(_('command_desc_help'))}") async def help(ctx: commands.Context) -> None: embed: discord.Embed = discord.Embed( title=f"{(_('command_help_embed_title'))}", description=f"{(_('command_help_embed_desc'))}", color=Colour(0x000000) ) command_categories: Dict[str, List[str]] = { f"{(_('command_help_categories_general'))}": ["mem", "talk", "about", "ping", "impact", "demotivator", "help"], f"{(_('command_help_categories_admin'))}": ["stats", "retrain", "setlanguage"] } custom_commands: List[str] = [] for cog_name, cog in bot.cogs.items(): for command in cog.get_commands(): if command.name not in command_categories[f"{(_('command_help_categories_general'))}"] and command.name not in command_categories[f"{(_('command_help_categories_admin'))}"]: custom_commands.append(command.name) if custom_commands: embed.add_field(name=f"{(_('command_help_categories_custom'))}", value="\n".join([f"{PREFIX}{command}" for command in custom_commands]), inline=False) for category, commands_list in command_categories.items(): commands_in_category: str = "\n".join([f"{PREFIX}{command}" for command in commands_list]) embed.add_field(name=category, value=commands_in_category, inline=False) await send_message(ctx, embed=embed) @bot.hybrid_command(description=f"{(_('command_desc_setlang'))}") @app_commands.describe(locale="Choose your language") async def setlanguage(ctx: commands.Context, locale: str) -> None: if ctx.author.id != ownerid: await ctx.send(":thumbsdown:") return await ctx.defer() set_language(locale) await ctx.send(":thumbsup:") @bot.event async def on_message(message: discord.Message) -> None: global memory, markov_model if message.author.bot: return if str(message.author.id) in BLACKLISTED_USERS: return if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")): logger.info(f"{(_('command_ran')).format(message=message)}") await bot.process_commands(message) return if message.content: if not USERTRAIN_ENABLED: return formatted_message: str = message.content cleaned_message: str = formatted_message save_memory(memory) await bot.process_commands(message) @bot.event async def on_interaction(interaction: discord.Interaction) -> None: name = None if interaction.data.get('name') is None: name = "Unknown" else: name = interaction.data['name'] logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}") @bot.check async def block_blacklisted(ctx: commands.Context) -> bool: if str(ctx.author.id) in BLACKLISTED_USERS: try: if isinstance(ctx, discord.Interaction): if not ctx.response.is_done(): await ctx.response.send_message(_('blacklisted'), ephemeral=True) else: await ctx.followup.send(_('blacklisted'), ephemeral=True) else: await ctx.send(_('blacklisted_user'), ephemeral=True) except: pass return False return True @bot.hybrid_command(description=f"{(_('command_desc_ping'))}") async def ping(ctx: commands.Context) -> None: await ctx.defer() latency: int = round(bot.latency * 1000) LOLembed: discord.Embed = discord.Embed( title="Pong!!", description=( f"{PING_LINE}\n" f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n" ), color=Colour(0x000000) ) LOLembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url) await ctx.send(embed=LOLembed) # Command: Show about information @bot.hybrid_command(description=f"{(_('command_about_desc'))}") async def about(ctx: commands.Context) -> None: print("-----------------------------------\n\n") latest_version: str = check_for_update() print("-----------------------------------") embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000)) embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False) embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) embed.add_field(name=f"Git", value=get_git_remote_url()) embed.add_field(name=f"OS", value=platform.platform()) await send_message(ctx, embed=embed) # Command: Show bot statistics (admin only) @bot.hybrid_command(description="stats") async def stats(ctx: commands.Context) -> None: if ctx.author.id != ownerid: return print("-----------------------------------\n\n") latest_version: str = check_for_update() print("-----------------------------------") memory_file: str = 'memory.json' file_size: int = os.path.getsize(memory_file) with open(memory_file, 'r') as file: line_count: int = sum(1 for _ in file) embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000)) embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False) embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False) embed.add_field(name=f"OS", value=platform.platform()) embed.add_field(name="Python Version", value=platform.python_version()) await send_message(ctx, embed=embed) # Command: Upload memory.json to litterbox.catbox.moe and return the link @bot.hybrid_command() async def mem(ctx: commands.Context) -> None: if showmemenabled != "true": return command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True) logger.debug(memorylitter) await send_message(ctx, memorylitter.stdout.strip()) # Helper: Improve sentence coherence (simple capitalization fix) def improve_sentence_coherence(sentence: str) -> str: # Capitalizes "i" to "I" in the sentence sentence = sentence.replace(" i ", " I ") return sentence # Start the bot bot.run(TOKEN)