IT&컴퓨터공학/딥러닝

딥살사 코드 - 딥러닝의 시작

yan_z 2020. 12. 7. 00:40

이전까지 큐테이블을 이용했지만 장애물이 움직이는 것과 같은경우 상태 개수와 행동갯수가 너무 커지는 경우에 

큐테이블을 사용할 수 없음.

 

더이상 직접 계산이 불가능하므로 인공신경망을 이용해서 모델이 스스로 학습하도록 한다.

 

딥살사 = 살사알고리즘 + 인공신경망 이용

 

원래 큐함수 업데이트 식

이거 상태하나하나 계산하는거 불가능 하니까 -> 주황색 글씨만 알면 이걸 최소로 하는 모델을 만들면되겠다.

 

딥살사에서 사용하는 오차함수 식

이걸 최소로 하는 모델을 만들자 ! -> 경사하강법 이용 

 

 

train.py

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import copy
import pylab
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
 
 
# 딥살사 인공신경망
class DeepSARSA(tf.keras.Model):
    def __init__(self, action_size):
        super(DeepSARSA, self).__init__()
        self.fc1 = Dense(30, activation='relu'#은닉층1 : 30개의 노드 활성화함수 : relu ( 0보다 작은값은 0으로 ,0보다 큰값은 그대로)
        self.fc2 = Dense(30, activation='relu'#은닉층2 : 30개의 노드
        self.fc_out = Dense(action_size) #행동사이즈만큼 출력층의 노드가 정해짐 ( 큐함수를 인공신경망으로 근사시키는 거니까 ! 여기서는 5 )
 
    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        q = self.fc_out(x)
        return q
 
 
# 그리드월드 예제에서의 딥살사 에이전트 ( 에이전트 생성하는 함수 )
class DeepSARSAgent:
    def __init__(self, state_size, action_size): # 생성자
        # 상태의 크기와 행동의 크기 정의
        self.state_size = state_size
        self.action_size = action_size
        
        # 딥살사 하이퍼 파라메터
        self.discount_factor = 0.99 #감가율 0.99 ( 1과 가까우므로 미래보상도 현재보상처럼 취급 )
        self.learning_rate = 0.001 # 학습률 0.001 ( 학습률이 매우 작으므로 현재것을 대부분 유지 )
        self.epsilon = 1.  #처음 입실론 =1 : 무조건 탐험한다
        self.epsilon_decay = .9999 # 0.9999 를 곱하면서 탐험률을 낮춘다
        self.epsilon_min = 0.01 #입실론의 최솟값은 0.01 로한다. 즉 0.01만큼은 최소한 탐험으로 남겨두겠다는 뜻
        self.model = DeepSARSA(self.action_size) # 인공신경망을 생성한다
        self.optimizer = Adam(lr=self.learning_rate)
 
    # 입실론 탐욕 정책으로 행동 선택
    def get_action(self, state):
        if np.random.rand() <= self.epsilon: # 랜덤값이 입실론보다 작으면 무작위 행동 선택
            return random.randrange(self.action_size)
        else# 랜덤값이 입실론보다 크면 제일 좋은 행동 선택
            q_values = self.model(state)
            return np.argmax(q_values[0])
 
    # <s, a, r, s', a'>의 샘플로부터 모델 업데이트
    def train_model(self, state, action, reward, next_state, next_action, done):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
 
        # 학습 파라메터
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            tape.watch(model_params)
            predict = self.model(state)[0]
            one_hot_action = tf.one_hot([action], self.action_size)
            predict = tf.reduce_sum(one_hot_action * predict, axis=1# 내가 계산한 예측값
 
            # done = True 일 경우 에피소드가 끝나서 다음 상태가 없음
            next_q = self.model(next_state)[0][next_action] #다음상태에서 다음 행동을 취했을때 q함수값
            target = reward + (1 - done) * self.discount_factor * next_q #타겟값 ( 정답 ) 단 done=True 인경우 target = reward
 
            # MSE 오류 함수 계산
            loss = tf.reduce_mean(tf.square(target - predict)) # 이 값이 최소가 되도록 모델을 발전시키면 된다
        
        # 오류함수를 줄이는 방향으로 모델 업데이트
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
 
 
if __name__ == "__main__":
    # 환경과 에이전트 생성
    env = Env(render_speed=0.01# env 객체 초기화
    state_size = 15 #장애물에 대한 상태값 총 4개 , 장애물이 총 3개라서 4*3= 12 개, 에이전트에 대한 도착점의 상대위치(x,y)2개, 도착지점의 라벨1개
    #총 15개의 상태를 가진다.
    action_space = [01234#상,하,우,좌,제자리 순서
    action_size = len(action_space) # 5 ( 윗줄에서 알수있음 )
    agent = DeepSARSAgent(state_size, action_size) #에이전트를 생성한다.
    
    scores, episodes = [], [] # score 와 에피소드 빈 리스트 초기화
 
    EPISODES = 1000
    for e in range(EPISODES): #에피소드 만큼
        done = False
        score = 0
        # env 초기화
        state = env.reset()
        state = np.reshape(state, [1, state_size]) #상태 list를 (1,15)의 numpy.array로 변환
 
        while not done:
            # 현재 상태에 대한 행동 선택
            action = agent.get_action(state)
 
            # 선택한 행동으로 환경에서 한 타임스텝 진행 후 샘플 수집
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size]) #다음 상태 list를 (1,15)의 numpy.array로 변환
            next_action = agent.get_action(next_state) #다음 상태에대해서 다음 행동을 구함
 
            # 샘플로 모델 학습
            agent.train_model(state, action, reward, next_state, 
                                next_action, done) # 살사니까 다음상태의 다음 행동까지 다 고려해서 모델학습
            score += reward
            state = next_state
 
            if done:
                # 에피소드마다 학습 결과 출력
                print("episode: {:3d} | score: {:3d} | epsilon: {:.3f}".format(
                      e, score, agent.epsilon))
 
                scores.append(score)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("score")
                pylab.savefig("./save_graph/graph.png")
 
 
        # 100 에피소드마다 모델 저장
        if e % 100 == 0:
            agent.model.save_weights('save_model/model', save_format='tf')
cs

test.py

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
 
 
# 딥살사 인공신경망
class DeepSARSA(tf.keras.Model):
    def __init__(self, action_size):
        super(DeepSARSA, self).__init__()
        self.fc1 = Dense(30, activation='relu')
        self.fc2 = Dense(30, activation='relu')
        self.fc_out = Dense(action_size)
 
    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        q = self.fc_out(x)
        return q
 
 
# 그리드월드 예제에서의 딥살사 에이전트
class DeepSARSAgent:
    def __init__(self, state_size, action_size):
        # 상태의 크기와 행동의 크기 정의
        self.state_size = state_size
        self.action_size = action_size
 
        self.epsilon = 0.01
        self.model = DeepSARSA(self.action_size)
        self.model.load_weights('save_model/trained/model')
 
    # 입실론 탐욕 정책으로 행동 선택
    def get_action(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            q_values = self.model(state)
            return np.argmax(q_values[0])
 
 
if __name__ == "__main__":
    # 환경과 에이전트 생성
    env = Env(render_speed=0.05)
    state_size = 15
    action_space = [01234]
    action_size = len(action_space)
    agent = DeepSARSAgent(state_size, action_size)
 
    scores, episodes = [], []
 
    EPISODES = 10
    for e in range(EPISODES):
        score = 0
        done = False
        # env 초기화
        state = env.reset()
        state = np.reshape(state, [1, state_size])
 
        while not done:
            # 현재 상태에 대한 행동 선택
            action = agent.get_action(state)
 
            # 선택한 행동으로 환경에서 한 타임스텝 진행 후 샘플 수집
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
 
            state = next_state
            score += reward
 
            if done:
                # 에피소드마다 테스트 결과 출력
                print("episode: {:3d} | score: {:3d}".format(e, score))
cs

 

environment.py

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import time
import numpy as np
import tkinter as tk
from PIL import ImageTk, Image
 
PhotoImage = ImageTk.PhotoImage
UNIT = 50  # 픽셀 수
HEIGHT = 5  # 그리드 세로
WIDTH = 5  # 그리드 가로
 
np.random.seed(1)
 
 
class Env(tk.Tk):
    def __init__(self, render_speed=0.01):
        super(Env, self).__init__()
        self.render_speed=render_speed
        self.action_space = ['u''d''l''r']
        self.action_size = len(self.action_space)
        self.title('DeepSARSA')
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT))
        self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.counter = 0
        self.rewards = []
        self.goal = []
        # 장애물 설정
        self.set_reward([01], -1)
        self.set_reward([12], -1)
        self.set_reward([23], -1)
        # 목표 지점 설정
        self.set_reward([44], 1)
 
    def _build_canvas(self):
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # 그리드 생성
        for c in range(0, WIDTH * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for r in range(0, HEIGHT * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = 0, r, HEIGHT * UNIT, r
            canvas.create_line(x0, y0, x1, y1)
 
        self.rewards = []
        self.goal = []
        # 캔버스에 이미지 추가
        x, y = UNIT/2, UNIT/2
        self.rectangle = canvas.create_image(x, y, image=self.shapes[0])
 
        canvas.pack()
 
        return canvas
 
    def load_images(self):
        rectangle = PhotoImage(
            Image.open("../img/rectangle.png").resize((3030)))
        triangle = PhotoImage(
            Image.open("../img/triangle.png").resize((3030)))
        circle = PhotoImage(
            Image.open("../img/circle.png").resize((3030)))
 
        return rectangle, triangle, circle
 
    def reset_reward(self): # 보상 초기화 해주는 함수
 
        for reward in self.rewards:
            self.canvas.delete(reward['figure'])
 
        self.rewards.clear()
        self.goal.clear()
        self.set_reward([01], -1# (0,1) 자리에 장애물
        self.set_reward([12], -1# (1,2) 자리에 장애물
        self.set_reward([23], -1# (2,3) 자리에 장애물
 
        # #goal
        self.set_reward([44], 1# (4,4) 자리는 goal 지점
 
    def set_reward(self, state, reward): # 보상 다시 초기화
        state = [int(state[0]), int(state[1])] #위치를 list 형식으로 저장
        x = int(state[0])
        y = int(state[1])
        temp = {}
        if reward > 0# 즉 goal 이면
            temp['reward'= reward
            temp['figure'= self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                       (UNIT * y) + UNIT / 2,
                                                       image=self.shapes[2])
 
            self.goal.append(temp['figure'])
 
 
        elif reward < 0# 즉 장애물이면
            temp['direction'= -1
            temp['reward'= reward
            temp['figure'= self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                      (UNIT * y) + UNIT / 2,
                                                      image=self.shapes[1])
 
        temp['coords'= self.canvas.coords(temp['figure'])
        temp['state'= state
        self.rewards.append(temp)
 
    # 상태를 매개변수르 받아 목표지점에 도착할수있는지 확인하는 함수
    def check_if_reward(self, state):
        check_list = dict()
        check_list['if_goal'= False
        rewards = 0
 
        for reward in self.rewards: #장애물3개, 목표지점 1개에 대해서
            if reward['state'== state: # 장애물 혹은 목표지점에 도달한다면
                rewards += reward['reward'# 보상받기
                if reward['reward'== 1#goal 에 도착시
                    check_list['if_goal'= True
 
        check_list['rewards'= rewards
 
        return check_list
 
    def coords_to_state(self, coords):
        x = int((coords[0- UNIT / 2/ UNIT)
        y = int((coords[1- UNIT / 2/ UNIT)
        return [x, y]
 
    def reset(self):
        self.update()
        time.sleep(0.5)
        x, y = self.canvas.coords(self.rectangle)
        self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
        self.reset_reward()
        return self.get_state()
 
    def step(self, action):
        self.counter += 1 # counter = time step 진행횟수
        self.render()
 
        if self.counter % 2 == 1# 카운터가 홀수면 ( 즉 2번마다 한번씩 ) reward 계산
            self.rewards = self.move_rewards()
 
        next_coords = self.move(self.rectangle, action) #액션을 통해 움진인다
        check = self.check_if_reward(self.coords_to_state(next_coords))
        done = check['if_goal']
        reward = check['rewards']
 
        self.canvas.tag_raise(self.rectangle) # 행동에 따라 이동된 rectangle 을 canvas에 top level 에 표시
 
 
        s_ = self.get_state()
 
        return s_, reward, done
 
    def get_state(self): # 정보를 합해서 상태로 저장하는 함수
 
        location = self.coords_to_state(self.canvas.coords(self.rectangle))
        agent_x = location[0#에이전트 x좌표
        agent_y = location[1#에이전트 y좌표
 
        states = list()
 
        for reward in self.rewards:
            reward_location = reward['state']
            states.append(reward_location[0- agent_x) #상대위치계산
            states.append(reward_location[1- agent_y) #상대위치계산
            if reward['reward'< 0#장애물을 만나면
                states.append(-1# reward = -1
                states.append(reward['direction']) # direction = -1
            else:
                states.append(1# reward = +1
 
        return states
 
    def move_rewards(self):
        new_rewards = []
        for temp in self.rewards:
            if temp['reward'== 1:
                new_rewards.append(temp)
                continue
            temp['coords'= self.move_const(temp)
            temp['state'= self.coords_to_state(temp['coords'])
            new_rewards.append(temp)
        return new_rewards
 
    def move_const(self, target):
 
        s = self.canvas.coords(target['figure'])
 
        base_action = np.array([00])
 
        if s[0== (WIDTH - 1* UNIT + UNIT / 2:
            target['direction'= 1
        elif s[0== UNIT / 2:
            target['direction'= -1
 
        if target['direction'== -1:
            base_action[0+= UNIT
        elif target['direction'== 1:
            base_action[0-= UNIT
 
        if (target['figure'is not self.rectangle
           and s == [(WIDTH - 1* UNIT, (HEIGHT - 1* UNIT]):
            base_action = np.array([00])
 
        self.canvas.move(target['figure'], base_action[0], base_action[1])
 
        s_ = self.canvas.coords(target['figure'])
 
        return s_
 
    def move(self, target, action): # 행동에 따라 행동하고 그 위치를 반환하는 함수
        s = self.canvas.coords(target) # rectangle 의 위치르 받아서
 
        base_action = np.array([00])
 
        if action == 0:  # 상
            if s[1> UNIT: # 위로움직였는데 끝이아니면 y좌표 -50픽셀
                base_action[1-= UNIT
        elif action == 1:  # 하
            if s[1< (HEIGHT - 1* UNIT: # 아래로움직였는데 끝이아니면 y좌표 +50픽셀
                base_action[1+= UNIT
        elif action == 2:  # 우
            if s[0< (WIDTH - 1* UNIT:
                base_action[0+= UNIT
        elif action == 3:  # 좌
            if s[0> UNIT:
                base_action[0-= UNIT
 
        self.canvas.move(target, base_action[0], base_action[1]) #이동
 
        s_ = self.canvas.coords(target)
 
        return s_
 
    def render(self):
        # 게임 속도 조정
        time.sleep(self.render_speed)
        self.update()
 
cs