EDIT (1/3/16): related GitHub issue
I'm implementing a Q-learning agent with function approximation using TensorFlow (Python interface), trained with stochastic gradient descent.
At each iteration of the experiment, a step function on the agent is called that updates the approximator's parameters based on the new reward and activations, and then chooses a new action to perform.
Here is the problem (in reinforcement learning terms):
- The agent computes its state-action value predictions in order to choose an action.
- Control then returns to another program, which simulates a step in the environment.
- Now the agent's step function is called for the next iteration. I want to use TensorFlow's Optimizer class to compute the gradients for me. However, that requires the state-action value prediction computed on the previous step, along with its graph. So:
- If I run the optimizer on the entire graph, it has to recompute the state-action value prediction.
- But if I store the prediction (for the chosen action) as a variable and then feed it to the optimizer as a placeholder, it no longer has the graph needed to compute the gradients (see the sketch after this list).
- I can't just run all of this in the same sess.run() call, because I have to give up control and return the chosen action in order to get the next observation and reward (which is used as the target for the loss function).
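To make that second-to-last point concrete, here is a minimal sketch (made-up names, not my actual agent code) of why feeding the stored prediction back in through a placeholder doesn't work: tf.gradients can no longer trace a path from the fed value back to the parameters.

import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[1, 4])           # state input
w = tf.Variable(tf.zeros([4, 1]))                       # approximator parameters
value1 = tf.matmul(x, w)                                # prediction computed on the previous step

value1_fed = tf.placeholder(tf.float32, shape=[1, 1])   # the same prediction, handed back as plain data
value2 = tf.placeholder(tf.float32, shape=[])           # target computed by the calling program
loss_from_fed = tf.square(value2 - tf.reshape(value1_fed, []))

# The placeholder has no connection to w in the graph, so the gradient is gone:
print(tf.gradients(loss_from_fed, [w]))                 # prints [None]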
So, is there a way to (without the reinforcement learning jargon):
- Compute part of my graph, returning value 1.
- Return value 1 to the calling program so it can compute value 2.
- On the next iteration, use value 2 as part of the loss function for gradient descent, without recomputing the part of the graph that computed value 1.
Of course, I have considered the obvious solutions:
- Just hardcode the gradients: this would be easy for the really simple approximator I'm using now, but very inconvenient if I wanted to experiment with different filters and activation functions in a big convolutional network. I'd really like to use the Optimizer class if at all possible. (A rough NumPy sketch of what this would look like follows this list.)
- Call the environment simulation from inside the agent: this system does that, but it would make mine more complicated and strip out a lot of the modularity and structure. So, I don't want to do this.
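For reference, here is roughly what I mean by hardcoding the gradients, sketched in plain NumPy for a purely linear approximator (hypothetical, not my actual network): the gradient of the squared TD error with respect to the chosen action's weight column is written out by hand. It's trivial here, but I'd have to re-derive these lines for every architecture change.

import numpy as np

n_features, n_actions = 27, 4
learning_rate, discount_factor = 0.01, 0.5
W = np.zeros((n_features, n_actions))

def manual_update(W, s, a, r, s_next):
    """One SGD step on (y - Q(s, a))**2 with the gradient written by hand."""
    pred = s.dot(W)[a]                              # Q(s, a), the stored prediction
    y = r + discount_factor * s_next.dot(W).max()   # TD target
    td_error = y - pred
    W[:, a] += learning_rate * 2.0 * td_error * s   # gradient-descent step; the minus signs cancel
    return W

# e.g. one update from a transition (s, a, r, s_next):
s = np.ones(n_features) / n_features
s_next = np.ones(n_features) / n_features
W = manual_update(W, s, a=2, r=1.0, s_next=s_next)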
I've read through the API and the whitepaper several times, but I can't seem to find a solution. I was trying to come up with some way to feed the target into the graph to compute the gradients, but couldn't come up with a way to build that graph automatically.
If this just isn't possible in TensorFlow yet, do you think it would be very complicated to implement as a new operator? (I haven't used C++ in a couple of years, so the TensorFlow source looks a little intimidating.) Or would I be better off switching to something like Torch, which has the imperative differentiation of Autograd rather than symbolic differentiation?
Thanks for taking the time to help me with this. I've tried to keep the question as concise as I could.
EDIT: After doing some further searching, I came across this previously asked question. It's a little different from mine (they are trying to avoid updating an LSTM network twice every iteration in Torch), and it doesn't have any answers yet.
Here is some code, if that helps:
'''
-Q-Learning agent for a grid-world environment.
-Receives input as raw RGB pixel representation of the screen.
-Uses an artificial neural network function approximator with one hidden layer

2015 Jonathon Byrd
'''

import random
import sys
#import copy

from rlglue.agent.Agent import Agent
from rlglue.agent import AgentLoader as AgentLoader
from rlglue.types import Action
from rlglue.types import Observation

import tensorflow as tf
import numpy as np

world_size = (3,3)
total_spaces = world_size[0] * world_size[1]

class simple_agent(Agent):

    #Contants
    discount_factor = tf.constant(0.5, name="discount_factor")
    learning_rate = tf.constant(0.01, name="learning_rate")
    exploration_rate = tf.Variable(0.2, name="exploration_rate") # used to be a constant :P
    hidden_layer_size = 12

    #Network Parameters - weights and biases
    W = [tf.Variable(tf.truncated_normal([total_spaces * 3, hidden_layer_size], stddev=0.1), name="layer_1_weights"),
         tf.Variable(tf.truncated_normal([hidden_layer_size,4], stddev=0.1), name="layer_2_weights")]
    b = [tf.Variable(tf.zeros([hidden_layer_size]), name="layer_1_biases"),
         tf.Variable(tf.zeros([4]), name="layer_2_biases")]

    #Input placeholders - observation and reward
    screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="observation") #input pixel rgb values
    reward = tf.placeholder(tf.float32, shape=[], name="reward")

    #last step data
    last_obs = np.array([1, 2, 3], ndmin=4)
    last_act = -1

    #Last step placeholders
    last_screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="previous_observation")
    last_move = tf.placeholder(tf.int32, shape = [], name="previous_action")
    next_prediction = tf.placeholder(tf.float32, shape = [], name="next_prediction")

    step_count = 0

    def __init__(self):
        #Initialize computational graphs
        self.q_preds = self.Q(self.screen)
        self.last_q_preds = self.Q(self.last_screen)
        self.action = self.choose_action(self.q_preds)
        self.next_pred = self.max_q(self.q_preds)
        self.last_pred = self.act_to_pred(self.last_move, self.last_q_preds) # inefficient recomputation
        self.loss = self.error(self.last_pred, self.reward, self.next_prediction)
        self.train = self.learn(self.loss)
        #Summaries and Statistics
        tf.scalar_summary(['loss'], self.loss)
        tf.scalar_summary('reward', self.reward)
        #w_hist = tf.histogram_summary("weights", self.W[0])
        self.summary_op = tf.merge_all_summaries()
        self.sess = tf.Session()
        self.summary_writer = tf.train.SummaryWriter('tensorlogs', graph_def=self.sess.graph_def)

    def agent_init(self,taskSpec):
        print("agent_init called")
        self.sess.run(tf.initialize_all_variables())

    def agent_start(self,observation):
        #print("agent_start called, observation = {0}".format(observation.intArray))
        o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)
        return self.control(o)

    def agent_step(self,reward, observation):
        #print("agent_step called, observation = {0}".format(observation.intArray))
        print("step, reward: {0}".format(reward))
        o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)

        next_prediction = self.sess.run([self.next_pred], feed_dict={self.screen:o})[0]

        if self.step_count % 10 == 0:
            summary_str = self.sess.run([self.summary_op, self.train],
                feed_dict={self.reward:reward, self.last_screen:self.last_obs,
                self.last_move:self.last_act, self.next_prediction:next_prediction})[0]
            self.summary_writer.add_summary(summary_str, global_step=self.step_count)
        else:
            self.sess.run([self.train],
                feed_dict={self.screen:o, self.reward:reward, self.last_screen:self.last_obs,
                self.last_move:self.last_act, self.next_prediction:next_prediction})

        return self.control(o)

    def control(self, observation):
        results = self.sess.run([self.action], feed_dict={self.screen:observation})
        action = results[0]

        self.last_act = action
        self.last_obs = observation

        if (action==0):  # convert action integer to direction character
            action = 'u'
        elif (action==1):
            action = 'l'
        elif (action==2):
            action = 'r'
        elif (action==3):
            action = 'd'

        returnAction=Action()
        returnAction.charArray=[action]
        #print("return action returned {0}".format(action))
        self.step_count += 1
        return returnAction

    def Q(self, obs):  #calculates state-action value prediction with feed-forward neural net
        with tf.name_scope('network_inference') as scope:
            h1 = tf.nn.relu(tf.matmul(obs, self.W[0]) + self.b[0])
            q_preds = tf.matmul(h1, self.W[1]) + self.b[1] #linear activation
            return tf.reshape(q_preds, shape=[4])

    def choose_action(self, q_preds):  #chooses action epsilon-greedily
        with tf.name_scope('action_choice') as scope:
            exploration_roll = tf.random_uniform([])
            #greedy_action = tf.argmax(q_preds, 0)  # gets the action with the highest predicted Q-value
            #random_action = tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)

            #exploration rate updates
            #if self.step_count % 10000 == 0:
                #self.exploration_rate.assign(tf.div(self.exploration_rate, 2))

            return tf.select(tf.greater_equal(exploration_roll, self.exploration_rate),
                tf.argmax(q_preds, 0),   #greedy_action
                tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64))  #random_action

        '''
        Why does this return NoneType?:

        flag = tf.select(tf.greater_equal(exploration_roll, self.exploration_rate), 'g', 'r')
        if flag == 'g':  #greedy
            return tf.argmax(q_preds, 0) # gets the action with the highest predicted Q-value
        elif flag == 'r':  #random
            return tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)
        '''

    def error(self, last_pred, r, next_pred):
        with tf.name_scope('loss_function') as scope:
            y = tf.add(r, tf.mul(self.discount_factor, next_pred))  #target
            return tf.square(tf.sub(y, last_pred)) #squared difference error

    def learn(self, loss): #Update parameters using stochastic gradient descent
        #TODO: Either figure out how to avoid computing the q-prediction twice or just hardcode the gradients.
        with tf.name_scope('train') as scope:
            return tf.train.GradientDescentOptimizer(self.learning_rate).minimize(loss,
                var_list=[self.W[0], self.W[1], self.b[0], self.b[1]])

    def max_q(self, q_preds):
        with tf.name_scope('greedy_estimate') as scope:
            return tf.reduce_max(q_preds)  #best predicted action from current state

    def act_to_pred(self, a, preds): #get the value prediction for action a
        with tf.name_scope('get_prediction') as scope:
            return tf.slice(preds, tf.reshape(a, shape=[1]), [1])

    def agent_end(self,reward):
        pass

    def agent_cleanup(self):
        self.sess.close()
        pass

    def agent_message(self,inMessage):
        if inMessage=="what is your name?":
            return "my name is simple_agent";
        else:
            return "I don't know how to respond to your message";


if __name__=="__main__":
    AgentLoader.loadAgent(simple_agent())
Answer:
Right now, doing what you want is very difficult in TensorFlow (0.6). Your best bet is to bite the bullet and call run multiple times, even though it means recomputing the activations. However, we are very aware of this issue internally. A prototype "partial run" solution is in the works, but there is no timeline for its completion yet. Since a truly satisfactory answer might require modifying TensorFlow itself, you could also file a GitHub issue for this and see whether anyone else has thoughts on it there.
Edit: Experimental support for partial_run is now in. https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/client/session.py#L317
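To illustrate the intended shape of the API (a rough, untested sketch with made-up variable names, not code from the question): you set up a handle listing everything you will feed and fetch during one agent step, fetch the prediction first, hand control back to the caller, and later run the training op against the same handle, so the forward pass is not re-executed.

import tensorflow as tf

state = tf.placeholder(tf.float32, shape=[1, 4])
target = tf.placeholder(tf.float32, shape=[])
w = tf.Variable(tf.zeros([4, 2]))

q_preds = tf.matmul(state, w)          # forward pass, computed once per handle
q_value = tf.reduce_max(q_preds)       # "value 1", returned to the caller
loss = tf.square(target - q_value)
train = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

sess = tf.Session()
sess.run(tf.initialize_all_variables())

# Declare everything that will be fed or fetched during this partial execution.
handle = sess.partial_run_setup([q_value, train], [state, target])

# First call: get the prediction so the caller can act and compute "value 2".
v1 = sess.partial_run(handle, q_value, feed_dict={state: [[0., 1., 0., 0.]]})
td_target = 1.0 + 0.5 * v1             # e.g. reward + discount * a prediction, computed by the caller

# Second call on the same handle: train without re-feeding the state or recomputing q_value.
sess.partial_run(handle, train, feed_dict={target: td_target})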