如何让TensorFlow + Keras在使用TFRecord数据集时运行得更快?

使用TensorFlow的TFRecord和Keras模型以及tf.session.run()时,如何保持数据集在张量中的示例是什么?同时使用队列运行器?

下面是一个工作的代码片段,但需要以下改进:

  • 使用模型API
  • 指定一个Input()
  • 从TFRecord加载数据集
  • 并行处理数据集(例如使用队列运行器)

这是代码片段,其中有几行TODO指示需要做的事情:

from keras.models import Modelimport tensorflow as tffrom keras import backend as Kfrom keras.layers import Dense, Inputfrom keras.objectives import categorical_crossentropyfrom tensorflow.examples.tutorials.mnist import input_datasess = tf.Session()K.set_session(sess)# 可以比使用占位符和TFRecords更有效地完成吗?img = tf.placeholder(tf.float32, shape=(None, 784))labels = tf.placeholder(tf.float32, shape=(None, 10))# TODO: 使用Input() x = Dense(128, activation='relu')(img)x = Dense(128, activation='relu')(x)preds = Dense(10, activation='softmax')(x)# TODO: 构建模型 = Model(input=inputs, output=preds)loss = tf.reduce_mean(categorical_crossentropy(labels, preds))# TODO: 处理TFRecord数据,它是一样的吗?mnist_data = input_data.read_data_sets('MNIST_data', one_hot=True)train_step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)sess.run(tf.global_variables_initializer())# TODO 删除默认,添加队列运行器with sess.as_default():    for i in range(1000):        batch = mnist_data.train.next_batch(50)        train_step.run(feed_dict={img: batch[0],                                  labels: batch[1]})    print(loss.eval(feed_dict={img:    mnist_data.test.images,                                labels: mnist_data.test.labels}))

为什么这个问题相关?

以下是语义分割问题示例的一些起始信息:


回答:

更新2018-08-29,这现在在keras中直接支持,参见以下示例:

https://github.com/keras-team/keras/blob/master/examples/mnist_tfrecord.py

原始答案:

通过使用外部损失来支持TFRecords。这是构建外部损失的关键行:

# tf yield ops 提供数据集图像和标签x_train_batch, y_train_batch = read_and_decode_recordinput(...)# 创建一个基本的CNNx_train_input = Input(tensor=x_train_batch)x_train_out = cnn_layers(x_train_input)model = Model(inputs=x_train_input, outputs=x_train_out)loss = keras.losses.categorical_crossentropy(y_train_batch, x_train_out)model.add_loss(loss)model.compile(optimizer='rmsprop', loss=None)

这是Keras 2的示例。在应用小补丁#7060后可以工作:

