# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-unsafe
from itertools import zip_longest
from typing import List, Optional, Sequence, Tuple, Union
import numpy as np
import torch
from ..common.datatypes import Device, make_device
from . import utils as struct_utils
[docs]
class Pointclouds:
"""
This class provides functions for working with batches of 3d point clouds,
and converting between representations.
Within Pointclouds, there are three different representations of the data.
List
- only used for input as a starting point to convert to other representations.
Padded
- has specific batch dimension.
Packed
- no batch dimension.
- has auxiliary variables used to index into the padded representation.
Example
Input list of points = [[P_1], [P_2], ... , [P_N]]
where P_1, ... , P_N are the number of points in each cloud and N is the
number of clouds.
# SPHINX IGNORE
List | Padded | Packed
---------------------------|-------------------------|------------------------
[[P_1], ... , [P_N]] | size = (N, max(P_n), 3) | size = (sum(P_n), 3)
| |
Example for locations | |
or colors: | |
| |
P_1 = 3, P_2 = 4, P_3 = 5 | size = (3, 5, 3) | size = (12, 3)
| |
List([ | tensor([ | tensor([
[ | [ | [0.1, 0.3, 0.5],
[0.1, 0.3, 0.5], | [0.1, 0.3, 0.5], | [0.5, 0.2, 0.1],
[0.5, 0.2, 0.1], | [0.5, 0.2, 0.1], | [0.6, 0.8, 0.7],
[0.6, 0.8, 0.7] | [0.6, 0.8, 0.7], | [0.1, 0.3, 0.3],
], | [0, 0, 0], | [0.6, 0.7, 0.8],
[ | [0, 0, 0] | [0.2, 0.3, 0.4],
[0.1, 0.3, 0.3], | ], | [0.1, 0.5, 0.3],
[0.6, 0.7, 0.8], | [ | [0.7, 0.3, 0.6],
[0.2, 0.3, 0.4], | [0.1, 0.3, 0.3], | [0.2, 0.4, 0.8],
[0.1, 0.5, 0.3] | [0.6, 0.7, 0.8], | [0.9, 0.5, 0.2],
], | [0.2, 0.3, 0.4], | [0.2, 0.3, 0.4],
[ | [0.1, 0.5, 0.3], | [0.9, 0.3, 0.8],
[0.7, 0.3, 0.6], | [0, 0, 0] | ])
[0.2, 0.4, 0.8], | ], |
[0.9, 0.5, 0.2], | [ |
[0.2, 0.3, 0.4], | [0.7, 0.3, 0.6], |
[0.9, 0.3, 0.8], | [0.2, 0.4, 0.8], |
] | [0.9, 0.5, 0.2], |
]) | [0.2, 0.3, 0.4], |
| [0.9, 0.3, 0.8] |
| ] |
| ]) |
-----------------------------------------------------------------------------
Auxiliary variables for packed representation
Name | Size | Example from above
-------------------------------|---------------------|-----------------------
| |
packed_to_cloud_idx | size = (sum(P_n)) | tensor([
| | 0, 0, 0, 1, 1, 1,
| | 1, 2, 2, 2, 2, 2
| | )]
| | size = (12)
| |
cloud_to_packed_first_idx | size = (N) | tensor([0, 3, 7])
| | size = (3)
| |
num_points_per_cloud | size = (N) | tensor([3, 4, 5])
| | size = (3)
| |
padded_to_packed_idx | size = (sum(P_n)) | tensor([
| | 0, 1, 2, 5, 6, 7,
| | 8, 10, 11, 12, 13,
| | 14
| | )]
| | size = (12)
-----------------------------------------------------------------------------
# SPHINX IGNORE
"""
# Names of all tensor-valued attributes that hold derived/cached state.
# clone(), detach(), to() and update_padded() iterate over this list so
# that every cached tensor is copied/moved along with the primary data.
_INTERNAL_TENSORS = [
    "_points_packed",
    "_points_padded",
    "_normals_packed",
    "_normals_padded",
    "_features_packed",
    "_features_padded",
    "_packed_to_cloud_idx",
    "_cloud_to_packed_first_idx",
    "_num_points_per_cloud",
    "_padded_to_packed_idx",
    "valid",
    "equisized",
]
[docs]
def __init__(self, points, normals=None, features=None) -> None:
    """
    Args:
        points:
            Can be either
            - List where each element is a tensor of shape (num_points, 3)
              containing the (x, y, z) coordinates of each point.
            - Padded float tensor with shape (num_clouds, num_points, 3).
        normals:
            Can be either
            - None
            - List where each element is a tensor of shape (num_points, 3)
              containing the normal vector for each point.
            - Padded float tensor of shape (num_clouds, num_points, 3).
        features:
            Can be either
            - None
            - List where each element is a tensor of shape (num_points, C)
              containing the features for the points in the cloud.
            - Padded float tensor of shape (num_clouds, num_points, C).
            where C is the number of channels in the features.
            For example 3 for RGB color.

    Refer to comments above for descriptions of List and Padded
    representations.
    """
    self.device = torch.device("cpu")

    # Indicates whether the clouds in the list/batch have the same number
    # of points.
    self.equisized = False

    # Boolean indicator for each cloud in the batch.
    # True if cloud has non zero number of points, False otherwise.
    self.valid = None

    self._N = 0  # batch size (number of clouds)
    self._P = 0  # (max) number of points per cloud
    self._C = None  # number of channels in the features

    # List of Tensors of points and features.
    self._points_list = None
    self._normals_list = None
    self._features_list = None

    # Number of points per cloud.
    self._num_points_per_cloud = None  # N

    # Packed representation.
    self._points_packed = None  # (sum(P_n), 3)
    self._normals_packed = None  # (sum(P_n), 3)
    self._features_packed = None  # (sum(P_n), C)

    self._packed_to_cloud_idx = None  # sum(P_n)

    # Index of each cloud's first point in the packed points.
    # Assumes packing is sequential.
    self._cloud_to_packed_first_idx = None  # N

    # Padded representation.
    self._points_padded = None  # (N, max(P_n), 3)
    self._normals_padded = None  # (N, max(P_n), 3)
    self._features_padded = None  # (N, max(P_n), C)

    # Index to convert points from flattened padded to packed.
    self._padded_to_packed_idx = None  # N * max_P

    # Identify type of points.
    if isinstance(points, list):
        self._points_list = points
        self._N = len(self._points_list)
        self.valid = torch.zeros((self._N,), dtype=torch.bool, device=self.device)
        if self._N > 0:
            # Adopt the device of the first cloud; every cloud must agree.
            self.device = self._points_list[0].device
            for p in self._points_list:
                if len(p) > 0 and (p.dim() != 2 or p.shape[1] != 3):
                    raise ValueError("Clouds in list must be of shape Px3 or empty")
                if p.device != self.device:
                    raise ValueError("All points must be on the same device")
            num_points_per_cloud = torch.tensor(
                [len(p) for p in self._points_list], device=self.device
            )
            self._P = int(num_points_per_cloud.max())
            self.valid = torch.tensor(
                [len(p) > 0 for p in self._points_list],
                dtype=torch.bool,
                device=self.device,
            )
            # Equisized iff there is a single unique point count.
            if len(num_points_per_cloud.unique()) == 1:
                self.equisized = True
            self._num_points_per_cloud = num_points_per_cloud
        else:
            self._num_points_per_cloud = torch.tensor([], dtype=torch.int64)
    elif torch.is_tensor(points):
        if points.dim() != 3 or points.shape[2] != 3:
            raise ValueError("Points tensor has incorrect dimensions.")
        self._points_padded = points
        self._N = self._points_padded.shape[0]
        self._P = self._points_padded.shape[1]
        self.device = self._points_padded.device
        self.valid = torch.ones((self._N,), dtype=torch.bool, device=self.device)
        # A padded tensor input implies every cloud has exactly P points.
        self._num_points_per_cloud = torch.tensor(
            [self._P] * self._N, device=self.device
        )
        self.equisized = True
    else:
        raise ValueError(
            "Points must be either a list or a tensor with \
            shape (batch_size, P, 3) where P is the maximum number of \
            points in a cloud."
        )

    # parse normals
    normals_parsed = self._parse_auxiliary_input(normals)
    self._normals_list, self._normals_padded, normals_C = normals_parsed
    if normals_C is not None and normals_C != 3:
        raise ValueError("Normals are expected to be 3-dimensional")

    # parse features
    features_parsed = self._parse_auxiliary_input(features)
    self._features_list, self._features_padded, features_C = features_parsed
    if features_C is not None:
        self._C = features_C
