Source code for alf.tensor_specs

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TensorSpec with PyTorch types; adapted from Tensorflow's tensor_spec.py:

https://github.com/tensorflow/tensorflow/blob/r1.8/tensorflow/python/framework/tensor_spec.py
"""
from __future__ import annotations
from typing import Optional, Union, Tuple, Dict, List

import numpy as np

import torch

import alf


def torch_dtype_to_str(dtype):
    """Return the name of a torch dtype without its ``torch.`` prefix.

    Args:
        dtype (torch.dtype): the dtype to convert, e.g. ``torch.float32``.

    Returns:
        str: the bare dtype name, e.g. ``"float32"``.
    """
    assert isinstance(dtype, torch.dtype)
    # ``str(torch.float32)`` is ``"torch.float32"``; strip the leading
    # 6-character ``"torch."`` prefix.
    prefix_len = len("torch.")
    full_name = str(dtype)
    return full_name[prefix_len:]
@alf.configurable
class TensorSpec(object):
    """Describes a torch.Tensor.

    A TensorSpec allows an API to describe the Tensors that it accepts or
    returns, before that Tensor exists. This allows dynamic and flexible graph
    construction and configuration.
    """

    __slots__ = ["_shape", "_dtype"]

    def __init__(self, shape, dtype=torch.float32):
        """
        Args:
            shape (tuple[int]): The shape of the tensor.
            dtype (str or torch.dtype): The type of the tensor values,
                e.g., "int32" or torch.int32
        """
        self._shape = tuple(shape)
        if isinstance(dtype, str):
            # A string such as "int32" is resolved to the attribute of the
            # same name on the ``torch`` module.
            self._dtype = getattr(torch, dtype)
        else:
            assert isinstance(dtype, torch.dtype)
            self._dtype = dtype

    @classmethod
    def from_spec(cls, spec):
        """Create an instance of ``cls`` with the shape and dtype of ``spec``.

        Args:
            spec (TensorSpec): the spec to copy shape and dtype from.
        Returns:
            an instance of ``cls``
        """
        assert isinstance(spec, TensorSpec)
        return cls(spec.shape, spec.dtype)

    @classmethod
    def from_tensor(cls, tensor, from_dim=0):
        """Create TensorSpec from tensor.

        Args:
            tensor (Tensor): tensor from which the spec is extracted
            from_dim (int): use tensor.shape[from_dim:] as shape
        Returns:
            TensorSpec
        """
        assert isinstance(tensor, torch.Tensor)
        # Always builds a plain ``TensorSpec``, independent of ``cls``.
        return TensorSpec(tensor.shape[from_dim:], tensor.dtype)

    @classmethod
    def from_array(cls, array, from_dim=0):
        """Create TensorSpec from numpy array.

        Args:
            array (np.ndarray|np.number): array from which the spec is
                extracted
            from_dim (int): use ``array.shape[from_dim:]`` as shape
        Returns:
            TensorSpec
        """
        assert isinstance(array, (np.ndarray, np.number))
        # The numpy dtype name (e.g. "float32") is resolved to a torch dtype
        # by the constructor.
        return TensorSpec(array.shape[from_dim:], str(array.dtype))

    def replace(self,
                shape: Union[None, tuple, torch.Size] = None,
                dtype: Optional[torch.dtype] = None) -> TensorSpec:
        """Create a new TensorSpec with part of the properties replaced.

        For example, if we have a TensorSpec like

        .. code-block:: python

            spec = TensorSpec((3, 5), torch.int32)

        You can explicitly create a similar spec with a different dtype by

        .. code-block:: python

            new_spec = spec.replace(dtype=torch.float32)

        Args:
            shape: the new shape, or ``None`` to keep the current one. An
                empty shape ``()`` is a valid replacement (scalar spec).
            dtype: the new dtype, or ``None`` to keep the current one.
        Returns:
            TensorSpec: the new spec.
        """
        # Compare against None explicitly: an empty shape ``()`` is falsy but
        # is a legitimate value that must not fall back to ``self.shape``.
        new_shape = self.shape if shape is None else shape
        new_dtype = self.dtype if dtype is None else dtype
        return TensorSpec(shape=new_shape, dtype=new_dtype)

    @classmethod
    def is_bounded(cls):
        """Whether this kind of spec carries minimum/maximum bounds."""
        del cls
        return False

    @property
    def shape(self):
        """Returns the shape of the tensor as a tuple."""
        return self._shape

    @property
    def numel(self):
        """Returns the number of elements."""
        return int(np.prod(self._shape))

    @property
    def ndim(self):
        """Return the rank of the tensor."""
        return len(self._shape)

    @property
    def dtype(self):
        """Returns the `dtype` of elements in the tensor."""
        return self._dtype

    @property
    def dtype_str(self):
        """The str representation of dtype

        It can be used to construct a numpy array.
        """
        return torch_dtype_to_str(self._dtype)

    @property
    def is_discrete(self):
        """Whether spec is discrete."""
        return not self.dtype.is_floating_point

    @property
    def is_continuous(self):
        """Whether spec is continuous."""
        return self.dtype.is_floating_point

    def __repr__(self):
        return "TensorSpec(shape={}, dtype={})".format(self.shape,
                                                       repr(self.dtype))

    def __eq__(self, other):
        # Exact type match is required, so a subclass instance never equals
        # an instance of a different class.
        if type(self) != type(other):
            return False
        return self.shape == other.shape and self.dtype == other.dtype

    def __ne__(self, other):
        return not self == other

    def __reduce__(self):
        return TensorSpec, (self._shape, self._dtype)

    def _calc_shape(self, outer_dims):
        """Return the spec shape with ``outer_dims`` prepended, if given."""
        shape = self._shape
        if outer_dims is not None:
            shape = tuple(outer_dims) + shape
        return shape

    def constant(self, value, outer_dims=None):
        """Create a constant tensor from the spec.

        Args:
            value : a scalar
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            tensor (torch.Tensor): a tensor of ``self._dtype``.
        """
        # Multiplying by ``value`` can promote the dtype (e.g. an integer spec
        # with a float value), so cast back to honor the documented dtype.
        return (self.ones(outer_dims) * value).to(self._dtype)

    def zeros(self, outer_dims=None):
        """Create a zero tensor from the spec.

        Args:
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            tensor (torch.Tensor): a tensor of ``self._dtype``.
        """
        return torch.zeros(self._calc_shape(outer_dims), dtype=self._dtype)

    def numpy_constant(self, value, outer_dims=None):
        """Create a constant np.ndarray from the spec.

        Args:
            value (Number) : a scalar
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            np.ndarray: an array of ``self._dtype``.
        """
        return np.full(
            self._calc_shape(outer_dims), value, dtype=self.dtype_str)

    def numpy_zeros(self, outer_dims=None):
        """Create a zero numpy.ndarray from the spec.

        Args:
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            np.ndarray: an array of ``self._dtype``.
        """
        return self.numpy_constant(0, outer_dims)

    def ones(self, outer_dims=None):
        """Create an all-one tensor from the spec.

        Args:
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            tensor (torch.Tensor): a tensor of ``self._dtype``.
        """
        return torch.ones(self._calc_shape(outer_dims), dtype=self._dtype)

    def randn(self, outer_dims=None):
        """Create a tensor filled with random numbers from a std normal dist.

        Args:
            outer_dims (tuple[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            tensor (torch.Tensor): a tensor of ``self._dtype``.
        """
        # Pass the shape as a single tuple (not unpacked) so that a scalar
        # spec with an empty shape ``()`` is handled too.
        return torch.randn(self._calc_shape(outer_dims), dtype=self._dtype)

    def rand(self, outer_dims: Tuple[int] = None):
        """Create a tensor filled with random numbers in :math:`[0,1)`.

        Args:
            outer_dims: an optional list of integers specifying outer
                dimensions to add to the spec shape before sampling.
        Returns:
            torch.Tensor: a tensor of ``self._dtype``.
        """
        # Pass the shape as a single tuple (not unpacked) so that a scalar
        # spec with an empty shape ``()`` is handled too.
        return torch.rand(self._calc_shape(outer_dims), dtype=self._dtype)
@alf.configurable
class BoundedTensorSpec(TensorSpec):
    """A `TensorSpec` that specifies minimum and maximum values.

    Example usage:

    .. code-block:: python

        spec = BoundedTensorSpec((1, 2, 3), torch.float32, 0, (5, 5, 5))
        torch_minimum = torch.as_tensor(spec.minimum, dtype=spec.dtype)
        torch_maximum = torch.as_tensor(spec.maximum, dtype=spec.dtype)

    Bounds are meant to be inclusive. This is especially important for
    integer types. The following spec will be satisfied by tensors
    with values in the set {0, 1, 2}:

    .. code-block:: python

        spec = BoundedTensorSpec((3, 5), torch.int32, 0, 2)
    """

    __slots__ = ("_minimum", "_maximum")

    def __init__(self, shape, dtype=torch.float32, minimum=0, maximum=1):
        """
        Args:
            shape (tuple[int]): The shape of the tensor.
            dtype (str or torch.dtype): The type of the tensor values,
                e.g., "int32" or torch.int32
            minimum: numpy number or sequence specifying the minimum element
                bounds (inclusive). Must be broadcastable to `shape`.
            maximum: numpy number or sequence specifying the maximum element
                bounds (inclusive). Must be broadcastable to `shape`.
        Raises:
            ValueError: if ``minimum`` or ``maximum`` cannot be broadcast
                against ``shape``.
        """
        super(BoundedTensorSpec, self).__init__(shape, dtype)

        try:
            # Broadcasting against a dummy array of the spec shape checks
            # compatibility and pairs up every (min, max) element.
            min_max = np.broadcast(minimum, maximum, np.zeros(self.shape))
            for m, M, _ in min_max:
                assert m <= M, "Min {} is greater than Max {}".format(m, M)
        except ValueError as exception:
            raise ValueError(
                "minimum or maximum is not compatible with shape. "
                "Message: {!r}.".format(exception)) from exception

        # Bounds are stored as read-only numpy arrays of the spec's dtype.
        self._minimum = np.array(
            minimum, dtype=torch_dtype_to_str(self._dtype))
        self._minimum.setflags(write=False)

        self._maximum = np.array(
            maximum, dtype=torch_dtype_to_str(self._dtype))
        self._maximum.setflags(write=False)

    def replace(self,
                shape: Union[None, tuple, torch.Size] = None,
                dtype: Optional[torch.dtype] = None,
                minimum: Union[None, float, np.ndarray] = None,
                maximum: Union[None, float, np.ndarray] = None
                ) -> BoundedTensorSpec:
        """Create a new BoundedTensorSpec with part of the properties replaced.

        For example, if we have a BoundedTensorSpec like

        .. code-block:: python

            spec = BoundedTensorSpec((3, 5), torch.int32, 0, 2)

        You can explicitly create a similar spec with a different shape and
        minimum by

        .. code-block:: python

            new_spec = spec.replace(shape=(4, 8), minimum=-1)

        Args:
            shape: the new shape, or ``None`` to keep the current one. An
                empty shape ``()`` is a valid replacement (scalar spec).
            dtype: the new dtype, or ``None`` to keep the current one.
            minimum: the new minimum, or ``None`` to keep the current one.
            maximum: the new maximum, or ``None`` to keep the current one.
        Returns:
            BoundedTensorSpec: the new spec.
        """
        # Compare against None explicitly: an empty shape ``()`` is falsy but
        # is a legitimate value that must not fall back to ``self.shape``.
        new_shape = self.shape if shape is None else shape
        new_dtype = self.dtype if dtype is None else dtype
        new_minimum = self.minimum if minimum is None else minimum
        new_maximum = self.maximum if maximum is None else maximum
        return BoundedTensorSpec(
            shape=new_shape,
            dtype=new_dtype,
            minimum=new_minimum,
            maximum=new_maximum)

    @classmethod
    def is_bounded(cls):
        """Whether this kind of spec carries minimum/maximum bounds."""
        del cls
        return True

    @classmethod
    def from_spec(cls, spec):
        """Create a BoundedTensorSpec copying all properties of ``spec``.

        Args:
            spec (BoundedTensorSpec): the spec to copy from.
        Returns:
            BoundedTensorSpec
        """
        assert isinstance(spec, BoundedTensorSpec)
        return BoundedTensorSpec(spec.shape, spec.dtype, spec.minimum,
                                 spec.maximum)

    @property
    def minimum(self):
        """Returns a NumPy array specifying the minimum bounds (inclusive)."""
        return self._minimum

    @property
    def maximum(self):
        """Returns a NumPy array specifying the maximum bounds (inclusive)."""
        return self._maximum

    def __repr__(self):
        s = "BoundedTensorSpec(shape={}, dtype={}, minimum={}, maximum={})"
        return s.format(self.shape, repr(self.dtype), repr(self.minimum),
                        repr(self.maximum))

    def __eq__(self, other):
        # Shape/dtype/type equality first; bounds are only compared when the
        # base comparison succeeds (``and`` short-circuits).
        tensor_spec_eq = super(BoundedTensorSpec, self).__eq__(other)
        return (tensor_spec_eq and np.allclose(self.minimum, other.minimum)
                and np.allclose(self.maximum, other.maximum))

    def __reduce__(self):
        return BoundedTensorSpec, (self._shape, self._dtype, self._minimum,
                                   self._maximum)

    def sample(self, outer_dims=None):
        """Sample uniformly given the min/max bounds.

        Args:
            outer_dims (list[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
        Returns:
            tensor (torch.Tensor): a tensor of `self._dtype`
        """
        shape = self._calc_shape(outer_dims)

        if self.is_continuous:
            # Linear interpolation between the bounds with a uniform weight.
            uniform = torch.rand(shape, dtype=self._dtype)
            return ((1 - uniform) * torch.tensor(self._minimum) +
                    torch.tensor(self._maximum) * uniform)
        else:
            # torch.randint cannot have multi-dim lows and highs; currently
            # only support a scalar minimum and maximum
            assert (np.shape(self._minimum) == ()
                    and np.shape(self._maximum) == ())
            # ``high`` is exclusive in torch.randint, hence the ``+ 1`` to
            # keep the maximum bound inclusive.
            return torch.randint(
                low=self._minimum.item(),
                high=self._maximum.item() + 1,
                size=shape,
                dtype=self._dtype)

    def numpy_sample(self, outer_dims=None, rng=np.random):
        """Sample numpy arrays uniformly given the min/max bounds.

        Args:
            outer_dims (list[int]): an optional list of integers specifying
                outer dimensions to add to the spec shape before sampling.
            rng (numpy.random.RandomState): random number generator
        Returns:
            np.ndarray: an array of ``self._dtype``
        """
        shape = self._calc_shape(outer_dims)

        if self.is_continuous:
            # ``rng.rand()`` with no dimensions returns a plain Python float,
            # which has no ``astype``; wrap in ``np.asarray`` so that a scalar
            # spec (empty shape) works as well.
            uniform = np.asarray(rng.rand(*shape)).astype(self.dtype_str)
            # Linear interpolation between the bounds; ``np.asarray`` keeps
            # the result an ndarray even for a 0-d sample.
            return np.asarray((1 - uniform) * self._minimum +
                              self._maximum * uniform)
        else:
            # ``high`` is exclusive in ``randint``, hence the ``+ 1`` to keep
            # the maximum bound inclusive.
            return rng.randint(
                low=self._minimum,
                high=self._maximum + 1,
                size=shape,
                dtype=self.dtype_str)
# yapf: disable
# Recursive alias: a single TensorSpec, or a list / tuple / str-keyed dict
# whose entries are themselves NestedTensorSpec.
NestedTensorSpec = Union[
    TensorSpec,
    List['NestedTensorSpec'],
    # The empty tuple counts as a NestedTensorSpec too.
    Tuple[()],
    # Tuple['NestedTensorSpec', ...] is looser than strictly necessary; it is
    # listed so that a "(named) tuple of NestedTensorSpec" is covered.
    Tuple['NestedTensorSpec', ...],
    Dict[str, 'NestedTensorSpec']
]

# Same recursive structure as above, with BoundedTensorSpec as the leaf type.
NestedBoundedTensorSpec = Union[
    BoundedTensorSpec,
    List['NestedBoundedTensorSpec'],
    # The empty tuple counts as a NestedBoundedTensorSpec too.
    Tuple[()],
    # Tuple['NestedBoundedTensorSpec', ...] is looser than strictly necessary;
    # it is listed so that a "(named) tuple of NestedBoundedTensorSpec" is
    # covered.
    Tuple['NestedBoundedTensorSpec', ...],
    Dict[str, 'NestedBoundedTensorSpec']
]
# yapf: enable