# import matplotlib.pyplot as plt
# %matplotlib inline
# import seaborn as sns
import pickle
import pandas as pd
import re
import os
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import backend as K
import numpy as np
import string
from string import digits
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import nltk
from nltk.tokenize import word_tokenize
from tqdm import tqdm
from Data import Dataset, Dataloder  # Data is a local module in this repo providing the Dataset and Dataloder classes
| """########################################------MODEL------######################################## | |
| """ | |
| ########################################------Encoder model------######################################## | |
class Encoder(tf.keras.Model):
    def __init__(self, inp_vocab_size, embedding_size, lstm_size, input_length):
        super().__init__()
        self.inp_vocab_size = inp_vocab_size
        self.embedding_size = embedding_size
        self.lstm_size = lstm_size
        self.input_length = input_length

    def build(self, input_shape):
        # Initialize the embedding layer
        self.embedding = Embedding(input_dim=self.inp_vocab_size, output_dim=self.embedding_size,
                                   input_length=self.input_length, trainable=True, name="encoder_embed")
        # Initialize the encoder bidirectional LSTM layer (merge_mode='sum' keeps the output size at lstm_size)
        self.bilstm = Bidirectional(LSTM(units=self.lstm_size, return_sequences=True, return_state=True), merge_mode='sum')

    def call(self, input_sequence, initial_state):
        '''
        Input:  input_sequence [batch_size, input_length]
                initial_state  4 x [batch_size, encoder_units] (forward h/c, backward h/c)
        Output: lstm_enc_output [batch_size, input_length, encoder_units]
                forward_h/c and backward_h/c, each [batch_size, encoder_units]
        '''
        input_embd = self.embedding(input_sequence)
        lstm_enc_output, forward_h, forward_c, backward_h, backward_c = self.bilstm(input_embd, initial_state)
        return lstm_enc_output, forward_h, forward_c, backward_h, backward_c

    def initialize_states(self, batch_size):
        '''
        Given a batch size, return an initial hidden state and an initial cell state for a
        unidirectional LSTM, each of shape [batch_size, lstm_units] (sampled uniformly at random here).
        '''
        self.lstm_state_h = tf.random.uniform(shape=[batch_size, self.lstm_size], dtype=tf.float32)
        self.lstm_state_c = tf.random.uniform(shape=[batch_size, self.lstm_size], dtype=tf.float32)
        return self.lstm_state_h, self.lstm_state_c

    def initialize_states_bidirectional(self, batch_size):
        # Zero initial states for the bidirectional LSTM: [forward_h, forward_c, backward_h, backward_c]
        states = [tf.zeros((batch_size, self.lstm_size)) for _ in range(4)]
        return states
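
# A minimal usage sketch for the encoder (hypothetical sizes; not part of the original file).
# With merge_mode='sum', the bidirectional outputs keep lstm_size units:
#   enc = Encoder(inp_vocab_size=1000, embedding_size=300, lstm_size=128, input_length=30)
#   states = enc.initialize_states_bidirectional(batch_size=32)
#   out, fh, fc, bh, bc = enc(tf.zeros((32, 30), dtype=tf.int32), states)
#   # out: (32, 30, 128); fh, fc, bh, bc: (32, 128) each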

########################################------Attention model------########################################
class Attention(tf.keras.layers.Layer):
    '''Luong-style attention supporting the 'dot', 'general' and 'concat' scoring functions.'''
    def __init__(self, scoring_function, att_units):
        super().__init__()
        self.att_units = att_units
        self.scoring_function = scoring_function
        if self.scoring_function == 'dot':
            pass
        elif scoring_function == 'general':
            self.dense = Dense(self.att_units)
        elif scoring_function == 'concat':
            self.dense = Dense(att_units, activation='tanh')
            self.dense1 = Dense(1)

    def call(self, decoder_hidden_state, encoder_output):
        if self.scoring_function == 'dot':
            # score = encoder_output . decoder_hidden_state
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, axis=2)
            similarity = tf.matmul(encoder_output, decoder_hidden_state)
            weights = tf.nn.softmax(similarity, axis=1)
            context_vector = tf.matmul(weights, encoder_output, transpose_a=True)
            context_vector = tf.squeeze(context_vector, axis=1)
            return context_vector, weights
        elif self.scoring_function == 'general':
            # score = decoder_hidden_state . W(encoder_output)
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, 1)
            score = tf.matmul(decoder_hidden_state, self.dense(encoder_output), transpose_b=True)
            attention_weights = tf.keras.activations.softmax(score, axis=-1)
            context_vector = tf.matmul(attention_weights, encoder_output)
            context_vector = tf.reduce_sum(context_vector, axis=1)
            attention_weights = tf.reduce_sum(attention_weights, axis=1)
            attention_weights = tf.expand_dims(attention_weights, 2)
            return context_vector, attention_weights
        elif self.scoring_function == 'concat':
            # score = v . tanh(W[decoder_hidden_state ; encoder_output])
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, 1)
            # NOTE: the tile factor hard-codes the encoder sequence length of 30 used in this project
            decoder_hidden_state = tf.tile(decoder_hidden_state, [1, 30, 1])
            score = self.dense1(self.dense(tf.concat((decoder_hidden_state, encoder_output), axis=-1)))
            score = tf.transpose(score, [0, 2, 1])
            attention_weights = tf.keras.activations.softmax(score, axis=-1)
            context_vector = tf.matmul(attention_weights, encoder_output)
            context_vector = tf.reduce_sum(context_vector, axis=1)
            attention_weights = tf.reduce_sum(attention_weights, axis=1)
            attention_weights = tf.expand_dims(attention_weights, 2)
            return context_vector, attention_weights
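
# A minimal shape sketch for the 'concat' scorer (hypothetical sizes; not part of the original file):
#   att = Attention('concat', att_units=256)
#   ctx, w = att(tf.zeros((32, 128)), tf.zeros((32, 30, 128)))
#   # ctx: (32, 128) context vector; w: (32, 30, 1) attention weights over the 30 source positions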

########################################------OneStepDecoder model------########################################
class OneStepDecoder(tf.keras.Model):
    def __init__(self, tar_vocab_size, embedding_dim, input_length, dec_units, score_fun, att_units):
        # Store the hyperparameters; the layers themselves are created in build()
        super().__init__()
        self.tar_vocab_size = tar_vocab_size
        self.embedding_dim = embedding_dim
        self.input_length = input_length
        self.dec_units = dec_units
        self.score_fun = score_fun
        self.att_units = att_units

    def build(self, input_shape):
        # NOTE: the scoring function is fixed to 'concat' here, regardless of the score_fun argument
        self.attention = Attention('concat', self.att_units)
        self.embedding = Embedding(input_dim=self.tar_vocab_size, output_dim=self.embedding_dim,
                                   input_length=self.input_length, mask_zero=True, trainable=True, name="Decoder_Embed")
        self.bilstm = Bidirectional(LSTM(units=self.dec_units, return_sequences=True, return_state=True), merge_mode='sum')
        self.dense = Dense(self.tar_vocab_size)

    def call(self, input_to_decoder, encoder_output, f_state_h, f_state_c, b_state_h, b_state_c):
        # Embed the single target token, attend over the encoder outputs, and run one decoder LSTM step
        dec_embd = self.embedding(input_to_decoder)
        context_vectors, attention_weights = self.attention(f_state_h, encoder_output)
        context_vectors_ = tf.expand_dims(context_vectors, axis=1)
        concat_vector = tf.concat([dec_embd, context_vectors_], axis=2)
        states = [f_state_h, f_state_c, b_state_h, b_state_c]
        decoder_outputs, dec_f_state_h, dec_f_state_c, dec_b_state_h, dec_b_state_c = self.bilstm(concat_vector, states)
        decoder_outputs = tf.squeeze(decoder_outputs, axis=1)
        dense_output = self.dense(decoder_outputs)
        return dense_output, dec_f_state_h, dec_f_state_c, attention_weights, context_vectors
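
# A minimal single-step sketch (hypothetical sizes; not part of the original file):
#   osd = OneStepDecoder(tar_vocab_size=1000, embedding_dim=300, input_length=30,
#                        dec_units=128, score_fun='concat', att_units=256)
#   logits, fh, fc, w, ctx = osd(tf.ones((32, 1), dtype=tf.int32), tf.zeros((32, 30, 128)),
#                                tf.zeros((32, 128)), tf.zeros((32, 128)), tf.zeros((32, 128)), tf.zeros((32, 128)))
#   # logits: (32, 1000) unnormalized scores over the target vocabulary for this timestep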

########################################------Decoder model------########################################
class Decoder(tf.keras.Model):
    def __init__(self, out_vocab_size, embedding_dim, input_length, dec_units, score_fun, att_units):
        # Initialize the necessary variables; the OneStepDecoder object is created in build()
        super().__init__()
        self.out_vocab_size = out_vocab_size
        self.embedding_dim = embedding_dim
        self.input_length = input_length
        self.dec_units = dec_units
        self.score_fun = score_fun
        self.att_units = att_units

    def build(self, input_shape):
        self.onestep_decoder = OneStepDecoder(self.out_vocab_size, self.embedding_dim, self.input_length, self.dec_units, self.score_fun,
                                              self.att_units)

    def call(self, input_to_decoder, encoder_output, f_decoder_hidden_state, f_decoder_cell_state, b_decoder_hidden_state, b_decoder_cell_state):
        # Teacher-forced unrolling: run OneStepDecoder once per target timestep and collect the logits.
        # Note that the encoder states are passed unchanged at every step (the returned decoder states are not fed back).
        all_outputs = tf.TensorArray(tf.float32, size=self.input_length, name="output_array")
        for timestep in range(self.input_length):
            output, state_h, state_c, attention_weights, context_vector = self.onestep_decoder(
                input_to_decoder[:, timestep:timestep + 1], encoder_output,
                f_decoder_hidden_state, f_decoder_cell_state, b_decoder_hidden_state, b_decoder_cell_state)
            all_outputs = all_outputs.write(timestep, output)
        all_outputs = tf.transpose(all_outputs.stack(), [1, 0, 2])  # [batch_size, input_length, out_vocab_size]
        return all_outputs

########################################------encoder_decoder model------########################################
class encoder_decoder(tf.keras.Model):
    def __init__(self, out_vocab_size, inp_vocab_size, embedding_dim, embedding_size, in_input_length, tar_input_length, dec_units, lstm_size, att_units, batch_size):
        super().__init__()
        # Initialize the hyperparameters; the Encoder and Decoder objects are created in build()
        self.out_vocab_size = out_vocab_size
        self.inp_vocab_size = inp_vocab_size
        self.embedding_dim_target = embedding_dim
        self.embedding_dim_input = embedding_size
        self.in_input_length = in_input_length
        self.tar_input_length = tar_input_length
        self.dec_lstm_size = dec_units
        self.enc_lstm_size = lstm_size
        self.att_units = att_units
        self.batch_size = batch_size

    def build(self, input_shape):
        self.encoder = Encoder(self.inp_vocab_size, self.embedding_dim_input, self.enc_lstm_size, self.in_input_length)
        # The 'general' score function passed here is overridden by the hard-coded 'concat' in OneStepDecoder.build()
        self.decoder = Decoder(self.out_vocab_size, self.embedding_dim_target, self.tar_input_length, self.dec_lstm_size, 'general', self.att_units)

    def call(self, data):
        # data is a (source_batch, target_batch) pair of padded integer sequences
        input_sequence, target_sequence = data[0], data[1]
        encoder_initial_state = self.encoder.initialize_states_bidirectional(self.batch_size)
        encoder_output, f_encoder_state_h, f_encoder_state_c, b_encoder_state_h, b_encoder_state_c = self.encoder(input_sequence, encoder_initial_state)
        decoder_output = self.decoder(target_sequence, encoder_output, f_encoder_state_h, f_encoder_state_c, b_encoder_state_h, b_encoder_state_c)
        return decoder_output
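
# A minimal end-to-end forward-pass sketch (hypothetical vocabulary sizes; not part of the original file):
#   nmt = encoder_decoder(out_vocab_size=1000, inp_vocab_size=1200, embedding_dim=300, embedding_size=300,
#                         in_input_length=30, tar_input_length=30, dec_units=128, lstm_size=128,
#                         att_units=256, batch_size=32)
#   logits = nmt((tf.ones((32, 30), dtype=tf.int32), tf.ones((32, 30), dtype=tf.int32)))
#   # logits: (32, 30, 1000) -- per-timestep scores over the target vocabulary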

def loss_function(real, pred):
    # Sparse categorical cross-entropy on the logits, with the padding token (id 0) masked out
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

def accuracy(real, pred):
    # Token-level accuracy computed only over non-padding positions (real > 0)
    pred_val = K.cast(K.argmax(pred, axis=-1), dtype='float32')
    real_val = K.cast(K.equal(real, pred_val), dtype='float32')
    mask = K.cast(K.greater(real, 0), dtype='float32')
    n_correct = K.sum(mask * real_val)
    n_total = K.sum(mask)
    return n_correct / n_total
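
# A tiny worked example of the padding mask (hypothetical values; not part of the original file):
#   real = tf.constant([[2., 5., 0.]])              # last position is padding
#   pred = tf.one_hot([[2, 4, 0]], depth=6) * 10.0  # peaked "logits"; argmax -> [2, 4, 0]
#   accuracy(real, pred)                            # -> 0.5 (1 correct out of 2 non-pad tokens)
#   # loss_function averages over all positions but zeroes out the padded one before averaging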

def load_weights():
    """======================================================LOADING======================================================
    Load the pickled train/validation splits and the fitted English/Assamese tokenizers,
    and derive the vocabulary sizes from the tokenizers' word indices.
    """
    # Dataset
    with open('dataset/30_length/train.pickle', 'rb') as handle:
        train = pickle.load(handle)
    with open('dataset/30_length/validation.pickle', 'rb') as handle:
        validation = pickle.load(handle)
    # Tokenizers
    with open('tokenizer/30_tokenizer_eng.pickle', 'rb') as handle:
        tokenizer_eng = pickle.load(handle)
    with open('tokenizer/30_tokenizer_ass.pickle', 'rb') as handle:
        tokenizer_ass = pickle.load(handle)
    # Vocabulary sizes
    vocab_size_ass = len(tokenizer_ass.word_index.keys())
    vocab_size_eng = len(tokenizer_eng.word_index.keys())
    return train, validation, tokenizer_eng, tokenizer_ass, vocab_size_ass, vocab_size_eng

def main():
    train, validation, tokenizer_eng, tokenizer_ass, vocab_size_ass, vocab_size_eng = load_weights()
    # Hyperparameters (sequences are padded/truncated to 30 tokens on both sides)
    in_input_length = 30
    tar_input_length = 30
    inp_vocab_size = vocab_size_ass
    out_vocab_size = vocab_size_eng
    dec_units = 128
    lstm_size = 128
    att_units = 256
    batch_size = 32
    embedding_dim = 300
    embedding_size = 300
    train_dataset = Dataset(train, tokenizer_ass, tokenizer_eng, in_input_length)
    test_dataset = Dataset(validation, tokenizer_ass, tokenizer_eng, in_input_length)
    train_dataloader = Dataloder(train_dataset, batch_size)
    test_dataloader = Dataloder(test_dataset, batch_size)
    print(train_dataloader[0][0][0].shape, train_dataloader[0][0][1].shape, train_dataloader[0][1].shape)
    model = encoder_decoder(out_vocab_size, inp_vocab_size, embedding_dim, embedding_size, in_input_length, tar_input_length, dec_units, lstm_size, att_units, batch_size)
    optimizer = tf.keras.optimizers.Adam()
    model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
    # train_steps = train.shape[0] // 32
    # valid_steps = validation.shape[0] // 32
    # A short fit builds all the layer variables before the saved weights are loaded below
    model.fit(train_dataloader, steps_per_epoch=10, epochs=1, verbose=1, validation_data=train_dataloader, validation_steps=1)
    model.load_weights('models/bi_directional_concat_256_batch_160_epoch_30_length_ass_eng_nmt_weights.h5')
    # Continue training briefly from the loaded weights
    model.fit(train_dataloader, steps_per_epoch=10, epochs=1, verbose=1, validation_data=train_dataloader, validation_steps=1)
    model.summary()
    return model, tokenizer_eng, tokenizer_ass, in_input_length

# if __name__ == "__main__":
#     main()
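
# Expected usage by the code that serves this model (a hedged sketch; the inference code is not part of this file):
#   model, tokenizer_eng, tokenizer_ass, max_len = main()
#   # tokenizer_ass tokenizes the Assamese source text; tokenizer_eng maps predicted ids back to English tokens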