def _parse_auxiliary_input(
    self, aux_input
) -> Tuple[Optional[List[torch.Tensor]], Optional[torch.Tensor], Optional[int]]:
    """
    Interpret the auxiliary inputs (normals, features) given to __init__.

    Args:
        aux_input:
            Can be either
            - List where each element is a tensor of shape (num_points, C)
              containing the features for the points in the cloud.
            - Padded float tensor of shape (num_clouds, num_points, C).
            For normals, C = 3.

    Returns:
        3-element tuple of list, padded, num_channels.
        If aux_input is list, then padded is None. If aux_input is a tensor,
        then list is None.
    """
    # Nothing to parse when no input is given or the batch is empty.
    if aux_input is None or self._N == 0:
        return None, None, None
    aux_input_C = None
    if isinstance(aux_input, list):
        return self._parse_auxiliary_input_list(aux_input)
    if torch.is_tensor(aux_input):
        # Padded tensor: must agree with the points in batch size (N),
        # max points (P) and device.
        if aux_input.dim() != 3:
            raise ValueError("Auxiliary input tensor has incorrect dimensions.")
        if self._N != aux_input.shape[0]:
            raise ValueError("Points and inputs must be the same length.")
        if self._P != aux_input.shape[1]:
            raise ValueError(
                "Inputs tensor must have the right maximum \
                number of points in each cloud."
            )
        if aux_input.device != self.device:
            raise ValueError(
                "All auxiliary inputs must be on the same device as the points."
            )
        aux_input_C = aux_input.shape[2]
        return None, aux_input, aux_input_C
    else:
        raise ValueError(
            "Auxiliary input must be either a list or a tensor with \
            shape (batch_size, P, C) where P is the maximum number of \
            points in a cloud."
        )
