
[RL-PyTorch] Deep Q-learning - Experience replay

by achrxme 2023. 8. 22.
import numpy as np
import torch
from Gridworld import Gridworld
from IPython.display import clear_output
import random
from matplotlib import pylab as plt

from collections import deque

l1 = 64   # Input size: the 4x4x4 board encoding flattened to 64 values
l2 = 150  # Hidden layer 1
l3 = 100  # Hidden layer 2
l4 = 4    # Output size: one Q-value per action (up, down, left, right)

model = torch.nn.Sequential(
    torch.nn.Linear(l1, l2),
    torch.nn.ReLU(),
    torch.nn.Linear(l2, l3),
    torch.nn.ReLU(),
    torch.nn.Linear(l3, l4)
)
loss_fn = torch.nn.MSELoss()
learning_rate = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
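The model maps the 64-value board encoding to one Q-value per action, so a random input of shape (1, 64) should yield an output of shape (1, 4). A quick sanity check, not part of the original listing:

# Sanity check (illustrative only): one Q-value per action for a single state.
dummy_state = torch.rand(1, 64)
print(model(dummy_state).shape)  # torch.Size([1, 4])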

gamma = 0.9    # Discount factor
epsilon = 0.3  # Exploration rate for the epsilon-greedy policy

epochs = 5000
losses = []
mem_size = 1000  # Maximum size of the experience replay buffer
batch_size = 200  # Mini-batch size for training
replay = deque(maxlen=mem_size)  # Use a deque as the experience replay buffer
max_moves = 50  # Maximum number of moves before giving up on the game
h = 0
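A deque created with maxlen acts as a fixed-size FIFO buffer: once full, appending a new experience silently discards the oldest one, which is exactly the behavior the replay buffer relies on. A minimal illustration, separate from the training script:

# Illustration only: a deque with maxlen drops its oldest entry when full.
demo = deque(maxlen=3)
for t in range(5):
    demo.append(t)
print(demo)  # deque([2, 3, 4], maxlen=3)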

action_set = {
    0: 'u',
    1: 'd',
    2: 'l',
    3: 'r',
}
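The 64-dimensional input fed to the network comes from Gridworld's board encoding, flattened and lightly perturbed with noise. A quick look at what the environment returns, as a hedged sketch that assumes the same Gridworld module imported above:

# Illustration only (assumes the Gridworld module imported above):
g = Gridworld(size=4, mode='random')
print(g.board.render_np().shape)  # expected: (4, 4, 4), flattened to 1x64 in the loop below
print(g.display())                # text rendering of the current board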

for i in range(epochs):
    game = Gridworld(size=4, mode='random')
    state1_ = game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 100.0  # Flatten the board encoding and add small noise
    state1 = torch.from_numpy(state1_).float()
    status = 1
    mov = 0
    while status == 1:
        mov += 1
        qval = model(state1)  # Compute Q-values for the current state
        qval_ = qval.data.numpy()
        if random.random() < epsilon:  # Epsilon-greedy action selection
            action_ = np.random.randint(0, 4)
        else:
            action_ = np.argmax(qval_)

        action = action_set[action_]
        game.makeMove(action)
        state2_ = game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 100.0
        state2 = torch.from_numpy(state2_).float()
        reward = game.reward()
        done = True if reward > 0 else False  # Terminal flag: True only when the game is won (positive reward)
        exp = (state1, action_, reward, state2, done)  # Experience = (current state, action, reward, next state, done) tuple
        replay.append(exp)  # Add the experience to the replay buffer; once full, the deque drops its oldest entry
        state1 = state2

        if len(replay) > batch_size:  # Start training once the replay buffer holds more than one batch of experiences
            minibatch = random.sample(replay, batch_size)  # Randomly sample a subset of experiences from the replay buffer
            # Split the components of each experience into separate tensors
            state1_batch = torch.cat([s1 for (s1, a, r, s2, d) in minibatch])
            action_batch = torch.Tensor([a for (s1, a, r, s2, d) in minibatch])
            reward_batch = torch.Tensor([r for (s1, a, r, s2, d) in minibatch])  # Previously a single scalar; now a tensor of batch_size rewards
            state2_batch = torch.cat([s2 for (s1, a, r, s2, d) in minibatch])
            done_batch = torch.Tensor([d for (s1, a, r, s2, d) in minibatch])

            Q1 = model(state1_batch)  # Recompute Q-values for the mini-batch of current states
            with torch.no_grad():
                Q2 = model(state2_batch)  # Recompute Q-values for the mini-batch of next states (no gradient tracking)

            Y = reward_batch + gamma * ((1 - done_batch) * torch.max(Q2, dim=1)[0])  # Q-learning target: r + gamma * max Q(s2), bootstrap zeroed for terminal states
            # long(): cast to int64
            # unsqueeze(dim=1): insert a dimension of size 1, e.g. shape [3, 100, 100] -> [3, 1, 100, 100]
            # gather: pick out the Q-value corresponding to each taken action
            X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
            loss = loss_fn(X, Y.detach())
            print(i, loss.item())
            clear_output(wait=True)
            optimizer.zero_grad()
            loss.backward()
            losses.append(loss.item())
            optimizer.step()

        if reward != -1 or mov > max_moves:  # When the game ends (win, loss, or too many moves), reset the status and move counter
            status = 0
            mov = 0
losses = np.array(losses)

def running_mean(x, N=50):
    # Moving average of x over a sliding window of N steps
    c = x.shape[0] - N
    y = np.zeros(c)
    conv = np.ones(N)
    for i in range(c):
        y[i] = (x[i:i+N] @ conv)/N
    return y
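
With the losses collected during training, the loss curve can be smoothed with running_mean and plotted using the matplotlib import at the top. A minimal sketch, assuming the training loop above has already populated losses:

# Plot the smoothed training loss (sketch; run after the training loop above).
plt.figure(figsize=(10, 7))
plt.plot(running_mean(losses, N=50))
plt.xlabel('Training steps')
plt.ylabel('Loss (running mean over 50 steps)')
plt.show()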


def test_model(model, mode='random', display=True):
    i = 0
    test_game = Gridworld(mode=mode)
    state_ = test_game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 10.0
    state = torch.from_numpy(state_).float()
    if display:
        print("Initial State:")
        print(test_game.display())
    status = 1
    while status == 1:  # Play until the game ends
        qval = model(state)
        qval_ = qval.data.numpy()
        action_ = np.argmax(qval_)  # Act greedily during testing (no exploration)
        action = action_set[action_]
        if display:
            print('Move #: %s; Taking action: %s' % (i, action))
        test_game.makeMove(action)
        state_ = test_game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 10.0
        state = torch.from_numpy(state_).float()
        if display:
            print(test_game.display())
        reward = test_game.reward()
        if reward != -1:
            if reward > 0:
                status = 2
                if display:
                    print("Game won! Reward: %s" % (reward,))
            else:
                status = 0
                if display:
                    print("Game LOST. Reward: %s" % (reward,))
        i += 1
        if (i > 15):
            if display:
                print("Game lost; too many moves.")
            break

    win = True if status == 2 else False
    return win


max_games = 1000
wins = 0

for i in range(max_games):
    win = test_model(model, mode='random', display=False)
    if win:
        wins += 1
win_perc = float(wins) / float(max_games)
print("Games played: {0}, # of wins: {1}".format(max_games,wins))
print("Win percentage: {}%".format(100.0*win_perc))

test_model(model, mode='random')