diff --git a/README.md b/README.md index c1e9957..a4fa2ce 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ knockoff of genai basically :p - +THIS!! IS THE ACTUAL REPO!!!! NOT THE OTHER ONE!!! THIS ONE!!! Special thanks to [Charlie's Computers](https://github.com/PowerPCFan) for being the only one I know of that's hosting Goober 24/7 diff --git a/assets/cogs/README.md b/assets/cogs/README.md deleted file mode 100644 index 8680835..0000000 --- a/assets/cogs/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# goobers custom commands -[Hello World!](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/hello.py) -by expect - -[WhoAmI (lists username and nickname)](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/whoami.py) -by PowerPCFan - -[Cog Manager](https://github.com/WhatDidYouExpect/goober/blob/main/cogs/cogmanager.py) -by expect - -[Web Scraper](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webscraper.py) -by expect (requires goober version 0.11.7.2 or higher) - -[Status Changer](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/songchanger.py) -by expect (requires goober version 0.11.8 or higher) - -[Status Changer](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/songchanger.py) -by expect (requires goober version 0.11.8 or higher) - -[webUI](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webserver.py) -by expect (requires goober version 0.11.8 or higher) - -[LastFM](https://raw.githubusercontent.com/WhatDidYouExpect/goober/refs/heads/main/cogs/webserver.py) -by expect (no idea what version it needs i've only tried it on 1.0.3) -- you have to add LASTFM_USERNAME and LASTFM_API_KEY to your .env \ No newline at end of file diff --git a/botminimal.py b/botminimal.py deleted file mode 100644 index 859662f..0000000 --- a/botminimal.py +++ /dev/null @@ -1,245 +0,0 @@ -import discord -from discord.ext import commands, tasks -import json -import markovify -import nltk -from nltk.tokenize import word_tokenize -import random -import os -import time -import re -from dotenv import load_dotenv -load_dotenv() -# download NLTK data files -nltk.download('punkt') -MEMORY_FILE = "memory.json" -MEMORY_LOADED_FILE = "MEMORY_LOADED" - -def load_memory(): - data = [] - - # Try to load data from MEMORY_FILE - try: - with open(MEMORY_FILE, "r") as f: - data = json.load(f) - except FileNotFoundError: - pass - - return data - -# Save memory data to MEMORY_FILE -def save_memory(memory): - with open(MEMORY_FILE, "w") as f: - json.dump(memory, f, indent=4) - -def train_markov_model(memory, additional_data=None): - if not memory: - return None - filtered_memory = [line for line in memory if isinstance(line, str)] - if additional_data: - filtered_memory.extend(line for line in additional_data if isinstance(line, str)) - if not filtered_memory: - return None - text = "\n".join(filtered_memory) - model = markovify.NewlineText(text, state_size=2) - return model - -#this doesnt work and im extremely pissed and mad -def append_mentions_to_18digit_integer(message): - pattern = r'\b\d{18}\b' - return re.sub(pattern, lambda match: f"<@{match.group(0)}>", message) - -def preprocess_message(message): - message = append_mentions_to_18digit_integer(message) - tokens = word_tokenize(message) - tokens = [token for token in tokens if token.isalnum()] - return " ".join(tokens) - - -intents = discord.Intents.default() -intents.messages = True -intents.message_content = True -bot = commands.Bot(command_prefix="g!", intents=intents) -memory = load_memory() -markov_model = train_markov_model(memory) - -generated_sentences = set() -used_words = set() - -@bot.event -async def on_ready(): - print(f"Logged in as {bot.user}") - post_message.start() - -positive_keywords = ["happy", "good", "great", "amazing", "awesome", "joy", "love", "fantastic", "positive", "cheerful", "victory", "favorite", "lmao", "lol", "xd", "XD", "xD", "Xd"] - -positive_gifs = [ - "https://tenor.com/view/chill-guy-my-new-character-gif-2777893510283028272", - "https://tenor.com/view/goodnight-goodnight-friends-weezer-weezer-goodnight-gif-7322052181075806988" -] - -def is_positive(sentence): - sentence_lower = sentence.lower() - return any(keyword in sentence_lower for keyword in positive_keywords) - -@bot.command() -async def ask(ctx): - await ctx.send("Command undergoing fixes!") - #not really lol - -@bot.command() -async def talk(ctx): - if markov_model: - response = None - for _ in range(10): # im going to shit my pants 10 times to get a coherent sentence - response = markov_model.make_sentence(tries=100) - if response and response not in generated_sentences: - # preprocess shit for grammer - response = improve_sentence_coherence(response) - generated_sentences.add(response) - break - - if response: - async with ctx.typing(): - cleaned_response = re.sub(r'[^\w\s]', '', response) - cleaned_response = cleaned_response.lower() - coherent_response = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9: - if is_positive(coherent_response): - gif_url = random.choice(positive_gifs) - combined_message = f"{coherent_response}\n[jif]({gif_url})" - await ctx.send(combined_message) - else: - await ctx.send(coherent_response) - else: - await ctx.send(coherent_response) - else: - await ctx.send("I have nothing to say right now!") - else: - await ctx.send("I need to learn more from messages before I can talk.") - -def improve_sentence_coherence(sentence): - - sentence = sentence.replace(" i ", " I ") - return sentence - -def rephrase_for_coherence(sentence): - - words = sentence.split() - - coherent_sentence = " ".join(words) - return coherent_sentence - -bot.help_command = None - - -@bot.command() -async def help(ctx, *args): - - if args: - command_name = args[0] - command = bot.get_command(command_name) - - if command: - embed = discord.Embed( - title=f"Help: g!{command_name}", - description=f"**Description:** {command.help}", - color=discord.Color.blue() - ) - await ctx.send(embed=embed) - else: - await ctx.send(f"Command `{command_name}` not found.") - else: - - embed = discord.Embed( - title="Bot Help", - description="List of commands grouped by category.", - color=discord.Color.blue() - ) - - command_categories = { - "General": ["show_memory", "talk", "ask", "ping"], - "Debug": ["word_usage"] - } - - for category, commands_list in command_categories.items(): - commands_in_category = "\n".join([f"g!{command}" for command in commands_list]) - embed.add_field(name=category, value=commands_in_category, inline=False) - - await ctx.send(embed=embed) - -@bot.event -async def on_message(message): - global memory, markov_model, last_random_talk_time - - if message.author.bot: - return - - - if message.content.startswith(("g!talk", "g!show_memory", "g!help", "g!")): - await bot.process_commands(message) - return - - if message.content: - formatted_message = append_mentions_to_18digit_integer(message.content) - cleaned_message = preprocess_message(formatted_message) - if cleaned_message: - memory.append(cleaned_message) - save_memory(memory) - markov_model = train_markov_model(memory) - - # process any commands in the message - await bot.process_commands(message) - -@bot.command() -async def ping(ctx): - await ctx.defer() - #stolen from my expect bot very proud - latency = round(bot.latency * 1000) - - LOLembed = discord.Embed( - title="Pong!!", - description=( - f"The Beretta fires fast and won't make you feel any better!\n" - f"`Bot Latency: {latency}ms`\n" - ), - color=discord.Color.blue() - ) - LOLembed.set_footer(text=f"Requested by {ctx.author.name}", icon_url=ctx.author.avatar.url) - - await ctx.send(embed=LOLembed) # use ctx.send instead of respond because it has nothing to respond to and its not a slash command - -@bot.command() -async def show_memory(ctx): - memory = load_memory() - memory_text = json.dumps(memory, indent=4) - if len(memory_text) > 1024: - with open(MEMORY_FILE, "r") as f: - await ctx.send(" ", file=discord.File(f, MEMORY_FILE)) - else: - embed = discord.Embed(title="Memory Contents", description="The bot's memory.", color=discord.Color.blue()) - embed.add_field(name="Memory Data", value=f"```json\n{memory_text}\n```", inline=False) - await ctx.send(embed=embed) - -def improve_sentence_coherence(sentence): - sentence = sentence.replace(" i ", " I ") - return sentence - -@tasks.loop(minutes=60) -async def post_message(): - channel_id = 1296141985253691433 - channel = bot.get_channel(channel_id) - if channel and markov_model: - response = None - for _ in range(10): - response = markov_model.make_sentence(tries=100) - if response and response not in generated_sentences: - generated_sentences.add(response) - break - - if response: - await channel.send(response) - -# run the bot -TOKEN = os.getenv("DISCORDBOTTOKEN", "0") -bot.run(TOKEN) diff --git a/main.py b/main.py index 6ad7ca1..7a21e6c 100644 --- a/main.py +++ b/main.py @@ -179,12 +179,11 @@ async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: await send_message(ctx, f"{(_('command_talk_insufficent_text'))}") return - response: Optional[str] = None + response = None for _ in range(20): if sentence_size == 1: - response = markov_model.make_short_sentence(max_chars=100, tries=100) - if response: - response = response.split()[0] + sentence = markov_model.make_short_sentence(max_chars=100, tries=100) + response = sentence.split()[0] if sentence else None else: response = markov_model.make_sentence(tries=100, max_words=sentence_size) @@ -193,20 +192,23 @@ async def talk(ctx: commands.Context, sentence_size: int = 5) -> None: response = improve_sentence_coherence(response) generated_sentences.add(response) break - - if response: - cleaned_response: str = re.sub(r'[^\w\s]', '', response).lower() - coherent_response: str = rephrase_for_coherence(cleaned_response) - if random.random() < 0.9 and is_positive(coherent_response): - gif_url: str = random.choice(positive_gifs) - combined_message: str = f"{coherent_response}\n[jif]({gif_url})" - else: - combined_message: str = coherent_response - logger.info(combined_message) - os.environ['gooberlatestgen'] = combined_message - await send_message(ctx, combined_message) else: await send_message(ctx, f"{(_('command_talk_generation_fail'))}") + return + + cleaned = re.sub(r'[^\w\s]', '', response).lower() + coherent = rephrase_for_coherence(cleaned) + + if random.random() < 0.9 and is_positive(coherent): + gif_url = random.choice(positive_gifs) + message = f"{coherent}\n[jif]({gif_url})" + else: + message = coherent + + logger.info(message) + os.environ['gooberlatestgen'] = message + await send_message(ctx, message) + @bot.hybrid_command(description=f"RAM") async def ramusage(ctx): @@ -216,24 +218,27 @@ async def ramusage(ctx): @bot.hybrid_command(description=f"{(_('command_desc_help'))}") async def impact(ctx: commands.Context, text: Optional[str] = None) -> None: - assets_folder: str = "assets/images" - temp_input: Optional[str] = None + assets_folder = "assets/images" def get_random_asset_image() -> Optional[str]: - files: List[str] = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] - if not files: + images = [f for f in os.listdir(assets_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] + if not images: return None - return os.path.join(assets_folder, random.choice(files)) + return os.path.join(assets_folder, random.choice(images)) + temp_input = None + input_path = None + + # Determine input image path if ctx.message.attachments: - attachment: discord.Attachment = ctx.message.attachments[0] + attachment = ctx.message.attachments[0] if attachment.content_type and attachment.content_type.startswith("image/"): - ext: str = os.path.splitext(attachment.filename)[1] + ext = os.path.splitext(attachment.filename)[1] temp_input = f"tempy{ext}" await attachment.save(temp_input) - input_path: str = temp_input + input_path = temp_input else: - fallback_image: Optional[str] = get_random_asset_image() + fallback_image = get_random_asset_image() if fallback_image is None: await ctx.reply(_('no_image_available')) return @@ -249,8 +254,8 @@ async def impact(ctx: commands.Context, text: Optional[str] = None) -> None: shutil.copy(fallback_image, temp_input) input_path = temp_input - output_path: Optional[str] = await gen_meme(input_path, custom_text=text) - + # Generate meme image with one-shot text generation + output_path = await gen_meme(input_path, custom_text=text) if output_path is None or not os.path.isfile(output_path): if temp_input and os.path.exists(temp_input): @@ -263,6 +268,7 @@ async def impact(ctx: commands.Context, text: Optional[str] = None) -> None: if temp_input and os.path.exists(temp_input): os.remove(temp_input) + bot.remove_command('help') # Command: Show help information @bot.hybrid_command(description=f"{(_('command_desc_help'))}") diff --git a/modules/globalvars.py b/modules/globalvars.py index f7dcda4..78b5094 100644 --- a/modules/globalvars.py +++ b/modules/globalvars.py @@ -50,10 +50,9 @@ MEMORY_LOADED_FILE = "MEMORY_LOADED" # is this still even used?? okay just check ALIVEPING = os.getenv("ALIVEPING") AUTOUPDATE = os.getenv("AUTOUPDATE") song = os.getenv("SONG") -arch = platform.machine() launched = False latest_version = "0.0.0" -local_version = "2.3.5" +local_version = "3.0.0" os.environ['gooberlocal_version'] = local_version REACT = os.getenv("REACT") if get_git_branch() != "main": diff --git a/modules/image.py b/modules/image.py index 9e9bc3c..7007e5b 100644 --- a/modules/image.py +++ b/modules/image.py @@ -38,76 +38,74 @@ async def gen_meme(input_image_path, sentence_size=5, max_attempts=10, custom_te if not markov_model or not os.path.isfile(input_image_path): return None + def generate_text(): + if custom_text: + return custom_text + + if sentence_size == 1: + candidate = markov_model.make_short_sentence(max_chars=100, tries=100) + if candidate: + candidate = candidate.split()[0] + return candidate + else: + candidate = markov_model.make_sentence(tries=100, max_words=sentence_size) + if candidate: + return improve_sentence_coherence(candidate) + print(candidate) + return None + + + def draw_centered_text(img, text): + draw = ImageDraw.Draw(img) + width, height = img.size + font_size = int(height / 10) + font = load_font(font_size) + + cleaned = re.sub(r'[^\w\s]', '', text).lower() + coherent = rephrase_for_coherence(cleaned).upper() + + bbox = draw.textbbox((0, 0), coherent, font=font) + text_width, text_height_px = bbox[2] - bbox[0], bbox[3] - bbox[1] + max_text_height = height // 4 + + if text_width <= width and text_height_px <= max_text_height: + draw_text_with_outline(draw, coherent, (width - text_width) / 2, 0, font) + img.save(input_image_path) + return True + + top_text, bottom_text = split_text_to_fit(coherent, font, width, draw) + top_bbox = draw.textbbox((0, 0), top_text, font=font) + bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font) + + top_height = top_bbox[3] - top_bbox[1] + bottom_height = bottom_bbox[3] - bottom_bbox[1] + + if top_height <= max_text_height and bottom_height <= max_text_height: + draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font) + y_bottom = height - bottom_height - int(height * 0.04) + draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font) + img.save(input_image_path) + return True + + return False + attempt = 0 while attempt < max_attempts: + response = generate_text() or "NO TEXT GENERATED" with Image.open(input_image_path).convert("RGBA") as img: - draw = ImageDraw.Draw(img) - width, height = img.size - - font_size = int(height / 10) - font = load_font(font_size) - - response = None - if custom_text: - response = custom_text - else: - for _ in range(20): - if sentence_size == 1: - candidate = markov_model.make_short_sentence(max_chars=100, tries=100) - if candidate: - candidate = candidate.split()[0] - else: - candidate = markov_model.make_sentence(tries=100, max_words=sentence_size) - - if candidate and candidate not in generated_sentences: - if sentence_size > 1: - candidate = improve_sentence_coherence(candidate) - generated_sentences.add(candidate) - response = candidate - break - - if not response: - response = "NO TEXT GENERATED" - - cleaned_response = re.sub(r'[^\w\s]', '', response).lower() - coherent_response = rephrase_for_coherence(cleaned_response).upper() - - bbox = draw.textbbox((0, 0), coherent_response, font=font) - text_width = bbox[2] - bbox[0] - text_height_px = bbox[3] - bbox[1] - max_text_height = height // 4 - - if text_width <= width and text_height_px <= max_text_height: - draw_text_with_outline(draw, coherent_response, (width - text_width) / 2, 0, font) - img.save(input_image_path) + if draw_centered_text(img, response): return input_image_path - else: - top_text, bottom_text = split_text_to_fit(coherent_response, font, width, draw) - - top_bbox = draw.textbbox((0, 0), top_text, font=font) - bottom_bbox = draw.textbbox((0, 0), bottom_text, font=font) - - top_height = top_bbox[3] - top_bbox[1] - bottom_height = bottom_bbox[3] - bottom_bbox[1] - - if top_height <= max_text_height and bottom_height <= max_text_height: - draw_text_with_outline(draw, top_text, (width - (top_bbox[2] - top_bbox[0])) / 2, 0, font) - y_bottom = height - bottom_height - int(height * 0.04) - draw_text_with_outline(draw, bottom_text, (width - (bottom_bbox[2] - bottom_bbox[0])) / 2, y_bottom, font) - img.save(input_image_path) - return input_image_path - attempt += 1 - with Image.open(input_image_path).convert("RGBA") as img: draw = ImageDraw.Draw(img) width, height = img.size font_size = int(height / 10) font = load_font(font_size) - truncated = coherent_response[:100] + truncated = (rephrase_for_coherence(re.sub(r'[^\w\s]', '', "NO TEXT GENERATED").lower()).upper())[:100] bbox = draw.textbbox((0, 0), truncated, font=font) text_width = bbox[2] - bbox[0] + draw_text_with_outline(draw, truncated, (width - text_width) / 2, 0, font) img.save(input_image_path) return input_image_path diff --git a/modules/prestartchecks.py b/modules/prestartchecks.py index 29e155f..d13e7bf 100644 --- a/modules/prestartchecks.py +++ b/modules/prestartchecks.py @@ -37,130 +37,112 @@ def iscloned(): sys.exit(1) def get_stdlib_modules(): - stdlib_path = pathlib.Path(sysconfig.get_paths()['stdlib']) - modules = set() - if hasattr(sys, 'builtin_module_names'): - modules.update(sys.builtin_module_names) - for file in stdlib_path.glob('*.py'): - if file.stem != '__init__': - modules.add(file.stem) - for folder in stdlib_path.iterdir(): - if folder.is_dir() and (folder / '__init__.py').exists(): - modules.add(folder.name) - for file in stdlib_path.glob('*.*'): - if file.suffix in ('.so', '.pyd'): - modules.add(file.stem) + stdlib = pathlib.Path(sysconfig.get_paths()['stdlib']) + modules = set(sys.builtin_module_names) + + modules.update( + f.stem for f in stdlib.glob('*.py') if f.stem != '__init__' + ) + modules.update( + d.name for d in stdlib.iterdir() if (d / '__init__.py').exists() + ) + modules.update( + f.stem for f in stdlib.glob('*') if f.suffix in ('.so', '.pyd') + ) return modules def check_requirements(): - STD_LIB_MODULES = get_stdlib_modules() - PACKAGE_ALIASES = { + stdlib = get_stdlib_modules() + aliases = { "discord": "discord.py", "better_profanity": "better-profanity", "dotenv": "python-dotenv", "pil": "pillow" } - parent_dir = os.path.dirname(os.path.abspath(__file__)) - requirements_path = os.path.abspath(os.path.join(parent_dir, '..', 'requirements.txt')) - - if not os.path.exists(requirements_path): - logger.error(f"{(_('requirements_not_found')).format(path=requirements_path)}") + req_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'requirements.txt')) + if not os.path.exists(req_path): + logger.error(_('requirements_not_found').format(path=req_path)) return - with open(requirements_path, 'r') as f: - lines = f.readlines() - requirements = set() - for line in lines: - line = line.strip() - if line and not line.startswith('#'): - base_pkg = line.split('==')[0].lower() - aliased_pkg = PACKAGE_ALIASES.get(base_pkg, base_pkg) - requirements.add(aliased_pkg) + with open(req_path) as f: + requirements = { + aliases.get(line.split('==')[0].strip().lower(), line.split('==')[0].strip().lower()) + for line in f if line.strip() and not line.startswith('#') + } - installed_packages = {dist.metadata['Name'].lower() for dist in importlib.metadata.distributions()} + installed = {d.metadata['Name'].lower() for d in importlib.metadata.distributions()} missing = [] - for req in sorted(requirements): - if req in STD_LIB_MODULES or req == 'modules': - print((_('std_lib_local_skipped')).format(package=req)) + for pkg in sorted(requirements): + if pkg in stdlib or pkg == 'modules': + print(_('std_lib_local_skipped').format(package=pkg)) continue - - check_name = req.lower() - - if check_name in installed_packages: - logger.info(f"{_('ok_installed').format(package=check_name)} {check_name}") + if pkg in installed: + logger.info(_('ok_installed').format(package=pkg)) else: - logger.error(f"{(_('missing_package')).format(package=check_name)} {check_name} {(_('missing_package2'))}") - missing.append(check_name) + logger.error(f"{_('missing_package').format(package=pkg)} {pkg} {_('missing_package2')}") + missing.append(pkg) if missing: logger.error(_('missing_packages_detected')) for pkg in missing: print(f" - {pkg}") sys.exit(1) - else: - logger.info(_('all_requirements_satisfied')) + + logger.info(_('all_requirements_satisfied')) + def check_latency(): host = "1.1.1.1" system = platform.system() - if system == "Windows": - cmd = ["ping", "-n", "1", "-w", "1000", host] - latency_pattern = r"Average = (\d+)ms" - - elif system == "Darwin": - cmd = ["ping", "-c", "1", host] - latency_pattern = r"time=([\d\.]+) ms" - - else: - cmd = ["ping", "-c", "1", "-W", "1", host] - latency_pattern = r"time=([\d\.]+) ms" + cmd, pattern = { + "Windows": (["ping", "-n", "1", "-w", "1000", host], r"Average = (\d+)ms"), + "Darwin": (["ping", "-c", "1", host], r"time=([\d\.]+) ms") + }.get(system, (["ping", "-c", "1", "-W", "1", host], r"time=([\d\.]+) ms")) try: - result = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - - if result.returncode == 0: - match = re.search(latency_pattern, result.stdout) - if match: - latency_ms = float(match.group(1)) - logger.info((_('ping_to')).format(host=host, latency=latency_ms)) - if latency_ms > 300: - logger.warning(f"{(_('high_latency'))}") - else: - logger.warning((_('could_not_parse_latency'))) - else: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: print(result.stderr) - logger.error(f"{(_('ping_failed')).format(host=host)}{RESET}") + return logger.error(_('ping_failed').format(host=host) + RESET) + + match = re.search(pattern, result.stdout) + if not match: + return logger.warning(_('could_not_parse_latency')) + + latency = float(match.group(1)) + logger.info(_('ping_to').format(host=host, latency=latency)) + if latency > 300: + logger.warning(_('high_latency')) + except Exception as e: - logger.error((_('error_running_ping')).format(error=e)) + logger.error(_('error_running_ping').format(error=e)) def check_memory(): - if psutilavaliable == False: + if not psutilavaliable: return - try: - memory_info = psutil.virtual_memory() # type: ignore - total_memory = memory_info.total / (1024 ** 3) - used_memory = memory_info.used / (1024 ** 3) - free_memory = memory_info.available / (1024 ** 3) - logger.info((_('memory_usage')).format(used=used_memory, total=total_memory, percent=(used_memory / total_memory) * 100)) - if used_memory > total_memory * 0.9: - print(f"{YELLOW}{(_('memory_above_90')).format(percent=(used_memory / total_memory) * 100)}{RESET}") - logger.info((_('total_memory')).format(total=total_memory)) - logger.info((_('used_memory')).format(used=used_memory)) - if free_memory < 1: - logger.warning(f"{(_('low_free_memory')).format(free=free_memory)}") + try: + mem = psutil.virtual_memory() # type: ignore + total = mem.total / 1e9 + used = mem.used / 1e9 + free = mem.available / 1e9 + percent_used = (used / total) * 100 + + logger.info(_('memory_usage').format(used=used, total=total, percent=percent_used)) + if percent_used > 90: + print(f"{YELLOW}{_('memory_above_90').format(percent=percent_used)}{RESET}") + logger.info(_('total_memory').format(total=total)) + logger.info(_('used_memory').format(used=used)) + if free < 1: + logger.warning(_('low_free_memory').format(free=free)) sys.exit(1) + except ImportError: - logger.error(_('psutil_not_installed')) # todo: translate this into italian and put it in the translations "psutil is not installed. Memory check skipped." + logger.error(_('psutil_not_installed')) def check_cpu(): if psutilavaliable == False: @@ -177,22 +159,23 @@ def check_cpu(): def check_memoryjson(): try: - logger.info((_('memory_file')).format(size=os.path.getsize(MEMORY_FILE) / (1024 ** 2))) - if os.path.getsize(MEMORY_FILE) > 1_073_741_824: - logger.warning(f"{(_('memory_file_large'))}") + size_mb = os.path.getsize(MEMORY_FILE) / (1024 ** 2) + logger.info(_('memory_file').format(size=size_mb)) + if size_mb > 1024: + logger.warning(_('memory_file_large')) + try: with open(MEMORY_FILE, 'r', encoding='utf-8') as f: json.load(f) - except json.JSONDecodeError as e: - logger.error(f"{(_('memory_file_corrupted')).format(error=e)}") - logger.warning(f"{(_('consider_backup_memory'))}") - except UnicodeDecodeError as e: - logger.error(f"{(_('memory_file_encoding')).format(error=e)}") - logger.warning(f"{(_('consider_backup_memory'))}") + except (json.JSONDecodeError, UnicodeDecodeError) as e: + msg = _('memory_file_corrupted') if isinstance(e, json.JSONDecodeError) else _('memory_file_encoding') + logger.error(msg.format(error=e)) + logger.warning(_('consider_backup_memory')) except Exception as e: - logger.error(f"{(_('error_reading_memory')).format(error=e)}") + logger.error(_('error_reading_memory').format(error=e)) + except FileNotFoundError: - logger(f"{(_('memory_file_not_found'))}") + logger.error(_('memory_file_not_found')) def presskey2skip(timeout): if os.name == 'nt': diff --git a/modules/unhandledexception.py b/modules/unhandledexception.py index eac98e8..93a3e73 100644 --- a/modules/unhandledexception.py +++ b/modules/unhandledexception.py @@ -1,15 +1,12 @@ import sys import traceback -import os from modules.globalvars import RED, RESET, splashtext from modules.volta.main import _ def handle_exception(exc_type, exc_value, exc_traceback, *, context=None): - os.system('cls' if os.name == 'nt' else 'clear') if issubclass(exc_type, KeyboardInterrupt): sys.__excepthook__(exc_type, exc_value, exc_traceback) return - print(splashtext) print(f"{RED}=====BEGINNING OF TRACEBACK====={RESET}") traceback.print_exception(exc_type, exc_value, exc_traceback) print(f"{RED}========END OF TRACEBACK========{RESET}")