Compare commits

..

No commits in common. "4e111b410d7b9edde6ceb8ae14b92dbbad026970" and "b01bd5b80b6c3e894c74eecf652476fa3c590dcd" have entirely different histories.

23 changed files with 494 additions and 872 deletions

4
.gitignore vendored
View file

@ -12,6 +12,4 @@ received_memory.json
translation_report.txt
translationcompleteness.py
modules/volta
log.txt
settings/settings.json
settings/splash.txt
log.txt

View file

@ -1,189 +0,0 @@
import os
import subprocess
from typing import Dict, List
import discord
from discord.ext import commands
import discord.ext
import discord.ext.commands
from modules.volta.main import _ , set_language
from modules.permission import requires_admin
from modules.sentenceprocessing import send_message
from modules.settings import instance as settings_manager
import requests
settings = settings_manager.settings
def get_git_origin_raw():
try:
result = subprocess.run(
["git", "config", "--get", "remote.origin.url"],
capture_output=True, text=True, check=True
)
return result.stdout.strip()
except Exception:
return "http://forgejo.expect.ovh/gooberinc/goober"
class BaseCommands(commands.Cog):
def __init__(self, bot):
self.bot: discord.ext.commands.Bot = bot
@commands.command()
async def help(self, ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{_('command_help_embed_title')}",
description=f"{_('command_help_embed_desc')}",
color=discord.Colour(0x000000),
)
command_categories = {
f"{_('command_help_categories_general')}": [
"mem",
"talk",
"about",
"ping",
"impact",
"demotivator",
"help",
],
f"{_('command_help_categories_admin')}": ["stats", "retrain", "setlanguage"],
}
custom_commands: List[str] = []
for cog_name, cog in self.bot.cogs.items():
for command in cog.get_commands():
if (
command.name
not in command_categories[f"{_('command_help_categories_general')}"]
and command.name
not in command_categories[f"{_('command_help_categories_admin')}"]
):
custom_commands.append(command.name)
if custom_commands:
embed.add_field(
name=_('command_help_categories_custom'),
value="\n".join(
[
f"{settings['bot']['prefix']}{command}"
for command in custom_commands
]
),
inline=False,
)
for category, commands_list in command_categories.items():
commands_in_category: str = "\n".join(
[f"{settings['bot']['prefix']}{command}" for command in commands_list]
)
embed.add_field(name=category, value=commands_in_category, inline=False)
await send_message(ctx, embed=embed)
@requires_admin()
@commands.command()
async def setlanguage(self, ctx: commands.Context, locale: str) -> None:
await ctx.defer()
set_language(locale)
await ctx.send(":thumbsup:")
@commands.command()
async def ping(self, ctx: commands.Context) -> None:
await ctx.defer()
latency: int = round(self.bot.latency * 1000)
embed: discord.Embed = discord.Embed(
title="Pong!!",
description=(
settings["bot"]["misc"]["ping_line"],
f"`{_('command_ping_embed_desc')}: {latency}ms`\n",
),
color=discord.Colour(0x000000),
)
embed.set_footer(
text=f"{_('command_ping_footer')} {ctx.author.name}",
icon_url=ctx.author.display_avatar.url,
)
await ctx.send(embed=embed)
@commands.command()
async def about(self, ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{_('command_about_embed_title')}",
description="",
color=discord.Colour(0x000000),
)
embed.add_field(
name=_('command_about_embed_field1'),
value=settings['name'],
inline=False,
)
embed.add_field(name="Git", value=get_git_origin_raw())
await send_message(ctx, embed=embed)
@commands.command()
async def stats(self, ctx: commands.Context) -> None:
memory_file: str = "memory.json"
file_size: int = os.path.getsize(memory_file)
with open(memory_file, "r") as file:
line_count: int = sum(1 for _ in file)
embed: discord.Embed = discord.Embed(
title=f"{_('command_stats_embed_title')}",
description=f"{_('command_stats_embed_desc')}",
color=discord.Colour(0x000000),
)
embed.add_field(
name=f"{_('command_stats_embed_field1name')}",
value=f"placeholder",
inline=False,
)
with open(settings["splash_text_loc"], "r") as f:
splash_text = "".join(f.readlines())
embed.add_field(
name=_('command_stats_embed_field3name'),
value=_('command_stats_embed_field3value').format(
NAME=settings["name"],
PREFIX=settings["bot"]["prefix"],
ownerid=settings["bot"]["owner_ids"][0],
PING_LINE=settings["bot"]["misc"]["ping_line"],
showmemenabled=settings["bot"]["allow_show_mem_command"],
USERTRAIN_ENABLED=settings["bot"]["user_training"],
song=settings["bot"]["misc"]["active_song"],
splashtext=splash_text
),
inline=False,
)
await send_message(ctx, embed=embed)
@commands.command()
async def mem(self, ctx: commands.Context) -> None:
if not settings["bot"]["allow_show_mem_command"]:
return
with open(settings["bot"]["active_memory"], "rb") as f:
data: bytes = f.read()
response = requests.post(
"https://litterbox.catbox.moe/resources/internals/api.php",
data={"reqtype": "fileupload", "time": "1h"},
files={"fileToUpload": data},
)
await send_message(ctx, response.text)
async def setup(bot: discord.ext.commands.Bot):
print("Setting up base_commands")
bot.remove_command("help")
await bot.add_cog(BaseCommands(bot))

