import discord
from discord.ext import commands
import os
from typing import List, Optional, TypedDict
import numpy as np
import json
from time import strftime, localtime
import pickle
import re

# Gates the cog: set to False when the TensorFlow stack failed to import.
ready: bool = True

# Model folders are named with their creation timestamp, "dd_mm_YYYY-HH_MM"
# (see Ai.generate_model_name()).
MODEL_MATCH_STRING = r"[0-9]{2}_[0-9]{2}_[0-9]{4}-[0-9]{2}_[0-9]{2}"

try:
    import tensorflow as tf
    from tensorflow import keras
    from keras.preprocessing.text import Tokenizer
    from keras_preprocessing.sequence import pad_sequences
    from keras.models import Sequential
    from keras.layers import Embedding, LSTM, Dense
    from keras.models import load_model
    from keras.backend import clear_session
    tf.config.optimizer.set_jit(True)  # enable XLA JIT compilation
except ImportError:
    # FIX: the original concatenated adjacent string literals with no
    # separators, printing the dependency list as one unreadable run-on line.
    print("ERROR: Failed to import Tensorflow. Here is a list of required dependencies:", (
        "tensorflow==2.10.0\n"
        "(for Nvidia users: tensorflow-gpu==2.10.0)\n"
        "(for macOS: tensorflow-metal==0.6.0, tensorflow-macos==2.10.0)\n"
        "numpy~=1.23"
    ))
    ready = False


class Ai:
    """Base class handling model/tokenizer persistence and (re)loading.

    Model layout on disk: ./models/<timestamp>/{model.h5, tokenizer.pkl, info.json}
    """

    def __init__(self):
        # `settings` is the module-level settings dict, populated by
        # Settings.load() before this class is instantiated (see Tf.__init__).
        model_path = settings.get("model_path")
        if model_path:
            self.__load_model(model_path)
        self.is_loaded = model_path is not None
        self.batch_size = 64

    def get_model_name_from_path(self, path: str) -> str:
        """Extract the timestamp-style model name from a model path.

        Raises ValueError if `path` contains no timestamp-shaped segment.
        """
        match: Optional[re.Match] = re.search(MODEL_MATCH_STRING, path)
        if match is None:
            raise ValueError(f"No model name found in path: {path!r}")
        # FIX: the original returned path[match.start():][:match.end()], a
        # wrong-length slice, and printed the unbound `match.start` method as
        # debug leftovers. The matched text itself is the model name.
        return match.group(0)

    def generate_model_name(self) -> str:
        """Return a new model name: the current local time, e.g. '05_01_2025-22_31'."""
        return strftime('%d_%m_%Y-%H_%M', localtime())

    def generate_model_abs_path(self, name: Optional[str]) -> str:
        """Path to the .h5 weights file for model `name` (new name if falsy)."""
        name = name or self.generate_model_name()
        # FIX: the original ignored `name` and generated a fresh timestamp
        # here, so the weights could land in a different folder than the
        # tokenizer/info files saved for the same model.
        return os.path.join(".", "models", name, "model.h5")

    def generate_tokenizer_abs_path(self, name: Optional[str]) -> str:
        """Path to the pickled Tokenizer for model `name` (new name if falsy)."""
        name = name or self.generate_model_name()
        return os.path.join(".", "models", name, "tokenizer.pkl")

    def generate_info_abs_path(self, name: Optional[str]) -> str:
        """Path to the training-history JSON for model `name` (new name if falsy)."""
        name = name or self.generate_model_name()
        return os.path.join(".", "models", name, "info.json")

    def save_model(self, model, tokenizer, history, _name: Optional[str] = None) -> None:
        """Persist model weights, tokenizer, and training history under one folder."""
        name: str = _name or self.generate_model_name()
        os.makedirs(os.path.join(".", "models", name), exist_ok=True)
        with open(self.generate_info_abs_path(name), "w") as f:
            json.dump(history.history, f)
        with open(self.generate_tokenizer_abs_path(name), "wb") as f:
            pickle.dump(tokenizer, f)
        model.save(self.generate_model_abs_path(name))

    def __load_model(self, model_path: str) -> None:
        """Load weights + tokenizer from `model_path` into self.model/self.tokenizer."""
        clear_session()
        self.model = load_model(os.path.join(model_path, "model.h5"))
        model_name: str = self.get_model_name_from_path(model_path)
        try:
            with open(self.generate_tokenizer_abs_path(model_name), "rb") as f:
                self.tokenizer = pickle.load(f)
        except FileNotFoundError:
            # Best-effort fallback: rebuild a tokenizer from the raw corpus.
            print("Failed to load tokenizer for model... Using default")
            self.tokenizer = Tokenizer()
            with open("memory.json", "r") as f:
                # FIX: memory.json holds texts, not integer sequences, so
                # fit_on_texts (not fit_on_sequences) is the correct call.
                self.tokenizer.fit_on_texts(json.load(f))
        self.is_loaded = True

    def reload_model(self) -> None:
        """Re-read the configured model from disk (e.g. after changing settings)."""
        clear_session()
        model_path: Optional[str] = settings.get("model_path")
        if model_path:
            # FIX: the original did `self.model = self.__load_model(...)`;
            # __load_model returns None and sets self.model itself, so the
            # assignment clobbered the freshly loaded model with None.
            self.__load_model(model_path)


