lobotmized goober and cleared out junk

This commit is contained in:
WhatDidYouExpect 2025-07-22 19:34:46 +02:00
parent b860d0e271
commit 8021d17d27
14 changed files with 19 additions and 1488 deletions

420
main.py Normal file
View file

@ -0,0 +1,420 @@
import os
import re
import json
import time
import random
import traceback
import subprocess
import tempfile
import shutil
import psutil
import asyncio
import platform
import sys
from typing import List, Dict, Set, Optional, Tuple, Any, Union, Callable, Coroutine, TypeVar, Type
import logging
from modules.globalvars import *
from modules.prestartchecks import start_checks
from modules.logger import GooberFormatter
import logging
logger = logging.getLogger("goober")
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(GooberFormatter())
file_handler = logging.FileHandler("log.txt", mode="w+", encoding="UTF-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(GooberFormatter(colors=False))
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# Print splash text and check for updates
print(splashtext) # Print splash text (from modules/globalvars.py)
start_checks()
import requests
import discord
from discord.ext import commands
from discord import app_commands
from discord import Colour, Embed, File, Interaction, Message
from discord.abc import Messageable
from discord.ext import commands
from modules.volta.main import _, set_language
from modules.markovmemory import *
from modules.version import *
from modules.sentenceprocessing import *
from modules.unhandledexception import handle_exception
from modules.image import gen_meme
sys.excepthook = handle_exception
check_for_update()
T = TypeVar('T')
MessageContext = Union[commands.Context, discord.Interaction]
MessageReference = Union[Message, discord.WebhookMessage]
# Constants with type hints
positive_gifs: List[str] = os.getenv("POSITIVE_GIFS", "").split(',')
currenthash: str = ""
launched: bool = False
slash_commands_enabled: bool = False
# Load memory and Markov model for text generation
memory: List[str] = load_memory()
markov_model: Optional[markovify.Text] = load_markov_model()
if not markov_model:
logger.error(_('markov_model_not_found'))
memory = load_memory()
markov_model = train_markov_model(memory)
generated_sentences: Set[str] = set()
used_words: Set[str] = set()
def get_git_remote_url():
try:
url = subprocess.check_output(
["git", "config", "--get", "remote.origin.url"],
text=True,
stderr=subprocess.DEVNULL,
).strip()
return url
except subprocess.CalledProcessError:
return "Unknown"
async def load_cogs_from_folder(bot, folder_name="assets/cogs"):
for filename in os.listdir(folder_name):
if filename.endswith(".py") and not filename.startswith("_"):
cog_name = filename[:-3]
module_path = folder_name.replace("/", ".").replace("\\", ".") + f".{cog_name}"
try:
await bot.load_extension(module_path)
logger.info(f"{(_('loaded_cog'))} {cog_name}")
except Exception as e:
logger.error(f"{(_('cog_fail'))} {cog_name} {e}")
traceback.print_exc()
# Event: Called when the bot is ready
@bot.event
async def on_ready() -> None:
global launched
global slash_commands_enabled
global NAME
global status
folder_name: str = "cogs"
if launched:
return
await load_cogs_from_folder(bot)
try:
synced: List[discord.app_commands.AppCommand] = await bot.tree.sync()
logger.info(f"{_('synced_commands')} {len(synced)} {(_('synced_commands2'))}")
slash_commands_enabled = True
logger.info(f"{(_('started')).format(name=NAME)}")
except discord.errors.Forbidden as perm_error:
logger.error(f"Permission error while syncing commands: {perm_error}")
logger.error("Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.")
quit()
except Exception as e:
logger.error(f"{_('fail_commands_sync')} {e}")
traceback.print_exc()
quit()
if not song:
return
status = {
"idle": discord.Status.idle,
"dnd": discord.Status.dnd,
"invisible": discord.Status.invisible,
"online": discord.Status.online
}.get(status.lower(), discord.Status.online)
await bot.change_presence(status=status, activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}"))
launched = True
@bot.event
async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None:
from modules.unhandledexception import handle_exception
if isinstance(error, commands.CommandInvokeError):
original: Exception = error.original
handle_exception(
type(original), original, original.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}"
)
else:
handle_exception(
type(error), error, error.__traceback__,
context=f"Command: {ctx.command} | User: {ctx.author}"
)
@bot.hybrid_command(description=f"{(_('command_desc_retrain'))}")
async def retrain(ctx: commands.Context) -> None:
if ctx.author.id != ownerid:
return
global markov_model
message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retrain'))}")
try:
with open(MEMORY_FILE, 'r') as f:
memory: List[str] = json.load(f)
except FileNotFoundError:
await send_message(ctx, f"{(_('command_markov_memory_not_found'))}")
return
except json.JSONDecodeError:
await send_message(ctx, f"{(_('command_markov_memory_is_corrupt'))}")
return
data_size: int = len(memory)
processed_data: int = 0
processing_message_ref: MessageReference = await send_message(ctx, f"{(_('command_markov_retraining')).format(processed_data=processed_data, data_size=data_size)}")
markov_model = train_markov_model(memory)
save_markov_model(markov_model)
await send_message(ctx, f"{_('command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref)
@bot.hybrid_command(description=f"{(_('command_desc_talk'))}")
async def talk(ctx: commands.Context, sentence_size: int = 5) -> None:
if not markov_model:
await send_message(ctx, f"{(_('command_talk_insufficent_text'))}")
return
response: Optional[str] = None
for _ in range(20):
if sentence_size == 1:
response = markov_model.make_short_sentence(max_chars=100, tries=100)
if response:
response = response.split()[0]
else:
response = markov_model.make_sentence(tries=100, max_words=sentence_size)
if response and response not in generated_sentences:
if sentence_size > 1:
response = improve_sentence_coherence(response)
generated_sentences.add(response)
break
if response:
cleaned_response: str = re.sub(r'[^\w\s]', '', response).lower()
coherent_response: str = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
gif_url: str = random.choice(positive_gifs)
combined_message: str = f"{coherent_response}\n[jif]({gif_url})"
else:
combined_message: str = coherent_response
logger.info(combined_message)
os.environ['gooberlatestgen'] = combined_message
await send_message(ctx, combined_message)
else:
await send_message(ctx, f"{(_('command_talk_generation_fail'))}")
@bot.hybrid_command(description=f"RAM")
async def ramusage(ctx):
process = psutil.Process(os.getpid())
mem = process.memory_info().rss
await send_message(ctx, f"{mem / 1024 / 1024:.2f} MB")
@bot.hybrid_command(description=f"{(_('command_desc_help'))}")
async def impact(ctx: commands.Context, text: Optional[str] = None) -> None:
assets_folder: str = "assets/images"
temp_input: Optional[str] = None
def get_random_asset_image() -> Optional[str]:
files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
if not files:
return None
return os.path.join(assets_folder, random.choice(files))
if ctx.message.attachments:
attachment: discord.Attachment = ctx.message.attachments[0]
if attachment.content_type and attachment.content_type.startswith("image/"):
ext: str = os.path.splitext(attachment.filename)[1]
temp_input = f"tempy{ext}"
await attachment.save(temp_input)
input_path: str = temp_input
else:
fallback_image: Optional[str] = get_random_asset_image()
if fallback_image is None:
await ctx.reply(_('no_image_available'))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
else:
fallback_image = get_random_asset_image()
if fallback_image is None:
await ctx.reply(_('no_image_available'))
return
temp_input = tempfile.mktemp(suffix=os.path.splitext(fallback_image)[1])
shutil.copy(fallback_image, temp_input)
input_path = temp_input
output_path: Optional[str] = await gen_meme(input_path, custom_text=text)
if output_path is None or not os.path.isfile(output_path):
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
await ctx.reply(_('failed_generate_image'))
return
await ctx.send(file=discord.File(output_path))
if temp_input and os.path.exists(temp_input):
os.remove(temp_input)
bot.remove_command('help')
# Command: Show help information
@bot.hybrid_command(description=f"{(_('command_desc_help'))}")
async def help(ctx: commands.Context) -> None:
embed: discord.Embed = discord.Embed(
title=f"{(_('command_help_embed_title'))}",
description=f"{(_('command_help_embed_desc'))}",
color=Colour(0x000000)
)
command_categories: Dict[str, List[str]] = {
f"{(_('command_help_categories_general'))}": ["mem", "talk", "about", "ping", "impact", "demotivator", "help"],
f"{(_('command_help_categories_admin'))}": ["stats", "retrain", "setlanguage"]
}
custom_commands: List[str] = []
for cog_name, cog in bot.cogs.items():
for command in cog.get_commands():
if command.name not in command_categories[f"{(_('command_help_categories_general'))}"] and command.name not in command_categories[f"{(_('command_help_categories_admin'))}"]:
custom_commands.append(command.name)
if custom_commands:
embed.add_field(name=f"{(_('command_help_categories_custom'))}", value="\n".join([f"{PREFIX}{command}" for command in custom_commands]), inline=False)
for category, commands_list in command_categories.items():
commands_in_category: str = "\n".join([f"{PREFIX}{command}" for command in commands_list])
embed.add_field(name=category, value=commands_in_category, inline=False)
await send_message(ctx, embed=embed)
@bot.hybrid_command(description=f"{(_('command_desc_setlang'))}")
@app_commands.describe(locale="Choose your language")
async def setlanguage(ctx: commands.Context, locale: str) -> None:
if ctx.author.id != ownerid:
await ctx.send(":thumbsdown:")
return
await ctx.defer()
set_language(locale)
await ctx.send(":thumbsup:")
@bot.event
async def on_message(message: discord.Message) -> None:
global memory, markov_model
if message.author.bot:
return
if str(message.author.id) in BLACKLISTED_USERS:
return
if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")):
logger.info(f"{(_('command_ran')).format(message=message)}")
await bot.process_commands(message)
return
if message.content:
if not USERTRAIN_ENABLED:
return
formatted_message: str = message.content
cleaned_message: str = formatted_message
save_memory(memory)
await bot.process_commands(message)
@bot.event
async def on_interaction(interaction: discord.Interaction) -> None:
name = None
if interaction.data.get('name') is None:
name = "Unknown"
else:
name = interaction.data['name']
logger.info(f"{(_('command_ran_s')).format(interaction=interaction)}{name}")
@bot.check
async def block_blacklisted(ctx: commands.Context) -> bool:
if str(ctx.author.id) in BLACKLISTED_USERS:
try:
if isinstance(ctx, discord.Interaction):
if not ctx.response.is_done():
await ctx.response.send_message(_('blacklisted'), ephemeral=True)
else:
await ctx.followup.send(_('blacklisted'), ephemeral=True)
else:
await ctx.send(_('blacklisted_user'), ephemeral=True)
except:
pass
return False
return True
@bot.hybrid_command(description=f"{(_('command_desc_ping'))}")
async def ping(ctx: commands.Context) -> None:
await ctx.defer()
latency: int = round(bot.latency * 1000)
LOLembed: discord.Embed = discord.Embed(
title="Pong!!",
description=(
f"{PING_LINE}\n"
f"`{(_('command_ping_embed_desc'))}: {latency}ms`\n"
),
color=Colour(0x000000)
)
LOLembed.set_footer(text=f"{(_('command_ping_footer'))} {ctx.author.name}", icon_url=ctx.author.avatar.url)
await ctx.send(embed=LOLembed)
# Command: Show about information
@bot.hybrid_command(description=f"{(_('command_about_desc'))}")
async def about(ctx: commands.Context) -> None:
print("-----------------------------------\n\n")
latest_version: str = check_for_update()
print("-----------------------------------")
embed: discord.Embed = discord.Embed(title=f"{(_('command_about_embed_title'))}", description="", color=Colour(0x000000))
embed.add_field(name=f"{(_('command_about_embed_field1'))}", value=f"{NAME}", inline=False)
embed.add_field(name=f"{(_('command_about_embed_field2name'))}", value=f"{(_('command_about_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
embed.add_field(name=f"Git", value=get_git_remote_url())
embed.add_field(name=f"OS", value=platform.platform())
await send_message(ctx, embed=embed)
# Command: Show bot statistics (admin only)
@bot.hybrid_command(description="stats")
async def stats(ctx: commands.Context) -> None:
if ctx.author.id != ownerid:
return
print("-----------------------------------\n\n")
latest_version: str = check_for_update()
print("-----------------------------------")
memory_file: str = 'memory.json'
file_size: int = os.path.getsize(memory_file)
with open(memory_file, 'r') as file:
line_count: int = sum(1 for _ in file)
embed: discord.Embed = discord.Embed(title=f"{(_('command_stats_embed_title'))}", description=f"{(_('command_stats_embed_desc'))}", color=Colour(0x000000))
embed.add_field(name=f"{(_('command_stats_embed_field1name'))}", value=f"{(_('command_stats_embed_field1value')).format(file_size=file_size, line_count=line_count)}", inline=False)
embed.add_field(name=f"{(_('command_stats_embed_field2name'))}", value=f"{(_('command_stats_embed_field2value')).format(local_version=local_version, latest_version=latest_version)}", inline=False)
embed.add_field(name=f"{(_('command_stats_embed_field3name'))}", value=f"{(_('command_stats_embed_field3value')).format(NAME=NAME, PREFIX=PREFIX, ownerid=ownerid, PING_LINE=PING_LINE, showmemenabled=showmemenabled, USERTRAIN_ENABLED=USERTRAIN_ENABLED, song=song, splashtext=splashtext)}", inline=False)
embed.add_field(name=f"OS", value=platform.platform())
embed.add_field(name="Python Version", value=platform.python_version())
await send_message(ctx, embed=embed)
# Command: Upload memory.json to litterbox.catbox.moe and return the link
@bot.hybrid_command()
async def mem(ctx: commands.Context) -> None:
if showmemenabled != "true":
return
command: str = """curl -F "reqtype=fileupload" -F "time=1h" -F "fileToUpload=@memory.json" https://litterbox.catbox.moe/resources/internals/api.php"""
memorylitter: subprocess.CompletedProcess = subprocess.run(command, shell=True, capture_output=True, text=True)
logger.debug(memorylitter)
await send_message(ctx, memorylitter.stdout.strip())
# Helper: Improve sentence coherence (simple capitalization fix)
def improve_sentence_coherence(sentence: str) -> str:
# Capitalizes "i" to "I" in the sentence
sentence = sentence.replace(" i ", " I ")
return sentence
# Start the bot
bot.run(TOKEN)