Rainbow: Combination of DQN Extensions (Part 1)

이 글은 DeepMind의 Rainbow: Combining Improvements in Deep Reinforcement Learning 에 사용된 6가지 DQN extension들을 다룹니다. 또한 각 extension을 구현한 코드 일부에 변형/주석을 달았습니다.

DQN and Extensions

0. Vanilla DQN

직접 업데이트하는 대상인 online net $\theta$ 와 별도로 target net $\bar\theta$ 를 두고, target net은 주기적으로 online net을 복사하여 사용하는 off-policy 방법이다. SGD가 요구하는 i.i.d 가정을 위해 큰 experience replay memory buffer에서 transition $(s_t, a_t, r_{t+1}, \gamma_{t+1}, s_{t+1})$ 을 샘플링하여 사용한다.

1. N-step DQN (빠른 수렴)

한 스텝의 return만 보는 대신 n개의 스텝을 사용할 수 있다. n개의 스텝에서 선택한 action이 optimal하다고 가정하여 maximization을 생략하는 방식이다. 이렇게 되면 후반 상태들의 Q값이 초반 상태들로 빠르게 propagate되어 수렴이 빨라질 수 있다.1 하지만 n값이 지나치게 커지면 maximization 단계를 생략함으로써 생긴 Bellman update error가 커져서 학습이 되지 않는다.

2. Double DQN (overestimation bias 해결)

Vanilla Q-learning에서는 TD target으로 target net이 평가하기에 가장 큰 Q값을 사용하는데, 일반적으로 이 값은 과대추정되는 문제가 있다 (overestimation bias). 이를 해결하기 위해 TD target의 계산에 online net도 참여하도록 한다. Online net이 평가하기에 가장 Q값이 큰 action을 고르고, target net이 그에 대해 평가한 Q값을 TD target으로 사용한다.

def calc_loss(batch, net, tgt_net, gamma, device="cpu", double=True):
    states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(batch) # assume as tensors

    # row는 batch sample, col은 action을 나타내므로, 선택된 actions로 column indexing
    state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
    if double:
        # uses separate Q for action selection and evaluation
        next_state_actions = net(next_states_v).max(1)[1] # (online) net이 평가하기에 가장 좋은 action을 고름
        next_state_values = tgt_net(next_states_v).gather(1, next_state_actions.unsqueeze(-1)).squeeze(-1) # 선택된 action에 대한 평가를 target net이 제공
    else:
        next_state_values = tgt_net(next_states_v).max(1)[0]
    next_state_values[done_mask] = 0.0 # episode가 이미 끝났으면 masking

    expected_state_action_values = next_state_values.detach() * gamma + rewards_v  # TD target
    return nn.MSELoss()(state_action_values, expected_state_action_values) # TD error

3. Prioritized Replay (Sample efficiency)

Replay buffer에서 uniform하게 샘플링하는 대신, 학습에 더 도움이 될 (surprising한) 샘플을 더 자주 뽑도록 priority를 부여한다. TD error가 높으면 더 많은 정보를 준다는 것이므로, 이에 비례하는 priority를 줘서 샘플링을 하는 방식이 일반적이다. 또한 버퍼에 새로 들어온 샘플들에는 높은 priority를 부여한다. 이렇게 하면 i.i.d 가정이 깨지므로, 대신 샘플링 확률에 반비례하는 weight을 줘서 weighted loss를 사용한다. $\beta=1$ 일 경우 prioritizing에 의한 bias가 상쇄되지만, 수렴을 위해서는 [0,1] 사이의 값에서 시작하여 1까지 서서히 올리는 scheduling을 사용한다.

