  • Policy Gradient
    IT & Computer Science / Deep Learning 2020. 12. 7. 01:08

    Policy-based reinforcement learning: here the artificial neural network is called a policy network. Because of this, the output layer uses the 'softmax' activation, since the action probabilities must sum to 1.
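
    For reference, the update that train.py implements below is the standard REINFORCE policy-gradient rule. In LaTeX, with G_t denoting the discounted return that discount_rewards computes:

        \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right],
        \qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k

    so the code minimizes the loss L(\theta) = -\sum_t \log \pi_\theta(a_t \mid s_t)\, G_t, i.e. the cross entropy weighted by the (normalized) returns.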

    train.py

    import copy
    import pylab
    import random
    import numpy as np
    from environment import Env
    import tensorflow as tf
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.optimizers import Adam
     
     
    # neural network that takes the state as input and outputs the probability of each action
    class REINFORCE(tf.keras.Model):
        def __init__(self, action_size):
            super(REINFORCE, self).__init__()
            self.fc1 = Dense(24, activation='relu')
            self.fc2 = Dense(24, activation='relu')
            self.fc_out = Dense(action_size, activation='softmax')
     
        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            policy = self.fc_out(x)
            return policy
     
     
    # REINFORCE agent for the grid-world example (the class that builds the agent)
    class REINFORCEAgent:
        def __init__(self, state_size, action_size):
            # define the sizes of the state and action spaces
            self.state_size = state_size
            self.action_size = action_size
            
            # REINFORCE hyperparameters
            self.discount_factor = 0.99  # discount factor
            self.learning_rate = 0.001  # learning rate
     
            self.model = REINFORCE(self.action_size)  # create the policy network
            self.optimizer = Adam(learning_rate=self.learning_rate)
            self.states, self.actions, self.rewards = [], [], []
     
        # select an action with the policy network (the policy itself is stochastic, so epsilon-greedy exploration is not needed)
        def get_action(self, state):
            policy = self.model(state)[0]
            policy = np.array(policy)
            return np.random.choice(self.action_size, 1, p=policy)[0]
     
        # compute the discounted returns
        def discount_rewards(self, rewards):
            discounted_rewards = np.zeros_like(rewards)
            running_add = 0
            for t in reversed(range(0, len(rewards))):
                running_add = running_add * self.discount_factor + rewards[t]
                discounted_rewards[t] = running_add
            return discounted_rewards
     
        # store the states, actions, and rewards for one episode
        def append_sample(self, state, action, reward):
            self.states.append(state[0])
            self.rewards.append(reward)
            act = np.zeros(self.action_size)  # one-hot encode the selected action
            act[action] = 1
            self.actions.append(act)
     
        # update the policy network
        def train_model(self):
            # normalize the returns with z-score standardization -> improves policy-network training
            discounted_rewards = np.float32(self.discount_rewards(self.rewards))  # discounted returns, cast to numpy.float32
            discounted_rewards -= np.mean(discounted_rewards)  # subtract the mean,
            discounted_rewards /= np.std(discounted_rewards)  # then divide by the standard deviation
            
            # compute the cross-entropy loss
            model_params = self.model.trainable_variables
            with tf.GradientTape() as tape:
                tape.watch(model_params)
                policies = self.model(np.array(self.states))
                actions = np.array(self.actions)
                action_prob = tf.reduce_sum(actions * policies, axis=1)
                cross_entropy = - tf.math.log(action_prob + 1e-5)  # cross entropy of the taken actions
                loss = tf.reduce_sum(cross_entropy * discounted_rewards)
                entropy = - policies * tf.math.log(policies)  # policy entropy, returned for logging
     
            # update the model in the direction that reduces the loss
            grads = tape.gradient(loss, model_params)
            self.optimizer.apply_gradients(zip(grads, model_params))
            self.states, self.actions, self.rewards = [], [], []
            return np.mean(entropy)
     
     
    if __name__ == "__main__":
        # create the environment and the agent
        env = Env(render_speed=0.01)
        state_size = 15  # 15 state features (same as deep SARSA: 4 variables per obstacle, 3 obstacles, plus 3 for the goal)
        action_space = [0, 1, 2, 3, 4]  # up, down, right, left, stay (in that order)
        action_size = len(action_space)  # 5
        agent = REINFORCEAgent(state_size, action_size)  # create the agent
     
        scores, episodes = [], []
     
        EPISODES = 200
        for e in range(EPISODES):
            done = False
            score = 0
            # initialize the environment
            state = env.reset()
            state = np.reshape(state, [1, state_size])  # convert the state list to a (1, 15) numpy array
     
            while not done:
                # select an action for the current state
                action = agent.get_action(state)

                # take one timestep in the environment with the chosen action and collect the sample
                next_state, reward, done = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])  # convert the next-state list to a (1, 15) numpy array
     
                agent.append_sample(state, action, reward)
                score += reward
     
                state = next_state
     
                if done:
                    # update the policy network at the end of each episode
                    entropy = agent.train_model()
                    # print the training result for the episode
                    print("episode: {:3d} | score: {:3d} | entropy: {:.3f}".format(
                          e, score, entropy))
     
                    scores.append(score)
                    episodes.append(e)
                    pylab.plot(episodes, scores, 'b')
                    pylab.xlabel("episode")
                    pylab.ylabel("score")
                    pylab.savefig("./save_graph/graph.png")
                    
     
        # save the model every 100 episodes
            if e % 100 == 0:
                agent.model.save_weights('save_model/model', save_format='tf')
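
    As a quick sanity check of discount_rewards (a minimal standalone sketch, not part of the original files): with rewards [0, 0, 1] and a discount factor of 0.99, the backward recursion gives G_2 = 1, G_1 = 0.99, and G_0 = 0.9801, which train_model then z-score normalizes.

    import numpy as np

    def discount_rewards(rewards, discount_factor=0.99):
        # same backward recursion as in train.py
        discounted = np.zeros_like(rewards, dtype=np.float32)
        running_add = 0.0
        for t in reversed(range(len(rewards))):
            running_add = running_add * discount_factor + rewards[t]
            discounted[t] = running_add
        return discounted

    returns = discount_rewards([0, 0, 1])
    print(returns)  # [0.9801 0.99   1.    ]
    returns = (returns - np.mean(returns)) / np.std(returns)  # z-score normalization as in train_model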

     

    test.py

    import copy
    import pylab
    import random
    import numpy as np
    from environment import Env
    import tensorflow as tf
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.optimizers import Adam
     
     
    # neural network that takes the state as input and outputs the probability of each action
    class REINFORCE(tf.keras.Model):
        def __init__(self, action_size):
            super(REINFORCE, self).__init__()
            self.fc1 = Dense(24, activation='relu')
            self.fc2 = Dense(24, activation='relu')
            self.fc_out = Dense(action_size, activation='softmax')
     
        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            policy = self.fc_out(x)
            return policy
     
     
    # REINFORCE agent for the grid-world example
    class REINFORCEAgent:
        def __init__(self, state_size, action_size):
            # define the sizes of the state and action spaces
            self.state_size = state_size
            self.action_size = action_size
     
            self.model = REINFORCE(self.action_size)
            self.model.load_weights('save_model/trained/model')
     
        # select an action with the policy network
        def get_action(self, state):
            policy = self.model(state)[0]
            policy = np.array(policy)
            return np.random.choice(self.action_size, 1, p=policy)[0]
     
     
    if __name__ == "__main__":
        # create the environment and the agent
        env = Env(render_speed=0.05)
        state_size = 15
        action_space = [0, 1, 2, 3, 4]
        action_size = len(action_space)
        agent = REINFORCEAgent(state_size, action_size)
     
        EPISODES = 10
        for e in range(EPISODES):
            done = False
            score = 0
            # initialize the environment
            state = env.reset()
            state = np.reshape(state, [1, state_size])
     
            while not done:
                # select an action for the current state
                action = agent.get_action(state)

                # take one timestep in the environment with the chosen action
                next_state, reward, done = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])
     
                score += reward
     
                state = next_state
     
                if done:
                    print("episode: {:3d} | score: {:3d}".format(e, score))
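
    Note that even at test time the agent samples from the softmax distribution instead of acting greedily, exactly as get_action does above. A minimal standalone sketch of that sampling step (the policy vector here is hypothetical):

    import numpy as np

    policy = np.array([0.1, 0.2, 0.4, 0.2, 0.1])  # hypothetical softmax output over the 5 actions
    action = np.random.choice(len(policy), 1, p=policy)[0]  # sample one action index according to the policy
    print(action)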

     

    environment.py

     

    import time
    import numpy as np
    import tkinter as tk
    from PIL import ImageTk, Image
     
    PhotoImage = ImageTk.PhotoImage
    UNIT = 50  # pixels per grid cell
    HEIGHT = 5  # grid height (number of cells)
    WIDTH = 5  # grid width (number of cells)
     
    np.random.seed(1)
     
     
    class Env(tk.Tk):
        def __init__(self, render_speed=0.01):
            super(Env, self).__init__()
            self.render_speed = render_speed
            self.action_space = ['u', 'd', 'l', 'r']
            self.action_size = len(self.action_space)
            self.title('REINFORCE')
            self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT))
            self.shapes = self.load_images()
            self.canvas = self._build_canvas()
            self.counter = 0
            self.rewards = []
            self.goal = []
            # set up the obstacles
            self.set_reward([0, 1], -1)
            self.set_reward([1, 2], -1)
            self.set_reward([2, 3], -1)
            # set up the goal
            self.set_reward([4, 4], 1)
     
        def _build_canvas(self):
            canvas = tk.Canvas(self, bg='white',
                               height=HEIGHT * UNIT,
                               width=WIDTH * UNIT)
            # draw the grid lines
            for c in range(0, WIDTH * UNIT, UNIT):  # 0 to 250 in steps of 50
                x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
                canvas.create_line(x0, y0, x1, y1)
            for r in range(0, HEIGHT * UNIT, UNIT):  # 0 to 250 in steps of 50
                x0, y0, x1, y1 = 0, r, HEIGHT * UNIT, r
                canvas.create_line(x0, y0, x1, y1)
     
            self.rewards = []
            self.goal = []
            # add the agent image to the canvas
            x, y = UNIT/2, UNIT/2
            self.rectangle = canvas.create_image(x, y, image=self.shapes[0])
     
            canvas.pack()
     
            return canvas
     
        def load_images(self):
            rectangle = PhotoImage(
                Image.open("../img/rectangle.png").resize((30, 30)))
            triangle = PhotoImage(
                Image.open("../img/triangle.png").resize((30, 30)))
            circle = PhotoImage(
                Image.open("../img/circle.png").resize((30, 30)))
     
            return rectangle, triangle, circle
     
        def reset_reward(self):
     
            for reward in self.rewards:
                self.canvas.delete(reward['figure'])
     
            self.rewards.clear()
            self.goal.clear()
            self.set_reward([0, 1], -1)
            self.set_reward([1, 2], -1)
            self.set_reward([2, 3], -1)

            # goal
            self.set_reward([4, 4], 1)
     
        def set_reward(self, state, reward):
            state = [int(state[0]), int(state[1])]
            x = int(state[0])
            y = int(state[1])
            temp = {}
            if reward > 0:
                temp['reward'] = reward
                temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                           (UNIT * y) + UNIT / 2,
                                                           image=self.shapes[2])

                self.goal.append(temp['figure'])


            elif reward < 0:
                temp['direction'] = -1
                temp['reward'] = reward
                temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                          (UNIT * y) + UNIT / 2,
                                                          image=self.shapes[1])

            temp['coords'] = self.canvas.coords(temp['figure'])
            temp['state'] = state
            self.rewards.append(temp)
     
        # new methods
        def check_if_reward(self, state):
            check_list = dict()
            check_list['if_goal'] = False
            rewards = 0

            for reward in self.rewards:
                if reward['state'] == state:
                    rewards += reward['reward']
                    if reward['reward'] == 1:
                        check_list['if_goal'] = True

            check_list['rewards'] = rewards
     
            return check_list
     
        def coords_to_state(self, coords):
            x = int((coords[0] - UNIT / 2) / UNIT)
            y = int((coords[1] - UNIT / 2) / UNIT)
            return [x, y]
     
        def reset(self):
            self.update()
            time.sleep(0.5)
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
            self.reset_reward()
            return self.get_state()
     
        def step(self, action):
            self.counter += 1
            self.render()
     
            if self.counter % 2 == 1:
                self.rewards = self.move_rewards()
     
            next_coords = self.move(self.rectangle, action)
            check = self.check_if_reward(self.coords_to_state(next_coords))
            done = check['if_goal']
            reward = check['rewards']
     
            self.canvas.tag_raise(self.rectangle)
     
            s_ = self.get_state()
     
            return s_, reward, done
     
        def get_state(self):
     
            location = self.coords_to_state(self.canvas.coords(self.rectangle))
            agent_x = location[0]
            agent_y = location[1]
     
            states = list()
     
            for reward in self.rewards:
                reward_location = reward['state']
                states.append(reward_location[0] - agent_x)
                states.append(reward_location[1] - agent_y)
                if reward['reward'] < 0:
                    states.append(-1)
                    states.append(reward['direction'])
                else:
                    states.append(1)
     
            return states
     
        def move_rewards(self):
            new_rewards = []
            for temp in self.rewards:
                if temp['reward'] == 1:
                    new_rewards.append(temp)
                    continue
                temp['coords'] = self.move_const(temp)
                temp['state'] = self.coords_to_state(temp['coords'])
                new_rewards.append(temp)
            return new_rewards
     
        def move_const(self, target):
     
            s = self.canvas.coords(target['figure'])
     
            base_action = np.array([0, 0])

            if s[0] == (WIDTH - 1) * UNIT + UNIT / 2:
                target['direction'] = 1
            elif s[0] == UNIT / 2:
                target['direction'] = -1

            if target['direction'] == -1:
                base_action[0] += UNIT
            elif target['direction'] == 1:
                base_action[0] -= UNIT

            if (target['figure'] is not self.rectangle
               and s == [(WIDTH - 1) * UNIT, (HEIGHT - 1) * UNIT]):
                base_action = np.array([0, 0])
     
            self.canvas.move(target['figure'], base_action[0], base_action[1])
     
            s_ = self.canvas.coords(target['figure'])
     
            return s_
     
        def move(self, target, action):
            s = self.canvas.coords(target)
     
            base_action = np.array([0, 0])

            if action == 0:  # up
                if s[1] > UNIT:
                    base_action[1] -= UNIT
            elif action == 1:  # down
                if s[1] < (HEIGHT - 1) * UNIT:
                    base_action[1] += UNIT
            elif action == 2:  # right
                if s[0] < (WIDTH - 1) * UNIT:
                    base_action[0] += UNIT
            elif action == 3:  # left
                if s[0] > UNIT:
                    base_action[0] -= UNIT
     
            self.canvas.move(target, base_action[0], base_action[1])
     
            s_ = self.canvas.coords(target)
     
            return s_
     
        def render(self):
            # control the game speed
            time.sleep(self.render_speed)
            self.update()
     
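
    For reference, get_state builds the 15 state features as follows: for each of the three obstacles it appends (dx, dy, -1, direction) relative to the agent, and for the goal it appends (dx, dy, 1). A minimal standalone sketch of that layout, with a made-up agent position:

    # hypothetical agent position; obstacle and goal positions as in reset_reward
    agent = (2, 2)
    obstacles = [((0, 1), -1), ((1, 2), -1), ((2, 3), -1)]  # (position, direction)
    goal = (4, 4)

    state = []
    for (ox, oy), direction in obstacles:
        state += [ox - agent[0], oy - agent[1], -1, direction]  # 4 features per obstacle
    state += [goal[0] - agent[0], goal[1] - agent[1], 1]        # 3 features for the goal
    print(len(state))  # 15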
