import numpy as np
import torch
from Gridworld import Gridworld
from IPython.display import clear_output
import random
from matplotlib import pylab as plt
from collections import deque
l1 = 64
l2 = 150
l3 = 100
l4 = 4
model = torch.nn.Sequential(
    torch.nn.Linear(l1, l2),
    torch.nn.ReLU(),
    torch.nn.Linear(l2, l3),
    torch.nn.ReLU(),
    torch.nn.Linear(l3, l4)
)
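# The Q-network: the Gridworld board is rendered as a numpy array and flattened
# to a 64-element vector (l1 = 64); the output layer produces one Q value per
# action (l4 = 4, matching action_set below).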
loss_fn = torch.nn.MSELoss()
learning_rate = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
gamma = 0.9
epsilon = 0.3
epochs = 5000
losses = []
mem_size = 1000  # Maximum length of the experience replay memory
batch_size = 200  # Size of the mini-batch sampled from replay memory
replay = deque(maxlen=mem_size)  # Use a deque instance as the experience replay list
max_moves = 50  # Maximum number of moves before giving up on a game
h = 0
action_set = {
    0: 'u',
    1: 'd',
    2: 'l',
    3: 'r',
}
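# Training loop: each epoch plays one game. Actions are chosen epsilon-greedily,
# each (state, action, reward, next state, done) tuple is stored in the replay
# deque, and once the deque holds more than batch_size experiences a random
# mini-batch is sampled and trained on at every step.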
for i in range(epochs):
    game = Gridworld(size=4, mode='random')  # Start a new game with a random board layout
    state1_ = game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 100.0  # Flatten the board and add a little noise
    state1 = torch.from_numpy(state1_).float()
    status = 1
    mov = 0
    while status == 1:
        mov += 1
        qval = model(state1)  # Compute Q values for the current state
        qval_ = qval.data.numpy()
        if random.random() < epsilon:  # Epsilon-greedy action selection
            action_ = np.random.randint(0, 4)
        else:
            action_ = np.argmax(qval_)
        action = action_set[action_]
        game.makeMove(action)
        state2_ = game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 100.0
        state2 = torch.from_numpy(state2_).float()
        reward = game.reward()
        done = True if reward > 0 else False  # done is only set when the game is won (reward > 0)
        exp = (state1, action_, reward, state2, done)  # Experience = (current state, action, reward, next state, done) tuple
        replay.append(exp)  # Append the experience to the replay list; when full, the deque drops its oldest entry
        state1 = state2
        if len(replay) > batch_size:  # Start training once the replay list is longer than the batch size
            minibatch = random.sample(replay, batch_size)  # Sample a random subset of the replay list
            # Separate each component of the experiences into its own tensor
            state1_batch = torch.cat([s1 for (s1, a, r, s2, d) in minibatch])
            action_batch = torch.Tensor([a for (s1, a, r, s2, d) in minibatch])
            reward_batch = torch.Tensor([r for (s1, a, r, s2, d) in minibatch])  # Rewards for the whole mini-batch rather than a single scalar
            state2_batch = torch.cat([s2 for (s1, a, r, s2, d) in minibatch])
            done_batch = torch.Tensor([d for (s1, a, r, s2, d) in minibatch])
            Q1 = model(state1_batch)  # Recompute Q values for the mini-batch of current states
            with torch.no_grad():
                Q2 = model(state2_batch)  # Q values for the mini-batch of next states (no gradients tracked)
            Y = reward_batch + gamma * ((1 - done_batch) * torch.max(Q2, dim=1)[0])  # Q-learning target: r + gamma * max_a Q(s', a), cut off at terminal states
            # long(): cast to long (int64) so the values can be used as indices
            # unsqueeze(dim=1): insert a dimension of size 1, e.g. shape [3, 100, 100] -> [3, 1, 100, 100]
            # gather: pick out the Q value of the chosen action in each row
            X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
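            # Shape sketch (illustrative values): Q1 is [batch_size, 4];
            # action_batch.long().unsqueeze(dim=1) is [batch_size, 1], e.g. [[2], [0], ...];
            # gather(dim=1, ...) picks Q1[i, action_i] for each row -> [batch_size, 1];
            # squeeze() then gives [batch_size], matching the shape of Y.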
            loss = loss_fn(X, Y.detach())
            print(i, loss.item())
            clear_output(wait=True)
            optimizer.zero_grad()
            loss.backward()
            losses.append(loss.item())
            optimizer.step()
        if reward != -1 or mov > max_moves:  # When the game ends, reset the status and the move counter
            status = 0
            mov = 0
losses = np.array(losses)
def running_mean(x, N=50):
    c = x.shape[0] - N
    y = np.zeros(c)
    conv = np.ones(N)
    for i in range(c):
        y[i] = (x[i:i+N] @ conv) / N
    return y
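# A minimal sketch (assumption: run after the training loop above) of how
# running_mean might be used to smooth and plot the recorded losses:
plt.figure(figsize=(10, 7))
plt.plot(running_mean(losses, N=50))
plt.xlabel("Training step")
plt.ylabel("Loss (running mean, N=50)")
plt.show()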
def test_model(model, mode='random', display=True):
    i = 0
    test_game = Gridworld(mode=mode)
    state_ = test_game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 10.0
    state = torch.from_numpy(state_).float()
    if display:
        print("Initial State:")
        print(test_game.display())
    status = 1
    while (status == 1):  # Loop while the game is still in progress
        qval = model(state)
        qval_ = qval.data.numpy()
        action_ = np.argmax(qval_)  # Greedy policy: always take the action with the highest Q value
        action = action_set[action_]
        if display:
            print('Move #: %s; Taking action: %s' % (i, action))
        test_game.makeMove(action)
        state_ = test_game.board.render_np().reshape(1, 64) + np.random.rand(1, 64) / 10.0
        state = torch.from_numpy(state_).float()
        if display:
            print(test_game.display())
        reward = test_game.reward()
        if reward != -1:
            if reward > 0:
                status = 2
                if display:
                    print("Game won! Reward: %s" % (reward,))
            else:
                status = 0
                if display:
                    print("Game LOST. Reward: %s" % (reward,))
        i += 1
        if (i > 15):
            if display:
                print("Game lost; too many moves.")
            break
    win = True if status == 2 else False
    return win
max_games = 1000
wins = 0
for i in range(max_games):
    win = test_model(model, mode='random', display=False)
    if win:
        wins += 1
win_perc = float(wins) / float(max_games)
print("Games played: {0}, # of wins: {1}".format(max_games,wins))
print("Win percentage: {}%".format(100.0*win_perc))
test_model(model, mode='random')