python使用RNN實現文本分類
本文實例為大家分享了使用RNN進行文本分類的python代碼實現,供大家參考,具體內容如下
1、本博客項目由來是oxford 的nlp 深度學習課程第三周作業,作業要求使用LSTM進行文本分類。和上一篇CNN文本分類類似,本次代碼風格也是仿照sklearn風格,三步走形式(模型實體化,模型訓練和模型預測),但因為訓練時間較久,不知道什麼時候訓練比較理想,因此在此基礎上加入了繼續訓練的功能。
2、構造文本分類的rnn類(保存文件為ClassifierRNN.py)
2.1 相應配置參數因為較為繁瑣,不利于閱讀,因此仿照tensorflow源碼形式,將代碼分成網絡配置參數 nn_config 和計算配置參數 calc_config,也相應聲明了其對應的類:NN_config,CALC_config。
2.2 聲明 ClassifierRNN類,該類的主要函數有:(__init__, build_inputs, build_rnns, build_loss, build_optimizer, random_batches, fit, load_model, predict_accuracy, predict),代碼如下:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import time
class NN_config(object):
    """Network-structure settings read by ``ClassifierRNN``.

    Every constructor argument is stored unchanged on the instance under the
    same name.

    Args:
        num_seqs: second dimension of the model's integer input placeholder
            (number of token ids per sample).
        num_steps: stored for the model; not used when the graph is built.
        num_units: number of units in each LSTM cell.
        num_classes: number of output classes (width of the one-hot targets).
        num_layers: number of stacked LSTM cells.
        embedding_size: length of each word-embedding vector.
        vocab_size: number of rows in the embedding matrix.
        use_embeddings: when true, the model feeds ``embedding_init`` into a
            non-trainable embedding variable instead of learning one.
        embedding_init: embedding matrix fed to the model when
            ``use_embeddings`` is true.
    """

    def __init__(self, num_seqs=1000, num_steps=10, num_units=128, num_classes=8,
                 num_layers=1, embedding_size=100, vocab_size=10000,
                 use_embeddings=False, embedding_init=None):
        self.num_seqs = num_seqs
        self.num_steps = num_steps
        self.num_units = num_units
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.use_embeddings = use_embeddings
        self.embedding_init = embedding_init
class CALC_config(object):
    """Training-loop settings read by ``ClassifierRNN``.

    Every constructor argument is stored unchanged on the instance under the
    same name.

    Args:
        batch_size: number of samples per batch.
        num_epoches: number of passes over the training data.
        learning_rate: learning rate handed to the optimizer.
        keep_prob: dropout keep probability fed during training.
        show_every_steps: print progress every this many training steps.
        save_every_steps: write a checkpoint every this many training steps.
    """

    def __init__(self, batch_size=64, num_epoches=20, learning_rate=1.0e-3,
                 keep_prob=0.5, show_every_steps=10, save_every_steps=100):
        self.batch_size = batch_size
        self.num_epoches = num_epoches
        self.learning_rate = learning_rate
        self.keep_prob = keep_prob
        self.show_every_steps = show_every_steps
        self.save_every_steps = save_every_steps