class PrioReplayBuffer:
    def __init__(self, exp_source, buf_size, prob_alpha=0.6):
        self.exp_source_iter = iter(exp_source)
        self.prob_alpha = prob_alpha
        self.capacity = buf_size
        self.pos = 0 # 가장 오래된 transition의 index
        self.buffer = []
        self.priorities = np.zeros((buf_size, ), dtype=np.float32)
        
    def populate(self, count):
        # 가장 높은 priority를 가진 샘플을 선택
        max_prio = self.priorities.max() if self.buffer else 1.0  
        
        for _ in range(count):
            sample = next(self.exp_source_iter)
            if len(self.buffer) < self.capacity: # buffer가 차지 않았으면 그냥 저장
                self.buffer.append(sample)
            else:
                self.buffer[self.pos] = sample # buffer가 찼으면 가장 오래된 transition을 바꿈
            self.priorities[self.pos] = max_prio
            self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        if len(self.buffer) == self.capacity:
            prios = self.priorities
        else:
            prios = self.priorities[:self.pos]
        # priority를 확률로 변환
        probs = prios ** self.prob_alpha

        # 각 샘플에 대해 확률을 계산하여 그에 맞게 샘플링
        probs /= probs.sum() 
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)

        samples = [self.buffer[idx] for idx in indices]
        total = len(self.buffer)
        
        # normalized loss weights 계산
        weights = (total * probs[indices]) ** (-beta)
        weights /= weights.max()
        
        return samples, indices, np.array(weights, dtype=np.float32)

    def update_priorities(self, batch_indices, batch_priorities):
        # batch_loss가 batch_priorities로 주어짐. loss를 priority로 사용
        for idx, prio in zip(batch_indices, batch_priorities):
            self.priorities[idx] = prio
def calc_loss(batch, batch_weights, net, tgt_net, gamma):
    states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(batch) # assume as tensors
    batch_weights_v = torch.tensor(batch_weights)

    state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
    next_state_values = tgt_net(next_states_v).max(1)[0]
    next_state_values[done_mask] = 0.0

    expected_state_action_values = next_state_values.detach() * gamma + rewards_v  # TD target
    losses_v = batch_weights_v * (state_action_values - expected_state_action_values) ** 2  # weighted TD error
    return losses_v.mean(), losses_v + 1e-5 # 1항은 backpropagation, 2항은 update_priorities를 위한 것 (zero priority 막기 위한 처리)
## inside main()

# beta scheduling
beta = min(1.0, BETA_START + frame_idx * (1.0 - BETA_START) / BETA_FRAMES)  

optimizer.zero_grad()
batch, batch_indices, batch_weights = buffer.sample(params['batch_size'], beta)
loss, sample_prios = calc_loss(batch, batch_weights, net, tgt_net.target_model, params['gamma'])
loss_v.backward()
optimizer.step()

# update new priorities for the processed batch
buffer.update_priorities(batch_indices, sample_prios.data.cpu().numpy()) 

4. Dueling Networks (학습 안정성, 빠른 수렴)

Shared convolution encoder 위에 각각 value와 advantage를 예측하는 linear layer를 쌓고, Q=V+A로부터 Q값을 구한다. 단, A의 평균을 빼서 각 상태의 mean advantage가 0이 되도록 한다.

class DuelingDQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DuelingDQN, self).__init__()
        
        # convolutional encoder는 shared
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        
        # 동일한 conv feature를 서로 별개의 advantage stream과 value stream으로 feed
        conv_out_size = self._get_conv_out(input_shape)
        self.fc_adv = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
        self.fc_val = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1) # scalar
        )

    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        fx = x.float() / 256
        conv_out = self.conv(fx).view(fx.size()[0], -1)
        val = self.fc_val(conv_out)
        adv = self.fc_adv(conv_out)
        return val + adv - adv.mean() # Q=V+A

5. Noisy Networks (Parametric exploration)

Exploration을 위해 $\epsilon$을 hyperparameter로 두고 $\epsilon$-greedy 정책을 사용하는 대신, 학습 가능한 noisy linear layer를 따로 두고 fully connected layer의 결과에 더해준다.

