IT&컴퓨터공학/딥러닝

폴리시 그레이디언트

yan_z 2020. 12. 7. 01:08

정책 기반 강화학습에서는 인공신경망이 정책을 직접 근사하므로 이 신경망을 정책신경망이라고 한다. 정책신경망의 출력은 각 행동을 선택할 확률이고 그 합이 1이 되어야 하므로, 출력층의 활성함수로 'softmax'를 사용한다.

 

 

 

train.py

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import copy
import pylab
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
 
 
# 상태가 입력, 각 행동의 확률이 출력인 인공신경망 생성
class REINFORCE(tf.keras.Model):
    """Policy network: takes a state and outputs one probability per action."""

    def __init__(self, action_size):
        super().__init__()
        # Two hidden layers of 24 ReLU units each.
        self.fc1 = Dense(24, activation='relu')
        self.fc2 = Dense(24, activation='relu')
        # Softmax head so the outputs form a probability distribution.
        self.fc_out = Dense(action_size, activation='softmax')

    def call(self, x):
        """Run the forward pass and return the action probabilities."""
        hidden = self.fc2(self.fc1(x))
        return self.fc_out(hidden)
 
 
# 그리드월드 예제에서의 REINFORCE 에이전트 ( 에이전트 생성하는 함수 )
class REINFORCEAgent:
    """REINFORCE (Monte-Carlo policy gradient) agent for the grid world.

    The samples of one episode are buffered with ``append_sample`` and then
    consumed by ``train_model``, which performs one policy update and clears
    the buffers.
    """

    def __init__(self, state_size, action_size):
        # Size of the state vector and number of discrete actions.
        self.state_size = state_size
        self.action_size = action_size

        # REINFORCE hyper-parameters.
        self.discount_factor = 0.99
        self.learning_rate = 0.001

        self.model = REINFORCE(self.action_size)  # policy network
        # `learning_rate` is the supported argument name; `lr` is a
        # deprecated alias.
        self.optimizer = Adam(learning_rate=self.learning_rate)
        self.states, self.actions, self.rewards = [], [], []

    def get_action(self, state):
        """Sample an action index from the policy network's output.

        The policy itself is stochastic, so no separate greedy/exploration
        rule is applied.
        """
        policy = np.array(self.model(state)[0])
        return np.random.choice(self.action_size, 1, p=policy)[0]

    def discount_rewards(self, rewards):
        """Return the discounted return for every time step of an episode.

        Args:
            rewards: sequence of per-step rewards.

        Returns:
            Float array where element ``t`` is
            ``rewards[t] + discount_factor * result[t + 1]``.
        """
        # Force a float buffer: with integer rewards, zeros_like would
        # otherwise create an int array and truncate every return.
        discounted_rewards = np.zeros_like(rewards, dtype=np.float64)
        running_add = 0.0
        for t in reversed(range(len(rewards))):
            running_add = running_add * self.discount_factor + rewards[t]
            discounted_rewards[t] = running_add
        return discounted_rewards

    def append_sample(self, state, action, reward):
        """Store one step (state, one-hot action, reward) of the episode."""
        self.states.append(state[0])
        self.rewards.append(reward)
        act = np.zeros(self.action_size)
        act[action] = 1
        self.actions.append(act)

    def train_model(self):
        """Update the policy network from the buffered episode.

        Returns:
            Mean of the element-wise ``-p * log(p)`` terms of the policy
            outputs over the episode (a diagnostic value).
        """
        # Standardise the returns (z-score); the epsilon keeps the division
        # finite when every return in the episode is identical (std == 0).
        discounted_rewards = np.float32(self.discount_rewards(self.rewards))
        discounted_rewards -= np.mean(discounted_rewards)
        discounted_rewards /= np.std(discounted_rewards) + 1e-8

        states = np.array(self.states, dtype=np.float32)
        actions = np.array(self.actions, dtype=np.float32)

        # Cross-entropy of the taken actions, weighted by the returns.
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            tape.watch(model_params)
            policies = self.model(states)
            action_prob = tf.reduce_sum(actions * policies, axis=1)
            cross_entropy = -tf.math.log(action_prob + 1e-5)
            loss = tf.reduce_sum(cross_entropy * discounted_rewards)
            # Same epsilon as above so a zero probability cannot give NaN.
            entropy = -policies * tf.math.log(policies + 1e-5)

        # Step the model in the direction that reduces the loss.
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        self.states, self.actions, self.rewards = [], [], []
        return np.mean(entropy)
 
 
