  • DQN code - CartPole
    IT & Computer Engineering / Deep Learning  2020. 12. 7. 16:47

    DQN: the target network and the training network are maintained separately.

    The target network is updated once per episode.

    The training network is updated at every step.
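
    As a rough illustration of this update schedule, here is a minimal sketch (not the train.py code below; the layer sizes and variable names are placeholders):

    import tensorflow as tf
    from tensorflow.keras.layers import Dense

    # Two structurally identical networks: the training (online) network receives
    # a gradient update at every step; the target network is only synced below.
    model = tf.keras.Sequential([Dense(24, activation='relu'), Dense(2)])
    target_model = tf.keras.Sequential([Dense(24, activation='relu'), Dense(2)])
    model.build(input_shape=(None, 4))
    target_model.build(input_shape=(None, 4))

    # ... gradient updates are applied to `model` at every timestep ...

    # At the end of each episode, copy the training weights into the target network.
    target_model.set_weights(model.get_weights())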

     

    Replay memory: a buffer that stores the samples used for training.

    If consecutive, highly correlated samples are used for training, learning becomes very slow.

    To avoid this, samples are first stored in the replay memory, and training is done later on mini-batches drawn from it by random sampling.
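
    The replay memory itself is just a bounded deque plus uniform random sampling, roughly as in this standalone sketch (the transitions here are dummy values, not real environment samples):

    import random
    from collections import deque

    # Bounded buffer: the oldest samples are discarded once 2000 transitions are stored.
    memory = deque(maxlen=2000)

    # Store transitions <s, a, r, s', done> as they occur (dummy values for illustration).
    for t in range(1500):
        memory.append((t, 0, 0.1, t + 1, False))

    # Later, draw a random mini-batch; sampling at random breaks the temporal
    # correlation between consecutive transitions before training on them.
    mini_batch = random.sample(memory, 64)
    print(len(mini_batch))  # 64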

     

    train.py

     

    import os
    import sys
    import gym
    import pylab
    import random
    import numpy as np
    from collections import deque  # deque data structure used as the replay memory
    import tensorflow as tf
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.optimizers import Adam
    from tensorflow.keras.initializers import RandomUniform


    # Neural network that takes the state as input and outputs Q-values
    class DQN(tf.keras.Model):
        def __init__(self, action_size):
            super(DQN, self).__init__()
            self.fc1 = Dense(24, activation='relu')  # hidden layer 1
            self.fc2 = Dense(24, activation='relu')  # hidden layer 2
            self.fc_out = Dense(action_size,  # output layer: one node per action (2 for CartPole)
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            q = self.fc_out(x)
            return q


    # DQN agent for the CartPole example
    class DQNAgent:
        def __init__(self, state_size, action_size):  # constructor
            self.render = False  # set to True to watch the environment while training

            # state and action sizes
            self.state_size = state_size
            self.action_size = action_size

            # DQN hyperparameters
            self.discount_factor = 0.99  # discount factor
            self.learning_rate = 0.001   # learning rate
            self.epsilon = 1.0           # start fully exploratory
            self.epsilon_decay = 0.999   # decay factor applied each training step
            self.epsilon_min = 0.01      # lower bound on epsilon
            self.batch_size = 64         # mini-batch size
            self.train_start = 1000      # start training once 1000 samples are stored

            # replay memory (max size 2000): the buffer that stores training samples
            self.memory = deque(maxlen=2000)

            # create the training model and the target model
            # (using a separate target network is possible because Q-learning is off-policy)
            self.model = DQN(action_size)         # training model
            self.target_model = DQN(action_size)  # target model
            self.optimizer = Adam(lr=self.learning_rate)

            # initialize the target model so both models start with identical weights
            self.update_target_model()

        # update the target model with the training model's weights
        def update_target_model(self):
            self.target_model.set_weights(self.model.get_weights())

        # choose an action with the epsilon-greedy policy
        def get_action(self, state):
            if np.random.rand() <= self.epsilon:
                return random.randrange(self.action_size)
            else:
                q_value = self.model(state)
                return np.argmax(q_value[0])

        # store the sample <s, a, r, s'> in the replay memory
        def append_sample(self, state, action, reward, next_state, done):
            self.memory.append((state, action, reward, next_state, done))

        # train the model on a batch drawn at random from the replay memory
        def train_model(self):
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            # draw a random mini-batch (64 samples) from the memory
            mini_batch = random.sample(self.memory, self.batch_size)

            states = np.array([sample[0][0] for sample in mini_batch])
            actions = np.array([sample[1] for sample in mini_batch])
            rewards = np.array([sample[2] for sample in mini_batch])
            next_states = np.array([sample[3][0] for sample in mini_batch])
            dones = np.array([sample[4] for sample in mini_batch])

            # trainable parameters
            model_params = self.model.trainable_variables
            with tf.GradientTape() as tape:
                # Q-values of the current states from the training model
                predicts = self.model(states)  # predictions
                one_hot_action = tf.one_hot(actions, self.action_size)  # one-hot vector: 1 for the action actually taken, 0 elsewhere
                predicts = tf.reduce_sum(one_hot_action * predicts, axis=1)  # keep only the Q-value of the taken action for each sample

                # Q-values of the next states from the target model
                target_predicts = self.target_model(next_states)
                target_predicts = tf.stop_gradient(target_predicts)  # prevent the target model from being trained

                # update target from the Bellman optimality equation
                # (a small numeric example follows after this listing)
                max_q = np.amax(target_predicts, axis=-1)  # largest Q-value over next-state actions
                targets = rewards + (1 - dones) * self.discount_factor * max_q  # target value
                # if done is True there is no next Q-value, so the target is just the reward
                loss = tf.reduce_mean(tf.square(targets - predicts))  # MSE

            # update the model in the direction that reduces the loss
            grads = tape.gradient(loss, model_params)
            self.optimizer.apply_gradients(zip(grads, model_params))


    if __name__ == "__main__":
        # CartPole-v1 environment (maximum of 500 timesteps)
        env = gym.make('CartPole-v1')
        # state = cart position, cart velocity, pole angle, pole angular velocity
        state_size = env.observation_space.shape[0]  # state_size = 4
        # action = move left or right
        action_size = env.action_space.n  # action_size = 2

        # create the DQN agent
        agent = DQNAgent(state_size, action_size)

        scores, episodes = [], []
        score_avg = 0

        num_episode = 100
        for e in range(num_episode):
            done = False
            score = 0
            # reset the environment
            state = env.reset()  # [ 0.02680346  0.00147292 -0.01816686 -0.01559384]
            state = np.reshape(state, [1, state_size])  # [[ 0.02680346  0.00147292 -0.01816686 -0.01559384]]

            while not done:
                if agent.render:
                    env.render()

                # select an action for the current state
                action = agent.get_action(state)
                # advance the environment one timestep with the selected action
                next_state, reward, done, info = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])

                # reward of 0.1 per timestep; -1 if the episode ends early
                score += reward
                reward = 0.1 if not done or score == 500 else -1

                # store the sample <s, a, r, s'> in the replay memory
                agent.append_sample(state, action, reward, next_state, done)
                # train the training model every timestep (once enough samples are stored)
                if len(agent.memory) >= agent.train_start:
                    agent.train_model()

                state = next_state

                if done:  # episode finished
                    # update the target model with the model's weights at the end of each episode
                    # (target model: every episode, training model: every step)
                    agent.update_target_model()
                    # print the training result for each episode
                    score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
                    print("episode: {:3d} | score avg: {:3.2f} | memory length: {:4d} | epsilon: {:.4f}".format(
                          e, score_avg, len(agent.memory), agent.epsilon))

                    # save the training curve as a graph every episode
                    scores.append(score_avg)
                    episodes.append(e)
                    pylab.plot(episodes, scores, 'b')
                    pylab.xlabel("episode")
                    pylab.ylabel("average score")
                    pylab.savefig("./save_graph/graph.png")

                    # stop once the moving-average score exceeds 400
                    if score_avg > 400:
                        agent.model.save_weights("./save_model/model", save_format="tf")
                        sys.exit()
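
    One detail worth noting in train_model above is how the update target is formed: for terminal transitions the bootstrap term drops out, so the target is just the reward. A tiny numeric check of that line, with made-up values:

    import numpy as np

    discount_factor = 0.99
    rewards = np.array([0.1, -1.0])  # the second transition ended the episode early
    dones = np.array([0.0, 1.0])     # 1.0 marks a terminal transition
    max_q = np.array([2.0, 5.0])     # max_a' Q_target(s', a'), made-up values

    # Same formula as in train_model: bootstrap only for non-terminal transitions.
    targets = rewards + (1 - dones) * discount_factor * max_q
    print(targets)  # [ 2.08 -1.  ]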

    test.py

     

     

    import sys
    import gym
    import pylab
    import random
    import numpy as np
    from collections import deque
    import tensorflow as tf
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.initializers import RandomUniform


    # Neural network that takes the state as input and outputs Q-values
    class DQN(tf.keras.Model):
        def __init__(self, action_size):
            super(DQN, self).__init__()
            self.fc1 = Dense(24, activation='relu')
            self.fc2 = Dense(24, activation='relu')
            self.fc_out = Dense(action_size,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            q = self.fc_out(x)
            return q


    # DQN agent for the CartPole example
    class DQNAgent:
        def __init__(self, state_size, action_size):
            # state and action sizes
            self.state_size = state_size
            self.action_size = action_size

            # create the model and load the trained weights
            self.model = DQN(action_size)
            self.model.load_weights("./save_model/trained/model")

        # greedily choose the action with the largest Q-value (no exploration at test time)
        def get_action(self, state):
            q_value = self.model(state)
            return np.argmax(q_value[0])


    if __name__ == "__main__":
        # CartPole-v1 environment (maximum of 500 timesteps)
        env = gym.make('CartPole-v1')
        state_size = env.observation_space.shape[0]
        action_size = env.action_space.n

        # create the DQN agent
        agent = DQNAgent(state_size, action_size)

        num_episode = 10
        for e in range(num_episode):
            done = False
            score = 0
            # reset the environment
            state = env.reset()
            state = np.reshape(state, [1, state_size])

            while not done:
                env.render()

                # select an action for the current state
                action = agent.get_action(state)
                # advance the environment one timestep with the selected action
                next_state, reward, done, info = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])

                score += reward
                state = next_state

                if done:
                    # print the result of each episode
                    print("episode: {:3d} | score: {:.3f} ".format(e, score))