class ClassifierRNN(object):
    """LSTM text classifier built with the TensorFlow 1.x graph API.

    Usage follows three steps: construct the object (which builds the graph),
    call ``fit`` to train and write checkpoints, then call ``predict_accuracy``
    or ``predict``; both of those restore the latest checkpoint first.
    """

    def __init__(self, nn_config, calc_config):
        """Copy the settings and build the whole graph.

        Args:
            nn_config: object exposing ``num_seqs``, ``num_steps``,
                ``num_units``, ``num_layers``, ``num_classes``,
                ``embedding_size``, ``vocab_size``, ``use_embeddings`` and
                ``embedding_init``.
            calc_config: object exposing ``batch_size``, ``num_epoches``,
                ``learning_rate``, ``keep_prob``, ``show_every_steps`` and
                ``save_every_steps``.
        """
        # network-structure parameters
        self.num_seqs = nn_config.num_seqs
        self.num_steps = nn_config.num_steps
        self.num_units = nn_config.num_units
        self.num_layers = nn_config.num_layers
        self.num_classes = nn_config.num_classes
        self.embedding_size = nn_config.embedding_size
        self.vocab_size = nn_config.vocab_size
        self.use_embeddings = nn_config.use_embeddings
        self.embedding_init = nn_config.embedding_init
        # training parameters
        self.batch_size = calc_config.batch_size
        self.num_epoches = calc_config.num_epoches
        self.learning_rate = calc_config.learning_rate
        self.train_keep_prob = calc_config.keep_prob
        self.show_every_steps = calc_config.show_every_steps
        self.save_every_steps = calc_config.save_every_steps
        # build the graph from scratch
        tf.reset_default_graph()
        self.build_inputs()
        self.build_rnns()
        self.build_loss()
        self.build_optimizer()
        self.saver = tf.train.Saver()

    def build_inputs(self):
        """Create the placeholders and the embedding lookup."""
        with tf.name_scope('inputs'):
            self.inputs = tf.placeholder(tf.int32, shape=[None, self.num_seqs],
                                         name='inputs')
            self.targets = tf.placeholder(tf.int32, shape=[None, self.num_classes],
                                          name='classes')
            self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')
            self.embedding_ph = tf.placeholder(tf.float32, name='embedding_ph')
            if not self.use_embeddings:
                # Embeddings are learned from a small random initialisation.
                self.embeddings = tf.Variable(
                    tf.random_uniform([self.vocab_size, self.embedding_size], -0.1, 0.1),
                    name='embedding_flase')
            else:
                # Fixed embeddings: the variable is overwritten with the fed
                # matrix each time the lookup is evaluated.
                embeddings = tf.Variable(
                    tf.constant(0.0, shape=[self.vocab_size, self.embedding_size]),
                    trainable=False, name='embeddings_true')
                self.embeddings = embeddings.assign(self.embedding_ph)
            self.rnn_inputs = tf.nn.embedding_lookup(self.embeddings, self.inputs)
            print('self.rnn_inputs.shape:', self.rnn_inputs.shape)

    def build_rnns(self):
        """Stack ``num_layers`` dropout-wrapped LSTM cells and unroll them."""
        def get_a_cell(num_units, keep_prob):
            rnn_cell = tf.contrib.rnn.BasicLSTMCell(num_units=num_units)
            return tf.contrib.rnn.DropoutWrapper(rnn_cell, output_keep_prob=keep_prob)

        with tf.name_scope('rnns'):
            self.cell = tf.contrib.rnn.MultiRNNCell(
                [get_a_cell(self.num_units, self.keep_prob) for _ in range(self.num_layers)])
            # The zero state is created for exactly ``batch_size`` rows, so
            # every batch run through the graph must have that many rows.
            self.initial_state = self.cell.zero_state(self.batch_size, tf.float32)
            self.outputs, self.final_state = tf.nn.dynamic_rnn(
                self.cell, tf.cast(self.rnn_inputs, tf.float32),
                initial_state=self.initial_state)
            print('rnn_outputs', self.outputs.shape)

    def build_loss(self):
        """Add the output layer, the cross-entropy cost and the accuracy."""
        with tf.name_scope('loss'):
            # Average the LSTM outputs over axis 1 before the dense layer.
            self.logits = tf.contrib.layers.fully_connected(
                inputs=tf.reduce_mean(self.outputs, axis=1),
                num_outputs=self.num_classes, activation_fn=None)
            print('self.logits.shape:', self.logits.shape)
            self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
                logits=self.logits, labels=self.targets))
            print('self.cost.shape', self.cost.shape)
            # Predictions are the raw logits (no softmax is applied).
            self.predictions = self.logits
            self.correct_predictions = tf.equal(tf.argmax(self.predictions, axis=1),
                                                tf.argmax(self.targets, axis=1))
            self.accuracy = tf.reduce_mean(tf.cast(self.correct_predictions, tf.float32))
            print(self.cost.shape)
            print(self.correct_predictions.shape)

    def build_optimizer(self):
        """Add the Adam training op that minimises ``self.cost``."""
        with tf.name_scope('optimizer'):
            self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.cost)

    def random_batches(self, data, shuffle=True):
        """Yield full batches of ``data`` for ``self.num_epoches`` epochs.

        Args:
            data: sequence of samples; it is converted with ``np.array``.
            shuffle: when true, rows are re-permuted at the start of each epoch.

        Yields:
            Consecutive slices of ``self.batch_size`` rows. The trailing
            ``len(data) % batch_size`` rows of each epoch are not yielded.
        """
        data = np.array(data)
        data_size = len(data)
        num_batches_per_epoch = int(data_size / self.batch_size)
        for _ in range(self.num_epoches):
            if shuffle:
                shuffled_data = data[np.random.permutation(np.arange(data_size))]
            else:
                shuffled_data = data
            for batch_num in range(num_batches_per_epoch):
                start = batch_num * self.batch_size
                yield shuffled_data[start:start + self.batch_size]

    def _make_feed(self, inputs, targets=None, keep_prob=1.0, state=None):
        """Build a feed dict; optional entries are added only when given."""
        feed = {self.inputs: inputs, self.keep_prob: keep_prob}
        if targets is not None:
            feed[self.targets] = targets
        if state is not None:
            feed[self.initial_state] = state
        if self.use_embeddings:
            feed[self.embedding_ph] = self.embedding_init
        return feed

    def fit(self, data, restart=False):
        """Train on ``data`` and write checkpoints under ``./models``.

        Args:
            data: sequence of ``(token_ids, one_hot_label)`` pairs.
            restart: when true, continue from the latest checkpoint instead of
                initialising the variables.

        The session is used as a context manager, so it is closed when
        training finishes.
        """
        if restart:
            self.load_model()
        else:
            self.session = tf.Session()
            self.session.run(tf.global_variables_initializer())
        with self.session as sess:
            step = 0
            accuracy_list = []
            # model saving
            save_path = os.path.abspath(os.path.join(os.path.curdir, 'models'))
            os.makedirs(save_path, exist_ok=True)
            checkpoint_prefix = os.path.join(save_path, 'model')
            plt.ion()
            new_state = sess.run(self.initial_state)
            for batch in self.random_batches(data):
                x, y = zip(*batch)
                x = np.array(x)
                y = np.array(y)
                print(len(x), len(y), step)
                step += 1
                start = time.time()
                # NOTE: the final LSTM state of one batch is fed as the initial
                # state of the next one (behaviour kept from the original).
                feed = self._make_feed(x, y, keep_prob=self.train_keep_prob,
                                       state=new_state)
                batch_loss, new_state, batch_accuracy, _ = sess.run(
                    [self.cost, self.final_state, self.accuracy, self.optimizer],
                    feed_dict=feed)
                end = time.time()
                accuracy_list.append(batch_accuracy)
                # control the print lines
                if step % self.show_every_steps == 0:
                    print('steps/epoch:{}/{}...'.format(step, self.num_epoches),
                          'loss:{:.4f}...'.format(batch_loss),
                          '{:.4f} sec/batch'.format((end - start)),
                          'batch_Accuracy:{:.4f}...'.format(batch_accuracy)
                          )
                    plt.plot(accuracy_list)
                    plt.pause(0.5)
                if step % self.save_every_steps == 0:
                    self.saver.save(sess, checkpoint_prefix, global_step=step)
            # always keep a checkpoint of the final state
            self.saver.save(sess, checkpoint_prefix, global_step=step)

    def load_model(self, start_path=None):
        """Open a new session and restore the variables from a checkpoint.

        Args:
            start_path: checkpoint to restore. When ``None``, the latest
                checkpoint recorded under ``./models`` is used.

        Raises:
            FileNotFoundError: if ``start_path`` is ``None`` and no checkpoint
                state is found under ``./models``.
        """
        if start_path is None:
            model_path = os.path.abspath(os.path.join(os.path.curdir, "models"))
            ckpt = tf.train.get_checkpoint_state(model_path)
            if ckpt is None or not ckpt.model_checkpoint_path:
                raise FileNotFoundError(
                    "no checkpoint found in {}".format(model_path))
            start_path = ckpt.model_checkpoint_path
            print("this is the start path of model:", start_path)
        self.session = tf.Session()
        self.saver.restore(self.session, start_path)
        print("Restored model parameters is complete!")

    def predict_accuracy(self, data, test=True):
        """Restore the latest checkpoint and measure accuracy on ``data``.

        Args:
            data: sequence of ``(token_ids, one_hot_label)`` pairs.
            test: when true, also return the per-sample predictions.

        Returns:
            ``(predictions, accuracy)`` when ``test`` is true, otherwise
            ``accuracy`` alone; ``accuracy`` is the mean of the per-batch
            accuracies. Only full batches are evaluated, so the trailing
            ``len(data) % batch_size`` samples are skipped.
        """
        self.load_model()
        sess = self.session
        accuracy_list = []
        predictions = []
        # Temporarily iterate one epoch; restore the setting even on failure.
        epoch_temp = self.num_epoches
        self.num_epoches = 1
        try:
            batches = self.random_batches(data, shuffle=False)
            for iterations, batch in enumerate(batches, start=1):
                x_inputs, y_inputs = zip(*batch)
                feed = self._make_feed(np.array(x_inputs), np.array(y_inputs))
                batch_pred, batch_accuracy = sess.run(
                    [self.predictions, self.accuracy], feed_dict=feed)
                accuracy_list.append(np.mean(batch_accuracy))
                predictions.extend(batch_pred)
                print('The trainning step is {0}'.format(iterations),
                      'trainning_accuracy: {:.3f}'.format(accuracy_list[-1]))
        finally:
            self.num_epoches = epoch_temp
        accuracy = np.mean(accuracy_list)
        predictions = np.array(predictions)
        if test:
            return predictions, accuracy
        return accuracy

    def predict(self, data):
        """Restore the latest checkpoint and return predictions for ``data``.

        Args:
            data: 2-D array-like of token ids, one row per sample.

        Returns:
            ``np.ndarray`` with one row of logits per input row, in input
            order.
        """
        self.load_model()
        sess = self.session
        data = np.asarray(data)
        predictions = []
        for start in range(0, len(data), self.batch_size):
            chunk = data[start:start + self.batch_size]
            n_valid = len(chunk)
            if n_valid < self.batch_size:
                # The graph only accepts full batches: pad the last chunk by
                # repeating its final row, then discard the padded outputs.
                filler = np.repeat(chunk[-1:], self.batch_size - n_valid, axis=0)
                chunk = np.concatenate([chunk, filler], axis=0)
            batch_pred = sess.run(self.predictions, feed_dict=self._make_feed(chunk))
            predictions.extend(batch_pred[:n_valid])
        return np.array(predictions)