def _parse_auxiliary_input_list(
    self, aux_input: list
) -> Tuple[Optional[List[torch.Tensor]], None, Optional[int]]:
    """
    Interpret the auxiliary inputs (normals, features) given to __init__,
    if a list.

    Args:
        aux_input:
            - List where each element is a tensor of shape (num_points, C)
              containing the features for the points in the cloud.
            For normals, C = 3.

    Returns:
        3-element tuple of list, padded=None, num_channels.
        If aux_input is list, then padded is None. If aux_input is a tensor,
        then list is None.
    """
    aux_input_C = None
    # Shared (0, C) tensor used to replace malformed entries for empty clouds.
    good_empty = None
    # Set when an empty cloud's entry is not a well-formed (0, C) tensor
    # (e.g. None) and therefore has to be substituted.
    needs_fixing = False

    if len(aux_input) != self._N:
        raise ValueError("Points and auxiliary input must be the same length.")
    # First pass: validate shapes/devices and find the channel count C.
    for p, d in zip(self._num_points_per_cloud, aux_input):
        # An empty cloud may legitimately carry an explicit (0, C) tensor.
        valid_but_empty = p == 0 and d is not None and d.ndim == 2
        if p > 0 or valid_but_empty:
            if p != d.shape[0]:
                raise ValueError(
                    "A cloud has mismatched numbers of points and inputs"
                )
            if d.dim() != 2:
                raise ValueError(
                    "A cloud auxiliary input must be of shape PxC or empty"
                )
            if aux_input_C is None:
                aux_input_C = d.shape[1]
            elif aux_input_C != d.shape[1]:
                raise ValueError("The clouds must have the same number of channels")
            if d.device != self.device:
                raise ValueError(
                    "All auxiliary inputs must be on the same device as the points."
                )
        else:
            needs_fixing = True

    if aux_input_C is None:
        # We found nothing useful
        return None, None, None

    # If we have empty but "wrong" inputs we want to store "fixed" versions.
    if needs_fixing:
        if good_empty is None:
            good_empty = torch.zeros((0, aux_input_C), device=self.device)
        # Second pass: keep valid entries, substitute the shared empty tensor.
        aux_input_out = []
        for p, d in zip(self._num_points_per_cloud, aux_input):
            valid_but_empty = p == 0 and d is not None and d.ndim == 2
            if p > 0 or valid_but_empty:
                aux_input_out.append(d)
            else:
                aux_input_out.append(good_empty)
    else:
        aux_input_out = aux_input
    return aux_input_out, None, aux_input_C
def __len__(self) -> int:
return self._N
[docs]
def __getitem__(
self,
index: Union[int, List[int], slice, torch.BoolTensor, torch.LongTensor],
) -> "Pointclouds":
"""
Args:
index: Specifying the index of the cloud to retrieve.
Can be an int, slice, list of ints or a boolean tensor.
Returns:
Pointclouds object with selected clouds. The tensors are not cloned.
"""
normals, features = None, None
normals_list = self.normals_list()
features_list = self.features_list()
if isinstance(index, int):
points = [self.points_list()[index]]
if normals_list is not None:
normals = [normals_list[index]]
if features_list is not None:
features = [features_list[index]]
elif isinstance(index, slice):
points = self.points_list()[index]
if normals_list is not None:
normals = normals_list[index]
if features_list is not None:
features = features_list[index]
elif isinstance(index, list):
points = [self.points_list()[i] for i in index]
if normals_list is not None:
normals = [normals_list[i] for i in index]
if features_list is not None:
features = [features_list[i] for i in index]
elif isinstance(index, torch.Tensor):
if index.dim() != 1 or index.dtype.is_floating_point:
raise IndexError(index)
# NOTE consider converting index to cpu for efficiency
if index.dtype == torch.bool:
# advanced indexing on a single dimension
index = index.nonzero()
index = index.squeeze(1) if index.numel() > 0 else index
index = index.tolist()
points = [self.points_list()[i] for i in index]
if normals_list is not None:
normals = [normals_list[i] for i in index]
if features_list is not None:
features = [features_list[i] for i in index]
else:
raise IndexError(index)
return self.__class__(points=points, normals=normals, features=features)
[docs]
def isempty(self) -> bool:
"""
Checks whether any cloud is valid.
Returns:
bool indicating whether there is any data.
"""
return self._N == 0 or self.valid.eq(False).all()
[docs]
def points_list(self) -> List[torch.Tensor]:
"""
Get the list representation of the points.
Returns:
list of tensors of points of shape (P_n, 3).
"""
if self._points_list is None:
assert (
self._points_padded is not None
), "points_padded is required to compute points_list."
points_list = []
for i in range(self._N):
points_list.append(
self._points_padded[i, : self.num_points_per_cloud()[i]]
)
self._points_list = points_list
return self._points_list
[docs]
def normals_list(self) -> Optional[List[torch.Tensor]]:
"""
Get the list representation of the normals,
or None if there are no normals.
Returns:
list of tensors of normals of shape (P_n, 3).
"""
if self._normals_list is None:
if self._normals_padded is None:
# No normals provided so return None
return None
self._normals_list = struct_utils.padded_to_list(
self._normals_padded, self.num_points_per_cloud().tolist()
)
return self._normals_list
[docs]
def features_list(self) -> Optional[List[torch.Tensor]]:
"""
Get the list representation of the features,
or None if there are no features.
Returns:
list of tensors of features of shape (P_n, C).
"""
if self._features_list is None:
if self._features_padded is None:
# No features provided so return None
return None
self._features_list = struct_utils.padded_to_list(
self._features_padded, self.num_points_per_cloud().tolist()
)
return self._features_list
[docs]
def points_packed(self) -> torch.Tensor:
"""
Get the packed representation of the points.
Returns:
tensor of points of shape (sum(P_n), 3).
"""
self._compute_packed()
return self._points_packed
[docs]
def normals_packed(self) -> Optional[torch.Tensor]:
"""
Get the packed representation of the normals.
Returns:
tensor of normals of shape (sum(P_n), 3),
or None if there are no normals.
"""
self._compute_packed()
return self._normals_packed
[docs]
def features_packed(self) -> Optional[torch.Tensor]:
"""
Get the packed representation of the features.
Returns:
tensor of features of shape (sum(P_n), C),
or None if there are no features
"""
self._compute_packed()
return self._features_packed
[docs]
def packed_to_cloud_idx(self):
"""
Return a 1D tensor x with length equal to the total number of points.
packed_to_cloud_idx()[i] gives the index of the cloud which contains
points_packed()[i].
Returns:
1D tensor of indices.
"""
self._compute_packed()
return self._packed_to_cloud_idx
[docs]
def cloud_to_packed_first_idx(self):
"""
Return a 1D tensor x with length equal to the number of clouds such that
the first point of the ith cloud is points_packed[x[i]].
Returns:
1D tensor of indices of first items.
"""
self._compute_packed()
return self._cloud_to_packed_first_idx
[docs]
def num_points_per_cloud(self) -> torch.Tensor:
"""
Return a 1D tensor x with length equal to the number of clouds giving
the number of points in each cloud.
Returns:
1D tensor of sizes.
"""
return self._num_points_per_cloud
[docs]
def points_padded(self) -> torch.Tensor:
"""
Get the padded representation of the points.
Returns:
tensor of points of shape (N, max(P_n), 3).
"""
self._compute_padded()
return self._points_padded
[docs]
def normals_padded(self) -> Optional[torch.Tensor]:
"""
Get the padded representation of the normals,
or None if there are no normals.
Returns:
tensor of normals of shape (N, max(P_n), 3).
"""
self._compute_padded()
return self._normals_padded
[docs]
def features_padded(self) -> Optional[torch.Tensor]:
    """
    Get the padded representation of the features,
    or None if there are no features.

    Returns:
        tensor of features of shape (N, max(P_n), C).
    """
    self._compute_padded()
    return self._features_padded
