博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tensorflow训练ai玩flappybird
阅读量:6855 次
发布时间:2019-06-26

本文共 17582 字,大约阅读时间需要 58 分钟。

hot3.png

tensorflow训练ai玩flappybird

# -*-coding: utf-8 -*-import timeimport randomimport numpy as npimport Tkinter as tkimport matplotlib.pyplot as pltimport tensorflow as tftf.enable_eager_execution()tfe = tf.contrib.eagerclass DeepQNetwork:    '''    这是ai的实现    '''    def __init__(self,                 feature_dimension,                 action_dimension,                 adventure_rate=0.1,                 learning_rate=0.01,                 reward_decay=0.9,                 experience_length=20000,                 experience_replay_cycle=100,                 learning_cycle=50,                 learning_batch_length=32,                 weights_path='./flappybird_dqn'):        '''        feature_dimension: 状态的特征数量        action_dimension: 可采取的行动数量        adventure_rate: 在学习阶段,用一点概率去采取随机行动        learning_rate: 学习率        reward_decay: 激励衰减        experience_length: 经验记忆长度        experience_replay_cycle: 参数替换周期        learning_cycle: 学习周期        learning_batch: 学习的批量        '''                self.feature_dimension = feature_dimension        self.action_dimension = action_dimension        self.adventure_rate = adventure_rate        self.learning_rate = learning_rate        self.reward_decay = reward_decay        self.experience_length = experience_length        self.experience_replay_cycle = experience_replay_cycle        self.learning_cycle = learning_cycle        self.learning_batch_length = learning_batch_length        self.model_weights_path = weights_path                self.evaluate_model = tf.keras.Sequential([            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),            tf.keras.layers.Dense(action_dimension)])                self.target_model = tf.keras.Sequential([            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),            tf.keras.layers.Dense(action_dimension)])                self.optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)        self.global_step = tf.train.get_or_create_global_step()                self.experience_pool = np.zeros((experience_length, feature_dimension + 1 + 1 + feature_dimension))                self.train_loss_results = []        self.train_accuracy_results = []                self.learning_counter = 0        self.experience_counter = 0            def _loss(self, learning_batch):        q_next = self.target_model(learning_batch[:, -self.feature_dimension:]).numpy()        q = self.evaluate_model(learning_batch[:, :self.feature_dimension])        q_target = q.numpy()                batch_index = np.arange(self.learning_batch_length, dtype=np.int32)        eval_act_index = learning_batch[:, self.feature_dimension].astype(int)        reward = learning_batch[:, self.feature_dimension + 1]        q_target[batch_index, eval_act_index] = reward + self.reward_decay * np.max(q_next, axis=1)        return tf.reduce_mean(tf.squared_difference(q, q_target))        def _grad(self, learning_batch):        with tf.GradientTape() as tape:            loss_value = self._loss(learning_batch)        return loss_value, tape.gradient(loss_value, self.evaluate_model.trainable_variables)    def _train(self, learning_batch):        loss_value, grads = self._grad(learning_batch)        self.optimizer.apply_gradients(zip(grads, self.evaluate_model.variables),                                            self.global_step)                def adventure_action(self, s):        # ai给出的下一步动作        # 动作不完全是ai给出的        # 有部分需要冒险,才能训练ai        if self.experience_counter < self.experience_length or np.random.uniform() < self.adventure_rate:            return np.random.randint(0, self.action_dimension)                return self.get_action(s)        def get_action(self, s):        # 完全由ai给出下一步动作        s = s[np.newaxis, :]        q = self.evaluate_model(s)        return np.argmax(q)        def save_experience(self, s, a, r, s_next):        # 保存经验值,这是ai的学习数据        save = False        l = False        replay = False                expericence = np.hstack((s, [a, r], s_next))        self.experience_pool[self.experience_counter % self.experience_length, :] = expericence        self.experience_counter += 1        save = True                if self.experience_counter >= self.experience_length and self.experience_counter % self.learning_cycle == 0:            sample_index = np.random.choice(self.experience_length, size=self.learning_batch_length)            learning_batch = self.experience_pool[sample_index, :]            self._train(learning_batch)                        self.learning_counter += 1            l = True                if self.experience_counter >= self.experience_length and self.learning_counter % self.experience_replay_cycle == 0:            self.evaluate_model.save_weights(self.model_weights_path)            self.target_model.load_weights(self.model_weights_path)            replay = True                    return save, l, replay        def save_weights(self):        # 保存训练的ai        self.evaluate_model.save_weights(self.model_weights_path)            def load_weights(self):        # 读取训练好的ai        self.evaluate_model.load_weights(self.model_weights_path)        self.target_model.load_weights(self.model_weights_path)TUBE_WIDTH = 18TUBE_HEIGHT = 9RECT_DENSITY = 10def array_create(width, height):    array = []    for i in range(height):        line = []        for j in range(width):            line.append(0)        array.append(line)    return arraydef block_add(tube, x):    direction = random.randint(0, 1)    height = random.randint(1, TUBE_HEIGHT - 2)        start = 0    end = height    if direction > 0:        start = TUBE_HEIGHT - height        end = TUBE_HEIGHT        height = 0 - height            for i in range(start, end):        tube[i][x] = 1            return heightclass flappy_env:    '''    这是游戏的实现    '''        def __init__(self):        self.binary = []        for i in range(TUBE_WIDTH):            self.binary.append(2 ** i)                self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)        self.bird = TUBE_HEIGHT / 2        self.tube_len = TUBE_WIDTH                self.block = [0, 0]        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)            def _create_observation(self):        return self._create_observation2()        def _create_observation0(self):        observation = []        for i in range(TUBE_HEIGHT):            observation.append(0)            for j in range(TUBE_WIDTH):                observation[i] += self.binary[j] * self.tube[i][j]                         if 0 <= self.bird and self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:            observation[self.bird] += 1                     return np.array(observation)        def _create_observation1(self):        observation = []        for i in range(TUBE_HEIGHT):            for j in range(TUBE_WIDTH):                observation.append(self.tube[i][j])                        if 0 <= self.bird and self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:            observation[self.bird * TUBE_WIDTH] = 1                    return np.array(observation)        def _create_observation2(self):        observation = [self.bird, self.tube_len % TUBE_WIDTH, self.block[0], self.block[1]]        return np.array(observation)            def reset(self):        self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)        self.bird = TUBE_HEIGHT / 2        self.tube_len = TUBE_WIDTH        self.block = [0, 0]        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)            def feature_len(self):        # return TUBE_HEIGHT * TUBE_WIDTH        return 4        def action_len(self):        return 2        def step(self, action):        '''        输入        action, 动作        输出        observation,动作之后的状态        reward,该步动作的得分        done,游戏是否结束        '''        done = False        reward = 0                action = max(0, min(action, 1))        if action > 0:            self.bird -= 1        else:            self.bird += 1                    for i in range(TUBE_HEIGHT):            for j in range(TUBE_WIDTH - 1):                self.tube[i][j] = self.tube[i][j + 1]                        for i in range(TUBE_HEIGHT):            self.tube[i][TUBE_WIDTH - 1] = 0                    self.tube_len += 1        if self.tube_len % (TUBE_WIDTH / 2) == 0:            self.block[0] = self.block[1]            self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)                    if self.bird < 0 or TUBE_HEIGHT <= self.bird or self.tube[self.bird][0] != 0:            done = True            reward = -10        else :            done = False            reward = 1            s = 0            for i in range(TUBE_HEIGHT):                s += self.tube[i][0]            if s > 0:                reward = 10                    observation = self._create_observation()        return observation, reward, done    static_data = []def tf_play():    '''    用训练好的ai玩游戏    '''    env = flappy_env()    env.reset()        RL = DeepQNetwork(env.feature_len(), env.action_len())    RL.load_weights()    # RL.restore_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')        window = tk.Tk()    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)    rect_array = []    for i in range(TUBE_HEIGHT):        rect_line = []        for j in range(TUBE_WIDTH):            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')            rect_line.append(rect_item)        rect_array.append(rect_line)    canvas.pack()        def update():        for i in range(TUBE_HEIGHT):            for j in range(TUBE_WIDTH):                rect_item = rect_array[i][j]                rect_fill = 'white'                if env.tube[i][j] > 0:                    rect_fill = 'black'                canvas.itemconfig(rect_item, fill=rect_fill)                if 0 <= env.bird and env.bird < TUBE_HEIGHT:            rect_item = rect_array[env.bird][0]            canvas.itemconfig(rect_item, fill='red')                window.update()        def show_run():        env.reset()        observation = env._create_observation()        done = False        one_count = 0        while not done:            update()            time.sleep(0.1)            action = RL.get_action(observation)            observation_, reward, done = env.step(action)            observation = observation_            one_count += 1            if one_count > 2000:                done = True        def run():        action = 0        while True:            try:                action = int(input('input number(0<=x<=2):'))            except:                pass                        if action == 0:                break                        show_run()                window.destroy()        window.after(100, run)    window.mainloop()    def tf_train():    '''    训练ai玩游戏    '''    global static_data    static_data = []    env = flappy_env()    env.reset()        RL = DeepQNetwork(env.feature_len(), env.action_len())        window = tk.Tk()    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)    rect_array = []    for i in range(TUBE_HEIGHT):        rect_line = []        for j in range(TUBE_WIDTH):            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')            rect_line.append(rect_item)        rect_array.append(rect_line)    canvas.pack()        def show_run():        env.reset()        observation = env._create_observation()        done = False        one_count = 0        while not done:            update()            time.sleep(0.2)            action = RL.get_action(observation)            observation_, reward, done = env.step(action)            observation = observation_            one_count += 1            if one_count > 2000:                done = True        def tst_rl(count):        score_max = 0        score_min = 10000000000        score_sum = 0        for i in range(count):            env.reset()            observation = env._create_observation()            done = False            one_count = 0            while not done:                action = RL.get_action(observation)                observation_, reward, done = env.step(action)                observation = observation_                one_count += 1                if one_count > 2000:                    done = True                if done:                    score_max = max(env.tube_len, score_max)                    score_min = min(env.tube_len, score_min)                    score_sum += env.tube_len        return score_min, score_max, score_sum / count        def update():        for i in range(TUBE_HEIGHT):            for j in range(TUBE_WIDTH):                rect_item = rect_array[i][j]                rect_fill = 'white'                if env.tube[i][j] > 0:                    rect_fill = 'black'                canvas.itemconfig(rect_item, fill=rect_fill)                if 0 <= env.bird and env.bird < TUBE_HEIGHT:            rect_item = rect_array[env.bird][0]            canvas.itemconfig(rect_item, fill='red')                window.update()            def run():        global static_data        step = 0        score_sum = 0        score_max = 0        all_done = False        for count in range(1000001):  # 1000001            env.reset()            observation = env._create_observation()                        done = False                        one_count = 0            while not done:                action = RL.adventure_action(observation)                observation_, reward, done = env.step(action)                s, l, r = RL.save_experience(observation, action, reward, observation_)                                    observation = observation_                one_count += 1                if one_count > 2000:                    done = True                                    if l and RL.learning_counter % 100 == 0:                    done = True                                    if done:                    score_max = max(env.tube_len, score_max)                    score_sum += env.tube_len                                        if count % 50 == 0:                        tst_data = tst_rl(20)                        print count, tst_data[0], tst_data[1], tst_data[2]                        static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])                        if tst_data[0] >= 2000:                            all_done = True                    break                                step += 1                            if all_done:                break;                    for i in range(5):            tst_data = tst_rl(20)            count += 50            print count, tst_data[0], tst_data[1], tst_data[2]            static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])                    # RL.save_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')        RL.save_weights()                action = 0        while True:            try:                action = int(input('input number(0<=x<=2):'))            except:                pass                        if action == 0:                break                        show_run()                window.destroy()        window.after(100, run)    window.mainloop()        static_data = np.array(static_data)    plt.plot(static_data[:, 0], static_data[:, 1])    plt.plot(static_data[:, 0], static_data[:, 2])    plt.plot(static_data[:, 0], static_data[:, 3])    plt.show()    plt.plot(RL.loss_history)    plt.show()    def tk_play():    '''    人工玩游戏    '''    env = flappy_env()    env.reset()        window = tk.Tk()    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)    rect_array = []    for i in range(TUBE_HEIGHT):        rect_line = []        for j in range(TUBE_WIDTH):            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')            rect_line.append(rect_item)        rect_array.append(rect_line)    canvas.pack()        def update():        for i in range(TUBE_HEIGHT):            for j in range(TUBE_WIDTH):                rect_item = rect_array[i][j]                rect_fill = 'white'                if env.tube[i][j] > 0:                    rect_fill = 'black'                canvas.itemconfig(rect_item, fill=rect_fill)                if 0 <= env.bird and env.bird < TUBE_HEIGHT:            rect_item = rect_array[env.bird][0]            canvas.itemconfig(rect_item, fill='red')                window.update()        def run():        done = False        while not done:            update()                        action = 3            while not (0 <= action and action <= 2):                try:                    action = int(input('input number(0<=x<=2):'))                except:                    pass                        if action == 2:                done = True                break                        observation, reward, done = env.step(action)                print 'game over:%d' % env.tube_len        window.destroy()            window.after(100, run)    window.mainloop()if __name__ == '__main__':    tf_train()    # tf_play()

转载于:https://my.oschina.net/kyle960/blog/2046067

你可能感兴趣的文章