'''使用TensorFlow TFRecords的MNIST数据集。在12个epoch后达到99.25%的测试准确率(仍有很大的参数调整空间)。'''import osimport copyimport timeimport numpy as npimport tensorflow as tffrom tensorflow.python.ops import data_flow_opsfrom keras import backend as Kfrom keras.models import Modelfrom keras.layers import Densefrom keras.layers import Dropoutfrom keras.layers import Flattenfrom keras.layers import Inputfrom keras.layers import Conv2Dfrom keras.layers import MaxPooling2Dfrom keras.callbacks import EarlyStoppingfrom keras.callbacks import TensorBoardfrom keras.objectives import categorical_crossentropyfrom keras.utils import np_utilsfrom keras.utils.generic_utils import Progbarfrom keras import callbacks as cbksfrom keras import optimizers, objectivesfrom keras import metrics as metrics_modulefrom keras.datasets import mnistif K.backend() != 'tensorflow':    raise RuntimeError('这个例子只能使用TensorFlow后端运行,'                       '因为它需要TFRecords,这在其他平台上不受支持。')def images_to_tfrecord(images, labels, filename):    def _int64_feature(value):        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))    def _bytes_feature(value):        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))    """ 将数据保存到TFRecord中 """    if not os.path.isfile(filename):        num_examples = images.shape[0]        rows = images.shape[1]        cols = images.shape[2]        depth = images.shape[3]        print('正在写入', filename)        writer = tf.python_io.TFRecordWriter(filename)        for index in range(num_examples):            image_raw = images[index].tostring()            example = tf.train.Example(features=tf.train.Features(feature={                'height': _int64_feature(rows),                'width': _int64_feature(cols),                'depth': _int64_feature(depth),                'label': _int64_feature(int(labels[index])),                'image_raw': _bytes_feature(image_raw)}))            writer.write(example.SerializeToString())        writer.close()    else:        print('tfrecord %s 已经存在' % filename)def read_and_decode_recordinput(tf_glob, one_hot=True, classes=None, is_train=None,                                batch_shape=[1000, 28, 28, 1], parallelism=1):    """ 返回读取TFRecord的张量 """    print '正在为加载 %s TFRecords 创建图...' % tf_glob    with tf.variable_scope("TFRecords"):        record_input = data_flow_ops.RecordInput(            tf_glob, batch_size=batch_shape[0], parallelism=parallelism)        records_op = record_input.get_yield_op()        records_op = tf.split(records_op, batch_shape[0], 0)        records_op = [tf.reshape(record, []) for record in records_op]        progbar = Progbar(len(records_op))        images = []        labels = []        for i, serialized_example in enumerate(records_op):            progbar.update(i)            with tf.variable_scope("parse_images", reuse=True):                features = tf.parse_single_example(                    serialized_example,                    features={                        'label': tf.FixedLenFeature([], tf.int64),                        'image_raw': tf.FixedLenFeature([], tf.string),                    })                img = tf.decode_raw(features['image_raw'], tf.uint8)                img.set_shape(batch_shape[1] * batch_shape[2])                img = tf.reshape(img, [1] + batch_shape[1:])                img = tf.cast(img, tf.float32) * (1. / 255) - 0.5                label = tf.cast(features['label'], tf.int32)                if one_hot and classes:                    label = tf.one_hot(label, classes)                images.append(img)                labels.append(label)        images = tf.parallel_stack(images, 0)        labels = tf.parallel_stack(labels, 0)        images = tf.cast(images, tf.float32)        images = tf.reshape(images, shape=batch_shape)        # StagingArea将跨多个步骤存储张量        # 以加速执行        images_shape = images.get_shape()        labels_shape = labels.get_shape()        copy_stage = data_flow_ops.StagingArea(            [tf.float32, tf.float32],            shapes=[images_shape, labels_shape])        copy_stage_op = copy_stage.put(            [images, labels])        staged_images, staged_labels = copy_stage.get()        return images, labelsdef save_mnist_as_tfrecord():    (X_train, y_train), (X_test, y_test) = mnist.load_data()    X_train = X_train[..., np.newaxis]    X_test = X_test[..., np.newaxis]    images_to_tfrecord(images=X_train, labels=y_train, filename='train.mnist.tfrecord')    images_to_tfrecord(images=X_test, labels=y_test, filename='test.mnist.tfrecord')def cnn_layers(x_train_input):    x = Conv2D(32, (3, 3), activation='relu', padding='valid')(x_train_input)    x = Conv2D(64, (3, 3), activation='relu')(x)    x = MaxPooling2D(pool_size=(2, 2))(x)    x = Dropout(0.25)(x)    x = Flatten()(x)    x = Dense(128, activation='relu')(x)    x = Dropout(0.5)(x)    x_train_out = Dense(classes,                        activation='softmax',                        name='x_train_out')(x)    return x_train_outsess = tf.Session()K.set_session(sess)save_mnist_as_tfrecord()batch_size = 100batch_shape = [batch_size, 28, 28, 1]epochs = 3000classes = 10parallelism = 10x_train_batch, y_train_batch = read_and_decode_recordinput(    'train.mnist.tfrecord',    one_hot=True,    classes=classes,    is_train=True,    batch_shape=batch_shape,    parallelism=parallelism)x_test_batch, y_test_batch = read_and_decode_recordinput(    'test.mnist.tfrecord',    one_hot=True,    classes=classes,    is_train=True,    batch_shape=batch_shape,    parallelism=parallelism)x_batch_shape = x_train_batch.get_shape().as_list()y_batch_shape = y_train_batch.get_shape().as_list()x_train_input = Input(tensor=x_train_batch, batch_shape=x_batch_shape)x_train_out = cnn_layers(x_train_input)y_train_in_out = Input(tensor=y_train_batch, batch_shape=y_batch_shape, name='y_labels')cce = categorical_crossentropy(y_train_batch, x_train_out)train_model = Model(inputs=[x_train_input], outputs=[x_train_out])train_model.add_loss(cce)train_model.compile(optimizer='rmsprop',                    loss=None,                    metrics=['accuracy'])train_model.summary()tensorboard = TensorBoard()# 由于Keras bug,禁用tensorboardtrain_model.fit(batch_size=batch_size,                epochs=epochs)  # callbacks=[tensorboard])train_model.save_weights('saved_wt.h5')K.clear_session()# 第二个会话,纯Keras(X_train, y_train), (X_test, y_test) = mnist.load_data()X_train = X_train[..., np.newaxis]X_test = X_test[..., np.newaxis]x_test_inp = Input(batch_shape=(None,) + (X_test.shape[1:]))test_out = cnn_layers(x_test_inp)test_model = Model(inputs=x_test_inp, outputs=test_out)test_model.load_weights('saved_wt.h5')test_model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])test_model.summary()loss, acc = test_model.evaluate(X_test, np_utils.to_categorical(y_test), classes)print('\n测试准确率: {0}'.format(acc))

我还在以下问题和拉取请求中努力改进对TFRecords的支持:

  • #6928 支持Yield Op:通过TFRecords和RecordInput实现高性能大数据集
  • #7102 Keras输入张量API设计提案

最后,可以使用tf.contrib.learn.Experiment在TensorFlow中训练Keras模型。

Related Posts

Keras Dense层输入未被展平

这是我的测试代码: from keras import…

无法将分类变量输入随机森林

我有10个分类变量和3个数值变量。我在分割后直接将它们…

如何在Keras中对每个输出应用Sigmoid函数?

这是我代码的一部分。 model = Sequenti…

如何选择类概率的最佳阈值?

我的神经网络输出是一个用于多标签分类的预测类概率表: …

在Keras中使用深度学习得到不同的结果

我按照一个教程使用Keras中的深度神经网络进行文本分…

‘MatMul’操作的输入’b’类型为float32,与参数’a’的类型float64不匹配

我写了一个简单的TensorFlow代码,但不断遇到T…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注