thanks github copilot for the comments i was too lazy to add

This commit is contained in:
expect 2025-06-22 19:07:41 +02:00
parent ec6751ee2e
commit a96616e4a2
6 changed files with 171 additions and 81 deletions

135
bot.py
View file

@ -1,55 +1,60 @@
import os
import re
import json
import time
import random
import shutil
import traceback
import subprocess
import requests
import psutil
import discord
from discord.ext import commands, tasks
from discord import app_commands
import json
import random
import os
import time
import re
import os
import requests
import subprocess
import psutil
from better_profanity import profanity
import traceback
import shutil
import nltk
from nltk.data import find
from nltk import download
from better_profanity import profanity
from modules.globalvars import *
from modules.central import ping_server
from modules.translations import *
from modules.markovmemory import *
from modules.version import *
from modules.sentenceprocessing import *
from discord.ext import commands, tasks
from discord import app_commands
print(splashtext) # you can use https://patorjk.com/software/taag/ for 3d text or just remove this entirely
check_for_update()
# Print splash text and check for updates
print(splashtext) # Print splash text (from modules/globalvars.py)
check_for_update() # Check for updates (from modules/version.py)
launched = False
# Ensure required NLTK resources are available
def check_resources():
# Check for required NLTK resources and download if missing
resources = {
'vader_lexicon': 'sentiment/vader_lexicon',
'punkt_tab': 'tokenizers/punkt',
}
for resource, path in resources.items():
try:
find(path)
find(path) # find is from nltk.data
print(f"{resource} is already installed.")
except LookupError:
print(f"{resource} is not installed. Downloading now...")
download(resource)
download(resource) # download is from nltk
check_resources()
# Download locale JSON files if not present
def download_json():
# Download the locale JSON file from GitHub if not present
locales_dir = "locales"
response = requests.get(f"https://raw.githubusercontent.com/gooberinc/goober/refs/heads/main/locales/{LOCALE}.json")
if response.status_code == 200:
if not os.path.exists(locales_dir):
os.makedirs(locales_dir)
file_path = os.path.join(locales_dir, f"{LOCALE}.json")
@ -58,9 +63,7 @@ def download_json():
else:
with open(file_path, "w", encoding="utf-8") as file:
file.write(response.text)
if not os.path.exists(os.path.join(locales_dir, "en.json")):
response = requests.get(f"https://raw.githubusercontent.com/gooberinc/goober/refs/heads/main/locales/en.json")
if response.status_code == 200:
with open(os.path.join(locales_dir, "en.json"), "w", encoding="utf-8") as file:
@ -68,28 +71,33 @@ def download_json():
download_json()
# Dynamically load all cogs (extensions) from the cogs folder
async def load_cogs_from_folder(bot, folder_name="cogs"):
# Loads all Python files in the cogs folder as Discord bot extensions
for filename in os.listdir(folder_name):
if filename.endswith(".py") and not filename.startswith("_"):
cog_name = filename[:-3]
try:
await bot.load_extension(f"{folder_name}.{cog_name}")
print(f"{GREEN}{get_translation(LOCALE, 'loaded_cog')} {cog_name}{RESET}")
print(f"{GREEN}{get_translation(LOCALE, 'loaded_cog')} {cog_name}{RESET}") # get_translation from modules/translations.py
except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'cog_fail')} {cog_name} {e}{RESET}")
traceback.print_exc()
currenthash = ""
# Set up Discord bot intents and create bot instance
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
bot = commands.Bot(command_prefix=PREFIX, intents=intents, allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True))
memory = load_memory()
bot = commands.Bot(
command_prefix=PREFIX,
intents=intents,
allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False, replied_user=True)
)
# Load memory and Markov model for text generation
memory = load_memory()
markov_model = load_markov_model()
if not markov_model:
print(f"{get_translation(LOCALE, 'no_model')}")
@ -99,6 +107,7 @@ if not markov_model:
generated_sentences = set()
used_words = set()
# Event: Called when the bot is ready
@bot.event
async def on_ready():
global launched
@ -109,16 +118,21 @@ async def on_ready():
os.makedirs(folder_name)
print(f"{GREEN}{get_translation(LOCALE, 'folder_created').format(folder_name=folder_name)}{RESET}")
else:
print(f"{DEBUG}{get_translation(LOCALE, 'folder_exists').format(folder_name=folder_name)}{RESET}")
markov_model = train_markov_model(memory)
await load_cogs_from_folder(bot)
global slash_commands_enabled
print(f"{GREEN}{get_translation(LOCALE, 'logged_in')} {bot.user}{RESET}")
print(f"{DEBUG}{get_translation(LOCALE, 'folder_exists').format(folder_name=folder_name)}{RESET}")
try:
synced = await bot.tree.sync()
print(f"{GREEN}{get_translation(LOCALE, 'synced_commands')} {len(synced)} {get_translation(LOCALE, 'synced_commands2')} {RESET}")
slash_commands_enabled = True
ping_server()
ping_server() # ping_server from modules/central.py
print(f"{GREEN}{get_translation(LOCALE, 'started').format()}{RESET}")
except discord.errors.Forbidden as perm_error:
print(f"{RED}Permission error while syncing commands: {perm_error}{RESET}")
print(f"{RED}Make sure the bot has the 'applications.commands' scope and is invited with the correct permissions.{RESET}")
quit()
except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'fail_commands_sync')} {e}{RESET}")
traceback.print_exc()
quit()
print(f"{GREEN}{get_translation(LOCALE, 'started').format()}{RESET}")
except Exception as e:
print(f"{RED}{get_translation(LOCALE, 'fail_commands_sync')} {e}{RESET}")
@ -129,17 +143,16 @@ async def on_ready():
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f"{song}"))
launched = True
# Load positive GIF URLs from environment variable
positive_gifs = os.getenv("POSITIVE_GIFS").split(',')
# Command: Retrain the Markov model from memory
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_retrain')}")
async def retrain(ctx):
if ctx.author.id != ownerid:
return
message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain')}")
message_ref = await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain')}") # send_message from modules/sentenceprocessing.py
try:
with open(MEMORY_FILE, 'r') as f:
memory = json.load(f)
@ -159,12 +172,12 @@ async def retrain(ctx):
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retraining').format(processed_data=processed_data, data_size=data_size)}", edit=True, message_reference=processing_message_ref)
global markov_model
markov_model = train_markov_model(memory)
save_markov_model(markov_model)
await send_message(ctx, f"{get_translation(LOCALE, 'command_markov_retrain_successful').format(data_size=data_size)}", edit=True, message_reference=processing_message_ref)
# Command: Generate a sentence using the Markov model
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_talk')}")
async def talk(ctx, sentence_size: int = 5):
if not markov_model:
@ -188,8 +201,8 @@ async def talk(ctx, sentence_size: int = 5):
if response:
cleaned_response = re.sub(r'[^\w\s]', '', response).lower()
coherent_response = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
coherent_response = rephrase_for_coherence(cleaned_response)
if random.random() < 0.9 and is_positive(coherent_response):
gif_url = random.choice(positive_gifs)
combined_message = f"{coherent_response}\n[jif]({gif_url})"
else:
@ -200,10 +213,10 @@ async def talk(ctx, sentence_size: int = 5):
else:
await send_message(ctx, f"{get_translation(LOCALE, 'command_talk_generation_fail')}")
# Remove default help command to use custom help
bot.help_command = None
# Command: Show help information
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_help')}")
async def help(ctx):
embed = discord.Embed(
@ -232,41 +245,48 @@ async def help(ctx):
await send_message(ctx, embed=embed)
# Event: Called on every message
@bot.event
async def on_message(message):
global memory, markov_model, last_random_talk_time
# Ignore bot messages
if message.author.bot:
return
# Ignore messages from blacklisted users
if str(message.author.id) in BLACKLISTED_USERS:
return
# Process commands if message starts with a command prefix
if message.content.startswith((f"{PREFIX}talk", f"{PREFIX}mem", f"{PREFIX}help", f"{PREFIX}stats", f"{PREFIX}")):
print(f"{get_translation(LOCALE, 'command_ran').format(message=message)}")
await bot.process_commands(message)
return
if profanity.contains_profanity(message.content):
# Ignore messages with profanity
if profanity.contains_profanity(message.content): # profanity from better_profanity
return
# Add user messages to memory for training if enabled
if message.content:
if not USERTRAIN_ENABLED:
return
formatted_message = append_mentions_to_18digit_integer(message.content)
cleaned_message = preprocess_message(formatted_message)
formatted_message = append_mentions_to_18digit_integer(message.content) # append_mentions_to_18digit_integer from modules/sentenceprocessing.py
cleaned_message = preprocess_message(formatted_message) # preprocess_message from modules/sentenceprocessing.py
if cleaned_message:
memory.append(cleaned_message)
save_memory(memory)
save_memory(memory) # save_memory from modules/markovmemory.py
# process any commands in the message
# Process any commands in the message
await bot.process_commands(message)
# Event: Called on every interaction (slash command, etc.)
@bot.event
async def on_interaction(interaction):
print(f"{get_translation(LOCALE, 'command_ran_s').format(interaction=interaction)}{interaction.data['name']}")
print(f"{get_translation(LOCALE, 'command_ran_s').format(interaction=interaction)}{interaction.data['name']}")
# Global check: Block blacklisted users from running commands
@bot.check
async def block_blacklisted(ctx):
if str(ctx.author.id) in BLACKLISTED_USERS:
@ -283,6 +303,7 @@ async def block_blacklisted(ctx):
return False
return True
# Command: Show bot latency
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_desc_ping')}")
async def ping(ctx):
await ctx.defer()
@ -300,11 +321,12 @@ async def ping(ctx):
await ctx.send(embed=LOLembed)
# Command: Show about information
@bot.hybrid_command(description=f"{get_translation(LOCALE, 'command_about_desc')}")
async def about(ctx):
print("-----------------------------------\n\n")
try:
check_for_update()
check_for_update() # check_for_update from modules/version.py
except Exception as e:
pass
print("-----------------------------------")
@ -314,13 +336,14 @@ async def about(ctx):
await send_message(ctx, embed=embed)
# Command: Show bot statistics (admin only)
@bot.hybrid_command(description="stats")
async def stats(ctx):
if ctx.author.id != ownerid:
return
print("-----------------------------------\n\n")
try:
check_for_update()
check_for_update() # check_for_update from modules/version.py
except Exception as e:
pass
print("-----------------------------------")
@ -336,6 +359,7 @@ async def stats(ctx):
await send_message(ctx, embed=embed)
# Command: Upload memory.json to litterbox.catbox.moe and return the link
@bot.hybrid_command()
async def mem(ctx):
if showmemenabled != "true":
@ -345,8 +369,11 @@ async def mem(ctx):
print(memorylitter)
await send_message(ctx, memorylitter.stdout.strip())
# Helper: Improve sentence coherence (simple capitalization fix)
def improve_sentence_coherence(sentence):
# Capitalizes "i" to "I" in the sentence
sentence = sentence.replace(" i ", " I ")
return sentence
# Start the bot
bot.run(TOKEN)

View file

@ -4,13 +4,17 @@ import modules.globalvars as gv
from modules.translations import *
from modules.markovmemory import get_file_info
# Ping the server to check if it's alive and send some info
def ping_server():
if gv.ALIVEPING == "false":
# If pinging is disabled, print message and set environment variable
print(f"{gv.YELLOW}{get_translation(gv.LOCALE, 'pinging_disabled')}{RESET}")
os.environ['gooberauthenticated'] = 'No'
return
# Get server alert message
goobres = requests.get(f"{gv.VERSION_URL}/alert")
print(f"{get_translation(gv.LOCALE, 'goober_server_alert')}{goobres.text}")
# Gather file info for payload
file_info = get_file_info(gv.MEMORY_FILE)
payload = {
"name": gv.NAME,
@ -20,63 +24,74 @@ def ping_server():
"token": gv.gooberTOKEN
}
try:
# Send ping to server
response = requests.post(gv.VERSION_URL+"/ping", json=payload)
if response.status_code == 200:
# Success: print message and set environment variable
print(f"{gv.GREEN}{get_translation(gv.LOCALE, 'goober_ping_success').format(NAME=gv.NAME)}{RESET}")
os.environ['gooberauthenticated'] = 'Yes'
else:
# Failure: print error and set environment variable
print(f"{RED}{get_translation(gv.LOCALE, 'goober_ping_fail')} {response.status_code}{RESET}")
os.environ['gooberauthenticated'] = 'No'
except Exception as e:
# Exception: print error and set environment variable
print(f"{RED}{get_translation(gv.LOCALE, 'goober_ping_fail2')} {str(e)}{RESET}")
os.environ['gooberauthenticated'] = 'No'
# Check if a given name is available for registration
def is_name_available(NAME):
if os.getenv("gooberTOKEN"):
# If token is already set, skip check
return
try:
# Send request to check name availability
response = requests.post(f"{gv.VERSION_URL}/check-if-available", json={"name": NAME}, headers={"Content-Type": "application/json"})
if response.status_code == 200:
data = response.json()
return data.get("available", False)
else:
# Print error if request failed
print(f"{get_translation(gv.LOCALE, 'name_check')}", response.json())
return False
except Exception as e:
# Print exception if request failed
print(f"{get_translation(gv.LOCALE, 'name_check2')}", e)
return False
# Register a new name with the server
def register_name(NAME):
try:
if gv.ALIVEPING == False:
# If pinging is disabled, do nothing
return
# check if the name is avaliable
# Check if the name is available
if not is_name_available(NAME):
if os.getenv("gooberTOKEN"):
return
# Name taken: print error and exit
print(f"{RED}{get_translation(gv.LOCALE, 'name_taken')}{RESET}")
quit()
# if it is register it
# Register the name
response = requests.post(f"{gv.VERSION_URL}/register", json={"name": NAME}, headers={"Content-Type": "application/json"})
if response.status_code == 200:
data = response.json()
token = data.get("token")
if not os.getenv("gooberTOKEN"):
# Print instructions to add token and exit
print(f"{gv.GREEN}{get_translation(gv.LOCALE, 'add_token').format(token=token)} gooberTOKEN=<token>.{gv.RESET}")
quit()
else:
print(f"{gv.GREEN}{gv.RESET}")
return token
else:
# Print error if registration failed
print(f"{gv.RED}{get_translation(gv.LOCALE, 'token_exists').format()}{RESET}", response.json())
return None
except Exception as e:
# Print exception if registration failed
print(f"{gv.RED}{get_translation(gv.LOCALE, 'registration_error').format()}{RESET}", e)
return None
# Attempt to register the name at module load
register_name(gv.NAME)

View file

@ -4,6 +4,8 @@ import markovify
import pickle
from modules.globalvars import *
from modules.translations import *
# Get file size and line count for a given file path
def get_file_info(file_path):
try:
file_size = os.path.getsize(file_path)
@ -13,16 +15,18 @@ def get_file_info(file_path):
except Exception as e:
return {"error": str(e)}
# Load memory data from file, or use default dataset if not loaded yet
def load_memory():
data = []
# load data from MEMORY_FILE
# Try to load data from MEMORY_FILE
try:
with open(MEMORY_FILE, "r") as f:
data = json.load(f)
except FileNotFoundError:
pass
# If MEMORY_LOADED_FILE does not exist, load default data and mark as loaded
if not os.path.exists(MEMORY_LOADED_FILE):
try:
with open(DEFAULT_DATASET_FILE, "r") as f:
@ -34,10 +38,12 @@ def load_memory():
f.write("Data loaded")
return data
# Save memory data to MEMORY_FILE
def save_memory(memory):
with open(MEMORY_FILE, "w") as f:
json.dump(memory, f, indent=4)
# Train a Markov model using memory and optional additional data
def train_markov_model(memory, additional_data=None):
if not memory:
return None
@ -47,13 +53,14 @@ def train_markov_model(memory, additional_data=None):
model = markovify.NewlineText(text, state_size=2)
return model
# Save the Markov model to a pickle file
def save_markov_model(model, filename='markov_model.pkl'):
with open(filename, 'wb') as f:
pickle.dump(model, f)
print(f"Markov model saved to {filename}.")
# Load the Markov model from a pickle file
def load_markov_model(filename='markov_model.pkl'):
try:
with open(filename, 'rb') as f:
model = pickle.load(f)

View file

@ -3,20 +3,29 @@ from modules.globalvars import *
from modules.translations import *
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.tokenize import word_tokenize
# Initialize the sentiment analyzer
analyzer = SentimentIntensityAnalyzer()
def is_positive(sentence):
"""
Determines if the sentiment of the sentence is positive.
Prints debug information and returns True if sentiment score > 0.1.
"""
scores = analyzer.polarity_scores(sentence)
sentiment_score = scores['compound']
# forcin this fucker
# Print debug message with sentiment score
debug_message = f"{DEBUG}{get_translation(LOCALE, 'sentence_positivity')} {sentiment_score}{RESET}"
print(debug_message)
return sentiment_score > 0.1
async def send_message(ctx, message=None, embed=None, file=None, edit=False, message_reference=None):
"""
Sends or edits a message in a Discord context.
Handles both slash command and regular command contexts.
"""
if edit and message_reference:
try:
# Editing the existing message
@ -34,7 +43,7 @@ async def send_message(ctx, message=None, embed=None, file=None, edit=False, mes
if file:
sent_message = await ctx.respond(file=file, ephemeral=False)
else:
# For regular command contexts
sent_message = None
if embed:
sent_message = await ctx.send(embed=embed)
@ -43,26 +52,36 @@ async def send_message(ctx, message=None, embed=None, file=None, edit=False, mes
if file:
sent_message = await ctx.send(file=file)
return sent_message
#this doesnt work and im extremely pissed and mad
def append_mentions_to_18digit_integer(message):
"""
Removes 18-digit integers from the message (commonly used for Discord user IDs).
"""
pattern = r'\b\d{18}\b'
return re.sub(pattern, lambda match: f"", message)
def preprocess_message(message):
"""
Preprocesses the message by removing 18-digit integers and non-alphanumeric tokens.
Returns the cleaned message as a string.
"""
message = append_mentions_to_18digit_integer(message)
tokens = word_tokenize(message)
tokens = [token for token in tokens if token.isalnum()]
return " ".join(tokens)
def improve_sentence_coherence(sentence):
"""
Improves sentence coherence by capitalizing isolated 'i' pronouns.
"""
sentence = sentence.replace(" i ", " I ")
return sentence
def rephrase_for_coherence(sentence):
"""
Rephrases the sentence for coherence by joining words with spaces.
(Currently a placeholder function.)
"""
words = sentence.split()
coherent_sentence = " ".join(words)
return coherent_sentence

View file

@ -2,21 +2,38 @@ import os
import json
import pathlib
from modules.globalvars import RED, RESET
def load_translations():
"""
Loads all translation JSON files from the 'locales' directory.
Returns a dictionary mapping language codes to their translation dictionaries.
"""
translations = {}
# Get the path to the 'locales' directory (one level up from this file)
translations_dir = pathlib.Path(__file__).parent.parent / 'locales'
# Iterate over all files in the 'locales' directory
for filename in os.listdir(translations_dir):
if filename.endswith(".json"):
# Extract language code from filename (e.g., 'en' from 'en.json')
lang_code = filename.replace(".json", "")
# Open and load the JSON file
with open(os.path.join(translations_dir, filename), "r", encoding="utf-8") as f:
translations[lang_code] = json.load(f)
return translations
# Load all translations at module import
translations = load_translations()
def get_translation(lang: str, key: str):
"""
Retrieves the translation for a given key and language.
Falls back to English if the language is not found.
Prints a warning if the key is missing.
"""
# Get translations for the specified language, or fall back to English
lang_translations = translations.get(lang, translations["en"])
if key not in lang_translations:
# Print a warning if the key is missing in the selected language
print(f"{RED}Missing key: {key} in language {lang}{RESET}")
# Return the translation if found, otherwise return the key itself
return lang_translations.get(key, key)

View file

@ -6,18 +6,21 @@ import requests
import subprocess
import sys
# Run a shell command and return its output
def run_cmd(cmd):
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout.strip()
# Check if the remote branch is ahead of the local branch
def is_remote_ahead(branch='main', remote='origin'):
run_cmd(f'git fetch {remote}')
count = run_cmd(f'git rev-list --count HEAD..{remote}/{branch}')
return int(count) > 0
# Automatically update the local repository if the remote is ahead
def auto_update(branch='main', remote='origin'):
if AUTOUPDATE != "True":
pass
pass # Auto-update is disabled
if is_remote_ahead(branch, remote):
print(f"Remote {remote}/{branch} is ahead. Updating...")
pull_result = run_cmd(f'git pull {remote} {branch}')
@ -27,10 +30,9 @@ def auto_update(branch='main', remote='origin'):
else:
print(f"Local {remote}/{branch} is ahead and/or up to par. Not Updating...")
# Fetch the latest version info from the update server
def get_latest_version_info():
try:
response = requests.get(UPDATE_URL, timeout=5)
if response.status_code == 200:
return response.json()
@ -40,16 +42,17 @@ def get_latest_version_info():
except requests.RequestException as e:
print(f"{RED}{get_translation(LOCALE, 'version_error')} {e}{RESET}")
return None
# Check if an update is available and perform update if needed
def check_for_update():
if ALIVEPING != "True":
pass
global latest_version, local_version
pass # Update check is disabled
global latest_version, local_version
latest_version_info = get_latest_version_info()
if not latest_version_info:
print(f"{get_translation(LOCALE, 'fetch_update_fail')}")
return None, None
return None, None
latest_version = latest_version_info.get("version")
os.environ['gooberlatest_version'] = latest_version
@ -57,12 +60,14 @@ def check_for_update():
if not latest_version or not download_url:
print(f"{RED}{get_translation(LOCALE, 'invalid_server')}{RESET}")
return None, None
return None, None
# Check if local_version is valid
if local_version == "0.0.0" or None:
print(f"{RED}I cant find the local_version variable! Or its been tampered with and its not an interger!{RESET}")
return
# Compare local and latest versions
if local_version < latest_version:
print(f"{YELLOW}{get_translation(LOCALE, 'new_version').format(latest_version=latest_version, local_version=local_version)}{RESET}")
print(f"{YELLOW}{get_translation(LOCALE, 'changelog').format(VERSION_URL=VERSION_URL)}{RESET}")