  • Policy Iteration Code
    IT & Computer Engineering / Deep Learning · 2020. 12. 6. 20:53

    environment.py

     

    Modified lines: 9, 10, 32, 33, 34, 70, 71, 72, 100, 158, 217, 218, 219

    import tkinter as tk
    from tkinter import Button
    import time
    import numpy as np
    from PIL import ImageTk, Image

    PhotoImage = ImageTk.PhotoImage
    UNIT = 100  # pixels per grid cell
    HEIGHT = 5  # grid world height
    WIDTH = 5   # grid world width
    TRANSITION_PROB = 1  # state transition probability: since it is 1, the "left" action always moves left
    POSSIBLE_ACTIONS = [0, 1, 2, 3]  # up, down, left, right
    ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # actions expressed as coordinate offsets

    REWARDS = []


    class GraphicDisplay(tk.Tk):
        def __init__(self, agent):
            super(GraphicDisplay, self).__init__()
            self.title('Policy Iteration')
            self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT + 50))
            self.texts = []
            self.arrows = []
            self.env = Env()
            self.agent = agent
            self.evaluation_count = 0
            self.improvement_count = 0
            self.is_moving = 0
            (self.up, self.down, self.left, self.right), self.shapes = self.load_images()
            self.canvas = self._build_canvas()
            self.text_reward(2, 2, "R : 1.0")
            self.text_reward(1, 2, "R : -1.0")
            self.text_reward(2, 1, "R : -1.0")

        def _build_canvas(self):
            canvas = tk.Canvas(self, bg='white',
                               height=HEIGHT * UNIT,
                               width=WIDTH * UNIT)
            # initialize the buttons
            iteration_button = Button(self, text="Evaluate",
                                      command=self.evaluate_policy)
            iteration_button.configure(width=10, activebackground="#33B5E5")
            canvas.create_window(WIDTH * UNIT * 0.13, HEIGHT * UNIT + 10,
                                 window=iteration_button)
            policy_button = Button(self, text="Improve",
                                   command=self.improve_policy)
            policy_button.configure(width=10, activebackground="#33B5E5")
            canvas.create_window(WIDTH * UNIT * 0.37, HEIGHT * UNIT + 10,
                                 window=policy_button)
            policy_button = Button(self, text="move", command=self.move_by_policy)
            policy_button.configure(width=10, activebackground="#33B5E5")
            canvas.create_window(WIDTH * UNIT * 0.62, HEIGHT * UNIT + 10,
                                 window=policy_button)
            policy_button = Button(self, text="reset", command=self.reset)
            policy_button.configure(width=10, activebackground="#33B5E5")
            canvas.create_window(WIDTH * UNIT * 0.87, HEIGHT * UNIT + 10,
                                 window=policy_button)

            # draw the grid lines
            for col in range(0, WIDTH * UNIT, UNIT):  # 0 ~ 400 in steps of UNIT
                x0, y0, x1, y1 = col, 0, col, HEIGHT * UNIT
                canvas.create_line(x0, y0, x1, y1)
            for row in range(0, HEIGHT * UNIT, UNIT):  # 0 ~ 400 in steps of UNIT
                x0, y0, x1, y1 = 0, row, HEIGHT * UNIT, row
                canvas.create_line(x0, y0, x1, y1)

            # add the images to the canvas
            self.rectangle = canvas.create_image(50, 50, image=self.shapes[0])
            canvas.create_image(250, 150, image=self.shapes[1])
            canvas.create_image(150, 250, image=self.shapes[1])
            canvas.create_image(250, 250, image=self.shapes[2])

            canvas.pack()

            return canvas

        def load_images(self):
            up = PhotoImage(Image.open("../img/up.png").resize((13, 13)))
            right = PhotoImage(Image.open("../img/right.png").resize((13, 13)))
            left = PhotoImage(Image.open("../img/left.png").resize((13, 13)))
            down = PhotoImage(Image.open("../img/down.png").resize((13, 13)))
            rectangle = PhotoImage(Image.open("../img/rectangle.png").resize((65, 65)))
            triangle = PhotoImage(Image.open("../img/triangle.png").resize((65, 65)))
            circle = PhotoImage(Image.open("../img/circle.png").resize((65, 65)))
            return (up, down, left, right), (rectangle, triangle, circle)

        def reset(self):
            if self.is_moving == 0:
                self.evaluation_count = 0
                self.improvement_count = 0
                for i in self.texts:
                    self.canvas.delete(i)

                for i in self.arrows:
                    self.canvas.delete(i)
                self.agent.value_table = [[0.0] * WIDTH for _ in range(HEIGHT)]
                self.agent.policy_table = ([[[0.25, 0.25, 0.25, 0.25]] * WIDTH
                                            for _ in range(HEIGHT)])
                self.agent.policy_table[2][2] = []
                x, y = self.canvas.coords(self.rectangle)
                self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

        def text_value(self, row, col, contents, font='Helvetica', size=10,
                       style='normal', anchor="nw"):
            origin_x, origin_y = 85, 70
            x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
            font = (font, str(size), style)
            text = self.canvas.create_text(x, y, fill="black", text=contents,
                                           font=font, anchor=anchor)
            return self.texts.append(text)

        def text_reward(self, row, col, contents, font='Helvetica', size=10,
                        style='normal', anchor="nw"):
            origin_x, origin_y = 5, 5
            x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
            font = (font, str(size), style)
            text = self.canvas.create_text(x, y, fill="black", text=contents,
                                           font=font, anchor=anchor)
            return self.texts.append(text)

        def rectangle_move(self, action):
            base_action = np.array([0, 0])
            location = self.find_rectangle()
            self.render()
            if action == 0 and location[0] > 0:  # up
                base_action[1] -= UNIT
            elif action == 1 and location[0] < HEIGHT - 1:  # down
                base_action[1] += UNIT
            elif action == 2 and location[1] > 0:  # left
                base_action[0] -= UNIT
            elif action == 3 and location[1] < WIDTH - 1:  # right
                base_action[0] += UNIT
            # move agent
            self.canvas.move(self.rectangle, base_action[0], base_action[1])

        def find_rectangle(self):
            temp = self.canvas.coords(self.rectangle)
            x = (temp[0] / 100) - 0.5
            y = (temp[1] / 100) - 0.5
            return int(y), int(x)

        def move_by_policy(self):
            if self.improvement_count != 0 and self.is_moving != 1:
                self.is_moving = 1

                x, y = self.canvas.coords(self.rectangle)
                self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

                x, y = self.find_rectangle()
                while len(self.agent.policy_table[x][y]) != 0:
                    self.after(100,
                               self.rectangle_move(self.agent.get_action([x, y])))
                    x, y = self.find_rectangle()
                self.is_moving = 0

        def draw_one_arrow(self, col, row, policy):
            if col == 2 and row == 2:
                return

            if policy[0] > 0:  # up
                origin_x, origin_y = 50 + (UNIT * row), 10 + (UNIT * col)
                self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                            image=self.up))
            if policy[1] > 0:  # down
                origin_x, origin_y = 50 + (UNIT * row), 90 + (UNIT * col)
                self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                            image=self.down))
            if policy[2] > 0:  # left
                origin_x, origin_y = 10 + (UNIT * row), 50 + (UNIT * col)
                self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                            image=self.left))
            if policy[3] > 0:  # right
                origin_x, origin_y = 90 + (UNIT * row), 50 + (UNIT * col)
                self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                            image=self.right))

        def draw_from_policy(self, policy_table):
            for i in range(HEIGHT):
                for j in range(WIDTH):
                    self.draw_one_arrow(i, j, policy_table[i][j])

        def print_value_table(self, value_table):
            for i in range(WIDTH):
                for j in range(HEIGHT):
                    self.text_value(i, j, round(value_table[i][j], 2))

        def render(self):
            time.sleep(0.1)
            self.canvas.tag_raise(self.rectangle)
            self.update()

        # runs when the Evaluate button is pressed = policy evaluation
        def evaluate_policy(self):
            self.evaluation_count += 1
            for i in self.texts:
                self.canvas.delete(i)
            self.agent.policy_evaluation()
            self.print_value_table(self.agent.value_table)

        # runs when the Improve button is pressed = policy improvement
        def improve_policy(self):
            self.improvement_count += 1
            for i in self.arrows:
                self.canvas.delete(i)
            self.agent.policy_improvement()
            self.draw_from_policy(self.agent.policy_table)


    class Env:
        def __init__(self):
            self.transition_probability = TRANSITION_PROB  # state transition probability
            self.width = WIDTH    # grid width
            self.height = HEIGHT  # grid height
            self.reward = [[0] * WIDTH for _ in range(HEIGHT)]
            self.possible_actions = POSSIBLE_ACTIONS
            self.reward[2][2] = 1   # reward 1 at the circle at (2, 2): the goal
            self.reward[1][2] = -1  # reward -1 at the triangle at (1, 2): an obstacle
            self.reward[2][1] = -1  # reward -1 at the triangle at (2, 1): an obstacle
            self.all_state = []

            for x in range(WIDTH):
                for y in range(HEIGHT):
                    state = [x, y]
                    self.all_state.append(state)

        def get_reward(self, state, action):  # reward received in the next state
            next_state = self.state_after_action(state, action)  # the next state comes from state_after_action
            return self.reward[next_state[0]][next_state[1]]  # return the reward of that next state

        def state_after_action(self, state, action_index):  # which state the action leads to
            action = ACTIONS[action_index]
            return self.check_boundary([state[0] + action[0], state[1] + action[1]])

        @staticmethod  # usable without creating an Env instance
        def check_boundary(state):  # clamp the state if it leaves the grid
            state[0] = (0 if state[0] < 0 else WIDTH - 1
                        if state[0] > WIDTH - 1 else state[0])
            state[1] = (0 if state[1] < 0 else HEIGHT - 1
                        if state[1] > HEIGHT - 1 else state[1])
            return state

        def get_transition_prob(self, state, action):  # return the state transition probability
            return self.transition_probability

        def get_all_states(self):  # return all states
            return self.all_state
     
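    A quick, non-graphical sanity check of Env is to call its helpers directly. This is a minimal sketch (it assumes the listing above is saved as environment.py, and that tkinter and Pillow are installed since the module imports them at the top); it exercises only the Env class, not the GUI:

    from environment import Env

    env = Env()
    # moving right (action index 3) from (2, 1) lands on the goal at (2, 2), so the reward is 1
    print(env.state_after_action([2, 1], 3))  # [2, 2]
    print(env.get_reward([2, 1], 3))          # 1
    # moving left (action index 2) from (0, 0) is clamped by check_boundary, so the state does not change
    print(env.state_after_action([0, 0], 2))  # [0, 0]
    print(env.get_reward([0, 0], 2))          # 0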

     

    policy_iteration.py

     

    Modified lines: 15, 29, 49

    import numpy as np
    from environment import GraphicDisplay, Env


    class PolicyIteration:
        def __init__(self, env):
            # the environment object
            self.env = env
            # initialize the value function as a 2-D list
            self.value_table = [[0.0] * env.width for _ in range(env.height)]
            # initialize the policy with equal probability for up, down, left, right
            self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                                 for _ in range(env.height)]
            # set the terminal state
            self.policy_table[2][2] = []
            # discount factor
            self.discount_factor = 0.9

        # policy evaluation: compute the next value function with the Bellman expectation
        # equation (the process of approaching the true value function)
        def policy_evaluation(self):
            # initialize the next value function
            next_value_table = [[0.00] * self.env.width
                                for _ in range(self.env.height)]

            # apply the Bellman expectation equation to every state
            for state in self.env.get_all_states():
                value = 0.0
                # the value of the terminal state is 0
                if state == [2, 2]:
                    next_value_table[state[0]][state[1]] = value
                    continue

                # Bellman expectation equation
                for action in self.env.possible_actions:  # for the up, down, left, right actions
                    next_state = self.env.state_after_action(state, action)  # next state for that action
                    reward = self.env.get_reward(state, action)  # reward for that transition
                    next_value = self.get_value(next_state)  # value of the next state
                    value += (self.get_policy(state)[action] *  # Bellman expectation: sum over all actions (not a max!)
                              (reward + self.discount_factor * next_value))

                next_value_table[state[0]][state[1]] = value  # update the value of this state

            self.value_table = next_value_table  # update the value table

        # greedy policy improvement with respect to the current value function (policy update)
        def policy_improvement(self):
            next_policy = self.policy_table  # take the policy table
            for state in self.env.get_all_states():  # for every state
                if state == [2, 2]:
                    continue

                value_list = []
                # initialize the policy to be returned
                result = [0.0, 0.0, 0.0, 0.0]

                # for every action, compute the Q-value = reward + (discount factor * next state's value)
                for index, action in enumerate(self.env.possible_actions):
                    next_state = self.env.state_after_action(state, action)  # next state
                    reward = self.env.get_reward(state, action)  # its reward
                    next_value = self.get_value(next_state)  # its value
                    value = reward + self.discount_factor * next_value
                    value_list.append(value)  # append the Q-value

                # act greedily toward the actions with the largest return, e.g. when value_list = [3, 1, 3, 2]
                max_idx_list = np.argwhere(value_list == np.amax(value_list))  # amax gives 3, argwhere gives the indices where it occurs, i.e. the actions with the largest Q-value
                max_idx_list = max_idx_list.flatten().tolist()  # e.g. [[0], [2]] becomes the 1-D list [0, 2]
                prob = 1 / len(max_idx_list)  # the length is 2, so each probability is 0.5

                for idx in max_idx_list:
                    result[idx] = prob

                next_policy[state[0]][state[1]] = result  # e.g. [0.5, 0, 0.5, 0]

            self.policy_table = next_policy

        # return an action sampled from the policy at a given state
        def get_action(self, state):
            policy = self.get_policy(state)
            policy = np.array(policy)
            return np.random.choice(4, 1, p=policy)[0]  # choose an action according to the policy: 4 = number of actions, 1 = sample one action, p = the policy

        # return the policy for a state
        def get_policy(self, state):
            return self.policy_table[state[0]][state[1]]

        # return the value of a state
        def get_value(self, state):
            return self.value_table[state[0]][state[1]]


    if __name__ == "__main__":
        env = Env()
        policy_iteration = PolicyIteration(env)
        grid_world = GraphicDisplay(policy_iteration)
        grid_world.mainloop()
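    For reference, the two methods above implement the standard policy iteration updates: policy_evaluation performs one sweep of the Bellman expectation equation, summing over actions weighted by the current policy rather than taking a max, and policy_improvement then acts greedily on the one-step lookahead, splitting ties uniformly. In the usual notation (not taken verbatim from the code):

    $$v_{k+1}(s) = \sum_{a} \pi(a \mid s)\,\bigl(r(s,a) + \gamma\, v_k(s')\bigr)$$

    $$\pi'(s) = \underset{a}{\arg\max}\,\bigl(r(s,a) + \gamma\, v_k(s')\bigr)$$

    Here s' is state_after_action(s, a), gamma is discount_factor = 0.9, and the uniform tie-splitting corresponds to max_idx_list and prob in policy_improvement.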
     
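    The two steps can also be driven headlessly, without clicking the Evaluate/Improve buttons. A minimal sketch, assuming the second listing is saved as policy_iteration.py next to environment.py:

    from environment import Env
    from policy_iteration import PolicyIteration

    agent = PolicyIteration(Env())
    # alternate one evaluation sweep and one greedy improvement, as the Evaluate/Improve buttons do
    for _ in range(10):
        agent.policy_evaluation()
        agent.policy_improvement()

    print(agent.get_value([2, 1]))   # value of the cell just left of the goal
    print(agent.get_policy([0, 0]))  # action probabilities at the start cell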