class Learning(Ai):
    """Training side: builds new models and continues training existing ones."""

    def __init__(self):
        super().__init__()

    def __generate_labels_and_inputs(self, memory: List[str], tokenizer=None) -> tuple:
        """Turn a corpus into next-word-prediction pairs.

        For every tokenized sentence, each prefix seq[:i] becomes an input and
        the following token seq[i] its label. Returns (inputs, labels, tokenizer).
        A fresh Tokenizer is fitted on `memory` when none is supplied.
        """
        if not tokenizer:
            tokenizer = Tokenizer()
            tokenizer.fit_on_texts(memory)
        sequences = tokenizer.texts_to_sequences(memory)
        x = []
        y = []
        for seq in sequences:
            for i in range(1, len(seq)):
                x.append(seq[:i])
                y.append(seq[i])
        return x, y, tokenizer

    def create_model(self, memory: List[str], iters: int = 2) -> None:
        """Build, train, and save a brand-new LSTM next-word model on `memory`."""
        X, y, tokenizer = self.__generate_labels_and_inputs(memory)
        maxlen: int = max(len(x) for x in X)
        x_pad = pad_sequences(X, maxlen=maxlen, padding="pre")
        y = np.array(y)
        model = Sequential()
        # NOTE(review): the tokenizer is unbounded; if the corpus vocabulary
        # ever exceeds VOCAB_SIZE, indices will fall outside the embedding —
        # consider Tokenizer(num_words=VOCAB_SIZE). Confirm corpus size.
        model.add(Embedding(input_dim=VOCAB_SIZE, output_dim=128, input_length=maxlen))
        model.add(LSTM(64))
        model.add(Dense(VOCAB_SIZE, activation="softmax"))
        model.compile(optimizer="adam",
                      loss="sparse_categorical_crossentropy",
                      metrics=["accuracy"])
        history = model.fit(x_pad, y, epochs=iters, batch_size=32)
        self.save_model(model, tokenizer, history)

    def add_training(self, memory: List[str], iters: int = 2) -> None:
        """Continue training the currently loaded model on `memory` and re-save it."""
        tokenizer_path = os.path.join(settings.get("model_path"), "tokenizer.pkl")
        with open(tokenizer_path, "rb") as f:
            tokenizer = pickle.load(f)
        X, y, _ = self.__generate_labels_and_inputs(memory, tokenizer)
        maxlen: int = max(len(x) for x in X)
        x_pad = pad_sequences(X, maxlen=maxlen, padding="pre")
        y = np.array(y)
        # Ideally, validation data would be separate from the training data.
        history = self.model.fit(x_pad, y, epochs=iters,
                                 validation_data=(x_pad, y), batch_size=64)
        self.save_model(self.model, tokenizer, history,
                        self.get_model_name_from_path(settings.get("model_path")))


class Generation(Ai):
    """Inference side: generates text with the currently loaded model."""

    def __init__(self):
        super().__init__()

    def generate_sentence(self, word_amount: int, seed: str):
        """Greedily extend `seed` by `word_amount` predicted words.

        Returns the extended string, or False when no model is loaded
        (NOTE(review): callers pass the result straight to ctx.send — verify
        they handle the False case).
        """
        if not self.is_loaded:
            return False
        for _ in range(word_amount):
            token_list = self.tokenizer.texts_to_sequences([seed])[0]
            token_list = pad_sequences([token_list],
                                       maxlen=self.model.layers[0].input_shape[1],
                                       padding="pre")
            output_word = ""  # Sometimes model fails to predict the word, so using a fallback
            predicted_probs = self.model.predict(token_list, verbose=0)
            predicted_word_index = np.argmax(predicted_probs, axis=-1)[0]
            # Reverse-lookup the predicted index in the tokenizer vocabulary.
            for word, index in self.tokenizer.word_index.items():
                if index == predicted_word_index:
                    output_word = word
                    break
            seed += " " + output_word
        return seed


VOCAB_SIZE = 100_000

