"""Pre-start sanity checks run before the bot launches.

``start_checks`` is the entry point; it runs the spaCy model, git checkout,
translation, requirements, latency, memory, memory-file and CPU checks, then
verifies that a ``.env`` file exists.
"""

# The star import stays first so that the explicit imports below take
# precedence over any same-named objects it re-exports.
from modules.globalvars import *
from modules.settings import Settings as SettingsManager
from modules.volta.main import _, check_missing_translations
import time
import os
import sys
import subprocess
import sysconfig
import ast
import json
import re
import pathlib
import platform
from spacy.util import is_package
import importlib.metadata
import logging

logger = logging.getLogger("goober")

settings_manager = SettingsManager()
settings = settings_manager.settings
MEMORY_FILE = settings["bot"]["active_memory"]

with open(settings["splash_text_loc"], "r", encoding="UTF-8") as f:
    splash_text = f.read()

# requests and psutil are imported together; if either is missing the
# psutil-based checks (memory, CPU) are skipped.
psutilavaliable = True
try:
    import requests
    import psutil
except ImportError:
    psutilavaliable = False
    logger.error(_('missing_requests_psutil'))

# Leading distribution name of a requirements.txt line (stops at version
# specifiers, extras, environment markers and whitespace).
_REQUIREMENT_NAME = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def check_for_model():
    """Log whether the ``en_core_web_sm`` spaCy model is installed."""
    if is_package("en_core_web_sm"):
        logger.info("Model is installed.")
    else:
        logger.info("Model is not installed.")


def iscloned():
    """Return True when ``.git`` exists in the current working directory.

    Logs an error and exits the process with status 1 otherwise.
    """
    if not os.path.exists(".git"):
        logger.error(_('not_cloned'))
        sys.exit(1)
    return True


def get_stdlib_modules():
    """Return a set of top-level standard-library module names.

    Combines the interpreter's built-in module names,
    ``sys.stdlib_module_names`` when the running Python provides it, and
    whatever modules, packages and extension modules are found in the
    stdlib directory reported by ``sysconfig``.
    """
    modules = set(sys.builtin_module_names)
    # Only present on Python 3.10+, hence the getattr fallback.
    modules.update(getattr(sys, "stdlib_module_names", ()))

    stdlib = pathlib.Path(sysconfig.get_paths()['stdlib'])
    if not stdlib.is_dir():
        return modules

    modules.update(
        f.stem for f in stdlib.glob('*.py') if f.stem != '__init__'
    )
    modules.update(
        d.name for d in stdlib.iterdir() if (d / '__init__.py').exists()
    )
    # Extension files carry an ABI tag (e.g. "_x.cpython-311-....so"), so the
    # module name is the part before the first dot rather than Path.stem.
    modules.update(
        f.name.split('.', 1)[0]
        for f in stdlib.glob('*')
        if f.suffix in ('.so', '.pyd')
    )
    return modules


def _requirement_name(line):
    """Return the lowercased package name of a requirements line, or None.

    Blank lines, comment lines and lines that do not start with a package
    name (such as ``-r other.txt``) yield None.
    """
    text = line.strip()
    if not text or text.startswith('#'):
        return None
    match = _REQUIREMENT_NAME.match(text)
    return match.group(0).lower() if match else None


def _canonical_name(name):
    """Normalise a distribution name so ``-``, ``_`` and ``.`` compare equal."""
    return re.sub(r"[-_.]+", "-", name).lower()


def check_requirements():
    """Verify that every package in ``../requirements.txt`` is installed.

    Names found in the standard library (or equal to ``modules``) are
    reported as skipped. If any package is missing, the missing names are
    printed and the process exits with status 1. When the requirements file
    does not exist an error is logged and the check returns.
    """
    stdlib = get_stdlib_modules()
    # Requirement/import names whose installed distribution name differs.
    aliases = {
        "discord": "discord.py",
        "better_profanity": "better-profanity",
        "dotenv": "python-dotenv",
        "pil": "pillow",
    }

    req_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'requirements.txt')
    )
    if not os.path.exists(req_path):
        logger.error(_('requirements_not_found').format(path=req_path))
        return

    # utf-8-sig tolerates a byte-order mark at the start of the file.
    with open(req_path, encoding="utf-8-sig") as req_file:
        names = (_requirement_name(line) for line in req_file)
        requirements = {aliases.get(name, name) for name in names if name}

    installed = set()
    for dist in importlib.metadata.distributions():
        dist_name = dist.metadata.get('Name')
        if dist_name:  # skip distributions with broken/missing metadata
            installed.add(_canonical_name(dist_name))

    missing = []
    for pkg in sorted(requirements):
        if pkg in stdlib or pkg == 'modules':
            print(_('std_lib_local_skipped').format(package=pkg))
            continue
        if _canonical_name(pkg) in installed:
            logger.info(_('ok_installed').format(package=pkg))
        else:
            logger.error(f"{_('missing_package').format(package=pkg)} {pkg} {_('missing_package2')}")
            missing.append(pkg)

    if missing:
        logger.error(_('missing_packages_detected'))
        for pkg in missing:
            print(f" - {pkg}")
        sys.exit(1)

    logger.info(_('all_requirements_satisfied'))


