goober/modules/prestartchecks.py

237 lines
7.2 KiB
Python
Raw Normal View History

2025-07-01 16:21:34 +02:00
from modules.globalvars import *
from modules.volta.main import _, check_missing_translations
2025-06-22 20:49:07 +02:00
import time
import os
import sys
import subprocess
import sysconfig
2025-06-22 20:49:07 +02:00
import ast
import json
import re
2025-07-07 17:17:44 +02:00
from spacy.util import is_package
import importlib.metadata
import logging
logger = logging.getLogger("goober")
# import shutil
2025-06-27 19:45:44 +02:00
psutilavaliable = True
try:
import requests
import psutil
except ImportError:
psutilavaliable = False
logger.error(_('missing_requests_psutil'))
2025-07-01 16:21:34 +02:00
2025-07-07 17:17:44 +02:00
def check_for_model():
if is_package("en_core_web_sm"):
logger.info("Model is installed.")
2025-07-07 17:17:44 +02:00
else:
logger.info("Model is not installed.")
2025-07-22 19:34:46 +02:00
def iscloned():
if os.path.exists(".git"):
return True
else:
logger.error(f"{_('not_cloned')}")
sys.exit(1)
def get_stdlib_modules():
2025-07-22 20:03:11 +02:00
stdlib = pathlib.Path(sysconfig.get_paths()['stdlib'])
modules = set(sys.builtin_module_names)
modules.update(
f.stem for f in stdlib.glob('*.py') if f.stem != '__init__'
)
modules.update(
d.name for d in stdlib.iterdir() if (d / '__init__.py').exists()
)
modules.update(
f.stem for f in stdlib.glob('*') if f.suffix in ('.so', '.pyd')
)
return modules
2025-06-22 20:49:07 +02:00
def check_requirements():
2025-07-22 20:03:11 +02:00
stdlib = get_stdlib_modules()
aliases = {
2025-06-22 20:49:07 +02:00
"discord": "discord.py",
"better_profanity": "better-profanity",
2025-07-16 14:39:18 +02:00
"dotenv": "python-dotenv",
"pil": "pillow"
2025-06-22 20:49:07 +02:00
}
2025-07-22 20:03:11 +02:00
req_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'requirements.txt'))
if not os.path.exists(req_path):
logger.error(_('requirements_not_found').format(path=req_path))
2025-06-22 20:49:07 +02:00
return
2025-07-22 20:03:11 +02:00
with open(req_path) as f:
requirements = {
aliases.get(line.split('==')[0].strip().lower(), line.split('==')[0].strip().lower())
for line in f if line.strip() and not line.startswith('#')
}
installed = {d.metadata['Name'].lower() for d in importlib.metadata.distributions()}
2025-06-22 20:49:07 +02:00
missing = []
2025-07-22 20:03:11 +02:00
for pkg in sorted(requirements):
if pkg in stdlib or pkg == 'modules':
print(_('std_lib_local_skipped').format(package=pkg))
2025-06-22 20:49:07 +02:00
continue
2025-07-22 20:03:11 +02:00
if pkg in installed:
logger.info(_('ok_installed').format(package=pkg))
else:
2025-07-22 20:03:11 +02:00
logger.error(f"{_('missing_package').format(package=pkg)} {pkg} {_('missing_package2')}")
missing.append(pkg)
2025-06-22 20:49:07 +02:00
if missing:
logger.error(_('missing_packages_detected'))
2025-06-22 20:49:07 +02:00
for pkg in missing:
print(f" - {pkg}")
sys.exit(1)
2025-07-22 20:03:11 +02:00
logger.info(_('all_requirements_satisfied'))
2025-06-22 20:49:07 +02:00
def check_latency():
2025-06-23 13:56:23 +02:00
host = "1.1.1.1"
system = platform.system()
2025-07-16 14:39:18 +02:00
2025-07-22 20:03:11 +02:00
cmd, pattern = {
"Windows": (["ping", "-n", "1", "-w", "1000", host], r"Average = (\d+)ms"),
"Darwin": (["ping", "-c", "1", host], r"time=([\d\.]+) ms")
}.get(system, (["ping", "-c", "1", "-W", "1", host], r"time=([\d\.]+) ms"))
2025-06-23 13:56:23 +02:00
try:
2025-07-22 20:03:11 +02:00
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
2025-06-23 13:56:23 +02:00
print(result.stderr)
2025-07-22 20:03:11 +02:00
return logger.error(_('ping_failed').format(host=host) + RESET)
match = re.search(pattern, result.stdout)
if not match:
return logger.warning(_('could_not_parse_latency'))
latency = float(match.group(1))
logger.info(_('ping_to').format(host=host, latency=latency))
if latency > 300:
logger.warning(_('high_latency'))
2025-06-23 13:56:23 +02:00
except Exception as e:
2025-07-22 20:03:11 +02:00
logger.error(_('error_running_ping').format(error=e))
2025-06-22 20:49:07 +02:00
def check_memory():
2025-07-22 20:03:11 +02:00
if not psutilavaliable:
2025-06-27 19:45:44 +02:00
return
2025-07-22 20:03:11 +02:00
2025-06-22 20:49:07 +02:00
try:
2025-07-22 20:03:11 +02:00
mem = psutil.virtual_memory() # type: ignore
total = mem.total / 1e9
used = mem.used / 1e9
free = mem.available / 1e9
percent_used = (used / total) * 100
logger.info(_('memory_usage').format(used=used, total=total, percent=percent_used))
if percent_used > 90:
print(f"{YELLOW}{_('memory_above_90').format(percent=percent_used)}{RESET}")
logger.info(_('total_memory').format(total=total))
logger.info(_('used_memory').format(used=used))
if free < 1:
logger.warning(_('low_free_memory').format(free=free))
2025-06-22 20:49:07 +02:00
sys.exit(1)
2025-07-22 20:03:11 +02:00
2025-06-22 20:49:07 +02:00
except ImportError:
2025-07-22 20:03:11 +02:00
logger.error(_('psutil_not_installed'))
2025-06-22 20:49:07 +02:00
2025-06-22 23:08:33 +02:00
def check_cpu():
2025-06-27 19:45:44 +02:00
if psutilavaliable == False:
return
logger.info((_('measuring_cpu')))
2025-07-07 20:41:24 +02:00
cpu_per_core = psutil.cpu_percent(interval=1, percpu=True) # type: ignore
total_cpu = sum(cpu_per_core) / len(cpu_per_core)
logger.info((_('total_cpu_usage')).format(usage=total_cpu))
if total_cpu > 85:
logger.warning(f"{(_('high_avg_cpu')).format(usage=total_cpu)}")
if total_cpu > 95:
logger.error(_('really_high_cpu'))
sys.exit(1)
2025-06-22 23:08:33 +02:00
2025-06-22 20:49:07 +02:00
def check_memoryjson():
try:
2025-07-22 20:03:11 +02:00
size_mb = os.path.getsize(MEMORY_FILE) / (1024 ** 2)
logger.info(_('memory_file').format(size=size_mb))
if size_mb > 1024:
logger.warning(_('memory_file_large'))
try:
with open(MEMORY_FILE, 'r', encoding='utf-8') as f:
json.load(f)
2025-07-22 20:03:11 +02:00
except (json.JSONDecodeError, UnicodeDecodeError) as e:
msg = _('memory_file_corrupted') if isinstance(e, json.JSONDecodeError) else _('memory_file_encoding')
logger.error(msg.format(error=e))
logger.warning(_('consider_backup_memory'))
except Exception as e:
2025-07-22 20:03:11 +02:00
logger.error(_('error_reading_memory').format(error=e))
2025-06-22 20:49:07 +02:00
except FileNotFoundError:
2025-07-22 20:03:11 +02:00
logger.error(_('memory_file_not_found'))
2025-06-22 20:49:07 +02:00
def presskey2skip(timeout):
if os.name == 'nt':
import msvcrt
start_time = time.time()
while True:
if msvcrt.kbhit():
msvcrt.getch()
break
if time.time() - start_time > timeout:
break
time.sleep(0.1)
else:
import select
import sys
import termios
import tty
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
start_time = time.time()
while True:
if select.select([sys.stdin], [], [], 0)[0]:
sys.stdin.read(1)
break
if time.time() - start_time > timeout:
break
time.sleep(0.1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
beta = beta
2025-06-22 20:49:07 +02:00
def start_checks():
if CHECKS_DISABLED == "True":
logger.warning(f"{(_('checks_disabled'))}")
return
logger.info(_('running_prestart_checks'))
2025-07-07 17:17:44 +02:00
check_for_model()
iscloned()
check_missing_translations()
check_requirements()
check_latency()
check_memory()
check_memoryjson()
check_cpu()
if os.path.exists(".env"):
pass
else:
logger.warning(f"{(_('env_file_not_found'))}")
sys.exit(1)
if beta == True:
logger.warning(f"this build isnt finished yet, some things might not work as expected")
else:
pass
logger.info(_('continuing_in_seconds').format(seconds=5))
presskey2skip(timeout=5)
os.system('cls' if os.name == 'nt' else 'clear')
print(splashtext)