import discord from discord.ext import commands, tasks import asyncio from aiohttp import web import psutil import os import json from datetime import datetime import time import aiohttp from aiohttp import WSMsgType class GooberWeb(commands.Cog): def __init__(self, bot): self.bot = bot self.app = web.Application() self.runner = None self.site = None self.last_command = "No commands executed yet" self.last_command_time = "Never" self.start_time = time.time() self.websockets = set() self.app.add_routes([ web.get('/', self.handle_index), web.get('/changesong', self.handle_changesong), web.get('/stats', self.handle_stats), web.get('/data', self.handle_json_data), web.get('/ws', self.handle_websocket), web.get('/styles.css', self.handle_css), ]) self.bot.loop.create_task(self.start_web_server()) self.update_clients.start() async def get_blacklisted_users(self): blacklisted_ids = os.getenv("BLACKLISTED_USERS", "").split(",") blacklisted_users = [] for user_id in blacklisted_ids: if not user_id.strip(): continue try: user = await self.bot.fetch_user(int(user_id)) blacklisted_users.append({ "name": f"{user.name}#{user.discriminator}", "avatar_url": str(user.avatar.url) if user.avatar else str(user.default_avatar.url), "id": user.id }) except discord.NotFound: blacklisted_users.append({ "name": f"Unknown User ({user_id})", "avatar_url": "", "id": user_id }) except discord.HTTPException as e: print(f"Error fetching user {user_id}: {e}") continue return blacklisted_users async def get_enhanced_guild_info(self): guilds = sorted(self.bot.guilds, key=lambda g: g.member_count, reverse=True) guild_info = [] for guild in guilds: icon_url = str(guild.icon.url) if guild.icon else "" guild_info.append({ "name": guild.name, "member_count": guild.member_count, "icon_url": icon_url, "id": guild.id }) return guild_info async def start_web_server(self): self.runner = web.AppRunner(self.app) await self.runner.setup() self.site = web.TCPSite(self.runner, '0.0.0.0', 8080) await self.site.start() print("Goober web server started on port 8080") async def stop_web_server(self): await self.site.stop() await self.runner.cleanup() print("Web server stopped") def cog_unload(self): self.update_clients.cancel() self.bot.loop.create_task(self.stop_web_server()) @tasks.loop(seconds=5) async def update_clients(self): if not self.websockets: return stats = await self.get_bot_stats() message = json.dumps(stats) for ws in set(self.websockets): try: await ws.send_str(message) except ConnectionResetError: self.websockets.remove(ws) except Exception as e: print(f"Error sending to websocket: {e}") self.websockets.remove(ws) async def handle_websocket(self, request): ws = web.WebSocketResponse() await ws.prepare(request) self.websockets.add(ws) try: async for msg in ws: if msg.type == WSMsgType.ERROR: print(f"WebSocket error: {ws.exception()}") finally: self.websockets.remove(ws) return ws async def handle_css(self, request): css_path = os.path.join(os.path.dirname(__file__), 'styles.css') if os.path.exists(css_path): return web.FileResponse(css_path) return web.Response(text="CSS file not found", status=404) @commands.Cog.listener() async def on_message(self, message): if message.author.bot: return ctx = await self.bot.get_context(message) if ctx.valid and ctx.command: self._update_command_stats(ctx.command.name, ctx.author) @commands.Cog.listener() async def on_app_command_completion(self, interaction, command): self._update_command_stats(command.name, interaction.user) def _update_command_stats(self, command_name, user): self.last_command = f"{command_name} (by {user.name}#{user.discriminator})" self.last_command_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if self.websockets: asyncio.create_task(self.update_clients()) async def get_bot_stats(self): process = psutil.Process(os.getpid()) mem_info = process.memory_full_info() cpu_percent = psutil.cpu_percent() process_cpu = process.cpu_percent() memory_json_size = "N/A" if os.path.exists("memory.json"): memory_json_size = f"{os.path.getsize('memory.json') / 1024:.2f} KB" guild_info = await self.get_enhanced_guild_info() blacklisted_users = await self.get_blacklisted_users() uptime_seconds = int(time.time() - self.start_time) uptime_str = f"{uptime_seconds // 86400}d {(uptime_seconds % 86400) // 3600}h {(uptime_seconds % 3600) // 60}m {uptime_seconds % 60}s" return { "ram_usage": f"{mem_info.rss / 1024 / 1024:.2f} MB", "cpu_usage": f"{process_cpu}%", "system_cpu": f"{cpu_percent}%", "memory_json_size": memory_json_size, "guild_count": len(guild_info), "bl_count": len(blacklisted_users), "guilds": guild_info, "blacklisted_users": blacklisted_users, "last_command": self.last_command, "last_command_time": self.last_command_time, "bot_uptime": uptime_str, "latency": f"{self.bot.latency * 1000:.2f} ms", "bot_name": self.bot.user.name, "bot_avatar_url": str(self.bot.user.avatar.url) if self.bot.user.avatar else "", "authenticated": os.getenv("gooberauthenticated"), "lastmsg": os.getenv("gooberlatestgen") } async def handle_update(self, request): if os.path.exists("goob/update.py"): return web.FileResponse("goob/update.py") return web.Response(text="Update file not found", status=404) async def handle_changesong(self, request): song = request.query.get('song', '') if song: await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=song)) return web.Response(text=f"Changed song to: {song}") return web.Response(text="Please provide a song parameter", status=400) async def handle_changes(self, request): if os.path.exists("goob/changes.txt"): return web.FileResponse("goob/changes.txt") return web.Response(text="Changelog not found", status=404) async def handle_index(self, request): stats = await self.get_bot_stats() guild_list_html = "" for guild in stats['guilds']: icon_html = f'guild icon' if guild["icon_url"] else '
' guild_list_html += f"""
{icon_html}
{guild["name"]}
{guild["member_count"]} members
""" blacklisted_users_html = "" for user in stats['blacklisted_users']: avatar_html = f'user avatar' if user["avatar_url"] else '
' blacklisted_users_html += f"""
{avatar_html}
{user["name"]}
ID: {user["id"]}
""" html_content = f""" goobs central
RAM: {stats['ram_usage']}
CPU: {stats['cpu_usage']}
System CPU: {stats['system_cpu']}
Latency: {stats['latency']}
JSON Size: {stats['memory_json_size']}
Uptime: {stats['bot_uptime']}

botvatar {stats['bot_name']}

your stupid little goober that learns off other people's messages

Last Command
{stats['last_command']}
at {stats['last_command_time']}

Logged into goober central
{stats['authenticated']}

Last generated message
{stats['lastmsg']}

Change song
Servers ({stats['guild_count']})
{guild_list_html}

Blacklisted Users ({stats['bl_count']})
{blacklisted_users_html if stats['blacklisted_users'] else "
No blacklisted users
"}
""" return web.Response(text=html_content, content_type='text/html') async def handle_stats(self, request): return await self.handle_index(request) async def handle_json_data(self, request): stats = await self.get_bot_stats() return web.json_response(stats) async def setup(bot): await bot.add_cog(GooberWeb(bot))