alf.experience_replayers#

alf.experience_replayers.replay_buffer#

Replay buffer.

class BatchInfo(env_ids, positions, importance_weights, replay_buffer, discounted_return)#

Bases: tuple

Create new instance of BatchInfo(env_ids, positions, importance_weights, replay_buffer, discounted_return)

discounted_return#

Alias for field number 4

env_ids#

Alias for field number 0

importance_weights#

Alias for field number 2

positions#

Alias for field number 1

replay_buffer#

Alias for field number 3

class ReplayBuffer(data_spec, num_environments, max_length=1024, num_earliest_frames_ignored=0, prioritized_sampling=False, initial_priority=1.0, recent_data_steps=1, recent_data_ratio=0.0, with_replacement=False, device='cpu', allow_multiprocess=False, keep_episodic_info=None, record_episodic_return=False, default_return=- 1000.0, gamma=0.99, reward_clip=None, enable_checkpoint=False, name='ReplayBuffer')[source]#

Bases: alf.utils.data_buffer.RingBuffer

Replay buffer with RingBuffer as implementation.

Terminology: consistent with RingBuffer, we use pos to refer to the always increasing position of an element in the infinitely long buffer, and idx as the actual index of the element in the underlying store (_buffer). That means idx == pos % _max_length is always true, and one should use _buffer[idx] to retrieve the stored data.
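
The pos/idx distinction can be illustrated with a minimal sketch. TinyRing and its attributes are hypothetical names invented for this example, it stores a single environment in a plain Python list, and it is not the RingBuffer implementation.

```python
class TinyRing:
    """Hypothetical single-environment ring store, for illustration only."""

    def __init__(self, max_length):
        self.max_length = max_length
        self.buffer = [None] * max_length  # underlying store
        self.current_pos = 0               # next position to write; never wraps

    def add(self, item):
        idx = self.current_pos % self.max_length  # idx == pos % max_length
        self.buffer[idx] = item
        self.current_pos += 1

    def get(self, pos):
        # Only the most recent max_length positions are still stored.
        oldest = max(0, self.current_pos - self.max_length)
        if not oldest <= pos < self.current_pos:
            raise IndexError("position %d is no longer in the buffer" % pos)
        return self.buffer[pos % self.max_length]


ring = TinyRing(max_length=4)
for step in range(6):
    ring.add("step%d" % step)
```

After six additions to a store of length 4, positions 2 to 5 remain retrievable, while positions 0 and 1 have been overwritten.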

Parameters
  • data_spec (alf.TensorSpec) – spec of an entry nest in the buffer.

  • num_environments (int) – total number of parallel environments stored in the buffer.

  • max_length (int) – maximum number of time steps stored in buffer.

  • num_earliest_frames_ignored (int) – ignore the earliest so many frames from the buffer when sampling. This is typically required when FrameStacker is used. keep_episodic_info will be set to True if num_earliest_frames_ignored > 0 as FrameStacker needs episode information.

  • prioritized_sampling (bool) – Use prioritized sampling if this is True.

  • initial_priority (float) – initial priority used for new experiences. The actual initial priority used for new experience is the maximum of this value and the current maximum priority of all experiences.

  • recent_data_steps (int) – the most recent recent_data_steps steps of data are considered recent data for get_batch(). Note that this quantity is per environment.

  • recent_data_ratio (float) – recent_data_ratio * batch_size samples in the batch are sampled from recent data for get_batch().

  • with_replacement (bool) – If False, sample without replacement whenever possible for get_batch(). If True, a batch may contain duplicate samples.

  • device (string) – “cpu” or “cuda” where tensors are created.

  • allow_multiprocess (bool) – whether multiprocessing is supported.

  • keep_episodic_info (bool) – whether to index episode start and ending positions. If None, its value will be set to True if num_earliest_frames_ignored > 0.

  • record_episodic_return (bool) – If True, computes and stores episodic return for every step in the episode upon episode completion. The field discounted_return stores the information. When episodes are incomplete, all steps get the default_return. NOTE:

    1. Reward transformations like RewardClipping.minmax=(-1, 1) set in TrainerConfig.data_transformer_ctor have to be set manually for the ReplayBuffer to be consistent: ReplayBuffer.reward_clip=(-1,1).

    2. Discount gamma needs to be set consistent with TDLoss.gamma.

    3. Assumes keep_episodic_info to be True.

  • default_return (float) – The default values of discounted_return when the episode has not ended. For value target lower bounding, default_return should not be bigger than the smallest possible discounted return. It can be 0 if reward is always non-negative.

  • gamma (float) – The value of discount used to compute discounted_return. Usually consistent with TDLoss.gamma.

  • reward_clip (tuple|None) – None or (min, max) for reward clipping.

  • enable_checkpoint (bool) – whether to checkpoint this replay buffer.

  • name (string) – name of the replay buffer object.
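
The interplay of record_episodic_return, gamma, reward_clip and default_return described above can be sketched as follows. The helper name discounted_returns and its episode_done argument are hypothetical, and the convention that the return at step t includes the reward at step t is an assumption of this sketch that may differ from what ReplayBuffer actually stores.

```python
def discounted_returns(rewards, gamma=0.99, reward_clip=None,
                       episode_done=True, default_return=-1000.0):
    """Hypothetical helper: per-step discounted return for one episode."""
    if not episode_done:
        # Incomplete episode: every step gets the placeholder value.
        return [default_return] * len(rewards)
    if reward_clip is not None:
        low, high = reward_clip
        rewards = [min(max(r, low), high) for r in rewards]
    returns = [0.0] * len(rewards)
    running = 0.0
    # Accumulate backwards: G_t = r_t + gamma * G_{t+1} (assumed convention).
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns


complete = discounted_returns([1.0, 5.0, 1.0], gamma=0.5, reward_clip=(-1, 1))
partial = discounted_returns([1.0, 5.0], episode_done=False)
```

Clipping is applied before discounting in this sketch, which is why the clip range has to match the reward transformation used during training.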

ONE_MINUS = 0.9999999#
add_batch(batch, env_ids=None, blocking=False)[source]#

Add a batch of entries to buffer updating indices as needed.

We build an index of episode beginning indices for each element in the buffer. The beginning point stores where the episode end is.

Parameters
  • batch (Tensor) – of shape [batch_size] + tensor_spec.shape

  • env_ids (Tensor) – If None, batch_size must be num_environments. If not None, its shape should be [batch_size]. We assume there are no duplicate ids in env_ids. batch[i] is generated by environment env_ids[i].

  • blocking (bool) – If True, blocks if there is no free slot to add data. If False, enqueue can overwrite the oldest data.

dequeue(env_ids=None)[source]#

Return earliest n steps and mark them removed in the buffer.

Parameters
  • env_ids (Tensor) – If None, batch_size must be num_environments. If not None, dequeue from these environments. We assume there are no duplicate ids in env_ids. result[i] will be from environment env_ids[i].

  • n (int) – Number of steps to dequeue.

  • blocking (bool) – If True, blocks if there is not enough data to dequeue.

Returns

nested Tensors or None when blocking dequeue gets terminated by stop event. The shape of the Tensors is [batch_size, n, ...].

Raises
  • AssertionError – when not enough data is present, in non-blocking mode.

gather_all(ignore_earliest_frames=False, convert_to_default_device=True)[source]#

Returns all the items in the buffer.

Parameters
  • ignore_earliest_frames (bool) – if set to True, gather_all() will respect num_earliest_frames_ignored and for each environment it will return the trajectory with the first num_earliest_frames_ignored experiences removed.

  • convert_to_default_device (bool) – if set to True, the gathered experiences will be converted to the default alf device before returning.

Returns

  • nested Tensors of shape [B, T, …], where B=num_environments, T=current_size

  • BatchInfo: Information about the batch. Its shapes are [B], where B=num_environments

    • env_ids: [0, 1, 2, …, num_envs - 1]

    • positions: starting position in the replay buffer for each of the sequences. For gather_all() all starting positions will be the same.

Return type

tuple

Raises
  • AssertionError – if the current_size is not the same for all the environments.

get_batch(batch_size, batch_length)[source]#

Randomly get batch_size trajectories from the buffer.

Note: The environments where the samples are from are ordered in the returned batch.

Parameters
  • batch_size (int) – the number of trajectories to sample

  • batch_length (int) – the length of each trajectory

Returns

  • nested Tensors: The samples. Its shapes are [batch_size, batch_length, …]

  • BatchInfo: Information about the batch. Its shapes are [batch_size].
    • env_ids: environment id for each sequence

    • positions: starting position in the replay buffer for each sequence.

    • importance_weights: priority divided by the average of all non-zero priorities in the buffer.

Return type

tuple
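
The importance_weights definition above (priority divided by the average of all non-zero priorities) can be sketched in plain Python. The helper name and the list-based inputs are hypothetical; the real buffer works on tensors.

```python
def importance_weights(sampled_priorities, all_priorities):
    """Hypothetical helper: priority / mean of the non-zero priorities."""
    non_zero = [p for p in all_priorities if p != 0]
    average = sum(non_zero) / len(non_zero)
    return [p / average for p in sampled_priorities]


priorities = [0.0, 1.0, 3.0, 0.0, 2.0]  # zeros are left out of the average
weights = importance_weights([1.0, 3.0], priorities)
```

With this normalization, a sample whose priority equals the average of the non-zero priorities gets a weight of 1.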

get_disc_0_begin_position(pos, env_ids)[source]#

Note that the discount 0 step may no longer be in the replay buffer.

get_episode_begin_position(pos, env_ids)[source]#

Note that the episode beginning may no longer be in the replay buffer.

get_field(field_name, env_ids, positions)[source]#

Get stored data of field from the replay buffer by env_ids and positions.

Parameters
  • field_name (str | nest of str) – the path to the field, with ‘.’ separating the field names at different levels

  • env_ids (Tensor) – 1-D int64 Tensor.

  • positions (Tensor) – 1-D int64 Tensor with same shape as env_ids. These positions should be obtained from the BatchInfo returned by get_batch().

Returns

The stored field values, with the same shape as the broadcasted shape of env_ids and positions

Return type

Tensor
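
The '.'-separated field_name convention can be illustrated with a small sketch. Plain dicts stand in for the nested experience structure here, and get_by_path is a hypothetical helper, not part of the library.

```python
def get_by_path(nest, path):
    """Hypothetical helper: follow a '.'-separated path through nested dicts."""
    value = nest
    for name in path.split("."):
        value = value[name]
    return value


experience = {"observation": {"image": "img0", "speed": 3.5}, "reward": 1.0}
speed = get_by_path(experience, "observation.speed")
```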

property initial_priority#

The initial priority used for newly added experiences.

We use a large value for initial priority so that a new experience can be used for training sooner. We make it at least 1.0 so that it can never be very small.
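
A minimal sketch of this rule, following the initial_priority parameter description (the maximum of the configured value and the current maximum priority). The function name and the list argument are hypothetical.

```python
def effective_initial_priority(configured, current_priorities):
    """Hypothetical helper: max(configured value, current max priority)."""
    current_max = max(current_priorities, default=0.0)
    return max(configured, current_max)


empty_case = effective_initial_priority(1.0, [])
busy_case = effective_initial_priority(1.0, [0.2, 4.0])
```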

steps_to_episode_end(pos, env_ids)[source]#

Get the distance to the closest episode end in the future.

Parameters
  • pos (tensor) – shape L, positions of the current timesteps in the replay buffer.

  • env_ids (tensor) – shape L

Returns

tensor of shape L.
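
A sketch of the idea for a single environment, using a hypothetical list of is_last flags. Two choices here are assumptions of the sketch rather than documented behavior: an episode-ending step is at distance 0 from itself, and None is returned when no episode end has been stored yet.

```python
def steps_to_episode_end(pos, is_last):
    """Hypothetical helper: steps from pos to the next episode-ending step."""
    for future in range(pos, len(is_last)):
        if is_last[future]:
            return future - pos
    return None  # the episode has not ended within the stored steps


is_last = [False, False, True, False, True, False]
distances = [steps_to_episode_end(p, is_last) for p in range(len(is_last))]
```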

property total_size#

Total size from all environments.

training: bool#
update_priority(env_ids, positions, priorities)[source]#

Update the priorities for the given experiences.

Parameters
  • env_ids (Tensor) – 1-D int64 Tensor.

  • positions (Tensor) – 1-D int64 Tensor with same shape as env_ids. These positions should be obtained from the BatchInfo returned by get_batch().

  • priorities (Tensor) – 1-D float Tensor with same shape as env_ids. The elements are the new priorities corresponding to the experiences indicated by (env_ids, positions).

alf.experience_replayers.segment_tree#

SegmentTree.

class MaxSegmentTree(capacity, dtype=torch.float32, device='cpu', name='MaxSegmentTree')[source]#

Bases: alf.experience_replayers.segment_tree.SegmentTree

SegmentTree with max operation.

Initializes internal Module state, shared by both nn.Module and ScriptModule.

training: bool#
class MinSegmentTree(capacity, dtype=torch.float32, device='cpu', name='MinSegmentTree')[source]#

Bases: alf.experience_replayers.segment_tree.SegmentTree

SegmentTree with min operation.

Initializes internal Module state, shared by both nn.Module and ScriptModule.

training: bool#
class SegmentTree(capacity, op, dtype=torch.float32, device='cpu', name='SegmentTree')[source]#

Bases: torch.nn.modules.module.Module

Data structure to allow efficient calculation of the summary statistics over a segment of elements. See https://en.wikipedia.org/wiki/Segment_tree for detail.

In this implementation, values[1] is the root. values[capacity: 2*capacity] are the leaves. The two children of an internal node values[i] are values[2*i] and values[2*i+1]. And values[i] is set to op(values[2*i], values[2*i+1]). Each leaf represents a value set through __setitem__. All the nodes of the tree are initialized to be zeros.
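
The array layout described above can be sketched with a plain Python list. TinySegmentTree is a hypothetical class written for this example; it handles one element at a time and assumes capacity is a power of 2, which the real class may not require.

```python
class TinySegmentTree:
    """Hypothetical list-based segment tree; capacity must be a power of 2."""

    def __init__(self, capacity, op):
        assert capacity > 0 and (capacity & (capacity - 1)) == 0
        self.capacity = capacity
        self.op = op
        self.values = [0.0] * (2 * capacity)  # values[0] is unused

    def __setitem__(self, idx, value):
        node = idx + self.capacity  # leaf holding element idx
        self.values[node] = value
        node //= 2
        while node >= 1:  # recompute every ancestor up to the root
            self.values[node] = self.op(self.values[2 * node],
                                        self.values[2 * node + 1])
            node //= 2

    def __getitem__(self, idx):
        return self.values[idx + self.capacity]

    def summary(self):
        return self.values[1]  # the root summarizes all leaves


sum_tree = TinySegmentTree(4, lambda a, b: a + b)
max_tree = TinySegmentTree(4, max)
for i, v in enumerate([3.0, 0.0, 1.0, 4.0]):
    sum_tree[i] = v
    max_tree[i] = v
```

Setting a leaf only touches the nodes on the path to the root, so an update costs a number of op calls logarithmic in capacity.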

Initializes internal Module state, shared by both nn.Module and ScriptModule.

summary()[source]#

The summary of the tree.

If op is torch.add, it’s the sum of all values. If op is torch.min, it’s the min of all values. If op is torch.max, it’s the max of all values.

Returns

a scalar

training: bool#
class SumSegmentTree(capacity, dtype=torch.float32, device='cpu', name='SumSegmentTree')[source]#

Bases: alf.experience_replayers.segment_tree.SegmentTree

SegmentTree with sum operation.

Initializes internal Module state, shared by both nn.Module and ScriptModule.

find_sum_bound(thresholds)[source]#

The result is an int64 Tensor with the same shape as thresholds. result[i] is the minimum idx such that

thresholds[i] < values[0] + … + values[idx]

values[result[i]] will never be 0.

Parameters

thresholds (Tensor) – 1-D Tensor. All the elements in thresholds should be smaller than self.summary()

Returns

1-D int64 Tensor with the same shape as thresholds.

Note that if thresholds[i] == root, result[i] will be the index of the non-zero value with the largest index.

Return type

Tensor

Raises

ValueError – If one or more of thresholds is greater than summary().
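
The search can be sketched as a descent from the root of a sum tree stored in the values layout of SegmentTree. This scalar, list-based function is a hypothetical illustration that assumes capacity is a power of 2. As a simplification it rejects thresholds equal to the total sum, unlike the behavior noted above for thresholds[i] == root.

```python
def find_sum_bound(values, capacity, threshold):
    """Hypothetical helper: smallest idx with threshold < sum(leaves[:idx+1]).

    values[1] is the root and values[capacity:2*capacity] are the leaves.
    """
    if threshold >= values[1]:
        raise ValueError("threshold must be smaller than the total sum")
    node = 1
    while node < capacity:  # descend until a leaf is reached
        left = 2 * node
        if threshold < values[left]:
            node = left  # the bound lies in the left subtree
        else:
            threshold -= values[left]  # skip the whole left subtree
            node = left + 1
    return node - capacity


leaves = [3.0, 0.0, 1.0, 4.0]
capacity = len(leaves)
values = [0.0] * capacity + leaves
for node in range(capacity - 1, 0, -1):  # build internal nodes bottom-up
    values[node] = values[2 * node] + values[2 * node + 1]

bounds = [find_sum_bound(values, capacity, t)
          for t in (0.0, 2.9, 3.0, 3.5, 4.0, 7.9)]
```

Drawing a threshold uniformly from [0, summary()) and looking up its bound selects each index with probability proportional to its value, which is the usual way a sum tree supports prioritized sampling. Index 1, whose value is 0, is never returned.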

property nnz#

The number of non-zeros.

training: bool#