Policy Gradient
IT & Computer Engineering / Deep Learning | 2020. 12. 7. 01:08
Policy-based reinforcement learning: here the neural network is called the policy network. Because its outputs are the probabilities of each action and must sum to 1, the output layer uses the 'softmax' activation.
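As a quick check of that point, here is a minimal standalone sketch (not part of the project code; the layer size and the dummy state are made up) showing that a softmax output layer produces non-negative values that sum to 1, i.e. a probability distribution that can be sampled from:

```python
import numpy as np
from tensorflow.keras.layers import Dense

# toy policy head: 5 actions, softmax output (made-up sizes for illustration)
policy_head = Dense(5, activation='softmax')

dummy_state = np.random.rand(1, 15).astype(np.float32)  # fake (1, 15) state
policy = policy_head(dummy_state).numpy()[0]

print(policy)        # five non-negative action probabilities
print(policy.sum())  # ~1.0, so it can be passed to np.random.choice(..., p=policy)
```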
train.py
```python
import copy
import pylab
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam


# neural network: the state is the input, the probability of each action is the output
class REINFORCE(tf.keras.Model):
    def __init__(self, action_size):
        super(REINFORCE, self).__init__()
        self.fc1 = Dense(24, activation='relu')
        self.fc2 = Dense(24, activation='relu')
        self.fc_out = Dense(action_size, activation='softmax')

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        policy = self.fc_out(x)
        return policy


# REINFORCE agent for the grid-world example
class REINFORCEAgent:
    def __init__(self, state_size, action_size):
        # sizes of the state and the action space
        self.state_size = state_size
        self.action_size = action_size

        # REINFORCE hyperparameters
        self.discount_factor = 0.99  # discount factor 0.99
        self.learning_rate = 0.001   # learning rate 0.001

        self.model = REINFORCE(self.action_size)  # create the policy network
        self.optimizer = Adam(learning_rate=self.learning_rate)
        self.states, self.actions, self.rewards = [], [], []

    # select an action with the policy network
    # (the policy itself is stochastic, so no greedy/epsilon-greedy policy is needed)
    def get_action(self, state):
        policy = self.model(state)[0]
        policy = np.array(policy)
        return np.random.choice(self.action_size, 1, p=policy)[0]

    # compute the returns
    def discount_rewards(self, rewards):
        discounted_rewards = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, len(rewards))):
            running_add = running_add * self.discount_factor + rewards[t]
            discounted_rewards[t] = running_add
        return discounted_rewards

    # store the states, actions and rewards of one episode
    def append_sample(self, state, action, reward):
        self.states.append(state[0])
        self.rewards.append(reward)
        act = np.zeros(self.action_size)
        act[action] = 1
        self.actions.append(act)

    # update the policy network
    def train_model(self):
        # normalize the returns with Z-score standardization -> improves the policy update
        # discount_rewards returns the returns, converted here to numpy.float32,
        discounted_rewards = np.float32(self.discount_rewards(self.rewards))
        discounted_rewards -= np.mean(discounted_rewards)  # then subtract the mean
        discounted_rewards /= np.std(discounted_rewards)   # and divide by the standard deviation

        # compute the cross-entropy loss
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            tape.watch(model_params)
            policies = self.model(np.array(self.states))
            actions = np.array(self.actions)
            action_prob = tf.reduce_sum(actions * policies, axis=1)
            cross_entropy = - tf.math.log(action_prob + 1e-5)  # cross entropy
            loss = tf.reduce_sum(cross_entropy * discounted_rewards)
            entropy = - policies * tf.math.log(policies)

        # update the model in the direction that reduces the loss
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        self.states, self.actions, self.rewards = [], [], []
        return np.mean(entropy)


if __name__ == "__main__":
    # create the environment and the agent
    env = Env(render_speed=0.01)
    state_size = 15  # 15 state variables (same as deep SARSA:
                     # 4 variables per obstacle, 3 obstacles, plus 3 for the goal)
    action_space = [0, 1, 2, 3, 4]  # up, down, right, left, stay
    action_size = len(action_space)  # 5

    agent = REINFORCEAgent(state_size, action_size)  # create the agent

    scores, episodes = [], []

    EPISODES = 200
    for e in range(EPISODES):
        done = False
        score = 0
        # reset the environment
        state = env.reset()
        state = np.reshape(state, [1, state_size])  # convert the state list into a (1, 15) numpy array

        while not done:
            # select an action for the current state
            action = agent.get_action(state)

            # take one time step in the environment and collect the sample
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])  # convert the next state into a (1, 15) numpy array

            agent.append_sample(state, action, reward)
            score += reward

            state = next_state

            if done:
                # update the policy network at the end of every episode
                entropy = agent.train_model()
                # print the training result for the episode
                print("episode: {:3d} | score: {:3d} | entropy: {:.3f}".format(
                      e, score, entropy))

                scores.append(score)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("score")
                pylab.savefig("./save_graph/graph.png")

        # save the model every 100 episodes
        if e % 100 == 0:
            agent.model.save_weights('save_model/model', save_format='tf')
```
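To make the return computation in train_model concrete, here is a small hand-check with made-up rewards for a four-step episode; it reproduces the backward accumulation of discount_rewards and the Z-score normalization that follows it (I build an explicit float array here so the fractional returns are not truncated, which np.zeros_like would produce from a plain integer reward list):

```python
import numpy as np

discount_factor = 0.99
rewards = [0, -1, 0, 1]   # made-up rewards for a 4-step episode

# same backward accumulation as REINFORCEAgent.discount_rewards
returns = np.zeros_like(rewards, dtype=np.float32)
running_add = 0.0
for t in reversed(range(len(rewards))):
    running_add = running_add * discount_factor + rewards[t]
    returns[t] = running_add
print(returns)   # approximately [-0.0197, -0.0199, 0.99, 1.0]

# Z-score normalization used in train_model
returns -= np.mean(returns)
returns /= np.std(returns)
print(returns)   # zero mean, unit variance: the steps before the -1 penalty get
                 # a negative weight, the steps near the goal a positive one
```

With these normalized returns as weights, the loss in train_model (the sum over t of -log pi(a_t|s_t) times the return G_t) raises the probability of actions that were followed by above-average returns and lowers the rest.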
test.py
```python
import copy
import pylab
import random
import numpy as np
from environment import Env
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam


# neural network: the state is the input, the probability of each action is the output
class REINFORCE(tf.keras.Model):
    def __init__(self, action_size):
        super(REINFORCE, self).__init__()
        self.fc1 = Dense(24, activation='relu')
        self.fc2 = Dense(24, activation='relu')
        self.fc_out = Dense(action_size, activation='softmax')

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        policy = self.fc_out(x)
        return policy


# REINFORCE agent for the grid-world example
class REINFORCEAgent:
    def __init__(self, state_size, action_size):
        # sizes of the state and the action space
        self.state_size = state_size
        self.action_size = action_size

        self.model = REINFORCE(self.action_size)
        self.model.load_weights('save_model/trained/model')

    # select an action with the policy network
    def get_action(self, state):
        policy = self.model(state)[0]
        policy = np.array(policy)
        return np.random.choice(self.action_size, 1, p=policy)[0]


if __name__ == "__main__":
    # create the environment and the agent
    env = Env(render_speed=0.05)
    state_size = 15
    action_space = [0, 1, 2, 3, 4]
    action_size = len(action_space)

    agent = REINFORCEAgent(state_size, action_size)

    EPISODES = 10
    for e in range(EPISODES):
        done = False
        score = 0
        # reset the environment
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            # select an action for the current state
            action = agent.get_action(state)

            # take one time step in the environment with the chosen action
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            score += reward
            state = next_state

            if done:
                print("episode: {:3d} | score: {:3d}".format(e, score))
```
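Note that get_action in both scripts does not take the arg-max action; it samples from the softmax output. A minimal sketch with a made-up probability vector, showing what that np.random.choice call does:

```python
import numpy as np

action_size = 5
policy = np.array([0.1, 0.2, 0.4, 0.2, 0.1])   # made-up softmax output

# sample one action index according to the policy probabilities,
# the same call used in REINFORCEAgent.get_action
action = np.random.choice(action_size, 1, p=policy)[0]
print(action)

# over many samples the empirical frequencies approach the policy
samples = np.random.choice(action_size, 10000, p=policy)
print(np.bincount(samples, minlength=action_size) / 10000)
```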
environment.py
```python
import time
import numpy as np
import tkinter as tk
from PIL import ImageTk, Image

PhotoImage = ImageTk.PhotoImage
UNIT = 50    # pixels per grid cell
HEIGHT = 5   # grid height (cells)
WIDTH = 5    # grid width (cells)

np.random.seed(1)


class Env(tk.Tk):
    def __init__(self, render_speed=0.01):
        super(Env, self).__init__()
        self.render_speed = render_speed
        self.action_space = ['u', 'd', 'l', 'r']
        self.action_size = len(self.action_space)
        self.title('REINFORCE')
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT))
        self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.counter = 0
        self.rewards = []
        self.goal = []
        # obstacles
        self.set_reward([0, 1], -1)
        self.set_reward([1, 2], -1)
        self.set_reward([2, 3], -1)
        # goal
        self.set_reward([4, 4], 1)

    def _build_canvas(self):
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # draw the grid lines
        for c in range(0, WIDTH * UNIT, UNIT):   # 0 to 250 in steps of 50
            x0, y0, x1, y1 = c, 0, c, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for r in range(0, HEIGHT * UNIT, UNIT):  # 0 to 250 in steps of 50
            x0, y0, x1, y1 = 0, r, HEIGHT * UNIT, r
            canvas.create_line(x0, y0, x1, y1)

        self.rewards = []
        self.goal = []
        # add the agent image to the canvas
        x, y = UNIT / 2, UNIT / 2
        self.rectangle = canvas.create_image(x, y, image=self.shapes[0])

        canvas.pack()

        return canvas

    def load_images(self):
        rectangle = PhotoImage(
            Image.open("../img/rectangle.png").resize((30, 30)))
        triangle = PhotoImage(
            Image.open("../img/triangle.png").resize((30, 30)))
        circle = PhotoImage(
            Image.open("../img/circle.png").resize((30, 30)))
        return rectangle, triangle, circle

    def reset_reward(self):
        for reward in self.rewards:
            self.canvas.delete(reward['figure'])

        self.rewards.clear()
        self.goal.clear()
        self.set_reward([0, 1], -1)
        self.set_reward([1, 2], -1)
        self.set_reward([2, 3], -1)
        # goal
        self.set_reward([4, 4], 1)

    def set_reward(self, state, reward):
        state = [int(state[0]), int(state[1])]
        x = int(state[0])
        y = int(state[1])
        temp = {}
        if reward > 0:
            temp['reward'] = reward
            temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                      (UNIT * y) + UNIT / 2,
                                                      image=self.shapes[2])
            self.goal.append(temp['figure'])
        elif reward < 0:
            temp['direction'] = -1
            temp['reward'] = reward
            temp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,
                                                      (UNIT * y) + UNIT / 2,
                                                      image=self.shapes[1])

        temp['coords'] = self.canvas.coords(temp['figure'])
        temp['state'] = state
        self.rewards.append(temp)

    # new methods
    def check_if_reward(self, state):
        check_list = dict()
        check_list['if_goal'] = False
        rewards = 0
        for reward in self.rewards:
            if reward['state'] == state:
                rewards += reward['reward']
                if reward['reward'] == 1:
                    check_list['if_goal'] = True

        check_list['rewards'] = rewards

        return check_list

    def coords_to_state(self, coords):
        x = int((coords[0] - UNIT / 2) / UNIT)
        y = int((coords[1] - UNIT / 2) / UNIT)
        return [x, y]

    def reset(self):
        self.update()
        time.sleep(0.5)
        x, y = self.canvas.coords(self.rectangle)
        self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)
        self.reset_reward()
        return self.get_state()

    def step(self, action):
        self.counter += 1
        self.render()

        if self.counter % 2 == 1:
            self.rewards = self.move_rewards()

        next_coords = self.move(self.rectangle, action)
        check = self.check_if_reward(self.coords_to_state(next_coords))
        done = check['if_goal']
        reward = check['rewards']

        self.canvas.tag_raise(self.rectangle)

        s_ = self.get_state()

        return s_, reward, done

    def get_state(self):
        location = self.coords_to_state(self.canvas.coords(self.rectangle))
        agent_x = location[0]
        agent_y = location[1]

        states = list()

        for reward in self.rewards:
            reward_location = reward['state']
            states.append(reward_location[0] - agent_x)
            states.append(reward_location[1] - agent_y)
            if reward['reward'] < 0:
                states.append(-1)
                states.append(reward['direction'])
            else:
                states.append(1)

        return states

    def move_rewards(self):
        new_rewards = []
        for temp in self.rewards:
            if temp['reward'] == 1:
                new_rewards.append(temp)
                continue
            temp['coords'] = self.move_const(temp)
            temp['state'] = self.coords_to_state(temp['coords'])
            new_rewards.append(temp)
        return new_rewards

    def move_const(self, target):
        s = self.canvas.coords(target['figure'])

        base_action = np.array([0, 0])

        if s[0] == (WIDTH - 1) * UNIT + UNIT / 2:
            target['direction'] = 1
        elif s[0] == UNIT / 2:
            target['direction'] = -1

        if target['direction'] == -1:
            base_action[0] += UNIT
        elif target['direction'] == 1:
            base_action[0] -= UNIT

        if (target['figure'] is not self.rectangle
                and s == [(WIDTH - 1) * UNIT, (HEIGHT - 1) * UNIT]):
            base_action = np.array([0, 0])

        self.canvas.move(target['figure'], base_action[0], base_action[1])

        s_ = self.canvas.coords(target['figure'])

        return s_

    def move(self, target, action):
        s = self.canvas.coords(target)

        base_action = np.array([0, 0])

        if action == 0:  # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:  # down
            if s[1] < (HEIGHT - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:  # right
            if s[0] < (WIDTH - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:  # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(target, base_action[0], base_action[1])

        s_ = self.canvas.coords(target)

        return s_

    def render(self):
        # adjust the rendering speed
        time.sleep(self.render_speed)
        self.update()
```
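For reference, get_state is what produces the 15 state variables used in train.py: for each of the three obstacles it appends (relative x, relative y, -1, moving direction), and for the goal (relative x, relative y, +1). A small hand computation of that vector (the agent position is made up; the obstacle and goal positions and the initial direction of -1 are the ones set in Env.__init__):

```python
# made-up agent grid position, for illustration only
agent_x, agent_y = 2, 2

# initial obstacle positions and moving directions as set in Env.__init__
obstacles = [([0, 1], -1), ([1, 2], -1), ([2, 3], -1)]
goal = [4, 4]

state = []
for (ox, oy), direction in obstacles:
    # relative position, obstacle marker (-1), current moving direction
    state += [ox - agent_x, oy - agent_y, -1, direction]
# relative position of the goal and goal marker (+1)
state += [goal[0] - agent_x, goal[1] - agent_y, 1]

print(state)       # 3 obstacles * 4 values + 3 goal values
print(len(state))  # 15
```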