3、進行模型數據的導入以及處理和模型訓練,集中在一個處理文件中(sampling_trainning.py)
相應(yīng)代碼如下:
ps:在下面代碼中會用到glove的文檔,這個可網上搜索進行相應的下載,下載后需要將glove對應的生成格式轉化成word2vec對應的格式,就是在文件頭部加入一行兩個整數(字典的數目和嵌入的特征長度),也可用python庫自帶的轉化工具,網上進行相應使用方法的搜索便可。
import numpy as np
import os
import time
import matplotlib.pyplot as plt
import tensorflow as tf
import re
import urllib.request
import zipfile
import lxml.etree
from collections import Counter
from random import shuffle
from gensim.models import KeyedVectors
# Download the dataset if it's not already there
if not os.path.isfile('ted_en-20160408.zip'):
    urllib.request.urlretrieve("https://wit3.fbk.eu/get.php?path=XML_releases/xml/ted_en-20160408.zip&filename=ted_en-20160408.zip", filename="ted_en-20160408.zip")

# extract both the texts and the labels from the xml file
with zipfile.ZipFile('ted_en-20160408.zip', 'r') as z:
    doc = lxml.etree.parse(z.open('ted_en-20160408.xml', 'r'))
texts = doc.xpath('//content/text()')
labels = doc.xpath('//head/keywords/text()')
del doc
print("There are {} input texts, each a long string with text and punctuation.".format(len(texts)))
print("")
print(texts[0][:100])

