-
DQN 코드- cartpoleIT&컴퓨터공학/딥러닝 2020. 12. 7. 16:47
DQN : 타겟신경망 / 학습신경망 다르게 운영
타켓신경망은 에피소드마다 업데이트
학습신경망은 스텝마다 업데이트
리플레이 메모리 : 샘플을 저장해 두는 저장소
if 연속적인 샘플들이 샘플링 되면 학습 속도가 매우느려짐
이를 방지하기 위해서 샘플들을 리플레이 메모리에 저장해두고, 나중에 여기서 랜덤샘플링을 해서 학습 시킴
train.py
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175import osimport sysimport gymimport pylabimport randomimport numpy as npfrom collections import deque # 리플레이 메모리로 사용할 deque 자료구조 importimport tensorflow as tffrom tensorflow.keras.layers import Densefrom tensorflow.keras.optimizers import Adamfrom tensorflow.keras.initializers import RandomUniform# 상태가 입력, 큐함수가 출력인 인공신경망 생성class DQN(tf.keras.Model):def __init__(self, action_size):super(DQN, self).__init__()self.fc1 = Dense(24, activation='relu') #은닉층1self.fc2 = Dense(24, activation='relu') #은닉층2self.fc_out = Dense(action_size, #두개의 노드 (행동의 개수가 2라서)kernel_initializer=RandomUniform(-1e-3, 1e-3))def call(self, x):x = self.fc1(x)x = self.fc2(x)q = self.fc_out(x)return q# 카트폴 예제에서의 DQN 에이전트class DQNAgent:def __init__(self, state_size, action_size): #생성자self.render = False # 학습하는거 그래픽으로 보고싶으면 True 로 고치면 보임# 상태와 행동의 크기 정의self.state_size = state_sizeself.action_size = action_size# DQN 하이퍼파라미터self.discount_factor = 0.99 # 감가율 0.99self.learning_rate = 0.001 # 학습률 0.001self.epsilon = 1.0 # 처음에는 무조건 탐험self.epsilon_decay = 0.999 # 0.999 씩 곱해가면서self.epsilon_min = 0.01 #입실론의 최소는 0.01self.batch_size = 64 #샘플링할 크기self.train_start = 1000 # 샘플이 1000 개 이상 쌓이면 model 훈련시작함.# 리플레이 메모리, 최대 크기 2000 리플레이 메모리 : 강화학습에서 학습의 재료가 되는 샘플을 저장해두는 저장소self.memory = deque(maxlen=2000)# 모델과 타깃 모델 생성 ( 학습모델과 타겟모델을 나눴다는게 바로 off policy - 큐러닝 )self.model = DQN(action_size) # 학습 모델self.target_model = DQN(action_size) #타겟 모델self.optimizer = Adam(lr=self.learning_rate)# 타깃 모델 초기화 - 학습을 시작하기 전에 두 모델의 가중치 값을 통일 ( 같게 시작 )self.update_target_model()# 타깃 모델을 학습 모델의 가중치로 업데이트def update_target_model(self):self.target_model.set_weights(self.model.get_weights())# 입실론 탐욕 정책으로 행동 선택def get_action(self, state):if np.random.rand() <= self.epsilon:return random.randrange(self.action_size)else:q_value = self.model(state)return np.argmax(q_value[0])# 샘플 <s, a, r, s'>을 리플레이 메모리에 저장def append_sample(self, state, action, reward, next_state, done):self.memory.append((state, action, reward, next_state, done))# 리플레이 메모리에서 무작위로 추출한 배치로 모델 학습def train_model(self):if self.epsilon > self.epsilon_min:self.epsilon *= self.epsilon_decay# 메모리에서 배치 크기(64)만큼 무작위로 샘플 추출mini_batch = random.sample(self.memory, self.batch_size)states = np.array([sample[0][0] for sample in mini_batch])actions = np.array([sample[1] for sample in mini_batch])rewards = np.array([sample[2] for sample in mini_batch])next_states = np.array([sample[3][0] for sample in mini_batch])dones = np.array([sample[4] for sample in mini_batch])# 학습 파라메터model_params = self.model.trainable_variableswith tf.GradientTape() as tape:# 현재 상태에 대한 모델의 큐함수predicts = self.model(states) #예측값one_hot_action = tf.one_hot(actions, self.action_size) #one_hot 함수를 통해 실제로 한 행동이 1이고 나머지가 0인 one_hot vector 구함predicts = tf.reduce_sum(one_hot_action * predicts, axis=1) #모든 차원을 제거하고 단 하나의 스칼라 값을 출력함# 다음 상태에 대한 타깃 모델의 큐함수target_predicts = self.target_model(next_states)target_predicts = tf.stop_gradient(target_predicts) # 학습과정에서 타켓모델이 학습되는걸 방지# 벨만 최적 방정식을 이용한 업데이트 타깃max_q = np.amax(target_predicts, axis=-1) # 다음상태에 대한 큐함수중 가장 큰 값targets = rewards + (1 - dones) * self.discount_factor * max_q #타겟값#dones = true인 경우 다음 큐함수는 없으니까 그냥 rewardsloss = tf.reduce_mean(tf.square(targets - predicts)) # MSE 계산# 오류함수를 줄이는 방향으로 모델 업데이트grads = tape.gradient(loss, model_params)self.optimizer.apply_gradients(zip(grads, model_params))if __name__ == "__main__":# CartPole-v1 환경, 최대 타임스텝 수가 500env = gym.make('CartPole-v1')# state = 카트의 수평선 상의 위치, 카트의 속도, 폴의 수직선으로부터 기운 각도, 폴의 각속도state_size = env.observation_space.shape[0] # state_size = 4# action = 왼쪽, 오른쪽 이동action_size = env.action_space.n # action_size = 2# DQN 에이전트 생성agent = DQNAgent(state_size, action_size)scores, episodes = [], []score_avg = 0num_episode = 100for e in range(num_episode):done = Falsescore = 0# env 초기화state = env.reset() #[ 0.02680346 0.00147292 -0.01816686 -0.01559384]state = np.reshape(state, [1, state_size]) #[[ 0.02680346 0.00147292 -0.01816686 -0.01559384]]while not done:if agent.render:env.render()# 현재 상태로 행동을 선택action = agent.get_action(state)# 선택한 행동으로 환경에서 한 타임스텝 진행next_state, reward, done, info = env.step(action)next_state = np.reshape(next_state, [1, state_size])# 타임스텝마다 보상 0.1, 에피소드가 중간에 끝나면 -1 보상score += rewardreward = 0.1 if not done or score == 500 else -1# 리플레이 메모리에 샘플 <s, a, r, s'> 저장agent.append_sample(state, action, reward, next_state, done)# 매 타임스텝마다 학습모델 학습if len(agent.memory) >= agent.train_start:agent.train_model()state = next_stateif done: #에피소드가 끝난경우에# 각 에피소드마다 타깃 모델을 모델의 가중치로 업데이트 ( 타겟모델은 에피소드마다, 학습모델은 스텝마다 )agent.update_target_model()# 에피소드마다 학습 결과 출력score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else scoreprint("episode: {:3d} | score avg: {:3.2f} | memory length: {:4d} | epsilon: {:.4f}".format(e, score_avg, len(agent.memory), agent.epsilon))# 에피소드마다 학습 결과 그래프로 저장scores.append(score_avg)episodes.append(e)pylab.plot(episodes, scores, 'b')pylab.xlabel("episode")pylab.ylabel("average score")pylab.savefig("./save_graph/graph.png")# 이동 평균이 400 이상일 때 종료if score_avg > 400:agent.model.save_weights("./save_model/model", save_format="tf")sys.exit()cs test.py
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576import sysimport gymimport pylabimport randomimport numpy as npfrom collections import dequeimport tensorflow as tffrom tensorflow.keras.layers import Densefrom tensorflow.keras.initializers import RandomUniform# 상태가 입력, 큐함수가 출력인 인공신경망 생성class DQN(tf.keras.Model):def __init__(self, action_size):super(DQN, self).__init__()self.fc1 = Dense(24, activation='relu')self.fc2 = Dense(24, activation='relu')self.fc_out = Dense(action_size,kernel_initializer=RandomUniform(-1e-3, 1e-3))def call(self, x):x = self.fc1(x)x = self.fc2(x)q = self.fc_out(x)return q# 카트폴 예제에서의 DQN 에이전트class DQNAgent:def __init__(self, state_size, action_size):# 상태와 행동의 크기 정의self.state_size = state_sizeself.action_size = action_size# 모델과 타깃 모델 생성self.model = DQN(action_size)self.model.load_weights("./save_model/trained/model")# 입실론 탐욕 정책으로 행동 선택def get_action(self, state):q_value = self.model(state)return np.argmax(q_value[0])if __name__ == "__main__":# CartPole-v1 환경, 최대 타임스텝 수가 500env = gym.make('CartPole-v1')state_size = env.observation_space.shape[0]action_size = env.action_space.n# DQN 에이전트 생성agent = DQNAgent(state_size, action_size)num_episode = 10for e in range(num_episode):done = Falsescore = 0# env 초기화state = env.reset()state = np.reshape(state, [1, state_size])while not done:env.render()# 현재 상태로 행동을 선택action = agent.get_action(state)# 선택한 행동으로 환경에서 한 타임스텝 진행next_state, reward, done, info = env.step(action)next_state = np.reshape(next_state, [1, state_size])score += rewardstate = next_stateif done:# 에피소드마다 학습 결과 출력print("episode: {:3d} | score: {:.3f} ".format(e, score))cs 'IT&컴퓨터공학 > 딥러닝' 카테고리의 다른 글
[딥러닝]1강. 딥러닝이란 무엇인가 ? (0) 2021.02.09 수식 (0) 2020.12.07 폴리시 그레이디언트 (0) 2020.12.07 딥살사 코드 - 딥러닝의 시작 (0) 2020.12.07 큐러닝 코드 ( off policy TD control ) => 학습정책 =! 행동정책 (0) 2020.12.06 댓글