I'm trying to upgrade the excellent gumble-softmax-vae implementation found here. However, I keep getting:
TypeError: Cannot convert a symbolic Keras input/output to a numpy array.
I'm stumped; I've tried many, many things. Interestingly, some searches turn up other implementations of this VAE. I believe the error is in the computation of the "KL" term of the loss.
Here is the almost-working code:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

batch_size = 10
data_dim = 784
M = 10  # classes
N = 30  # how many distributions
nb_epoch = 100
epsilon_std = 0.01
anneal_rate = 0.0003
min_temperature = 0.5
tau = tf.Variable(5.0, dtype=tf.float32)


class Sampling(keras.layers.Layer):
    def call(self, logits_y):
        u = tf.random.uniform(tf.shape(logits_y), 0, 1)
        y = logits_y - tf.math.log(
            -tf.math.log(u + 1e-20) + 1e-20
        )  # logits + gumbel noise
        y = tf.nn.softmax(tf.reshape(y, (-1, N, M)) / tau)
        y = tf.reshape(y, (-1, N * M))
        return y


encoder_inputs = keras.Input(shape=(data_dim))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
encoder = keras.Model(encoder_inputs, z, name="encoder")
encoder.build(encoder_inputs)
print(encoder.summary())

decoder_inputs = keras.Input(shape=(N * M))
x = keras.layers.Dense(256, activation="relu")(decoder_inputs)
x = keras.layers.Dense(512, activation="relu")(x)
decoder_outputs = keras.layers.Dense(data_dim, activation="sigmoid")(x)
decoder = keras.Model(decoder_inputs, decoder_outputs, name="decoder")
decoder.build(decoder_inputs)
print(decoder.summary())


class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        return [self.loss_tracker]

    def call(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    @tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        kl = tf.squeeze(kl, axis=0)
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo

    def train_step(self, data):
        x = data
        with tf.GradientTape(persistent=True) as tape:
            z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)
            x = tf.cast(x, dtype=tf.float32)
            x_hat = tf.cast(x_hat, dtype=tf.float32)
            logits_y = self.encoder.get_layer('logits_y').output
            loss = self.gumbel_loss(x, x_hat, logits_y)
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}


def main():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
        path="mnist.npz"
    )
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0
    x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
    x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

    vae = VAE(encoder, decoder, name="vae-model")
    vae_inputs = (None, data_dim)
    vae.build(vae_inputs)
    vae.compile(optimizer="adam", loss=None)
    vae.fit(x_train, shuffle=True, epochs=1, batch_size=batch_size)


if __name__ == "__main__":
    main()
Answer:
I think the main problem occurs when you try to get the output from the logits_y layer. As far as I know, you can't do that; instead, you need to build the encoder model with two outputs, like this:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        # self.encoder = encoder
        self.encoder = tf.keras.Model(
            inputs=encoder.input,
            outputs=[encoder.get_layer(name='logits_y').output,
                     encoder.output],
        )
        # ... the rest of __init__ stays the same
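An equivalent option (a sketch that reuses encoder_inputs, logits_y, and z from the question's functional code) is to declare both outputs when the encoder is first built, which avoids the get_layer() lookup entirely:

from tensorflow import keras

# Same functional graph as in the question, but exposing both tensors.
encoder = keras.Model(encoder_inputs, [logits_y, z], name="encoder")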
Either way, in the training loop this self.encoder will now produce two outputs, one of which is the output of the logits_y layer, which you need for the loss function. Then make a few other code changes, as follows:
def call(self, x):
    _, z = self.encoder(x)
    x_hat = self.decoder(z)
    return x_hat

@tf.function
def gumbel_loss(self, y_true, y_pred, logits_y):
    q_y = tf.reshape(logits_y, (-1, N, M))
    q_y = tf.nn.softmax(q_y)
    log_q_y = tf.math.log(q_y + 1e-20)
    kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
    kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
    # Note: the tf.squeeze from the question is gone; kl already has
    # shape (batch_size,), and squeezing axis 0 fails for batch_size > 1.
    elbo = data_dim * self.bce(y_true, y_pred) - kl
    return elbo
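Since the question suspects the KL term, here is a quick standalone sanity check (a sketch, with N and M taken from the question): with uniform logits, q_y equals the uniform prior over the M classes, so the KL should come out as roughly zero for every sample.

import tensorflow as tf

N, M = 30, 10  # values from the question

# Uniform logits -> q_y is uniform, so KL against the uniform prior ~ 0.
logits_uniform = tf.zeros((2, N * M))
q_y = tf.nn.softmax(tf.reshape(logits_uniform, (-1, N, M)))
log_q_y = tf.math.log(q_y + 1e-20)
kl = tf.reduce_sum(q_y * (log_q_y - tf.math.log(1.0 / M)), axis=(1, 2))
print(kl.numpy())  # ~[0. 0.]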
And finally, the train_step function; note that the corresponding variables are already tf.float32, so no casting is needed:
def train_step(self, data):
    x = data
    with tf.GradientTape(persistent=True) as tape:
        logits_y, z = self.encoder(x, training=True)
        x_hat = self.decoder(z, training=True)
        loss = self.gumbel_loss(x, x_hat, logits_y)
    grads = tape.gradient(loss, self.trainable_weights)
    self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
    self.loss_tracker.update_state(loss)
    return {"loss": self.loss_tracker.result()}
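Before a full run, a one-batch smoke test can confirm the wiring (a sketch, assuming the encoder/decoder globals from the question and the fixed VAE class above):

import numpy as np

vae = VAE(encoder, decoder, name="vae-model")
vae.compile(optimizer="adam", loss=None)

# One random batch through the custom train_step; verbose=0 keeps it quiet.
x_dummy = np.random.rand(batch_size, data_dim).astype("float32")
vae.fit(x_dummy, epochs=1, batch_size=batch_size, verbose=0)
print(vae.loss_tracker.result().numpy())  # finite scalar if shapes line up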
You don't need to change anything else in the code above. Here are some training logs (run on CPU, tf 2.5):
Epoch 1/5
6000/6000 [==============================] - 60s 10ms/step - loss: 54.4604
Epoch 2/5
6000/6000 [==============================] - 60s 10ms/step - loss: 18.8960
Epoch 3/5
6000/6000 [==============================] - 59s 10ms/step - loss: 12.1036
Epoch 4/5
6000/6000 [==============================] - 59s 10ms/step - loss: 8.5804
Epoch 5/5
6000/6000 [==============================] - 59s 10ms/step - loss: 6.3916
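One loose end: the question's code defines anneal_rate and min_temperature but never actually anneals the temperature tau. Here is a sketch of how that could be wired in with a standard Keras callback; the per-epoch exponential schedule is an assumption on my part, so adjust it to taste:

import numpy as np
from tensorflow import keras

class TauAnnealing(keras.callbacks.Callback):
    # Sketch: decay the global tau variable after every epoch, clipped at
    # min_temperature; the default constants mirror the ones in the question.
    def __init__(self, tau, initial_tau=5.0, anneal_rate=0.0003,
                 min_temperature=0.5):
        super().__init__()
        self.tau = tau
        self.initial_tau = initial_tau
        self.anneal_rate = anneal_rate
        self.min_temperature = min_temperature

    def on_epoch_end(self, epoch, logs=None):
        new_tau = max(self.initial_tau * np.exp(-self.anneal_rate * (epoch + 1)),
                      self.min_temperature)
        self.tau.assign(new_tau)

# Usage: vae.fit(x_train, epochs=nb_epoch, batch_size=batch_size,
#                callbacks=[TauAnnealing(tau)])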