# text cleaning: drop parenthesised remarks and colons
inputs_text = [re.sub(r'\([^)]*\)', ' ', text) for text in texts]
inputs_text = [re.sub(r':', ' ', text) for text in inputs_text]
print(inputs_text[0][0:100])
# Lower-case the *cleaned* texts (the original restarted from the raw
# ``texts`` here, which threw away the two substitutions above).
inputs_text = [text.lower() for text in inputs_text]
# every remaining non-alphanumeric character becomes its own "<c_token>" word
inputs_text = [re.sub(r'([^a-z0-9\s])', r' <\1_token> ', text) for text in inputs_text]
inputs_text = [text.split() for text in inputs_text]
print(inputs_text[0][0:100])

# label processing: encode the technology/entertainment/design keywords as one
# of eight classes
label_lookup = ['ooo', 'Too', 'oEo', 'ooD', 'TEo', 'ToD', 'oED', 'TED']
new_label = []
for keywords in labels:
    label = keywords.split(', ')
    labels_pre = ['o', 'o', 'o']
    if 'technology' in label:
        labels_pre[0] = 'T'
    if 'entertainment' in label:
        labels_pre[1] = 'E'
    if 'design' in label:
        labels_pre[2] = 'D'
    new_label.append(label_lookup.index(''.join(labels_pre)))