SETTINGS_TYPE = TypedDict("SETTINGS_TYPE", {
    "model_path": str,  # path to the base folder of the model, aka .../models/05-01-2025-22_31/
    "tokenizer_path": str,
})

model_dropdown_items = []
settings: SETTINGS_TYPE = {}
learning: Learning
generation: Generation


class Settings:
    """Reads and writes ./models/settings.json into the module-level `settings`."""

    def __init__(self):
        self.settings_path: str = os.path.join(".", "models", "settings.json")

    def load(self) -> None:
        """Populate the global `settings`; create an empty settings file if absent."""
        global settings
        try:
            with open(self.settings_path, "r") as f:
                settings = json.load(f)
        except FileNotFoundError:
            with open(self.settings_path, "w") as f:
                json.dump({}, f)

    def change_model(self, new_model_base_path: str) -> None:
        """Point `settings["model_path"]` at a model folder name and persist it."""
        global settings
        new_model_path = os.path.join(".", "models", new_model_base_path)
        with open(self.settings_path, "r") as f:
            settings = json.load(f)
        settings["model_path"] = new_model_path
        with open(self.settings_path, "w") as f:
            json.dump(settings, f)


class Dropdown(discord.ui.Select):
    """Model-selection dropdown; only the bot owner may switch models."""

    def __init__(self, items: List[str]):
        global model_dropdown_items
        model_dropdown_items = []
        for item in items:
            model_dropdown_items.append(discord.SelectOption(label=item))
        super().__init__(placeholder="Select model", options=model_dropdown_items)

    async def callback(self, interaction: discord.Interaction):
        # NOTE(review): os.getenv("ownerid") is None when the env var is
        # unset, and int(None) raises TypeError — confirm deployment sets it.
        if int(interaction.user.id) != int(os.getenv("ownerid")):
            await interaction.message.channel.send("KILL YOURSELF")
            # FIX: the original fell through here and changed the model
            # anyway, defeating the owner check.
            return
        Settings().change_model(self.values[0])
        await interaction.message.channel.send(f"Changed model to {self.values[0]}")


class DropdownView(discord.ui.View):
    """A view wrapping a single model-selection Dropdown."""

    def __init__(self, timeout, models):
        super().__init__(timeout=timeout)
        self.add_item(Dropdown(models))


class Tf(commands.Cog):
    """Discord cog exposing model training/generation commands."""

    @staticmethod
    def needs_ready(func):
        """Decorator: refuse to run the wrapped command when TF failed to import."""
        # FIX: the original inner took (args, kwargs) as two positional
        # parameters instead of *args/**kwargs, so any decorated function
        # would have been called incorrectly.
        def inner(*args, **kwargs):
            if not ready:
                raise AttributeError("Not ready!")
            return func(*args, **kwargs)
        return inner

    def __init__(self, bot):
        global learning, generation
        os.makedirs(os.path.join(".", "models"), exist_ok=True)
        Settings().load()
        self.bot = bot
        learning = Learning()
        generation = Generation()

    @commands.command()
    async def start(self, ctx):
        await ctx.defer()
        await ctx.send("hi")

    @commands.command()
    async def generate(self, ctx, seed: str, word_amount: int = 5):
        """Generate `word_amount` words continuing `seed`."""
        await ctx.defer()
        await ctx.send(generation.generate_sentence(word_amount, seed))

    @commands.command()
    async def create(self, ctx):
        """Train a brand-new model from memory.json."""
        await ctx.defer()
        with open("memory.json", "r") as f:
            memory: List[str] = json.load(f)
        learning.create_model(memory)  # TODO: CHANGE
        await ctx.send("Trained succesfully!")

    @commands.command()
    async def train(self, ctx):
        """Continue training the current model on memory.json."""
        await ctx.defer()
        with open("memory.json", "r") as f:
            memory: List[str] = json.load(f)
        learning.add_training(memory, 2)
        await ctx.send("Finished!")

    @commands.command()
    async def change(self, ctx, model: Optional[str] = None):
        """Show a dropdown of saved models and reload after selection."""
        embed = discord.Embed(title="Change model",
                              description="Which model would you like to use?")
        if model is None:
            models: List[str] = os.listdir(os.path.join(".", "models"))
            models = [folder for folder in models if re.match(MODEL_MATCH_STRING, folder)]
            if len(models) == 0:
                models = ["No models available."]
            await ctx.send(embed=embed, view=DropdownView(90, models))
        # NOTE(review): these reloads run immediately after the dropdown is
        # posted, before the user has picked anything — the newly selected
        # model only takes effect on the NEXT invocation. Confirm intent.
        learning.reload_model()
        generation.reload_model()


async def setup(bot):
    await bot.add_cog(Tf(bot))