[docs]
def padded_to_packed_idx(self):
    """
    Return a 1D tensor x with length equal to the total number of points
    such that points_packed()[i] is element x[i] of the flattened padded
    representation.

    The packed representation can be calculated as follows.

    .. code-block:: python

        p = points_padded().reshape(-1, 3)
        points_packed = p[x]

    Returns:
        1D tensor of indices.
    """
    if self._padded_to_packed_idx is not None:
        return self._padded_to_packed_idx
    if self._N == 0:
        # NOTE(review): an empty batch yields a Python list, not an empty
        # tensor — inconsistent with the non-empty return type; kept as-is
        # since callers may rely on it.
        self._padded_to_packed_idx = []
    else:
        # Cloud i's points occupy flattened-padded slots [i*P, i*P + P_i).
        self._padded_to_packed_idx = torch.cat(
            [
                torch.arange(v, dtype=torch.int64, device=self.device) + i * self._P
                for (i, v) in enumerate(self.num_points_per_cloud())
            ],
            dim=0,
        )
    return self._padded_to_packed_idx
def _compute_padded(self, refresh: bool = False):
    """
    Computes the padded version from points_list, normals_list and
    features_list.

    Args:
        refresh: whether to force the recalculation.
    """
    if not (refresh or self._points_padded is None):
        # Cached padded points are still valid; nothing to do.
        return

    self._normals_padded, self._features_padded = None, None
    if self.isempty():
        self._points_padded = torch.zeros((self._N, 0, 3), device=self.device)
    else:
        self._points_padded = struct_utils.list_to_padded(
            self.points_list(),
            (self._P, 3),
            pad_value=0.0,
            equisized=self.equisized,
        )
        normals_list = self.normals_list()
        if normals_list is not None:
            self._normals_padded = struct_utils.list_to_padded(
                normals_list,
                (self._P, 3),
                pad_value=0.0,
                equisized=self.equisized,
            )
        features_list = self.features_list()
        if features_list is not None:
            self._features_padded = struct_utils.list_to_padded(
                features_list,
                (self._P, self._C),
                pad_value=0.0,
                equisized=self.equisized,
            )
# TODO(nikhilar) Improve performance of _compute_packed.
def _compute_packed(self, refresh: bool = False):
    """
    Computes the packed version from points_list, normals_list and
    features_list and sets the values of auxiliary tensors.

    Args:
        refresh: Set to True to force recomputation of packed
            representations. Default: False.
    """
    # Skip recomputation only if every packed/auxiliary tensor is cached.
    if not (
        refresh
        or any(
            v is None
            for v in [
                self._points_packed,
                self._packed_to_cloud_idx,
                self._cloud_to_packed_first_idx,
            ]
        )
    ):
        return

    # Packed can be calculated from padded or list, so can call the
    # accessor function for the lists.
    points_list = self.points_list()
    normals_list = self.normals_list()
    features_list = self.features_list()
    if self.isempty():
        # Empty batch: well-formed zero-size tensors, no normals/features.
        self._points_packed = torch.zeros(
            (0, 3), dtype=torch.float32, device=self.device
        )
        self._packed_to_cloud_idx = torch.zeros(
            (0,), dtype=torch.int64, device=self.device
        )
        self._cloud_to_packed_first_idx = torch.zeros(
            (0,), dtype=torch.int64, device=self.device
        )
        self._normals_packed = None
        self._features_packed = None
        return

    points_list_to_packed = struct_utils.list_to_packed(points_list)
    self._points_packed = points_list_to_packed[0]
    # Sanity check: the packing must agree with the stored per-cloud counts.
    if not torch.allclose(self._num_points_per_cloud, points_list_to_packed[1]):
        raise ValueError("Inconsistent list to packed conversion")
    self._cloud_to_packed_first_idx = points_list_to_packed[2]
    self._packed_to_cloud_idx = points_list_to_packed[3]

    self._normals_packed, self._features_packed = None, None
    if normals_list is not None:
        normals_list_to_packed = struct_utils.list_to_packed(normals_list)
        self._normals_packed = normals_list_to_packed[0]

    if features_list is not None:
        features_list_to_packed = struct_utils.list_to_packed(features_list)
        self._features_packed = features_list_to_packed[0]
