import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
x_t = np.linspace(0,60,300)
y_t = np.sin(x_t)
plt.plot(x_t,y_t)
SEQ length is an important parameter. Each sample should span more than one cycle of the sine wave (this also depends on the resolution of the linspace). Otherwise, we get abnormal results while serving.
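As a quick sanity check (a sketch added here, assuming the same linspace as above): one cycle of sin(x) is 2*pi ~ 6.28 units of x, so SEQ = 100 covers roughly three cycles.
# Sanity-check sketch (assumes the linspace above): samples per sine cycle
pts_per_unit = 300/60                      # 5 samples per unit of x
pts_per_cycle = 2*np.pi*pts_per_unit       # one cycle spans 2*pi units, ~31.4 samples
print(pts_per_cycle, 100/pts_per_cycle)    # SEQ=100 covers ~3.2 cycles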
SEQ = 100
HIDDEN = 100
INP_SIZE=1
Generator
def generator():
    x = np.linspace(0,60,300)
    for i in range(2000):
        idx = np.random.randint(0,200) #scalar
        yield (x[idx:idx+SEQ],np.sin(x[idx:idx+SEQ]))
Check one sample
for i in generator():
    plt.plot(i[0],i[1])
    break
tf.data.Dataset
dataset = tf.data.Dataset.from_generator(generator,(tf.float32,tf.float32),((SEQ,),(SEQ,)))
dataset = dataset.repeat(3)
dataset = dataset.batch(1)
iterator = dataset.make_one_shot_iterator()
idx,sin_x = iterator.get_next()
sin_x.shape
Check
with tf.Session() as sess:
    x_,y_ = sess.run([idx,sin_x])
plt.plot(x_t,y_t)
plt.plot(x_,y_,'*')
X = sin_x[:,:-1]      # inputs: all but the last point
target = sin_x[:,1:]  # targets: the same sequence shifted one step ahead
X,target
h_next = activation(h @ Whh + x @ Whx + b)
y = h_next @ Wyh
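A minimal NumPy sketch of one step of this recurrence (shapes and random weights are illustrative only, not the trained parameters):
# One recurrence step in NumPy (illustrative shapes, random weights)
batch, inp = 4, 1
x_np = np.random.randn(batch, inp).astype(np.float32)
h_np = np.zeros((batch, HIDDEN), dtype=np.float32)
Whh = np.random.randn(HIDDEN, HIDDEN).astype(np.float32)
Whx = np.random.randn(inp, HIDDEN).astype(np.float32)
b = np.zeros(HIDDEN, dtype=np.float32)
h_next = np.maximum(0, h_np @ Whh + x_np @ Whx + b)  # relu activation
print(h_next.shape)  # (4, 100)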
def RNNCell(x:'Input Tensor [batch,1]',h:'Hidden state tensor [batch,hidden size]'):
    inp_size = x.shape[1]
    hidden_size = h.shape[1]
    with tf.variable_scope('params',reuse=tf.AUTO_REUSE):
        Whh = tf.get_variable("Whh",shape=(hidden_size,hidden_size),dtype=tf.float32,initializer=tf.initializers.orthogonal())
        Whx = tf.get_variable("Whx",shape=(inp_size,hidden_size),dtype=tf.float32,initializer=tf.initializers.glorot_uniform())
        b = tf.get_variable("b",shape=(hidden_size,),dtype=tf.float32,initializer=tf.initializers.zeros())
    h_next = tf.nn.relu(tf.add(tf.add(tf.matmul(h,Whh),tf.matmul(x,Whx)),b))
    return h_next
Unroll the cell
l = []
h = tf.zeros((tf.shape(X)[0],HIDDEN)) #Initial hidden state
for i in range(SEQ-1):
    with tf.variable_scope("RNN"):
        x = tf.reshape(X[:,i],(tf.shape(X)[0],1)) #(Batch,1)
        h_next = RNNCell(x,h)
        h = h_next
        l.append(h)
H = tf.convert_to_tensor(l) # time-major stack of hidden states: (SEQ-1, batch, HIDDEN)
H
The output projection is done outside the loop for efficiency: tf.layers.dense applies the same weights to every timestep of H in one batched matmul instead of SEQ-1 separate ones.
outputs = tf.layers.dense(H,1,activation=tf.nn.tanh)
outputs
outputs = tf.transpose(outputs,(1,0,2))  # back to batch-major: (batch, SEQ-1, 1)
outputs = tf.reshape(outputs,(-1,SEQ-1)) # (batch, SEQ-1), matching target
outputs
target,outputs
tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
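Listing the global variables is handy here because the serving code below fetches the dense layer's kernel and bias by name. A small sketch to confirm those names (tf.layers.dense with default naming creates 'dense/kernel:0' and 'dense/bias:0'):
# Sketch: confirm the output layer's variable names before looking them up later
for v in tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES):
    if v.name.startswith('dense/'):
        print(v.name, v.shape)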
loss = tf.reduce_mean(tf.square(outputs-target))
optimizer = tf.train.AdamOptimizer(0.0001)
train = optimizer.minimize(loss)
saver = tf.train.Saver()
!rm models/RNN/*
def train_rnn():
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        try:
            i = 1
            tmp = []
            while True:
                i = i+1
                l,_ = sess.run([loss,train])
                tmp.append(l)
                if i%500 == 0:
                    avg_loss = np.array(tmp).mean()
                    print("Batch: ",i,avg_loss)
                    tmp = []
        except tf.errors.OutOfRangeError:
            pass
        saver.save(sess,'models/RNN/my_first_model.ckpt')
train_rnn()
INVALID Validation: check the ablation study / keras_organized section.
def val():
    with tf.Session() as sess:
        saver.restore(sess,'models/RNN/my_first_model.ckpt')
        try:
            while True:
                o,t = sess.run([outputs,target])
                plt.plot(o[0],'*')
                plt.plot(t[0])
                break
        except tf.errors.OutOfRangeError:
            pass
val()
Works for sequence lengths greater than the one it was trained with!
Sometimes, even though the training error is low, the model still fails during serving. Retrain and try again.
h = tf.zeros((1,HIDDEN)) #Initial hidden state
x = tf.constant([[0.1]],dtype=tf.float32)
W_d = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='dense/kernel:0')[0]
b_d = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='dense/bias:0')[0]
out = []
for i in range(500):
    with tf.variable_scope("RNN"):
        h_next = RNNCell(x,h)
        x = tf.tanh(tf.add(tf.matmul(h_next,W_d),b_d))
        out.append(x)
        h = h_next
with tf.Session() as sess:
    saver.restore(sess,'models/RNN/my_first_model.ckpt')
    o = sess.run(out)
o = np.array(o).reshape(500)
plt.plot(o)
Much slower, probably because the output matmul happens inside the loop: OutputProjectionWrapper applies the projection at every timestep, instead of once on the stacked hidden states as above.
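A rough NumPy illustration of that guess (this times plain matmuls, not the TF graph; the sizes are made up): one batched projection vs. one tiny projection per step:
# Illustration: batched projection vs. per-step projections
import time
H_all = np.random.randn(500, 100).astype(np.float32)  # 500 hidden states
W = np.random.randn(100, 1).astype(np.float32)
t0 = time.time(); y_batched = H_all @ W; t1 = time.time()
y_steps = [H_all[i:i+1] @ W for i in range(500)]      # per-step matmuls
t2 = time.time()
print("batched: %.6fs  per-step: %.6fs" % (t1-t0, t2-t1))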
cell = tf.contrib.rnn.OutputProjectionWrapper(
    tf.contrib.rnn.BasicRNNCell(num_units=HIDDEN, activation=tf.nn.relu),
    output_size=1,activation=tf.nn.tanh)
outputs, states = tf.nn.dynamic_rnn(cell, tf.reshape(X,(-1,SEQ-1,1)), dtype=tf.float32)
outputs = tf.reshape(outputs,(-1,SEQ-1))
loss = tf.reduce_mean(tf.square(outputs-target))
optimizer = tf.train.AdamOptimizer(0.0001)
train = optimizer.minimize(loss)
saver = tf.train.Saver()
train_rnn()
val()
Don't forget to pass the states around the loop.
start = -0.1
x_ = tf.constant(np.array(start).reshape(1,1,1),dtype=tf.float32)
h = tf.zeros((1,HIDDEN))
out = []
for i in range(SEQ):
    x_, h = tf.nn.dynamic_rnn(cell,x_, initial_state=h, dtype=tf.float32) # one step: feed the output back in, threading the state through
    out.append(x_)
with tf.Session() as sess:
    saver.restore(sess,'models/RNN/my_first_model.ckpt')
    o = sess.run(out)
o = np.array(o).reshape(SEQ)
plt.plot(o)
1) Make sure to use proper naming conventions. I had to debug for 5 hours because I overlooked the data format obtained from iterator.get_next().
2) INVALID COMMENT: "SEQ length is an important parameter. Each sample should span more than one cycle of the sine wave (this also depends on the resolution of the linspace). Otherwise, we get abnormal results while serving."
The correct way to evaluate and generate is shown in the ablation study / keras_organized section. A sequence covering more cycles does improve results, but a smaller SEQ should still give reasonable results.
3) Works for sequence lengths greater than the one it was trained with!
4) Sometimes, even though the training error is low, the model still fails during serving. Retrain and try again.
5) Don't forget to pass the hidden states around the loop while unrolling (when using the uncommon evaluation method above).
We do not have to pass states if we follow the generation method described at the end of the keras_organized section.