View file

@ -1,119 +0,0 @@
import os
import random
import re
import discord
from discord.ext import commands
import discord.ext
import discord.ext.commands
from modules.markovmemory import save_markov_model, train_markov_model, load_markov_model
from modules.permission import requires_admin
from modules.sentenceprocessing import (
improve_sentence_coherence,
is_positive,
rephrase_for_coherence,
send_message,
)
from modules.volta.main import _
import logging
from typing import List, Optional, Set
import json
import time
import markovify
logger = logging.getLogger("goober")
from modules.settings import instance as settings_manager
settings = settings_manager.settings
class Markov(commands.Cog):
def __init__(self, bot):
self.bot: discord.ext.commands.Bot = bot
self.model: markovify.NewlineText
@requires_admin()
@commands.command()
async def retrain(self, ctx: discord.ext.commands.Context):
message_ref: discord.Message | None = await send_message(
ctx, f"{_('command_markov_retrain')}"
)
if message_ref is None:
logger.error("Failed to send message!")
return
try:
with open(settings["bot"]["active_memory"], "r") as f:
memory: List[str] = json.load(f)
except FileNotFoundError:
await send_message(ctx, f"{_('command_markov_memory_not_found')}")
return
except json.JSONDecodeError:
await send_message(ctx, f"{_('command_markov_memory_is_corrupt')}")
return
data_size: int = len(memory)
processing_message_ref: discord.Message | None = await send_message(
ctx, f"{(_('command_markov_retraining').format(data_size=data_size))}"
)
if processing_message_ref is None:
logger.error("Couldnt find message processing message!")
start_time: float = time.time()
model = train_markov_model(memory)
if not model:
logger.error("Failed to train markov model")
await ctx.send("Failed to retrain!")
return False
self.model = model
save_markov_model(self.model)
logger.debug(f"Completed retraining in {round(time.time() - start_time,3)}s")
await send_message(
ctx,
_('command_markov_retraining').format(data_size=data_size),
edit=True,
message_reference=processing_message_ref,
)
@commands.command()
async def talk(self, ctx: commands.Context, sentence_size: int = 5) -> None:
if not self.model:
await send_message(ctx, f"{_('command_markovcommand_talk_insufficent_text')}")
return
response: str = ""
if sentence_size == 1:
response = (
self.model.make_short_sentence(max_chars=100, tries=100)
or _('command_markovcommand_talk_generation_fail')
)
else:
response = improve_sentence_coherence(
self.model.make_sentence(tries=100, max_words=sentence_size)
or _('command_talk_generation_fail')
)
cleaned_response: str = re.sub(r"[^\w\s]", "", response).lower()
coherent_response: str = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
gif_url: str = random.choice(settings["bot"]["misc"]["positive_gifs"])
coherent_response = f"{coherent_response}\n[jif]({gif_url})"
os.environ["gooberlatestgen"] = coherent_response
await send_message(ctx, coherent_response)
async def setup(bot):
await bot.add_cog(Markov(bot))

View file

@ -1,118 +0,0 @@
import discord
from discord.ext import commands
from modules.permission import requires_admin
from modules.settings import instance as settings_manager
settings = settings_manager.settings
class PermissionManager(commands.Cog):
def __init__(self, bot):
self.bot = bot
@requires_admin()
@commands.command()
async def add_owner(self, ctx: commands.Context, member: discord.Member):
settings["bot"]["owner_ids"].append(member.id)
settings_manager.add_admin_log_event(
{
"action": "add",
"author": ctx.author.id,
"change": "owner_ids",
"messageId": ctx.message.id,
"target": member.id,
}
)
settings_manager.commit()
embed = discord.Embed(
title="Permissions",
description=f"Set {member.name} as an owner",
color=discord.Color.blue(),
)
await ctx.send(embed=embed)
@requires_admin()
@commands.command()
async def remove_owner(self, ctx: commands.Context, member: discord.Member):
try:
settings["bot"]["owner_ids"].remove(member.id)
settings_manager.add_admin_log_event(
{
"action": "del",
"author": ctx.author.id,
"change": "owner_ids",
"messageId": ctx.message.id,
"target": member.id,
}
)
settings_manager.commit()
except ValueError:
await ctx.send("User is not an owner!")
return
embed = discord.Embed(
title="Permissions",
description=f"Removed {member.name} from being an owner",
color=discord.Color.blue(),
)
await ctx.send(embed=embed)
@requires_admin()
@commands.command()
async def blacklist_user(self, ctx: commands.Context, member: discord.Member):
settings["bot"]["blacklisted_users"].append(member.id)
settings_manager.add_admin_log_event(
{
"action": "add",
"author": ctx.author.id,
"change": "blacklisted_users",
"messageId": ctx.message.id,
"target": member.id,
}
)
settings_manager.commit()
embed = discord.Embed(
title="Blacklist",
description=f"Added {member.name} to the blacklist",
color=discord.Color.blue(),
)
await ctx.send(embed=embed)
@requires_admin()
@commands.command()
async def unblacklist_user(self, ctx: commands.Context, member: discord.Member):
try:
settings["bot"]["blacklisted_users"].remove(member.id)
settings_manager.add_admin_log_event(
{
"action": "del",
"author": ctx.author.id,
"change": "blacklisted_users",
"messageId": ctx.message.id,
"target": member.id,
}
)
settings_manager.commit()
except ValueError:
await ctx.send("User is not on the blacklist!")
return
embed = discord.Embed(
title="Blacklist",
description=f"Removed {member.name} from blacklist",
color=discord.Color.blue(),
)
await ctx.send(embed=embed)
async def setup(bot):
await bot.add_cog(PermissionManager(bot))