print('the length of labels:{0}'.format(len(new_label)))
print(new_label[0:50])
# one-hot encode the class indices
labels_index = np.zeros((len(new_label), 8))
labels_index[range(len(new_label)), new_label] = 1.0
print(labels_index[0:10])

# feature selection: keep only talks with more than 300 tokens
unions = list(zip(inputs_text, labels_index))
unions = [union for union in unions if len(union[0]) > 300]
print(len(unions))
inputs_text, labels_index = zip(*unions)
inputs_text = list(inputs_text)
labels = list(labels_index)
print(inputs_text[0][0:50])
print(labels_index[0:10])

# feature filtering: drop the 50 most frequent words and words seen only once
all_context = [word for text in inputs_text for word in text]
print('the present datas word is :{0}'.format(len(all_context)))
words_count = Counter(all_context)
most_words = [word for word, count in words_count.most_common(50)]
once_words = [word for word, count in words_count.most_common() if count == 1]
print('there {0} words only once to be removed'.format(len(once_words)))
print(most_words)
remove_words = set(most_words + once_words)
inputs_new = [[word for word in text if word not in remove_words] for text in inputs_text]
new_all_counts = [word for text in inputs_new for word in text]
print('there new all context length is:{0}'.format(len(new_all_counts)))

# word2index and index2word processing
# Index 0 is reserved for a padding token so that padded positions no longer
# share an id with a real word. The cleaning regex above turns every '<' into
# '<<_token>', so '<pad>' cannot occur in the corpus. The vocabulary is sorted
# so the word -> index mapping is identical on every run; set iteration order
# is not, which would invalidate checkpoints when training is resumed.
pad_token = '<pad>'
words_voca = [pad_token] + sorted({word for text in inputs_new for word in text})
word2index = {word: i for i, word in enumerate(words_voca)}
index2word = {i: word for word, i in word2index.items()}
inputs_index = [[word2index[word] for word in text] for text in inputs_new]
print(len(inputs_index))
print(inputs_index[0][0:100])

# pre-trained embeddings: random initialisation, overwritten for GloVe words
model_glove = KeyedVectors.load_word2vec_format('glove.6B.300d.txt', binary=False)
n_features = 300
embeddings = np.random.uniform(-0.1, 0.1, (len(word2index), n_features))
embeddings[word2index[pad_token]] = 0.0  # padding carries no signal
inwords = 0
for word in words_voca:
    # ``in`` works on KeyedVectors with and without the ``.vocab`` attribute
    if word in model_glove:
        inwords += 1
        embeddings[word2index[word]] = model_glove[word]
print('there {} words in model_glove'.format(inwords))
print('The voca_word in presents text is:{0}'.format(len(words_voca)))
print('the precentage of words in glove is:{0}'.format(float(inwords) / len(words_voca)))

# truncate or pad every sequence to exactly max_length ids
max_length = 1000
pad_index = word2index[pad_token]
inputs_concat = [text[0:max_length] + [pad_index] * (max_length - len(text))
                 for text in inputs_index]
print(len(inputs_concat))
inputs_index = inputs_concat
print(len(inputs_index))

# sampling the train data use category sampling (80/10/10 split per class)
num_class = 8
label_unions = list(zip(inputs_index, labels_index))
print(len(label_unions))
trains = []
devs = []
tests = []
for c in range(num_class):
    type_sample = [union for union in label_unions if np.argmax(union[1]) == c]
    print('the length of this type length', len(type_sample), c)
    shuffle(type_sample)
    num_all = len(type_sample)
    num_train = int(num_all * 0.8)
    num_dev = int(num_all * 0.9)
    trains.extend(type_sample[0:num_train])
    devs.extend(type_sample[num_train:num_dev])
    tests.extend(type_sample[num_dev:num_all])
shuffle(trains)
shuffle(devs)
shuffle(tests)
print('the length of trains is:{0}'.format(len(trains)))
print('the length of devs is:{0}'.format(len(devs)))
print('the length of tests is:{0}'.format(len(tests)))
#--------------------------------------------------------------------
#------------------------ model processing --------------------------
#--------------------------------------------------------------------
from ClassifierRNN import NN_config,CALC_config,ClassifierRNN
# ---- network hyper-parameters ----
num_layers = 1
num_units = 60
num_seqs = 1000
step_length = 10
num_steps = int(num_seqs / step_length)
embedding_size = 300
num_classes = 8
n_words = len(words_voca)

