TypeError: Cannot convert a symbolic Keras input/output to a numpy array

I am trying to upgrade the excellent gumble-softmax-vae implementation found here. However, I keep getting

TypeError: Cannot convert a symbolic Keras input/output to a numpy array. 

I'm stumped; I have tried many, many things. Interestingly, some of my searches turn up other implementations of VAEs. I believe the error is in the computation of the "KL" term of the loss.

Here is the almost-working code:

import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

batch_size = 10
data_dim = 784
M = 10  # classes
N = 30  # how many distributions
nb_epoch = 100
epsilon_std = 0.01
anneal_rate = 0.0003
min_temperature = 0.5

tau = tf.Variable(5.0, dtype=tf.float32)


class Sampling(keras.layers.Layer):
    def call(self, logits_y):
        u = tf.random.uniform(tf.shape(logits_y), 0, 1)
        y = logits_y - tf.math.log(
            -tf.math.log(u + 1e-20) + 1e-20
        )  # logits + gumbel noise
        y = tf.nn.softmax(tf.reshape(y, (-1, N, M)) / tau)
        y = tf.reshape(y, (-1, N * M))
        return y


encoder_inputs = keras.Input(shape=(data_dim))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
encoder = keras.Model(encoder_inputs, z, name="encoder")
encoder.build(encoder_inputs)
print(encoder.summary())

decoder_inputs = keras.Input(shape=(N * M))
x = keras.layers.Dense(256, activation="relu")(decoder_inputs)
x = keras.layers.Dense(512, activation="relu")(x)
decoder_outputs = keras.layers.Dense(data_dim, activation="sigmoid")(x)
decoder = keras.Model(decoder_inputs, decoder_outputs, name="decoder")
decoder.build(decoder_inputs)
print(decoder.summary())


class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        return [self.loss_tracker]

    def call(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    @tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        kl = tf.squeeze(kl, axis=0)
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo

    def train_step(self, data):
        x = data
        with tf.GradientTape(persistent=True) as tape:
            z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)
            x = tf.cast(x, dtype=tf.float32)
            x_hat = tf.cast(x_hat, dtype=tf.float32)
            logits_y = self.encoder.get_layer('logits_y').output
            loss = self.gumbel_loss(x, x_hat, logits_y)
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}


def main():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
        path="mnist.npz"
    )
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0
    x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
    x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

    vae = VAE(encoder, decoder, name="vae-model")
    vae_inputs = (None, data_dim)
    vae.build(vae_inputs)
    vae.compile(optimizer="adam", loss=None)
    vae.fit(
        x_train,
        shuffle=True,
        epochs=1,
        batch_size=batch_size
    )


if __name__ == "__main__":
    main()

Answer:

I think the main problem occurs when you try to fetch the output of the logits_y layer; as far as I know, you can't do that. Instead, you need to build an encoder model with two outputs, like this:

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        # self.encoder = encoder
        self.encoder = tf.keras.Model(
            inputs=encoder.input,
            outputs=[encoder.get_layer(name='logits_y').output,
                     encoder.output],
        )
        # ... whatever else follows stays the same
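For what it's worth, the same thing can be expressed at model-definition time instead of rebuilding the model inside VAE.__init__. This is just a minimal sketch, assuming the same layer stack as in the question:

# Sketch: give the encoder both outputs up front, reusing the question's
# encoder_inputs / logits_y / Sampling definitions.
encoder_inputs = keras.Input(shape=(data_dim,))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
# Two outputs: the raw logits for the KL term, the sampled code for the decoder.
encoder = keras.Model(encoder_inputs, [logits_y, z], name="encoder")

With that, self.encoder = encoder in __init__ works unchanged.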

So, in the training loop, this self.encoder will produce two outputs, one of which is the output of the logits_y layer, which you need for the loss function. Finally, make a few changes elsewhere in the code, as follows:

    def call(self, x):
        _, z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    @tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo
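As a quick aside (not part of the original answer), here is a small sanity check of that KL term: with uniform logits the posterior q(y) equals the uniform prior, so the KL should come out at (approximately) zero. M and N are the values from the question.

import tensorflow as tf

M, N = 10, 30  # classes / number of distributions, as in the question

# Uniform logits: q(y) = softmax(0) = 1/M everywhere, matching the prior.
logits_y = tf.zeros((2, N * M))
q_y = tf.nn.softmax(tf.reshape(logits_y, (-1, N, M)))
log_q_y = tf.math.log(q_y + 1e-20)
kl = tf.math.reduce_sum(q_y * (log_q_y - tf.math.log(1.0 / M)), axis=(1, 2))
print(kl.numpy())  # approximately [0. 0.]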

And finally, the train_step function; note that the relevant variables are already tf.float32, so there is no need to cast them.

    def train_step(self, data):
        x = data
        with tf.GradientTape(persistent=True) as tape:
            logits_y, z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)
            loss = self.gumbel_loss(x, x_hat, logits_y)
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

You don't need to change anything else in the code above. Here are some training logs (run on CPU, tf 2.5):

Epoch 1/5
6000/6000 [==============================] - 60s 10ms/step - loss: 54.4604
Epoch 2/5
6000/6000 [==============================] - 60s 10ms/step - loss: 18.8960
Epoch 3/5
6000/6000 [==============================] - 59s 10ms/step - loss: 12.1036
Epoch 4/5
6000/6000 [==============================] - 59s 10ms/step - loss: 8.5804
Epoch 5/5
6000/6000 [==============================] - 59s 10ms/step - loss: 6.3916
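One more hedged note, beyond what the answer covers: the question defines tau, anneal_rate, and min_temperature but never updates them, while the original gumbel-softmax-vae anneals the temperature during training. A sketch of one way to do that with a Keras callback follows; the per-epoch schedule is my assumption (the reference implementation anneals per step), and TemperatureAnnealing is a hypothetical helper name.

import numpy as np
from tensorflow import keras


class TemperatureAnnealing(keras.callbacks.Callback):
    """Anneals the Gumbel-Softmax temperature tau toward min_temperature."""

    def __init__(self, tau, anneal_rate=0.0003, min_temperature=0.5,
                 initial_tau=5.0):
        super().__init__()
        self.tau = tau  # the tf.Variable read by the Sampling layer
        self.anneal_rate = anneal_rate
        self.min_temperature = min_temperature
        self.initial_tau = initial_tau

    def on_epoch_end(self, epoch, logs=None):
        # Exponential decay, clipped at the minimum temperature.
        new_tau = max(
            self.initial_tau * np.exp(-self.anneal_rate * (epoch + 1)),
            self.min_temperature,
        )
        self.tau.assign(new_tau)

Usage would then be, e.g., vae.fit(x_train, shuffle=True, epochs=5, batch_size=batch_size, callbacks=[TemperatureAnnealing(tau)]).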
