
[RL-PyTorch] Policy gradient method, REINFORCE algorithm

by achrxme 2023. 8. 28.

 

import numpy as np
import torch
import gym
from matplotlib import pyplot as plt


def running_mean(x, N=50):
    # Moving average over a window of N values, used to smooth the episode-length curve
    kernel = np.ones(N)
    conv_len = x.shape[0] - N
    y = np.zeros(conv_len)
    for i in range(conv_len):
        y[i] = kernel @ x[i:i + N]   # sum of the current window
        y[i] /= N                    # divide by N to get the mean
    return y


env = gym.make("CartPole-v1", render_mode="human")

l1 = 4       # input dimension: the CartPole state has 4 components
l2 = 150     # dimension of the hidden layer = 150
l3 = 2       # output: a 2-element vector with the probabilities of the 2 actions (left or right)

# [Stochastic Policy Network]
model = torch.nn.Sequential(
    torch.nn.Linear(l1, l2),
    torch.nn.LeakyReLU(),
    torch.nn.Linear(l2, l3),
    torch.nn.Softmax(dim=-1)    # a softmax probability distribution over actions
                                # -> makes the action probabilities sum to 1
                                # (dim=-1 so this also holds for the batched input in the training loop below)
)

learning_rate = 0.009
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
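As a quick sanity check (illustrative only, not part of the original post), a single 4-element state vector should give back a 2-element probability vector:

probs = model(torch.rand(4))   # tensor of shape (2,): one probability per action
print(probs.sum())             # ~1.0, because of the softmax output layer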

state1 = env.reset()
pred = model(torch.from_numpy(state1[0]).float())                  # Predict the action probabilities with the policy network
action = np.random.choice(np.array([0, 1]), p=pred.data.numpy())   # Sample one action from the probability distribution
state2, reward, done, truncated, info = env.step(action)           # Get the new state and the reward after executing the action
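For reference, what env.step() returns in CartPole-v1 (added here for clarity, not in the original post):

# state2    : np.ndarray of shape (4,) -- cart position, cart velocity, pole angle, pole angular velocity
# reward    : 1.0 for every step the pole stays upright
# done      : True once the pole falls over or the cart leaves the track
# truncated : True once the episode hits the time limit (500 steps in CartPole-v1)
# info      : a dict with extra diagnostic information (usually empty here)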


def discount_rewards(rewards, gamma=0.99):
    lenr = len(rewards)
    disc_return = torch.pow(gamma, torch.arange(lenr).float()) * rewards   # Weight each reward by gamma^t
    disc_return /= disc_return.max()                                       # Normalize so the largest return is 1
    return disc_return
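As a quick worked example (illustrative numbers, not from the original post), a 3-step episode produces the flipped reward tensor [3., 2., 1.]:

# discount_rewards(torch.Tensor([3., 2., 1.]), gamma=0.99)
#   gamma powers : [1.0000, 0.9900, 0.9801]
#   weighted     : [3.0000, 1.9800, 0.9801]
#   normalized   : [1.0000, 0.6600, 0.3267]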


def loss_fn(preds, r):   # inputs: the probabilities of the actions actually taken, and the discounted returns
    # Loss = - sum over steps of (log probability of each taken action * discounted return)
    return -1 * torch.sum(r * torch.log(preds))
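For comparison, a sketch of the same loss written with torch.distributions (not part of the original post; the argument names match pred_batch, action_batch and disc_returns from the training loop below, and it takes the full probability rows instead of the already-gathered ones):

from torch.distributions import Categorical

def loss_fn_categorical(pred_batch, action_batch, disc_returns):
    # Same REINFORCE loss: one categorical distribution per time step,
    # log-probability of the taken action, weighted by the discounted return
    dist = Categorical(probs=pred_batch)
    return -torch.sum(disc_returns * dist.log_prob(action_batch.long()))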

# [Training loop]
MAX_DUR = 200
MAX_EPISODES = 500
gamma = 0.99
score = []  # List of episode lengths (used as the performance measure)
expectation = 0.0
for episode in range(MAX_EPISODES):
    curr_state = env.reset()
    done = False
    transitions = []  # List of (state, action, reward) tuples

    for t in range(MAX_DUR):  # Until the game ends or the max duration is reached
        # curr_state from env.reset() has the form (np.array, {}) -> take only the np.array and convert it to a torch Tensor
        act_prob = model(torch.from_numpy(curr_state[0]).float())              # Get the action probabilities
        action = np.random.choice(np.array([0, 1]), p=act_prob.data.numpy())   # Sample one action
        prev_state = curr_state
        curr_state, _, done, truncated, info = env.step(action)   # Execute that action
        # curr_state from env.step() is a bare np.array -> indexing it with [0] as above would return a single float,
        # so wrap it back into the same (np.array, {}) tuple form for the next iteration
        curr_state = (curr_state, {})
        transitions.append((prev_state, action, t + 1))  # Record the (state, action, reward) tuple; the reward here is the step count t + 1
        if done:  # When the game has ended
            break

    ep_len = len(transitions)  # Episode length, used as the performance measure
    score.append(ep_len)

    print(episode, " : ", ep_len)

    reward_batch = torch.Tensor([r for (s, a, r) in transitions]).flip(dims=(0,))  # Collect the episode rewards in one tensor, in reverse order
    disc_returns = discount_rewards(reward_batch)   # Compute the discounted returns
    # In each (s, a, r) of transitions, s has the form (np.array, {}) -> take only the np.array and convert it to a torch Tensor
    state_batch = torch.Tensor([s[0] for (s, a, r) in transitions])
    action_batch = torch.Tensor([a for (s, a, r) in transitions])

    pred_batch = model(state_batch)  # Recompute the action probabilities for every state in the episode
    prob_batch = pred_batch.gather(
        dim=1, index=action_batch.long().view(-1, 1)).squeeze()  # Keep only the probabilities of the actions actually taken
    loss = loss_fn(prob_batch, disc_returns)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
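The running_mean helper and the matplotlib import above are never used in the snippet as posted; a minimal sketch of how the smoothed score curve could be plotted once training finishes (illustrative, not from the original post):

score_arr = np.array(score)
plt.plot(running_mean(score_arr, N=50))   # moving average of the episode lengths
plt.xlabel("Episode")
plt.ylabel("Episode length (smoothed)")
plt.show()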

 

Be careful with the types of the state and related values.
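Concretely, in the Gym >= 0.26 API used above, reset() returns an (observation, info) tuple while step() returns the observation array directly, which is why the training loop wraps curr_state back into a tuple. A minimal sketch:

obs, info = env.reset()                                  # obs: np.ndarray of shape (4,), info: dict
next_obs, reward, done, truncated, info = env.step(0)    # next_obs is already a bare np.ndarray of shape (4,)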