# ---- training hyper-parameters ----
batch_size = 64
num_epoch = 100
learning_rate = 0.0075
show_every_epoch = 10

# Build both configuration objects from keyword dictionaries.
nn_settings = {
    'num_seqs': num_seqs,
    'num_steps': num_steps,
    'num_units': num_units,
    'num_classes': num_classes,
    'num_layers': num_layers,
    'vocab_size': n_words,
    'embedding_size': embedding_size,
    'use_embeddings': False,
    'embedding_init': embeddings,
}
nn_config = NN_config(**nn_settings)

calc_settings = {
    'batch_size': batch_size,
    'num_epoches': num_epoch,
    'learning_rate': learning_rate,
    'show_every_steps': 10,
    'save_every_steps': 100,
}
calc_config = CALC_config(**calc_settings)

# Echo the effective configuration; each entry pairs a message template with
# the value it reports.
nn_report = (
    ("out of num_seqs:{}\n", nn_config.num_seqs),
    ("out of num_steps:{}\n", nn_config.num_steps),
    ("out of num_units:{}\n", nn_config.num_units),
    ("out of num_classes:{}\n", nn_config.num_classes),
    ("out of num_layers:{}\n", nn_config.num_layers),
    ("out of vocab_size:{}\n", nn_config.vocab_size),
    ("out of embedding_size:{}\n", nn_config.embedding_size),
    ("out of use_embeddings:{}\n", nn_config.use_embeddings),
)
print("this is checking of nn_config:\\\n",
      *[template.format(value) for template, value in nn_report])

calc_report = (
    ("out of batch_size {} \n", calc_config.batch_size),
    ("out of num_epoches {} \n", calc_config.num_epoches),
    ("out of learning_rate {} \n", calc_config.learning_rate),
    ("out of keep_prob {} \n", calc_config.keep_prob),
    ("out of show_every_steps {} \n", calc_config.show_every_steps),
    ("out of save_every_steps {} \n", calc_config.save_every_steps),
)
print("this is checing of calc_config: \\\n",
      *[template.format(value) for template, value in calc_report])

# Train from scratch, then score the held-out splits.
rnn_model = ClassifierRNN(nn_config, calc_config)
rnn_model.fit(trains, restart=False)

accuracy = rnn_model.predict_accuracy(devs, test=False)
print("Final accuracy of devs is {}".format(accuracy))

test_accuracy = rnn_model.predict_accuracy(tests, test=False)
print("The final accuracy of tests is :{}".format(test_accuracy))
4、模型評估:因為在本次算例中模型數據較少,總共只有2000多個樣本,因此難免出現過擬合的狀態。rnn在訓練trains樣本時其準確率接近1.0,但在進行devs和tests集合驗證的時候,發現準確率為0.6左右。可適當的增加l2正則化,但不在本算例考慮范圍內。將本模型用于IMDB算例計算的時候,在25000個樣本的情況下準確率為89.0%左右。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- 神經網絡(BP)算法Python實現及應用
- Python實現的三層BP神經網絡算法示例
- Python編程實現的簡單神經網絡算法示例
- python實現BP神經網絡回歸預測模型
- Python與人工神經網絡:使用神經網絡識別手寫圖像介紹
- TensorFlow平臺下Python實現神經網絡
- Python實現的NN神經網絡算法完整示例
- python編寫樸素貝葉斯用于文本分類
- Python使用循環神經網絡解決文本分類問題的方法詳解
相關文章
使用python編寫腳本獲取手機當前應用apk的信息
使用aapt工具獲取apk的信息,保存至腳本所在目錄下的PackageInfo.txt文件中,需要的朋友可以參考下 2014-07-07
Python實現實時增量數據加載工具的解決方案
這篇文章主要分享結合單例模式實際應用案例:實現實時增量數據加載工具的解決方案。最關鍵的是實現一個可進行添加、修改、刪除等操作的增量ID記錄表。需要的可以參考一下 2022-02-02
Python利用pandas對數據進行特定排序
本文主要介紹了Python利用pandas對數據進行特定排序,主要使用 pandas.DataFrame.sort_values 方法,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧 2024-03-03
使用NumPy讀取MNIST數據的實現代碼示例
這篇文章主要介紹了使用NumPy讀取MNIST數據的實現代碼示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2019-11-11

