added permission wrapper

This commit is contained in:
ctih1 2025-07-23 10:19:08 +03:00
parent f7042ed8a7
commit f186e079da
29 changed files with 860 additions and 788 deletions

View file

@ -3,17 +3,23 @@ import platform
from dotenv import load_dotenv
import pathlib
import subprocess
def get_git_branch():
try:
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL
).decode('utf-8').strip()
branch = (
subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL
)
.decode("utf-8")
.strip()
)
return branch
except subprocess.CalledProcessError:
return None
env_path = pathlib.Path(__file__).parent.parent / '.env'
env_path = pathlib.Path(__file__).parent.parent / ".env"
load_dotenv(dotenv_path=env_path)
ANSI = "\033["
@ -25,9 +31,9 @@ DEBUG = f"{ANSI}1;30m"
RESET = f"{ANSI}0m"
VERSION_URL = "https://raw.githubusercontent.com/gooberinc/version/main"
UPDATE_URL = VERSION_URL+"/latest_version.json"
UPDATE_URL = VERSION_URL + "/latest_version.json"
print(UPDATE_URL)
LOCAL_VERSION_FILE = "current_version.txt"
LOCAL_VERSION_FILE = "current_version.txt"
# TOKEN = os.getenv("DISCORDBOTTOKEN", "0")
# PREFIX = os.getenv("BOTPREFIX", "g.")
@ -52,9 +58,9 @@ LOCAL_VERSION_FILE = "current_version.txt"
# IGNOREWARNING = False # is this either??? i don't think so?
# song = os.getenv("song")
arch = platform.machine()
slash_commands_enabled = True # 100% broken, its a newer enough version so its probably enabled by default.... fix this at somepoint or hard code it in goober central code
slash_commands_enabled = True # 100% broken, its a newer enough version so its probably enabled by default.... fix this at somepoint or hard code it in goober central code
launched = False
latest_version = "0.0.0"
local_version = "2.3.3"
os.environ['gooberlocal_version'] = local_version
os.environ["gooberlocal_version"] = local_version
beta = get_git_branch() == "dev"

View file

@ -6,37 +6,57 @@ import tempfile
from typing import Optional, List
from PIL import Image, ImageDraw, ImageFont, ImageOps
from modules.markovmemory import load_markov_model
from modules.sentenceprocessing import improve_sentence_coherence, rephrase_for_coherence
from modules.sentenceprocessing import (
improve_sentence_coherence,
rephrase_for_coherence,
)
generated_sentences = set()
def load_font(size):
return ImageFont.truetype("assets/fonts/Impact.ttf", size=size)
def load_tnr(size):
return ImageFont.truetype("assets/fonts/TNR.ttf", size=size)
def draw_text_with_outline(draw, text, x, y, font):
outline_offsets = [(-2, -2), (-2, 2), (2, -2), (2, 2), (0, -2), (0, 2), (-2, 0), (2, 0)]
outline_offsets = [
(-2, -2),
(-2, 2),
(2, -2),
(2, 2),
(0, -2),
(0, 2),
(-2, 0),
(2, 0),
]
for ox, oy in outline_offsets:
draw.text((x + ox, y + oy), text, font=font, fill="black")
draw.text((x, y), text, font=font, fill="white")
def fits_in_width(text, font, max_width, draw):
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
return text_width <= max_width
def split_text_to_fit(text, font, max_width, draw):
words = text.split()
for i in range(len(words), 0, -1):
top_text = " ".join(words[:i])
bottom_text = " ".join(words[i:])
if fits_in_width(top_text, font, max_width, draw) and fits_in_width(bottom_text, font, max_width, draw):
if fits_in_width(top_text, font, max_width, draw) and fits_in_width(
bottom_text, font, max_width, draw
):
return top_text, bottom_text
midpoint = len(words) // 2
return " ".join(words[:midpoint]), " ".join(words[midpoint:])
async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
markov_model = load_markov_model()
if not markov_model or not os.path.isfile(input_image_path):
@ -54,11 +74,15 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
response = None
for _ in range(20):
if sentence_size == 1:
candidate = markov_model.make_short_sentence(max_chars=100, tries=100)
candidate = markov_model.make_short_sentence(
max_chars=100, tries=100
)
if candidate:
candidate = candidate.split()[0]
else:
candidate = markov_model.make_sentence(tries=100, max_words=sentence_size)
candidate = markov_model.make_sentence(
tries=100, max_words=sentence_size
)
if candidate and candidate not in generated_sentences:
if sentence_size > 1:
@ -70,7 +94,7 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
if not response:
response = "NO TEXT GENERATED"
cleaned_response = re.sub(r'[^\w\s]', '', response).lower()
cleaned_response = re.sub(r"[^\w\s]", "", response).lower()
coherent_response = rephrase_for_coherence(cleaned_response).upper()
bbox = draw.textbbox((0, 0), coherent_response, font=font)
@ -79,11 +103,15 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
max_text_height = height // 4
if text_width <= width and text_height_px <= max_text_height:
draw_text_with_outline(draw, coherent_response, (width - text_width) / 2, 0, font)
draw_text_with_outline(
draw, coherent_response, (width - text_width) / 2, 0, font
)
img.save(input_image_path)
return input_image_path
else:
top_text, bottom_text = split_text_to_fit(coherent_response, font, width, draw)
top_text, bottom_text = split_text_to_fit(
coherent_response, font, width, draw
)
top_bbox = draw.textbbox((0, 0), top_text, font=font)
bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font)
@ -92,9 +120,21 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
bottom_height = bottom_bbox[3] - bottom_bbox[1]
if top_height <= max_text_height and bottom_height <= max_text_height:
draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font)
draw_text_with_outline(
draw,
top_text,
(width - (top_bbox[2] - top_bbox[0])) / 2,
0,
font,
)
y_bottom = height - bottom_height - int(height * 0.04)
draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font)
draw_text_with_outline(
draw,
bottom_text,
(width - (bottom_bbox[2] - bottom_bbox[0])) / 2,
y_bottom,
font,
)
img.save(input_image_path)
return input_image_path
@ -113,6 +153,7 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10):
img.save(input_image_path)
return input_image_path
async def gen_demotivator(input_image_path, max_attempts=5):
markov_model = load_markov_model()
if not markov_model or not os.path.isfile(input_image_path):
@ -124,7 +165,7 @@ async def gen_demotivator(input_image_path, max_attempts=5):
size = max(img.width, img.height)
frame_thick = int(size * 0.0054)
inner_size = size - 2 * frame_thick
resized_img = img.resize((inner_size, inner_size), Image.LANCZOS)
resized_img = img.resize((inner_size, inner_size), Image.LANCZOS)
framed = Image.new("RGB", (size, size), "white")
framed.paste(resized_img, (frame_thick, frame_thick))
landscape_w = int(size * 1.5)
@ -145,8 +186,10 @@ async def gen_demotivator(input_image_path, max_attempts=5):
title = t.upper()
subtitle = s.capitalize()
break
if not title: title = "DEMOTIVATOR"
if not subtitle: subtitle = "no text generated"
if not title:
title = "DEMOTIVATOR"
if not subtitle:
subtitle = "no text generated"
title_sz = int(caption_h * 0.4)
sub_sz = int(caption_h * 0.25)

