  • Deep SARSA Code - The Start of Deep Learning
    IT & Computer Engineering / Deep Learning   2020. 12. 7. 00:40

    Up to now we used a Q-table, but once the number of states and actions becomes too large, for example when the obstacles move, a Q-table can no longer be used. (In this grid world the relative positions and directions of three moving obstacles make the number of distinct situations far larger than the 25 cells of the static problem.)

     

    Since we can no longer compute the Q-function directly, we use an artificial neural network and let the model learn it on its own.

     

    Deep SARSA = SARSA algorithm + an artificial neural network

     

    The original Q-function update formula:
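    For reference, the standard tabular SARSA update this refers to is

    Q(S_t, A_t) ← Q(S_t, A_t) + α [ R_{t+1} + γ Q(S_{t+1}, A_{t+1}) − Q(S_t, A_t) ],

    where α is the step size, γ is the discount factor, and R_{t+1} + γ Q(S_{t+1}, A_{t+1}) is the target.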

    Computing this for every single state is impossible, so -> if we know just the target term (the part written in orange), we can build a model that minimizes the difference from it.

     

    The loss function used in Deep SARSA:
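    For reference, with network weights θ this is the mean squared error between the target and the prediction,

    MSE = ( R_{t+1} + γ Q_θ(S_{t+1}, A_{t+1}) − Q_θ(S_t, A_t) )²,

    which is exactly what train_model() below computes as tf.reduce_mean(tf.square(target - predict)).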

    Let's build a model that minimizes this! -> use gradient descent
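    In terms of the weights this is the usual gradient step θ ← θ − (learning rate) · ∇_θ MSE; in train.py it is carried out with tf.GradientTape plus the Adam optimizer (a refinement of plain gradient descent).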

     

     

    train.py

     

    import copy
    import pylab
    import random
    import numpy as np
    from environment import Env
    import tensorflow as tf
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.optimizers import Adam


    # Deep SARSA neural network
    class DeepSARSA(tf.keras.Model):
        def __init__(self, action_size):
            super(DeepSARSA, self).__init__()
            self.fc1 = Dense(30, activation='relu')  # hidden layer 1: 30 nodes, ReLU activation (negative inputs become 0, positive inputs pass through)
            self.fc2 = Dense(30, activation='relu')  # hidden layer 2: 30 nodes
            self.fc_out = Dense(action_size)  # the output layer has one node per action (the network approximates the Q-function, so here that is 5)

        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            q = self.fc_out(x)
            return q


    # Deep SARSA agent for the grid world example (class that builds the agent)
    class DeepSARSAgent:
        def __init__(self, state_size, action_size):  # constructor
            # sizes of the state and action spaces
            self.state_size = state_size
            self.action_size = action_size

            # Deep SARSA hyperparameters
            self.discount_factor = 0.99  # discount factor 0.99 (close to 1, so future rewards count almost as much as immediate ones)
            self.learning_rate = 0.001  # learning rate 0.001 (very small, so each update changes the model only a little)
            self.epsilon = 1.  # initial epsilon = 1: always explore at first
            self.epsilon_decay = .9999  # multiply by 0.9999 every update to gradually lower the exploration rate
            self.epsilon_min = 0.01  # epsilon never drops below 0.01, i.e. at least 1% of actions stay exploratory
            self.model = DeepSARSA(self.action_size)  # create the neural network
            self.optimizer = Adam(lr=self.learning_rate)

        # select an action with the epsilon-greedy policy
        def get_action(self, state):
            if np.random.rand() <= self.epsilon:  # if the random value is below epsilon, pick a random action
                return random.randrange(self.action_size)
            else:  # if the random value is above epsilon, pick the best action
                q_values = self.model(state)
                return np.argmax(q_values[0])

        # update the model from a <s, a, r, s', a'> sample
        def train_model(self, state, action, reward, next_state, next_action, done):
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            # trainable parameters
            model_params = self.model.trainable_variables
            with tf.GradientTape() as tape:
                tape.watch(model_params)
                predict = self.model(state)[0]
                one_hot_action = tf.one_hot([action], self.action_size)
                predict = tf.reduce_sum(one_hot_action * predict, axis=1)  # the prediction computed by the network

                # if done = True the episode is over and there is no next state
                next_q = self.model(next_state)[0][next_action]  # Q-value of taking the next action in the next state
                target = reward + (1 - done) * self.discount_factor * next_q  # target (the answer); when done = True, target = reward

                # compute the MSE loss
                loss = tf.reduce_mean(tf.square(target - predict))  # improve the model so that this value becomes as small as possible

            # update the model in the direction that reduces the loss
            grads = tape.gradient(loss, model_params)
            self.optimizer.apply_gradients(zip(grads, model_params))


    if __name__ == "__main__":
        # create the environment and the agent
        env = Env(render_speed=0.01)  # initialize the env object
        state_size = 15  # 4 state values per obstacle and 3 obstacles = 12, plus the goal's relative position (x, y) = 2, plus the goal label = 1
        # 15 state values in total
        action_space = [0, 1, 2, 3, 4]  # up, down, right, left, stay (in that order)
        action_size = len(action_space)  # 5 (from the line above)
        agent = DeepSARSAgent(state_size, action_size)  # create the agent

        scores, episodes = [], []  # initialize empty lists for scores and episodes

        EPISODES = 1000
        for e in range(EPISODES):  # for each episode
            done = False
            score = 0
            # reset the env
            state = env.reset()
            state = np.reshape(state, [1, state_size])  # convert the state list into a (1, 15) numpy array

            while not done:
                # choose an action for the current state
                action = agent.get_action(state)

                # take one time step in the environment with the chosen action and collect a sample
                next_state, reward, done = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])  # convert the next-state list into a (1, 15) numpy array
                next_action = agent.get_action(next_state)  # pick the next action for the next state

                # train the model with the sample
                agent.train_model(state, action, reward, next_state,
                                    next_action, done)  # this is SARSA, so the next action in the next state is part of the update
                score += reward
                state = next_state

                if done:
                    # print the training result after every episode
                    print("episode: {:3d} | score: {:3d} | epsilon: {:.3f}".format(
                          e, score, agent.epsilon))

                    scores.append(score)
                    episodes.append(e)
                    pylab.plot(episodes, scores, 'b')
                    pylab.xlabel("episode")
                    pylab.ylabel("score")
                    pylab.savefig("./save_graph/graph.png")


            # save the model every 100 episodes
            if e % 100 == 0:
                agent.model.save_weights('save_model/model', save_format='tf')
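    One detail worth noting in train_model(): the one-hot mask is just a way to pick out the network's Q-value for the action that was actually taken. A minimal standalone sketch (the numbers are made up for illustration):

    import tensorflow as tf

    q_values = tf.constant([0.1, 0.5, -0.2, 0.0, 0.3])  # hypothetical Q-values for the 5 actions
    action = 1
    one_hot = tf.one_hot([action], 5)                    # [[0., 1., 0., 0., 0.]]
    predict = tf.reduce_sum(one_hot * q_values, axis=1)  # -> [0.5], the Q-value of action 1
    print(predict.numpy())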

    test.py

     

    import random
    import numpy as np
    from environment import Env
    import tensorflow as tf
    from tensorflow.keras.layers import Dense


    # Deep SARSA neural network
    class DeepSARSA(tf.keras.Model):
        def __init__(self, action_size):
            super(DeepSARSA, self).__init__()
            self.fc1 = Dense(30, activation='relu')
            self.fc2 = Dense(30, activation='relu')
            self.fc_out = Dense(action_size)

        def call(self, x):
            x = self.fc1(x)
            x = self.fc2(x)
            q = self.fc_out(x)
            return q


    # Deep SARSA agent for the grid world example
    class DeepSARSAgent:
        def __init__(self, state_size, action_size):
            # sizes of the state and action spaces
            self.state_size = state_size
            self.action_size = action_size

            self.epsilon = 0.01
            self.model = DeepSARSA(self.action_size)
            self.model.load_weights('save_model/trained/model')

        # select an action with the epsilon-greedy policy
        def get_action(self, state):
            if np.random.rand() <= self.epsilon:
                return random.randrange(self.action_size)
            else:
                q_values = self.model(state)
                return np.argmax(q_values[0])


    if __name__ == "__main__":
        # create the environment and the agent
        env = Env(render_speed=0.05)
        state_size = 15
        action_space = [0, 1, 2, 3, 4]
        action_size = len(action_space)
        agent = DeepSARSAgent(state_size, action_size)

        scores, episodes = [], []

        EPISODES = 10
        for e in range(EPISODES):
            score = 0
            done = False
            # reset the env
            state = env.reset()
            state = np.reshape(state, [1, state_size])

            while not done:
                # choose an action for the current state
                action = agent.get_action(state)

                # take one time step in the environment with the chosen action
                next_state, reward, done = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])

                state = next_state
                score += reward

                if done:
                    # print the test result for each episode
                    print("episode: {:3d} | score: {:3d}".format(e, score))

     

    environment.py

     

     

    import time
    import numpy as np
    import tkinter as tk
    from PIL import ImageTk, Image

    PhotoImage = ImageTk.PhotoImage
    UNIT = 50  # pixels per grid cell
    HEIGHT = 5  # grid height
    WIDTH = 5  # grid width

    np.random.seed(1)


    class Env(tk.Tk):
        def __init__(self, render_speed=0.01):
            super(Env, self).__init__()
            self.render_speed = render_speed
            self.action_space = ['u', 'd', 'l', 'r']
            self.action_size = len(self.action_space)
            self.title('DeepSARSA')
            self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT))
            self.shapes = self.load_images()
            self.canvas = self._build_canvas()
            self.counter = 0
            self.rewards = []
            self.goal = []
            # place the obstacles
            self.set_reward([0, 1], -1)
            self.set_reward([1, 2], -1)
            self.set_reward([2, 3], -1)
            # place the goal
            self.set_reward([4, 4], 1)

        def _build_canvas(self):
            canvas = tk.Canvas(self, bg='white',
                               height=HEIGHT * UNIT,
                               width=WIDTH * UNIT)
            # draw the grid lines
            for c in range(0, WIDTH * UNIT, UNIT):  # 0~250 in steps of UNIT (50)
                x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
                canvas.create_line(x0, y0, x1, y1)
            for r in range(0, HEIGHT * UNIT, UNIT):  # 0~250 in steps of UNIT (50)
                x0, y0, x1, y1 = 0, r, HEIGHT * UNIT, r
                canvas.create_line(x0, y0, x1, y1)

            self.rewards = []
            self.goal = []
            # add the agent image to the canvas
            x, y = UNIT / 2, UNIT / 2
            self.rectangle = canvas.create_image(x, y, image=self.shapes[0])

            canvas.pack()

            return canvas

        def load_images(self):
            rectangle = PhotoImage(
                Image.open("../img/rectangle.png").resize((30, 30)))
            triangle = PhotoImage(
                Image.open("../img/triangle.png").resize((30, 30)))
            circle = PhotoImage(
                Image.open("../img/circle.png").resize((30, 30)))

            return rectangle, triangle, circle

        def reset_reward(self):  # re-creates all reward objects
            for reward in self.rewards:
                self.canvas.delete(reward['figure'])

            self.rewards.clear()
            self.goal.clear()
            self.set_reward([0, 1], -1)  # obstacle at (0, 1)
            self.set_reward([1, 2], -1)  # obstacle at (1, 2)
            self.set_reward([2, 3], -1)  # obstacle at (2, 3)

            # goal
            self.set_reward([4, 4], 1)  # (4, 4) is the goal

        def set_reward(self, state, reward):  # places a reward (obstacle or goal) at a grid cell
            state = [int(state[0]), int(state[1])]  # store the position as a list of ints
            x = int(state[0])
            y = int(state[1])
            temp = {}
            if reward > 0:  # i.e. the goal
                temp['reward'] = reward
                temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                          (UNIT * y) + UNIT / 2,
                                                          image=self.shapes[2])

                self.goal.append(temp['figure'])

            elif reward < 0:  # i.e. an obstacle
                temp['direction'] = -1
                temp['reward'] = reward
                temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                          (UNIT * y) + UNIT / 2,
                                                          image=self.shapes[1])

            temp['coords'] = self.canvas.coords(temp['figure'])
            temp['state'] = state
            self.rewards.append(temp)

        # takes a state as an argument and checks whether it lands on a reward (obstacle or goal)
        def check_if_reward(self, state):
            check_list = dict()
            check_list['if_goal'] = False
            rewards = 0

            for reward in self.rewards:  # for the 3 obstacles and the 1 goal
                if reward['state'] == state:  # if the agent reached an obstacle or the goal
                    rewards += reward['reward']  # collect the reward
                    if reward['reward'] == 1:  # the goal was reached
                        check_list['if_goal'] = True

            check_list['rewards'] = rewards

            return check_list

        def coords_to_state(self, coords):
            x = int((coords[0] - UNIT / 2) / UNIT)
            y = int((coords[1] - UNIT / 2) / UNIT)
            return [x, y]

        def reset(self):
            self.update()
            time.sleep(0.5)
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
            self.reset_reward()
            return self.get_state()

        def step(self, action):
            self.counter += 1  # counter = number of time steps taken
            self.render()

            if self.counter % 2 == 1:  # when the counter is odd (i.e. every other step), move the reward objects
                self.rewards = self.move_rewards()

            next_coords = self.move(self.rectangle, action)  # move according to the action
            check = self.check_if_reward(self.coords_to_state(next_coords))
            done = check['if_goal']
            reward = check['rewards']

            self.canvas.tag_raise(self.rectangle)  # raise the moved rectangle to the top level of the canvas

            s_ = self.get_state()

            return s_, reward, done

        def get_state(self):  # combines the information into the state vector
            location = self.coords_to_state(self.canvas.coords(self.rectangle))
            agent_x = location[0]  # agent x coordinate
            agent_y = location[1]  # agent y coordinate

            states = list()

            for reward in self.rewards:
                reward_location = reward['state']
                states.append(reward_location[0] - agent_x)  # relative position
                states.append(reward_location[1] - agent_y)  # relative position
                if reward['reward'] < 0:  # for an obstacle
                    states.append(-1)  # reward = -1
                    states.append(reward['direction'])  # direction of movement
                else:
                    states.append(1)  # reward = +1

            return states

        def move_rewards(self):
            new_rewards = []
            for temp in self.rewards:
                if temp['reward'] == 1:
                    new_rewards.append(temp)
                    continue
                temp['coords'] = self.move_const(temp)
                temp['state'] = self.coords_to_state(temp['coords'])
                new_rewards.append(temp)
            return new_rewards

        def move_const(self, target):
            s = self.canvas.coords(target['figure'])

            base_action = np.array([0, 0])

            if s[0] == (WIDTH - 1) * UNIT + UNIT / 2:
                target['direction'] = 1
            elif s[0] == UNIT / 2:
                target['direction'] = -1

            if target['direction'] == -1:
                base_action[0] += UNIT
            elif target['direction'] == 1:
                base_action[0] -= UNIT

            if (target['figure'] is not self.rectangle
               and s == [(WIDTH - 1) * UNIT, (HEIGHT - 1) * UNIT]):
                base_action = np.array([0, 0])

            self.canvas.move(target['figure'], base_action[0], base_action[1])

            s_ = self.canvas.coords(target['figure'])

            return s_

        def move(self, target, action):  # moves the target according to the action and returns its new coordinates
            s = self.canvas.coords(target)  # current coordinates of the rectangle

            base_action = np.array([0, 0])

            if action == 0:  # up
                if s[1] > UNIT:  # if not already at the top edge, move y by -50 pixels
                    base_action[1] -= UNIT
            elif action == 1:  # down
                if s[1] < (HEIGHT - 1) * UNIT:  # if not already at the bottom edge, move y by +50 pixels
                    base_action[1] += UNIT
            elif action == 2:  # right
                if s[0] < (WIDTH - 1) * UNIT:
                    base_action[0] += UNIT
            elif action == 3:  # left
                if s[0] > UNIT:
                    base_action[0] -= UNIT

            self.canvas.move(target, base_action[0], base_action[1])  # move

            s_ = self.canvas.coords(target)

            return s_

        def render(self):
            # adjust the rendering speed
            time.sleep(self.render_speed)
            self.update()
     
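    As a quick sanity check of the 15 state values mentioned above, here is a small standalone sketch (the positions are a hypothetical example, not taken from the running environment) that mimics the layout built by Env.get_state(): four values [relative x, relative y, -1, direction] per obstacle and three values [relative x, relative y, +1] for the goal.

    # hypothetical configuration: agent at (0, 0), three obstacles, goal at (4, 4)
    agent = (0, 0)
    obstacles = [((0, 1), -1), ((1, 2), -1), ((2, 3), -1)]  # (position, direction)
    goal = (4, 4)

    state = []
    for (ox, oy), direction in obstacles:
        state += [ox - agent[0], oy - agent[1], -1, direction]
    state += [goal[0] - agent[0], goal[1] - agent[1], 1]

    print(len(state))  # 15
    print(state)       # [0, 1, -1, -1, 1, 2, -1, -1, 2, 3, -1, -1, 4, 4, 1]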
