如何让TensorFlow + Keras在使用TFRecord数据集时运行得更快?

使用TensorFlow的TFRecord和Keras模型以及tf.session.run()时,如何保持数据集在张量中的示例是什么?同时使用队列运行器?

下面是一个工作的代码片段,但需要以下改进:

  • 使用模型API
  • 指定一个Input()
  • 从TFRecord加载数据集
  • 并行处理数据集(例如使用队列运行器)

这是代码片段,其中有几行TODO指示需要做的事情:

from keras.models import Modelimport tensorflow as tffrom keras import backend as Kfrom keras.layers import Dense, Inputfrom keras.objectives import categorical_crossentropyfrom tensorflow.examples.tutorials.mnist import input_datasess = tf.Session()K.set_session(sess)# 可以比使用占位符和TFRecords更有效地完成吗?img = tf.placeholder(tf.float32, shape=(None, 784))labels = tf.placeholder(tf.float32, shape=(None, 10))# TODO: 使用Input() x = Dense(128, activation='relu')(img)x = Dense(128, activation='relu')(x)preds = Dense(10, activation='softmax')(x)# TODO: 构建模型 = Model(input=inputs, output=preds)loss = tf.reduce_mean(categorical_crossentropy(labels, preds))# TODO: 处理TFRecord数据,它是一样的吗?mnist_data = input_data.read_data_sets('MNIST_data', one_hot=True)train_step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)sess.run(tf.global_variables_initializer())# TODO 删除默认,添加队列运行器with sess.as_default():    for i in range(1000):        batch = mnist_data.train.next_batch(50)        train_step.run(feed_dict={img: batch[0],                                  labels: batch[1]})    print(loss.eval(feed_dict={img:    mnist_data.test.images,                                labels: mnist_data.test.labels}))

为什么这个问题相关?

以下是语义分割问题示例的一些起始信息:


回答:

更新2018-08-29,这现在在keras中直接支持,参见以下示例:

https://github.com/keras-team/keras/blob/master/examples/mnist_tfrecord.py

原始答案:

通过使用外部损失来支持TFRecords。这是构建外部损失的关键行:

# tf yield ops 提供数据集图像和标签x_train_batch, y_train_batch = read_and_decode_recordinput(...)# 创建一个基本的CNNx_train_input = Input(tensor=x_train_batch)x_train_out = cnn_layers(x_train_input)model = Model(inputs=x_train_input, outputs=x_train_out)loss = keras.losses.categorical_crossentropy(y_train_batch, x_train_out)model.add_loss(loss)model.compile(optimizer='rmsprop', loss=None)

这是Keras 2的示例。在应用小补丁#7060后可以工作:

