我正在尝试为一个变分自编码器(VAE)编写自定义训练循环,这个VAE由两个独立的tf.keras.Model
对象组成。这个VAE的目标是多类分类。如同通常一样,编码器模型的输出被作为输入馈送给解码器模型。解码器是一个循环解码器。同样地,VAE中涉及两个损失函数:重建损失(分类交叉熵)和潜在损失。我当前架构的灵感来源于这个github上的pytorch实现。
问题:每当我为解码器模型使用tape.gradient(loss, decoder.trainable_weights)
计算梯度时,返回的列表中每个元素都是NoneType对象。我认为我可能在使用reconstruction_tensor
时犯了一些错误,这个张量位于我下面写的代码的底部。由于我需要进行迭代解码过程,我如何使用类似reconstruction_tensor
的东西,而不返回一个梯度列表,其中包含NoneType元素?你可以使用这个colab笔记本运行代码,如果你愿意的话。
为了进一步澄清这个问题中的张量是什么样子,我将展示原始输入、将被分配预测“标记”的零张量,以及基于解码器预测的“标记”对零张量的一次更新:
原始输入张量示例,形状为(batch_size, max_seq_length, num_classes): _ _ _ _ _ _ _ _| | 1 0 0 0 | | 0 1 0 0 | | 0 0 0 1 | || | 0 1 0 0 | | 1 0 0 0 | | 1 0 0 0 | ||_ |_ 0 0 1 0 _| , |_ 0 0 0 1 _|, |_ 0 1 0 0 _| _|初始零张量: _ _ _ _ _ _ _ _| | 0 0 0 0 | | 0 0 0 0 | | 0 0 0 0 | || | 0 0 0 0 | | 0 0 0 0 | | 0 0 0 0 | ||_ |_ 0 0 0 0 _| , |_ 0 0 0 0 _|, |_ 0 0 0 0 _| _|解码循环单次迭代后的零张量示例: _ _ _ _ _ _ _ _| | 0.2 0.4 0.1 0.3 | | 0.1 0.2 0.6 0.1 | | 0.7 0.05 0.05 0.2 | || | 0 0 0 0 | | 0 0 0 0 | | 0 0 0 0 | ||_ |_ 0 0 0 0 _| , |_ 0 0 0 0 _|, |_ 0 0 0 0 _| _|
这是重现问题的代码:
# 任意数据batch_size = 3 max_seq_length = 3num_classes = 4original_inputs = tf.one_hot(tf.argmax((np.random.randn(batch_size, max_seq_length, num_classes)), axis=2), depth=num_classes)latent_dims = 5 # 必须小于(max_seq_length * num_classes)def sampling(inputs): """重参数化函数。用于Lambda层""" mus, log_vars = inputs epsilon = tf.keras.backend.random_normal(shape=tf.keras.backend.shape(mus)) z = mus + tf.keras.backend.exp(log_vars/2) * epsilon return zdef latent_loss_fxn(mus, log_vars): """返回均值和对数方差的潜在损失。""" return -0.5 * tf.keras.backend.mean(1. + log_vars - tf.keras.backend.exp(log_vars) - tf.keras.backend.pow(mus, 2))class DummyEncoder(tf.keras.Model): def __init__(self, latent_dimension): """定义隐藏层(瓶颈)和采样层""" super().__init__() self.hidden = tf.keras.layers.Dense(units=32) self.dense_mus = tf.keras.layers.Dense(units=latent_dimension) self.dense_log_vars = tf.keras.layers.Dense(units=latent_dimension) self.sampling = tf.keras.layers.Lambda(function=sampling) def call(self, inputs): """定义输出输入的z、mu、log_var的前向计算。""" dense_projection = self.hidden(inputs) mus = self.dense_mus(dense_projection) log_vars = self.dense_log_vars(dense_projection) z = self.sampling([mus, log_vars]) return z, mus, log_vars class DummyDecoder(tf.keras.Model): def __init__(self, num_classes): """定义GRU层和Dense输出层""" super().__init__() self.gru = tf.keras.layers.GRU(units=1, return_sequences=True, return_state=True) self.dense = tf.keras.layers.Dense(units=num_classes, activation='softmax') def call(self, x, hidden_states=None): """定义前向计算""" outputs, h_t = self.gru(x, hidden_states) # 这个计算的目的是使用GRU的未归一化对数 # 概率,通过Dense层中的softmax激活函数产生归一化概率 reconstructions = self.dense(outputs) return reconstructions, h_t# 实例化模型encoder_model = DummyEncoder(latent_dimension=5)decoder_model = DummyDecoder(num_classes=num_classes)# 实例化重建损失函数cce_loss_fxn = tf.keras.losses.CategoricalCrossentropy()# 开始记录with tf.GradientTape(persistent=True) as tape: # 重塑输入以供编码器使用 reshaped_inputs = tf.reshape(original_inputs, shape=(tf.shape(original_inputs)[0], -1)) # 编码输入 z, mus, log_vars = encoder_model(reshaped_inputs, training=True) # 扩展z的维度以满足循环解码器的要求 # (batch, timesteps, features) z = tf.expand_dims(z, axis=1) ################################ # 疑似问题原因 ################################ # 一个将根据模型输出修改的张量 reconstruction_tensor = tf.Variable(tf.zeros_like(original_inputs)) ################################ # 疑似问题原因结束 ################################ # 一个解码循环,以迭代生成序列中的下一个标记(即输出)... # 在批次中 hidden_states = None for ith_token in range(max_seq_length): # 重建批次中给定样本的ith_token reconstructions, hidden_states = decoder_model(z, hidden_states, training=True) # 重塑重建结果以便分配到reconstruction_tensor reconstructions = tf.squeeze(reconstructions) # 循环迭代完成后,这个张量是模型对原始输入的预测。因此,在单次迭代后, # 批次中每个样本的单个标记预测被分配到 # 这个张量中。 reconstruction_tensor = reconstruction_tensor[:, ith_token,:].assign(reconstructions) # 计算损失 recon_loss = cce_loss_fxn(original_inputs, reconstruction_tensor) latent_loss = latent_loss_fxn(mus, log_vars) loss = recon_loss + latent_loss# 计算梯度encoder_gradients = tape.gradient(loss, encoder_model.trainable_weights)decoder_gradients = tape.gradient(loss, decoder_model.trainable_weights)# 释放记录del tape# 检查梯度print('有效的编码器梯度:', not(None in encoder_gradients))print('有效的解码器梯度:', not(None in decoder_gradients), ' -- ', decoder_gradients)>>> 有效的编码器梯度: True>>> 有效的解码器梯度: False -- [None, None, None, None, None]
回答:
找到了我问题的“解决方案”:
在GradientTape()上下文管理器中使用tf.Variable一定存在一些问题。虽然我不知道那个问题是什么,但通过用列表替换reconstructions_tensor,在解码迭代中向该列表追加元素,然后堆叠该列表,可以无问题地计算梯度。colab笔记本反映了这些更改。请看下面的代码片段以了解修复方法:
........with tf.GradientTape(persistent=True) as tape: .... .... # 修复 reconstructions_tensor = [] hidden_states = None for ith_token in range(max_seq_length): # 重建批次中给定样本的ith_token reconstructions, hidden_states = decoder_model(z, hidden_states, training=True) # 重塑重建结果 reconstructions = tf.squeeze(reconstructions) # 修复 # 向将最终被堆叠的列表中追加元素 reconstructions_tensor.append(reconstructions) # 修复 # 沿axis=1堆叠重建结果以获得与之前使用零张量分配相同的结果 reconstructions_tensor = tf.stack(reconstructions_tensor, axis=1)........# 成功的梯度计算和后续的模型优化# ....
编辑1:
我认为如果有一个可以以图模式运行的模型,这个“解决方案”并不理想。我有限的理解是,图模式不适合处理像list
这样的Python对象。