if __name__ == "__main__":
    import os

    # Create the environment and the agent.
    env = Env(render_speed=0.01)
    # 15 state features (same as Deep SARSA): 4 per obstacle for the
    # 3 obstacles, plus 3 for the goal.
    state_size = 15
    action_space = [0, 1, 2, 3, 4]  # up, down, right, left, stay
    action_size = len(action_space)
    agent = REINFORCEAgent(state_size, action_size)

    # savefig does not create missing directories, so make sure the output
    # locations exist before the first episode finishes.
    os.makedirs("./save_graph", exist_ok=True)
    os.makedirs("save_model", exist_ok=True)

    scores, episodes = [], []

    EPISODES = 200
    for e in range(EPISODES):
        done = False
        score = 0
        # Reset the environment; reshape the state list to a (1, 15) array.
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            # Choose an action for the current state.
            action = agent.get_action(state)

            # Advance the environment by one time step and collect the sample.
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            agent.append_sample(state, action, reward)
            score += reward

            state = next_state

            if done:
                # Update the policy network once per episode.
                entropy = agent.train_model()
                # Report the result of this episode.
                print("episode: {:3d} | score: {:3d} | entropy: {:.3f}".format(
                      e, score, entropy))

                scores.append(score)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("score")
                pylab.savefig("./save_graph/graph.png")

        # Save the model every 100 episodes.
        if e % 100 == 0:
            agent.model.save_weights('save_model/model', save_format='tf')
cs

 

test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import copy
import pylab
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
 
 
# 상태가 입력, 각 행동의 확률이 출력인 인공신경망 생성
class REINFORCE(tf.keras.Model):
    """Policy network: takes a state and outputs one probability per action."""

    def __init__(self, action_size):
        super().__init__()
        # Two hidden layers of 24 ReLU units each.
        self.fc1 = Dense(24, activation='relu')
        self.fc2 = Dense(24, activation='relu')
        # Softmax head so the outputs form a probability distribution.
        self.fc_out = Dense(action_size, activation='softmax')

    def call(self, x):
        """Run the forward pass and return the action probabilities."""
        hidden = self.fc2(self.fc1(x))
        return self.fc_out(hidden)
 
 
# 그리드월드 예제에서의 REINFORCE 에이전트
class REINFORCEAgent:
    """Inference-only REINFORCE agent that acts with previously saved weights."""

    def __init__(self, state_size, action_size):
        # Size of the state vector and number of discrete actions.
        self.state_size, self.action_size = state_size, action_size

        # Build the policy network and load the saved weights into it.
        policy_net = REINFORCE(self.action_size)
        self.model = policy_net
        policy_net.load_weights('save_model/trained/model')

    def get_action(self, state):
        """Sample one action index from the policy network's output."""
        probabilities = np.array(self.model(state)[0])
        sampled = np.random.choice(self.action_size, 1, p=probabilities)
        return sampled[0]
 
 
if __name__ == "__main__":
    # Create the environment and the agent.
    env = Env(render_speed=0.05)
    state_size = 15
    action_space = [0, 1, 2, 3, 4]
    action_size = len(action_space)
    agent = REINFORCEAgent(state_size, action_size)

    EPISODES = 10
    for e in range(EPISODES):
        done = False
        score = 0
        # Reset the environment; reshape the state list to a (1, 15) array.
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            # Choose an action for the current state.
            action = agent.get_action(state)

            # Advance the environment by one time step.
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            score += reward

            state = next_state

            if done:
                print("episode: {:3d} | score: {:3d}".format(e, score))
cs

 

environment.py

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import time
import numpy as np
import tkinter as tk
from PIL import ImageTk, Image
 
PhotoImage = ImageTk.PhotoImage
UNIT = 50  # side length of one grid cell, in pixels
HEIGHT = 5  # grid height, in cells
WIDTH = 5  # grid width, in cells

