%%capture
# imports assumed by the code in this section (the original notebook defines them
# elsewhere); stop_words is assumed to be NLTK's English stop-word list
import re
import pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import (Layer, Input, Embedding, LSTM, Dense,
                                     Concatenate, TimeDistributed)
from tensorflow.keras.models import Model
from bs4 import BeautifulSoup
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
contraction_mapping = {"ain't": "is not", "aren't": "are not","can't": "cannot", "'cause": "because", "could've": "could have", "couldn't": "could not",
"didn't": "did not", "doesn't": "does not", "don't": "do not", "hadn't": "had not", "hasn't": "has not", "haven't": "have not",
"he'd": "he would","he'll": "he will", "he's": "he is", "how'd": "how did", "how'd'y": "how do you", "how'll": "how will", "how's": "how is",
"I'd": "I would", "I'd've": "I would have", "I'll": "I will", "I'll've": "I will have","I'm": "I am", "I've": "I have", "i'd": "i would",
"i'd've": "i would have", "i'll": "i will", "i'll've": "i will have","i'm": "i am", "i've": "i have", "isn't": "is not", "it'd": "it would",
"it'd've": "it would have", "it'll": "it will", "it'll've": "it will have","it's": "it is", "let's": "let us", "ma'am": "madam",
"mayn't": "may not", "might've": "might have","mightn't": "might not","mightn't've": "might not have", "must've": "must have",
"mustn't": "must not", "mustn't've": "must not have", "needn't": "need not", "needn't've": "need not have","o'clock": "of the clock",
"oughtn't": "ought not", "oughtn't've": "ought not have", "shan't": "shall not", "sha'n't": "shall not", "shan't've": "shall not have",
"she'd": "she would", "she'd've": "she would have", "she'll": "she will", "she'll've": "she will have", "she's": "she is",
"should've": "should have", "shouldn't": "should not", "shouldn't've": "should not have", "so've": "so have","so's": "so as",
"this's": "this is","that'd": "that would", "that'd've": "that would have", "that's": "that is", "there'd": "there would",
"there'd've": "there would have", "there's": "there is", "here's": "here is","they'd": "they would", "they'd've": "they would have",
"they'll": "they will", "they'll've": "they will have", "they're": "they are", "they've": "they have", "to've": "to have",
"wasn't": "was not", "we'd": "we would", "we'd've": "we would have", "we'll": "we will", "we'll've": "we will have", "we're": "we are",
"we've": "we have", "weren't": "were not", "what'll": "what will", "what'll've": "what will have", "what're": "what are",
"what's": "what is", "what've": "what have", "when's": "when is", "when've": "when have", "where'd": "where did", "where's": "where is",
"where've": "where have", "who'll": "who will", "who'll've": "who will have", "who's": "who is", "who've": "who have",
"why's": "why is", "why've": "why have", "will've": "will have", "won't": "will not", "won't've": "will not have",
"would've": "would have", "wouldn't": "would not", "wouldn't've": "would not have", "y'all": "you all",
"y'all'd": "you all would","y'all'd've": "you all would have","y'all're": "you all are","y'all've": "you all have",
"you'd": "you would", "you'd've": "you would have", "you'll": "you will", "you'll've": "you will have",
"you're": "you are", "you've": "you have"}
class AttentionLayer(Layer):
    '''
    Bahdanau (additive) attention layer.

    Given encoder outputs h_1..h_T and a decoder state s_t, it computes
    energies e_tj = v_a^T tanh(W_a h_j + U_a s_t), attention weights
    a_t = softmax(e_t), and the context vector c_t = sum_j a_tj h_j.
    Call it on [encoder_output_sequence, decoder_output_sequence]; it returns
    [context_vector_sequence, attention_weight_sequence].
    '''
def __init__(self, **kwargs):
super(AttentionLayer, self).__init__(**kwargs)
def build(self, input_shape):
assert isinstance(input_shape, list)
        # create the trainable attention weights (W_a, U_a, v_a) for this layer
self.w_a = self.add_weight(name='w_a', shape= tf.TensorShape((input_shape[0][2], input_shape[0][2])), initializer= 'uniform', trainable= True)
self.u_a = self.add_weight(name='u_a', shape= tf.TensorShape((input_shape[1][2], input_shape[0][2])), initializer= 'uniform', trainable= True)
self.v_a = self.add_weight(name='v_a', shape= tf.TensorShape((input_shape[0][2], 1)), initializer= 'uniform', trainable = True)
super(AttentionLayer, self).build(input_shape)
def call(self, inputs, verbose = False):
'''
inputs: [encoder_output_sequence, decoder_output_sequence]
'''
        assert isinstance(inputs, list), 'inputs must be a list of [encoder_output_sequence, decoder_output_sequence]'
encoder_out_seq, decoder_out_seq = inputs
if verbose :
print("encoder out seq:", encoder_out_seq.shape)
print("decoder out seq:", decoder_out_seq.shape)
def energy_step(inputs, states):
'''
step function for computing energy for a single decoder state
'''
assert_msg = 'states must be a list. However, states {} is of type {}'.format(states, type(states))
assert isinstance(states, list) or isinstance(states, tuple), assert_msg
            ''' some parameters required for computing energy for a single decoder state '''
en_seq_len, en_hidden = encoder_out_seq.shape[1], encoder_out_seq.shape[2]
de_hidden = inputs.shape[-1]
''' computing s.wa where s=[s0, s1, ..., si] '''
# <= batch_size*en_seq_len, latent_dim
reshaped_enc_outputs = K.reshape(encoder_out_seq, (-1, en_hidden))
# <= batch_size*en_seq_len, latent_dim
w_a_dot_s = K.reshape(K.dot(reshaped_enc_outputs, self.w_a), (-1, en_seq_len, en_hidden))
if verbose : print("wa.s>", w_a_dot_s.shape)
'''computing hj.ua '''
u_a_dot_h = K.expand_dims(K.dot(inputs, self.u_a), 1) # <= batch_size, 1, latent_dim
if verbose: print("ua.h", u_a_dot_h.shape)
''' tanh(s.wa + hj.ua) '''
# <= batch_size*en_seq_len, latent_dim
reshaped_ws_plus_uh = K.tanh(K.reshape(w_a_dot_s + u_a_dot_h, (-1, en_hidden)))
if verbose : print("ws+uh>", reshaped_ws_plus_uh.shape)
'''softmax(va.tanh(s.wa+hj.ua))'''
# <= batch_size, en_seq_len
e_i = K.reshape(K.dot(reshaped_ws_plus_uh, self.v_a), (-1, en_seq_len))
# <= batch_size, en_seq_len
e_i = K.softmax(e_i)
if verbose : print("ei>", e_i.shape)
return e_i, [e_i]
def context_step(inputs, states):
'''step function for computing ci using ei '''
c_i = K.sum(encoder_out_seq*K.expand_dims(inputs, -1), axis=1)
if verbose : print("ci>", c_i.shape)
return c_i, [c_i]
def create_initial_state(inputs, hidden_size):
            # we are not using initial states, but we need to pass something to the K.rnn function
fake_state = K.zeros_like(inputs) # <= (batch_size, enc_seq_len, latent_dim)
fake_state = K.sum(fake_state, axis=[1,2]) # <= (batch_size)
fake_state = K.expand_dims(fake_state) # <= (batch_size, 1)
fake_state = K.tile(fake_state, [1, hidden_size]) # <= (batch_size, latent_dim)
return fake_state
fake_state_c = create_initial_state(encoder_out_seq, encoder_out_seq.shape[-1])
        fake_state_e = create_initial_state(encoder_out_seq, encoder_out_seq.shape[1]) # <= (batch_size, enc_seq_len)
'''computing energy outputs'''
# e_outputs => (batch_size, de_seq_len, en_seq_len)
last_out, e_outputs, _ = K.rnn(energy_step, decoder_out_seq, [fake_state_e])
'''computing context vectors'''
last_out, c_outputs, _ = K.rnn(context_step, e_outputs, [fake_state_c])
return c_outputs, e_outputs
def compute_output_shape(self, input_shape):
'''
outputs produced by the layer
'''
return [
tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[1][2])),
tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[0][1]))
]
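
# A quick shape check for the layer above (a minimal sketch: the batch size,
# sequence lengths and hidden size below are illustrative, not values used
# elsewhere in this notebook).
def _attention_shape_check(batch=2, enc_len=7, dec_len=5, hidden=16):
    enc_out = tf.random.uniform((batch, enc_len, hidden))   # stand-in encoder outputs
    dec_out = tf.random.uniform((batch, dec_len, hidden))   # stand-in decoder outputs
    context, weights = AttentionLayer()([enc_out, dec_out])
    # context: (batch, dec_len, hidden); weights: (batch, dec_len, enc_len)
    print(context.shape, weights.shape)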
def get_dataset():
!wget --load-cookies /tmp/cookies.txt\
"https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt\
--keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1EzY1IfN_QGCVp9EUVxZ3dZhRF_EUtyJA' -O- \
| sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1EzY1IfN_QGCVp9EUVxZ3dZhRF_EUtyJA" -O kaggle.json && rm -rf /tmp/cookies.txt
!pip install kaggle
    !mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d snap/amazon-fine-food-reviews
!unzip 'amazon-fine-food-reviews.zip'
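
# Sketch of loading the data just downloaded (assumes the archive unzips to
# 'Reviews.csv' with 'Text' and 'Summary' columns, as the snap/amazon-fine-food-reviews
# dataset normally does; n_rows is an illustrative cap).
def load_reviews(n_rows=100000):
    import pandas as pd
    df = pd.read_csv('Reviews.csv', nrows=n_rows)
    df = df[['Text', 'Summary']].dropna().drop_duplicates(subset=['Text'])
    return df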
def get_embeddings_matrix(glove, word2ix, word_embed_size = 200, vocab_size = 5000):
embeddings_matrix = np.zeros((vocab_size, word_embed_size))
for word, index in word2ix.items():
embed_vec = glove.get(word)
if embed_vec is not None :
embeddings_matrix[index] = embed_vec
return embeddings_matrix
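
# Tiny sanity check for get_embeddings_matrix (the 4-dimensional vectors and word
# index below are made up; the notebook itself uses 200-dimensional GloVe vectors).
_toy_glove = {'good': np.array([0.1, 0.2, 0.3, 0.4])}
_toy_word2ix = {'good': 1, 'unseen': 2}
_toy_matrix = get_embeddings_matrix(_toy_glove, _toy_word2ix, word_embed_size=4, vocab_size=3)
# row 1 holds the vector for 'good'; row 2 stays all zeros because 'unseen' has no GloVe entry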
def get_glove_200():
def read_pickle(pickle_file):
with open(pickle_file, 'rb') as f :
return pickle.load(f)
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=10Xgw5e157bD3z7cJG2_0DLmZSwGzXh8c' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=10Xgw5e157bD3z7cJG2_0DLmZSwGzXh8c" -O glove_200.pickle && rm -rf /tmp/cookies.txt
glove_200 = read_pickle('glove_200.pickle')
return glove_200
def cleaner(data, maxlen=3, RemoveStopWords=True):
    # note: despite its name, maxlen acts as a minimum length threshold -
    # tokens shorter than maxlen characters are dropped
def clean(text, threshold, RemoveStopWords):
newtext = text.lower()
newtext = BeautifulSoup(newtext, 'lxml').text
newtext = re.sub(r'\([^)]*\)', '', newtext)
newtext = re.sub('"', '', newtext)
newtext = ' '.join([contraction_mapping[t] if t in contraction_mapping else t for t in newtext.split(" ")])
newtext = re.sub(r"'s\b","",newtext) #?
newtext = re.sub("[^a-zA-Z]", " ", newtext) #?
if RemoveStopWords : tokens = [w for w in newtext.split(" ") if not w in stop_words]
else : tokens = [w for w in newtext.split(" ")]
long_tokens = [t for t in tokens if len(t)>=threshold]
return (" ".join(long_tokens)).strip()
cleaned = [clean(d, maxlen, RemoveStopWords) for d in data]
return cleaned
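
# Quick illustration of cleaner on an invented review sentence (the exact output
# depends on the stop-word list in use).
print(cleaner(["I've bought these cookies twice & they're great!"]))
# expected: something like ['bought cookies twice great'] once contractions are
# expanded, non-letters and stop words are removed, and tokens shorter than
# 3 characters are dropped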
def get_model(embeddings_matrix, max_len_text, x_voc_size, y_voc_size, latent_dim = 500):
#Encoder
encoder_inputs = Input(shape=(max_len_text,))
enc_emb = Embedding(x_voc_size, 200, weights = [embeddings_matrix], input_length = max_len_text, trainable = False)(encoder_inputs)
#LSTM 1
encoder_lstm1 = LSTM(latent_dim, return_sequences = True, return_state = True)
encoder_output1, state_h1, state_c1 = encoder_lstm1(enc_emb)
#LSTM 2
encoder_lstm2 = LSTM(latent_dim, return_sequences = True, return_state = True)
encoder_output2, state_h2, state_c2 = encoder_lstm2(encoder_output1)
#LSTM 3
encoder_lstm3 = LSTM(latent_dim, return_sequences = True, return_state = True)
encoder_outputs, state_h, state_c = encoder_lstm3(encoder_output2)
print("encoder outputs shape:", encoder_outputs.shape)
#set up the decoder
decoder_inputs = Input(shape=(None,))
    # the decoder reuses the review-vocabulary embedding matrix, which assumes the
    # summary tokenizer shares the same word-index mapping as the review tokenizer
    dec_emb_layer = Embedding(x_voc_size, 200, weights=[embeddings_matrix], trainable=False)
dec_emb = dec_emb_layer(decoder_inputs)
#LSTM using encoder_states as initial state
decoder_lstm = LSTM(latent_dim, return_sequences = True, return_state = True)
    # the LSTM is unidirectional, so the extra outputs are its final hidden and cell states
    decoder_outputs, decoder_state_h, decoder_state_c = decoder_lstm(dec_emb, initial_state=[state_h, state_c])
print("decoder_outputs shape:", decoder_outputs.shape)
#attention layer
attn_layer = AttentionLayer(name='attention_layer')
attn_out, attn_states = attn_layer([encoder_outputs, decoder_outputs])
#concatenate attention output and decoder LSTM output
decoder_concat_input = Concatenate(axis=-1, name='concat_layer')([decoder_outputs, attn_out])
#Dense layer
decoder_dense = TimeDistributed(Dense(y_voc_size, activation='softmax'))
decoder_outputs = decoder_dense(decoder_concat_input)
#Define the model
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
return model
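
# A minimal compile-and-fit sketch for the model above (x_tr/y_tr/x_val/y_val are
# assumed to be padded, integer-encoded review and summary sequences prepared
# elsewhere in the notebook; the optimizer, epochs and batch size are illustrative).
def train_summarizer(model, x_tr, y_tr, x_val, y_val, epochs=10, batch_size=128):
    # teacher forcing: the decoder sees the summary shifted right and must
    # predict the token one step ahead
    model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy')
    return model.fit(
        [x_tr, y_tr[:, :-1]],
        y_tr.reshape(y_tr.shape[0], y_tr.shape[1], 1)[:, 1:],
        validation_data=([x_val, y_val[:, :-1]],
                         y_val.reshape(y_val.shape[0], y_val.shape[1], 1)[:, 1:]),
        epochs=epochs, batch_size=batch_size)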