class NoisyFactorizedLinear(nn.Linear):
    def __init__(self, in_features, out_features, sigma_zero=0.4, bias=True):
        super(NoisyFactorizedLinear, self).__init__(in_features, out_features, bias=bias)
        sigma_init = sigma_zero / math.sqrt(in_features)
        self.sigma_weight = nn.Parameter(torch.full((out_features, in_features), sigma_init))
        self.register_buffer("epsilon_input", torch.zeros(1, in_features)) 
        self.register_buffer("epsilon_output", torch.zeros(out_features, 1))
        if bias:
            self.sigma_bias = nn.Parameter(torch.full((out_features,), sigma_init))

    def forward(self, input):
        self.epsison_input.normal_() # normal distribution에서 샘플링하여 buffer를 채움
        self.epsilon_output.normal_()

        func = lambda x: torch.sign(x) * torch.sqrt(torch.abs(x))
        eps_in = func(self.epsilon_input.data)
        eps_out = func(self.epsilon_output.data)

        bias = self.bias
        if bias is not None:
            bias = bias + self.sigma_bias * eps_out.t()
            
        noise = torch.mul(eps_in, eps_out) # outer product로 random matrix를 생성
        return F.linear(input, self.weight + self.sigma_weight * noise, bias)

6. Categorical DQN (Generic Q)

Q값의 스칼라 기댓값 대신 Q값의 확률분포를 구할 수 있다. 가능한 Q value를 $\mid Q \mid$개의 bin으로 나누어 categorical distribution으로 표현한다. $p_{\theta}^i (s_t, a_t)$ 이 i번째 bin의 probability mass라 할 때, Q의 분포는 discrete latent variable인 $\mathbf{z} \in \mathbb{N}_{\mid Q \mid}$ 에 기반하여 나타난다.

Target 분포 $d_t’$ 는 target net이 계산한 평균 Q값 $Q_{\bar\theta}$ (스칼라)에 관해 선택되는 action $a^\ast$를 가지고 동일하게 Bellman equation으로 정의된다. $a^\ast$는 일반적으로 $\mathrm{argmax}_{a’} Q$ 로 사용하지만, Q의 분포를 구한 만큼 다른 selection strategy로 구할 수도 있다. Loss는 분포를 비교하는 것으로 변경되어야 한다 (KL-divergence, Wasserstein distance 등).

def distr_projection(next_distr, rewards, dones, Vmin, Vmax, n_atoms, gamma):
    # n_atoms: bin의 개수
    # dones: flag
    
    batch_size = len(rewards)
    proj_distr = np.zeros((batch_size, n_atoms), dtype=np.float32)
    delta_z = (Vmax - Vmin) / (n_atoms - 1)  # bin width
    for atom in range(n_atoms):
        # target value distribution with clipped range
        tz_j = np.minimum(Vmax, np.maximum(Vmin, rewards + (Vmin + atom * delta_z) * gamma)) # bellman update
        # projection of value to bin (shifting to right)
        b_j = (tz_j - Vmin) / delta_z # 실수 값을 가질 수 있음
        
        l = np.floor(b_j).astype(np.int64)
        u = np.ceil(b_j).astype(np.int64)
        
        # 특정 bin에 정확히 해당될 경우 (b_j가 int일 경우), 이번 bin의 source distribution value를 그냥 assign
        eq_mask = u == l
        proj_distr[eq_mask, l[eq_mask]] += next_distr[eq_mask, atom] 
        # otherwise, distribution value를 양 옆의 bin으로 분산시킴 (e.g. b_j = 31.48..일 경우, 31번째와 32번째 bin으로 분산)
        ne_mask = u != l
        proj_distr[ne_mask, l[ne_mask]] += next_distr[ne_mask, atom] * (u - b_j)[ne_mask] # (batch_size,)
        proj_distr[ne_mask, u[ne_mask]] += next_distr[ne_mask, atom] * (b_j - l)[ne_mask] 
        
    if dones.any():
        proj_distr[dones] = 0.0
        # episode가 끝났을 경우 Q는 분포로 나타나지 않고 실제 reward에 해당하는 스칼라로 나타난다.
        tz_j = np.minimum(Vmax, np.maximum(Vmin, rewards[dones]))
        b_j = (tz_j - Vmin) / delta_z # shifted reward
        l = np.floor(b_j).astype(np.int64)
        u = np.ceil(b_j).astype(np.int64)
        eq_mask = u == l
        eq_dones = dones.copy()
        eq_dones[dones] = eq_mask
        if eq_dones.any():
            proj_distr[eq_dones, l] = 1.0 # shifted reward에 해당하는 bin의 prob=1이 됨
        ne_mask = u != l
        ne_dones = dones.copy()
        ne_dones[dones] = ne_mask
        if ne_dones.any():
            proj_distr[ne_dones, l] = (u - b_j)[ne_mask]
            proj_distr[ne_dones, u] = (b_j - l)[ne_mask]
    return proj_distr
class DistributionalDQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        # 여기는 non-categorical dqn과 동일
        super(DistributionalDQN, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        conv_out_size = self._get_conv_out(input_shape)
        
        # output size가 다름
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions * N_ATOMS)
        )
        self.register_buffer("supports", torch.arange(Vmin, Vmax+DELTA_Z, DELTA_Z)) 
        self.softmax = nn.Softmax(dim=1)

    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        batch_size = x.size()[0]
        fx = x.float() / 256
        conv_out = self.conv(fx).view(batch_size, -1)
        fc_out = self.fc(conv_out)
        return fc_out.view(batch_size, -1, N_ATOMS) # adjusted output size

    def both(self, x):
        # raw distribution과 평균 binned-Q값을 반환 
        cat_out = self(x)
        probs = self.apply_softmax(cat_out)
        weights = probs * self.supports
        res = weights.sum(dim=2) # distribution과 bin 값들의 weighted sum
        return cat_out, res

    def qvals(self, x):
        return self.both(x)[1]

    def apply_softmax(self, t): # obtain the best actions to take in the next state
        return self.softmax(t.view(-1, N_ATOMS)).view(t.size()) # 모든 bin에 걸쳐 normalize
def calc_loss(batch, net, tgt_net, gamma, save_prefix=None):
    states_v, actions_v, rewards, dones, next_states_v = unpack_batch(batch) # assume as tensors
    dones_mask = dones.astype(np.bool)
    batch_size = len(batch)

    # target net의 output
    next_distr_v, next_qvals_v = tgt_net.both(next_states_v)  # target net이 Q의 raw distribution, 평균 Q값 (binned)을 계산
    next_actions = next_qvals_v.max(1)[1].data.cpu().numpy() # greedy selection
    next_distr = tgt_net.apply_softmax(next_distr_v).data.cpu().numpy()
    next_best_distr = next_distr[range(batch_size), next_actions] # target distribution
    # target distribution을 Bellman update로 project
    proj_distr = distr_projection(next_best_distr, rewards, dones_mask, Vmin, Vmax, N_ATOMS, gamma)
    proj_distr_v = torch.tensor(proj_distr)

    # online net의 output
    distr_v = net(states_v)
    state_action_values = distr_v[range(batch_size), actions_v.data]
    state_log_sm_v = F.log_softmax(state_action_values, dim=1)

    # KL(d_target||d_online)
    loss_v = proj_distr_v * -state_log_sm_v 
    return loss_v.sum(dim=1).mean()
  1. 에피소드 초반 상태들의 Q값은 랜덤하게 초기화되므로 훈련 초기에는 의미가 없는 값이며, 정확하게 계산될 수 있는 것은 reward의 sum으로 표현되는 에피소드 후반 상태들의 Q값이다. 초반 상태들의 Q값은 후반 상태들의 Q값으로 bootstrap하여 계산된다. 따라서 1-step reward만 본다면, $Q(s_{T-1}, a_{T-1}) = R_T$ 가 $Q(s_{T-2}, a_{T_2}) = R_{T-1} + \gamma_{T-1} \max_{a’}Q_{\bar\theta}(S_{T-1}, a’),Q(s_{T-3}, a_{t-3})$ …를 거쳐 $Q(s_0, a_0)$ 까지 전달되어야 한다. 하지만 n-step reward를 본다면 $Q(S_{T-n}, A_{T-n}) = \sum_{k=0}^{n-1}\gamma^k R_{T-n+1+k}$ 까지도 정확하게 계산되므로 $Q(S_0, A_0)$ 까지 값이 전달되는 데 필요한 업데이트가 줄어든다. 

Leave a comment