"""Interface to the GPU NUFFT library."""
import numpy as np
import warnings
from ..base import FourierOperatorBase, with_numpy_cupy
from mrinufft._utils import proper_trajectory, get_array_module, auto_cast
from mrinufft.operators.interfaces.utils import is_cuda_array, is_host_array
GPUNUFFT_AVAILABLE = True
try:
from gpuNUFFT import NUFFTOp
except ImportError:
GPUNUFFT_AVAILABLE = False
CUPY_AVAILABLE = True
try:
import cupyx as cx
import cupy as cp
except ImportError:
CUPY_AVAILABLE = False
def _allocator(size):
"""Allocate pinned memory which is context portable."""
flags = cp.cuda.runtime.hostAllocPortable
mem = cp.cuda.PinnedMemory(size, flags=flags)
return cp.cuda.PinnedMemoryPointer(mem, offset=0)
[docs]
def make_pinned_smaps(smaps):
"""Make pinned smaps from smaps.
Parameters
----------
smaps: np.ndarray or None
the sensitivity maps
Returns
-------
np.ndarray or None
the pinned sensitivity maps
"""
if smaps is None:
return None
smaps_ = smaps.T.reshape(-1, smaps.shape[0])
cp.cuda.set_pinned_memory_allocator(_allocator)
pinned_smaps = cx.empty_pinned(smaps_.shape, dtype=np.complex64, order="F")
np.copyto(pinned_smaps, smaps_)
return pinned_smaps
[docs]
class RawGpuNUFFT:
"""GPU implementation of N-D non-uniform fast Fourier Transform class.
Attributes
----------
samples: np.ndarray
the normalized kspace location values in the Fourier domain.
shape: tuple of int
shape of the image
operator: The NUFFTOp object
to carry out operation
n_coils: int default 1
Number of coils used to acquire the signal in case of multiarray
receiver coils acquisition. If n_coils > 1, please organize data as
n_coils X data_per_coil
"""
def __init__(
self,
samples,
shape,
n_coils=1,
density_comp=None,
kernel_width=3,
sector_width=8,
osf=2,
upsampfac=None,
balance_workload=True,
smaps=None,
pinned_smaps=None,
pinned_image=None,
pinned_kspace=None,
use_gpu_direct=False,
):
"""Initialize the 'NUFFT' class.
Parameters
----------
samples: np.ndarray
the kspace sample locations in the Fourier domain,
normalized between -0.5 and 0.5
shape: tuple of int
shape of the image
n_coils: int
Number of coils used to acquire the signal in case of multiarray
receiver coils acquisition
density_comp: np.ndarray default None.
k-space weighting, density compensation, if not specified
equal weightage is given.
kernel_width: int default 3
interpolation kernel width (usually 3 to 7)
sector_width: int default 8
sector width to use
osf: int default 2
oversampling factor (usually between 1 and 2)
upsampfac: int default 2
Same as osf.
balance_workload: bool default True
whether the workloads need to be balanced
smaps: np.ndarray default None
Holds the sensitivity maps for SENSE reconstruction
pinned_smaps: np.ndarray default None
Pinned memory array for the smaps.
use_gpu_direct: bool default False
if True, direct GPU array can be passed.
In this case pinned memory is not used and this saved memory.
It will not be an error if this is False and you pass GPU array,
just that it is inefficient.
Notes
-----
pinned_smaps status (pinned or not) is not checked here, but in the C++ code.
If its not pinned, then an extra copy will be triggered.
"""
if GPUNUFFT_AVAILABLE is False:
raise ValueError(
"gpuNUFFT library is not installed, please refer to README"
)
self.n_coils = n_coils
self.shape = shape
self.samples = samples
self.use_gpu_direct = use_gpu_direct
if density_comp is None:
density_comp = np.ones(samples.shape[0])
if upsampfac is not None:
osf = upsampfac
# pinned memory stuff
self.uses_sense = True
if smaps is not None and pinned_smaps is None:
pinned_smaps = make_pinned_smaps(smaps)
warnings.warn("no pinning provided, pinning existing smaps now.")
elif smaps is not None and pinned_smaps is not None:
# Pinned memory space exists, we will overwrite it
np.copyto(pinned_smaps, smaps.T.reshape(-1, n_coils))
warnings.warn("Overwriting the pinned data.")
elif smaps is None and pinned_smaps is None:
# No smaps provided, we will not use SENSE
self.uses_sense = False
elif smaps is None and pinned_smaps is not None:
warnings.warn("Using pinned_smaps as is.")
else:
raise ValueError("Unknown case")
if not use_gpu_direct:
# We dont need pinned allocations if we are using direct GPU arrays
if pinned_image is None:
pinned_image = cx.empty_pinned(
(np.prod(shape), (1 if self.uses_sense else n_coils)),
dtype=np.complex64,
order="F",
)
if pinned_kspace is None:
pinned_kspace = cx.empty_pinned(
(n_coils, len(samples)),
dtype=np.complex64,
)
self.pinned_image = pinned_image
self.pinned_kspace = pinned_kspace
self.osf = osf
self.pinned_smaps = pinned_smaps
self.operator = NUFFTOp(
np.reshape(samples, samples.shape[::-1], order="F"),
self.shape,
self.n_coils,
self.pinned_smaps,
density_comp,
kernel_width,
sector_width,
osf,
balance_workload,
)
[docs]
def toggle_grad_traj(self):
"""Toggle the gradient mode of the operator."""
self.operator.toggle_grad_mode()
[docs]
def _reshape_image(self, image, direction="op"):
"""Reshape the image to the correct format."""
xp = get_array_module(image)
if direction == "op":
if self.uses_sense or self.n_coils == 1:
return image.reshape((-1, 1), order="F").astype(
xp.complex64, copy=False
)
return xp.asarray([c.ravel(order="F") for c in image], dtype=xp.complex64).T
else:
if self.uses_sense or self.n_coils == 1:
# Support for one additional dimension
return image.squeeze().astype(xp.complex64, copy=False).T[None]
return xp.asarray([c.T for c in image], dtype=xp.complex64).squeeze()
[docs]
def set_smaps(self, smaps):
"""Update the smaps.
Parameters
----------
smaps: np.ndarray[np.complex64])
sensittivity maps
"""
smaps_ = smaps.T.reshape(-1, smaps.shape[0])
np.copyto(self.pinned_smaps, smaps_)
[docs]
def set_pts(self, samples, density=None):
"""Update the kspace locations and density compensation.
Parameters
----------
samples: np.ndarray
the kspace locations
density: np.ndarray|str, optional
the density compensation
if not provided, no density compensation is performed.
if "recompute", the density compensation is recomputed.
Note the recompute option works only if density compensation was computed
at initialization and not provided as ndarray.
"""
if density is None:
density = np.ones(samples.shape[0])
self.operator.set_pts(
np.reshape(samples, samples.shape[::-1], order="F"),
density,
)
[docs]
def op_direct(self, image, kspace=None, interpolate_data=False):
"""Compute the masked non-Cartesian Fourier transform.
The incoming data is on GPU already and we return a GPU array.
Parameters
----------
image: np.ndarray
input array with the same shape as self.shape.
interpolate_data: bool, default False
if set to True, the image is just apodized and interpolated to
kspace locations. This is used for density estimation.
Returns
-------
cp.ndarray
Non-uniform Fourier transform of the input image.
"""
if kspace is None:
kspace = cp.empty(
(self.n_coils, len(self.samples)),
dtype=cp.complex64,
)
reshape_image = self._reshape_image(image)
self.operator.op_direct(
reshape_image.data.ptr,
kspace.data.ptr,
interpolate_data,
)
return kspace
[docs]
def op(self, image, kspace=None, interpolate_data=False):
"""Compute the masked non-Cartesian Fourier transform.
Parameters
----------
image: np.ndarray
input array with the same shape as self.shape.
interpolate_data: bool, default False
if set to True, the image is just apodized and interpolated to
kspace locations. This is used for density estimation.
Returns
-------
np.ndarray
Non-uniform Fourier transform of the input image.
"""
# Base gpuNUFFT Operator is written in CUDA and C++, we need to
# reorganize data to follow a different memory hierarchy
# TODO we need to update codes to use np.reshape for all this directly
make_copy_back = False
if kspace is None:
kspace = self.pinned_kspace
make_copy_back = True
np.copyto(self.pinned_image, self._reshape_image(image))
new_ksp = self.operator.op(
self.pinned_image,
kspace,
interpolate_data,
)
if make_copy_back:
new_ksp = np.copy(new_ksp)
return new_ksp
[docs]
def adj_op(self, coeffs, image=None, grid_data=False):
"""Compute adjoint of non-uniform Fourier transform.
Parameters
----------
coeff: np.ndarray
masked non-uniform Fourier transform data.
grid_data: bool, default False
if True, the kspace data is gridded and returned,
this is used for density compensation
Returns
-------
np.ndarray
adjoint operator of Non Uniform Fourier transform of the
input coefficients.
"""
make_copy_back = False
if image is None:
image = self.pinned_image
make_copy_back = True
np.copyto(self.pinned_kspace, coeffs)
new_image = self.operator.adj_op(self.pinned_kspace, image, grid_data)
if make_copy_back:
new_image = np.copy(new_image)
return self._reshape_image(new_image, "adjoint")
[docs]
def adj_op_direct(self, coeffs, image=None, grid_data=False):
"""Compute adjoint of non-uniform Fourier transform.
The incoming data is on GPU already and we return a GPU array.
Parameters
----------
coeff: np.ndarray
masked non-uniform Fourier transform data.
grid_data: bool, default False
if True, the kspace data is gridded and returned,
this is used for density compensation
Returns
-------
np.ndarray
adjoint operator of Non Uniform Fourier transform of the
input coefficients.
"""
C = 1 if self.uses_sense else self.n_coils
coeffs = coeffs.astype(cp.complex64, copy=False)
if image is None:
image = cp.empty(
(np.prod(self.shape), C),
dtype=cp.complex64,
order="F",
)
self.operator.adj_op_direct(coeffs.data.ptr, image.data.ptr, grid_data)
image = image.T.reshape(C, *self.shape[::-1], order="C")
return self._reshape_image(image, "adjoint")
[docs]
class MRIGpuNUFFT(FourierOperatorBase):
"""Interface for the gpuNUFFT backend.
Parameters
----------
samples: np.ndarray (Mxd)
the samples locations in the Fourier domain where M is the number
of samples and d is the dimensionnality of the output data
(2D for an image, 3D for a volume).
shape: tuple of int
shape of the image (not necessarly a square matrix).
n_coils: int default 1
Number of coils used to acquire the signal in case of multiarray
receiver coils acquisition
density: bool or np.ndarray default None
if True, the density compensation is estimated from the samples
locations. If an array is passed, it is used as the density
compensation.
squeeze_dims: bool, default True
If True, will try to remove the singleton dimension for batch and coils.
smaps: np.ndarray default None
Holds the sensitivity maps for SENSE reconstruction.
n_trans: int, default =1
This has no effect for now.
kwargs: extra keyword args
these arguments are passed to gpuNUFFT operator. This is used
only in gpuNUFFT
"""
backend = "gpunufft"
available = GPUNUFFT_AVAILABLE and CUPY_AVAILABLE
autograd_available = True
def __init__(
self,
samples,
shape,
n_coils=1,
n_batchs=1,
n_trans=1,
density=None,
smaps=None,
squeeze_dims=True,
eps=1e-3,
**kwargs,
):
if GPUNUFFT_AVAILABLE is False:
raise ValueError(
"gpuNUFFT library is not installed, "
"please refer to README"
"or use cpu for implementation"
)
self.shape = shape
self._samples = proper_trajectory(
samples.astype(np.float32, copy=False), normalize="unit"
)
self.dtype = self.samples.dtype
self.n_coils = n_coils
self.n_batchs = n_batchs
self.squeeze_dims = squeeze_dims
self.compute_density(density)
self.compute_smaps(smaps)
self.raw_op = RawGpuNUFFT(
samples=self.samples,
shape=self.shape,
n_coils=self.n_coils,
density_comp=self.density,
smaps=self.smaps,
kernel_width=kwargs.get("kernel_width", -int(np.log10(eps))),
**kwargs,
)
[docs]
@with_numpy_cupy
def op(self, data, coeffs=None):
"""Compute forward non-uniform Fourier Transform.
Parameters
----------
img: np.ndarray
input N-D array with the same shape as self.shape.
coeffs: np.ndarray, optional
output Array. Should be pinned memory for best performances.
Returns
-------
np.ndarray
Masked Fourier transform of the input image.
"""
self.check_shape(image=data, ksp=coeffs)
B, C, XYZ, K = self.n_batchs, self.n_coils, self.shape, self.n_samples
op_func = self.raw_op.op
if is_cuda_array(data):
op_func = self.raw_op.op_direct
if not self.raw_op.use_gpu_direct:
warnings.warn(
"Using direct GPU array without passing "
"`use_gpu_direct=True`, this is memory inefficient."
)
data_ = data.reshape((B, 1 if self.uses_sense else C, *XYZ))
if coeffs is not None:
coeffs.reshape((B, C, K))
result = []
for i in range(B):
if coeffs is None:
result.append(op_func(data_[i], None))
else:
op_func(data_[i], coeffs[i])
if coeffs is None:
coeffs = get_array_module(data).stack(result)
return self._safe_squeeze(coeffs)
[docs]
@with_numpy_cupy
def adj_op(self, coeffs, data=None):
"""Compute adjoint Non Uniform Fourier Transform.
Parameters
----------
coeffs: np.ndarray
masked non-uniform Fourier transform 1D data.
data: np.ndarray, optional
output image array. Should be pinned memory for best performances.
Returns
-------
np.ndarray
Inverse discrete Fourier transform of the input coefficients.
"""
self.check_shape(image=data, ksp=coeffs)
B, C, XYZ, K = self.n_batchs, self.n_coils, self.shape, self.n_samples
adj_op_func = self.raw_op.adj_op
if is_cuda_array(coeffs):
adj_op_func = self.raw_op.adj_op_direct
if not self.raw_op.use_gpu_direct:
warnings.warn(
"Using direct GPU array without passing "
"`use_gpu_direct=True`, this is memory inefficient."
)
coeffs_ = coeffs.reshape(B, C, K)
if data is not None:
data.reshape((B, 1 if self.uses_sense else C, *XYZ))
result = []
for i in range(B):
if data is None:
result.append(adj_op_func(coeffs_[i], None))
else:
adj_op_func(coeffs_[i], data[i])
if data is None:
data = get_array_module(coeffs).stack(result)
return self._safe_squeeze(data)
@property
def uses_sense(self):
"""Return True if the Fourier Operator uses the SENSE method."""
return self.raw_op.uses_sense
@FourierOperatorBase.smaps.setter
def smaps(self, new_smaps):
"""Update pinned smaps from new_smaps.
Parameters
----------
new_smaps: np.ndarray
the new sensitivity maps
"""
self._check_smaps_shape(new_smaps)
self._smaps = new_smaps
if self._smaps is not None and hasattr(self, "raw_op"):
self.raw_op.set_smaps(smaps=new_smaps)
@FourierOperatorBase.samples.setter
def samples(self, samples):
"""Set the samples for the Fourier Operator.
Parameters
----------
samples: np.ndarray
The samples for the Fourier Operator.
"""
self._samples = proper_trajectory(
samples.astype(np.float32, copy=False), normalize="unit"
)
# TODO: gpuNUFFT needs to sort the points twice in this case.
# It could help to have access to directly dorted arrays from gpuNUFFT.
self.compute_density(self._density_method)
self.raw_op.set_pts(
self._samples,
density=self.density,
)
@FourierOperatorBase.density.setter
def density(self, density):
"""Set the density for the Fourier Operator.
Parameters
----------
density: np.ndarray
The density for the Fourier Operator.
"""
self._density = density
self.raw_op.set_pts(
self._samples,
density=density,
)
[docs]
@classmethod
def pipe(
cls,
kspace_loc,
volume_shape,
num_iterations=10,
osf=2,
normalize=True,
**kwargs,
):
"""Compute the density compensation weights for a given set of kspace locations.
Parameters
----------
kspace_loc: np.ndarray
the kspace locations
volume_shape: np.ndarray
the volume shape
num_iterations: int default 10
the number of iterations for density estimation
osf: float or int
The oversampling factor the volume shape
normalize: bool
Whether to normalize the density compensation.
We normalize such that the energy of PSF = 1
"""
if GPUNUFFT_AVAILABLE is False:
raise ValueError(
"gpuNUFFT is not available, cannot " "estimate the density compensation"
)
volume_shape = (np.array(volume_shape) * osf).astype(int)
grid_op = MRIGpuNUFFT(
samples=kspace_loc,
shape=volume_shape,
osf=1,
**kwargs,
)
density_comp = grid_op.raw_op.operator.estimate_density_comp(
max_iter=num_iterations
)
if normalize:
spike = np.zeros(volume_shape)
mid_loc = tuple(v // 2 for v in volume_shape)
spike[mid_loc] = 1
psf = grid_op.adj_op(grid_op.op(spike))
density_comp /= np.linalg.norm(psf)
return density_comp.squeeze()
[docs]
def get_lipschitz_cst(self, max_iter=10, tolerance=1e-5, **kwargs):
"""Return the Lipschitz constant of the operator.
----------
max_iter: int
Number of iteration to perform to estimate the Lipschitz constant.
tolerance: float, optional default 1e-5
Tolerance for the spectral radius estimation.
kwargs:
Extra kwargs for the operator.
Returns
-------
float
Lipschitz constant of the operator.
"""
tmp_op = self.__class__(
self.samples,
self.shape,
density=self.density,
n_coils=1,
smaps=None,
squeeze_dims=True,
**kwargs,
)
return tmp_op.raw_op.operator.get_spectral_radius(
max_iter=max_iter, tolerance=tolerance
)
[docs]
def _safe_squeeze(self, arr):
"""Squeeze the first two dimensions of shape of the operator."""
if self.squeeze_dims:
try:
arr = arr.squeeze(axis=1)
except ValueError:
pass
try:
arr = arr.squeeze(axis=0)
except ValueError:
pass
return arr
[docs]
@with_numpy_cupy
def data_consistency(self, image_data, obs_data):
"""Compute the data consistency estimation directly on gpu.
This mixes the op and adj_op method to perform F_adj(F(x-y))
on a per coil basis. By doing the computation coil wise,
it uses less memory than the naive call to adj_op(op(x)-y)
Parameters
----------
image: array
Image on which the gradient operation will be evaluated.
N_coil x Image shape is not using sense.
obs_data: array
Observed data.
"""
obs_data = auto_cast(obs_data, self.cpx_dtype)
image_data = auto_cast(image_data, self.cpx_dtype)
B, C = self.n_batchs, self.n_coils
self.check_shape(image=image_data, ksp=obs_data)
# dispatch
if is_host_array(image_data) and is_host_array(obs_data):
grad_func = self._dc_host
elif is_cuda_array(image_data) and is_cuda_array(obs_data):
if B > 1 or (C > 1 and not self.uses_sense):
warnings.warn(
"Having all the batches / coils on GPU could be faster, "
"but is memory inefficient!"
)
grad_func = super().data_consistency
if not self.raw_op.use_gpu_direct:
warnings.warn(
"Using direct GPU array without passing "
"`use_gpu_direct=True`, this is memory inefficient."
)
ret = grad_func(image_data, obs_data)
return self._safe_squeeze(ret)
def _dc_host(self, image_data, obs_data):
B, C, XYZ, K = self.n_batchs, self.n_coils, self.shape, self.n_samples
image_data_ = image_data.reshape((B, 1 if self.uses_sense else C, *XYZ))
obs_data_ = obs_data.reshape((B, C, K))
obs_data_tmp = cp.zeros((C, K), dtype=self.cpx_dtype)
tmp_img = cp.zeros((1 if self.uses_sense else C, *XYZ), dtype=np.complex64)
final_img = np.zeros_like(image_data_)
for i in range(B):
tmp_img.set(image_data_[i])
obs_data_tmp.set(obs_data_[i])
ksp_tmp = self.raw_op.op_direct(tmp_img)
ksp_tmp -= obs_data_tmp
final_img[i] = self.raw_op.adj_op_direct(ksp_tmp).get()
return final_img
# TODO : For data consistency the workflow is currently:
# op coil 1 / .../ op coil N / data_consistency / adj_op coil 1 / adj_op coil n
#
# By modifying c++ code and exposing it it should be possible to do
# op coil 1 / data_consistency 1 / adj_op coil 1 / ... / op_coil N /
# data_consistency N / adj_op coil n
#
# This should bring some performance improvements, due to the asynchronous stuff.
[docs]
def toggle_grad_traj(self):
"""Toggle the gradient trajectory of the operator."""
if self.uses_sense:
self.smaps = self.smaps.conj()
self.raw_op.toggle_grad_traj()