update version info

This commit is contained in:
WhatDidYouExpect 2025-03-16 16:57:47 +01:00
parent ef93452d1c
commit 679b1822cd
5 changed files with 78 additions and 1 deletions

340
cogs/tf.py.disabled Normal file
View file

@ -0,0 +1,340 @@
import discord
from discord.ext import commands
import os
from typing import List, TypedDict
import numpy as np
import json
from time import strftime, localtime
import pickle
import functools
import re
import time
import asyncio
ready: bool = True
MODEL_MATCH_STRING = "[0-9]{2}_[0-9]{2}_[0-9]{4}-[0-9]{2}_[0-9]{2}"
try:
import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dense
from keras.models import load_model
from keras.backend import clear_session
tf.config.optimizer.set_jit(True)
except ImportError:
print("ERROR: Failed to import Tensorflow. Here is a list of required dependencies:",(
"tensorflow==2.10.0"
"(for Nvidia users: tensorflow-gpu==2.10.0)"
"(for macOS: tensorflow-metal==0.6.0, tensorflow-macos==2.10.0)"
"numpy~=1.23"
))
ready = False
class TFCallback(keras.callbacks.Callback):
def __init__(self,bot, progress_embed:discord.Embed, message):
self.embed:discord.Embed = progress_embed
self.bot:commands.Bot = bot
self.message = message
self.times:List[int] = [time.time()]
def on_train_begin(self, logs=None):
pass
async def send_message(self,message:str, description:str, **kwargs):
if "epoch" in kwargs:
self.times.append(time.time())
average_epoch_time:int = np.average(np.diff(np.array(self.times)))
description = f"ETA: {round(average_epoch_time)}s"
self.embed.add_field(name=f"<t:{round(time.time())}:t> - {message}",value=description,inline=False)
await self.message.edit(embed=self.embed)
def on_train_end(self,logs=None):
self.bot.loop.create_task(self.send_message("Training stopped", "training has been stopped."))
def on_epoch_begin(self, epoch, logs=None):
self.bot.loop.create_task(self.send_message(f"Starting epoch {epoch}","This might take a while", epoch=True))
def on_epoch_end(self, epoch, logs=None):
self.bot.loop.create_task(self.send_message(f"Epoch {epoch} ended",f"Accuracy: {round(logs.get('accuracy',0.0),4)}"))
class Ai:
def __init__(self):
model_path = settings.get("model_path")
if model_path:
self.__load_model(model_path)
self.is_loaded = model_path is not None
self.batch_size = 64
def get_model_name_from_path(self,path:str):
match:re.Match = re.search(MODEL_MATCH_STRING, path)
return path[match.start():][:match.end()]
def generate_model_name(self) -> str:
return strftime('%d_%m_%Y-%H_%M', localtime())
def generate_model_abs_path(self, name:str):
name = name or self.generate_model_name()
return os.path.join(".","models",self.generate_model_name(),"model.h5")
def generate_tokenizer_abs_path(self, name:str):
name = name or self.generate_model_name()
return os.path.join(".","models",name,"tokenizer.pkl")
def generate_info_abs_path(self,name:str):
name = name or self.generate_model_name()
return os.path.join(".","models",name,"info.json")
def save_model(self,model, tokenizer, history, _name:str=None):
name:str = _name or self.generate_model_name()
os.makedirs(os.path.join(".","models",name), exist_ok=True)
with open(self.generate_info_abs_path(name),"w") as f:
json.dump(history.history,f)
with open(self.generate_tokenizer_abs_path(name), "wb") as f:
pickle.dump(tokenizer,f)
model.save(self.generate_model_abs_path(name))
def __load_model(self, model_path:str):
clear_session()
self.model = load_model(os.path.join(model_path,"model.h5"))
model_name:str = self.get_model_name_from_path(model_path)
try:
with open(self.generate_tokenizer_abs_path(model_name),"rb") as f:
self.tokenizer = pickle.load(f)
except FileNotFoundError:
print("Failed to load tokenizer for model... Using default")
self.tokenizer = Tokenizer()
with open("memory.json","r") as f:
self.tokenizer.fit_on_sequences(json.load(f))
self.is_loaded = True
def reload_model(self):
clear_session()
model_path:str = settings.get("model_path")
if model_path:
self.model = self.__load_model(model_path)
self.is_loaded = True
async def run_async(self,func,bot,*args,**kwargs):
func = functools.partial(func,*args,**kwargs)
return await bot.loop.run_in_executor(None,func)
class Learning(Ai):
def __init__(self):
super().__init__()
def __generate_labels_and_inputs(self,memory: List[str], tokenizer=None) -> tuple:
if not tokenizer:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(memory)
sequences = tokenizer.texts_to_sequences(memory)
x = []
y = []
for seq in sequences:
for i in range(1, len(seq)):
x.append(seq[:i])
y.append(seq[i])
return x,y, tokenizer
def create_model(self,memory: list, iters:int=2):
memory = memory[:2000]
X,y,tokenizer = self.__generate_labels_and_inputs(memory)
maxlen:int = max([len(x) for x in X])
x_pad = pad_sequences(X, maxlen=maxlen, padding="pre")
y = np.array(y)
model = Sequential()
model.add(Embedding(input_dim=VOCAB_SIZE,output_dim=128,input_length=maxlen))
model.add(LSTM(64))
model.add(Dense(VOCAB_SIZE, activation="softmax"))
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
history = model.fit(x_pad, y, epochs=iters, batch_size=64, callbacks=[tf_callback])
self.save_model(model, tokenizer, history)
return
def add_training(self,memory: List[str], iters:int=2):
tokenizer_path = os.path.join(settings.get("model_path"),"tokenizer.pkl")
with open(tokenizer_path, "rb") as f:
tokenizer = pickle.load(f)
X,y,_ = self.__generate_labels_and_inputs(memory, tokenizer)
maxlen:int = max([len(x) for x in X])
x_pad = pad_sequences(X, maxlen=maxlen, padding="pre")
y = np.array(y)
history = self.model.fit(x_pad,y, epochs=iters, validation_data=(x_pad,y), batch_size=64, callbacks=[tf_callback]) # Ideally, validation data would be seperate from the actual data
self.save_model(self.model,tokenizer,history,self.get_model_name_from_path(settings.get("model_path")))
return
class Generation(Ai):
def __init__(self):
super().__init__()
def generate_sentence(self, word_amount:int, seed:str):
if not self.is_loaded:
return False
for _ in range(word_amount):
token_list = self.tokenizer.texts_to_sequences([seed])[0]
token_list = pad_sequences([token_list], maxlen=self.model.layers[0].input_shape[1], padding="pre")
output_word = "" # Sometimes model fails to predict the word, so using a fallback
predicted_probs = self.model.predict(token_list, verbose=0)
predicted_word_index = np.argmax(predicted_probs, axis=-1)[0]
for word, index in self.tokenizer.word_index.items():
if index == predicted_word_index:
output_word = word
break
seed += " " + output_word
return seed
VOCAB_SIZE = 100_000
SETTINGS_TYPE = TypedDict("SETTINGS_TYPE", {
"model_path":str, # path to the base folder of the model, aka .../models/05-01-2025-22_31/
"tokenizer_path":str,
})
tf_callback:TFCallback
model_dropdown_items = []
settings: SETTINGS_TYPE = {}
target_message:int
learning:Learning
generation: Generation
class Settings:
def __init__(self):
self.settings_path:str = os.path.join(".","models","settings.json")
def load(self):
global settings
try:
with open(self.settings_path,"r") as f:
settings = json.load(f)
except FileNotFoundError:
with open(self.settings_path,"w") as f:
json.dump({},f)
def change_model(self,new_model_base_path:str):
global settings
new_model_path = os.path.join(".","models",new_model_base_path)
with open(self.settings_path,"r") as f:
settings = json.load(f)
settings["model_path"] = new_model_path
with open(self.settings_path, "w") as f:
json.dump(settings,f)
class Dropdown(discord.ui.Select):
def __init__(self, items:List[str]):
global model_dropdown_items
model_dropdown_items = []
for item in items:
model_dropdown_items.append(
discord.SelectOption(label=item)
)
super().__init__(placeholder="Select model", options=model_dropdown_items)
async def callback(self, interaction: discord.Interaction):
if int(interaction.user.id) != int(os.getenv("ownerid")):
await interaction.message.channel.send("KILL YOURSELF")
Settings().change_model(self.values[0])
await interaction.message.channel.send(f"Changed model to {self.values[0]}")
class DropdownView(discord.ui.View):
def __init__(self, timeout, models):
super().__init__(timeout=timeout)
self.add_item(Dropdown(models))
class Tf(commands.Cog):
def __init__(self,bot):
global learning, generation, ready
os.makedirs(os.path.join(".","models"),exist_ok=True)
Settings().load()
self.bot = bot
learning = Learning()
generation = Generation()
@commands.command()
async def start(self,ctx):
await ctx.defer()
await ctx.send("hi")
@commands.command()
async def generate(self,ctx,seed:str,word_amount:int=5):
await ctx.defer()
await ctx.send(generation.generate_sentence(word_amount,seed))
@commands.command()
async def create(self,ctx:commands.Context, epochs:int=3):
global tf_callback
await ctx.defer()
with open("memory.json","r") as f:
memory:List[str] = json.load(f)
await ctx.send("Initializing tensorflow")
embed = discord.Embed(title="Creating a model...", description="Progress of creating a model")
embed.set_footer(text="Note: Progress tracking might report delayed / wrong data, since the function is run asynchronously")
target_message:discord.Message = await ctx.send(embed=embed)
tf_callback = TFCallback(self.bot,embed,target_message)
await learning.run_async(learning.create_model,self.bot,memory,epochs)
embed = target_message.embeds[0]
embed.add_field(name=f"<t:{round(time.time())}:t> Finished",value="Model saved.")
await target_message.edit(embed=embed)
@commands.command()
async def train(self,ctx, epochs:int=2):
global tf_callback
await ctx.defer()
with open("memory.json","r") as f:
memory:List[str] = json.load(f)
embed = discord.Embed(title="Training model...", description="Progress of training model")
target_message = await ctx.send(embed=embed)
tf_callback = TFCallback(self.bot,embed,target_message)
await learning.run_async(learning.add_training,self.bot,memory,epochs)
await ctx.send("Finished!")
@commands.command()
async def change(self,ctx,model:str=None):
embed = discord.Embed(title="Change model",description="Which model would you like to use?")
if model is None:
models:List[str] = os.listdir(os.path.join(".","models"))
models = [folder for folder in models if re.match(MODEL_MATCH_STRING,folder)]
if len(models) == 0:
models = ["No models available."]
await ctx.send(embed=embed,view=DropdownView(90,models))
learning.reload_model()
generation.reload_model()
async def setup(bot):
await bot.add_cog(Tf(bot))