Training an AI to play Flappy Bird with TensorFlow
# -*- coding: utf-8 -*-
# Python 2 + TensorFlow 1.x (eager execution)
import time
import random

import numpy as np
import Tkinter as tk
import matplotlib.pyplot as plt
import tensorflow as tf

tf.enable_eager_execution()
tfe = tf.contrib.eager


class DeepQNetwork:
    '''
    The AI (DQN agent) implementation.
    '''
    def __init__(self, feature_dimension, action_dimension,
                 adventure_rate=0.1,
                 learning_rate=0.01,
                 reward_decay=0.9,
                 experience_length=20000,
                 experience_replay_cycle=100,
                 learning_cycle=50,
                 learning_batch_length=32,
                 weights_path='./flappybird_dqn'):
        '''
        feature_dimension: number of features in a state
        action_dimension: number of available actions
        adventure_rate: probability of taking a random action during the learning phase
        learning_rate: learning rate
        reward_decay: reward discount factor
        experience_length: size of the experience replay memory
        experience_replay_cycle: how often (in learning steps) the target network's parameters are replaced
        learning_cycle: number of environment steps between learning updates
        learning_batch_length: mini-batch size for each update
        '''
        self.feature_dimension = feature_dimension
        self.action_dimension = action_dimension
        self.adventure_rate = adventure_rate
        self.learning_rate = learning_rate
        self.reward_decay = reward_decay
        self.experience_length = experience_length
        self.experience_replay_cycle = experience_replay_cycle
        self.learning_cycle = learning_cycle
        self.learning_batch_length = learning_batch_length
        self.model_weights_path = weights_path
        self.evaluate_model = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),
            tf.keras.layers.Dense(action_dimension)])
        self.target_model = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),
            tf.keras.layers.Dense(action_dimension)])
        self.optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
        self.global_step = tf.train.get_or_create_global_step()
        # Each row stores one transition: s, a, r, s_next.
        self.experience_pool = np.zeros((experience_length, feature_dimension + 1 + 1 + feature_dimension))
        self.train_loss_results = []
        self.train_accuracy_results = []
        self.learning_counter = 0
        self.experience_counter = 0

    def _loss(self, learning_batch):
        # Cast to float32 to match the dtype of the Keras layers.
        q_next = self.target_model(learning_batch[:, -self.feature_dimension:].astype(np.float32)).numpy()
        q = self.evaluate_model(learning_batch[:, :self.feature_dimension].astype(np.float32))
        q_target = q.numpy()
        batch_index = np.arange(self.learning_batch_length, dtype=np.int32)
        eval_act_index = learning_batch[:, self.feature_dimension].astype(int)
        reward = learning_batch[:, self.feature_dimension + 1]
        # Bellman target for the actions that were actually taken.
        q_target[batch_index, eval_act_index] = reward + self.reward_decay * np.max(q_next, axis=1)
        return tf.reduce_mean(tf.squared_difference(q, q_target))

    def _grad(self, learning_batch):
        with tf.GradientTape() as tape:
            loss_value = self._loss(learning_batch)
        return loss_value, tape.gradient(loss_value, self.evaluate_model.trainable_variables)

    def _train(self, learning_batch):
        loss_value, grads = self._grad(learning_batch)
        self.train_loss_results.append(loss_value.numpy())
        self.optimizer.apply_gradients(zip(grads, self.evaluate_model.trainable_variables),
                                       global_step=self.global_step)

    def adventure_action(self, s):
        # Next action used during training.
        # It is not always chosen by the agent: while the memory is still
        # filling up, or with probability adventure_rate, a random action is
        # taken so the agent keeps exploring.
        if self.experience_counter < self.experience_length or np.random.uniform() < self.adventure_rate:
            return np.random.randint(0, self.action_dimension)
        return self.get_action(s)

    def get_action(self, s):
        # Next action chosen entirely by the agent (greedy w.r.t. the Q values).
        s = s[np.newaxis, :].astype(np.float32)
        q = self.evaluate_model(s)
        return np.argmax(q)

    def save_experience(self, s, a, r, s_next):
        # Store one transition; this is the agent's learning data.
        save = False
        l = False
        replay = False
        experience = np.hstack((s, [a, r], s_next))
        self.experience_pool[self.experience_counter % self.experience_length, :] = experience
        self.experience_counter += 1
        save = True
        if self.experience_counter >= self.experience_length and self.experience_counter % self.learning_cycle == 0:
            # Experience replay: train on a random mini-batch of past transitions.
            sample_index = np.random.choice(self.experience_length, size=self.learning_batch_length)
            learning_batch = self.experience_pool[sample_index, :]
            self._train(learning_batch)
            self.learning_counter += 1
            l = True
        if self.experience_counter >= self.experience_length and self.learning_counter % self.experience_replay_cycle == 0:
            # Periodically copy the evaluate network's weights into the target network.
            self.evaluate_model.save_weights(self.model_weights_path)
            self.target_model.load_weights(self.model_weights_path)
            replay = True
        return save, l, replay

    def save_weights(self):
        # Save the trained agent.
        self.evaluate_model.save_weights(self.model_weights_path)

    def load_weights(self):
        # Load a previously trained agent.
        self.evaluate_model.load_weights(self.model_weights_path)
        self.target_model.load_weights(self.model_weights_path)


TUBE_WIDTH = 18
TUBE_HEIGHT = 9
RECT_DENSITY = 10


def array_create(width, height):
    array = []
    for i in range(height):
        line = []
        for j in range(width):
            line.append(0)
        array.append(line)
    return array


def block_add(tube, x):
    # Add a pipe of random height to column x, growing either from the top
    # or from the bottom; the returned height is negative for bottom pipes.
    direction = random.randint(0, 1)
    height = random.randint(1, TUBE_HEIGHT - 2)
    start = 0
    end = height
    if direction > 0:
        start = TUBE_HEIGHT - height
        end = TUBE_HEIGHT
        height = 0 - height
    for i in range(start, end):
        tube[i][x] = 1
    return height


class flappy_env:
    '''
    The game implementation.
    '''
    def __init__(self):
        self.binary = []
        for i in range(TUBE_WIDTH):
            self.binary.append(2 ** i)
        self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)
        self.bird = TUBE_HEIGHT / 2
        self.tube_len = TUBE_WIDTH
        self.block = [0, 0]
        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)
        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)

    def _create_observation(self):
        return self._create_observation2()

    def _create_observation0(self):
        # Encode every row of the tube as a bit field.
        observation = []
        for i in range(TUBE_HEIGHT):
            observation.append(0)
            for j in range(TUBE_WIDTH):
                observation[i] += self.binary[j] * self.tube[i][j]
        if 0 <= self.bird and self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:
            observation[self.bird] += 1
        return np.array(observation)

    def _create_observation1(self):
        # Flatten the whole tube into a 0/1 vector.
        observation = []
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                observation.append(self.tube[i][j])
        if 0 <= self.bird and self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:
            observation[self.bird * TUBE_WIDTH] = 1
        return np.array(observation)

    def _create_observation2(self):
        # Compact observation: bird height, scroll phase, and the two pipe heights.
        observation = [self.bird, self.tube_len % TUBE_WIDTH, self.block[0], self.block[1]]
        return np.array(observation)

    def reset(self):
        self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)
        self.bird = TUBE_HEIGHT / 2
        self.tube_len = TUBE_WIDTH
        self.block = [0, 0]
        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)
        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)

    def feature_len(self):
        # return TUBE_HEIGHT * TUBE_WIDTH
        return 4

    def action_len(self):
        return 2

    def step(self, action):
        '''
        Input
            action: the action to take
        Output
            observation: the state after the action
            reward: the score for this step
            done: whether the game is over
        '''
        done = False
        reward = 0
        action = max(0, min(action, 1))
        if action > 0:
            self.bird -= 1
        else:
            self.bird += 1
        # Scroll the world one column to the left.
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH - 1):
                self.tube[i][j] = self.tube[i][j + 1]
        for i in range(TUBE_HEIGHT):
            self.tube[i][TUBE_WIDTH - 1] = 0
        self.tube_len += 1
        if self.tube_len % (TUBE_WIDTH / 2) == 0:
            self.block[0] = self.block[1]
            self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)
        if self.bird < 0 or TUBE_HEIGHT <= self.bird or self.tube[self.bird][0] != 0:
            done = True
            reward = -10
        else:
            done = False
            reward = 1
            s = 0
            for i in range(TUBE_HEIGHT):
                s += self.tube[i][0]
            if s > 0:
                # The bird just passed through a pipe column.
                reward = 10
        observation = self._create_observation()
        return observation, reward, done


static_data = []


def tf_play():
    '''
    Let the trained agent play the game.
    '''
    env = flappy_env()
    env.reset()
    RL = DeepQNetwork(env.feature_len(), env.action_len())
    RL.load_weights()
    # RL.restore_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')
    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i,
                                                RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY,
                                                fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def update():
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)
        if 0 <= env.bird and env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')
        window.update()

    def show_run():
        env.reset()
        observation = env._create_observation()
        done = False
        one_count = 0
        while not done:
            update()
            time.sleep(0.1)
            action = RL.get_action(observation)
            observation_, reward, done = env.step(action)
            observation = observation_
            one_count += 1
            if one_count > 2000:
                done = True

    def run():
        action = 0
        while True:
            try:
                action = int(input('input number(0<=x<=2):'))
            except:
                pass
            if action == 0:
                break
            show_run()
        window.destroy()

    window.after(100, run)
    window.mainloop()


def tf_train():
    '''
    Train the agent to play the game.
    '''
    global static_data
    static_data = []
    env = flappy_env()
    env.reset()
    RL = DeepQNetwork(env.feature_len(), env.action_len())
    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i,
                                                RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY,
                                                fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def show_run():
        env.reset()
        observation = env._create_observation()
        done = False
        one_count = 0
        while not done:
            update()
            time.sleep(0.2)
            action = RL.get_action(observation)
            observation_, reward, done = env.step(action)
            observation = observation_
            one_count += 1
            if one_count > 2000:
                done = True

    def tst_rl(count):
        # Evaluate the current policy over `count` episodes without exploration.
        score_max = 0
        score_min = 10000000000
        score_sum = 0
        for i in range(count):
            env.reset()
            observation = env._create_observation()
            done = False
            one_count = 0
            while not done:
                action = RL.get_action(observation)
                observation_, reward, done = env.step(action)
                observation = observation_
                one_count += 1
                if one_count > 2000:
                    done = True
            if done:
                score_max = max(env.tube_len, score_max)
                score_min = min(env.tube_len, score_min)
                score_sum += env.tube_len
        return score_min, score_max, score_sum / count

    def update():
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)
        if 0 <= env.bird and env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')
        window.update()

    def run():
        global static_data
        step = 0
        score_sum = 0
        score_max = 0
        all_done = False
        for count in range(1000001):  # 1000001
            env.reset()
            observation = env._create_observation()
            done = False
            one_count = 0
            while not done:
                action = RL.adventure_action(observation)
                observation_, reward, done = env.step(action)
                s, l, r = RL.save_experience(observation, action, reward, observation_)
                observation = observation_
                one_count += 1
                if one_count > 2000:
                    done = True
                if l and RL.learning_counter % 100 == 0:
                    done = True
            if done:
                score_max = max(env.tube_len, score_max)
                score_sum += env.tube_len
            if count % 50 == 0:
                tst_data = tst_rl(20)
                print count, tst_data[0], tst_data[1], tst_data[2]
                static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])
                if tst_data[0] >= 2000:
                    all_done = True
                    break
            step += 1
            if all_done:
                break
        for i in range(5):
            tst_data = tst_rl(20)
            count += 50
            print count, tst_data[0], tst_data[1], tst_data[2]
            static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])
        # RL.save_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')
        RL.save_weights()
        action = 0
        while True:
            try:
                action = int(input('input number(0<=x<=2):'))
            except:
                pass
            if action == 0:
                break
            show_run()
        window.destroy()

    window.after(100, run)
    window.mainloop()
    static_data = np.array(static_data)
    plt.plot(static_data[:, 0], static_data[:, 1])
    plt.plot(static_data[:, 0], static_data[:, 2])
    plt.plot(static_data[:, 0], static_data[:, 3])
    plt.show()
    plt.plot(RL.train_loss_results)
    plt.show()


def tk_play():
    '''
    Play the game manually (human input).
    '''
    env = flappy_env()
    env.reset()
    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i,
                                                RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY,
                                                fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def update():
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)
        if 0 <= env.bird and env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')
        window.update()

    def run():
        done = False
        while not done:
            update()
            action = 3
            while not (0 <= action and action <= 2):
                try:
                    action = int(input('input number(0<=x<=2):'))
                except:
                    pass
            if action == 2:
                done = True
                break
            observation, reward, done = env.step(action)
        print 'game over:%d' % env.tube_len
        window.destroy()

    window.after(100, run)
    window.mainloop()


if __name__ == '__main__':
    tf_train()
    # tf_play()
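For reference, the quantity that `_loss` regresses the evaluate network toward is the standard one-step Q-learning (Bellman) target: for each stored transition (s, a, r, s'), the entry of the taken action a is replaced by r + reward_decay * max Q_target(s', a'), while the other entries keep the current predictions and therefore contribute no error. The short sketch below recomputes that target in plain NumPy; the arrays, actions, and rewards here are made-up illustrative values, not numbers from the training run:

import numpy as np

# Illustrative numbers only: 2 transitions, 2 actions, discount factor 0.9.
reward_decay = 0.9
q_eval = np.array([[0.5, 0.2],     # Q(s, .) from the evaluate network
                   [0.1, 0.4]])
q_next = np.array([[1.0, 0.3],     # Q(s', .) from the target network
                   [0.2, 0.6]])
actions = np.array([0, 1])         # actions actually taken
rewards = np.array([1.0, -10.0])   # rewards observed

q_target = q_eval.copy()
batch_index = np.arange(len(actions))
# Only the entries of the taken actions get the Bellman target;
# the other entries keep the current prediction and contribute zero error.
q_target[batch_index, actions] = rewards + reward_decay * q_next.max(axis=1)

loss = np.mean((q_eval - q_target) ** 2)
print loss  # the mean squared difference that _train minimizes

In the actual `_loss`, `q_eval` comes from the evaluate network and `q_next` from the target network, and only the evaluate network is updated by gradient descent; the target network is refreshed periodically in `save_experience`.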