import discord
from discord.ext import commands, tasks
import asyncio
from aiohttp import web
import psutil
import os
import json
from datetime import datetime
import time
import aiohttp
from aiohttp import WSMsgType
from config import VERSION_URL
class GooberWeb(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.app = web.Application()
self.runner = None
self.site = None
self.last_command = "No commands executed yet"
self.last_command_time = "Never"
self.start_time = time.time()
self.websockets = set()
self.app.add_routes([
web.get('/', self.handle_index),
web.get('/changesong', self.handle_changesong),
web.get('/stats', self.handle_stats),
web.get('/data', self.handle_json_data),
web.get('/ws', self.handle_websocket),
web.get('/styles.css', self.handle_css),
])
self.bot.loop.create_task(self.start_web_server())
self.update_clients.start()
async def get_blacklisted_users(self):
blacklisted_ids = os.getenv("BLACKLISTED_USERS", "").split(",")
blacklisted_users = []
for user_id in blacklisted_ids:
if not user_id.strip():
continue
try:
user = await self.bot.fetch_user(int(user_id))
blacklisted_users.append({
"name": f"{user.name}#{user.discriminator}",
"avatar_url": str(user.avatar.url) if user.avatar else str(user.default_avatar.url),
"id": user.id
})
except discord.NotFound:
blacklisted_users.append({
"name": f"Unknown User ({user_id})",
"avatar_url": "",
"id": user_id
})
except discord.HTTPException as e:
print(f"Error fetching user {user_id}: {e}")
continue
return blacklisted_users
async def get_enhanced_guild_info(self):
guilds = sorted(self.bot.guilds, key=lambda g: g.member_count, reverse=True)
guild_info = []
for guild in guilds:
icon_url = str(guild.icon.url) if guild.icon else ""
guild_info.append({
"name": guild.name,
"member_count": guild.member_count,
"icon_url": icon_url,
"id": guild.id
})
return guild_info
async def start_web_server(self):
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.site = web.TCPSite(self.runner, '0.0.0.0', 8080)
await self.site.start()
print("Goober web server started on port 8080")
async def stop_web_server(self):
await self.site.stop()
await self.runner.cleanup()
print("Web server stopped")
def cog_unload(self):
self.update_clients.cancel()
self.bot.loop.create_task(self.stop_web_server())
@tasks.loop(seconds=5)
async def update_clients(self):
if not self.websockets:
return
stats = await self.get_bot_stats()
message = json.dumps(stats)
for ws in set(self.websockets):
try:
await ws.send_str(message)
except ConnectionResetError:
self.websockets.remove(ws)
except Exception as e:
print(f"Error sending to websocket: {e}")
self.websockets.remove(ws)
async def handle_websocket(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
self.websockets.add(ws)
try:
async for msg in ws:
if msg.type == WSMsgType.ERROR:
print(f"WebSocket error: {ws.exception()}")
finally:
self.websockets.remove(ws)
return ws
async def handle_css(self, request):
css_path = os.path.join(os.path.dirname(__file__), 'styles.css')
if os.path.exists(css_path):
return web.FileResponse(css_path)
return web.Response(text="CSS file not found", status=404)
@commands.Cog.listener()
async def on_message(self, message):
if message.author.bot:
return
ctx = await self.bot.get_context(message)
if ctx.valid and ctx.command:
self._update_command_stats(ctx.command.name, ctx.author)
@commands.Cog.listener()
async def on_app_command_completion(self, interaction, command):
self._update_command_stats(command.name, interaction.user)
def _update_command_stats(self, command_name, user):
self.last_command = f"{command_name} (by {user.name}#{user.discriminator})"
self.last_command_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.websockets:
asyncio.create_task(self.update_clients())
async def get_bot_stats(self):
process = psutil.Process(os.getpid())
mem_info = process.memory_full_info()
cpu_percent = psutil.cpu_percent()
process_cpu = process.cpu_percent()
memory_json_size = "N/A"
if os.path.exists("memory.json"):
memory_json_size = f"{os.path.getsize('memory.json') / 1024:.2f} KB"
guild_info = await self.get_enhanced_guild_info()
blacklisted_users = await self.get_blacklisted_users()
uptime_seconds = int(time.time() - self.start_time)
uptime_str = f"{uptime_seconds // 86400}d {(uptime_seconds % 86400) // 3600}h {(uptime_seconds % 3600) // 60}m {uptime_seconds % 60}s"
return {
"ram_usage": f"{mem_info.rss / 1024 / 1024:.2f} MB",
"cpu_usage": f"{process_cpu}%",
"system_cpu": f"{cpu_percent}%",
"memory_json_size": memory_json_size,
"guild_count": len(guild_info),
"bl_count": len(blacklisted_users),
"guilds": guild_info,
"blacklisted_users": blacklisted_users,
"last_command": self.last_command,
"last_command_time": self.last_command_time,
"bot_uptime": uptime_str,
"latency": f"{self.bot.latency * 1000:.2f} ms",
"bot_name": self.bot.user.name,
"bot_avatar_url": str(self.bot.user.avatar.url) if self.bot.user.avatar else "",
"authenticated": os.getenv("gooberauthenticated"),
"lastmsg": os.getenv("gooberlatestgen"),
"localversion": os.getenv("gooberlocal_version"),
"latestversion": os.getenv("gooberlatest_version"),
"owner": os.getenv("ownerid")
}
async def handle_update(self, request):
if os.path.exists("goob/update.py"):
return web.FileResponse("goob/update.py")
return web.Response(text="Update file not found", status=404)
async def handle_changesong(self, request):
song = request.query.get('song', '')
if song:
await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=song))
return web.Response(text=f"Changed song to: {song}")
return web.Response(text="Please provide a song parameter", status=400)
async def handle_changes(self, request):
if os.path.exists("goob/changes.txt"):
return web.FileResponse("goob/changes.txt")
return web.Response(text="Changelog not found", status=404)
async def handle_index(self, request):
stats = await self.get_bot_stats()
guild_list_html = ""
for guild in stats['guilds']:
icon_html = f'' if guild["icon_url"] else '
your stupid little goober that learns off other people's messages