Classifying sequences of different lengths with batched errors

I'm using Keras with a TensorFlow backend. I've just figured out how to train and classify sequences of different lengths without masking, because I couldn't get masking to work. In the toy example I'm working with, I'm trying to train an LSTM to detect whether a sequence of arbitrary length starts with a 1 or not.

from keras.models import Sequential
from keras.layers import LSTM, Dense
import numpy as np


def gen_sig(num_samples, seq_len):
    # Half the samples start with a 1; the label records which half.
    one_indices = np.random.choice(a=num_samples, size=num_samples // 2, replace=False)

    x_val = np.zeros((num_samples, seq_len), dtype=np.bool)
    x_val[one_indices, 0] = 1

    y_val = np.zeros(num_samples, dtype=np.bool)
    y_val[one_indices] = 1

    return x_val, y_val


N_train = 100
N_test = 10
recall_len = 20

X_train, y_train = gen_sig(N_train, recall_len)
X_test, y_test = gen_sig(N_test, recall_len)  # fixed: was gen_sig(N_train, ...)

print('Build STATEFUL model...')
model = Sequential()
model.add(LSTM(10, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

print('Train...')
for epoch in range(15):
    mean_tr_acc = []
    mean_tr_loss = []
    for seq_idx in range(X_train.shape[0]):
        start_val = X_train[seq_idx, 0]
        assert y_train[seq_idx] == start_val
        assert tuple(np.nonzero(X_train[seq_idx, :]))[0].shape[0] == start_val

        y_in = np.array([y_train[seq_idx]], dtype=np.bool)

        # Feed a randomly truncated prefix of the sequence one time step at
        # a time; every single step triggers its own weight update.
        for j in range(np.random.choice(a=np.arange(5, recall_len + 1))):
            x_in = np.array([[[X_train[seq_idx][j]]]])
            tr_loss, tr_acc = model.train_on_batch(x_in, y_in)

            mean_tr_acc.append(tr_acc)
            mean_tr_loss.append(tr_loss)
        model.reset_states()  # reset once per sequence, matching the test loop

    print('accuracy training = {}'.format(np.mean(mean_tr_acc)))
    print('loss training = {}'.format(np.mean(mean_tr_loss)))
    print('___________________________________')

    mean_te_acc = []
    mean_te_loss = []
    for seq_idx in range(X_test.shape[0]):
        start_val = X_test[seq_idx, 0]
        assert y_test[seq_idx] == start_val
        assert tuple(np.nonzero(X_test[seq_idx, :]))[0].shape[0] == start_val

        y_in = np.array([y_test[seq_idx]], dtype=np.bool)

        for j in range(np.random.choice(a=np.arange(5, recall_len + 1))):
            te_loss, te_acc = model.test_on_batch(np.array([[[X_test[seq_idx][j]]]], dtype=np.bool), y_in)
            mean_te_acc.append(te_acc)
            mean_te_loss.append(te_loss)
        model.reset_states()

    print('accuracy testing = {}'.format(np.mean(mean_te_acc)))
    print('loss testing = {}'.format(np.mean(mean_te_loss)))
    print('___________________________________')

As the code shows, my error is batched over every single time step, which is bad for several reasons. How do I instead train the network in two steps (a sketch of what I mean follows the list)? For example:

  1. Run a batch of values through the network to accumulate the error
  2. Adjust the network's weights based on the accumulated error
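
To make that concrete, here is a minimal sketch of the desired two-step behaviour (illustrative only, not code from the original post). A non-stateful LSTM declared with input_shape=(None, 1) accepts sequences of any length, so each train_on_batch call runs one whole sequence forward to accumulate the loss (step 1) and then applies a single weight update (step 2):

from keras.models import Sequential
from keras.layers import LSTM, Dense
import numpy as np

# input_shape=(None, 1): the time dimension is left unspecified, so each
# call may pass a sequence of a different length (batch size 1 here).
model = Sequential()
model.add(LSTM(10, input_shape=(None, 1)))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# One whole sequence per call: the forward pass covers every time step
# before a single gradient update is applied.
seq_len = np.random.randint(5, 21)
x_seq = np.zeros((1, seq_len, 1), dtype='float32')
x_seq[0, 0, 0] = np.random.randint(0, 2)
y_seq = np.array([x_seq[0, 0, 0]])
loss, acc = model.train_on_batch(x_seq, y_seq)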

Answer:

The simplest way to achieve what is described in the original question is to train the original network with masking, and then test with a stateful network so that input of any length can be classified:

import numpy as np
np.random.seed(1)

import tensorflow as tf
tf.set_random_seed(1)

from keras import models
from keras.layers import Dense, Masking, LSTM

import matplotlib.pyplot as plt


def stateful_model():
    # Same layer sizes as the training model, but stateful and fed one
    # time step at a time, so it can classify inputs of any length.
    hidden_units = 256

    model = models.Sequential()
    model.add(LSTM(hidden_units, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True))
    model.add(Dense(1, activation='relu', name='output'))

    model.compile(loss='binary_crossentropy', optimizer='rmsprop')

    return model


def train_rnn(x_train, y_train, max_len, mask):
    epochs = 10
    batch_size = 200
    vec_dims = 1
    hidden_units = 256
    in_shape = (max_len, vec_dims)

    model = models.Sequential()

    # The Masking layer skips the padded time steps that carry the mask value.
    model.add(Masking(mask, name="in_layer", input_shape=in_shape))
    model.add(LSTM(hidden_units, return_sequences=False))
    model.add(Dense(1, activation='relu', name='output'))

    model.compile(loss='binary_crossentropy', optimizer='rmsprop')

    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
              validation_split=0.05)

    return model


def gen_train_sig_cls_pair(t_stops, num_examples, mask):
    # Training sequences are padded to the longest length with the mask value.
    x = []
    y = []
    max_t = int(np.max(t_stops))

    for t_stop in t_stops:
        one_indices = np.random.choice(a=num_examples, size=num_examples // 2, replace=False)

        sig = np.zeros((num_examples, max_t), dtype=np.int8)
        sig[one_indices, 0] = 1
        sig[:, t_stop:] = mask
        x.append(sig)

        cls = np.zeros(num_examples, dtype=np.bool)
        cls[one_indices] = 1
        y.append(cls)

    return np.concatenate(x, axis=0), np.concatenate(y, axis=0)


def gen_test_sig_cls_pair(t_stops, num_examples):
    # Test sequences keep their true lengths; no padding is needed because
    # the stateful model consumes one time step at a time.
    x = []
    y = []

    for t_stop in t_stops:
        one_indices = np.random.choice(a=num_examples, size=num_examples // 2, replace=False)

        sig = np.zeros((num_examples, t_stop), dtype=np.bool)
        sig[one_indices, 0] = 1
        x.extend(list(sig))

        cls = np.zeros((num_examples, t_stop), dtype=np.bool)
        cls[one_indices] = 1
        y.extend(list(cls))

    return x, y


if __name__ == '__main__':
    noise_mag = 0.01
    mask_val = -10
    signal_lengths = (10, 15, 20)

    x_in, y_in = gen_train_sig_cls_pair(signal_lengths, 10, mask_val)

    mod = train_rnn(x_in[:, :, None], y_in, int(np.max(signal_lengths)), mask_val)

    testing_dat, expected = gen_test_sig_cls_pair(signal_lengths, 3)

    # Copy the trained weights into the stateful, step-by-step model.
    state_mod = stateful_model()
    state_mod.set_weights(mod.get_weights())

    res = []
    for s_i in range(len(testing_dat)):
        seq_in = list(testing_dat[s_i])
        seq_len = len(seq_in)

        for t_i in range(seq_len):
            res.extend(state_mod.predict(np.array([[[seq_in[t_i]]]])))

        state_mod.reset_states()

    fig, axes = plt.subplots(2)
    axes[0].plot(np.concatenate(testing_dat), label="input")

    axes[1].plot(res, "ro", label="result", alpha=0.2)
    axes[1].plot(np.concatenate(expected, axis=0), "bo", label="expected", alpha=0.2)
    axes[1].legend(bbox_to_anchor=(1.1, 1))

    plt.show()
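
The weight hand-off works because the two models declare parameter-identical layers (a 256-unit LSTM over 1-dimensional input feeding a single-unit Dense layer); only the batch and time-step geometry differs, and the Masking layer itself holds no weights. A quick sanity check, reusing the mod and state_mod objects from the script above (illustrative, not part of the original answer):

# Confirm that the masked training model and the stateful inference model
# have matching weight shapes, which is what makes
# state_mod.set_weights(mod.get_weights()) legal.
for w_train, w_state in zip(mod.get_weights(), state_mod.get_weights()):
    assert w_train.shape == w_state.shape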