'''使用TensorFlow TFRecords的MNIST数据集。在12个epoch后达到99.25%的测试准确率(仍有很大的参数调整空间)。'''import osimport copyimport timeimport numpy as npimport tensorflow as tffrom tensorflow.python.ops import data_flow_opsfrom keras import backend as Kfrom keras.models import Modelfrom keras.layers import Densefrom keras.layers import Dropoutfrom keras.layers import Flattenfrom keras.layers import Inputfrom keras.layers import Conv2Dfrom keras.layers import MaxPooling2Dfrom keras.callbacks import EarlyStoppingfrom keras.callbacks import TensorBoardfrom keras.objectives import categorical_crossentropyfrom keras.utils import np_utilsfrom keras.utils.generic_utils import Progbarfrom keras import callbacks as cbksfrom keras import optimizers, objectivesfrom keras import metrics as metrics_modulefrom keras.datasets import mnistif K.backend() != 'tensorflow':    raise RuntimeError('这个例子只能使用TensorFlow后端运行,'                       '因为它需要TFRecords,这在其他平台上不受支持。')def images_to_tfrecord(images, labels, filename):    def _int64_feature(value):        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))    def _bytes_feature(value):        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))    """ 将数据保存到TFRecord中 """    if not os.path.isfile(filename):        num_examples = images.shape[0]        rows = images.shape[1]        cols = images.shape[2]        depth = images.shape[3]        print('正在写入', filename)        writer = tf.python_io.TFRecordWriter(filename)        for index in range(num_examples):            image_raw = images[index].tostring()            example = tf.train.Example(features=tf.train.Features(feature={                'height': _int64_feature(rows),                'width': _int64_feature(cols),                'depth': _int64_feature(depth),                'label': _int64_feature(int(labels[index])),                'image_raw': _bytes_feature(image_raw)}))            writer.write(example.SerializeToString())        writer.close()    else:        print('tfrecord %s 已经存在' % filename)def read_and_decode_recordinput(tf_glob, one_hot=True, classes=None, is_train=None,                                batch_shape=[1000, 28, 28, 1], parallelism=1):    """ 返回读取TFRecord的张量 """    print '正在为加载 %s TFRecords 创建图...' % tf_glob    with tf.variable_scope("TFRecords"):        record_input = data_flow_ops.RecordInput(            tf_glob, batch_size=batch_shape[0], parallelism=parallelism)        records_op = record_input.get_yield_op()        records_op = tf.split(records_op, batch_shape[0], 0)        records_op = [tf.reshape(record, []) for record in records_op]        progbar = Progbar(len(records_op))        images = []        labels = []        for i, serialized_example in enumerate(records_op):            progbar.update(i)            with tf.variable_scope("parse_images", reuse=True):                features = tf.parse_single_example(                    serialized_example,                    features={                        'label': tf.FixedLenFeature([], tf.int64),                        'image_raw': tf.FixedLenFeature([], tf.string),                    })                img = tf.decode_raw(features['image_raw'], tf.uint8)                img.set_shape(batch_shape[1] * batch_shape[2])                img = tf.reshape(img, [1] + batch_shape[1:])                img = tf.cast(img, tf.float32) * (1. / 255) - 0.5                label = tf.cast(features['label'], tf.int32)                if one_hot and classes:                    label = tf.one_hot(label, classes)                images.append(img)                labels.append(label)        images = tf.parallel_stack(images, 0)        labels = tf.parallel_stack(labels, 0)        images = tf.cast(images, tf.float32)        images = tf.reshape(images, shape=batch_shape)        # StagingArea将跨多个步骤存储张量        # 以加速执行        images_shape = images.get_shape()        labels_shape = labels.get_shape()        copy_stage = data_flow_ops.StagingArea(            [tf.float32, tf.float32],            shapes=[images_shape, labels_shape])        copy_stage_op = copy_stage.put(            [images, labels])        staged_images, staged_labels = copy_stage.get()        return images, labelsdef save_mnist_as_tfrecord():    (X_train, y_train), (X_test, y_test) = mnist.load_data()    X_train = X_train[..., np.newaxis]    X_test = X_test[..., np.newaxis]    images_to_tfrecord(images=X_train, labels=y_train, filename='train.mnist.tfrecord')    images_to_tfrecord(images=X_test, labels=y_test, filename='test.mnist.tfrecord')def cnn_layers(x_train_input):    x = Conv2D(32, (3, 3), activation='relu', padding='valid')(x_train_input)    x = Conv2D(64, (3, 3), activation='relu')(x)    x = MaxPooling2D(pool_size=(2, 2))(x)    x = Dropout(0.25)(x)    x = Flatten()(x)    x = Dense(128, activation='relu')(x)    x = Dropout(0.5)(x)    x_train_out = Dense(classes,                        activation='softmax',                        name='x_train_out')(x)    return x_train_outsess = tf.Session()K.set_session(sess)save_mnist_as_tfrecord()batch_size = 100batch_shape = [batch_size, 28, 28, 1]epochs = 3000classes = 10parallelism = 10x_train_batch, y_train_batch = read_and_decode_recordinput(    'train.mnist.tfrecord',    one_hot=True,    classes=classes,    is_train=True,    batch_shape=batch_shape,    parallelism=parallelism)x_test_batch, y_test_batch = read_and_decode_recordinput(    'test.mnist.tfrecord',    one_hot=True,    classes=classes,    is_train=True,    batch_shape=batch_shape,    parallelism=parallelism)x_batch_shape = x_train_batch.get_shape().as_list()y_batch_shape = y_train_batch.get_shape().as_list()x_train_input = Input(tensor=x_train_batch, batch_shape=x_batch_shape)x_train_out = cnn_layers(x_train_input)y_train_in_out = Input(tensor=y_train_batch, batch_shape=y_batch_shape, name='y_labels')cce = categorical_crossentropy(y_train_batch, x_train_out)train_model = Model(inputs=[x_train_input], outputs=[x_train_out])train_model.add_loss(cce)train_model.compile(optimizer='rmsprop',                    loss=None,                    metrics=['accuracy'])train_model.summary()tensorboard = TensorBoard()# 由于Keras bug,禁用tensorboardtrain_model.fit(batch_size=batch_size,                epochs=epochs)  # callbacks=[tensorboard])train_model.save_weights('saved_wt.h5')K.clear_session()# 第二个会话,纯Keras(X_train, y_train), (X_test, y_test) = mnist.load_data()X_train = X_train[..., np.newaxis]X_test = X_test[..., np.newaxis]x_test_inp = Input(batch_shape=(None,) + (X_test.shape[1:]))test_out = cnn_layers(x_test_inp)test_model = Model(inputs=x_test_inp, outputs=test_out)test_model.load_weights('saved_wt.h5')test_model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])test_model.summary()loss, acc = test_model.evaluate(X_test, np_utils.to_categorical(y_test), classes)print('\n测试准确率: {0}'.format(acc))

我还在以下问题和拉取请求中努力改进对TFRecords的支持:

  • #6928 支持Yield Op:通过TFRecords和RecordInput实现高性能大数据集
  • #7102 Keras输入张量API设计提案

最后,可以使用tf.contrib.learn.Experiment在TensorFlow中训练Keras模型。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注