[docs]
def clone(self):
"""
Deep copy of Pointclouds object. All internal tensors are cloned
individually.
Returns:
new Pointclouds object.
"""
# instantiate new pointcloud with the representation which is not None
# (either list or tensor) to save compute.
new_points, new_normals, new_features = None, None, None
if self._points_list is not None:
new_points = [v.clone() for v in self.points_list()]
normals_list = self.normals_list()
features_list = self.features_list()
if normals_list is not None:
new_normals = [n.clone() for n in normals_list]
if features_list is not None:
new_features = [f.clone() for f in features_list]
elif self._points_padded is not None:
new_points = self.points_padded().clone()
normals_padded = self.normals_padded()
features_padded = self.features_padded()
if normals_padded is not None:
new_normals = self.normals_padded().clone()
if features_padded is not None:
new_features = self.features_padded().clone()
other = self.__class__(
points=new_points, normals=new_normals, features=new_features
)
for k in self._INTERNAL_TENSORS:
v = getattr(self, k)
if torch.is_tensor(v):
setattr(other, k, v.clone())
return other
[docs]
def detach(self):
"""
Detach Pointclouds object. All internal tensors are detached
individually.
Returns:
new Pointclouds object.
"""
# instantiate new pointcloud with the representation which is not None
# (either list or tensor) to save compute.
new_points, new_normals, new_features = None, None, None
if self._points_list is not None:
new_points = [v.detach() for v in self.points_list()]
normals_list = self.normals_list()
features_list = self.features_list()
if normals_list is not None:
new_normals = [n.detach() for n in normals_list]
if features_list is not None:
new_features = [f.detach() for f in features_list]
elif self._points_padded is not None:
new_points = self.points_padded().detach()
normals_padded = self.normals_padded()
features_padded = self.features_padded()
if normals_padded is not None:
new_normals = self.normals_padded().detach()
if features_padded is not None:
new_features = self.features_padded().detach()
other = self.__class__(
points=new_points, normals=new_normals, features=new_features
)
for k in self._INTERNAL_TENSORS:
v = getattr(self, k)
if torch.is_tensor(v):
setattr(other, k, v.detach())
return other
[docs]
def to(self, device: Device, copy: bool = False):
    """
    Match functionality of torch.Tensor.to()
    If copy = True or the self Tensor is on a different device, the
    returned tensor is a copy of self with the desired torch.device.
    If copy = False and the self Tensor already has the correct torch.device,
    then self is returned.

    Args:
        device: Device (as str or torch.device) for the new tensor.
        copy: Boolean indicator whether or not to clone self. Default False.

    Returns:
        Pointclouds object.
    """
    device_ = make_device(device)
    if not copy and self.device == device_:
        # Already on the target device and no copy requested.
        return self
    other = self.clone()
    if self.device == device_:
        # copy=True but same device: the clone is enough.
        return other

    other.device = device_
    if other._N > 0:
        other._points_list = [v.to(device_) for v in other.points_list()]
        if other._normals_list is not None:
            other._normals_list = [n.to(device_) for n in other.normals_list()]
        if other._features_list is not None:
            other._features_list = [f.to(device_) for f in other.features_list()]
    # Move every cached/derived tensor along with the primary data.
    for k in self._INTERNAL_TENSORS:
        v = getattr(self, k)
        if torch.is_tensor(v):
            setattr(other, k, v.to(device_))
    return other
[docs]
def cpu(self):
    """Return this batch on the CPU (self if already there)."""
    return self.to("cpu")
[docs]
def cuda(self):
    """Return this batch on the default CUDA device (self if already there)."""
    return self.to("cuda")