def check_latency():
    """Ping 1.1.1.1 once and log the measured latency.

    Logs a warning above 300 ms, and logs an error when the ping command
    fails or cannot be run. Never raises.
    """
    host = "1.1.1.1"
    # Per-platform ping command and the regex that extracts the latency.
    ping_variants = {
        "Windows": (["ping", "-n", "1", "-w", "1000", host], r"Average = (\d+)ms"),
        "Darwin": (["ping", "-c", "1", host], r"time=([\d\.]+) ms"),
    }
    fallback = (["ping", "-c", "1", "-W", "1", host], r"time=([\d\.]+) ms")
    cmd, pattern = ping_variants.get(platform.system(), fallback)

    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
        if result.returncode != 0:
            print(result.stderr)
            logger.error(_('ping_failed').format(host=host) + RESET)
            return

        match = re.search(pattern, result.stdout)
        if not match:
            logger.warning(_('could_not_parse_latency'))
            return

        latency = float(match.group(1))
        logger.info(_('ping_to').format(host=host, latency=latency))
        if latency > 300:
            logger.warning(_('high_latency'))
    except Exception as e:
        logger.error(_('error_running_ping').format(error=e))


def check_memory():
    """Log system memory usage; exit with status 1 if under 1 GB is available.

    Does nothing when psutil could not be imported.
    """
    if not psutilavaliable:
        return

    mem = psutil.virtual_memory()  # type: ignore
    # Values are reported in decimal gigabytes.
    total = mem.total / 1e9
    used = mem.used / 1e9
    free = mem.available / 1e9
    percent_used = (used / total) * 100

    logger.info(_('memory_usage').format(used=used, total=total, percent=percent_used))
    if percent_used > 90:
        print(f"{YELLOW}{_('memory_above_90').format(percent=percent_used)}{RESET}")
    logger.info(_('total_memory').format(total=total))
    logger.info(_('used_memory').format(used=used))
    if free < 1:
        logger.warning(_('low_free_memory').format(free=free))
        sys.exit(1)


def check_cpu():
    """Sample CPU usage for one second and log the per-core average.

    Warns above 85% and exits with status 1 above 95%. Does nothing when
    psutil could not be imported.
    """
    if not psutilavaliable:
        return

    logger.info(_('measuring_cpu'))
    cpu_per_core = psutil.cpu_percent(interval=1, percpu=True)  # type: ignore
    total_cpu = sum(cpu_per_core) / len(cpu_per_core)
    logger.info(_('total_cpu_usage').format(usage=total_cpu))

    if total_cpu > 85:
        logger.warning(_('high_avg_cpu').format(usage=total_cpu))
    if total_cpu > 95:
        logger.error(_('really_high_cpu'))
        sys.exit(1)


def check_memoryjson():
    """Log the size of ``MEMORY_FILE`` and check that it parses as JSON.

    Problems are logged (missing file, file over 1024 MB, invalid JSON,
    bad encoding, other read errors); nothing is raised for them.
    """
    try:
        size_mb = os.path.getsize(MEMORY_FILE) / (1024 ** 2)
        logger.info(_('memory_file').format(size=size_mb))
        if size_mb > 1024:
            logger.warning(_('memory_file_large'))

        try:
            with open(MEMORY_FILE, 'r', encoding='utf-8') as memory_file:
                json.load(memory_file)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            if isinstance(e, json.JSONDecodeError):
                msg = _('memory_file_corrupted')
            else:
                msg = _('memory_file_encoding')
            logger.error(msg.format(error=e))
            logger.warning(_('consider_backup_memory'))
        except Exception as e:
            logger.error(_('error_reading_memory').format(error=e))
    except FileNotFoundError:
        logger.error(_('memory_file_not_found'))


def presskey2skip(timeout):
    """Wait up to ``timeout`` seconds, returning early on a key press.

    When standard input is not an interactive terminal on a non-Windows
    system, key presses cannot be detected and the function simply sleeps
    for the timeout.
    """
    if os.name == 'nt':
        import msvcrt

        start_time = time.monotonic()
        while True:
            if msvcrt.kbhit():
                msvcrt.getch()  # consume the key so it does not leak into later input
                break
            if time.monotonic() - start_time > timeout:
                break
            time.sleep(0.1)
        return

    wait_seconds = max(timeout, 0)
    # termios only works on a real terminal; under a pipe or service manager
    # tcgetattr would raise, so just wait out the timeout instead.
    if sys.stdin is None or not sys.stdin.isatty():
        time.sleep(wait_seconds)
        return

    import select
    import termios
    import tty

    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd)  # deliver single key presses without waiting for Enter
        if select.select([sys.stdin], [], [], wait_seconds)[0]:
            sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


# Self-assignment kept from the original; `beta` is not defined in this file
# and is presumably supplied by the globalvars star import.
beta = beta


def start_checks():
    """Run every pre-start check, then pause up to 5 seconds and clear the screen.

    Returns immediately (after a warning) when ``settings["disable_checks"]``
    is truthy. Exits with status 1 when no ``.env`` file exists in the
    current working directory; individual checks may also exit the process.
    """
    if settings["disable_checks"]:
        logger.warning(_('checks_disabled'))
        return

    logger.info(_('running_prestart_checks'))
    check_for_model()
    iscloned()
    check_missing_translations()
    check_requirements()
    check_latency()
    check_memory()
    check_memoryjson()
    check_cpu()

    if not os.path.exists(".env"):
        logger.warning(_('env_file_not_found'))
        sys.exit(1)

    if beta:
        logger.warning("this build isnt finished yet, some things might not work as expected")

    logger.info(_('continuing_in_seconds').format(seconds=5))
    presskey2skip(timeout=5)
    os.system('cls' if os.name == 'nt' else 'clear')