diff --git a/.gitignore b/.gitignore index 704d466..8e0b517 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,6 @@ received_memory.json translation_report.txt translationcompleteness.py modules/volta -log.txt \ No newline at end of file +log.txt +settings/settings.json +settings/splash.txt diff --git a/README.md b/README.md index c1e9957..a4fa2ce 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ knockoff of genai basically :p - +THIS!! IS THE ACTUAL REPO!!!! NOT THE OTHER ONE!!! THIS ONE!!! Special thanks to [Charlie's Computers](https://github.com/PowerPCFan) for being the only one I know of that's hosting Goober 24/7 diff --git a/assets/cogs/README.md b/assets/cogs/README.md deleted file mode 100644 index 8680835..0000000 --- a/assets/cogs/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# goobers custom commands -[Hello World!](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/hello.py) -by expect - -[WhoAmI (lists username and nickname)](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/whoami.py) -by PowerPCFan - -[Cog Manager](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/cogmanager.py) -by expect - -[Web Scraper](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webscraper.py) -by expect (requires goober version 0.11.7.2 or higher) - -[Status Changer](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/songchanger.py) -by expect (requires goober version 0.11.8 or higher) - -[Status Changer](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/songchanger.py) -by expect (requires goober version 0.11.8 or higher) - -[webUI](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webserver.py) -by expect (requires goober version 0.11.8 or higher) - -[LastFM](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webserver.py) -by expect (no idea what version it needs i've only tried it on 1.0.3) -- you have to add LASTFM_USERNAME and LASTFM_API_KEY to your .env \ No newline at end of file diff --git a/assets/cogs/filesharing.py b/assets/cogs/filesharing.py index ad6ac98..7229673 100644 --- a/assets/cogs/filesharing.py +++ b/assets/cogs/filesharing.py @@ -1,20 +1,17 @@ import discord from discord.ext import commands -from modules.globalvars import ownerid +from modules.permission import requires_admin class FileSync(commands.Cog): def __init__(self, bot): self.bot = bot self.mode = None self.peer_id = None self.awaiting_file = False - + @requires_admin() @commands.command() async def syncfile(self, ctx, mode: str, peer: discord.User): self.mode = mode.lower() self.peer_id = peer.id - if ctx.author.id != ownerid: - await ctx.send("You don't have permission to execute this command.") - return if self.mode == "s": await ctx.send(f"<@{self.peer_id}> FILE_TRANSFER_REQUEST") await ctx.send(file=discord.File("memory.json")) diff --git a/assets/cogs/fuckup.py b/assets/cogs/fuckup.py deleted file mode 100644 index 1dbc4d0..0000000 --- a/assets/cogs/fuckup.py +++ /dev/null @@ -1,98 +0,0 @@ -import discord -from discord.ext import commands -from modules.image import * -from modules.volta.main import _ -from PIL import Image, ImageEnhance, ImageFilter, ImageOps, ImageChops, ImageColor -import os, random, shutil, tempfile - -async def deepfryimage(path): - with Image.open(path).convert("RGB") as im: - # make it burn - for _ in range(3): - im = im.resize((int(im.width * 0.7), int(im.height * 0.7))) - im = im.resize((int(im.width * 1.5), int(im.height * 1.5))) - im = ImageEnhance.Contrast(im).enhance(random.uniform(5, 10)) - im = ImageEnhance.Sharpness(im).enhance(random.uniform(10, 50)) - im = ImageEnhance.Brightness(im).enhance(random.uniform(1.5, 3)) - r, g, b = im.split() - r = r.point(lambda i: min(255, i * random.uniform(1.2, 2.0))) - g = g.point(lambda i: min(255, i * random.uniform(0.5, 1.5))) - b = b.point(lambda i: min(255, i * random.uniform(0.5, 2.0))) - channels = [r, g, b] - random.shuffle(channels) - im = Image.merge("RGB", tuple(channels)) - overlay_color = tuple(random.randint(0, 255) for _ in range(3)) - overlay = Image.new("RGB", im.size, overlay_color) - im = ImageChops.add(im, overlay, scale=2.0, offset=random.randint(-64, 64)) - - im = im.filter(ImageFilter.EDGE_ENHANCE_MORE) - im = im.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.5, 2))) - for _ in range(3): - tmp_path = tempfile.mktemp(suffix=".jpg") - im.save(tmp_path, format="JPEG", quality=random.randint(5, 15)) - im = Image.open(tmp_path) - if random.random() < 0.3: - im = ImageOps.posterize(im, bits=random.choice([2, 3, 4])) - if random.random() < 0.2: - im = ImageOps.invert(im) - out_path = tempfile.mktemp(suffix=".jpg") - im.save(out_path, format="JPEG", quality=5) - return out_path - - -class whami(commands.Cog): - def __init__(self, bot): - self.bot = bot - - - @commands.command() - async def fuckup(self, ctx): - assets_folder = "assets/images" - temp_input = None - - def get_random_asset_image(): - files = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] - if not files: - return None - return os.path.join(assets_folder, random.choice(files)) - - if ctx.message.attachments: - attachment = ctx.message.attachments[0] - if attachment.content_type and attachment.content_type.startswith("image/"): - ext = os.path.splitext(attachment.filename)[1] - temp_input = f"tempy{ext}" - await attachment.save(temp_input) - input_path = temp_input - else: - fallback_image = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - else: - fallback_image = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - - output_path = await gen_meme(input_path) - - if output_path is None or not os.path.isfile(output_path): - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - await ctx.reply(_('failed_generate_image')) - return - - deepfried_path = await deepfryimage(output_path) - await ctx.send(file=discord.File(deepfried_path)) - - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - -async def setup(bot): - await bot.add_cog(whami(bot)) diff --git a/assets/cogs/internal/base_commands.py b/assets/cogs/internal/base_commands.py new file mode 100644 index 0000000..ea1aaf8 --- /dev/null +++ b/assets/cogs/internal/base_commands.py @@ -0,0 +1,196 @@ +import os +import platform +import subprocess +from typing import Dict, List +import discord +from discord import Colour +from discord.ext import commands +import discord.ext +import discord.ext.commands +from modules.globalvars import local_version +from modules.volta.main import _ , set_language +from modules.permission import requires_admin +from modules.sentenceprocessing import send_message +from modules.settings import instance as settings_manager +from modules.version import check_for_update + +import requests + +settings = settings_manager.settings + +def get_git_origin_raw(): + try: + result = subprocess.run( + ["git", "config", "--get", "remote.origin.url"], + capture_output=True, text=True, check=True + ) + return result.stdout.strip() + except Exception: + return "Failed to get git origin" + +class BaseCommands(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + + def get_git_origin_raw(): + try: + result = subprocess.run( + ["git", "config", "--get", "remote.origin.url"], + capture_output=True, text=True, check=True + ) + return result.stdout.strip() + except Exception: + return "https://forgejo.expect.ovh/gooberinc/goober" # fallback if git fails + + @commands.command() + async def help(self, ctx: commands.Context) -> None: + embed: discord.Embed = discord.Embed( + title=f"{_('command_help_embed_title')}", + description=f"{_('command_help_embed_desc')}", + color=discord.Colour(0x000000), + ) + + command_categories = { + f"{_('command_help_categories_general')}": [ + "mem", + "talk", + "about", + "ping", + "impact", + "demotivator", + "help", + ], + f"{_('command_help_categories_admin')}": ["stats", "retrain", "setlanguage"], + } + + custom_commands: List[str] = [] + for cog_name, cog in self.bot.cogs.items(): + for command in cog.get_commands(): + if ( + command.name + not in command_categories[f"{_('command_help_categories_general')}"] + and command.name + not in command_categories[f"{_('command_help_categories_admin')}"] + ): + custom_commands.append(command.name) + + if custom_commands: + embed.add_field( + name=_('command_help_categories_custom'), + value="\n".join( + [ + f"{settings['bot']['prefix']}{command}" + for command in custom_commands + ] + ), + inline=False, + ) + + for category, commands_list in command_categories.items(): + commands_in_category: str = "\n".join( + [f"{settings['bot']['prefix']}{command}" for command in commands_list] + ) + embed.add_field(name=category, value=commands_in_category, inline=False) + + + await send_message(ctx, embed=embed) + + @requires_admin() + @commands.command() + async def setlanguage(self, ctx: commands.Context, locale: str) -> None: + await ctx.defer() + set_language(locale) + await ctx.send(":thumbsup:") + + @commands.command() + async def ping(self, ctx: commands.Context) -> None: + await ctx.defer() + latency: int = round(self.bot.latency * 1000) + + embed: discord.Embed = discord.Embed( + title="Pong!!", + description=( + settings["bot"]["misc"]["ping_line"], + f"`{_('command_ping_embed_desc')}: {latency}ms`\n", + ), + color=discord.Colour(0x000000), + ) + embed.set_footer( + text=f"{_('command_ping_footer')} {ctx.author.name}", + icon_url=ctx.author.display_avatar.url, + ) + + await ctx.send(embed=embed) + + @commands.command() + async def about(self, ctx: commands.Context) -> None: + latest_version: str = check_for_update(slient=True) + embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000)) + embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{settings['name']}", inline=False) + embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) + embed.add_field(name=f"Git", value=get_git_origin_raw()) + embed.add_field(name=f"OS", value=platform.platform()) + + await send_message(ctx, embed=embed) + + @commands.command() + async def stats(self, ctx: commands.Context) -> None: + memory_file: str = "memory.json" + file_size: int = os.path.getsize(memory_file) + + with open(memory_file, "r") as file: + line_count: int = sum(1 for _ in file) + + embed: discord.Embed = discord.Embed( + title=f"{_('command_stats_embed_title')}", + description=f"{_('command_stats_embed_desc')}", + color=discord.Colour(0x000000), + ) + embed.add_field( + name=f"{_('command_stats_embed_field1name')}", + value=f"placeholder", + inline=False, + ) + + with open(settings["splash_text_loc"], "r") as f: + splash_text = "".join(f.readlines()) + + embed.add_field( + name=_('command_stats_embed_field3name'), + value=_('command_stats_embed_field3value').format( + NAME=settings["name"], + PREFIX=settings["bot"]["prefix"], + ownerid=settings["bot"]["owner_ids"][0], + PING_LINE=settings["bot"]["misc"]["ping_line"], + showmemenabled=settings["bot"]["allow_show_mem_command"], + USERTRAIN_ENABLED=settings["bot"]["user_training"], + song=settings["bot"]["misc"]["active_song"], + splashtext=splash_text + ), + inline=False, + ) + + + await send_message(ctx, embed=embed) + + @commands.command() + async def mem(self, ctx: commands.Context) -> None: + if not settings["bot"]["allow_show_mem_command"]: + return + + with open(settings["bot"]["active_memory"], "rb") as f: + data: bytes = f.read() + + response = requests.post( + "https://litterbox.catbox.moe/resources/internals/api.php", + data={"reqtype": "fileupload", "time": "1h"}, + files={"fileToUpload": data}, + ) + + await send_message(ctx, response.text) + + +async def setup(bot: discord.ext.commands.Bot): + print("Setting up base_commands") + bot.remove_command("help") + await bot.add_cog(BaseCommands(bot)) \ No newline at end of file diff --git a/assets/cogs/cogmanager.py b/assets/cogs/internal/cogmanager.py similarity index 82% rename from assets/cogs/cogmanager.py rename to assets/cogs/internal/cogmanager.py index 017f021..1843eab 100644 --- a/assets/cogs/cogmanager.py +++ b/assets/cogs/internal/cogmanager.py @@ -1,18 +1,15 @@ import discord from discord.ext import commands -from modules.globalvars import ownerid +from modules.permission import requires_admin COG_PREFIX = "assets.cogs." class CogManager(commands.Cog): def __init__(self, bot): self.bot = bot - + @requires_admin() @commands.command() async def load(self, ctx, cog_name: str = None): - if ctx.author.id != ownerid: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to load.") return @@ -21,12 +18,9 @@ class CogManager(commands.Cog): await ctx.send(f"Loaded cog `{cog_name}` successfully.") except Exception as e: await ctx.send(f"Error loading cog `{cog_name}`: {e}") - + @requires_admin() @commands.command() async def unload(self, ctx, cog_name: str = None): - if ctx.author.id != ownerid: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to unload.") return @@ -35,12 +29,9 @@ class CogManager(commands.Cog): await ctx.send(f"Unloaded cog `{cog_name}` successfully.") except Exception as e: await ctx.send(f"Error unloading cog `{cog_name}`: {e}") - + @requires_admin() @commands.command() async def reload(self, ctx, cog_name: str = None): - if ctx.author.id != ownerid: - await ctx.send("You do not have permission to use this command.") - return if cog_name is None: await ctx.send("Please provide the cog name to reload.") return diff --git a/assets/cogs/internal/markov.py b/assets/cogs/internal/markov.py new file mode 100644 index 0000000..274c8d9 --- /dev/null +++ b/assets/cogs/internal/markov.py @@ -0,0 +1,112 @@ +import os +import random +import re +import discord +from discord.ext import commands + +import discord.ext +import discord.ext.commands + +from modules.markovmemory import save_markov_model, train_markov_model, load_markov_model +from modules.permission import requires_admin +from modules.sentenceprocessing import ( + improve_sentence_coherence, + is_positive, + rephrase_for_coherence, + send_message, +) +from modules.volta.main import _ +import logging +from typing import List, Optional, Set +import json +import time +import markovify + + +logger = logging.getLogger("goober") +from modules.settings import instance as settings_manager + +settings = settings_manager.settings + + +class Markov(commands.Cog): + def __init__(self, bot): + self.bot: discord.ext.commands.Bot = bot + + + @requires_admin() + @commands.command() + async def retrain(self, ctx: discord.ext.commands.Context): + markov_model: Optional[markovify.Text] = load_markov_model() + message_ref: discord.Message | None = await send_message( + ctx, f"{_('command_markov_retrain')}" + ) + + if message_ref is None: + logger.error("Failed to send message!") + return + + try: + with open(settings["bot"]["active_memory"], "r") as f: + memory: List[str] = json.load(f) + except FileNotFoundError: + await send_message(ctx, f"{_('command_markov_memory_not_found')}") + return + except json.JSONDecodeError: + await send_message(ctx, f"{_('command_markov_memory_is_corrupt')}") + return + + data_size: int = len(memory) + + processing_message_ref: discord.Message | None = await send_message( + ctx, f"{(_('command_markov_retraining').format(data_size=data_size))}" + ) + if processing_message_ref is None: + logger.error("Couldnt find message processing message!") + + start_time: float = time.time() + + model = train_markov_model(memory) + if not model: + logger.error("Failed to train markov model") + await ctx.send("Failed to retrain!") + return False + + markov_model = model + save_markov_model(markov_model) + + logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s") + + await send_message( + ctx, + _('command_markov_retraining').format(data_size=data_size), + edit=True, + message_reference=processing_message_ref, + ) + + + @commands.command() + async def talk(self, ctx: commands.Context, sentence_size: int = 5) -> None: + markov_model: Optional[markovify.Text] = load_markov_model() + if markov_model is None: + await send_message(ctx, _("command_markovcommand_talk_insufficent_text")) + return + + raw_sentence = None + if sentence_size == 1: + raw_sentence = markov_model.make_short_sentence(max_chars=100, tries=100) + else: + raw_sentence = markov_model.make_sentence(tries=100, max_words=sentence_size) + print(raw_sentence) + + if random.random() < 0.9 and is_positive(raw_sentence): + gif_url = random.choice(settings["bot"]["misc"]["positive_gifs"]) + raw_sentence = f"{raw_sentence}\n[jif]({gif_url})" + + os.environ["gooberlatestgen"] = raw_sentence + await send_message(ctx, raw_sentence) + + + +async def setup(bot): + await bot.add_cog(Markov(bot)) \ No newline at end of file diff --git a/assets/cogs/internal/permission.py b/assets/cogs/internal/permission.py new file mode 100644 index 0000000..cf03af7 --- /dev/null +++ b/assets/cogs/internal/permission.py @@ -0,0 +1,89 @@ +import discord +from discord.ext import commands + +from modules.permission import requires_admin +from modules.settings import instance as settings_manager + +settings = settings_manager.settings + + +class PermissionManager(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @requires_admin() + @commands.command() + async def add_owner(self, ctx: commands.Context, member: discord.Member): + settings["bot"]["owner_ids"].append(member.id) + settings_manager.commit() + embed = discord.Embed( + title="Permissions", + description=f"Set {member.name} as an owner", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def remove_owner(self, ctx: commands.Context, member: discord.Member): + try: + settings["bot"]["owner_ids"].remove(member.id) + settings_manager.commit() + except ValueError: + await ctx.send("User is not an owner!") + return + + embed = discord.Embed( + title="Permissions", + description=f"Removed {member.name} from being an owner", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def blacklist_user(self, ctx: commands.Context, member: discord.Member): + settings["bot"]["blacklisted_users"].append(member.id) + settings_manager.add_admin_log_event( + { + "action": "add", + "author": ctx.author.id, + "change": "blacklisted_users", + "messageId": ctx.message.id, + "target": member.id, + } + ) + settings_manager.commit() + + embed = discord.Embed( + title="Blacklist", + description=f"Added {member.name} to the blacklist", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + @requires_admin() + @commands.command() + async def unblacklist_user(self, ctx: commands.Context, member: discord.Member): + try: + settings["bot"]["blacklisted_users"].remove(member.id) + settings_manager.commit() + + except ValueError: + await ctx.send("User is not on the blacklist!") + return + + embed = discord.Embed( + title="Blacklist", + description=f"Removed {member.name} from blacklist", + color=discord.Color.blue(), + ) + + await ctx.send(embed=embed) + + +async def setup(bot): + await bot.add_cog(PermissionManager(bot)) \ No newline at end of file diff --git a/assets/cogs/lastfm.py.disabled b/assets/cogs/lastfm.py.disabled deleted file mode 100644 index 822305a..0000000 --- a/assets/cogs/lastfm.py.disabled +++ /dev/null @@ -1,84 +0,0 @@ -import os -import discord -from discord.ext import commands, tasks -import aiohttp -from dotenv import load_dotenv - -load_dotenv() - -#stole most of this code from my old expect bot so dont be suprised if its poorly made - -LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") -LASTFM_USERNAME = os.getenv("LASTFM_USERNAME") - -class LastFmCog(commands.Cog): - def __init__(self, bot): - self.bot = bot - self.current_track = None - self.update_presence_task = None - self.ready = False - bot.loop.create_task(self.wait_until_ready()) - - async def wait_until_ready(self): - await self.bot.wait_until_ready() - self.ready = True - self.update_presence.start() - - @tasks.loop(seconds=60) - async def update_presence(self): - print("Looped!") - if not self.ready: - return - track = await self.fetch_current_track() - if track and track != self.current_track: - self.current_track = track - artist, song = track - activity_name = f"{artist} - {song}" - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=activity_name)) - print(f"Updated song to {artist} - {song}") - else: - print("LastFM gave me the same track! not updating...") - - @update_presence.before_loop - async def before_update_presence(self): - await self.bot.wait_until_ready() - - @commands.command(name="lastfm") - async def lastfm_command(self, ctx): - track = await self.fetch_current_track() - if not track: - await ctx.send("No track currently playing or could not fetch data") - return - self.current_track = track - artist, song = track - activity_name = f"{artist} - {song}" - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=activity_name)) - await ctx.send(f"Updated presence to: Listening to {activity_name}") - - async def fetch_current_track(self): - url = ( - f"http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks" - f"&user={LASTFM_USERNAME}&api_key={LASTFM_API_KEY}&format=json&limit=1" - ) - async with aiohttp.ClientSession() as session: - async with session.get(url) as resp: - if resp.status != 200: - return None - data = await resp.json() - - recenttracks = data.get("recenttracks", {}).get("track", []) - if not recenttracks: - return None - - track = recenttracks[0] - if '@attr' in track and track['@attr'].get('nowplaying') == 'true': - artist = track.get('artist', {}).get('#text', 'Unknown Artist') - song = track.get('name', 'Unknown Song') - return artist, song - return None - -async def setup(bot): - if not LASTFM_API_KEY or not LASTFM_USERNAME: - return - else: - await bot.add_cog(LastFmCog(bot)) diff --git a/assets/cogs/songchanger.py b/assets/cogs/songchanger.py deleted file mode 100644 index 36fde47..0000000 --- a/assets/cogs/songchanger.py +++ /dev/null @@ -1,33 +0,0 @@ -import discord -from discord.ext import commands -from modules.globalvars import RED, GREEN, RESET, LOCAL_VERSION_FILE -import os - -class songchange(commands.Cog): - def __init__(self, bot): - self.bot = bot - - def get_local_version(): - if os.path.exists(LOCAL_VERSION_FILE): - with open(LOCAL_VERSION_FILE, "r") as f: - return f.read().strip() - return "0.0.0" - - global local_version - local_version = get_local_version() - - @commands.command() - async def changesong(self, ctx): - if LOCAL_VERSION_FILE > "0.11.8": - await ctx.send(f"Goober is too old! you must have version 0.11.8 you have {local_version}") - return - await ctx.send("Check the terminal! (this does not persist across restarts)") - song = input("\nEnter a song:\n") - try: - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) - print(f"{GREEN}Changed song to {song}{RESET}") - except Exception as e: - print(f"{RED}An error occurred while changing songs..: {str(e)}{RESET}") - -async def setup(bot): - await bot.add_cog(songchange(bot)) diff --git a/assets/cogs/webscraper.py.disabled b/assets/cogs/webscraper.py.disabled deleted file mode 100644 index 351e2e8..0000000 --- a/assets/cogs/webscraper.py.disabled +++ /dev/null @@ -1,113 +0,0 @@ -import discord -from discord.ext import commands -import aiohttp -from bs4 import BeautifulSoup -import json -import asyncio -from urllib.parse import urljoin -from modules.globalvars import ownerid -class WebScraper(commands.Cog): - def __init__(self, bot): - self.bot = bot - self.visited_urls = set() - - async def fetch(self, session, url): - """Fetch the HTML content of a URL.""" - try: - async with session.get(url, timeout=10) as response: - return await response.text() - except Exception as e: - print(f"Failed to fetch {url}: {e}") - return None - - def extract_sentences(self, text): - """Extract sentences from text.""" - sentences = text.split('.') - return [sentence.strip() for sentence in sentences if sentence.strip()] - - def save_to_json(self, sentences): - """Save sentences to memory.json.""" - try: - try: - with open("memory.json", "r") as file: - data = json.load(file) - except (FileNotFoundError, json.JSONDecodeError): - data = [] - data.extend(sentences) - with open("memory.json", "w") as file: - json.dump(data, file, indent=4) - except Exception as e: - print(f"Failed to save to JSON: {e}") - - def undo_last_scrape(self): - """Undo the last scrape by removing the most recent sentences.""" - try: - with open("memory.json", "r") as file: - data = json.load(file) - - if not data: - print("No data to undo.") - return False - - - data = data[:-1] - - with open("memory.json", "w") as file: - json.dump(data, file, indent=4) - - return True - except (FileNotFoundError, json.JSONDecodeError): - print("No data to undo or failed to load JSON.") - return False - except Exception as e: - print(f"Failed to undo last scrape: {e}") - return False - - async def scrape_links(self, session, url, depth=2): - print(f"Scraping: {url}") - self.visited_urls.add(url) - - html = await self.fetch(session, url) - if not html: - return - - soup = BeautifulSoup(html, "html.parser") - - for paragraph in soup.find_all('p'): - sentences = self.extract_sentences(paragraph.get_text()) - self.save_to_json(sentences) - - - @commands.command() - async def start_scrape(self, ctx, start_url: str): - """Command to start the scraping process.""" - if ctx.author.id != ownerid: - await ctx.send("You do not have permission to use this command.") - return - - if not start_url.startswith("http"): - await ctx.send("Please provide a valid URL.") - return - - await ctx.send(f"Starting scrape from {start_url}... This may take a while!") - - async with aiohttp.ClientSession() as session: - await self.scrape_links(session, start_url) - - await ctx.send("Scraping complete! Sentences saved to memory.json.") - - @commands.command() - async def undo_scrape(self, ctx): - """Command to undo the last scrape.""" - if ctx.author.id != ownerid: - await ctx.send("You do not have permission to use this command.") - return - - success = self.undo_last_scrape() - if success: - await ctx.send("Last scrape undone successfully.") - else: - await ctx.send("No data to undo or an error occurred.") - -async def setup(bot): - await bot.add_cog(WebScraper(bot)) diff --git a/assets/cogs/webserver.py b/assets/cogs/webserver.py deleted file mode 100644 index 110c20d..0000000 --- a/assets/cogs/webserver.py +++ /dev/null @@ -1,880 +0,0 @@ -import discord -from discord.ext import commands, tasks -import asyncio -from aiohttp import web -import psutil -import os -import json -from datetime import datetime -import time -import aiohttp -import re -from aiohttp import WSMsgType -from modules.globalvars import VERSION_URL -import sys -import subprocess - -class GooberWeb(commands.Cog): - def __init__(self, bot): - self.bot = bot - self.app = web.Application() - self.runner = None - self.site = None - self.last_command = "No commands executed yet" - self.last_command_time = "Never" - self.start_time = time.time() - self.websockets = set() - - self.app.add_routes([ - web.get('/', self.handle_index), - web.get('/changesong', self.handle_changesong), - web.get('/stats', self.handle_stats), - web.get('/data', self.handle_json_data), - web.get('/ws', self.handle_websocket), - web.get('/styles.css', self.handle_css), - web.get('/settings', self.handle_settings), - web.post('/update_settings', self.handle_update_settings), - web.post('/restart_bot', self.handle_restart_bot), - ]) - - self.bot.loop.create_task(self.start_web_server()) - self.update_clients.start() - - async def restart_bot(self): - await asyncio.sleep(1) - python = sys.executable - os.execl(python, python, *sys.argv) - - async def handle_restart_bot(self, request): - asyncio.create_task(self.restart_bot()) - return web.Response(text="Bot is restarting...") - - async def get_blacklisted_users(self): - blacklisted_ids = os.getenv("BLACKLISTED_USERS", "").split(",") - blacklisted_users = [] - - for user_id in blacklisted_ids: - if not user_id.strip(): - continue - - try: - user = await self.bot.fetch_user(int(user_id)) - blacklisted_users.append({ - "name": f"{user.name}", - "avatar_url": str(user.avatar.url) if user.avatar else str(user.default_avatar.url), - "id": user.id - }) - except discord.NotFound: - blacklisted_users.append({ - "name": f"Unknown User ({user_id})", - "avatar_url": "", - "id": user_id - }) - except discord.HTTPException as e: - print(f"Error fetching user {user_id}: {e}") - continue - - return blacklisted_users - - async def get_enhanced_guild_info(self): - guilds = sorted(self.bot.guilds, key=lambda g: g.member_count, reverse=True) - guild_info = [] - - for guild in guilds: - icon_url = str(guild.icon.url) if guild.icon else "" - guild_info.append({ - "name": guild.name, - "member_count": guild.member_count, - "icon_url": icon_url, - "id": guild.id - }) - - return guild_info - - async def start_web_server(self): - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.site = web.TCPSite(self.runner, '0.0.0.0', 8080) - await self.site.start() - print("Goober web server started on port 8080") - - async def stop_web_server(self): - await self.site.stop() - await self.runner.cleanup() - print("Web server stopped") - - def cog_unload(self): - self.update_clients.cancel() - self.bot.loop.create_task(self.stop_web_server()) - - @tasks.loop(seconds=5) - async def update_clients(self): - if not self.websockets: - return - - stats = await self.get_bot_stats() - message = json.dumps(stats) - - for ws in set(self.websockets): - try: - await ws.send_str(message) - except ConnectionResetError: - self.websockets.remove(ws) - except Exception as e: - print(f"Error sending to websocket: {e}") - self.websockets.remove(ws) - - async def handle_websocket(self, request): - ws = web.WebSocketResponse() - await ws.prepare(request) - self.websockets.add(ws) - - try: - async for msg in ws: - if msg.type == WSMsgType.ERROR: - print(f"WebSocket error: {ws.exception()}") - finally: - self.websockets.remove(ws) - - return ws - - async def handle_css(self, request): - css_path = os.path.join(os.path.dirname(__file__), 'styles.css') - if os.path.exists(css_path): - return web.FileResponse(css_path) - return web.Response(text="CSS file not found", status=404) - - @commands.Cog.listener() - async def on_message(self, message): - if message.author.bot: - return - - ctx = await self.bot.get_context(message) - if ctx.valid and ctx.command: - self._update_command_stats(ctx.command.name, ctx.author) - - @commands.Cog.listener() - async def on_app_command_completion(self, interaction, command): - self._update_command_stats(command.name, interaction.user) - - def _update_command_stats(self, command_name, user): - self.last_command = f"{command_name} (by {user.name})" - self.last_command_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if self.websockets: - asyncio.create_task(self.update_clients()) - - async def get_bot_stats(self): - process = psutil.Process(os.getpid()) - mem_info = process.memory_full_info() - cpu_percent = psutil.cpu_percent() - process_cpu = process.cpu_percent() - - memory_json_size = "N/A" - if os.path.exists("memory.json"): - memory_json_size = f"{os.path.getsize('memory.json') / 1024:.2f} KB" - - guild_info = await self.get_enhanced_guild_info() - blacklisted_users = await self.get_blacklisted_users() - - uptime_seconds = int(time.time() - self.start_time) - uptime_str = f"{uptime_seconds // 86400}d {(uptime_seconds % 86400) // 3600}h {(uptime_seconds % 3600) // 60}m {uptime_seconds % 60}s" - - return { - "ram_usage": f"{mem_info.rss / 1024 / 1024:.2f} MB", - "cpu_usage": f"{process_cpu}%", - "system_cpu": f"{cpu_percent}%", - "memory_json_size": memory_json_size, - "guild_count": len(guild_info), - "bl_count": len(blacklisted_users), - "guilds": guild_info, - "blacklisted_users": blacklisted_users, - "last_command": self.last_command, - "last_command_time": self.last_command_time, - "bot_uptime": uptime_str, - "latency": f"{self.bot.latency * 1000:.2f} ms", - "bot_name": self.bot.user.name, - "bot_avatar_url": str(self.bot.user.avatar.url) if self.bot.user.avatar else "", - "authenticated": os.getenv("gooberauthenticated"), - "lastmsg": os.getenv("gooberlatestgen"), - "localversion": os.getenv("gooberlocal_version"), - "latestversion": os.getenv("gooberlatest_version"), - "owner": os.getenv("ownerid") - } - - async def handle_update(self, request): - if os.path.exists("goob/update.py"): - return web.FileResponse("goob/update.py") - return web.Response(text="Update file not found", status=404) - - async def handle_changesong(self, request): - song = request.query.get('song', '') - if song: - await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=song)) - return web.Response(text=f"Changed song to: {song}") - return web.Response(text="Please provide a song parameter", status=400) - - async def handle_changes(self, request): - if os.path.exists("goob/changes.txt"): - return web.FileResponse("goob/changes.txt") - return web.Response(text="Changelog not found", status=404) - - async def read_env_file(self): - env_vars = {} - try: - with open('.env', 'r') as f: - for line in f: - line = line.strip() - if not line or line.startswith('#') or '=' not in line: - continue - - key, value = line.split('=', 1) - key = key.strip() - if key in ['splashtext', 'DISCORD_BOT_TOKEN']: - continue - - env_vars[key] = value.strip('"\'') - except FileNotFoundError: - print(".env file not found") - return env_vars - - - async def handle_settings(self, request): - env_vars = await self.read_env_file() - - # Get config.py variables - config_vars = {} - try: - with open('config.py', 'r') as f: - for line in f: - if line.startswith('VERSION_URL'): - config_vars['VERSION_URL'] = line.split('=', 1)[1].strip().strip('"') - except FileNotFoundError: - pass - - settings_html = """ - - - - Goober Settings - - - -
-

Goober Settings

-
- """ - - for key, value in env_vars.items(): - settings_html += f""" -
- - -
- """ - - for key, value in config_vars.items(): - settings_html += f""" -
- - -
- """ - - settings_html += """ - -
-
- -
-
- - - """ - - return web.Response(text=settings_html, content_type='text/html') - - async def handle_update_settings(self, request): - data = await request.post() - env_text = "" - - try: - with open('.env', 'r') as f: - env_text = f.read() - except FileNotFoundError: - pass - - def replace_match(match): - key = match.group(1) - value = match.group(2) - if key in ['splashtext', 'DISCORD_BOT_TOKEN']: - return match.group(0) - if key in data: - new_value = data[key] - if not (new_value.startswith('"') and new_value.endswith('"')): - new_value = f'"{new_value}"' - return f'{key}={new_value}' - return match.group(0) - - env_text = re.sub(r'^(\w+)=([\s\S]+?)(?=\n\w+=|\Z)', replace_match, env_text, flags=re.MULTILINE) - - with open('.env', 'w') as f: - f.write(env_text.strip() + '\n') - - if 'VERSION_URL' in data: - config_text = "" - try: - with open('config.py', 'r') as f: - config_text = f.read() - except FileNotFoundError: - pass - - config_text = re.sub(r'^(VERSION_URL\s*=\s*").+?"', f'\\1{data["VERSION_URL"]}"', config_text, flags=re.MULTILINE) - - with open('config.py', 'w') as f: - f.write(config_text.strip() + '\n') - - return aiohttp.web.Response(text="Settings updated successfully!") - - async def handle_index(self, request): - stats = await self.get_bot_stats() - - guild_list_html = "" - for guild in stats['guilds']: - icon_html = f'guild icon' if guild["icon_url"] else '
' - guild_list_html += f""" -
- {icon_html} -
-
{guild["name"]}
-
{guild["member_count"]} members
-
-
- """ - blacklisted_users_html = "" - for user in stats['blacklisted_users']: - avatar_html = f'user avatar' if user["avatar_url"] else '
' - blacklisted_users_html += f""" -
- {avatar_html} -
-
{user["name"]}
-
ID: {user["id"]}
-
-
- """ - - owner_id = stats.get('owner') - owner = None - owner_username = "Owner" - owner_pfp = "" - - if owner_id: - try: - owner = await self.bot.fetch_user(int(owner_id)) - owner_username = f"{owner.name}" - owner_pfp = str(owner.avatar.url) if owner and owner.avatar else "" - except: - pass - - - html_content = f""" - - - - goobs central - - - -
- -
Welcome, {owner_username}
-
-
-
- RAM: - {stats['ram_usage']} -
-
- CPU: - {stats['system_cpu']} -
-
- Latency: - {stats['latency']} -
-
- JSON Size: - {stats['memory_json_size']} -
-
- Uptime: - {stats['bot_uptime']} -
-
- -
-
- botvatar -

{stats['bot_name']}

-
-
-

your stupid little goober that learns off other people's messages

-
- -
-
-
Last Command
-
{stats['last_command']}
-
at {stats['last_command_time']}
-
-
Logged into goober central
-
{stats['authenticated']}
-
-
Last generated message
-
{stats['lastmsg']}
-
-
Version
-
Installed Version: {stats['localversion']}
-
Latest Version: {stats['latestversion']}
-
-
goober-central URL
-
{VERSION_URL}
-
-
Change song
-
- - -
-
- -
-
Servers ({stats['guild_count']})
-
- {guild_list_html} -
-
-
Blacklisted Users ({stats['bl_count']})
-
- {blacklisted_users_html if stats['blacklisted_users'] else "
No blacklisted users
"} -
-
-
- - - - """ - - return web.Response(text=html_content, content_type='text/html') - - async def handle_stats(self, request): - return await self.handle_index(request) - - async def handle_json_data(self, request): - stats = await self.get_bot_stats() - return web.json_response(stats) - -async def setup(bot): - await bot.add_cog(GooberWeb(bot)) diff --git a/assets/fonts/Impact.ttf b/assets/fonts/Impact.ttf deleted file mode 100644 index 7b7956f..0000000 Binary files a/assets/fonts/Impact.ttf and /dev/null differ diff --git a/assets/fonts/TNR.ttf b/assets/fonts/TNR.ttf deleted file mode 100644 index 51261a0..0000000 Binary files a/assets/fonts/TNR.ttf and /dev/null differ diff --git a/assets/images/attention.webp b/assets/images/attention.webp deleted file mode 100644 index f680665..0000000 Binary files a/assets/images/attention.webp and /dev/null differ diff --git a/assets/images/bibinos.png b/assets/images/bibinos.png deleted file mode 100644 index 5a7b846..0000000 Binary files a/assets/images/bibinos.png and /dev/null differ diff --git a/assets/images/crash.webp b/assets/images/crash.webp deleted file mode 100644 index 60d918b..0000000 Binary files a/assets/images/crash.webp and /dev/null differ diff --git a/assets/images/crash2.png b/assets/images/crash2.png deleted file mode 100644 index 12f7616..0000000 Binary files a/assets/images/crash2.png and /dev/null differ diff --git a/assets/images/genuineidiot.png b/assets/images/genuineidiot.png deleted file mode 100644 index 226ca47..0000000 Binary files a/assets/images/genuineidiot.png and /dev/null differ diff --git a/assets/images/smashedphone.webp b/assets/images/smashedphone.webp deleted file mode 100644 index 845070f..0000000 Binary files a/assets/images/smashedphone.webp and /dev/null differ diff --git a/assets/images/thisisfine.png b/assets/images/thisisfine.png deleted file mode 100644 index 41977dc..0000000 Binary files a/assets/images/thisisfine.png and /dev/null differ diff --git a/bot.py b/bot.py deleted file mode 100644 index bbee94d..0000000 --- a/bot.py +++ /dev/null @@ -1,528 +0,0 @@ -import os -import re -import json -import time -import random -import traceback -import subprocess -import tempfile -import shutil -import psutil -import asyncio -import platform -import sys -from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type -import logging -from modules.globalvars import * -from modules.prestartchecks import start_checks -from modules.logger import GooberFormatter -import logging - -logger = logging.getLogger("goober") -logger.setLevel(logging.DEBUG) - -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.DEBUG) -console_handler.setFormatter(GooberFormatter()) - -file_handler = logging.FileHandler("log.txt", mode="w+", encoding="UTF-8") -file_handler.setLevel(logging.DEBUG) -file_handler.setFormatter(GooberFormatter(colors=False)) - -logger.addHandler(console_handler) -logger.addHandler(file_handler) - -# Print splash text and check for updates -print(splashtext) # Print splash text (from modules/globalvars.py) -start_checks() - -import requests -import discord -from discord.ext import commands -from discord import app_commands -from discord import Colour, Embed, File, Interaction, Message -from discord.abc import Messageable -from discord.ext import commands - -from modules.volta.main import _, set_language -from modules.markovmemory import * -from modules.version import * -from modules.sentenceprocessing import * -from modules.unhandledexception import handle_exception -from modules.image import gen_meme, gen_demotivator -from modules.minigames import guessthenumber, hangman -sys.excepthook = handle_exception -check_for_update() # Check for updates (from modules/version.py) -# Type aliases -T = TypeVar('T') -MessageContext = Union[commands.Context, discord.Interaction] -MessageReference = Union[Message, discord.WebhookMessage] - -# Constants with type hints -positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',') -currenthash: str = "" -launched: bool = False -slash_commands_enabled: bool = False - -# Load memory and Markov model for text generation -memory: List[str] = load_memory() -markov_model: Optional[markovify.Text] = load_markov_model() -if not markov_model: - logger.error(_('markov_model_not_found')) - memory = load_memory() - markov_model = train_markov_model(memory) - -generated_sentences: Set[str] = set() -used_words: Set[str] = set() - -async def load_cogs_from_folder(bot, folder_name="assets/cogs"): - for filename in os.listdir(folder_name): - if filename.endswith(".py") and not filename.startswith("_"): - cog_name = filename[:-3] - module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" - try: - await bot.load_extension(module_path) - logger.info(f"{(_('loaded_cog'))} {cog_name}") - except Exception as e: - logger.error(f"{(_('cog_fail'))} {cog_name} {e}") - traceback.print_exc() - -async def send_alive_ping_periodically() -> None: - while True: - try: - requests.post(f"{VERSION_URL}/aliveping", json={"name": NAME}) - except Exception as e: - logger.error(f"{(_('error_sending_alive_ping'))}{RESET} {e}") - await asyncio.sleep(60) - -# Event: Called when the bot is ready -@bot.event -async def on_ready() -> None: - global launched - global slash_commands_enabled - global NAME - global status - - folder_name: str = "cogs" - if launched: - return - - await load_cogs_from_folder(bot) - try: - synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() - logger.info(f"{_('synced_commands')} {len(synced)} {(_('synced_commands2'))}") - slash_commands_enabled = True - logger.info(f"{(_('started')).format(name=NAME)}") - bot.loop.create_task(send_alive_ping_periodically()) - except discord.errors.Forbidden as perm_error: - logger.error(f"Permission error while syncing commands: {perm_error}") - logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.") - quit() - except Exception as e: - logger.error(f"{_('fail_commands_sync')} {e}") - traceback.print_exc() - quit() - - if not song: - return - - status = { - "idle": discord.Status.idle, - "dnd": discord.Status.dnd, - "invisible": discord.Status.invisible, - "online": discord.Status.online - }.get(status.lower(), discord.Status.online) - await bot.change_presence(status=status, activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}")) - launched = True -@bot.event -async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: - from modules.unhandledexception import handle_exception - - if isinstance(error, commands.CommandInvokeError): - original: Exception = error.original - handle_exception( - type(original), original, original.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" - ) - else: - handle_exception( - type(error), error, error.__traceback__, - context=f"Command: {ctx.command} | User: {ctx.author}" - ) - -# Command: Retrain the Markov model from memory -@bot.hybrid_command(description=f"{(_('command_desc_retrain'))}") -async def retrain(ctx: commands.Context) -> None: - if ctx.author.id != ownerid: - return - - message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}") - try: - with open(MEMORY_FILE, 'r') as f: - memory: List[str] = json.load(f) - except FileNotFoundError: - await send_message(ctx, f"{(_('command_markov_memory_not_found'))}") - return - except json.JSONDecodeError: - await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}") - return - - data_size: int = len(memory) - processed_data: int = 0 - processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}") - start_time: float = time.time() - - for i, data in enumerate(memory): - processed_data += 1 - - global markov_model - markov_model = train_markov_model(memory) - save_markov_model(markov_model) - - await send_message(ctx, f"{_('command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref) - -# Command: Generate a sentence using the Markov model -@bot.hybrid_command(description=f"{(_('command_desc_talk'))}") -async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: - if not markov_model: - await send_message(ctx, f"{(_('command_talk_insufficent_text'))}") - return - - response: Optional[str] = None - for _ in range(20): - if sentence_size == 1: - response = markov_model.make_short_sentence(max_chars=100, tries=100) - if response: - response = response.split()[0] - else: - response = markov_model.make_sentence(tries=100, max_words=sentence_size) - - if response and response not in generated_sentences: - if sentence_size > 1: - response = improve_sentence_coherence(response) - generated_sentences.add(response) - break - - if response: - cleaned_response: str = re.sub(r'[^\w\s]', '', response).lower() - coherent_response: str = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9 and is_positive(coherent_response): - gif_url: str = random.choice(positive_gifs) - combined_message: str = f"{coherent_response}\n[jif]({gif_url})" - else: - combined_message: str = coherent_response - logger.info(combined_message) - os.environ['gooberlatestgen'] = combined_message - await send_message(ctx, combined_message) - else: - await send_message(ctx, f"{(_('command_talk_generation_fail'))}") - -@bot.hybrid_command(description=f"RAM") -async def ramusage(ctx): - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss - await send_message(ctx, f"Total memory used: {mem / 1024 / 1024:.2f} MB") - -# Command: Generate an image -@bot.hybrid_command(description=f"{(_('command_desc_help'))}") -async def impact(ctx: commands.Context, text: Optional[str] = None) -> None: - assets_folder: str = "assets/images" - temp_input: Optional[str] = None - - def get_random_asset_image() -> Optional[str]: - files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] - if not files: - return None - return os.path.join(assets_folder, random.choice(files)) - - if ctx.message.attachments: - attachment: discord.Attachment = ctx.message.attachments[0] - if attachment.content_type and attachment.content_type.startswith("image/"): - ext: str = os.path.splitext(attachment.filename)[1] - temp_input = f"tempy{ext}" - await attachment.save(temp_input) - input_path: str = temp_input - else: - fallback_image: Optional[str] = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - else: - fallback_image = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - - output_path: Optional[str] = await gen_meme(input_path, custom_text=text) - - - if output_path is None or not os.path.isfile(output_path): - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - await ctx.reply(_('failed_generate_image')) - return - - await ctx.send(file=discord.File(output_path)) - - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - -# New demotivator command -@bot.hybrid_command(description="Generate a demotivator poster with two lines of text") -async def demotivator(ctx: commands.Context) -> None: - assets_folder: str = "assets/images" - temp_input: Optional[str] = None - - def get_random_asset_image() -> Optional[str]: - files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] - if not files: - return None - return os.path.join(assets_folder, random.choice(files)) - - if ctx.message.attachments: - attachment: discord.Attachment = ctx.message.attachments[0] - if attachment.content_type and attachment.content_type.startswith("image/"): - ext: str = os.path.splitext(attachment.filename)[1] - temp_input = f"tempy{ext}" - await attachment.save(temp_input) - input_path: str = temp_input - else: - fallback_image: Optional[str] = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - else: - fallback_image = get_random_asset_image() - if fallback_image is None: - await ctx.reply(_('no_image_available')) - return - temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1]) - shutil.copy(fallback_image, temp_input) - input_path = temp_input - - output_path: Optional[str] = await gen_demotivator(input_path) - - if output_path is None or not os.path.isfile(output_path): - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - await ctx.reply("Failed to generate demotivator.") - return - - await ctx.send(file=discord.File(output_path)) - - if temp_input and os.path.exists(temp_input): - os.remove(temp_input) - -bot.remove_command('help') -# Command: Show help information -@bot.hybrid_command(description=f"{(_('command_desc_help'))}") -async def help(ctx: commands.Context) -> None: - embed: discord.Embed = discord.Embed( - title=f"{(_('command_help_embed_title'))}", - description=f"{(_('command_help_embed_desc'))}", - color=Colour(0x000000) - ) - - command_categories: Dict[str, List[str]] = { - f"{(_('command_help_categories_general'))}": ["mem", "talk", "about", "ping", "impact", "demotivator", "help"], - f"{(_('command_help_categories_admin'))}": ["stats", "retrain", "setlanguage"] - } - - custom_commands: List[str] = [] - for cog_name, cog in bot.cogs.items(): - for command in cog.get_commands(): - if command.name not in command_categories[f"{(_('command_help_categories_general'))}"] and command.name not in command_categories[f"{(_('command_help_categories_admin'))}"]: - custom_commands.append(command.name) - - if custom_commands: - embed.add_field(name=f"{(_('command_help_categories_custom'))}", value="\n".join([f"{PREFIX}{command}" for command in custom_commands]), inline=False) - - for category, commands_list in command_categories.items(): - commands_in_category: str = "\n".join([f"{PREFIX}{command}" for command in commands_list]) - embed.add_field(name=category, value=commands_in_category, inline=False) - - await send_message(ctx, embed=embed) - -@bot.hybrid_command(description=f"{(_('command_desc_setlang'))}") -@app_commands.describe(locale="Choose your language") -async def setlanguage(ctx: commands.Context, locale: str) -> None: - if ctx.author.id != ownerid: - await ctx.send(":thumbsdown:") - return - await ctx.defer() - set_language(locale) - await ctx.send(":thumbsup:") - -# Event: Called on every message -@bot.event -async def on_message(message: discord.Message) -> None: - global memory, markov_model - EMOJIS = ["\U0001F604", "\U0001F44D", "\U0001F525", "\U0001F4AF", "\U0001F389", "\U0001F60E"] # originally was emojis but it would probably shit itself on systems without unicode so.... - if message.author.bot: - return - - if str(message.author.id) in BLACKLISTED_USERS: - return - - if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")): - logger.info(f"{(_('command_ran')).format(message=message)}") - await bot.process_commands(message) - return - - if message.content: - if not USERTRAIN_ENABLED: - return - formatted_message: str = message.content - cleaned_message: str = formatted_message - if cleaned_message: - memory.append(cleaned_message) - message_metadata = { - "user_id": str(message.author.id), - "user_name": str(message.author), - "guild_id": str(message.guild.id) if message.guild else "DM", - "guild_name": str(message.guild.name) if message.guild else "DM", - "channel_id": str(message.channel.id), - "channel_name": str(message.channel), - "message": message.content, - "timestamp": time.time() - } - try: - if isinstance(memory, list): - memory.append({"_meta": message_metadata}) - else: - logger.warning("Memory is not a list; can't append metadata") - except Exception as e: - logger.warning(f"Failed to append metadata to memory: {e}") - - save_memory(memory) - - sentiment_score = is_positive(message.content) # doesnt work but im scared to change the logic now please ignore - if sentiment_score > 0.8: - if REACT != "True": - return - emoji = random.choice(EMOJIS) - try: - await message.add_reaction(emoji) - except Exception as e: - logger.info(f"Failed to react with emoji: {e}") - - await bot.process_commands(message) - -@bot.event -async def on_interaction(interaction: discord.Interaction) -> None: - name = None - if interaction.data.get('name') is None: - name = "Unknown" - else: - name = interaction.data['name'] - logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}") - -# Global check: Block blacklisted users from running commands -@bot.check -async def block_blacklisted(ctx: commands.Context) -> bool: - if str(ctx.author.id) in BLACKLISTED_USERS: - try: - if isinstance(ctx, discord.Interaction): - if not ctx.response.is_done(): - await ctx.response.send_message(_('blacklisted'), ephemeral=True) - else: - await ctx.followup.send(_('blacklisted'), ephemeral=True) - else: - await ctx.send(_('blacklisted_user'), ephemeral=True) - except: - pass - return False - return True - -# Command: Show bot latency -@bot.hybrid_command(description=f"{(_('command_desc_ping'))}") -async def ping(ctx: commands.Context) -> None: - await ctx.defer() - latency: int = round(bot.latency * 1000) - - LOLembed: discord.Embed = discord.Embed( - title="Pong!!", - description=( - f"{PING_LINE}\n" - f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n" - ), - color=Colour(0x000000) - ) - LOLembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url) - - await ctx.send(embed=LOLembed) - -def get_git_remote_url(): - try: - url = subprocess.check_output( - ["git", "config", "--get", "remote.origin.url"], - text=True, - stderr=subprocess.DEVNULL, - ).strip() - return url - except subprocess.CalledProcessError: - return "Unknown" - -# Command: Show about information -@bot.hybrid_command(description=f"{(_('command_about_desc'))}") -async def about(ctx: commands.Context) -> None: - print("-----------------------------------\n\n") - latest_version: str = check_for_update() - print("-----------------------------------") - embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000)) - embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False) - embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) - embed.add_field(name=f"Git", value=get_git_remote_url()) - embed.add_field(name=f"OS", value=platform.platform()) - - await send_message(ctx, embed=embed) - -# Command: Show bot statistics (admin only) -@bot.hybrid_command(description="stats") -async def stats(ctx: commands.Context) -> None: - if ctx.author.id != ownerid: - return - print("-----------------------------------\n\n") - latest_version: str = check_for_update() - print("-----------------------------------") - memory_file: str = 'memory.json' - file_size: int = os.path.getsize(memory_file) - - with open(memory_file, 'r') as file: - line_count: int = sum(1 for _ in file) - - embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000)) - embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False) - embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False) - embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False) - embed.add_field(name=f"OS", value=platform.platform()) - embed.add_field(name="Python Version", value=platform.python_version()) - await send_message(ctx, embed=embed) - -# Command: Upload memory.json to litterbox.catbox.moe and return the link -@bot.hybrid_command() -async def mem(ctx: commands.Context) -> None: - if showmemenabled != "true": - return - command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php""" - memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True) - logger.debug(memorylitter) - await send_message(ctx, memorylitter.stdout.strip()) - -# Helper: Improve sentence coherence (simple capitalization fix) -def improve_sentence_coherence(sentence: str) -> str: - # Capitalizes "i" to "I" in the sentence - sentence = sentence.replace(" i ", " I ") - return sentence - -# Start the bot -bot.run(TOKEN) \ No newline at end of file diff --git a/botminimal.py b/botminimal.py deleted file mode 100644 index 859662f..0000000 --- a/botminimal.py +++ /dev/null @@ -1,245 +0,0 @@ -import discord -from discord.ext import commands, tasks -import json -import markovify -import nltk -from nltk.tokenize import word_tokenize -import random -import os -import time -import re -from dotenv import load_dotenv -load_dotenv() -# download NLTK data files -nltk.download('punkt') -MEMORY_FILE = "memory.json" -MEMORY_LOADED_FILE = "MEMORY_LOADED" - -def load_memory(): - data = [] - - # Try to load data from MEMORY_FILE - try: - with open(MEMORY_FILE, "r") as f: - data = json.load(f) - except FileNotFoundError: - pass - - return data - -# Save memory data to MEMORY_FILE -def save_memory(memory): - with open(MEMORY_FILE, "w") as f: - json.dump(memory, f, indent=4) - -def train_markov_model(memory, additional_data=None): - if not memory: - return None - filtered_memory = [line for line in memory if isinstance(line, str)] - if additional_data: - filtered_memory.extend(line for line in additional_data if isinstance(line, str)) - if not filtered_memory: - return None - text = "\n".join(filtered_memory) - model = markovify.NewlineText(text, state_size=2) - return model - -#this doesnt work and im extremely pissed and mad -def append_mentions_to_18digit_integer(message): - pattern = r'\b\d{18}\b' - return re.sub(pattern, lambda match: f"<@{match.group(0)}>", message) - -def preprocess_message(message): - message = append_mentions_to_18digit_integer(message) - tokens = word_tokenize(message) - tokens = [token for token in tokens if token.isalnum()] - return " ".join(tokens) - - -intents = discord.Intents.default() -intents.messages = True -intents.message_content = True -bot = commands.Bot(command_prefix="g!", intents=intents) -memory = load_memory() -markov_model = train_markov_model(memory) - -generated_sentences = set() -used_words = set() - -@bot.event -async def on_ready(): - print(f"Logged in as {bot.user}") - post_message.start() - -positive_keywords = ["happy", "good", "great", "amazing", "awesome", "joy", "love", "fantastic", "positive", "cheerful", "victory", "favorite", "lmao", "lol", "xd", "XD", "xD", "Xd"] - -positive_gifs = [ - "https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272", - "https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988" -] - -def is_positive(sentence): - sentence_lower = sentence.lower() - return any(keyword in sentence_lower for keyword in positive_keywords) - -@bot.command() -async def ask(ctx): - await ctx.send("Command undergoing fixes!") - #not really lol - -@bot.command() -async def talk(ctx): - if markov_model: - response = None - for _ in range(10): # im going to shit my pants 10 times to get a coherent sentence - response = markov_model.make_sentence(tries=100) - if response and response not in generated_sentences: - # preprocess shit for grammer - response = improve_sentence_coherence(response) - generated_sentences.add(response) - break - - if response: - async with ctx.typing(): - cleaned_response = re.sub(r'[^\w\s]', '', response) - cleaned_response = cleaned_response.lower() - coherent_response = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9: - if is_positive(coherent_response): - gif_url = random.choice(positive_gifs) - combined_message = f"{coherent_response}\n[jif]({gif_url})" - await ctx.send(combined_message) - else: - await ctx.send(coherent_response) - else: - await ctx.send(coherent_response) - else: - await ctx.send("I have nothing to say right now!") - else: - await ctx.send("I need to learn more from messages before I can talk.") - -def improve_sentence_coherence(sentence): - - sentence = sentence.replace(" i ", " I ") - return sentence - -def rephrase_for_coherence(sentence): - - words = sentence.split() - - coherent_sentence = " ".join(words) - return coherent_sentence - -bot.help_command = None - - -@bot.command() -async def help(ctx, *args): - - if args: - command_name = args[0] - command = bot.get_command(command_name) - - if command: - embed = discord.Embed( - title=f"Help: g!{command_name}", - description=f"**Description:** {command.help}", - color=discord.Color.blue() - ) - await ctx.send(embed=embed) - else: - await ctx.send(f"Command `{command_name}` not found.") - else: - - embed = discord.Embed( - title="Bot Help", - description="List of commands grouped by category.", - color=discord.Color.blue() - ) - - command_categories = { - "General": ["show_memory", "talk", "ask", "ping"], - "Debug": ["word_usage"] - } - - for category, commands_list in command_categories.items(): - commands_in_category = "\n".join([f"g!{command}" for command in commands_list]) - embed.add_field(name=category, value=commands_in_category, inline=False) - - await ctx.send(embed=embed) - -@bot.event -async def on_message(message): - global memory, markov_model, last_random_talk_time - - if message.author.bot: - return - - - if message.content.startswith(("g!talk", "g!show_memory", "g!help", "g!")): - await bot.process_commands(message) - return - - if message.content: - formatted_message = append_mentions_to_18digit_integer(message.content) - cleaned_message = preprocess_message(formatted_message) - if cleaned_message: - memory.append(cleaned_message) - save_memory(memory) - markov_model = train_markov_model(memory) - - # process any commands in the message - await bot.process_commands(message) - -@bot.command() -async def ping(ctx): - await ctx.defer() - #stolen from my expect bot very proud - latency = round(bot.latency * 1000) - - LOLembed = discord.Embed( - title="Pong!!", - description=( - f"The Beretta fires fast and won't make you feel any better!\n" - f"`Bot Latency: {latency}ms`\n" - ), - color=discord.Color.blue() - ) - LOLembed.set_footer(text=f"Requested by {ctx.author.name}", icon_url=ctx.author.avatar.url) - - await ctx.send(embed=LOLembed) # use ctx.send instead of respond because it has nothing to respond to and its not a slash command - -@bot.command() -async def show_memory(ctx): - memory = load_memory() - memory_text = json.dumps(memory, indent=4) - if len(memory_text) > 1024: - with open(MEMORY_FILE, "r") as f: - await ctx.send(" ", file=discord.File(f, MEMORY_FILE)) - else: - embed = discord.Embed(title="Memory Contents", description="The bot's memory.", color=discord.Color.blue()) - embed.add_field(name="Memory Data", value=f"```json\n{memory_text}\n```", inline=False) - await ctx.send(embed=embed) - -def improve_sentence_coherence(sentence): - sentence = sentence.replace(" i ", " I ") - return sentence - -@tasks.loop(minutes=60) -async def post_message(): - channel_id = 1296141985253691433 - channel = bot.get_channel(channel_id) - if channel and markov_model: - response = None - for _ in range(10): - response = markov_model.make_sentence(tries=100) - if response and response not in generated_sentences: - generated_sentences.add(response) - break - - if response: - await channel.send(response) - -# run the bot -TOKEN = os.getenv("DISCORDBOTTOKEN", "0") -bot.run(TOKEN) diff --git a/example.env b/example.env index 45cfd9b..30dfe93 100644 --- a/example.env +++ b/example.env @@ -1,29 +1 @@ -DISCORDBOTTOKEN= -BOTPREFIX="g." -PINGLINE="The Beretta fires fast and won't make you feel any better!" -BLACKLISTEDUSERS= -OWNERID= -USERTRAINENABLED="true" -SHOWMEMENABLED="true" -LOCALE=fi -NAME=goober -AUTOUPDATE="True" -SONG="Basket Case - Green Day" -CHECKSDISABLED="Frue" -REACT="True" -STATUS="idle" -POSITIVEGIFS="https://media.discordapp.net/attachments/821047460151427135/1181371808566493184/jjpQGeno.gif, https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272,https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988" -SPLASHTEXT=" - - SS\ - SS | - SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\ -SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ -SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__| -SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS | -\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS | - \____SS | \______/ \______/ \_______/ \_______|\__| -SS\ SS | -\SSSSSS | - \______/ -" \ No newline at end of file +DISCORDBOTTOKEN= \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..45d8595 --- /dev/null +++ b/main.py @@ -0,0 +1,302 @@ +import os +import re +import json +import time +import random +import traceback +import subprocess +import tempfile +import shutil +import sys +from typing import ( + List, + Dict, + Literal, + Set, + Optional, + Tuple, + Any, + TypedDict, + Union, + Callable, + Coroutine, + TypeVar, + Type, +) +import logging +from modules.prestartchecks import start_checks +from modules.logger import GooberFormatter +from modules.volta.main import * +import logging +from modules.settings import Settings as SettingsManager +from modules.permission import requires_admin +from modules.volta.main import _ +from modules.version import check_for_update +check_for_update() +logger = logging.getLogger("goober") +logger.setLevel(logging.DEBUG) + +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(GooberFormatter()) + +file_handler = logging.FileHandler("log.txt", mode="w+", encoding="UTF-8") +file_handler.setLevel(logging.DEBUG) +file_handler.setFormatter(GooberFormatter(colors=False)) + +logger.addHandler(console_handler) +logger.addHandler(file_handler) + +settings_manager = SettingsManager() +settings = settings_manager.settings + +splash_text: str = "" + +with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f: + splash_text = "".join(f.readlines()) + print(splash_text) + +start_checks() + +import discord +from discord.ext import commands +from discord import app_commands +from discord import Colour, Message + +from better_profanity import profanity +from discord.ext import commands + +from modules.markovmemory import * +from modules.sentenceprocessing import * +from modules.unhandledexception import handle_exception + +sys.excepthook = handle_exception + + +class MessageMetadata(TypedDict): + user_id: str + user_name: str + guild_id: str | Literal["DM"] + guild_name: str | Literal["DM"] + channel_id: str + channel_name: str + message: str + timestamp: float + + +# Constants with type hints +positive_gifs: List[str] = settings["bot"]["misc"]["positive_gifs"] +name = settings["name"] +currenthash: str = "" +launched: bool = False +slash_commands_enabled: bool = False + + +# Set up Discord bot intents and create bot instance +intents: discord.Intents = discord.Intents.default() +intents.messages = True +intents.presences = True +intents.members = True +intents.message_content = True +bot: commands.Bot = commands.Bot( + command_prefix=settings["bot"]["prefix"], + intents=intents, + allowed_mentions=discord.AllowedMentions( + everyone=False, roles=False, users=False, replied_user=True + ), +) + +# Load memory and Markov model for text generation +memory: List[str | Dict[Literal["_meta"], MessageMetadata]] = load_memory() + +generated_sentences: Set[str] = set() +used_words: Set[str] = set() + + +async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"): + for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]: + cog_name: str = filename[:-3] + + if ( + "internal" not in folder_name + and cog_name not in settings["bot"]["enabled_cogs"] + ): + logger.debug(f"Skipping cog {cog_name} (not in enabled cogs)") + continue + + module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}" + + try: + await bot.load_extension(module_path) + logger.info(f"{_('loaded_cog')} {cog_name}") + except Exception as e: + logger.error(f"{_('cog_fail')} {cog_name} {e}") + traceback.print_exc() + + +# Event: Called when the bot is ready +@bot.event +async def on_ready() -> None: + global launched + + folder_name: str = "cogs" + if launched: + return + + await load_cogs_from_folder(bot) + await load_cogs_from_folder(bot, "assets/cogs/internal") + try: + synced: List[discord.app_commands.AppCommand] = await bot.tree.sync() + + logger.info(f"{_('synced_commands')} {len(synced)} {_('synced_commands2')}") + logger.info(_('started').format(name=name)) + + except discord.errors.Forbidden as perm_error: + logger.error(f"Permission error while syncing commands: {perm_error}") + logger.error( + "Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions." + ) + quit() + except Exception as e: + logger.error(f"{_('fail_commands_sync')} {e}") + traceback.print_exc() + quit() + + if not settings["bot"]["misc"]["active_song"]: + return + await bot.change_presence( + activity=discord.Activity( + type=discord.ActivityType.listening, + name=settings["bot"]["misc"]["active_song"], + ) + ) + launched = True + +bot.remove_command('help') + +@bot.event +async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: + from modules.unhandledexception import handle_exception + + if isinstance(error, commands.CommandInvokeError): + original: Exception = error.original + handle_exception( + type(original), + original, + original.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", + ) + else: + handle_exception( + type(error), + error, + error.__traceback__, + context=f"Command: {ctx.command} | User: {ctx.author}", + ) + +# Event: Called on every message +@bot.event +async def on_message(message: discord.Message) -> None: + global memory + EMOJIS = [ + "\U0001f604", + "\U0001f44d", + "\U0001f525", + "\U0001f4af", + "\U0001f389", + "\U0001f60e", + ] # originally was emojis but it would probably shit itself on systems without unicode so.... + if message.author.bot: + return + + if str(message.author.id) in settings["bot"]["blacklisted_users"]: + return + + commands = [ + settings["bot"]["prefix"] + command.name for command in bot.tree.get_commands() + ] + + if message.content.startswith(tuple(commands)): + logger.info(f"{(_('command_ran')).format(message=message)}") + await bot.process_commands(message) + return + + if message.content: + if not settings["bot"]["user_training"]: + return + + formatted_message: str = message.content + cleaned_message: str = preprocess_message(formatted_message) + if cleaned_message: + memory.append(cleaned_message) + + message_metadata: MessageMetadata = { + "user_id": str(message.author.id), + "user_name": str(message.author), + "guild_id": str(message.guild.id) if message.guild else "DM", + "guild_name": str(message.guild.name) if message.guild else "DM", + "channel_id": str(message.channel.id), + "channel_name": str(message.channel), + "message": message.content, + "timestamp": time.time(), + } + try: + if isinstance(memory, list): + memory.append({"_meta": message_metadata}) + else: + logger.warning("Memory is not a list; can't append metadata") + except Exception as e: + logger.warning(f"Failed to append metadata to memory: {e}") + + save_memory(memory) + + sentiment_score = is_positive( + message.content + ) # doesnt work but im scared to change the logic now please ignore + if sentiment_score > 0.8: + if not settings["bot"]["react_to_messages"]: + return + emoji = random.choice(EMOJIS) + try: + await message.add_reaction(emoji) + except Exception as e: + logger.info(f"Failed to react with emoji: {e}") + + await bot.process_commands(message) + + +# Event: Called on every interaction (slash command, etc.) +@bot.event +async def on_interaction(interaction: discord.Interaction) -> None: + logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}") + + +# Global check: Block blacklisted users from running commands +@bot.check +async def block_blacklisted(ctx: commands.Context) -> bool: + if ctx.author.id not in settings["bot"]["blacklisted_users"]: + return True + + try: + if isinstance(ctx, discord.Interaction): + if not ctx.response.is_done(): + await ctx.response.send_message(_('blacklisted'), ephemeral=True) + else: + await ctx.followup.send(_('blacklisted'), ephemeral=True) + else: + await ctx.send(_('blacklisted_user'), ephemeral=True) + except: + return False + + return True + + +# Helper: Improve sentence coherence (simple capitalization fix) +def improve_sentence_coherence(sentence: str) -> str: + # Capitalizes "i" to "I" in the sentence + sentence = sentence.replace(" i ", " I ") + return sentence + +# Start the bot +if __name__ == "__main__": + bot.run(os.environ.get("DISCORDBOTTOKEN", "")) \ No newline at end of file diff --git a/modules/globalvars.py b/modules/globalvars.py index ac37ed4..3225605 100644 --- a/modules/globalvars.py +++ b/modules/globalvars.py @@ -1,72 +1,71 @@ import os import platform +from typing import Callable, List from dotenv import load_dotenv import pathlib -import discord -from discord.ext import commands -from discord import app_commands -from discord import Colour, Embed, File, Interaction, Message -from discord.abc import Messageable -from discord.ext import commands import subprocess + + def get_git_branch(): try: - branch = subprocess.check_output( - ["git", "rev-parse", "--abbrev-ref", "HEAD"], - stderr=subprocess.DEVNULL - ).decode('utf-8').strip() + branch = ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL + ) + .decode("utf-8") + .strip() + ) return branch except subprocess.CalledProcessError: return None -env_path = pathlib.Path(__file__).parent.parent / '.env' + +env_path = pathlib.Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) + +available_cogs: Callable[[], List[str]] = lambda: [ + file[:-3] for file in os.listdir("assets/cogs") if file.endswith(".py") +] + ANSI = "\033[" RED = f"{ANSI}31m" GREEN = f"{ANSI}32m" YELLOW = f"{ANSI}33m" PURPLE = f"{ANSI}35m" -DEBUG = f"{ANSI}1;30m" +DEBUG = f"{ANSI}90m" RESET = f"{ANSI}0m" + VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main" -UPDATE_URL = VERSION_URL+"/latest_version.json" +UPDATE_URL = VERSION_URL + "/latest_version.json" print(UPDATE_URL) -LOCAL_VERSION_FILE = "current_version.txt" -TOKEN = os.getenv("DISCORDBOTTOKEN", "0") -PREFIX = os.getenv("BOTPREFIX", "g.") -PING_LINE = os.getenv("PINGLINE") -CHECKS_DISABLED = os.getenv("CHECKSDISABLED") -LOCALE = os.getenv("LOCALE", "en") -gooberTOKEN = os.getenv("GOOBERTOKEN") -splashtext = os.getenv("SPLASHTEXT") -ownerid = int(os.getenv("OWNERID", "0")) -status = os.getenv("STATUS") -showmemenabled = os.getenv("SHOWMEMENABLED") -BLACKLISTED_USERS = os.getenv("BLACKLISTEDUSERS", "").split(",") -USERTRAIN_ENABLED = os.getenv("USERTRAINENABLED", "true").lower() == "true" -NAME = os.getenv("NAME") -MEMORY_FILE = "memory.json" -MEMORY_LOADED_FILE = "MEMORY_LOADED" # is this still even used?? okay just checked its used in the markov module -ALIVEPING = os.getenv("ALIVEPING") -AUTOUPDATE = os.getenv("AUTOUPDATE") -song = os.getenv("SONG") +LOCAL_VERSION_FILE = "current_version.txt" + +# TOKEN = os.getenv("DISCORDBOTTOKEN", "0") +# PREFIX = os.getenv("BOTPREFIX", "g.") +# PING_LINE = os.getenv("PINGLINE") +# CHECKS_DISABLED = os.getenv("CHECKSDISABLED") +# LOCALE = os.getenv("LOCALE", "en") +# BLACKLISTED_USERS = os.getenv("BLACKLISTEDUSERS", "").split(",") +# USERTRAIN_ENABLED = os.getenv("USERTRAINENABLED", "true").lower() == "true" +# NAME = os.getenv("NAME") +# MEMORY_FILE = "memory.json" +# MEMORY_LOADED_FILE = "MEMORY_LOADED" # is this still even used?? okay just checked its used in the markov module +# ALIVEPING = os.getenv("ALIVEPING") +# AUTOUPDATE = os.getenv("AUTOUPDATE") +# REACT = os.getenv("REACT") + +# gooberTOKEN = os.getenv("GOOBERTOKEN") +# splashtext = os.getenv("SPLASHTEXT") +# ownerid = int(os.getenv("OWNERID", "0")) +# showmemenabled = os.getenv("SHOWMEMENABLED") + + +# IGNOREWARNING = False # is this either??? i don't think so? +# song = os.getenv("song") arch = platform.machine() +slash_commands_enabled = True # 100% broken, its a newer enough version so its probably enabled by default.... fix this at somepoint or hard code it in goober central code launched = False latest_version = "0.0.0" -local_version = "2.3.5" -os.environ['gooberlocal_version'] = local_version -REACT = os.getenv("REACT") -if get_git_branch() == "dev": - beta = True - # this makes goober think its a beta version, so it will not update to the latest stable version or run any version checks -else: - beta = False - - -# Set up Discord bot intents and create bot instance -intents: discord.Intents = discord.Intents.default() -intents.messages = True -intents.presences = True -intents.members = True -intents.message_content = True -bot: commands.Bot = commands.Bot(command_prefix=PREFIX, intents=intents, allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True)) \ No newline at end of file +local_version = "3.0.0" +os.environ["gooberlocal_version"] = local_version +beta = get_git_branch() == "dev" \ No newline at end of file diff --git a/modules/image.py b/modules/image.py deleted file mode 100644 index d3807df..0000000 --- a/modules/image.py +++ /dev/null @@ -1,177 +0,0 @@ -import os -import re -import random -import shutil -import tempfile -from typing import Optional, List -from PIL import Image, ImageDraw, ImageFont, ImageOps -from modules.markovmemory import load_markov_model -from modules.sentenceprocessing import improve_sentence_coherence, rephrase_for_coherence - -generated_sentences = set() - -def load_font(size): - return ImageFont.truetype("assets/fonts/Impact.ttf", size=size) - -def load_tnr(size): - return ImageFont.truetype("assets/fonts/TNR.ttf", size=size) - -def draw_text_with_outline(draw, text, x, y, font): - outline_offsets = [(-2, -2), (-2, 2), (2, -2), (2, 2), (0, -2), (0, 2), (-2, 0), (2, 0)] - for ox, oy in outline_offsets: - draw.text((x + ox, y + oy), text, font=font, fill="black") - draw.text((x, y), text, font=font, fill="white") - -def fits_in_width(text, font, max_width, draw): - bbox = draw.textbbox((0, 0), text, font=font) - text_width = bbox[2] - bbox[0] - return text_width <= max_width - -def split_text_to_fit(text, font, max_width, draw): - words = text.split() - for i in range(len(words), 0, -1): - top_text = " ".join(words[:i]) - bottom_text = " ".join(words[i:]) - if fits_in_width(top_text, font, max_width, draw) and fits_in_width(bottom_text, font, max_width, draw): - return top_text, bottom_text - midpoint = len(words) // 2 - return " ".join(words[:midpoint]), " ".join(words[midpoint:]) - -async def gen_meme(input_image_path, sentence_size=5, max_attempts=10, custom_text=None): - markov_model = load_markov_model() - if not markov_model or not os.path.isfile(input_image_path): - return None - - attempt = 0 - while attempt < max_attempts: - with Image.open(input_image_path).convert("RGBA") as img: - draw = ImageDraw.Draw(img) - width, height = img.size - - font_size = int(height / 10) - font = load_font(font_size) - - response = None - if custom_text: - response = custom_text - else: - for _ in range(20): - if sentence_size == 1: - candidate = markov_model.make_short_sentence(max_chars=100, tries=100) - if candidate: - candidate = candidate.split()[0] - else: - candidate = markov_model.make_sentence(tries=100, max_words=sentence_size) - - if candidate and candidate not in generated_sentences: - if sentence_size > 1: - candidate = improve_sentence_coherence(candidate) - generated_sentences.add(candidate) - response = candidate - break - - if not response: - response = "NO TEXT GENERATED" - - cleaned_response = re.sub(r'[^\w\s]', '', response).lower() - coherent_response = rephrase_for_coherence(cleaned_response).upper() - - bbox = draw.textbbox((0, 0), coherent_response, font=font) - text_width = bbox[2] - bbox[0] - text_height_px = bbox[3] - bbox[1] - max_text_height = height // 4 - - if text_width <= width and text_height_px <= max_text_height: - draw_text_with_outline(draw, coherent_response, (width - text_width) / 2, 0, font) - img.save(input_image_path) - return input_image_path - else: - top_text, bottom_text = split_text_to_fit(coherent_response, font, width, draw) - - top_bbox = draw.textbbox((0, 0), top_text, font=font) - bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font) - - top_height = top_bbox[3] - top_bbox[1] - bottom_height = bottom_bbox[3] - bottom_bbox[1] - - if top_height <= max_text_height and bottom_height <= max_text_height: - draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font) - y_bottom = height - bottom_height - int(height * 0.04) - draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font) - img.save(input_image_path) - return input_image_path - - attempt += 1 - - with Image.open(input_image_path).convert("RGBA") as img: - draw = ImageDraw.Draw(img) - width, height = img.size - font_size = int(height / 10) - font = load_font(font_size) - - truncated = coherent_response[:100] - bbox = draw.textbbox((0, 0), truncated, font=font) - text_width = bbox[2] - bbox[0] - draw_text_with_outline(draw, truncated, (width - text_width) / 2, 0, font) - img.save(input_image_path) - return input_image_path - -async def gen_demotivator(input_image_path, max_attempts=5): - markov_model = load_markov_model() - if not markov_model or not os.path.isfile(input_image_path): - return None - - attempt = 0 - while attempt < max_attempts: - with Image.open(input_image_path).convert("RGB") as img: - size = max(img.width, img.height) - frame_thick = int(size * 0.0054) - inner_size = size - 2 * frame_thick - resized_img = img.resize((inner_size, inner_size), Image.LANCZOS) - framed = Image.new("RGB", (size, size), "white") - framed.paste(resized_img, (frame_thick, frame_thick)) - landscape_w = int(size * 1.5) - caption_h = int(size * 0.3) - canvas_h = framed.height + caption_h - canvas = Image.new("RGB", (landscape_w, canvas_h), "black") - # the above logic didnt even work, fml - fx = (landscape_w - framed.width) // 2 - canvas.paste(framed, (fx, 0)) - - draw = ImageDraw.Draw(canvas) - - title = subtitle = None - for _ in range(20): - t = markov_model.make_sentence(tries=100, max_words=4) - s = markov_model.make_sentence(tries=100, max_words=5) - if t and s and t != s: - title = t.upper() - subtitle = s.capitalize() - break - if not title: title = "DEMOTIVATOR" - if not subtitle: subtitle = "no text generated" - - title_sz = int(caption_h * 0.4) - sub_sz = int(caption_h * 0.25) - title_font = load_tnr(title_sz) - sub_font = load_tnr(sub_sz) - - bbox = draw.textbbox((0, 0), title, font=title_font) - txw, txh = bbox[2] - bbox[0], bbox[3] - bbox[1] - tx = (landscape_w - txw) // 2 - ty = framed.height + int(caption_h * 0.1) - draw_text_with_outline(draw, title, tx, ty, title_font) - - bbox = draw.textbbox((0, 0), subtitle, font=sub_font) - sxw, sxh = bbox[2] - bbox[0], bbox[3] - bbox[1] - sx = (landscape_w - sxw) // 2 - sy = ty + txh + int(caption_h * 0.05) - for ox, oy in [(-1, -1), (1, -1), (-1, 1), (1, 1)]: - draw.text((sx + ox, sy + oy), subtitle, font=sub_font, fill="black") - draw.text((sx, sy), subtitle, font=sub_font, fill="#AAAAAA") - - canvas.save(input_image_path) - return input_image_path - - attempt += 1 - return None diff --git a/modules/markovmemory.py b/modules/markovmemory.py index 3235035..d7ce6cd 100644 --- a/modules/markovmemory.py +++ b/modules/markovmemory.py @@ -3,9 +3,16 @@ import json import markovify import pickle from modules.globalvars import * -from modules.volta.main import _ import logging +from modules.volta.main import _ +from modules.settings import instance as settings_manager + +settings = settings_manager.settings + + logger = logging.getLogger("goober") + + # Get file size and line count for a given file path def get_file_info(file_path): try: @@ -16,46 +23,52 @@ def get_file_info(file_path): except Exception as e: return {"error": str(e)} + # Load memory data from file, or use default dataset if not loaded yet def load_memory(): data = [] # Try to load data from MEMORY_FILE try: - with open(MEMORY_FILE, "r") as f: + with open(settings["bot"]["active_memory"], "r") as f: data = json.load(f) except FileNotFoundError: pass return data + # Save memory data to MEMORY_FILE def save_memory(memory): - with open(MEMORY_FILE, "w") as f: + with open(settings["bot"]["active_memory"], "w") as f: json.dump(memory, f, indent=4) -def train_markov_model(memory, additional_data=None): + +def train_markov_model(memory, additional_data=None) -> markovify.NewlineText | None: if not memory: return None + filtered_memory = [line for line in memory if isinstance(line, str)] if additional_data: - filtered_memory.extend(line for line in additional_data if isinstance(line, str)) + filtered_memory.extend( + line for line in additional_data if isinstance(line, str) + ) + if not filtered_memory: return None + text = "\n".join(filtered_memory) model = markovify.NewlineText(text, state_size=2) return model -# Save the Markov model to a pickle file -def save_markov_model(model, filename='markov_model.pkl'): - with open(filename, 'wb') as f: +def save_markov_model(model, filename="markov_model.pkl"): + with open(filename, "wb") as f: pickle.dump(model, f) logger.info(f"Markov model saved to {filename}.") -# Load the Markov model from a pickle file -def load_markov_model(filename='markov_model.pkl'): +def load_markov_model(filename="markov_model.pkl"): try: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: model = pickle.load(f) logger.info(f"{_('model_loaded')} {filename}.{RESET}") return model diff --git a/modules/minigames.py b/modules/minigames.py deleted file mode 100644 index 0a74a82..0000000 --- a/modules/minigames.py +++ /dev/null @@ -1,71 +0,0 @@ -import random -import discord -from discord import ui, Interaction, TextStyle -from discord.ext import commands -import aiohttp -import asyncio -from modules.globalvars import bot -from modules.volta.main import _ - -# @bot.hybrid_command(description=_('minigames_guess_the_number')) -async def guessthenumber(ctx: commands.Context): - number = random.randint(1, 10) - class GuessModal(ui.Modal, title=_('minigames_guess_the_number')): - guess = ui.TextInput(label=_('minigames_your_guess'), style=TextStyle.short) - async def on_submit(self, interaction: Interaction): - try: - user_guess = int(self.guess.value) - except: - await interaction.response.send_message(_('minigames_invalid_number'), ephemeral=True) - return - if user_guess == number: - await interaction.response.send_message(_('minigames_correct'), ephemeral=True) - else: - await interaction.response.send_message(f"{_('minigames_wrong_number')} {number}.", ephemeral=True) - async def button_callback(interaction: Interaction): - await interaction.response.send_modal(GuessModal()) - button = ui.Button(label=_('minigames_guess_button'), style=discord.ButtonStyle.primary) - button.callback = button_callback - view = ui.View() - view.add_item(button) - await ctx.send(_('minigames_click_to_guess'), view=view) - -# @bot.hybrid_command(description=_('minigames_hangman')) nope nope nope fuck no nope no thanks no nuh uh not today nope -async def hangman(ctx: commands.Context): - async with aiohttp.ClientSession() as session: - async with session.get("https://random-word-api.herokuapp.com/word?number=1") as resp: - if resp.status != 200: - await ctx.send("Failed to get a random word.") - return - data = await resp.json() - word = data[0].lower() - print(word) - guessed_letters = set() - wrong_guesses = 0 - max_wrong = 6 - def display_word(): - return " ".join([c if c in guessed_letters else "_" for c in word]) - class GuessModal(ui.Modal, title=_('minigames_hangman_guess')): - letter = ui.TextInput(label=_('minigames_hangman_user_letter_guess'), style=TextStyle.short, max_length=1) - async def on_submit(self, interaction: Interaction): - nonlocal guessed_letters, wrong_guesses - guess = self.letter.value.lower() - if guess in guessed_letters: - await interaction.response.send_message(f"{_('minigames_hangman_already_guessed')}'{guess}'!", ephemeral=True) - return - guessed_letters.add(guess) - if guess not in word: - wrong_guesses += 1 - if all(c in guessed_letters for c in word): - await interaction.response.edit_message(content=f"{_('minigames_hangman_won')} **{word}**", view=None) - elif wrong_guesses >= max_wrong: - await interaction.response.edit_message(content=f"{_('minigames_hangman_lost')} **{word}**", view=None) - else: - await interaction.response.edit_message(content=_('minigames_hangman_game').format(display_word=display_word(),wrong_guesses=wrong_guesses,max_wrong=max_wrong), view=view) - async def button_callback(interaction: Interaction): - await interaction.response.send_modal(GuessModal()) - button = ui.Button(label=_('minigames_click_to_guess'), style=discord.ButtonStyle.primary) - button.callback = button_callback - view = ui.View() - view.add_item(button) - await ctx.send(_('minigames_hangman_game').format(display_word=display_word,wrong_guesses=wrong_guesses,max_wrong=max_wrong), view=view) \ No newline at end of file diff --git a/modules/permission.py b/modules/permission.py new file mode 100644 index 0000000..5434c61 --- /dev/null +++ b/modules/permission.py @@ -0,0 +1,37 @@ +from functools import wraps +import discord + +import discord.ext +import discord.ext.commands + +from modules.settings import Settings as SettingsManager +import logging + +logger = logging.getLogger("goober") + +settings_manager = SettingsManager() +settings = settings_manager.settings + + +class PermissionError(Exception): + pass + + +def requires_admin(): + async def wrapper(ctx: discord.ext.commands.Context): + print(ctx.author.id) + if ctx.author.id not in settings["bot"]["owner_ids"]: + await ctx.send("You don't have the necessary permissions to run this command!") + return + + + command = ctx.command + if not command: + logger.info(f"Unknown command ran {ctx.message}") + else: + logger.info( + f'Command {settings["bot"]["prefix"]}{command.name} @{ctx.author.name}' + ) + return True + + return discord.ext.commands.check(wrapper) \ No newline at end of file diff --git a/modules/prestartchecks.py b/modules/prestartchecks.py index bb952f4..cd9fd72 100644 --- a/modules/prestartchecks.py +++ b/modules/prestartchecks.py @@ -1,4 +1,5 @@ from modules.globalvars import * +from modules.settings import Settings as SettingsManager from modules.volta.main import _, check_missing_translations import time import os @@ -14,6 +15,13 @@ import logging logger = logging.getLogger("goober") +settings_manager = SettingsManager() +settings = settings_manager.settings +MEMORY_FILE = settings["bot"]["active_memory"] +with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f: + splash_text = "".join(f.readlines()) + + # import shutil psutilavaliable = True try: @@ -28,8 +36,7 @@ def check_for_model(): logger.info("Model is installed.") else: logger.info("Model is not installed.") - - + def iscloned(): if os.path.exists(".git"): return True @@ -38,130 +45,112 @@ def iscloned(): sys.exit(1) def get_stdlib_modules(): - stdlib_path = pathlib.Path(sysconfig.get_paths()['stdlib']) - modules = set() - if hasattr(sys, 'builtin_module_names'): - modules.update(sys.builtin_module_names) - for file in stdlib_path.glob('*.py'): - if file.stem != '__init__': - modules.add(file.stem) - for folder in stdlib_path.iterdir(): - if folder.is_dir() and (folder / '__init__.py').exists(): - modules.add(folder.name) - for file in stdlib_path.glob('*.*'): - if file.suffix in ('.so', '.pyd'): - modules.add(file.stem) + stdlib = pathlib.Path(sysconfig.get_paths()['stdlib']) + modules = set(sys.builtin_module_names) + + modules.update( + f.stem for f in stdlib.glob('*.py') if f.stem != '__init__' + ) + modules.update( + d.name for d in stdlib.iterdir() if (d / '__init__.py').exists() + ) + modules.update( + f.stem for f in stdlib.glob('*') if f.suffix in ('.so', '.pyd') + ) return modules def check_requirements(): - STD_LIB_MODULES = get_stdlib_modules() - PACKAGE_ALIASES = { + stdlib = get_stdlib_modules() + aliases = { "discord": "discord.py", "better_profanity": "better-profanity", "dotenv": "python-dotenv", "pil": "pillow" } - parent_dir = os.path.dirname(os.path.abspath(__file__)) - requirements_path = os.path.abspath(os.path.join(parent_dir, '..', 'requirements.txt')) - - if not os.path.exists(requirements_path): - logger.error(f"{(_('requirements_not_found')).format(path=requirements_path)}") + req_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'requirements.txt')) + if not os.path.exists(req_path): + logger.error(_('requirements_not_found').format(path=req_path)) return - with open(requirements_path, 'r') as f: - lines = f.readlines() - requirements = set() - for line in lines: - line = line.strip() - if line and not line.startswith('#'): - base_pkg = line.split('==')[0].lower() - aliased_pkg = PACKAGE_ALIASES.get(base_pkg, base_pkg) - requirements.add(aliased_pkg) + with open(req_path) as f: + requirements = { + aliases.get(line.split('==')[0].strip().lower(), line.split('==')[0].strip().lower()) + for line in f if line.strip() and not line.startswith('#') + } - installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()} + installed = {d.metadata['Name'].lower() for d in importlib.metadata.distributions()} missing = [] - for req in sorted(requirements): - if req in STD_LIB_MODULES or req == 'modules': - print((_('std_lib_local_skipped')).format(package=req)) + for pkg in sorted(requirements): + if pkg in stdlib or pkg == 'modules': + print(_('std_lib_local_skipped').format(package=pkg)) continue - - check_name = req.lower() - - if check_name in installed_packages: - logger.info(f"{_('ok_installed').format(package=check_name)} {check_name}") + if pkg in installed: + logger.info(_('ok_installed').format(package=pkg)) else: - logger.error(f"{(_('missing_package')).format(package=check_name)} {check_name} {(_('missing_package2'))}") - missing.append(check_name) + logger.error(f"{_('missing_package').format(package=pkg)} {pkg} {_('missing_package2')}") + missing.append(pkg) if missing: logger.error(_('missing_packages_detected')) for pkg in missing: print(f" - {pkg}") sys.exit(1) - else: - logger.info(_('all_requirements_satisfied')) + + logger.info(_('all_requirements_satisfied')) + def check_latency(): host = "1.1.1.1" system = platform.system() - if system == "Windows": - cmd = ["ping", "-n", "1", "-w", "1000", host] - latency_pattern = r"Average = (\d+)ms" - - elif system == "Darwin": - cmd = ["ping", "-c", "1", host] - latency_pattern = r"time=([\d\.]+) ms" - - else: - cmd = ["ping", "-c", "1", "-W", "1", host] - latency_pattern = r"time=([\d\.]+) ms" + cmd, pattern = { + "Windows": (["ping", "-n", "1", "-w", "1000", host], r"Average = (\d+)ms"), + "Darwin": (["ping", "-c", "1", host], r"time=([\d\.]+) ms") + }.get(system, (["ping", "-c", "1", "-W", "1", host], r"time=([\d\.]+) ms")) try: - result = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - - if result.returncode == 0: - match = re.search(latency_pattern, result.stdout) - if match: - latency_ms = float(match.group(1)) - logger.info((_('ping_to')).format(host=host, latency=latency_ms)) - if latency_ms > 300: - logger.warning(f"{(_('high_latency'))}") - else: - logger.warning((_('could_not_parse_latency'))) - else: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: print(result.stderr) - logger.error(f"{(_('ping_failed')).format(host=host)}{RESET}") + return logger.error(_('ping_failed').format(host=host) + RESET) + + match = re.search(pattern, result.stdout) + if not match: + return logger.warning(_('could_not_parse_latency')) + + latency = float(match.group(1)) + logger.info(_('ping_to').format(host=host, latency=latency)) + if latency > 300: + logger.warning(_('high_latency')) + except Exception as e: - logger.error((_('error_running_ping')).format(error=e)) + logger.error(_('error_running_ping').format(error=e)) def check_memory(): - if psutilavaliable == False: + if not psutilavaliable: return - try: - memory_info = psutil.virtual_memory() # type: ignore - total_memory = memory_info.total / (1024 ** 3) - used_memory = memory_info.used / (1024 ** 3) - free_memory = memory_info.available / (1024 ** 3) - logger.info((_('memory_usage')).format(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100)) - if used_memory > total_memory * 0.9: - print(f"{YELLOW}{(_('memory_above_90')).format(percent=(used_memory / total_memory) * 100)}{RESET}") - logger.info((_('total_memory')).format(total=total_memory)) - logger.info((_('used_memory')).format(used=used_memory)) - if free_memory < 1: - logger.warning(f"{(_('low_free_memory')).format(free=free_memory)}") + try: + mem = psutil.virtual_memory() # type: ignore + total = mem.total / 1e9 + used = mem.used / 1e9 + free = mem.available / 1e9 + percent_used = (used / total) * 100 + + logger.info(_('memory_usage').format(used=used, total=total, percent=percent_used)) + if percent_used > 90: + print(f"{YELLOW}{_('memory_above_90').format(percent=percent_used)}{RESET}") + logger.info(_('total_memory').format(total=total)) + logger.info(_('used_memory').format(used=used)) + if free < 1: + logger.warning(_('low_free_memory').format(free=free)) sys.exit(1) + except ImportError: - logger.error(_('psutil_not_installed')) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped." + logger.error(_('psutil_not_installed')) def check_cpu(): if psutilavaliable == False: @@ -178,22 +167,23 @@ def check_cpu(): def check_memoryjson(): try: - logger.info((_('memory_file')).format(size=os.path.getsize(MEMORY_FILE) / (1024 ** 2))) - if os.path.getsize(MEMORY_FILE) > 1_073_741_824: - logger.warning(f"{(_('memory_file_large'))}") + size_mb = os.path.getsize(MEMORY_FILE) / (1024 ** 2) + logger.info(_('memory_file').format(size=size_mb)) + if size_mb > 1024: + logger.warning(_('memory_file_large')) + try: with open(MEMORY_FILE, 'r', encoding='utf-8') as f: json.load(f) - except json.JSONDecodeError as e: - logger.error(f"{(_('memory_file_corrupted')).format(error=e)}") - logger.warning(f"{(_('consider_backup_memory'))}") - except UnicodeDecodeError as e: - logger.error(f"{(_('memory_file_encoding')).format(error=e)}") - logger.warning(f"{(_('consider_backup_memory'))}") + except (json.JSONDecodeError, UnicodeDecodeError) as e: + msg = _('memory_file_corrupted') if isinstance(e, json.JSONDecodeError) else _('memory_file_encoding') + logger.error(msg.format(error=e)) + logger.warning(_('consider_backup_memory')) except Exception as e: - logger.error(f"{(_('error_reading_memory')).format(error=e)}") + logger.error(_('error_reading_memory').format(error=e)) + except FileNotFoundError: - logger(f"{(_('memory_file_not_found'))}") + logger.error(_('memory_file_not_found')) def presskey2skip(timeout): if os.name == 'nt': @@ -228,8 +218,8 @@ def presskey2skip(timeout): termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) beta = beta def start_checks(): - if CHECKS_DISABLED == "True": - logger.warning(f"{(_('checks_disabled'))}") + if settings["disable_checks"]: + logger.warning(f"{_('checks_disabled')}") return logger.info(_('running_prestart_checks')) check_for_model() @@ -251,5 +241,4 @@ def start_checks(): pass logger.info(_('continuing_in_seconds').format(seconds=5)) presskey2skip(timeout=5) - os.system('cls' if os.name == 'nt' else 'clear') - print(splashtext) \ No newline at end of file + os.system('cls' if os.name == 'nt' else 'clear') \ No newline at end of file diff --git a/modules/sentenceprocessing.py b/modules/sentenceprocessing.py index 95a703b..f7a933a 100644 --- a/modules/sentenceprocessing.py +++ b/modules/sentenceprocessing.py @@ -37,29 +37,27 @@ def is_positive(sentence): return sentiment_score > 0.6 # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them async def send_message(ctx, message=None, embed=None, file=None, edit=False, message_reference=None): - if edit and message_reference: - try: + try: + if edit and message_reference: await message_reference.edit(content=message, embed=embed) - except Exception as e: - await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}") - else: + return message_reference + + send_kwargs = {} + if message: + send_kwargs['content'] = message + if embed: + send_kwargs['embed'] = embed + if file: + send_kwargs['file'] = file + if hasattr(ctx, "respond"): - sent_message = None - if embed: - sent_message = await ctx.respond(embed=embed, ephemeral=False) - elif message: - sent_message = await ctx.respond(message, ephemeral=False) - if file: - sent_message = await ctx.respond(file=file, ephemeral=False) + return await ctx.respond(**send_kwargs, ephemeral=False) else: - sent_message = None - if embed: - sent_message = await ctx.send(embed=embed) - elif message: - sent_message = await ctx.send(message) - if file: - sent_message = await ctx.send(file=file) - return sent_message + return await ctx.send(**send_kwargs) + + except Exception as e: + await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}") + def preprocess_message(message): message = message diff --git a/modules/settings.py b/modules/settings.py new file mode 100644 index 0000000..1dfc377 --- /dev/null +++ b/modules/settings.py @@ -0,0 +1,58 @@ +import json +import os +from typing import List, TypedDict +import copy + +class MiscBotOptions(TypedDict): + ping_line: str + active_song: str + positive_gifs: List[str] + block_profanity: bool + +class BotSettings(TypedDict): + prefix: str + owner_ids: List[int] + blacklisted_users: List[int] + user_training: bool + allow_show_mem_command: bool + react_to_messages: bool + misc: MiscBotOptions + enabled_cogs: List[str] + active_memory: str + +class SettingsType(TypedDict): + bot: BotSettings + locale: str + name: str + auto_update: bool + disable_checks: bool + splash_text_loc: str + +class Settings: + def __init__(self) -> None: + self.path = os.path.join(".", "settings", "settings.json") + if not os.path.exists(self.path): + raise FileNotFoundError("settings.json file does not exist!") + + with open(self.path, "r") as f: + self._kv_store = json.load(f) + + self.settings: SettingsType = self._kv_store # type: ignore + self.original_settings = copy.deepcopy(self.settings) + + def get_locale(self) -> str: + # Return locale or None if missing + return self.settings.get("locale", None) + + def commit(self) -> None: + with open(self.path, "w") as f: + json.dump(self.settings, f, indent=4) + self.original_settings = copy.deepcopy(self.settings) + + def discard(self) -> None: + self.settings = copy.deepcopy(self.original_settings) + +# Usage +instance = Settings() +locale = instance.get_locale() +print("Locale:", locale) diff --git a/modules/unhandledexception.py b/modules/unhandledexception.py index 9521e72..c1dd5f6 100644 --- a/modules/unhandledexception.py +++ b/modules/unhandledexception.py @@ -1,23 +1,16 @@ import sys import traceback -import os -from modules.globalvars import RED, RESET, splashtext +from modules.globalvars import RED, RESET from modules.volta.main import _ def handle_exception(exc_type, exc_value, exc_traceback, *, context=None): - os.system('cls' if os.name == 'nt' else 'clear') - if issubclass(exc_type, KeyboardInterrupt): sys.__excepthook__(exc_type, exc_value, exc_traceback) return - - print(splashtext) print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}") traceback.print_exception(exc_type, exc_value, exc_traceback) print(f"{RED}========END OF TRACEBACK========{RESET}") print(f"{RED}{_('unhandled_exception')}{RESET}") - - if context: print(f"{RED}Context: {context}{RESET}") diff --git a/modules/version.py b/modules/version.py index a5c1132..8be107f 100644 --- a/modules/version.py +++ b/modules/version.py @@ -21,7 +21,6 @@ def is_remote_ahead(branch='main', remote='origin'): count = run_cmd(f'git rev-list --count HEAD..{remote}/{branch}') return int(count) > 0 -# Automatically update the local repository if the remote is ahead def auto_update(branch='main', remote='origin'): if launched == True: print(_("already_started")) @@ -70,7 +69,7 @@ def get_latest_version_info(): return None # Check if an update is available and perform update if needed -def check_for_update(): +def check_for_update(slient=False): global latest_version, local_version, launched latest_version_info = get_latest_version_info() @@ -90,16 +89,17 @@ def check_for_update(): logger.error(f"{RED}{_('cant_find_local_version')}{RESET}") return # Compare local and latest versions - if local_version < latest_version: - logger.info(f"{YELLOW}{_('new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}") - logger.info(f"{YELLOW}{_('changelog').format(VERSION_URL=VERSION_URL)}{RESET}") - auto_update() - elif beta == True: - logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}") - elif local_version > latest_version: - logger.warning(f"{_('modification_warning')}") - elif local_version == latest_version: - logger.info(f"{_('latest_version')} {local_version}") - logger.info(f"{_('latest_version2').format(VERSION_URL=VERSION_URL)}\n\n") - launched = True + if slient != True: + if local_version < latest_version: + logger.info(f"{YELLOW}{_('new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}") + logger.info(f"{YELLOW}{_('changelog').format(VERSION_URL=VERSION_URL)}{RESET}") + auto_update() + elif beta == True: + logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}") + elif local_version > latest_version: + logger.warning(f"{_('modification_warning')}") + elif local_version == latest_version: + logger.info(f"{_('latest_version')} {local_version}") + logger.info(f"{_('latest_version2').format(VERSION_URL=VERSION_URL)}\n\n") + launched = True return latest_version \ No newline at end of file diff --git a/modules/volta/main.py b/modules/volta/main.py index 4b62144..ec1edad 100644 --- a/modules/volta/main.py +++ b/modules/volta/main.py @@ -50,12 +50,50 @@ def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None: current = current.parent return None -env_path = find_dotenv(pathlib.Path(__file__).parent) -if env_path: - load_dotenv(dotenv_path=env_path) - print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") +def load_settings_json() -> dict | None: + start_path = working_dir.resolve() + current = start_path.resolve() + while current != current.parent: + candidate = current / "settings.json" + if candidate.exists(): + try: + with open(candidate, "r", encoding="utf-8") as f: + data = json.load(f) + print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}") + return data + except Exception as e: + print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}") + return None + current = current.parent + + for root, dirs, files in os.walk(start_path): + if "settings.json" in files: + candidate = pathlib.Path(root) / "settings.json" + try: + with open(candidate, "r", encoding="utf-8") as f: + data = json.load(f) + print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}") + return data + except Exception as e: + print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}") + return None + + print(f"[VOLTA] {YELLOW}No settings.json found scanning up or down from {start_path}{RESET}") + return None + +settings = load_settings_json() +if settings and "locale" in settings: + LOCALE = settings["locale"] else: - print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") + env_path = find_dotenv(pathlib.Path(__file__).parent) + if env_path: + load_dotenv(dotenv_path=env_path) + print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}") + else: + print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}") + + LOCALE = os.getenv("LOCALE") or None + locales_dirs.extend(find_locales_dirs(module_dir)) if working_dir != module_dir: diff --git a/settings/settingsexample.json b/settings/settingsexample.json new file mode 100644 index 0000000..dc89081 --- /dev/null +++ b/settings/settingsexample.json @@ -0,0 +1,26 @@ +{ + "bot": { + "prefix": "g.", + "owner_ids": [ + + ], + "blacklisted_users": [], + "user_training": true, + "allow_show_mem_command": true, + "react_to_messages": true, + "misc": { + "ping_line": "The Beretta fires fast and won't make you feel any better!", + "active_song": "Basket Case - Green Day", + "positive_gifs": [], + "block_profanity": false + }, + "active_memory": "memory.json", + "enabled_cogs": [ + ] + }, + "locale": "fi", + "name": "goober", + "auto_update": true, + "disable_checks": false, + "splash_text_loc": "settings/splash.txt" +} \ No newline at end of file diff --git a/settings/splashexample.txt b/settings/splashexample.txt new file mode 100644 index 0000000..cc6b951 --- /dev/null +++ b/settings/splashexample.txt @@ -0,0 +1,11 @@ + SS\ + SS | + SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\ +SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ +SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__| +SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS | +\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS | + \____SS | \______/ \______/ \_______/ \_______|\__| +SS\ SS | +\SSSSSS | + \______/ \ No newline at end of file