[docs]
def get_cloud(self, index: int):
"""
Get tensors for a single cloud from the list representation.
Args:
index: Integer in the range [0, N).
Returns:
points: Tensor of shape (P, 3).
normals: Tensor of shape (P, 3)
features: LongTensor of shape (P, C).
"""
if not isinstance(index, int):
raise ValueError("Cloud index must be an integer.")
if index < 0 or index > self._N:
raise ValueError(
"Cloud index must be in the range [0, N) where \
N is the number of clouds in the batch."
)
points = self.points_list()[index]
normals, features = None, None
normals_list = self.normals_list()
if normals_list is not None:
normals = normals_list[index]
features_list = self.features_list()
if features_list is not None:
features = features_list[index]
return points, normals, features
# TODO(nikhilar) Move function to a utils file.
[docs]
def split(self, split_sizes: list):
"""
Splits Pointclouds object of size N into a list of Pointclouds objects
of size len(split_sizes), where the i-th Pointclouds object is of size
split_sizes[i]. Similar to torch.split().
Args:
split_sizes: List of integer sizes of Pointclouds objects to be
returned.
Returns:
list[Pointclouds].
"""
if not all(isinstance(x, int) for x in split_sizes):
raise ValueError("Value of split_sizes must be a list of integers.")
cloudlist = []
curi = 0
for i in split_sizes:
cloudlist.append(self[curi : curi + i])
curi += i
return cloudlist
[docs]
def offset_(self, offsets_packed):
    """
    Translate the point clouds by an offset. In place operation.

    Args:
        offsets_packed: A Tensor of shape (3,) or the same shape
            as self.points_packed giving offsets to be added to
            all points.

    Returns:
        self.
    """
    points_packed = self.points_packed()
    if offsets_packed.shape == (3,):
        # Broadcast a single (x, y, z) offset to every point.
        offsets_packed = offsets_packed.expand_as(points_packed)
    if offsets_packed.shape != points_packed.shape:
        raise ValueError("Offsets must have dimension (all_p, 3).")
    self._points_packed = points_packed + offsets_packed
    # Re-derive the list representation from the updated packed points.
    new_points_list = list(
        self._points_packed.split(self.num_points_per_cloud().tolist(), 0)
    )
    # Note that since _compute_packed() has been executed, points_list
    # cannot be None even if not provided during construction.
    self._points_list = new_points_list
    # Keep the cached padded representation in sync (pad values untouched).
    if self._points_padded is not None:
        for i, points in enumerate(new_points_list):
            if len(points) > 0:
                self._points_padded[i, : points.shape[0], :] = points
    return self
# TODO(nikhilar) Move out of place operator to a utils file.
[docs]
def offset(self, offsets_packed):
"""
Out of place offset.
Args:
offsets_packed: A Tensor of the same shape as self.points_packed
giving offsets to be added to all points.
Returns:
new Pointclouds object.
"""
new_clouds = self.clone()
return new_clouds.offset_(offsets_packed)
[docs]
def subsample(self, max_points: Union[int, Sequence[int]]) -> "Pointclouds":
    """
    Subsample each cloud so that it has at most max_points points.

    Args:
        max_points: maximum number of points in each cloud,
            either one int for the whole batch or one per cloud.

    Returns:
        new Pointclouds object, or self if nothing to be done.
    """
    if isinstance(max_points, int):
        max_points = [max_points] * len(self)
    elif len(max_points) != len(self):
        raise ValueError("wrong number of max_points supplied")
    # Fast path: no cloud exceeds its budget, nothing to subsample.
    if all(
        int(n_points) <= int(max_)
        for n_points, max_ in zip(self.num_points_per_cloud(), max_points)
    ):
        return self

    points_list = []
    features_list = []
    normals_list = []
    # zip_longest because features/normals may be absent: `or ()` turns the
    # None from features_list()/normals_list() into an empty iterable, and
    # zip_longest then pads the missing entries with None per cloud.
    for max_, n_points, points, features, normals in zip_longest(
        map(int, max_points),
        map(int, self.num_points_per_cloud()),
        self.points_list(),
        self.features_list() or (),
        self.normals_list() or (),
    ):
        if n_points > max_:
            # Uniform random subset without replacement (numpy RNG).
            keep_np = np.random.choice(n_points, max_, replace=False)
            keep = torch.tensor(keep_np, device=points.device, dtype=torch.int64)
            points = points[keep]
            if features is not None:
                features = features[keep]
            if normals is not None:
                normals = normals[keep]
        points_list.append(points)
        features_list.append(features)
        normals_list.append(normals)

    # `and` keeps normals/features as None when the batch never had them.
    return Pointclouds(
        points=points_list,
        normals=self.normals_list() and normals_list,
        features=self.features_list() and features_list,
    )
