tensorflow使用tf.data.Dataset 處理大型數(shù)據集問題
最近深度學習用到的數(shù)據集比較大,如果一次性將數(shù)據集讀入內存,那服務器是頂不住的,所以需要分批進行讀取,這里就用到了tf.data.Dataset構建數(shù)據集:
概括一下,tf.data.Dataset主要有幾個部分最重要:
- 構建生成器函數(shù)
- 使用tf.data.Dataset的from_generator函數(shù),通過指定數(shù)據類型,數(shù)據的shape等參數(shù),構建一個Dataset
- 指定batch_size
- 使用make_one_shot_iterator()函數(shù),構建一個iterator
- 使用上面構建的迭代器開始get_next() 。(必須要有這個get_next(),迭代器才會工作)
一.構建生成器
生成器的要點是要在while True中加入yield,yield的功能有點類似return,有yield才能起到迭代的作用。
我的數(shù)據是一個[6047, 6000, 1]的文本數(shù)據,我每次迭代返回的shape為[1,6000,1],要注意的是返回的shape要和構建Dataset時的shape一致,下面會說到。
代碼如下:
def gen():?? ??? ??? ??? ? ?? ??? ?train=pd.read_csv('/home/chenqiren/PycharmProjects/code/test/formal/small_sample/train2.csv', header=None) ? ? ? ? train.fillna(0, inplace = True) ? ? ? ? label_encoder = LabelEncoder().fit(train[6000]) ? ? ? ? label = label_encoder.transform(train[6000]) ? ? ? ? ? train = train.drop([6000], axis=1)? ? ? ? ? scaler = StandardScaler().fit(train.values) ? #train.values中的值是csv文件中的那些值, ? ? 這步標準化可以保留 ? ? ? ? scaled_train = scaler.transform(train.values) ? ? ? ? #print(scaled_train) ? ? ? ? #拆分訓練集和測試集-------------- ? ? ? ? sss=StratifiedShuffleSplit(test_size=0.1, random_state=23) ? ? ? ? for train_index, valid_index in sss.split(scaled_train, label): ? #需要的是數(shù)組,train.values得到的是數(shù)組 ? ? ? ? ? ? X_train, X_valid=scaled_train[train_index], scaled_train[valid_index] ?#https://www.cnblogs.com/Allen-rg/p/9453949.html ? ? ? ? ? ? y_train, y_valid=label[train_index], label[valid_index] ? ? ? ? X_train_r=np.zeros((len(X_train), 6000, 1)) ? #先構建一個框架出來,下面再賦值 ? ? ? ? X_train_r[:,: ,0]=X_train[:,0:6000] ? ?? ? ?? ? ? ? ? X_valid_r=np.zeros((len(X_valid), 6000, 1)) ? ? ? ? X_valid_r[:,: ,0]=X_valid[:,0:6000] ? ?? ? ? ? ? y_train=np_utils.to_categorical(y_train, 3) ? ? ? ? y_valid=np_utils.to_categorical(y_valid, 3) ? ? ? ?? ? ? ? ? leng=len(X_train_r) ? ? ? ? index=0 ? ? ? ? while True: ? ? ? ? ? ? x_train_batch=X_train_r[index, :, 0:1] ? ? ? ? ? ? y_train_batch=y_train[index, :] ? ? ? ? ? ? yield (x_train_batch, y_train_batch) ? ? ? ? ? ? index=index+1 ? ? ? ? ? ? if index>leng: ? ? ? ? ? ? ? ? break
代碼中while True上面的部分是標準化數(shù)據的代碼,可以不用看,只需要看 while True中的代碼即可。
x_train_batch, y_train_batch都只是一行的數(shù)據,這里是一行一行數(shù)據迭代。
二.使用tf.data.Dataset包裝生成器
data=tf.data.Dataset.from_generator(gen_1, (tf.float32, tf.float32), (tf.TensorShape([6000,1]), tf.TensorShape([3]))) data=data.batch(128) iterator=data.make_one_shot_iterator()
這里的tf.TensorShape([6000,1]) 和 tf.TensorShape([3])中的shape要和上面生成器yield返回的數(shù)據的shape一致。
data=data.batch(128)
是設置batchsize,這里設為128,在運行時,因為我們yield的是一行的數(shù)據[1, 6000, 1],所以將會循環(huán)yield夠128次,得到[128, 6000, 1],即一個batch,才會開始訓練。iterator=data.make_one_shot_iterator()
是構建迭代器,one_shot迭代器人如其名,意思就是數(shù)據輸出一次后就丟棄了。
三.獲取生成器返回的數(shù)據
x, y=iterator.get_next() x_batch, y_batch=sess.run([x,y])
注意要有get_next(),迭代器才能開始工作。
第二行是run第一行代碼。獲取訓練數(shù)據和訓練標簽。
這里做個關于yield的小筆記:
上一次迭代,yield返回了值,然后get_next()開啟了下一次迭代,此時,程序是從yield處開始運行的,也就是說,如果yield后面還有程序,那就會運行yield后面的程序。一直運行的是while True中的程序,沒有運行while True外面的程序。
下面是我寫的總的代碼??梢圆挥每?。
import os import keras import numpy as np import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.preprocessing import StandardScaler from sklearn.model_selection import StratifiedShuffleSplit from sklearn.model_selection import train_test_split from keras.models import Sequential, Model from keras.layers import Dense, Activation, Flatten, Conv1D, Dropout, MaxPooling1D, GlobalAveragePooling1D from keras.layers import GlobalAveragePooling2D,BatchNormalization, UpSampling1D, RepeatVector,Reshape from keras.layers.core import Lambda from keras.optimizers import SGD, Adam, Adadelta from keras.utils import np_utils from keras.applications.inception_resnet_v2 import InceptionResNetV2 from keras.backend import conv3d,reshape, shape, categorical_crossentropy, mean, square from keras.applications.vgg16 import VGG16 from keras.layers import Input,LSTM from keras import regularizers from keras.utils import multi_gpu_model import tensorflow as tf import keras.backend.tensorflow_backend as KTF os.environ["CUDA_VISIBLE_DEVICES"]="2" config = tf.ConfigProto() config.gpu_options.allow_growth = True session = tf.Session(config=config) keep_prob = tf.placeholder("float") # 設置session KTF.set_session(session ) #-----生成訓練數(shù)據----------------------------------------------- def gen_1(): train=pd.read_csv('/home/chenqiren/PycharmProjects/code/test/formal/small_sample/train2.csv', header=None) train.fillna(0, inplace = True) label_encoder = LabelEncoder().fit(train[6000]) label = label_encoder.transform(train[6000]) train = train.drop([6000], axis=1) scaler = StandardScaler().fit(train.values) #train.values中的值是csv文件中的那些值, 這步標準化可以保留 scaled_train = scaler.transform(train.values) #print(scaled_train) #拆分訓練集和測試集-------------- sss=StratifiedShuffleSplit(test_size=0.1, random_state=23) for train_index, valid_index in sss.split(scaled_train, label): #需要的是數(shù)組,train.values得到的是數(shù)組 X_train, X_valid=scaled_train[train_index], scaled_train[valid_index] #https://www.cnblogs.com/Allen-rg/p/9453949.html y_train, y_valid=label[train_index], label[valid_index] X_train_r=np.zeros((len(X_train), 6000, 1)) #先構建一個框架出來,下面再賦值 #開始賦值 #https://stackoverflow.com/questions/43290202/python-typeerror-unhashable-type-slice-for-encoding-categorical-data X_train_r[:,: ,0]=X_train[:,0:6000] X_valid_r=np.zeros((len(X_valid), 6000, 1)) X_valid_r[:,: ,0]=X_valid[:,0:6000] y_train=np_utils.to_categorical(y_train, 3) y_valid=np_utils.to_categorical(y_valid, 3) leng=len(X_train_r) index=0 while True: x_train_batch=X_train_r[index, :, 0:1] y_train_batch=y_train[index, :] yield (x_train_batch, y_train_batch) index=index+1 if index>leng: break #----生成測試數(shù)據-------------------------------------- def gen_2(): train=pd.read_csv('/home/chenqiren/PycharmProjects/code/test/formal/small_sample/train2.csv', header=None) train.fillna(0, inplace = True) label_encoder = LabelEncoder().fit(train[6000]) label = label_encoder.transform(train[6000]) train = train.drop([6000], axis=1) scaler = StandardScaler().fit(train.values) #train.values中的值是csv文件中的那些值, 這步標準化可以保留 scaled_train = scaler.transform(train.values) #print(scaled_train) #拆分訓練集和測試集-------------- sss=StratifiedShuffleSplit(test_size=0.1, random_state=23) for train_index, valid_index in sss.split(scaled_train, label): #需要的是數(shù)組,train.values得到的是數(shù)組 X_train, X_valid=scaled_train[train_index], scaled_train[valid_index] #https://www.cnblogs.com/Allen-rg/p/9453949.html y_train, y_valid=label[train_index], label[valid_index] X_train_r=np.zeros((len(X_train), 6000, 1)) #先構建一個框架出來,下面再賦值 #開始賦值 #https://stackoverflow.com/questions/43290202/python-typeerror-unhashable-type-slice-for-encoding-categorical-data X_train_r[:,: ,0]=X_train[:,0:6000] X_valid_r=np.zeros((len(X_valid), 6000, 1)) X_valid_r[:,: ,0]=X_valid[:,0:6000] y_train=np_utils.to_categorical(y_train, 3) y_valid=np_utils.to_categorical(y_valid, 3) leng=len(X_valid_r) index=0 while True: x_test_batch=X_valid_r[index, :, 0:1] y_test_batch=y_valid[index, :] yield (x_test_batch, y_test_batch) index=index+1 if index>leng: break #--------------------------------------------------------------------- def custom_mean_squared_error(y_true, y_pred): return mean(square(y_pred - y_true)) def custom_categorical_crossentropy(y_true, y_pred): return categorical_crossentropy(y_true, y_pred) def loss_func(y_loss, x_loss): return categorical_crossentropy + 0.05 * mean_squared_error #建立模型 with tf.device('/cpu:0'): inputs1=tf.placeholder(tf.float32, shape=(None,6000,1)) x1=LSTM(128, return_sequences=True)(inputs1) encoded=LSTM(64 ,return_sequences=True)(x1) print('encoded shape:',shape(encoded)) #decode x1=LSTM(128, return_sequences=True)(encoded) decoded=LSTM(1, return_sequences=True,name='decode')(x1) #classify labels=tf.placeholder(tf.float32, shape=(None,3)) x2=Conv1D(20,kernel_size=50, strides=2, activation='relu' )(encoded) #步數(shù)論文中未提及,第一層 x2=MaxPooling1D(pool_size=2, strides=1)(x2) x2=Conv1D(20,kernel_size=50, strides=2, activation='relu')(x2) #第二層 x2=MaxPooling1D(pool_size=2, strides=1)(x2) x2=Dropout(0.25)(x2) x2=Conv1D(24,kernel_size=30, strides=2, activation='relu')(x2) #第三層 x2=MaxPooling1D(pool_size=2, strides=1)(x2) x2=Dropout(0.25)(x2) x2=Conv1D(24,kernel_size=30, strides=2, activation='relu')(x2) #第四層 x2=MaxPooling1D(pool_size=2, strides=1)(x2) x2=Dropout(0.25)(x2) x2=Conv1D(24,kernel_size=10, strides=2, activation='relu')(x2) #第五層 x2=MaxPooling1D(pool_size=2, strides=1)(x2) x2=Dropout(0.25)(x2) x2=Dense(192)(x2) #第一個全連接層 x2=Dense(192)(x2) #第二個全連接層 x2=Flatten()(x2) x2=Dense(3,activation='softmax', name='classify')(x2) def get_accuracy(x2, labels): current = tf.cast(tf.equal(tf.argmax(x2, 1), tf.argmax(labels, 1)), 'float') accuracy = tf.reduce_mean(current) return accuracy #實例化獲取準確率函數(shù) getAccuracy = get_accuracy(x2, labels) #定義損失函數(shù) all_loss=tf.reduce_mean(categorical_crossentropy(x2 , labels) + tf.convert_to_tensor(0.5)*square(decoded-inputs1)) train_option=tf.train.AdamOptimizer(0.01).minimize(all_loss) #----------------------------------------- #生成訓練數(shù)據 data=tf.data.Dataset.from_generator(gen_1, (tf.float32, tf.float32), (tf.TensorShape([6000,1]), tf.TensorShape([3]))) data=data.batch(128) iterator=data.make_one_shot_iterator() #生成測試數(shù)據 data2=tf.data.Dataset.from_generator(gen_2, (tf.float32, tf.float32), (tf.TensorShape([6000,1]), tf.TensorShape([3]))) data2=data2.batch(128) iterator2=data2.make_one_shot_iterator() #----------------------------------------- with tf.Session() as sess: init=tf.global_variables_initializer() sess.run(init) i=-1 for k in range(20): #----------------------------------------- x, y=iterator.get_next() x_batch, y_batch=sess.run([x,y]) print('batch shape:',x_batch.shape, y_batch.shape) #----------------------------------------- if k%2==0: print('第',k,'輪') x3=sess.run(x2, feed_dict={inputs1:x_batch, labels:y_batch }) dc=sess.run(decoded, feed_dict={inputs1:x_batch}) accuracy=sess.run(getAccuracy, feed_dict={x2:x3, labels:y_batch, keep_prob: 1.0}) loss=sess.run(all_loss, feed_dict={x2:x3, labels:y_batch, inputs1:x_batch, decoded:dc}) print("step(s): %d ----- accuracy: %g -----loss: %g" % (i, accuracy, loss)) sess.run(train_option, feed_dict={inputs1:x_batch, labels:y_batch, keep_prob: 0.5}) x, y=iterator2.get_next() x_test_batch, y_test_batch=sess.run([x,y]) print('batch shape:',x_test_batch.shape, y_test_batch.shape) x_test=sess.run(x2, feed_dict={inputs1:x_test_batch, labels:y_test_batch }) print ("test accuracy %f"%getAccuracy.eval(feed_dict={x2:x_test, labels:y_test_batch, keep_prob: 1.0}))
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
對django 2.x版本中models.ForeignKey()外鍵說明介紹
這篇文章主要介紹了對django 2.x版本中models.ForeignKey()外鍵說明介紹,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03cv2.getStructuringElement()函數(shù)及開、閉、腐蝕、膨脹原理講解
getStructuringElement()函數(shù)可用于構造一個特定大小和形狀的結構元素,用于圖像形態(tài)學處理,這篇文章主要介紹了cv2.getStructuringElement()函數(shù)及開、閉、腐蝕、膨脹原理講解的相關資料,需要的朋友可以參考下2022-12-12python機器學習XGBoost梯度提升決策樹的高效且可擴展實現(xiàn)
這篇文章主要為大家介紹了python機器學習XGBoost梯度提升決策樹的高效且可擴展實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01python多線程實現(xiàn)同時執(zhí)行兩個while循環(huán)的操作
這篇文章主要介紹了python多線程實現(xiàn)同時執(zhí)行兩個while循環(huán)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05