View file

@ -30,6 +30,8 @@ import time
NOTICE = """
# This file was automatically created from localization JSON files.
# DO NOT EDIT THIS FILE DIRECTLY. If you want to edit a translation, please use the language's JSON file.
#fmt: off
"""
@ -42,10 +44,11 @@ logging.basicConfig(
logger = logging.getLogger("kaannos")
class LanguageCollector:
def __init__(self, language_dir: str) -> None:
self.path: str = language_dir
self.languages: Dict[str, Dict[str,str]] = {}
self.languages: Dict[str, Dict[str, str]] = {}
for file in os.listdir(self.path):
if not file.endswith(".json") or len(file) > 7:
@ -55,13 +58,12 @@ class LanguageCollector:
locale: str = file.split(".json")[0]
logger.info(f"Discovered {file}")
with open(os.path.join(self.path, file), "r", encoding="UTF-8") as f:
keys: Dict[str,str] = json.load(f)
keys: Dict[str, str] = json.load(f)
self.languages[locale] = keys
print(self.languages)
self.find_missing_keys()
def find_missing_keys(self) -> None:
primary_language_keys: Dict[str, str] = self.languages["en"]
@ -69,21 +71,24 @@ class LanguageCollector:
for language in self.languages:
if key not in self.languages[language]:
logger.warning(f"Key {key} missing from {language}")
for language in self.languages:
for key in self.languages[language]:
if key not in primary_language_keys:
logger.warning(f"Leftover key {key} found from {language}")
class Script:
def __init__(self) -> None:
self.script: str = ""
def add_line(self, content, indent: int=0, newline: bool = True) -> None:
def add_line(self, content, indent: int = 0, newline: bool = True) -> None:
self.script += f"{'\t' * indent}{content}{'\n' if newline else ''}"
def process_name(key: str) -> str:
return key.replace(" ", "_").replace(":","").lower()
return key.replace(" ", "_").replace(":", "").lower()
def find_args(string: str) -> List[str]:
variable_open: bool = False
@ -97,7 +102,7 @@ def find_args(string: str) -> List[str]:
variables.append(temp_content)
temp_content = ""
continue
if char == "{":
raise SyntaxError("Variable already open!")
@ -106,17 +111,17 @@ def find_args(string: str) -> List[str]:
else:
if char == "}":
raise SyntaxError("Trying to close a nonexistant variable")
if char == "{":
variable_open = True
return variables
def convert_args(inp: str, vars: List[str], mode: Literal["brackets", "none"] = "brackets") -> str:
replacements = {
".": "_",
",": "_"
}
def convert_args(
inp: str, vars: List[str], mode: Literal["brackets", "none"] = "brackets"
) -> str:
replacements = {".": "_", ",": "_"}
for var in vars:
cleaned_var = var
@ -131,9 +136,15 @@ def convert_args(inp: str, vars: List[str], mode: Literal["brackets", "none"] =
return inp
class GenerateScript:
def __init__(self, primary_lang:str, language_data: Dict[str, Dict[str,str]], use_typing: bool = True, output_path: str = "out.py", generate_comments: bool = True):
def __init__(
self,
primary_lang: str,
language_data: Dict[str, Dict[str, str]],
use_typing: bool = True,
output_path: str = "out.py",
generate_comments: bool = True,
):
self.data = language_data
self.primary = primary_lang
self.script = Script()
@ -142,31 +153,41 @@ class GenerateScript:
self.generate_comments = generate_comments
def create(self):
# I really don't like this implementation but also it works
# I really don't like this implementation but also it works
self.script.add_line(NOTICE)
if self.uses_typing:
self.script.add_line("from typing import Literal, List")
self.script.add_line(f"Language=Literal{list(self.data.keys())}")
self.script.add_line(f"languages: List[Language] = {list(self.data.keys())}")
self.script.add_line(
f"languages: List[Language] = {list(self.data.keys())}"
)
self.script.add_line(f"default_lang: Language | str='{self.primary}'")
self.script.add_line("def change_language(new_lang: Language | str) -> None: global default_lang; default_lang = new_lang")
self.script.add_line(
"def change_language(new_lang: Language | str) -> None: global default_lang; default_lang = new_lang"
)
else:
self.script.add_line(f"languages = {list(self.data.keys())}")
self.script.add_line(f"default_lang='{self.primary}'")
self.script.add_line("def change_language(new_lang): global default_lang; default_lang = new_lang")
self.script.add_line(
"def change_language(new_lang): global default_lang; default_lang = new_lang"
)
self.primary_data = self.data[self.primary]
for key in self.primary_data:
args = find_args(self.primary_data[key])
self.script.add_line(f"def {process_name(key)}({convert_args(','.join([*args, "lang:str|None=None" if self.uses_typing else "lang"]), args, "none")}):")
self.script.add_line(
f"def {process_name(key)}({convert_args(','.join([*args, "lang:str|None=None" if self.uses_typing else "lang"]), args, "none")}):"
)
if self.generate_comments:
self.script.add_line('"""', 1)
self.script.add_line("### Locales", 1)
for language in self.data:
self.script.add_line(f'- {language.capitalize()}: **{self.data[language].get(key, self.primary_data[key])}**', 1)
self.script.add_line(
f"- {language.capitalize()}: **{self.data[language].get(key, self.primary_data[key])}**",
1,
)
self.script.add_line('"""', 1)
self.script.add_line("if not lang: lang=default_lang", 1)
for language in self.data:
@ -174,18 +195,31 @@ class GenerateScript:
for arg in args:
formatted_map += f'"{convert_args(arg, args, "none")}": {convert_args(arg, args, "none")},'
formatted_map = formatted_map[:-1] + "}"
self.script.add_line(f"""if lang == '{language}': return {convert_args(json.dumps(
self.script.add_line(
f"""if lang == '{language}': return {convert_args(json.dumps(
self.data[language].get(key,self.primary_data[key]),
ensure_ascii=False
), args)}{f'.format_map({formatted_map})' if len(args) > 0 else ''}""", 1)
), args)}{f'.format_map({formatted_map})' if len(args) > 0 else ''}""",
1,
)
self.script.add_line("else: raise ValueError(f'Invalid language {lang}')", 1)
self.script.add_line(
"else: raise ValueError(f'Invalid language {lang}')", 1
)
with open(self.output, "w", encoding="UTF-8") as f:
f.write(self.script.script)
def build_result(primary_lang: str, locale_dir: str, types: bool, output_path: str, generate_comments: bool = True):
def build_result(
primary_lang: str,
locale_dir: str,
types: bool,
output_path: str,
generate_comments: bool = True,
):
start = time.time()
lc = LanguageCollector(locale_dir)
GenerateScript(primary_lang, lc.languages, types, output_path, generate_comments).create()
logger.info(f"Done in {time.time() - start}s")
GenerateScript(
primary_lang, lc.languages, types, output_path, generate_comments
).create()
logger.info(f"Done in {time.time() - start}s")

View file

@ -2,6 +2,8 @@
# This file was automatically created from localization JSON files.
# DO NOT EDIT THIS FILE DIRECTLY. If you want to edit a translation, please use the language's JSON file.
#fmt: off
from typing import Literal, List
Language=Literal['en', 'es', 'fi', 'fr', 'it']
languages: List[Language] = ['en', 'es', 'fi', 'fr', 'it']

View file

@ -1,25 +1,28 @@
import logging
from modules.globalvars import *
class GooberFormatter(logging.Formatter):
def __init__(self, colors: bool = True): # Disable colors for TXT output
def __init__(self, colors: bool = True): # Disable colors for TXT output
self.colors = colors
self._format = f"[ %(levelname)-8s ]: %(message)s {DEBUG} [%(asctime)s.%(msecs)03d] (%(filename)s:%(funcName)s) {RESET}"
self.FORMATS = {
logging.DEBUG: DEBUG + self._format + RESET,
logging.INFO: self._format.replace("%(levelname)-8s", f"{GREEN}%(levelname)-8s{RESET}"),
logging.INFO: self._format.replace(
"%(levelname)-8s", f"{GREEN}%(levelname)-8s{RESET}"
),
logging.WARNING: YELLOW + self._format + RESET,
logging.ERROR: RED + self._format + RESET,
logging.CRITICAL: PURPLE + self._format + RESET
logging.CRITICAL: PURPLE + self._format + RESET,
}
def format(self, record: logging.LogRecord):
if self.colors:
log_fmt = self.FORMATS.get(record.levelno) # Add colors
log_fmt = self.FORMATS.get(record.levelno) # Add colors
else:
log_fmt = self._format # Just use the default format
log_fmt = self._format # Just use the default format
formatter = logging.Formatter(log_fmt, datefmt="%m/%d/%y %H:%M:%S")
return formatter.format(record)

View file

@ -6,11 +6,14 @@ from modules.globalvars import *
import logging
import modules.keys as k
from modules.settings import Settings as SettingsManager
settings_manager = SettingsManager()
settings = settings_manager.settings
logger = logging.getLogger("goober")
# Get file size and line count for a given file path
def get_file_info(file_path):
try:
@ -21,6 +24,7 @@ def get_file_info(file_path):
except Exception as e:
return {"error": str(e)}
# Load memory data from file, or use default dataset if not loaded yet
def load_memory():
data = []
@ -34,36 +38,42 @@ def load_memory():
return data
# Save memory data to MEMORY_FILE
def save_memory(memory):
with open(settings["bot"]["active_memory"], "w") as f:
json.dump(memory, f, indent=4)
def train_markov_model(memory, additional_data=None):
if not memory:
return None
filtered_memory = [line for line in memory if isinstance(line, str)]
if additional_data:
filtered_memory.extend(line for line in additional_data if isinstance(line, str))
filtered_memory.extend(
line for line in additional_data if isinstance(line, str)
)
if not filtered_memory:
return None
text = "\n".join(filtered_memory)
model = markovify.NewlineText(text, state_size=2)
return model
# Save the Markov model to a pickle file
def save_markov_model(model, filename='markov_model.pkl'):
with open(filename, 'wb') as f:
def save_markov_model(model, filename="markov_model.pkl"):
with open(filename, "wb") as f:
pickle.dump(model, f)
logger.info(f"Markov model saved to {filename}.")
# Load the Markov model from a pickle file
def load_markov_model(filename='markov_model.pkl'):
def load_markov_model(filename="markov_model.pkl"):
try:
with open(filename, 'rb') as f:
with open(filename, "rb") as f:
model = pickle.load(f)
logger.info(f"{k.model_loaded()} {filename}.{RESET}")
return model
except FileNotFoundError:
logger.error(f"{filename} {k.not_found()}{RESET}")
return None
return None

37
modules/permission.py Normal file
View file

@ -0,0 +1,37 @@
from functools import wraps
import discord
import discord.ext
import discord.ext.commands
from modules.settings import Settings as SettingsManager
import logging
logger = logging.getLogger("goober")
settings_manager = SettingsManager()
settings = settings_manager.settings
class PermissionError(Exception):
pass
def requires_admin():
async def wrapper(ctx: discord.ext.commands.Context):
if ctx.author.id not in settings["bot"]["owner_ids"]:
await ctx.send(
"You don't have the necessary permissions to run this command!"
)
return False
command = ctx.command
if not command:
logger.info(f"Unknown command ran {ctx.message}")
else:
logger.info(
f'Command {settings["bot"]["prefix"]}{command.name} @{ctx.author.name}'
)
return True
return discord.ext.commands.check(wrapper)

View file

@ -12,6 +12,7 @@ import importlib.metadata
import logging
import modules.keys as k
from modules.settings import Settings as SettingsManager
settings_manager = SettingsManager()
settings = settings_manager.settings
@ -27,68 +28,75 @@ except ImportError:
psutilavaliable = False
logger.error(k.missing_requests_psutil())
def check_for_model():
if is_package("en_core_web_sm"):
logger.info("Model is installed.")
else:
logger.info("Model is not installed.")
def iscloned():
if os.path.exists(".git"):
return True
else:
logger.error(f"{k.not_cloned()}")
logger.error(f"{k.not_cloned()}")
sys.exit(1)
def get_stdlib_modules():
stdlib_path = pathlib.Path(sysconfig.get_paths()['stdlib'])
stdlib_path = pathlib.Path(sysconfig.get_paths()["stdlib"])
modules = set()
if hasattr(sys, 'builtin_module_names'):
if hasattr(sys, "builtin_module_names"):
modules.update(sys.builtin_module_names)
for file in stdlib_path.glob('*.py'):
if file.stem != '__init__':
for file in stdlib_path.glob("*.py"):
if file.stem != "__init__":
modules.add(file.stem)
for folder in stdlib_path.iterdir():
if folder.is_dir() and (folder / '__init__.py').exists():
if folder.is_dir() and (folder / "__init__.py").exists():
modules.add(folder.name)
for file in stdlib_path.glob('*.*'):
if file.suffix in ('.so', '.pyd'):
for file in stdlib_path.glob("*.*"):
if file.suffix in (".so", ".pyd"):
modules.add(file.stem)
return modules
def check_requirements():
STD_LIB_MODULES = get_stdlib_modules()
PACKAGE_ALIASES = {
"discord": "discord.py",
"better_profanity": "better-profanity",
"dotenv": "python-dotenv",
"pil": "pillow"
"pil": "pillow",
}
parent_dir = os.path.dirname(os.path.abspath(__file__))
requirements_path = os.path.abspath(os.path.join(parent_dir, '..', 'requirements.txt'))
requirements_path = os.path.abspath(
os.path.join(parent_dir, "..", "requirements.txt")
)
if not os.path.exists(requirements_path):
logger.error(f"{k.requirements_not_found(path=requirements_path)}")
return
with open(requirements_path, 'r') as f:
with open(requirements_path, "r") as f:
lines = f.readlines()
requirements = set()
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
base_pkg = line.split('==')[0].lower()
if line and not line.startswith("#"):
base_pkg = line.split("==")[0].lower()
aliased_pkg = PACKAGE_ALIASES.get(base_pkg, base_pkg)
requirements.add(aliased_pkg)
installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()}
installed_packages = {
dist.metadata["Name"].lower() for dist in importlib.metadata.distributions()
}
missing = []
for req in sorted(requirements):
if req in STD_LIB_MODULES or req == 'modules':
if req in STD_LIB_MODULES or req == "modules":
print(k.std_lib_local_skipped(package=req))
continue
@ -108,6 +116,7 @@ def check_requirements():
else:
logger.info(k.all_requirements_satisfied())
def check_latency():
host = "1.1.1.1"
system = platform.system()
@ -126,10 +135,7 @@ def check_latency():
try:
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode == 0:
@ -147,25 +153,37 @@ def check_latency():
except Exception as e:
logger.error(k.error_running_ping(error=e))
def check_memory():
if psutilavaliable == False:
return
try:
memory_info = psutil.virtual_memory() # type: ignore
total_memory = memory_info.total / (1024 ** 3)
used_memory = memory_info.used / (1024 ** 3)
free_memory = memory_info.available / (1024 ** 3)
total_memory = memory_info.total / (1024**3)
used_memory = memory_info.used / (1024**3)
free_memory = memory_info.available / (1024**3)
logger.info(k.memory_usage(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100))
logger.info(
k.memory_usage(
used=used_memory,
total=total_memory,
percent=(used_memory / total_memory) * 100,
)
)
if used_memory > total_memory * 0.9:
print(f"{YELLOW}{k.memory_above_90(percent=(used_memory / total_memory) * 100)}{RESET}")
print(
f"{YELLOW}{k.memory_above_90(percent=(used_memory / total_memory) * 100)}{RESET}"
)
logger.info(k.total_memory(total=total_memory))
logger.info(k.used_memory(used=used_memory))
if free_memory < 1:
logger.warning(f"{k.low_free_memory(free=free_memory)}")
sys.exit(1)
except ImportError:
logger.error(k.psutil_not_installed()) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped."
logger.error(
k.psutil_not_installed()
) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped."
def check_cpu():
if psutilavaliable == False:
@ -180,13 +198,18 @@ def check_cpu():
logger.error(k.really_high_cpu())
sys.exit(1)
def check_memoryjson():
try:
logger.info(k.memory_file(size=os.path.getsize(settings["bot"]["active_memory"]) / (1024 ** 2)))
logger.info(
k.memory_file(
size=os.path.getsize(settings["bot"]["active_memory"]) / (1024**2)
)
)
if os.path.getsize(settings["bot"]["active_memory"]) > 1_073_741_824:
logger.warning(f"{k.memory_file_large()}")
try:
with open(settings["bot"]["active_memory"], 'r', encoding='utf-8') as f:
with open(settings["bot"]["active_memory"], "r", encoding="utf-8") as f:
json.load(f)
except json.JSONDecodeError as e:
logger.error(f"{k.memory_file_corrupted(error=e)}")
@ -199,9 +222,11 @@ def check_memoryjson():
except FileNotFoundError:
logger.info(f"{k.memory_file_not_found()}")
def presskey2skip(timeout):
if os.name == 'nt':
if os.name == "nt":
import msvcrt
start_time = time.time()
while True:
if msvcrt.kbhit():
@ -230,12 +255,16 @@ def presskey2skip(timeout):
time.sleep(0.1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
beta = beta
def start_checks():
if settings["disable_checks"]:
logger.warning(f"{k.checks_disabled()}")
return
logger.info(k.running_prestart_checks())
check_for_model()
iscloned()
@ -250,12 +279,14 @@ def start_checks():
logger.warning(f"{k.env_file_not_found()}")
sys.exit(1)
if beta == True:
logger.warning(f"this build isnt finished yet, some things might not work as expected")
logger.warning(
f"this build isnt finished yet, some things might not work as expected"
)
else:
pass
logger.info(k.continuing_in_seconds(seconds=5))
presskey2skip(timeout=5)
os.system('cls' if os.name == 'nt' else 'clear')
os.system("cls" if os.name == "nt" else "clear")
with open(settings ["splash_text_loc"], "r") as f:
with open(settings["splash_text_loc"], "r") as f:
print("".join(f.readlines()))

View file

@ -9,6 +9,7 @@ import discord
import modules.keys as k
import logging
logger = logging.getLogger("goober")
@ -17,18 +18,20 @@ def check_resources():
nlp = spacy.load("en_core_web_sm")
except OSError:
logging.critical(k.spacy_model_not_found())
spacy.cli.download("en_core_web_sm") # type: ignore
spacy.cli.download("en_core_web_sm") # type: ignore
nlp = spacy.load("en_core_web_sm")
if "spacytextblob" not in nlp.pipe_names:
nlp.add_pipe("spacytextblob")
logger.info(k.spacy_initialized())
check_resources()
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("spacytextblob")
Doc.set_extension("polarity", getter=lambda doc: doc._.blob.polarity)
def is_positive(sentence):
doc = nlp(sentence)
sentiment_score = doc._.polarity # from spacytextblob
@ -36,18 +39,22 @@ def is_positive(sentence):
debug_message = f"{k.sentence_positivity()} {sentiment_score}{RESET}"
logger.debug(debug_message)
return sentiment_score > 0.6 # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them
return (
sentiment_score > 0.6
) # had to raise the bar because it kept saying "death to jews" was fine and it kept reacting to them
async def send_message(ctx: discord.ext.commands.Context,
message: str | None = None,
embed: discord.Embed | None = None,
file: discord.File | None = None,
edit: bool = False,
message_reference: discord.Message | None = None
) -> discord.Message | None:
async def send_message(
ctx: discord.ext.commands.Context,
message: str | None = None,
embed: discord.Embed | None = None,
file: discord.File | None = None,
edit: bool = False,
message_reference: discord.Message | None = None,
) -> discord.Message | None:
sent_message: discord.Message | None = None
if edit and message_reference:
try:
await message_reference.edit(content=message, embed=embed)
@ -62,21 +69,25 @@ async def send_message(ctx: discord.ext.commands.Context,
sent_message = await ctx.send(file=file, content=message)
else:
sent_message = await ctx.send(content=message)
return sent_message
def append_mentions_to_18digit_integer(message):
pattern = r'\b\d{18}\b'
pattern = r"\b\d{18}\b"
return re.sub(pattern, lambda match: "", message)
def preprocess_message(message):
message = append_mentions_to_18digit_integer(message)
doc = nlp(message)
tokens = [token.text for token in doc if token.is_alpha or token.is_digit]
return " ".join(tokens)
def improve_sentence_coherence(sentence):
return re.sub(r'\bi\b', 'I', sentence)
return re.sub(r"\bi\b", "I", sentence)
def rephrase_for_coherence(sentence):
words = sentence.split()

View file

@ -2,17 +2,19 @@ import json
import os
from typing import List, Mapping, Any, TypedDict
from modules.keys import Language
import logging
import logging
import copy
logger = logging.getLogger("goober")
class MiscBotOptions(TypedDict):
ping_line: str
active_song: str
positive_gifs: List[str]
block_profanity: bool
class BotSettings(TypedDict):
prefix: str
owner_ids: List[int]
@ -24,6 +26,7 @@ class BotSettings(TypedDict):
enabled_cogs: List[str]
active_memory: str
class SettingsType(TypedDict):
bot: BotSettings
locale: Language
@ -32,20 +35,21 @@ class SettingsType(TypedDict):
disable_checks: bool
splash_text_loc: str
class Settings:
def __init__(self) -> None:
self.path: str = os.path.join(".", "settings", "settings.json")
if not os.path.exists(self.path):
raise ValueError("settings.json file does not exist!")
self.settings: SettingsType
self.original_settings: SettingsType
with open(self.path, "r") as f:
self.__kv_store: dict = json.load(f)
self.settings = SettingsType(self.__kv_store) # type: ignore
self.settings = SettingsType(self.__kv_store) # type: ignore
self.original_settings = copy.deepcopy(self.settings)
def commit(self) -> None:
@ -53,6 +57,6 @@ class Settings:
json.dump(self.settings, f, indent=4)
self.original_settings = self.settings
def discard(self) -> None:
self.settings = self.original_settings
self.settings = self.original_settings

View file

@ -10,14 +10,15 @@ settings_manager = SettingsManager()
settings = settings_manager.settings
logger = logging.getLogger("goober")
def handle_exception(exc_type, exc_value, exc_traceback, *, context=None):
os.system('cls' if os.name == 'nt' else 'clear')
os.system("cls" if os.name == "nt" else "clear")
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
with open(settings['splash_text_loc'], "r") as f:
with open(settings["splash_text_loc"], "r") as f:
print("".join(f.readlines()))
print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}")
@ -25,9 +26,5 @@ def handle_exception(exc_type, exc_value, exc_traceback, *, context=None):
print(f"{RED}========END OF TRACEBACK========{RESET}")
print(f"{RED}{k.unhandled_exception()}{RESET}")
if context:
print(f"{RED}Context: {context}{RESET}")

View file

@ -1,113 +0,0 @@
import requests
import subprocess
import sys
import logging
import json
import time
import random
import modules.keys as k
from modules.globalvars import *
from modules.settings import Settings as SettingsManager
settings_manager = SettingsManager()
settings = settings_manager.settings
logger = logging.getLogger("goober")
launched = False
# Run a shell command and return its output
def run_cmd(cmd):
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout.strip()
# Check if the remote branch is ahead of the local branch
def is_remote_ahead(branch='main', remote='origin'):
run_cmd(f'git fetch {remote}')
count = run_cmd(f'git rev-list --count HEAD..{remote}/{branch}')
return int(count) > 0
# Automatically update the local repository if the remote is ahead
def auto_update(branch='main', remote='origin'):
if launched == True:
print(k.already_started())
return
if settings["auto_update"] != "True":
pass # Auto-update is disabled
if is_remote_ahead(branch, remote):
logger.info(k.remote_ahead(remote, branch))
pull_result = run_cmd(f'git pull {remote} {branch}')
logger.info(pull_result)
logger.info(k.please_restart())
sys.exit(0)
else:
logger.info(k.local_ahead(remote, branch))
def get_latest_version_info():
try:
unique_suffix = f"{int(time.time())}_{random.randint(0, 9999)}"
url = f"{UPDATE_URL}?_={unique_suffix}"
curl_cmd = [
"curl",
"-s",
"-H", "Cache-Control: no-cache",
"-H", "Pragma: no-cache",
url
]
result = subprocess.run(curl_cmd, capture_output=True, text=True, timeout=5)
content = result.stdout
if result.returncode != 0:
logger.error(f"curl failed with return code {result.returncode}")
return None
try:
data = json.loads(content)
return data
except json.JSONDecodeError:
logger.error("JSON decode failed")
logger.error(content[:500])
return None
except Exception as e:
logger.error(f"Exception in get_latest_version_info: {e}")
return None
# Check if an update is available and perform update if needed
def check_for_update():
global latest_version, local_version, launched
latest_version_info = get_latest_version_info()
if not latest_version_info:
logger.error(f"{k.fetch_update_fail()}")
return None
latest_version = latest_version_info.get("version")
os.environ['gooberlatest_version'] = latest_version
download_url = latest_version_info.get("download_url")
if not latest_version or not download_url:
logger.error(k.invalid_server())
return None
# Check if local_version is valid
if local_version == "0.0.0" or None:
logger.error(k.cant_find_local_version())
return
# Compare local and latest versions
if local_version < latest_version:
logger.warning(k.new_version(latest_version=latest_version, local_version=local_version))
logger.warning(k.changelog(VERSION_URL=VERSION_URL))
auto_update()
elif beta == True:
logger.warning(f"You are running an \"unstable\" version of Goober, do not expect it to work properly.\nVersion {local_version}\nServer: {latest_version}{RESET}")
elif local_version > latest_version:
logger.warning(f"{k.modification_warning()}")
elif local_version == latest_version:
logger.info(f"{k.latest_version()} {local_version}")
logger.info(f"{k.latest_version2(VERSION_URL=VERSION_URL)}\n\n")
launched = True
return latest_version

View file

@ -1,207 +0,0 @@
# If you're seeing this after cloning the Goober repo, note that this is a standalone module for translations.
# While it's used by Goober Core, it lives in its own repository and should not be modified here.
# For updates or contributions, visit: https://github.com/gooberinc/volta
# Also, Note to self: Add more comments it needs more love
import os
import locale
import json
import pathlib
import threading
import time
from dotenv import load_dotenv
ANSI = "\033["
RED = f"{ANSI}31m"
GREEN = f"{ANSI}32m"
YELLOW = f"{ANSI}33m"
DEBUG = f"{ANSI}1;30m"
RESET = f"{ANSI}0m"
LOCALE = os.getenv("LOCALE")
module_dir = pathlib.Path(__file__).parent.parent
working_dir = pathlib.Path.cwd()
EXCLUDE_DIRS = {'.git', '__pycache__'}
locales_dirs = []
ENGLISH_MISSING = False
FALLBACK_LOCALE = "en"
if os.getenv("fallback_locale"):
FALLBACK_LOCALE = os.getenv("fallback_locale")
def find_locales_dirs(base_path):
found = []
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS]
if 'locales' in dirs:
locales_path = pathlib.Path(root) / 'locales'
found.append(locales_path)
dirs.remove('locales')
return found
def find_dotenv(start_path: pathlib.Path) -> pathlib.Path | None:
current = start_path.resolve()
while current != current.parent:
candidate = current / ".env"
if candidate.exists():
return candidate
current = current.parent
return None
env_path = find_dotenv(pathlib.Path(__file__).parent)
if env_path:
load_dotenv(dotenv_path=env_path)
print(f"[VOLTA] {GREEN}Loaded .env from {env_path}{RESET}")
else:
print(f"[VOLTA] {YELLOW}No .env file found from {__file__} upwards.{RESET}")
locales_dirs.extend(find_locales_dirs(module_dir))
if working_dir != module_dir:
locales_dirs.extend(find_locales_dirs(working_dir))
translations = {}
_file_mod_times = {}
import locale
import platform
import os
import sys
def get_system_locale():
system = platform.system() # fallback incase locale isnt set
if system == "Windows":
lang, _ = locale.getdefaultlocale()
return lang or os.getenv("LANG")
elif system == "Darwin":
try:
import subprocess
result = subprocess.run(
["defaults", "read", "-g", "AppleLocale"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True
)
return result.stdout.strip() or locale.getdefaultlocale()[0]
except Exception:
return locale.getdefaultlocale()[0]
elif system == "Linux":
return (
os.getenv("LC_ALL") or
os.getenv("LANG") or
locale.getdefaultlocale()[0]
)
return locale.getdefaultlocale()[0]
def load_translations():
global translations, _file_mod_times
translations.clear()
_file_mod_times.clear()
for locales_dir in locales_dirs:
for filename in os.listdir(locales_dir):
if filename.endswith(".json"):
lang_code = filename[:-5]
file_path = locales_dir / filename
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if lang_code not in translations:
translations[lang_code] = {}
translations[lang_code].update(data)
_file_mod_times[(lang_code, file_path)] = file_path.stat().st_mtime
except Exception as e:
print(f"[VOLTA] {RED}Failed loading {file_path}: {e}{RESET}")
def reload_if_changed():
while True:
for (lang_code, file_path), last_mtime in list(_file_mod_times.items()):
try:
current_mtime = file_path.stat().st_mtime
if current_mtime != last_mtime:
print(f"[VOLTA] {RED}Translation file changed: {file_path}, reloading...{RESET}")
load_translations()
break
except FileNotFoundError:
print(f"[VOLTA] {RED}Translation file removed: {file_path}{RESET}")
_file_mod_times.pop((lang_code, file_path), None)
if lang_code in translations:
translations.pop(lang_code, None)
def set_language(lang: str):
global LOCALE, ENGLISH_MISSING
if not LOCALE:
LOCALE = get_system_locale()
elif lang in translations:
LOCALE = lang
else:
print(f"[VOLTA] {RED}Language '{lang}' not found, defaulting to 'en'{RESET}")
if FALLBACK_LOCALE in translations:
LOCALE = FALLBACK_LOCALE
else:
print(f"[VOLTA] {RED}The fallback translations cannot be found! No fallback available.{RESET}")
ENGLISH_MISSING = True
def check_missing_translations():
global LOCALE, ENGLISH_MISSING
load_translations()
if FALLBACK_LOCALE not in translations:
print(f"[VOLTA] {RED}Fallback translations ({FALLBACK_LOCALE}.json) missing from assets/locales.{RESET}")
ENGLISH_MISSING = True
return
if LOCALE == "en":
print("[VOLTA] Locale is English, skipping missing key check.")
return
en_keys = set(translations.get("en", {}).keys())
locale_keys = set(translations.get(LOCALE, {}).keys())
missing_keys = en_keys - locale_keys
total_keys = len(en_keys)
missing_count = len(missing_keys)
if missing_count > 0:
percent_missing = (missing_count / total_keys) * 100
if percent_missing == 100:
print(f"[VOLTA] {YELLOW}Warning: All keys are missing in locale '{LOCALE}'! Defaulting back to {FALLBACK_LOCALE}{RESET}")
set_language(FALLBACK_LOCALE)
elif percent_missing > 0:
print(f"[VOLTA] {YELLOW}Warning: {missing_count}/{total_keys} keys missing in locale '{LOCALE}' ({percent_missing:.1f}%)!{RESET}")
for key in sorted(missing_keys):
print(f" - {key}")
time.sleep(2)
else:
print(f"[VOLTA] All translation keys present for locale: {LOCALE}")
printedsystemfallback = False
def get_translation(lang: str, key: str):
global printedsystemfallback
if ENGLISH_MISSING:
return f"[VOLTA] {RED}No fallback available!{RESET}"
fallback_translations = translations.get(FALLBACK_LOCALE, {})
sys_lang = get_system_locale().split("_")[0] if get_system_locale() else None
sys_translations = translations.get(sys_lang, {}) if sys_lang else {}
lang_translations = translations.get(lang, {})
if key in lang_translations:
return lang_translations[key]
if sys_lang and sys_lang != lang and key in sys_translations:
if not printedsystemfallback:
print(f"[VOLTA] {YELLOW}Falling back to system language {sys_lang}!{RESET}")
printedsystemfallback = True
return sys_translations[key]
if key in fallback_translations:
print(f"[VOLTA] {YELLOW}Missing key: '{key}' in '{lang}', falling back to fallback locale '{FALLBACK_LOCALE}'{RESET}")
return fallback_translations[key]
return f"[VOLTA] {YELLOW}Missing key: '{key}' in all locales!{RESET}"
def _(key: str) -> str:
return get_translation(LOCALE, key)
load_translations()
watchdog_thread = threading.Thread(target=reload_if_changed, daemon=True)
watchdog_thread.start()
if __name__ == '__main__':
print("Volta should not be run directly! Please use it as a module..")