BIN
assets/fonts/Impact.ttf Normal file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

BIN
assets/images/bibinos.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 295 KiB

BIN
assets/images/crash.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

BIN
assets/images/crash2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 746 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 355 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 275 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

View file

@ -1 +1,29 @@
DISCORDBOTTOKEN=
DISCORDBOTTOKEN=
BOTPREFIX="g."
PINGLINE="The Beretta fires fast and won't make you feel any better!"
BLACKLISTEDUSERS=
OWNERID=
USERTRAINENABLED="true"
SHOWMEMENABLED="true"
LOCALE=fi
NAME=goober
AUTOUPDATE="True"
SONG="Basket Case - Green Day"
CHECKSDISABLED="Frue"
REACT="True"
STATUS="idle"
POSITIVEGIFS="https://media.discordapp.net/attachments/821047460151427135/1181371808566493184/jjpQGeno.gif, https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272,https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988"
SPLASHTEXT="
SS\
SS |
SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\
SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\
SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__|
SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS |
\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS |
\____SS | \______/ \______/ \_______/ \_______|\__|
SS\ SS |
\SSSSSS |
\______/
"

481
main.py
View file

@ -7,30 +7,16 @@ import traceback
import subprocess
import tempfile
import shutil
import psutil
import asyncio
import platform
import sys
from typing import (
List,
Dict,
Literal,
Set,
Optional,
Tuple,
Any,
TypedDict,
Union,
Callable,
Coroutine,
TypeVar,
Type,
)
from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type
import logging
from modules.globalvars import *
from modules.prestartchecks import start_checks
from modules.logger import GooberFormatter
from modules.volta.main import *
import logging
from modules.settings import Settings as SettingsManager
from modules.permission import requires_admin
from modules.volta.main import _
logger = logging.getLogger("goober")
logger.setLevel(logging.DEBUG)
@ -46,64 +32,38 @@ file_handler.setFormatter(GooberFormatter(colors=False))
logger.addHandler(console_handler)
logger.addHandler(file_handler)
settings_manager = SettingsManager()
settings = settings_manager.settings
splash_text: str = ""
with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f:
splash_text = "".join(f.readlines())
print(splash_text)
# Print splash text and check for updates
print(splashtext) # Print splash text (from modules/globalvars.py)
start_checks()
import requests
import discord
from discord.ext import commands
from discord import app_commands
from discord import Colour, Message
from better_profanity import profanity
from discord import Colour, Embed, File, Interaction, Message
from discord.abc import Messageable
from discord.ext import commands
from modules.volta.main import _, set_language
from modules.markovmemory import *
from modules.version import *
from modules.sentenceprocessing import *
from modules.unhandledexception import handle_exception
from modules.image import gen_meme
sys.excepthook = handle_exception
class MessageMetadata(TypedDict):
user_id: str
user_name: str
guild_id: str | Literal["DM"]
guild_name: str | Literal["DM"]
channel_id: str
channel_name: str
message: str
timestamp: float
check_for_update()
T = TypeVar('T')
MessageContext = Union[commands.Context, discord.Interaction]
MessageReference = Union[Message, discord.WebhookMessage]
# Constants with type hints
positive_gifs: List[str] = settings["bot"]["misc"]["positive_gifs"]
positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',')
currenthash: str = ""
launched: bool = False
slash_commands_enabled: bool = False
# Set up Discord bot intents and create bot instance
intents: discord.Intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
bot: commands.Bot = commands.Bot(
command_prefix=settings["bot"]["prefix"],
intents=intents,
allowed_mentions=discord.AllowedMentions(
everyone=False, roles=False, users=False, replied_user=True
),
)
# Load memory and Markov model for text generation
memory: List[str | Dict[Literal["_meta"], MessageMetadata]] = load_memory()
memory: List[str] = load_memory()
markov_model: Optional[markovify.Text] = load_markov_model()
if not markov_model:
logger.error(_('markov_model_not_found'))
@ -113,65 +73,201 @@ if not markov_model:
generated_sentences: Set[str] = set()
used_words: Set[str] = set()
def get_git_remote_url():
try:
url = subprocess.check_output(
["git", "config", "--get", "remote.origin.url"],
text=True,
stderr=subprocess.DEVNULL,
).strip()
return url
except subprocess.CalledProcessError:
return "Unknown"
async def load_cogs_from_folder(bot: commands.Bot, folder_name="assets/cogs"):
for filename in [file for file in os.listdir(folder_name) if file.endswith(".py")]:
cog_name: str = filename[:-3]
if (
"internal" not in folder_name
and cog_name not in settings["bot"]["enabled_cogs"]
):
logger.debug(f"Skipping cog {cog_name} (not in enabled cogs)")
continue
module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}"
try:
await bot.load_extension(module_path)
logger.info(f"{_('loaded_cog')} {cog_name}")
except Exception as e:
logger.error(f"{_('cog_fail')} {cog_name} {e}")
traceback.print_exc()
async def load_cogs_from_folder(bot, folder_name="assets/cogs"):
for filename in os.listdir(folder_name):
if filename.endswith(".py") and not filename.startswith("_"):
cog_name = filename[:-3]
module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}"
try:
await bot.load_extension(module_path)
logger.info(f"{(_('loaded_cog'))} {cog_name}")
except Exception as e:
logger.error(f"{(_('cog_fail'))} {cog_name} {e}")
traceback.print_exc()
# Event: Called when the bot is ready
@bot.event
async def on_ready() -> None:
global launched
global slash_commands_enabled
global NAME
global status
folder_name: str = "cogs"
if launched:
return
await load_cogs_from_folder(bot)
await load_cogs_from_folder(bot, "assets/cogs/internal")
try:
synced: List[discord.app_commands.AppCommand] = await bot.tree.sync()
logger.info(f"{_('synced_commands')} {len(synced)} {_('synced_commands2')}")
logger.info(_('started'))
logger.info(f"{_('synced_commands')} {len(synced)} {(_('synced_commands2'))}")
slash_commands_enabled = True
logger.info(f"{(_('started')).format(name=NAME)}")
except discord.errors.Forbidden as perm_error:
logger.error(f"Permission error while syncing commands: {perm_error}")
logger.error(
"Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions."
)
logger.error(f"Permission error while syncing commands: {perm_error}")
logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.")
quit()
except Exception as e:
logger.error(f"{_('fail_commands_sync')} {e}")
traceback.print_exc()
quit()
if not settings["bot"]["misc"]["active_song"]:
if not song:
return
await bot.change_presence(
activity=discord.Activity(
type=discord.ActivityType.listening,
name=settings["bot"]["misc"]["active_song"],
)
)
status = {
"idle": discord.Status.idle,
"dnd": discord.Status.dnd,
"invisible": discord.Status.invisible,
"online": discord.Status.online
}.get(status.lower(), discord.Status.online)
await bot.change_presence(status=status, activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}"))
launched = True
@bot.event
async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None:
from modules.unhandledexception import handle_exception
if isinstance(error, commands.CommandInvokeError):
original: Exception = error.original
handle_exception(
type(original), original, original.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}"
)
else:
handle_exception(
type(error), error, error.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}"
)
@bot.hybrid_command(description=f"{(_('command_desc_retrain'))}")
async def retrain(ctx: commands.Context) -> None:
if ctx.author.id != ownerid:
return
global markov_model
message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}")
try:
with open(MEMORY_FILE, 'r') as f:
memory: List[str] = json.load(f)
except FileNotFoundError:
await send_message(ctx, f"{(_('command_markov_memory_not_found'))}")
return
except json.JSONDecodeError:
await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}")
return
data_size: int = len(memory)
processed_data: int = 0
processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}")
markov_model = train_markov_model(memory)
save_markov_model(markov_model)
await send_message(ctx, f"{_('command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref)
@bot.hybrid_command(description=f"{(_('command_desc_talk'))}")
async def talk(ctx: commands.Context, sentence_size: int = 5) -> None:
if not markov_model:
await send_message(ctx, f"{(_('command_talk_insufficent_text'))}")
return
response = None
for _ in range(20):
if sentence_size == 1:
sentence = markov_model.make_short_sentence(max_chars=100, tries=100)
response = sentence.split()[0] if sentence else None
else:
response = markov_model.make_sentence(tries=100, max_words=sentence_size)
if response and response not in generated_sentences:
if sentence_size > 1:
response = improve_sentence_coherence(response)
generated_sentences.add(response)
break
else:
await send_message(ctx, f"{(_('command_talk_generation_fail'))}")
return
cleaned = re.sub(r'[^\w\s]', '', response).lower()
coherent = rephrase_for_coherence(cleaned)
if random.random() < 0.9 and is_positive(coherent):
gif_url = random.choice(positive_gifs)
message = f"{coherent}\n[jif]({gif_url})"
else:
message = coherent
logger.info(message)
os.environ['gooberlatestgen'] = message
await send_message(ctx, message)
@bot.hybrid_command(description=f"RAM")
async def ramusage(ctx):
process = psutil.Process(os.getpid())
mem = process.memory_info().rss
await send_message(ctx, f"{mem / 1024 / 1024:.2f} MB")
@bot.hybrid_command(description=f"{(_('command_desc_help'))}")
async def impact(ctx: commands.Context, text: Optional[str] = None) -> None:
assets_folder = "assets/images"
def get_random_asset_image() -> Optional[str]:
images = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
if not images:
return None
return os.path.join(assets_folder, random.choice(images))
temp_input = None
input_path = None
# Determine input image path
if ctx.message.attachments:
attachment = ctx.message.attachments[0]
if attachment.content_type and attachment.content_type.startswith("image/"):
ext = os.path.splitext(attachment.filename)[1]
temp_input = f"tempy{ext}"
await attachment.save(temp_input)
input_path = temp_input
else:
fallback_image = get_random_asset_image()
if fallback_image is None:
await ctx.reply(_('no_image_available'))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
else:
fallback_image = get_random_asset_image()
if fallback_image is None:
await ctx.reply(_('no_image_available'))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
# Generate meme image with one-shot text generation
output_path = await gen_meme(input_path, custom_text=text)
if output_path is None or not os.path.isfile(output_path):
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
await ctx.reply(_('failed_generate_image'))
return
await ctx.send(file=discord.File(output_path))
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
bot.remove_command('help')
# Command: Show help information
@ -203,128 +299,122 @@ async def help(ctx: commands.Context) -> None:
await send_message(ctx, embed=embed)
@bot.event
async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None:
from modules.unhandledexception import handle_exception
@bot.hybrid_command(description=f"{(_('command_desc_setlang'))}")
@app_commands.describe(locale="Choose your language")
async def setlanguage(ctx: commands.Context, locale: str) -> None:
if ctx.author.id != ownerid:
await ctx.send(":thumbsdown:")
return
await ctx.defer()
set_language(locale)
await ctx.send(":thumbsup:")
if isinstance(error, commands.CommandInvokeError):
original: Exception = error.original
handle_exception(
type(original),
original,
original.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}",
)
else:
handle_exception(
type(error),
error,
error.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}",
)
# Event: Called on every message
@bot.event
async def on_message(message: discord.Message) -> None:
global memory, markov_model
EMOJIS = [
"\U0001f604",
"\U0001f44d",
"\U0001f525",
"\U0001f4af",
"\U0001f389",
"\U0001f60e",
] # originally was emojis but it would probably shit itself on systems without unicode so....
if message.author.bot:
return
if str(message.author.id) in settings["bot"]["blacklisted_users"]:
if str(message.author.id) in BLACKLISTED_USERS:
return
commands = [
settings["bot"]["prefix"] + command.name for command in bot.tree.get_commands()
]
if message.content.startswith(tuple(commands)):
if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")):
logger.info(f"{(_('command_ran')).format(message=message)}")
await bot.process_commands(message)
return
if (
profanity.contains_profanity(message.content)
and settings["bot"]["misc"]["block_profanity"]
):
return
if message.content:
if not settings["bot"]["user_training"]:
if not USERTRAIN_ENABLED:
return
formatted_message: str = message.content
cleaned_message: str = preprocess_message(formatted_message)
if cleaned_message:
memory.append(cleaned_message)
message_metadata: MessageMetadata = {
"user_id": str(message.author.id),
"user_name": str(message.author),
"guild_id": str(message.guild.id) if message.guild else "DM",
"guild_name": str(message.guild.name) if message.guild else "DM",
"channel_id": str(message.channel.id),
"channel_name": str(message.channel),
"message": message.content,
"timestamp": time.time(),
}
try:
if isinstance(memory, list):
memory.append({"_meta": message_metadata})
else:
logger.warning("Memory is not a list; can't append metadata")
except Exception as e:
logger.warning(f"Failed to append metadata to memory: {e}")
save_memory(memory)
sentiment_score = is_positive(
message.content
) # doesnt work but im scared to change the logic now please ignore
if sentiment_score > 0.8:
if not settings["bot"]["react_to_messages"]:
return
emoji = random.choice(EMOJIS)
try:
await message.add_reaction(emoji)
except Exception as e:
logger.info(f"Failed to react with emoji: {e}")
cleaned_message: str = formatted_message
save_memory(memory)
await bot.process_commands(message)
# Event: Called on every interaction (slash command, etc.)
@bot.event
async def on_interaction(interaction: discord.Interaction) -> None:
name = None
if interaction.data.get('name') is None:
name = "Unknown"
else:
name = interaction.data['name']
logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}")
# Global check: Block blacklisted users from running commands
@bot.check
async def block_blacklisted(ctx: commands.Context) -> bool:
if ctx.author.id not in settings["bot"]["blacklisted_users"]:
return True
try:
if isinstance(ctx, discord.Interaction):
if not ctx.response.is_done():
await ctx.response.send_message(_('blacklisted'), ephemeral=True)
if str(ctx.author.id) in BLACKLISTED_USERS:
try:
if isinstance(ctx, discord.Interaction):
if not ctx.response.is_done():
await ctx.response.send_message(_('blacklisted'), ephemeral=True)
else:
await ctx.followup.send(_('blacklisted'), ephemeral=True)
else:
await ctx.followup.send(_('blacklisted'), ephemeral=True)
else:
await ctx.send(_('blacklisted_user'), ephemeral=True)
except:
await ctx.send(_('blacklisted_user'), ephemeral=True)
except:
pass
return False
return True
@bot.hybrid_command(description=f"{(_('command_desc_ping'))}")
async def ping(ctx: commands.Context) -> None:
await ctx.defer()
latency: int = round(bot.latency * 1000)
LOLembed: discord.Embed = discord.Embed(
title="Pong!!",
description=(
f"{PING_LINE}\n"
f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n"
),
color=Colour(0x000000)
)
LOLembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url)
await ctx.send(embed=LOLembed)
# Command: Show about information
@bot.hybrid_command(description=f"{(_('command_about_desc'))}")
async def about(ctx: commands.Context) -> None:
print("-----------------------------------\n\n")
latest_version: str = check_for_update()
print("-----------------------------------")
embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000))
embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False)
embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
embed.add_field(name=f"Git", value=get_git_remote_url())
embed.add_field(name=f"OS", value=platform.platform())
await send_message(ctx, embed=embed)
# Command: Show bot statistics (admin only)
@bot.hybrid_command(description="stats")
async def stats(ctx: commands.Context) -> None:
if ctx.author.id != ownerid:
return
print("-----------------------------------\n\n")
latest_version: str = check_for_update()
print("-----------------------------------")
memory_file: str = 'memory.json'
file_size: int = os.path.getsize(memory_file)
with open(memory_file, 'r') as file:
line_count: int = sum(1 for _ in file)
embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000))
embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False)
embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False)
embed.add_field(name=f"OS", value=platform.platform())
embed.add_field(name="Python Version", value=platform.python_version())
await send_message(ctx, embed=embed)
# Command: Upload memory.json to litterbox.catbox.moe and return the link
@bot.hybrid_command()
async def mem(ctx: commands.Context) -> None:
if showmemenabled != "true":
return
command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php"""
memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True)
logger.debug(memorylitter)
await send_message(ctx, memorylitter.stdout.strip())
# Helper: Improve sentence coherence (simple capitalization fix)
def improve_sentence_coherence(sentence: str) -> str:
@ -333,5 +423,4 @@ def improve_sentence_coherence(sentence: str) -> str:
return sentence
# Start the bot
if __name__ == "__main__":
bot.run(os.environ.get("DISCORDBOTTOKEN", ""))
bot.run(TOKEN)

View file

@ -1,19 +1,26 @@
import os
import platform
import pathlib
import subprocess
from dotenv import load_dotenv
import pathlib
import discord
from discord import Colour, Embed, File, Interaction, Message
from discord.ext import commands
from discord import app_commands
from discord import Colour, Embed, File, Interaction, Message
from discord.abc import Messageable
from discord.ext import commands
import subprocess
def get_git_branch():
try:
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL
).decode('utf-8').strip()
return branch
except subprocess.CalledProcessError:
return None
env_path = pathlib.Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)
# ANSI colors
ANSI = "\033["
RED = f"{ANSI}31m"
GREEN = f"{ANSI}32m"
@ -21,30 +28,10 @@ YELLOW = f"{ANSI}33m"
PURPLE = f"{ANSI}35m"
DEBUG = f"{ANSI}1;30m"
RESET = f"{ANSI}0m"
VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main"
UPDATE_URL = f"{VERSION_URL}/latest_version.json"
UPDATE_URL = VERSION_URL+"/latest_version.json"
print(UPDATE_URL)
LOCAL_VERSION_FILE = "current_version.txt"
MEMORY_FILE = "memory.json"
MEMORY_LOADED_FILE = "MEMORY_LOADED" # used in markov module
local_version = "3.0.0"
latest_version = "0.0.0"
os.environ['gooberlocal_version'] = local_version
def get_git_branch() -> str | None:
try:
return subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL
).decode().strip()
except subprocess.CalledProcessError:
return None
branch = get_git_branch()
beta = branch != "main" if branch else True
LOCAL_VERSION_FILE = "current_version.txt"
TOKEN = os.getenv("DISCORDBOTTOKEN", "0")
PREFIX = os.getenv("BOTPREFIX", "g.")
PING_LINE = os.getenv("PINGLINE")
@ -58,23 +45,27 @@ showmemenabled = os.getenv("SHOWMEMENABLED")
BLACKLISTED_USERS = os.getenv("BLACKLISTEDUSERS", "").split(",")
USERTRAIN_ENABLED = os.getenv("USERTRAINENABLED", "true").lower() == "true"
NAME = os.getenv("NAME")
MEMORY_FILE = "memory.json"
MEMORY_LOADED_FILE = "MEMORY_LOADED" # is this still even used?? okay just checked its used in the markov module
ALIVEPING = os.getenv("ALIVEPING")
AUTOUPDATE = os.getenv("AUTOUPDATE")
song = os.getenv("SONG")
launched = False
latest_version = "0.0.0"
local_version = "3.0.0"
os.environ['gooberlocal_version'] = local_version
REACT = os.getenv("REACT")
if get_git_branch() != "main":
beta = True
# this makes goober think its a beta version, so it will not update to the latest stable version or run any version checks
else:
beta = False
intents = discord.Intents.default()
# Set up Discord bot intents and create bot instance
intents: discord.Intents = discord.Intents.default()
intents.messages = True
intents.presences = True
intents.members = True
intents.message_content = True
bot = commands.Bot(
command_prefix=PREFIX,
intents=intents,
allowed_mentions=discord.AllowedMentions(
everyone=False, roles=False, users=False, replied_user=True
)
)
launched = False
bot: commands.Bot = commands.Bot(command_prefix=PREFIX, intents=intents, allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True))

111
modules/image.py Normal file
View file

@ -0,0 +1,111 @@
import os
import re
import random
import shutil
import tempfile
from typing import Optional, List
from PIL import Image, ImageDraw, ImageFont, ImageOps
from modules.markovmemory import load_markov_model
from modules.sentenceprocessing import improve_sentence_coherence, rephrase_for_coherence
generated_sentences = set()
def load_font(size):
return ImageFont.truetype("assets/fonts/Impact.ttf", size=size)
def draw_text_with_outline(draw, text, x, y, font):
outline_offsets = [(-2, -2), (-2, 2), (2, -2), (2, 2), (0, -2), (0, 2), (-2, 0), (2, 0)]
for ox, oy in outline_offsets:
draw.text((x + ox, y + oy), text, font=font, fill="black")
draw.text((x, y), text, font=font, fill="white")
def fits_in_width(text, font, max_width, draw):
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
return text_width <= max_width
def split_text_to_fit(text, font, max_width, draw):
words = text.split()
for i in range(len(words), 0, -1):
top_text = " ".join(words[:i])
bottom_text = " ".join(words[i:])
if fits_in_width(top_text, font, max_width, draw) and fits_in_width(bottom_text, font, max_width, draw):
return top_text, bottom_text
midpoint = len(words) // 2
return " ".join(words[:midpoint]), " ".join(words[midpoint:])
async def gen_meme(input_image_path, sentence_size=5, max_attempts=10, custom_text=None):
markov_model = load_markov_model()
if not markov_model or not os.path.isfile(input_image_path):
return None
def generate_text():
if custom_text:
return custom_text
if sentence_size == 1:
candidate = markov_model.make_short_sentence(max_chars=100, tries=100)
if candidate:
candidate = candidate.split()[0]
return candidate
else:
candidate = markov_model.make_sentence(tries=100, max_words=sentence_size)
if candidate:
return improve_sentence_coherence(candidate)
print(candidate)
return None
def draw_centered_text(img, text):
draw = ImageDraw.Draw(img)
width, height = img.size
font_size = int(height / 10)
font = load_font(font_size)
cleaned = re.sub(r'[^\w\s]', '', text).lower()
coherent = rephrase_for_coherence(cleaned).upper()
bbox = draw.textbbox((0, 0), coherent, font=font)
text_width, text_height_px = bbox[2] - bbox[0], bbox[3] - bbox[1]
max_text_height = height // 4
if text_width <= width and text_height_px <= max_text_height:
draw_text_with_outline(draw, coherent, (width - text_width) / 2, 0, font)
img.save(input_image_path)
return True
top_text, bottom_text = split_text_to_fit(coherent, font, width, draw)
top_bbox = draw.textbbox((0, 0), top_text, font=font)
bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font)
top_height = top_bbox[3] - top_bbox[1]
bottom_height = bottom_bbox[3] - bottom_bbox[1]
if top_height <= max_text_height and bottom_height <= max_text_height:
draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font)
y_bottom = height - bottom_height - int(height * 0.04)
draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font)
img.save(input_image_path)
return True
return False
attempt = 0
while attempt < max_attempts:
response = generate_text() or "NO TEXT GENERATED"
with Image.open(input_image_path).convert("RGBA") as img:
if draw_centered_text(img, response):
return input_image_path
attempt += 1
with Image.open(input_image_path).convert("RGBA") as img:
draw = ImageDraw.Draw(img)
width, height = img.size
font_size = int(height / 10)
font = load_font(font_size)
truncated = (rephrase_for_coherence(re.sub(r'[^\w\s]', '', "NO TEXT GENERATED").lower()).upper())[:100]
bbox = draw.textbbox((0, 0), truncated, font=font)
text_width = bbox[2] - bbox[0]
draw_text_with_outline(draw, truncated, (width - text_width) / 2, 0, font)
img.save(input_image_path)
return input_image_path

View file

@ -1,37 +0,0 @@
from functools import wraps
import discord
import discord.ext
import discord.ext.commands
from modules.settings import Settings as SettingsManager
import logging
logger = logging.getLogger("goober")
settings_manager = SettingsManager()
settings = settings_manager.settings
class PermissionError(Exception):
pass
def requires_admin():
async def wrapper(ctx: discord.ext.commands.Context):
if ctx.author.id not in settings["bot"]["owner_ids"]:
await ctx.send(
"You don't have the necessary permissions to run this command!"
)
return False
command = ctx.command
if not command:
logger.info(f"Unknown command ran {ctx.message}")
else:
logger.info(
f'Command {settings["bot"]["prefix"]}{command.name} @{ctx.author.name}'
)
return True
return discord.ext.commands.check(wrapper)

View file

@ -37,27 +37,29 @@ def is_positive(sentence):
return sentiment_score > 0.6 # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them
async def send_message(ctx, message=None, embed=None, file=None, edit=False, message_reference=None):
try:
if edit and message_reference:
if edit and message_reference:
try:
await message_reference.edit(content=message, embed=embed)
return message_reference
send_kwargs = {}
if message:
send_kwargs['content'] = message
if embed:
send_kwargs['embed'] = embed
if file:
send_kwargs['file'] = file
except Exception as e:
await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}")
else:
if hasattr(ctx, "respond"):
return await ctx.respond(**send_kwargs, ephemeral=False)
sent_message = None
if embed:
sent_message = await ctx.respond(embed=embed, ephemeral=False)
elif message:
sent_message = await ctx.respond(message, ephemeral=False)
if file:
sent_message = await ctx.respond(file=file, ephemeral=False)
else:
return await ctx.send(**send_kwargs)
except Exception as e:
await ctx.send(f"{RED}{(_('edit_fail'))} {e}{RESET}")
sent_message = None
if embed:
sent_message = await ctx.send(embed=embed)
elif message:
sent_message = await ctx.send(message)
if file:
sent_message = await ctx.send(file=file)
return sent_message
def preprocess_message(message):
message = message

View file

@ -1,58 +0,0 @@
import json
import os
from typing import List, TypedDict
import copy
class MiscBotOptions(TypedDict):
ping_line: str
active_song: str
positive_gifs: List[str]
block_profanity: bool
class BotSettings(TypedDict):
prefix: str
owner_ids: List[int]
blacklisted_users: List[int]
user_training: bool
allow_show_mem_command: bool
react_to_messages: bool
misc: MiscBotOptions
enabled_cogs: List[str]
active_memory: str
class SettingsType(TypedDict):
bot: BotSettings
locale: str
name: str
auto_update: bool
disable_checks: bool
splash_text_loc: str
class Settings:
def __init__(self) -> None:
self.path = os.path.join(".", "settings", "settings.json")
if not os.path.exists(self.path):
raise FileNotFoundError("settings.json file does not exist!")
with open(self.path, "r") as f:
self._kv_store = json.load(f)
self.settings: SettingsType = self._kv_store # type: ignore
self.original_settings = copy.deepcopy(self.settings)
def get_locale(self) -> str:
# Return locale or None if missing
return self.settings.get("locale", None)
def commit(self) -> None:
with open(self.path, "w") as f:
json.dump(self.settings, f, indent=4)
self.original_settings = copy.deepcopy(self.settings)
def discard(self) -> None:
self.settings = copy.deepcopy(self.original_settings)
# Usage
instance = Settings()
locale = instance.get_locale()
print("Locale:", locale)

View file

@ -69,7 +69,7 @@ def get_latest_version_info():
return None
# Check if an update is available and perform update if needed
def check_for_update(slient=False):
def check_for_update():
global latest_version, local_version, launched
latest_version_info = get_latest_version_info()
@ -89,17 +89,16 @@ def check_for_update(slient=False):
logger.error(f"{RED}{_('cant_find_local_version')}{RESET}")
return
# Compare local and latest versions
if slient != True:
if local_version < latest_version:
logger.info(f"{YELLOW}{_('new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}")
logger.info(f"{YELLOW}{_('changelog').format(VERSION_URL=VERSION_URL)}{RESET}")
auto_update()
elif beta == True:
logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}")
elif local_version > latest_version:
logger.warning(f"{_('modification_warning')}")
elif local_version == latest_version:
logger.info(f"{_('latest_version')} {local_version}")
logger.info(f"{_('latest_version2').format(VERSION_URL=VERSION_URL)}\n\n")
launched = True
if local_version < latest_version:
logger.info(f"{YELLOW}{_('new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}")
logger.info(f"{YELLOW}{_('changelog').format(VERSION_URL=VERSION_URL)}{RESET}")
auto_update()
elif beta == True:
logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}")
elif local_version > latest_version:
logger.warning(f"{_('modification_warning')}")
elif local_version == latest_version:
logger.info(f"{_('latest_version')} {local_version}")
logger.info(f"{_('latest_version2').format(VERSION_URL=VERSION_URL)}\n\n")
launched = True
return latest_version

View file

@ -50,50 +50,12 @@ def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None:
current = current.parent
return None
def load_settings_json() -> dict | None:
start_path = working_dir.resolve()
current = start_path.resolve()
while current != current.parent:
candidate = current / "settings.json"
if candidate.exists():
try:
with open(candidate, "r", encoding="utf-8") as f:
data = json.load(f)
print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}")
return data
except Exception as e:
print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}")
return None
current = current.parent
for root, dirs, files in os.walk(start_path):
if "settings.json" in files:
candidate = pathlib.Path(root) / "settings.json"
try:
with open(candidate, "r", encoding="utf-8") as f:
data = json.load(f)
print(f"[VOLTA] {GREEN}Loaded settings.json locale '{data.get('locale')}' from {candidate}{RESET}")
return data
except Exception as e:
print(f"[VOLTA] {RED}Failed to load settings.json at {candidate}: {e}{RESET}")
return None
print(f"[VOLTA] {YELLOW}No settings.json found scanning up or down from {start_path}{RESET}")
return None
settings = load_settings_json()
if settings and "locale" in settings:
LOCALE = settings["locale"]
env_path = find_dotenv(pathlib.Path(__file__).parent)
if env_path:
load_dotenv(dotenv_path=env_path)
print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}")
else:
env_path = find_dotenv(pathlib.Path(__file__).parent)
if env_path:
load_dotenv(dotenv_path=env_path)
print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}")
else:
print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}")
LOCALE = os.getenv("LOCALE") or None
print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}")
locales_dirs.extend(find_locales_dirs(module_dir))
if working_dir != module_dir:

View file

@ -1,26 +0,0 @@
{
"bot": {
"prefix": "g.",
"owner_ids": [
],
"blacklisted_users": [],
"user_training": true,
"allow_show_mem_command": true,
"react_to_messages": true,
"misc": {
"ping_line": "The Beretta fires fast and won't make you feel any better!",
"active_song": "Basket Case - Green Day",
"positive_gifs": [],
"block_profanity": false
},
"active_memory": "memory.json",
"enabled_cogs": [
]
},
"locale": "fi",
"name": "goober",
"auto_update": true,
"disable_checks": false,
"splash_text_loc": "settings/splash.txt"
}

View file

@ -1,11 +0,0 @@
SS\
SS |
SSSSSS\ SSSSSS\ SSSSSS\ SSSSSSS\ SSSSSS\ SSSSSS\
SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\ SS __SS\
SS / SS |SS / SS |SS / SS |SS | SS |SSSSSSSS |SS | \__|
SS | SS |SS | SS |SS | SS |SS | SS |SS ____|SS |
\SSSSSSS |\SSSSSS |\SSSSSS |SSSSSSS |\SSSSSSS\ SS |
\____SS | \______/ \______/ \_______/ \_______|\__|
SS\ SS |
\SSSSSS |
\______/