-
딥살사 코드 - 딥러닝의 시작IT&컴퓨터공학/딥러닝 2020. 12. 7. 00:40
이전까지 큐테이블을 이용했지만 장애물이 움직이는 것과 같은경우 상태 개수와 행동갯수가 너무 커지는 경우에
큐테이블을 사용할 수 없음.
더이상 직접 계산이 불가능하므로 인공신경망을 이용해서 모델이 스스로 학습하도록 한다.
딥살사 = 살사알고리즘 + 인공신경망 이용
원래 큐함수 업데이트 식
이거 상태하나하나 계산하는거 불가능 하니까 -> 주황색 글씨만 알면 이걸 최소로 하는 모델을 만들면되겠다.
딥살사에서 사용하는 오차함수 식
이걸 최소로 하는 모델을 만들자 ! -> 경사하강법 이용
train.py
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124import copyimport pylabimport randomimport numpy as npfrom environment import Envimport tensorflow as tffrom tensorflow.keras.layers import Densefrom tensorflow.keras.optimizers import Adam# 딥살사 인공신경망class DeepSARSA(tf.keras.Model):def __init__(self, action_size):super(DeepSARSA, self).__init__()self.fc1 = Dense(30, activation='relu') #은닉층1 : 30개의 노드 활성화함수 : relu ( 0보다 작은값은 0으로 ,0보다 큰값은 그대로)self.fc2 = Dense(30, activation='relu') #은닉층2 : 30개의 노드self.fc_out = Dense(action_size) #행동사이즈만큼 출력층의 노드가 정해짐 ( 큐함수를 인공신경망으로 근사시키는 거니까 ! 여기서는 5 )def call(self, x):x = self.fc1(x)x = self.fc2(x)q = self.fc_out(x)return q# 그리드월드 예제에서의 딥살사 에이전트 ( 에이전트 생성하는 함수 )class DeepSARSAgent:def __init__(self, state_size, action_size): # 생성자# 상태의 크기와 행동의 크기 정의self.state_size = state_sizeself.action_size = action_size# 딥살사 하이퍼 파라메터self.discount_factor = 0.99 #감가율 0.99 ( 1과 가까우므로 미래보상도 현재보상처럼 취급 )self.learning_rate = 0.001 # 학습률 0.001 ( 학습률이 매우 작으므로 현재것을 대부분 유지 )self.epsilon = 1. #처음 입실론 =1 : 무조건 탐험한다self.epsilon_decay = .9999 # 0.9999 를 곱하면서 탐험률을 낮춘다self.epsilon_min = 0.01 #입실론의 최솟값은 0.01 로한다. 즉 0.01만큼은 최소한 탐험으로 남겨두겠다는 뜻self.model = DeepSARSA(self.action_size) # 인공신경망을 생성한다self.optimizer = Adam(lr=self.learning_rate)# 입실론 탐욕 정책으로 행동 선택def get_action(self, state):if np.random.rand() <= self.epsilon: # 랜덤값이 입실론보다 작으면 무작위 행동 선택return random.randrange(self.action_size)else: # 랜덤값이 입실론보다 크면 제일 좋은 행동 선택q_values = self.model(state)return np.argmax(q_values[0])# <s, a, r, s', a'>의 샘플로부터 모델 업데이트def train_model(self, state, action, reward, next_state, next_action, done):if self.epsilon > self.epsilon_min:self.epsilon *= self.epsilon_decay# 학습 파라메터model_params = self.model.trainable_variableswith tf.GradientTape() as tape:tape.watch(model_params)predict = self.model(state)[0]one_hot_action = tf.one_hot([action], self.action_size)predict = tf.reduce_sum(one_hot_action * predict, axis=1) # 내가 계산한 예측값# done = True 일 경우 에피소드가 끝나서 다음 상태가 없음next_q = self.model(next_state)[0][next_action] #다음상태에서 다음 행동을 취했을때 q함수값target = reward + (1 - done) * self.discount_factor * next_q #타겟값 ( 정답 ) 단 done=True 인경우 target = reward# MSE 오류 함수 계산loss = tf.reduce_mean(tf.square(target - predict)) # 이 값이 최소가 되도록 모델을 발전시키면 된다# 오류함수를 줄이는 방향으로 모델 업데이트grads = tape.gradient(loss, model_params)self.optimizer.apply_gradients(zip(grads, model_params))if __name__ == "__main__":# 환경과 에이전트 생성env = Env(render_speed=0.01) # env 객체 초기화state_size = 15 #장애물에 대한 상태값 총 4개 , 장애물이 총 3개라서 4*3= 12 개, 에이전트에 대한 도착점의 상대위치(x,y)2개, 도착지점의 라벨1개#총 15개의 상태를 가진다.action_space = [0, 1, 2, 3, 4] #상,하,우,좌,제자리 순서action_size = len(action_space) # 5 ( 윗줄에서 알수있음 )agent = DeepSARSAgent(state_size, action_size) #에이전트를 생성한다.scores, episodes = [], [] # score 와 에피소드 빈 리스트 초기화EPISODES = 1000for e in range(EPISODES): #에피소드 만큼done = Falsescore = 0# env 초기화state = env.reset()state = np.reshape(state, [1, state_size]) #상태 list를 (1,15)의 numpy.array로 변환while not done:# 현재 상태에 대한 행동 선택action = agent.get_action(state)# 선택한 행동으로 환경에서 한 타임스텝 진행 후 샘플 수집next_state, reward, done = env.step(action)next_state = np.reshape(next_state, [1, state_size]) #다음 상태 list를 (1,15)의 numpy.array로 변환next_action = agent.get_action(next_state) #다음 상태에대해서 다음 행동을 구함# 샘플로 모델 학습agent.train_model(state, action, reward, next_state,next_action, done) # 살사니까 다음상태의 다음 행동까지 다 고려해서 모델학습score += rewardstate = next_stateif done:# 에피소드마다 학습 결과 출력print("episode: {:3d} | score: {:3d} | epsilon: {:.3f}".format(e, score, agent.epsilon))scores.append(score)episodes.append(e)pylab.plot(episodes, scores, 'b')pylab.xlabel("episode")pylab.ylabel("score")pylab.savefig("./save_graph/graph.png")# 100 에피소드마다 모델 저장if e % 100 == 0:agent.model.save_weights('save_model/model', save_format='tf')cs test.py
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374import randomimport numpy as npfrom environment import Envimport tensorflow as tffrom tensorflow.keras.layers import Dense# 딥살사 인공신경망class DeepSARSA(tf.keras.Model):def __init__(self, action_size):super(DeepSARSA, self).__init__()self.fc1 = Dense(30, activation='relu')self.fc2 = Dense(30, activation='relu')self.fc_out = Dense(action_size)def call(self, x):x = self.fc1(x)x = self.fc2(x)q = self.fc_out(x)return q# 그리드월드 예제에서의 딥살사 에이전트class DeepSARSAgent:def __init__(self, state_size, action_size):# 상태의 크기와 행동의 크기 정의self.state_size = state_sizeself.action_size = action_sizeself.epsilon = 0.01self.model = DeepSARSA(self.action_size)self.model.load_weights('save_model/trained/model')# 입실론 탐욕 정책으로 행동 선택def get_action(self, state):if np.random.rand() <= self.epsilon:return random.randrange(self.action_size)else:q_values = self.model(state)return np.argmax(q_values[0])if __name__ == "__main__":# 환경과 에이전트 생성env = Env(render_speed=0.05)state_size = 15action_space = [0, 1, 2, 3, 4]action_size = len(action_space)agent = DeepSARSAgent(state_size, action_size)scores, episodes = [], []EPISODES = 10for e in range(EPISODES):score = 0done = False# env 초기화state = env.reset()state = np.reshape(state, [1, state_size])while not done:# 현재 상태에 대한 행동 선택action = agent.get_action(state)# 선택한 행동으로 환경에서 한 타임스텝 진행 후 샘플 수집next_state, reward, done = env.step(action)next_state = np.reshape(next_state, [1, state_size])state = next_statescore += rewardif done:# 에피소드마다 테스트 결과 출력print("episode: {:3d} | score: {:3d}".format(e, score))cs environment.py
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238import timeimport numpy as npimport tkinter as tkfrom PIL import ImageTk, ImagePhotoImage = ImageTk.PhotoImageUNIT = 50 # 픽셀 수HEIGHT = 5 # 그리드 세로WIDTH = 5 # 그리드 가로np.random.seed(1)class Env(tk.Tk):def __init__(self, render_speed=0.01):super(Env, self).__init__()self.render_speed=render_speedself.action_space = ['u', 'd', 'l', 'r']self.action_size = len(self.action_space)self.title('DeepSARSA')self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT))self.shapes = self.load_images()self.canvas = self._build_canvas()self.counter = 0self.rewards = []self.goal = []# 장애물 설정self.set_reward([0, 1], -1)self.set_reward([1, 2], -1)self.set_reward([2, 3], -1)# 목표 지점 설정self.set_reward([4, 4], 1)def _build_canvas(self):canvas = tk.Canvas(self, bg='white',height=HEIGHT * UNIT,width=WIDTH * UNIT)# 그리드 생성for c in range(0, WIDTH * UNIT, UNIT): # 0~400 by 80x0, y0, x1, y1 = c, 0, c, HEIGHT * UNITcanvas.create_line(x0, y0, x1, y1)for r in range(0, HEIGHT * UNIT, UNIT): # 0~400 by 80x0, y0, x1, y1 = 0, r, HEIGHT * UNIT, rcanvas.create_line(x0, y0, x1, y1)self.rewards = []self.goal = []# 캔버스에 이미지 추가x, y = UNIT/2, UNIT/2self.rectangle = canvas.create_image(x, y, image=self.shapes[0])canvas.pack()return canvasdef load_images(self):rectangle = PhotoImage(Image.open("../img/rectangle.png").resize((30, 30)))triangle = PhotoImage(Image.open("../img/triangle.png").resize((30, 30)))circle = PhotoImage(Image.open("../img/circle.png").resize((30, 30)))return rectangle, triangle, circledef reset_reward(self): # 보상 초기화 해주는 함수for reward in self.rewards:self.canvas.delete(reward['figure'])self.rewards.clear()self.goal.clear()self.set_reward([0, 1], -1) # (0,1) 자리에 장애물self.set_reward([1, 2], -1) # (1,2) 자리에 장애물self.set_reward([2, 3], -1) # (2,3) 자리에 장애물# #goalself.set_reward([4, 4], 1) # (4,4) 자리는 goal 지점def set_reward(self, state, reward): # 보상 다시 초기화state = [int(state[0]), int(state[1])] #위치를 list 형식으로 저장x = int(state[0])y = int(state[1])temp = {}if reward > 0: # 즉 goal 이면temp['reward'] = rewardtemp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,(UNIT * y) + UNIT / 2,image=self.shapes[2])self.goal.append(temp['figure'])elif reward < 0: # 즉 장애물이면temp['direction'] = -1temp['reward'] = rewardtemp['figure'] = self.canvas.create_image((UNIT * x) + UNIT / 2,(UNIT * y) + UNIT / 2,image=self.shapes[1])temp['coords'] = self.canvas.coords(temp['figure'])temp['state'] = stateself.rewards.append(temp)# 상태를 매개변수르 받아 목표지점에 도착할수있는지 확인하는 함수def check_if_reward(self, state):check_list = dict()check_list['if_goal'] = Falserewards = 0for reward in self.rewards: #장애물3개, 목표지점 1개에 대해서if reward['state'] == state: # 장애물 혹은 목표지점에 도달한다면rewards += reward['reward'] # 보상받기if reward['reward'] == 1: #goal 에 도착시check_list['if_goal'] = Truecheck_list['rewards'] = rewardsreturn check_listdef coords_to_state(self, coords):x = int((coords[0] - UNIT / 2) / UNIT)y = int((coords[1] - UNIT / 2) / UNIT)return [x, y]def reset(self):self.update()time.sleep(0.5)x, y = self.canvas.coords(self.rectangle)self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)self.reset_reward()return self.get_state()def step(self, action):self.counter += 1 # counter = time step 진행횟수self.render()if self.counter % 2 == 1: # 카운터가 홀수면 ( 즉 2번마다 한번씩 ) reward 계산self.rewards = self.move_rewards()next_coords = self.move(self.rectangle, action) #액션을 통해 움진인다check = self.check_if_reward(self.coords_to_state(next_coords))done = check['if_goal']reward = check['rewards']self.canvas.tag_raise(self.rectangle) # 행동에 따라 이동된 rectangle 을 canvas에 top level 에 표시s_ = self.get_state()return s_, reward, donedef get_state(self): # 정보를 합해서 상태로 저장하는 함수location = self.coords_to_state(self.canvas.coords(self.rectangle))agent_x = location[0] #에이전트 x좌표agent_y = location[1] #에이전트 y좌표states = list()for reward in self.rewards:reward_location = reward['state']states.append(reward_location[0] - agent_x) #상대위치계산states.append(reward_location[1] - agent_y) #상대위치계산if reward['reward'] < 0: #장애물을 만나면states.append(-1) # reward = -1states.append(reward['direction']) # direction = -1else:states.append(1) # reward = +1return statesdef move_rewards(self):new_rewards = []for temp in self.rewards:if temp['reward'] == 1:new_rewards.append(temp)continuetemp['coords'] = self.move_const(temp)temp['state'] = self.coords_to_state(temp['coords'])new_rewards.append(temp)return new_rewardsdef move_const(self, target):s = self.canvas.coords(target['figure'])base_action = np.array([0, 0])if s[0] == (WIDTH - 1) * UNIT + UNIT / 2:target['direction'] = 1elif s[0] == UNIT / 2:target['direction'] = -1if target['direction'] == -1:base_action[0] += UNITelif target['direction'] == 1:base_action[0] -= UNITif (target['figure'] is not self.rectangleand s == [(WIDTH - 1) * UNIT, (HEIGHT - 1) * UNIT]):base_action = np.array([0, 0])self.canvas.move(target['figure'], base_action[0], base_action[1])s_ = self.canvas.coords(target['figure'])return s_def move(self, target, action): # 행동에 따라 행동하고 그 위치를 반환하는 함수s = self.canvas.coords(target) # rectangle 의 위치르 받아서base_action = np.array([0, 0])if action == 0: # 상if s[1] > UNIT: # 위로움직였는데 끝이아니면 y좌표 -50픽셀base_action[1] -= UNITelif action == 1: # 하if s[1] < (HEIGHT - 1) * UNIT: # 아래로움직였는데 끝이아니면 y좌표 +50픽셀base_action[1] += UNITelif action == 2: # 우if s[0] < (WIDTH - 1) * UNIT:base_action[0] += UNITelif action == 3: # 좌if s[0] > UNIT:base_action[0] -= UNITself.canvas.move(target, base_action[0], base_action[1]) #이동s_ = self.canvas.coords(target)return s_def render(self):# 게임 속도 조정time.sleep(self.render_speed)self.update()cs 'IT&컴퓨터공학 > 딥러닝' 카테고리의 다른 글
DQN 코드- cartpole (0) 2020.12.07 폴리시 그레이디언트 (0) 2020.12.07 큐러닝 코드 ( off policy TD control ) => 학습정책 =! 행동정책 (0) 2020.12.06 살사 코드 (0) 2020.12.06 살사 계산문제 = on policy TD control ( 행동정책 = 학습정책 ) (0) 2020.12.06 댓글