python 은 RNN 을 사용 하여 텍스트 분 류 를 실현 합 니 다.
1.본 블 로그 프로젝트 의 유래 는 oxford 의 nlp 깊이 학습 과정 3 주차 숙제 이 고 작업 은 LSTM 를 사용 하여 텍스트 분 류 를 해 야 합 니 다.이전 CNN 텍스트 와 유사 하 게 분 류 된 이 코드 스타일 도 sklearn 스타일 을 모방 하여 3 단계 걷 기 형식(모델 실체 화,모델 훈련 과 모델 예측)이지 만 훈련 시간 이 오래 되 어 언제 훈련 하 는 것 이 이상 적 인지 모 르 기 때문에 다음 에 계속 훈련 하 는 기능 을 추가 했다.
2.구조 텍스트 분류의 rnn 클래스,(파일 을 ClassifierRNN.py 로 저장)
2.1 해당 설정 매개 변 수 는 비교적 번 거 롭 고 읽 기 에 불리 하기 때문에 tensor flow 소스 코드 형식 을 모방 하여 코드 를 네트워크 설정 매개 변수 N 로 나눈다.config 와 계산 설정 파라미터:calcconfig,대응 하 는 클래스:NNconfig,CALC_config。
2.2 ClassifierRNN 클래스 를 설명 합 니 다.이 클래스 의 주요 함 수 는 다음 과 같 습 니 다.(init,buildinputs, build_rnns, build_loss, build_optimizer, random_batches,fit, load_model, predict_정확도,predict),코드 는 다음 과 같 습 니 다.
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import time
class NN_config(object):
def __init__(self,num_seqs=1000,num_steps=10,num_units=128,num_classes = 8,\
num_layers = 1,embedding_size=100,vocab_size = 10000,\
use_embeddings=False,embedding_init=None):
self.num_seqs = num_seqs
self.num_steps = num_steps
self.num_units = num_units
self.num_classes = num_classes
self.num_layers = num_layers
self.vocab_size = vocab_size
self.embedding_size = embedding_size
self.use_embeddings = use_embeddings
self.embedding_init = embedding_init
class CALC_config(object):
def __init__(self,batch_size=64,num_epoches = 20,learning_rate = 1.0e-3, \
keep_prob=0.5,show_every_steps = 10,save_every_steps=100):
self.batch_size = batch_size
self.num_epoches = num_epoches
self.learning_rate = learning_rate
self.keep_prob = keep_prob
self.show_every_steps = show_every_steps
self.save_every_steps = save_every_steps
class ClassifierRNN(object):
def __init__(self, nn_config, calc_config):
# assign revalent parameters
self.num_seqs = nn_config.num_seqs
self.num_steps = nn_config.num_steps
self.num_units = nn_config.num_units
self.num_layers = nn_config.num_layers
self.num_classes = nn_config.num_classes
self.embedding_size = nn_config.embedding_size
self.vocab_size = nn_config.vocab_size
self.use_embeddings = nn_config.use_embeddings
self.embedding_init = nn_config.embedding_init
# assign calc ravalant values
self.batch_size = calc_config.batch_size
self.num_epoches = calc_config.num_epoches
self.learning_rate = calc_config.learning_rate
self.train_keep_prob= calc_config.keep_prob
self.show_every_steps = calc_config.show_every_steps
self.save_every_steps = calc_config.save_every_steps
# create networks models
tf.reset_default_graph()
self.build_inputs()
self.build_rnns()
self.build_loss()
self.build_optimizer()
self.saver = tf.train.Saver()
def build_inputs(self):
with tf.name_scope('inputs'):
self.inputs = tf.placeholder(tf.int32, shape=[None,self.num_seqs],\
name='inputs')
self.targets = tf.placeholder(tf.int32, shape=[None, self.num_classes],\
name='classes')
self.keep_prob = tf.placeholder(tf.float32,name='keep_prob')
self.embedding_ph = tf.placeholder(tf.float32, name='embedding_ph')
if self.use_embeddings == False:
self.embeddings = tf.Variable(tf.random_uniform([self.vocab_size,\
self.embedding_size],-0.1,0.1),name='embedding_flase')
self.rnn_inputs = tf.nn.embedding_lookup(self.embeddings,self.inputs)
else:
embeddings = tf.Variable(tf.constant(0.0,shape=[self.vocab_size,self.embedding_size]),\
trainable=False,name='embeddings_true')
self.embeddings = embeddings.assign(self.embedding_ph)
self.rnn_inputs = tf.nn.embedding_lookup(self.embeddings,self.inputs)
print('self.rnn_inputs.shape:',self.rnn_inputs.shape)
def build_rnns(self):
def get_a_cell(num_units,keep_prob):
rnn_cell = tf.contrib.rnn.BasicLSTMCell(num_units=num_units)
drop = tf.contrib.rnn.DropoutWrapper(rnn_cell, output_keep_prob=keep_prob)
return drop
with tf.name_scope('rnns'):
self.cell = tf.contrib.rnn.MultiRNNCell([get_a_cell(self.num_units,self.keep_prob) for _ in range(self.num_layers)])
self.initial_state = self.cell.zero_state(self.batch_size,tf.float32)
self.outputs, self.final_state = tf.nn.dynamic_rnn(self.cell,tf.cast(self.rnn_inputs,tf.float32),\
initial_state = self.initial_state )
print('rnn_outputs',self.outputs.shape)
def build_loss(self):
with tf.name_scope('loss'):
self.logits = tf.contrib.layers.fully_connected(inputs = tf.reduce_mean(self.outputs, axis=1), \
num_outputs = self.num_classes, activation_fn = None)
print('self.logits.shape:',self.logits.shape)
self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.logits,\
labels = self.targets))
print('self.cost.shape',self.cost.shape)
self.predictions = self.logits
self.correct_predictions = tf.equal(tf.argmax(self.predictions, axis=1), tf.argmax(self.targets, axis=1))
self.accuracy = tf.reduce_mean(tf.cast(self.correct_predictions,tf.float32))
print(self.cost.shape)
print(self.correct_predictions.shape)
def build_optimizer(self):
with tf.name_scope('optimizer'):
self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.cost)
def random_batches(self,data,shuffle=True):
data = np.array(data)
data_size = len(data)
num_batches_per_epoch = int(data_size/self.batch_size)
#del data
for epoch in range(self.num_epoches):
if shuffle :
shuffle_index = np.random.permutation(np.arange(data_size))
shuffled_data = data[shuffle_index]
else:
shuffled_data = data
for batch_num in range(num_batches_per_epoch):
start = batch_num * self.batch_size
end = min(start + self.batch_size,data_size)
yield shuffled_data[start:end]
def fit(self,data,restart=False):
if restart :
self.load_model()
else:
self.session = tf.Session()
self.session.run(tf.global_variables_initializer())
with self.session as sess:
step = 0
accuracy_list = []
# model saving
save_path = os.path.abspath(os.path.join(os.path.curdir, 'models'))
if not os.path.exists(save_path):
os.makedirs(save_path)
plt.ion()
#new_state = sess.run(self.initial_state)
new_state = sess.run(self.initial_state)
batches = self.random_batches(data)
for batch in batches:
x,y = zip(*batch)
x = np.array(x)
y = np.array(y)
print(len(x),len(y),step)
step += 1
start = time.time()
if self.use_embeddings == False:
feed = {self.inputs :x,
self.targets:y,
self.keep_prob : self.train_keep_prob,
self.initial_state: new_state}
else:
feed = {self.inputs :x,
self.targets:y,
self.keep_prob : self.train_keep_prob,
self.initial_state: new_state,
self.embedding_ph: self.embedding_init}
batch_loss, new_state, batch_accuracy , _ = sess.run([self.cost,self.final_state,\
self.accuracy, self.optimizer],feed_dict = feed)
end = time.time()
accuracy_list.append(batch_accuracy)
# control the print lines
if step%self.show_every_steps == 0:
print('steps/epoch:{}/{}...'.format(step,self.num_epoches),
'loss:{:.4f}...'.format(batch_loss),
'{:.4f} sec/batch'.format((end - start)),
'batch_Accuracy:{:.4f}...'.format(batch_accuracy)
)
plt.plot(accuracy_list)
plt.pause(0.5)
if step%self.save_every_steps == 0:
self.saver.save(sess,os.path.join(save_path, 'model') ,global_step = step)
self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
def load_model(self, start_path=None):
if start_path == None:
model_path = os.path.abspath(os.path.join(os.path.curdir,"models"))
ckpt = tf.train.get_checkpoint_state(model_path)
path = ckpt.model_checkpoint_path
print("this is the start path of model:",path)
self.session = tf.Session()
self.saver.restore(self.session, path)
print("Restored model parameters is complete!")
else:
self.session = tf.Session()
self.saver.restore(self.session,start_path)
print("Restored model parameters is complete!")
def predict_accuracy(self,data,test=True):
# loading_model
self.load_model()
sess = self.session
iterations = 0
accuracy_list = []
predictions = []
epoch_temp = self.num_epoches
self.num_epoches = 1
batches = self.random_batches(data,shuffle=False)
for batch in batches:
iterations += 1
x_inputs, y_inputs = zip(*batch)
x_inputs = np.array(x_inputs)
y_inputs = np.array(y_inputs)
if self.use_embeddings == False:
feed = {self.inputs: x_inputs,
self.targets: y_inputs,
self.keep_prob: 1.0}
else:
feed = {self.inputs: x_inputs,
self.targets: y_inputs,
self.keep_prob: 1.0,
self.embedding_ph: self.embedding_init}
to_train = [self.cost, self.final_state, self.predictions,self.accuracy]
batch_loss,new_state,batch_pred,batch_accuracy = sess.run(to_train, feed_dict = feed)
accuracy_list.append(np.mean(batch_accuracy))
predictions.append(batch_pred)
print('The trainning step is {0}'.format(iterations),\
'trainning_accuracy: {:.3f}'.format(accuracy_list[-1]))
accuracy = np.mean(accuracy_list)
predictions = [list(pred) for pred in predictions]
predictions = [p for pred in predictions for p in pred]
predictions = np.array(predictions)
self.num_epoches = epoch_temp
if test :
return predictions, accuracy
else:
return accuracy
def predict(self, data):
# load_model
self.load_model()
sess = self.session
iterations = 0
predictionss = []
epoch_temp = self.num_epoches
self.num_epoches = 1
batches = self.random_batches(data)
for batch in batches:
x_inputs = batch
if self.use_embeddings == False:
feed = {self.inputs : x_inputs,
self.keep_prob:1.0}
else:
feed = {self.inputs : x_inputs,
self.keep_prob:1.0,
self.embedding_ph: self.embedding_init}
batch_pred = sess.run([self.predictions],feed_dict=feed)
predictions.append(batch_pred)
predictions = [list(pred) for pred in predictions]
predictions = [p for pred in predictions for p in pred]
predictions = np.array(predictions)
return predictions
3.모델 데이터 가 져 오기 및 처리 및 모델 훈련 을 실시 하여 하나의 처리 파일 에 집중(samplingtrainning.py) 해당 코드 는 다음 과 같 습 니 다.
ps:아래 문 서 는 glove 문 서 를 사용 합 니 다.이 문 서 는 인터넷 검색 을 통 해 해당 하 는 다운 로드 를 할 수 있 습 니 다.다운로드 후 glove 에 대응 하 는 생 성 형식 을 word2vec 에 대응 하 는 형식 으로 바 꿔 야 합 니 다.즉,파일 의 첫 번 째 단계 에 두 개의 정수(사전 의 수량 과 끼 워 넣 은 특징 길이)를 추가 하고 python 라 이브 러 리 자체 의 전환 도 구 를 사용 할 수 있 습 니 다.인터넷 에서 해당 하 는 사용 방법 을 검색 하면 된다.
import numpy as np
import os
import time
import matplotlib.pyplot as plt
import tensorflow as tf
import re
import urllib.request
import zipfile
import lxml.etree
from collections import Counter
from random import shuffle
from gensim.models import KeyedVectors
# Download the dataset if it's not already there
if not os.path.isfile('ted_en-20160408.zip'):
urllib.request.urlretrieve("https://wit3.fbk.eu/get.php?path=XML_releases/xml/ted_en-20160408.zip&filename=ted_en-20160408.zip", filename="ted_en-20160408.zip")
# extract both the texts and the labels from the xml file
with zipfile.ZipFile('ted_en-20160408.zip', 'r') as z:
doc = lxml.etree.parse(z.open('ted_en-20160408.xml', 'r'))
texts = doc.xpath('//content/text()')
labels = doc.xpath('//head/keywords/text()')
del doc
print("There are {} input texts, each a long string with text and punctuation.".format(len(texts)))
print("")
print(texts[0][:100])
# method remove unused words and labels
inputs_text = [ re.sub(r'\([^)]*\)',' ', text) for text in texts]
inputs_text = [re.sub(r':', ' ', text) for text in inputs_text]
#inputs_text = [text.split() for text in inputs_text]
print(inputs_text[0][0:100])
inputs_text = [ text.lower() for text in texts]
inputs_text = [ re.sub(r'([^a-z0-9\s])', r' <\1_token> ',text) for text in inputs_text]
#input_texts = [re.sub(r'([^a-z0-9\s])', r' <\1_token> ', input_text) for input_text in input_texts]
inputs_text = [text.split() for text in inputs_text]
print(inputs_text[0][0:100])
# label procession
label_lookup = ['ooo','Too','oEo','ooD','TEo','ToD','oED','TED']
new_label = []
for i in range(len(labels)):
labels_pre = ['o','o','o']
label = labels[i].split(', ')
#print(label,i)
if 'technology' in label:
labels_pre[0] = 'T'
if 'entertainment' in label:
labels_pre[1] = 'E'
if 'design' in label:
labels_pre[2] = 'D'
labels_temp = ''.join(labels_pre)
label_index = label_lookup.index(labels_temp)
new_label.append(label_index)
print('the length of labels:{0}'.format(len(new_label)))
print(new_label[0:50])
labels_index = np.zeros((len(new_label),8))
#for i in range(labels_index.shape[0]):
# labels_index[i,new_label[i]] = 1
labels_index[range(len(new_label)),new_label] = 1.0
print(labels_index[0:10])
# feature selections
unions = list(zip(inputs_text,labels_index))
unions = [union for union in unions if len(union[0]) >300]
print(len(unions))
inputs_text, labels_index = zip(*unions)
inputs_text = list(inputs_text)
labels = list(labels_index)
print(inputs_text[0][0:50])
print(labels_index[0:10])
# feature filttering
all_context = [word for text in inputs_text for word in text]
print('the present datas word is :{0}'.format(len(all_context)))
words_count = Counter(all_context)
most_words = [word for word, count in words_count.most_common(50)]
once_words = [word for word, count in words_count.most_common() if count == 1]
print('there {0} words only once to be removed'.format(len(once_words)))
print(most_words)
#print(once_words)
remove_words = set(most_words + once_words)
#print(remove_words)
inputs_new = [[word for word in text if word not in remove_words] for text in inputs_text]
new_all_counts =[word for text in inputs_new for word in text]
print('there new all context length is:{0}'.format(len(new_all_counts)))
# word2index and index2word processings
words_voca = set([word for text in inputs_new for word in text])
word2index = {}
index2word = {}
for i, word in enumerate(words_voca):
word2index[word] = i
index2word[i] = word
inputs_index = []
for text in inputs_new:
inputs_index.append([word2index[word] for word in text])
print(len(inputs_index))
print(inputs_index[0][0:100])
model_glove = KeyedVectors.load_word2vec_format('glove.6B.300d.txt', binary=False)
n_features = 300
embeddings = np.random.uniform(-0.1,0.1,(len(word2index),n_features))
inwords = 0
for word in words_voca:
if word in model_glove.vocab:
inwords += 1
embeddings[word2index[word]] = model_glove[word]
print('there {} words in model_glove'.format(inwords))
print('The voca_word in presents text is:{0}'.format(len(words_voca)))
print('the precentage of words in glove is:{0}'.format(np.float(inwords)/len(words_voca)))
# truncate the sequence length
max_length = 1000
inputs_concat = []
for text in inputs_index:
if len(text)>max_length:
inputs_concat.append(text[0:max_length])
else:
inputs_concat.append(text + [0]*(max_length-len(text)))
print(len(inputs_concat))
inputs_index = inputs_concat
print(len(inputs_index))
# sampling the train data use category sampling
num_class = 8
label_unions = list(zip(inputs_index,labels_index))
print(len(label_unions))
trains = []
devs = []
tests = []
for c in range(num_class):
type_sample = [union for union in label_unions if np.argmax(union[1]) == c]
print('the length of this type length',len(type_sample),c)
shuffle(type_sample)
num_all = len(type_sample)
num_train = int(num_all*0.8)
num_dev = int(num_all*0.9)
trains.extend(type_sample[0:num_train])
devs.extend(type_sample[num_train:num_dev])
tests.extend(type_sample[num_dev:num_all])
shuffle(trains)
shuffle(devs)
shuffle(tests)
print('the length of trains is:{0}'.format(len(trains)))
print('the length of devs is:{0}'.format(len(devs)))
print('the length of tests is:{0}'.format(len(tests)))
#--------------------------------------------------------------------
#------------------------ model processing --------------------------
#--------------------------------------------------------------------
from ClassifierRNN import NN_config,CALC_config,ClassifierRNN
# parameters used by rnns
num_layers = 1
num_units = 60
num_seqs = 1000
step_length = 10
num_steps = int(num_seqs/step_length)
embedding_size = 300
num_classes = 8
n_words = len(words_voca)
# parameters used by trainning models
batch_size = 64
num_epoch = 100
learning_rate = 0.0075
show_every_epoch = 10
nn_config = NN_config(num_seqs =num_seqs,\
num_steps = num_steps,\
num_units = num_units,\
num_classes = num_classes,\
num_layers = num_layers,\
vocab_size = n_words,\
embedding_size = embedding_size,\
use_embeddings = False,\
embedding_init = embeddings)
calc_config = CALC_config(batch_size = batch_size,\
num_epoches = num_epoch,\
learning_rate = learning_rate,\
show_every_steps = 10,\
save_every_steps = 100)
print("this is checking of nn_config:\\
",
"out of num_seqs:{}
".format(nn_config.num_seqs),
"out of num_steps:{}
".format(nn_config.num_steps),
"out of num_units:{}
".format(nn_config.num_units),
"out of num_classes:{}
".format(nn_config.num_classes),
"out of num_layers:{}
".format(nn_config.num_layers),
"out of vocab_size:{}
".format(nn_config.vocab_size),
"out of embedding_size:{}
".format(nn_config.embedding_size),
"out of use_embeddings:{}
".format(nn_config.use_embeddings))
print("this is checing of calc_config: \\
",
"out of batch_size {}
".format(calc_config.batch_size),
"out of num_epoches {}
".format(calc_config.num_epoches),
"out of learning_rate {}
".format(calc_config.learning_rate),
"out of keep_prob {}
".format(calc_config.keep_prob),
"out of show_every_steps {}
".format(calc_config.show_every_steps),
"out of save_every_steps {}
".format(calc_config.save_every_steps))
rnn_model = ClassifierRNN(nn_config,calc_config)
rnn_model.fit(trains,restart=False)
accuracy = rnn_model.predict_accuracy(devs,test=False)
print("Final accuracy of devs is {}".format(accuracy))
test_accuracy = rnn_model.predict_accuracy(tests,test=False)
print("The final accuracy of tests is :{}".format(test_accuracy))
4.모델 평 가 는 이번 사례 에서 모델 데이터 가 비교적 적 고 모두 2000 여 개의 견본 이 있 으 며 상대 적 으로 적 기 때문에 적합 한 상태 가 나타 날 수 밖 에 없다.rnn 은 trains 샘플 을 훈련 할 때 정확 율 은 1.0 에 가 깝 지만 devs 와 tests 집합 검증 을 할 때 정확 율 은 6.0 정도 이 고 l2 를 적당 하 게 증가 할 수 있 으 나 본 사례 의 고려 범위 에 있 지 않다.이 모델 을 IMDB 산 례 계산 에 사 용 했 을 때 2 천 500 개 견본 에 해당 할 때의 정확 도 는 89.0%정 도 였 다.이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
로마 숫자를 정수로 또는 그 반대로 변환그 중 하나는 로마 숫자를 정수로 변환하는 함수를 만드는 것이었고 두 번째는 그 반대를 수행하는 함수를 만드는 것이었습니다. 문자만 포함합니다'I', 'V', 'X', 'L', 'C', 'D', 'M' ; 문자열이 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.