Three Implementations of a Recurrent Neural Network (RNN): From Scratch, Theano, Keras


  • Preface
  • Body
    • RNN From Scratch
    • RNN Using Theano
    • RNN Using Keras
  • Epilogue

    "제 인 에서 번 거 로 움, 그리고 제 인 까지!"
    Preface
    Skip the chit-chat and jump straight to the body.
    After a period of study I now have a basic grasp of how RNNs work and how to implement them, so here I list three different RNN implementations for reference.
    There are plenty of explanations of RNN fundamentals online, so I won't repeat them here; I couldn't put it better anyway. Let me first recommend an RNN tutorial that is very well done — after reading its four posts you can basically implement an RNN yourself.
    Body
    RNN From Scratch
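    For reference, these are the per-step equations that the NumPy and Theano implementations below compute (a plain "vanilla" RNN language model; the Keras version later swaps the simple recurrence for an LSTM layer). Here x_t is the one-hot encoding of the input word at step t, and U, V, W are the input, output, and recurrent weight matrices, exactly as in the code:

    s_t = \tanh(U x_t + W s_{t-1})
    o_t = \mathrm{softmax}(V s_t)
    L(y, o) = -\frac{1}{N} \sum_t \log o_{t,\,y_t}

    s_t is the hidden state, o_t the predicted distribution over the vocabulary, y_t the index of the correct next word, and N the total number of words, used to normalize the loss.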
    import nltk
    import csv
    import itertools
    import numpy as np
    from utils import *
    import operator
    from datetime import datetime
    import sys
    
    class RNNNumpy:
        def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
            # Assign instance variables
            self.word_dim = word_dim
            self.hidden_dim = hidden_dim
            self.bptt_truncate = bptt_truncate
            # Randomly initialize the network parameters
            self.U = np.random.uniform(-np.sqrt(1./word_dim), np.sqrt(1./word_dim), (hidden_dim, word_dim))
            self.V = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (word_dim, hidden_dim))
            self.W = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (hidden_dim, hidden_dim))
    
        def forward_propagation(self, x):
            # The total number of time steps
            T = len(x)
            # During forward propagation we save all hidden states in s because we need them later.
            # We add one additional element for the initial hidden state, which we set to 0
            s = np.zeros((T + 1, self.hidden_dim))
            s[-1] = np.zeros(self.hidden_dim)
            # The outputs at each time step. Again, we save them for later.
            o = np.zeros((T, self.word_dim))
            # For each time step...
            for t in np.arange(T):
                # Note that we are indexing U by x[t]. This is the same as multiplying U with a one-hot vector.
                s[t] = np.tanh(self.U[:,x[t]] + self.W.dot(s[t-1]))
                o[t] = softmax(self.V.dot(s[t]))
            return [o, s]
    
        def predict(self, x):
            # Perform forward propagation and return index of the highest score
            o, s = self.forward_propagation(x)
            return np.argmax(o, axis=1)
    
        def calculate_total_loss(self, x, y):
            L = 0
            # For each sentence...
            for i in np.arange(len(y)):
                o, s = self.forward_propagation(x[i])
                # We only care about our prediction of the "correct" words
                correct_word_predictions = o[np.arange(len(y[i])), y[i]]
                # Add to the loss based on how off we were
                L += -1 * np.sum(np.log(correct_word_predictions))
            return L
    
        def calculate_loss(self, x, y):
            # Divide the total loss by the number of training examples
            N = np.sum([len(y_i) for y_i in y])
            return self.calculate_total_loss(x,y)/N
    
        def bptt(self, x, y):
            T = len(y)
            # Perform forward propagation
            o, s = self.forward_propagation(x)
            # We accumulate the gradients in these variables
            dLdU = np.zeros(self.U.shape)
            dLdV = np.zeros(self.V.shape)
            dLdW = np.zeros(self.W.shape)
            delta_o = o
            delta_o[np.arange(len(y)), y] -= 1.
            # For each output backwards...
            for t in np.arange(T)[::-1]:
                dLdV += np.outer(delta_o[t], s[t].T)
                # Initial delta calculation
                delta_t = self.V.T.dot(delta_o[t]) * (1 - (s[t] ** 2))
                # Backpropagation through time (for at most self.bptt_truncate steps)
                for bptt_step in np.arange(max(0, t-self.bptt_truncate), t+1)[::-1]:
                    # print "Backpropagation step t=%d bptt step=%d " % (t, bptt_step)
                    dLdW += np.outer(delta_t, s[bptt_step-1])              
                    dLdU[:,x[bptt_step]] += delta_t
                    # Update delta for next step
                    delta_t = self.W.T.dot(delta_t) * (1 - s[bptt_step-1] ** 2)
            return [dLdU, dLdV, dLdW]
    
        def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
            # Calculate the gradients using backpropagation. We want to check whether these are correct.
            bptt_gradients = self.bptt(x, y)
            # List of all parameters we want to check.
            model_parameters = ['U', 'V', 'W']
            # Gradient check for each parameter
            for pidx, pname in enumerate(model_parameters):
                # Get the actual parameter value from the model, e.g. model.W
                parameter = operator.attrgetter(pname)(self)
                print "Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape))
                # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
                it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
                while not it.finished:
                    ix = it.multi_index
                    # Save the original value so we can reset it later
                    original_value = parameter[ix]
                    # Estimate the gradient using (f(x+h) - f(x-h))/(2*h)
                    parameter[ix] = original_value + h
                    gradplus = self.calculate_total_loss([x],[y])
                    parameter[ix] = original_value - h
                    gradminus = self.calculate_total_loss([x],[y])
                    estimated_gradient = (gradplus - gradminus)/(2*h)
                    # Reset parameter to original value
                    parameter[ix] = original_value
                    # The gradient for this parameter calculated using backpropagation
                    backprop_gradient = bptt_gradients[pidx][ix]
                    # Calculate the relative error: (|x - y|/(|x| + |y|))
                    relative_error = np.abs(backprop_gradient - estimated_gradient)/(np.abs(backprop_gradient) + np.abs(estimated_gradient))
                    # If the error is too large, fail the gradient check
                    if relative_error > error_threshold:
                        print "Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix)
                        print "+h Loss: %f" % gradplus
                        print "-h Loss: %f" % gradminus
                        print "Estimated_gradient: %f" % estimated_gradient
                        print "Backpropagation gradient: %f" % backprop_gradient
                        print "Relative Error: %f" % relative_error
                        return
                    it.iternext()
                print "Gradient check for parameter %s passed." % (pname)
    
        # Performs one step of SGD.
        def sgd_step(self, x, y, learning_rate):
            # Calculate the gradients
            dLdU, dLdV, dLdW = self.bptt(x, y)
            # Change parameters according to gradients and learning rate
            self.U -= learning_rate * dLdU
            self.V -= learning_rate * dLdV
            self.W -= learning_rate * dLdW
        # Outer SGD loop (a method here, so the model instance is self)
        # - X_train: The training data set (list of input word-index sequences)
        # - y_train: The training data labels (list of target word-index sequences)
        # - learning_rate: Initial learning rate for SGD
        # - nepoch: Number of times to iterate through the complete dataset
        # - evaluate_loss_after: Evaluate the loss after this many epochs
        def train_with_sgd(self, X_train, y_train, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
            # We keep track of the losses so we can plot them later
            losses = []
            num_examples_seen = 0
            for epoch in range(nepoch):
                # Optionally evaluate the loss
                if (epoch % evaluate_loss_after == 0):
                    loss = self.calculate_loss(X_train, y_train)
                    losses.append((num_examples_seen, loss))
                    time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
                    print "%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss)
                    # Adjust the learning rate if loss increases
                    if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
                        learning_rate = learning_rate * 0.5 
                        print "Setting learning rate to %f" % learning_rate
                    sys.stdout.flush()
                    # ADDED! Saving model parameters
                    save_model_parameters_numpy("./data/rnn-numpy-%d-%d-%s.npz" % (self.hidden_dim, self.word_dim, time), self)
                # For each training example...
                for i in range(len(y_train)):
                    # One SGD step
                    self.sgd_step(X_train[i], y_train[i], learning_rate)
                    num_examples_seen += 1

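    A minimal usage sketch, assuming the helpers imported from utils (softmax, save_model_parameters_numpy) are available and that the ./data directory used by train_with_sgd exists; the vocabulary size and toy sentences below are made up purely for illustration (in practice they come from tokenized text):

    np.random.seed(10)
    vocabulary_size = 100   # illustrative; normally the size of the word index

    # Toy data: each sentence is a list of word indices, and the target is the input shifted by one step
    X_train = [[0, 51, 27, 16], [0, 23, 62]]
    y_train = [[51, 27, 16, 1], [23, 62, 1]]

    model = RNNNumpy(vocabulary_size)
    o, s = model.forward_propagation(X_train[0])
    print(o.shape)                    # (4, 100): one softmax distribution per time step
    print(model.predict(X_train[0]))  # index of the most probable next word at each step

    # Before training, the loss should be close to ln(vocabulary_size), i.e. random predictions
    print(model.calculate_loss(X_train, y_train))

    # Check the analytic gradients on a tiny model (numerical checking is slow for large matrices)
    small_model = RNNNumpy(10, hidden_dim=5)
    small_model.gradient_check([0, 1, 2, 3], [1, 2, 3, 4])

    # Train with SGD, evaluating (and saving) after every epoch
    model.train_with_sgd(X_train, y_train, nepoch=10, evaluate_loss_after=1)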
    For the full code, see GitHub.
    RNN Using Theano
    import numpy as np
    import theano as theano
    import theano.tensor as T
    from utils import *
    import operator
    from datetime import datetime
    import sys
    
    class RNNTheano:
    
        def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
            # Assign instance variables
            self.word_dim = word_dim
            self.hidden_dim = hidden_dim
            self.bptt_truncate = bptt_truncate
            # Randomly initialize the network parameters
            U = np.random.uniform(-np.sqrt(1./word_dim), np.sqrt(1./word_dim), (hidden_dim, word_dim))
            V = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (word_dim, hidden_dim))
            W = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (hidden_dim, hidden_dim))
            # Theano: Create shared variables
            self.U = theano.shared(name='U', value=U.astype(theano.config.floatX))
            self.V = theano.shared(name='V', value=V.astype(theano.config.floatX))
            self.W = theano.shared(name='W', value=W.astype(theano.config.floatX))      
            # We store the Theano graph here
            self.theano = {}
            self.__theano_build__()
    
        def __theano_build__(self):
            U, V, W = self.U, self.V, self.W
            x = T.ivector('x')
            y = T.ivector('y')
            def forward_prop_step(x_t, s_t_prev, U, V, W):
                s_t = T.tanh(U[:,x_t] + W.dot(s_t_prev))
                o_t = T.nnet.softmax(V.dot(s_t))
                return [o_t[0], s_t]
            [o,s], updates = theano.scan(
                forward_prop_step,
                sequences=x,
                outputs_info=[None, dict(initial=T.zeros(self.hidden_dim))],
                non_sequences=[U, V, W],
                truncate_gradient=self.bptt_truncate,
                strict=True)
    
            prediction = T.argmax(o, axis=1)
            o_error = T.sum(T.nnet.categorical_crossentropy(o, y))
    
            # Gradients
            dU = T.grad(o_error, U)
            dV = T.grad(o_error, V)
            dW = T.grad(o_error, W)
    
            # Assign functions
            self.forward_propagation = theano.function([x], o)
            self.predict = theano.function([x], prediction)
            self.ce_error = theano.function([x, y], o_error)
            self.bptt = theano.function([x, y], [dU, dV, dW])
    
            # SGD
            learning_rate = T.scalar('learning_rate')
            self.sgd_step = theano.function([x,y,learning_rate], [], 
                          updates=[(self.U, self.U - learning_rate * dU),
                                  (self.V, self.V - learning_rate * dV),
                                  (self.W, self.W - learning_rate * dW)])
    
        def calculate_total_loss(self, X, Y):
            return np.sum([self.ce_error(x,y) for x,y in zip(X,Y)])
    
        def calculate_loss(self, X, Y):
            # Divide calculate_loss by the number of words
            num_words = np.sum([len(y) for y in Y])
            return self.calculate_total_loss(X,Y)/float(num_words)
    
        def train_with_sgd(self, X_train, y_train, learning_rate=0.005, nepoch=1, evaluate_loss_after=5):
            # We keep track of the losses so we can plot them later
            losses = []
            num_examples_seen = 0
            for epoch in range(nepoch):
                # Optionally evaluate the loss
                if (epoch % evaluate_loss_after == 0):
                    loss = self.calculate_loss(X_train, y_train)
                    losses.append((num_examples_seen, loss))
                    time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
                    print "%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss)
                    # Adjust the learning rate if loss increases
                    if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
                        learning_rate = learning_rate * 0.5  
                        print "Setting learning rate to %f" % learning_rate
                    sys.stdout.flush()
                    # ADDED! Saving model parameters
                    save_model_parameters_theano("./data/rnn-theano-%d-%d-%s.npz" % (self.hidden_dim, self.word_dim, time), self)
                # For each training example...
                for i in range(len(y_train)):
                    # One SGD step
                    self.sgd_step(X_train[i], y_train[i], learning_rate)
                    num_examples_seen += 1
    
    
    def gradient_check_theano(model, x, y, h=0.001, error_threshold=0.01):
        # Overwrite the bptt_truncate attribute. We need to backpropagate all the way to get the correct gradient
        model.bptt_truncate = 1000
        # Calculate the gradients using backprop
        bptt_gradients = model.bptt(x, y)
        # List of all parameters we want to check.
        model_parameters = ['U', 'V', 'W']
        # Gradient check for each parameter
        for pidx, pname in enumerate(model_parameters):
            # Get the actual parameter value from the model, e.g. model.W
            parameter_T = operator.attrgetter(pname)(model)
            parameter = parameter_T.get_value()
            print "Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape))
            # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
            it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
            while not it.finished:
                ix = it.multi_index
                # Save the original value so we can reset it later
                original_value = parameter[ix]
                # Estimate the gradient using (f(x+h) - f(x-h))/(2*h)
                parameter[ix] = original_value + h
                parameter_T.set_value(parameter)
                gradplus = model.calculate_total_loss([x],[y])
                parameter[ix] = original_value - h
                parameter_T.set_value(parameter)
                gradminus = model.calculate_total_loss([x],[y])
                estimated_gradient = (gradplus - gradminus)/(2*h)
                parameter[ix] = original_value
                parameter_T.set_value(parameter)
                # The gradient for this parameter calculated using backpropagation
                backprop_gradient = bptt_gradients[pidx][ix]
                # Calculate the relative error: (|x - y|/(|x| + |y|))
                relative_error = np.abs(backprop_gradient - estimated_gradient)/(np.abs(backprop_gradient) + np.abs(estimated_gradient))
                # If the error is too large, fail the gradient check
                if relative_error > error_threshold:
                    print "Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix)
                    print "+h Loss: %f" % gradplus
                    print "-h Loss: %f" % gradminus
                    print "Estimated_gradient: %f" % estimated_gradient
                    print "Backpropagation gradient: %f" % backprop_gradient
                    print "Relative Error: %f" % relative_error
                    return 
                it.iternext()
            print "Gradient check for parameter %s passed." % (pname)

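    Usage mirrors the NumPy version. A minimal sketch, again with illustrative toy data, assuming utils provides save_model_parameters_theano and the ./data directory exists; the inputs are cast to int32 to match the ivector placeholders:

    vocabulary_size = 100   # illustrative
    X_train = [np.asarray(s, dtype='int32') for s in [[0, 51, 27, 16], [0, 23, 62]]]
    y_train = [np.asarray(s, dtype='int32') for s in [[51, 27, 16, 1], [23, 62, 1]]]

    model = RNNTheano(vocabulary_size, hidden_dim=50)
    print(model.predict(X_train[0]))               # most probable word index at each step
    print(model.calculate_loss(X_train, y_train))  # roughly ln(vocabulary_size) before training

    # Compare the Theano gradients against numerical estimates on a tiny model
    small_model = RNNTheano(10, hidden_dim=5)
    gradient_check_theano(small_model,
                          np.asarray([0, 1, 2, 3], dtype='int32'),
                          np.asarray([1, 2, 3, 4], dtype='int32'))

    model.train_with_sgd(X_train, y_train, nepoch=10, evaluate_loss_after=1)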
    Also: for a GRU version of the Theano code, see GitHub.
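    For reference, the GRU version replaces the single tanh recurrence s_t = tanh(U x_t + W s_{t-1}) with gated updates; one common formulation (the linked code may differ in details) is:

    z_t = \sigma(U_z x_t + W_z s_{t-1})
    r_t = \sigma(U_r x_t + W_r s_{t-1})
    h_t = \tanh(U_h x_t + W_h (s_{t-1} \circ r_t))
    s_t = (1 - z_t) \circ h_t + z_t \circ s_{t-1}

    where z_t and r_t are the update and reset gates and \circ is element-wise multiplication.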
    RNN Using Keras
    from __future__ import print_function
    from keras.models import Sequential
    from keras.layers import Dense, Activation, Dropout
    from keras.layers import LSTM
    from keras.optimizers import RMSprop
    from keras.utils.data_utils import get_file
    import numpy as np
    import random
    import sys
    
    class RNNKeras:
    
        def __init__(self, sentenceLen, vector_size, output_size, hidden_dim=100):
            # Assign instance variables
            self.sentenceLen = sentenceLen
            self.vector_size = vector_size
            self.output_size = output_size
            self.hidden_dim = hidden_dim
    
            self.__model_build__()
    
        def __model_build__(self):
            self.model = Sequential()
            self.model.add(LSTM(self.output_size, input_shape=(self.sentenceLen, self.vector_size)))
            self.model.add(Dense(self.vector_size))
            self.model.add(Activation('softmax'))
    
            optimizer = RMSprop(lr=0.01)
            self.model.compile(loss='categorical_crossentropy', optimizer=optimizer)
    
        def train_model(self, X, y, batchSize=128, nepoch=1):
            self.model.fit(X, y, batch_size=batchSize, nb_epoch=nepoch)
    
        def predict(self, x):
            return self.model.predict(x, verbose=0)[0]

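    A minimal character-level usage sketch; the toy text and window length are illustrative, the one-hot encoding follows the usual Keras text-generation recipe, and an older Keras version matching the nb_epoch/lr arguments above is assumed:

    # Predict the next character from the previous maxlen characters
    text = "hello world hello keras hello rnn " * 20
    chars = sorted(set(text))
    char_to_ix = {c: i for i, c in enumerate(chars)}
    maxlen = 5

    # Cut the text into overlapping windows and one-hot encode them
    sentences = [text[i:i + maxlen] for i in range(len(text) - maxlen)]
    next_chars = [text[i + maxlen] for i in range(len(text) - maxlen)]
    X = np.zeros((len(sentences), maxlen, len(chars)), dtype=bool)
    y = np.zeros((len(sentences), len(chars)), dtype=bool)
    for i, sentence in enumerate(sentences):
        for t, ch in enumerate(sentence):
            X[i, t, char_to_ix[ch]] = 1
        y[i, char_to_ix[next_chars[i]]] = 1

    model = RNNKeras(sentenceLen=maxlen, vector_size=len(chars), output_size=128)
    model.train_model(X, y, batchSize=32, nepoch=5)

    # Probability distribution over the next character for the first window
    print(model.predict(X[:1]))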
    For the full code, see GitHub.
    Epilogue
    In recent years deep learning research and applications have taken off; with CNNs and RNNs in the spotlight, fewer and fewer people study DBNs and SAEs. Still, to use neural networks well it is worth understanding DBNs and SAEs. I also need to find time to go back over CNNs, and when I do I should clean up this post and add more explanatory text.
    Also, rather than reaching straight for a heavily wrapped library like Keras, first learn the low-level principles and computation formulas of RNNs; only then can you grasp them thoroughly. And these wrapper libraries are not a cure-all: when a model gets more complex, some functionality cannot be expressed through such highly packaged libraries and has to be implemented yourself in Theano or TensorFlow.