[docs]
def scale_(self, scale):
"""
Multiply the coordinates of this object by a scalar value.
- i.e. enlarge/dilate
In place operation.
Args:
scale: A scalar, or a Tensor of shape (N,).
Returns:
self.
"""
if not torch.is_tensor(scale):
scale = torch.full((len(self),), scale, device=self.device)
new_points_list = []
points_list = self.points_list()
for i, old_points in enumerate(points_list):
new_points_list.append(scale[i] * old_points)
self._points_list = new_points_list
if self._points_packed is not None:
self._points_packed = torch.cat(new_points_list, dim=0)
if self._points_padded is not None:
for i, points in enumerate(new_points_list):
if len(points) > 0:
self._points_padded[i, : points.shape[0], :] = points
return self
[docs]
def scale(self, scale):
"""
Out of place scale_.
Args:
scale: A scalar, or a Tensor of shape (N,).
Returns:
new Pointclouds object.
"""
new_clouds = self.clone()
return new_clouds.scale_(scale)
# TODO(nikhilar) Move function to utils file.
[docs]
def get_bounding_boxes(self):
"""
Compute an axis-aligned bounding box for each cloud.
Returns:
bboxes: Tensor of shape (N, 3, 2) where bbox[i, j] gives the
min and max values of cloud i along the jth coordinate axis.
"""
all_mins, all_maxes = [], []
for points in self.points_list():
cur_mins = points.min(dim=0)[0] # (3,)
cur_maxes = points.max(dim=0)[0] # (3,)
all_mins.append(cur_mins)
all_maxes.append(cur_maxes)
all_mins = torch.stack(all_mins, dim=0) # (N, 3)
all_maxes = torch.stack(all_maxes, dim=0) # (N, 3)
bboxes = torch.stack([all_mins, all_maxes], dim=2)
return bboxes
[docs]
def estimate_normals(
    self,
    neighborhood_size: int = 50,
    disambiguate_directions: bool = True,
    assign_to_self: bool = False,
):
    """
    Estimates the normals of each point in each cloud and assigns
    them to the internal tensors `self._normals_list` and `self._normals_padded`

    The function uses `ops.estimate_pointcloud_local_coord_frames`
    to estimate the normals. Please refer to that function for more
    detailed information about the implemented algorithm.

    Args:
        **neighborhood_size**: The size of the neighborhood used to estimate the
            geometry around each point.
        **disambiguate_directions**: If `True`, uses the algorithm from [1] to
            ensure sign consistency of the normals of neighboring points.
        **assign_to_self**: If `True`, assigns the computed normals to the
            internal buffers overwriting any previously stored normals.

    Returns:
        **normals**: A tensor of normals for each input point
        of shape `(minibatch, num_point, 3)`.

    References:
        [1] Tombari, Salti, Di Stefano: Unique Signatures of Histograms for
        Local Surface Description, ECCV 2010.
    """
    # Imported lazily to avoid a circular import at module load time.
    from .. import ops

    # estimate the normals
    normals_est = ops.estimate_pointcloud_normals(
        self,
        neighborhood_size=neighborhood_size,
        disambiguate_directions=disambiguate_directions,
    )

    # assign to self
    if assign_to_self:
        _, self._normals_padded, _ = self._parse_auxiliary_input(normals_est)
        self._normals_list, self._normals_packed = None, None
        if self._points_list is not None:
            # update self._normals_list
            self.normals_list()
        if self._points_packed is not None:
            # update self._normals_packed
            self._normals_packed = torch.cat(self._normals_list, dim=0)

    return normals_est
[docs]
def extend(self, N: int):
"""
Create new Pointclouds which contains each cloud N times.
Args:
N: number of new copies of each cloud.
Returns:
new Pointclouds object.
"""
if not isinstance(N, int):
raise ValueError("N must be an integer.")
if N <= 0:
raise ValueError("N must be > 0.")
new_points_list, new_normals_list, new_features_list = [], None, None
for points in self.points_list():
new_points_list.extend(points.clone() for _ in range(N))
normals_list = self.normals_list()
if normals_list is not None:
new_normals_list = []
for normals in normals_list:
new_normals_list.extend(normals.clone() for _ in range(N))
features_list = self.features_list()
if features_list is not None:
new_features_list = []
for features in features_list:
new_features_list.extend(features.clone() for _ in range(N))
return self.__class__(
points=new_points_list, normals=new_normals_list, features=new_features_list
)
[docs]
def update_padded(
    self, new_points_padded, new_normals_padded=None, new_features_padded=None
):
    """
    Returns a Pointcloud structure with updated padded tensors and copies of
    the auxiliary tensors. This function allows for an update of
    points_padded (and normals and features) without having to explicitly
    convert it to the list representation for heterogeneous batches.

    Args:
        new_points_padded: FloatTensor of shape (N, P, 3)
        new_normals_padded: (optional) FloatTensor of shape (N, P, 3)
        new_features_padded: (optional) FloatTensor of shape (N, P, C)

    Returns:
        Pointcloud with updated padded representations
    """

    def check_shapes(x, size):
        # Validate (N, P, C) against this batch; size[2] is None when the
        # channel count is unconstrained (no features stored yet).
        if x.shape[0] != size[0]:
            raise ValueError("new values must have the same batch dimension.")
        if x.shape[1] != size[1]:
            raise ValueError("new values must have the same number of points.")
        if size[2] is not None:
            if x.shape[2] != size[2]:
                raise ValueError(
                    "new values must have the same number of channels."
                )

    check_shapes(new_points_padded, [self._N, self._P, 3])
    if new_normals_padded is not None:
        check_shapes(new_normals_padded, [self._N, self._P, 3])
    if new_features_padded is not None:
        check_shapes(new_features_padded, [self._N, self._P, self._C])

    new = self.__class__(
        points=new_points_padded,
        normals=new_normals_padded,
        features=new_features_padded,
    )

    # overwrite the equisized flag
    new.equisized = self.equisized

    # copy normals
    if new_normals_padded is None:
        # If no normals are provided, keep old ones (shallow copy)
        new._normals_list = self._normals_list
        new._normals_padded = self._normals_padded
        new._normals_packed = self._normals_packed

    # copy features
    if new_features_padded is None:
        # If no features are provided, keep old ones (shallow copy)
        new._features_list = self._features_list
        new._features_padded = self._features_padded
        new._features_packed = self._features_packed

    # copy auxiliary tensors
    copy_tensors = [
        "_packed_to_cloud_idx",
        "_cloud_to_packed_first_idx",
        "_num_points_per_cloud",
        "_padded_to_packed_idx",
        "valid",
    ]
    for k in copy_tensors:
        v = getattr(self, k)
        if torch.is_tensor(v):
            setattr(new, k, v)  # shallow copy

    # update points
    new._points_padded = new_points_padded
    assert new._points_list is None
    assert new._points_packed is None

    # update normals and features if provided
    if new_normals_padded is not None:
        new._normals_padded = new_normals_padded
        new._normals_list = None
        new._normals_packed = None
    if new_features_padded is not None:
        new._features_padded = new_features_padded
        new._features_list = None
        new._features_packed = None
    return new