# Fix the seed of NumPy's global random generator.
np.random.seed(1)
 
 
class Env(tk.Tk):
    """Tkinter-rendered grid world with a fixed goal and moving obstacles.

    The agent is the rectangle image and starts in cell [0, 0].  Three
    obstacles (triangle images, reward -1) move horizontally, and a goal
    (circle image, reward +1) is placed at cell [4, 4].  An episode ends
    when the agent enters the goal cell.
    """

    def __init__(self, render_speed=0.01):
        super(Env, self).__init__()
        self.render_speed = render_speed
        self.action_space = ['u', 'd', 'l', 'r']
        self.action_size = len(self.action_space)
        self.title('REINFORCE')
        # Tk geometry strings are "<width>x<height>".
        self.geometry('{0}x{1}'.format(WIDTH * UNIT, HEIGHT * UNIT))
        self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.counter = 0
        self.rewards = []
        self.goal = []
        self._place_default_rewards()

    def _place_default_rewards(self):
        # Put the three obstacles and the goal on their initial cells.
        self.set_reward([0, 1], -1)
        self.set_reward([1, 2], -1)
        self.set_reward([2, 3], -1)
        self.set_reward([4, 4], 1)

    def _build_canvas(self):
        """Create the canvas, draw the grid lines and add the agent image."""
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # Vertical grid lines, one every UNIT pixels.
        for c in range(0, WIDTH * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        # Horizontal grid lines span the full canvas width.
        for r in range(0, HEIGHT * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, WIDTH * UNIT, r
            canvas.create_line(x0, y0, x1, y1)

        self.rewards = []
        self.goal = []
        # Add the agent image, centred in the first cell.
        x, y = UNIT / 2, UNIT / 2
        self.rectangle = canvas.create_image(x, y, image=self.shapes[0])

        canvas.pack()

        return canvas

    def load_images(self):
        """Load the agent, obstacle and goal images, resized to 30x30.

        Returns:
            Tuple ``(rectangle, triangle, circle)`` of PhotoImage objects.
        """
        rectangle = PhotoImage(
            Image.open("../img/rectangle.png").resize((30, 30)))
        triangle = PhotoImage(
            Image.open("../img/triangle.png").resize((30, 30)))
        circle = PhotoImage(
            Image.open("../img/circle.png").resize((30, 30)))

        return rectangle, triangle, circle

    def reset_reward(self):
        """Remove all reward figures and re-create them at their start cells."""
        for reward in self.rewards:
            self.canvas.delete(reward['figure'])

        self.rewards.clear()
        self.goal.clear()
        self._place_default_rewards()

    def set_reward(self, state, reward):
        """Create a reward object at grid cell ``state``.

        Args:
            state: ``[x, y]`` grid cell.
            reward: positive for the goal (circle image), negative for an
                obstacle (triangle image).
        """
        state = [int(state[0]), int(state[1])]
        x = int(state[0])
        y = int(state[1])
        temp = {}
        if reward > 0:
            temp['reward'] = reward
            temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                      (UNIT * y) + UNIT / 2,
                                                      image=self.shapes[2])

            self.goal.append(temp['figure'])

        elif reward < 0:
            # Obstacles also carry a horizontal movement direction.
            temp['direction'] = -1
            temp['reward'] = reward
            temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                      (UNIT * y) + UNIT / 2,
                                                      image=self.shapes[1])

        # NOTE: reward == 0 creates no figure, so the lookup below raises
        # KeyError in that case.
        temp['coords'] = self.canvas.coords(temp['figure'])
        temp['state'] = state
        self.rewards.append(temp)

    def check_if_reward(self, state):
        """Sum the rewards located at ``state`` and flag whether it is the goal.

        Returns:
            Dict with keys ``'if_goal'`` (bool) and ``'rewards'`` (sum of the
            rewards whose cell equals ``state``).
        """
        check_list = dict()
        check_list['if_goal'] = False
        rewards = 0

        for reward in self.rewards:
            if reward['state'] == state:
                rewards += reward['reward']
                if reward['reward'] == 1:
                    check_list['if_goal'] = True

        check_list['rewards'] = rewards

        return check_list

    def coords_to_state(self, coords):
        """Convert canvas pixel coordinates to an ``[x, y]`` grid cell."""
        x = int((coords[0] - UNIT / 2) / UNIT)
        y = int((coords[1] - UNIT / 2) / UNIT)
        return [x, y]

    def reset(self):
        """Move the agent back to the first cell, reset the rewards and
        return the initial state."""
        self.update()
        time.sleep(0.5)
        x, y = self.canvas.coords(self.rectangle)
        self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
        self.reset_reward()
        return self.get_state()

    def step(self, action):
        """Advance the environment by one time step.

        Args:
            action: action index passed to ``move``.

        Returns:
            Tuple ``(state, reward, done)``; ``done`` is True when the agent
            is on the goal cell.
        """
        self.counter += 1
        self.render()

        # Obstacles move on every other step.
        if self.counter % 2 == 1:
            self.rewards = self.move_rewards()

        next_coords = self.move(self.rectangle, action)
        check = self.check_if_reward(self.coords_to_state(next_coords))
        done = check['if_goal']
        reward = check['rewards']

        self.canvas.tag_raise(self.rectangle)

        s_ = self.get_state()

        return s_, reward, done

    def get_state(self):
        """Build the flat state list observed by the agent.

        For every reward object the list holds its x and y offset from the
        agent; an obstacle then adds ``-1`` and its direction, while the goal
        adds ``1``.
        """
        location = self.coords_to_state(self.canvas.coords(self.rectangle))
        agent_x = location[0]
        agent_y = location[1]

        states = list()

        for reward in self.rewards:
            reward_location = reward['state']
            states.append(reward_location[0] - agent_x)
            states.append(reward_location[1] - agent_y)
            if reward['reward'] < 0:
                states.append(-1)
                states.append(reward['direction'])
            else:
                states.append(1)

        return states

    def move_rewards(self):
        """Move every obstacle one step and return the updated reward list."""
        new_rewards = []
        for temp in self.rewards:
            # The goal (reward 1) stays where it is.
            if temp['reward'] == 1:
                new_rewards.append(temp)
                continue
            temp['coords'] = self.move_const(temp)
            temp['state'] = self.coords_to_state(temp['coords'])
            new_rewards.append(temp)
        return new_rewards

    def move_const(self, target):
        """Move one obstacle horizontally, reversing direction at the edges.

        Returns:
            The obstacle's canvas coordinates after the move.
        """
        s = self.canvas.coords(target['figure'])

        base_action = np.array([0, 0])

        # Flip the direction when the obstacle is in the last or first column.
        if s[0] == (WIDTH - 1) * UNIT + UNIT / 2:
            target['direction'] = 1
        elif s[0] == UNIT / 2:
            target['direction'] = -1

        # Direction -1 moves right (+x), direction 1 moves left (-x).
        if target['direction'] == -1:
            base_action[0] += UNIT
        elif target['direction'] == 1:
            base_action[0] -= UNIT

        # NOTE(review): figures are placed at cell centres (offset by
        # UNIT / 2), so this comparison looks like it can never match;
        # kept as in the original, confirm intent.
        if (target['figure'] is not self.rectangle
                and s == [(WIDTH - 1) * UNIT, (HEIGHT - 1) * UNIT]):
            base_action = np.array([0, 0])

        self.canvas.move(target['figure'], base_action[0], base_action[1])

        s_ = self.canvas.coords(target['figure'])

        return s_

    def move(self, target, action):
        """Move ``target`` one cell according to ``action``, staying in the grid.

        Actions: 0 up, 1 down, 2 right, 3 left; any other value leaves the
        target in place.

        Returns:
            The target's canvas coordinates after the move.
        """
        s = self.canvas.coords(target)

        base_action = np.array([0, 0])

        if action == 0:  # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:  # down
            if s[1] < (HEIGHT - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:  # right
            if s[0] < (WIDTH - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:  # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(target, base_action[0], base_action[1])

        s_ = self.canvas.coords(target)

        return s_

    def render(self):
        """Sleep for ``render_speed`` seconds, then redraw the window."""
        # Controls the speed of the game.
        time.sleep(self.render_speed)
        self.update()
 
cs