[docs]
def inside_box(self, box):
    """
    Finds the points inside a 3D box.

    Args:
        box: FloatTensor of shape (2, 3) or (N, 2, 3) where N is the number
            of clouds.
            box[..., 0, :] gives the min x, y & z.
            box[..., 1, :] gives the max x, y & z.

    Returns:
        idx: BoolTensor of length sum(P_i) indicating whether the packed
            points are within the input box.

    Raises:
        ValueError: if the box has the wrong shape, an incompatible batch
            dimension, or min values exceeding max values.
    """
    ndim = box.dim()
    if ndim not in (2, 3):
        raise ValueError("Input box must be of shape (2, 3) or (N, 2, 3).")
    if ndim == 3 and box.shape[0] not in (1, self._N):
        raise ValueError(
            "Input box dimension is incompatible with pointcloud size."
        )
    if ndim == 2:
        # Promote a single box to a batch of one.
        box = box[None]
    if (box[..., 0, :] > box[..., 1, :]).any():
        raise ValueError("Input box is invalid: min values larger than max values.")

    packed = self.points_packed()
    total_points = packed.shape[0]
    if box.shape[0] == 1:
        # One box shared by every point in the batch.
        per_point_box = box.expand(total_points, 2, 3)
    else:
        # One box per cloud: repeat each box once per point of its cloud,
        # then concatenate in packed order.
        expanded = [
            b.expand(p, 2, 3)
            for b, p in zip(box.unbind(0), self.num_points_per_cloud())
        ]
        per_point_box = torch.cat(expanded, 0)
    within = (packed >= per_point_box[:, 0]) * (packed <= per_point_box[:, 1])
    return within.all(dim=-1)
[docs]
def join_pointclouds_as_batch(pointclouds: Sequence[Pointclouds]) -> Pointclouds:
    """
    Merge a list of Pointclouds objects into a single batched Pointclouds
    object. All pointclouds must be on the same device.

    Args:
        pointclouds: non-empty sequence of Pointclouds objects with batch
            dims [b1, b2, ..., bN].

    Returns:
        pointcloud: Pointclouds object with all input pointclouds collated
            into a single object with batch dim = sum(b1, b2, ..., bN)

    Raises:
        ValueError: if the input is not a non-empty sequence of Pointclouds,
            if the clouds live on different devices, or if normals/features
            are set on some clouds but not others (or have mismatched
            feature dimensions).
    """
    if isinstance(pointclouds, Pointclouds) or not isinstance(pointclouds, Sequence):
        # Fixed message: it previously referred to a nonexistent
        # "join_points_as_batch" function.
        raise ValueError("Wrong first argument to join_pointclouds_as_batch.")
    if len(pointclouds) == 0:
        # Previously an empty sequence raised a bare IndexError below.
        raise ValueError("pointclouds must be a non-empty sequence of Pointclouds.")
    device = pointclouds[0].device
    if not all(p.device == device for p in pointclouds):
        raise ValueError("Pointclouds must all be on the same device")
    kwargs = {}
    for field in ("points", "normals", "features"):
        field_list = [getattr(p, field + "_list")() for p in pointclouds]
        if None in field_list:
            # The field must be set on all clouds or on none of them.
            if field == "points":
                raise ValueError("Pointclouds cannot have their points set to None!")
            if not all(f is None for f in field_list):
                raise ValueError(
                    f"Pointclouds in the batch have some fields '{field}'"
                    + " defined and some set to None."
                )
            field_list = None
        else:
            # Flatten the per-object lists into one list with an entry per
            # cloud across the whole batch.
            field_list = [p for points in field_list for p in points]
            if field == "features" and any(
                p.shape[1] != field_list[0].shape[1] for p in field_list[1:]
            ):
                raise ValueError("Pointclouds must have the same number of features")
        kwargs[field] = field_list
    return Pointclouds(**kwargs)
[docs]
def join_pointclouds_as_scene(
    pointclouds: Union[Pointclouds, List[Pointclouds]],
) -> Pointclouds:
    """
    Joins a batch of point clouds — given either as a Pointclouds object or a
    list of Pointclouds objects — into a single point cloud. If the input is
    a list, all Pointclouds in it must be on the same device, and they must
    either all or none have features and all or none have normals.

    Args:
        pointclouds: Pointclouds object containing a batch of point clouds,
            or a list of Pointclouds objects.

    Returns:
        new Pointclouds object containing a single point cloud
    """
    if isinstance(pointclouds, list):
        # Collate the list into one batched object first.
        pointclouds = join_pointclouds_as_batch(pointclouds)
    if len(pointclouds) == 1:
        # Already a single cloud; nothing to merge.
        return pointclouds
    # Packed tensors concatenate all clouds; adding a leading batch axis of
    # size 1 turns them into a single-cloud batch.
    packed_points = pointclouds.points_packed()
    packed_features = pointclouds.features_packed()
    packed_normals = pointclouds.normals_packed()
    return Pointclouds(
        points=packed_points[None],
        features=None if packed_features is None else packed_features[None],
        normals=None if packed_normals is None else packed_normals[None],
    )