"""Provides Operator for MR Image processing on GPU."""
import warnings
import numpy as np
from mrinufft.operators.base import FourierOperatorBase, with_numpy_cupy
from mrinufft._utils import (
proper_trajectory,
get_array_module,
auto_cast,
power_method,
)
from .utils import (
CUPY_AVAILABLE,
is_cuda_array,
is_host_array,
nvtx_mark,
pin_memory,
sizeof_fmt,
)
CUFINUFFT_AVAILABLE = CUPY_AVAILABLE
try:
import cupy as cp
from cufinufft import Plan
except ImportError:
CUFINUFFT_AVAILABLE = False
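# Human-readable descriptions for the cufinufft plan option values.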
OPTS_FIELD_DECODE = {
"gpu_method": {1: "nonuniform pts driven", 2: "shared memory"},
"gpu_sort": {0: "no sort (GM)", 1: "sort (GM-sort)"},
"kerevalmeth": {0: "direct eval exp(sqrt())", 1: "Horner ppval"},
"gpu_spreadinterponly": {
0: "NUFFT",
1: "spread or interpolate only",
},
}
DTYPE_R2C = {"float32": "complex64", "float64": "complex128"}
def _error_check(ier, msg):
if ier != 0:
raise RuntimeError(msg)
class RawCufinufftPlan:
"""Light wrapper around the guru interface of finufft."""
def __init__(
self,
samples,
shape,
n_trans=1,
eps=1e-6,
**kwargs,
):
self.shape = shape
self.ndim = len(shape)
self.eps = float(eps)
self.n_trans = n_trans
self._dtype = samples.dtype
        # The first element is a dummy so that the type 1 plan is at index 1
        # and the type 2 plan at index 2.
self.plans = [None, None, None]
self.grad_plan = None
for i in [1, 2]:
self._make_plan(i, **kwargs)
self._set_pts(i, samples)
@property
def dtype(self):
"""Return the dtype (precision) of the transform."""
try:
return self.plans[1].dtype
except AttributeError:
return DTYPE_R2C[str(self._dtype)]
def _make_plan(self, typ, **kwargs):
self.plans[typ] = Plan(
typ,
self.shape,
self.n_trans,
self.eps,
dtype=DTYPE_R2C[str(self._dtype)],
**kwargs,
)
def _set_pts(self, typ, samples):
plan = self.grad_plan if typ == "grad" else self.plans[typ]
plan.setpts(
cp.array(samples[:, 0], copy=False),
cp.array(samples[:, 1], copy=False),
cp.array(samples[:, 2], copy=False) if self.ndim == 3 else None,
)
def _destroy_plan(self, typ):
if self.plans[typ] is not None:
p = self.plans[typ]
del p
self.plans[typ] = None
def _destroy_plan_grad(self):
if self.grad_plan is not None:
p = self.grad_plan
del p
self.grad_plan = None
def type1(self, coeff_data, grid_data):
"""Type 1 transform. Non Uniform to Uniform."""
return self.plans[1].execute(coeff_data, grid_data)
def type2(self, grid_data, coeff_data):
"""Type 2 transform. Uniform to non-uniform."""
return self.plans[2].execute(grid_data, coeff_data)
def toggle_grad_traj(self):
"""Toggle between the gradient trajectory and the plan for type 1 transform."""
self.plans[2], self.grad_plan = self.grad_plan, self.plans[2]
class MRICufiNUFFT(FourierOperatorBase):
"""MRI Transform operator, build around cufinufft.
This operator adds density estimation and compensation (preconditioning)
and multicoil support.
Parameters
----------
    samples: np.ndarray or GPUArray
        The sample locations, of shape ``Nsamples x N_dimensions``.
shape: tuple
Shape of the image space.
n_coils: int
Number of coils.
n_batchs: int
Size of the batch dimension.
density: bool or array
Density compensation support.
- If array, use this for density compensation
- If True, the density compensation will be automatically estimated,
using the fixed point method.
- If False, density compensation will not be used.
    smaps: np.ndarray or GPUArray, optional
        - If None: no Smaps will be used.
- If np.ndarray: Smaps will be copied on the device,
according to `smaps_cached`.
- If GPUArray, the smaps are already cached.
    smaps_cached: bool, default False
        - If False, the smaps are copied to the device and freed after each
          iteration.
        - If True, the smaps are copied to the device and stay there.
    squeeze_dims: bool, default False
        If True, try to remove singleton batch and coil dimensions from the
        output.
    n_trans: int, default 1
        Number of transforms performed in parallel by cufinufft.
    **kwargs:
        Extra kwargs for the raw cufinufft operator.

    Notes
    -----
    Cufinufft can run several transforms in parallel; this is controlled by
    the ``n_trans`` parameter. The data should be of shape ``(n_batchs,
    n_coils, *img_shape)`` for op (type 2) and ``(n_batchs, n_coils,
    n_samples)`` for adj_op (type 1), in contiguous memory order.

    For now only single precision (float32 samples, complex64 data) is
    supported.

See Also
--------
cufinufft.raw_operator.RawCufinufft
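
    Examples
    --------
    A minimal usage sketch (not from the library docs; it assumes a CUDA
    device and uses ``mrinufft.get_operator`` to instantiate this backend)::

        import numpy as np
        from mrinufft import get_operator

        samples = np.random.uniform(-0.5, 0.5, (4096, 2)).astype(np.float32)
        smaps = np.ones((8, 256, 256), dtype=np.complex64)
        nufft = get_operator("cufinufft")(
            samples, shape=(256, 256), n_coils=8, smaps=smaps, density=True
        )
        image = np.random.randn(256, 256).astype(np.complex64)
        ksp = nufft.op(image)    # (1, 8, 4096): (n_batchs, n_coils, n_samples)
        img = nufft.adj_op(ksp)  # (1, 1, 256, 256): SENSE-combined image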
"""
backend = "cufinufft"
available = CUFINUFFT_AVAILABLE and CUPY_AVAILABLE
autograd_available = True
def __init__(
self,
samples,
shape,
density=False,
n_coils=1,
n_batchs=1,
smaps=None,
smaps_cached=False,
verbose=False,
squeeze_dims=False,
n_trans=1,
**kwargs,
):
        # Run the availability check here to get detailed output.
if not CUPY_AVAILABLE:
raise RuntimeError("cupy is not installed")
if not CUFINUFFT_AVAILABLE:
raise RuntimeError("Failed to found cufinufft binary.")
super().__init__()
if (n_batchs * n_coils) % n_trans != 0:
raise ValueError("n_batchs * n_coils should be a multiple of n_transf")
self.shape = shape
self.n_batchs = n_batchs
self.n_trans = n_trans
self.squeeze_dims = squeeze_dims
self.n_coils = n_coils
self.autograd_available = True
# For now only single precision is supported
self._samples = np.asfortranarray(
proper_trajectory(samples, normalize="pi").astype(np.float32, copy=False)
)
self.dtype = self.samples.dtype
# density compensation support
if is_cuda_array(density):
self.density = density
else:
self.compute_density(density)
if is_host_array(self.density):
self.density = cp.array(self.density)
self.smaps_cached = smaps_cached
self.compute_smaps(smaps)
# Smaps support
if self.smaps is not None and (
not (is_host_array(self.smaps) or is_cuda_array(self.smaps))
):
raise ValueError(
"Smaps should be either a C-ordered np.ndarray, or a GPUArray."
)
self.raw_op = RawCufinufftPlan(
self.samples,
tuple(shape),
n_trans=n_trans,
**kwargs,
)
@FourierOperatorBase.smaps.setter
def smaps(self, new_smaps):
"""Update smaps.
Parameters
----------
new_smaps: C-ordered ndarray or a GPUArray.
"""
self._check_smaps_shape(new_smaps)
if new_smaps is not None and hasattr(self, "smaps_cached"):
if self.smaps_cached or is_cuda_array(new_smaps):
self.smaps_cached = True
warnings.warn(
f"{sizeof_fmt(new_smaps.size * np.dtype(self.cpx_dtype).itemsize)}"
"used on gpu for smaps."
)
self._smaps = cp.array(
new_smaps, order="C", copy=False, dtype=self.cpx_dtype
)
else:
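                # Host path: keep the smaps in pinned (page-locked) host
                # memory so per-iteration host-to-device copies are faster,
                # and stage a one-coil device buffer for those transfers.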
if self._smaps is None:
self._smaps = pin_memory(
new_smaps.astype(self.cpx_dtype, copy=False)
)
self._smap_d = cp.empty(self.shape, dtype=self.cpx_dtype)
else:
# copy the array to pinned memory
np.copyto(self._smaps, new_smaps.astype(self.cpx_dtype, copy=False))
else:
self._smaps = new_smaps
@FourierOperatorBase.samples.setter
def samples(self, samples):
"""Update the plans when changing the samples."""
self._samples = np.asfortranarray(
proper_trajectory(samples, normalize="pi").astype(np.float32, copy=False)
)
for typ in [1, 2, "grad"]:
if typ == "grad" and not self._grad_wrt_traj:
continue
            self.raw_op._set_pts(typ, self._samples)
self.compute_density(self._density_method)
@FourierOperatorBase.density.setter
def density(self, new_density):
"""Update the density compensation."""
if new_density is None:
self._density = None
return
xp = get_array_module(new_density)
if xp.__name__ == "numpy":
self._density = cp.array(new_density)
elif xp.__name__ == "cupy":
self._density = new_density
@with_numpy_cupy
@nvtx_mark()
def op(self, data, ksp_d=None):
r"""Non Cartesian MRI forward operator.
Parameters
----------
data: np.ndarray or GPUArray
The uniform (2D or 3D) data in image space.
Returns
-------
Results array on the same device as data.
        Notes
        -----
        For every coil :math:`\ell`, this computes:

        .. math:: \mathcal{F} \mathcal{S}_\ell x
"""
self.check_shape(image=data, ksp=ksp_d)
data = auto_cast(data, self.cpx_dtype)
# Dispatch to special case.
if self.uses_sense and is_cuda_array(data):
op_func = self._op_sense_device
elif self.uses_sense:
op_func = self._op_sense_host
elif is_cuda_array(data):
op_func = self._op_calibless_device
else:
op_func = self._op_calibless_host
ret = op_func(data, ksp_d)
ret /= self.norm_factor
return self._safe_squeeze(ret)
def _op_sense_device(self, data, ksp_d=None):
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
data = cp.asarray(data)
image_dataf = cp.reshape(data, (B, *XYZ))
        if ksp_d is None:
            ksp_d = cp.empty((B * C, K), dtype=self.cpx_dtype)
smaps_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
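        # The flattened (batch, coil) axis is processed T pairs at a time:
        # flat index k maps to coil k % C and batch k // C.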
for i in range((B * C) // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
data_batched = image_dataf[idx_batch].reshape((T, *XYZ))
if not self.smaps_cached:
smaps_batched.set(self.smaps[idx_coils].reshape((T, *XYZ)))
else:
smaps_batched = self.smaps[idx_coils].reshape((T, *XYZ))
data_batched *= smaps_batched
self.__op(data_batched, ksp_d[i * T : (i + 1) * T])
return ksp_d.reshape((B, C, K))
def _op_sense_host(self, data, ksp=None):
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coil_img_d = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
dataf = data.reshape((B, *XYZ))
data_batched = cp.zeros((T, *XYZ), dtype=self.cpx_dtype)
if ksp is None:
ksp = np.zeros((B, C, K), dtype=self.cpx_dtype)
ksp = ksp.reshape((B * C, K))
ksp_batched = cp.zeros((T, K), dtype=self.cpx_dtype)
for i in range((B * C) // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
data_batched.set(dataf[idx_batch].reshape((T, *XYZ)))
if not self.smaps_cached:
coil_img_d.set(self.smaps[idx_coils])
else:
cp.copyto(coil_img_d, self.smaps[idx_coils])
coil_img_d *= data_batched
self.__op(coil_img_d, ksp_batched)
ksp[i * T : (i + 1) * T] = ksp_batched.get()
ksp = ksp.reshape((B, C, K))
return ksp
def _op_calibless_device(self, data, ksp_d=None):
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
data = cp.asarray(data).reshape(B * C, *XYZ)
if ksp_d is None:
ksp_d = cp.empty((B * C, K), dtype=self.cpx_dtype)
for i in range((B * C) // T):
self.__op(
data[i * T : (i + 1) * T],
ksp_d[i * T : (i + 1) * T],
)
return ksp_d.reshape(B, C, K)
def _op_calibless_host(self, data, ksp=None):
# calibrationless, data on host
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coil_img_d = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_d = cp.empty((T, K), dtype=self.cpx_dtype)
if ksp is None:
ksp = np.zeros((B * C, K), dtype=self.cpx_dtype)
ksp = ksp.reshape((B * C, K))
        # TODO: Add concurrency: compute batch n while copying batch n+1 to
        # the device and batch n-1 back to the host.
data_ = data.reshape(B * C, *XYZ)
for i in range((B * C) // T):
coil_img_d.set(data_[i * T : (i + 1) * T])
self.__op(coil_img_d, ksp_d)
ksp[i * T : (i + 1) * T] = ksp_d.get()
ksp = ksp.reshape((B, C, K))
return ksp
@nvtx_mark()
def __op(self, image_d, coeffs_d):
        # Ensure everything is a device pointer before going to the raw level.
return self.raw_op.type2(image_d, coeffs_d)
@nvtx_mark()
@with_numpy_cupy
def adj_op(self, coeffs, img_d=None):
"""Non Cartesian MRI adjoint operator.
Parameters
----------
        coeffs: np.ndarray or GPUArray
            The k-space coefficients.

        Returns
        -------
        Array in the same memory space as ``coeffs`` (i.e. CPU or GPU memory).
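
        Notes
        -----
        With SENSE maps, this computes the adjoint of the forward operator:

        .. math:: \sum_\ell \mathcal{S}_\ell^H \mathcal{F}^H y_\ell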
"""
self.check_shape(image=img_d, ksp=coeffs)
coeffs = auto_cast(coeffs, self.cpx_dtype)
# Dispatch to special case.
if self.uses_sense and is_cuda_array(coeffs):
adj_op_func = self._adj_op_sense_device
elif self.uses_sense:
adj_op_func = self._adj_op_sense_host
elif is_cuda_array(coeffs):
adj_op_func = self._adj_op_calibless_device
else:
adj_op_func = self._adj_op_calibless_host
ret = adj_op_func(coeffs, img_d)
ret /= self.norm_factor
return self._safe_squeeze(ret)
def _adj_op_sense_device(self, coeffs, img_d=None):
"""Perform sense reconstruction when data is on device."""
# Define short name
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coeffs = cp.asarray(coeffs).reshape(B * C, K)
# Allocate memory
if img_d is None:
img_d = cp.zeros((B, *XYZ), dtype=self.cpx_dtype)
coil_img_d = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
if self.uses_density:
ksp_new = cp.empty((T, K), dtype=self.cpx_dtype)
smaps_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
for i in range((B * C) // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
if not self.smaps_cached:
smaps_batched.set(self.smaps[idx_coils])
else:
smaps_batched = self.smaps[idx_coils]
if self.uses_density:
cp.copyto(ksp_new, coeffs[i * T : (i + 1) * T])
ksp_new *= self.density
else:
ksp_new = coeffs[i * T : (i + 1) * T]
self.__adj_op(ksp_new, coil_img_d)
for t, b in enumerate(idx_batch):
img_d[b, :] += coil_img_d[t] * smaps_batched[t].conj()
img_d = img_d.reshape((B, 1, *XYZ))
return img_d
def _adj_op_sense_host(self, coeffs, img_d=None):
"""Perform sense reconstruction when data is on host.

        On device, the following arrays are involved:

        - coil_img_d: (T, *XYZ)
        - img_d: (B, *XYZ)
        - smaps_batched: (T, *XYZ)
        - ksp_batched: (T, K)
        - density_batched: (T, K)
        """
# Define short name
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coeffs_f = coeffs.flatten()
# Allocate memory
coil_img_d = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
if img_d is None:
img_d = cp.zeros((B, *XYZ), dtype=self.cpx_dtype)
smaps_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
if self.uses_density:
density_batched = cp.repeat(self.density[None, :], T, axis=0)
for i in range((B * C) // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
if not self.smaps_cached:
smaps_batched.set(self.smaps[idx_coils])
else:
smaps_batched = self.smaps[idx_coils]
ksp_batched.set(coeffs_f[i * T * K : (i + 1) * T * K].reshape(T, K))
if self.uses_density:
ksp_batched *= density_batched
self.__adj_op(ksp_batched, coil_img_d)
for t, b in enumerate(idx_batch):
img_d[b, :] += coil_img_d[t] * smaps_batched[t].conj()
img = img_d.get()
img = img.reshape((B, 1, *XYZ))
return img
def _adj_op_calibless_device(self, coeffs, img_d=None):
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coeffs = cp.asarray(coeffs)
coeffs_f = coeffs.reshape(B * C, K)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
if self.uses_density:
density_batched = cp.repeat(self.density[None, :], T, axis=0)
        if img_d is None:
            img_d = cp.empty((B, C, *XYZ), dtype=self.cpx_dtype)
img_d = img_d.reshape(B * C, *XYZ)
for i in range((B * C) // T):
if self.uses_density:
cp.copyto(ksp_batched, coeffs_f[i * T : (i + 1) * T])
ksp_batched *= density_batched
self.__adj_op(ksp_batched, img_d[i * T : (i + 1) * T])
else:
self.__adj_op(
coeffs_f[i * T : (i + 1) * T],
img_d[i * T : (i + 1) * T],
)
return img_d.reshape(B, C, *XYZ)
def _adj_op_calibless_host(self, coeffs, img_batched=None):
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
coeffs_ = coeffs.reshape(B * C, K)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
if self.uses_density:
density_batched = cp.repeat(self.density[None, :], T, axis=0)
img = np.zeros((B * C, *XYZ), dtype=self.cpx_dtype)
if img_batched is None:
img_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
        # TODO: Add concurrency: compute batch n while copying batch n+1 to
        # the device and batch n-1 back to the host.
for i in range((B * C) // T):
ksp_batched.set(coeffs_[i * T : (i + 1) * T])
if self.uses_density:
ksp_batched *= density_batched
self.__adj_op(ksp_batched, img_batched)
img[i * T : (i + 1) * T] = img_batched.get()
img = img.reshape((B, C, *XYZ))
return img
@nvtx_mark()
def __adj_op(self, coeffs_d, image_d):
return self.raw_op.type1(coeffs_d, image_d)
def data_consistency(self, image_data, obs_data):
"""Compute the data consistency estimation directly on gpu.
This mixes the op and adj_op method to perform F_adj(F(x-y))
on a per coil basis. By doing the computation coil wise,
it uses less memory than the naive call to adj_op(op(x)-y)
Parameters
----------
image: array
Image on which the gradient operation will be evaluated.
N_coil x Image shape is not using sense.
obs_data: array
Observed data.
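
        Notes
        -----
        With SENSE maps this accumulates, per coil ``l``,
        ``S_l^H F^H (F S_l x - y_l)``, which is (up to the operator's
        normalization) the gradient of the least-squares data-fidelity term.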
"""
xp = get_array_module(image_data)
if xp.__name__ == "torch" and image_data.is_cpu:
image_data = image_data.numpy()
xp = get_array_module(obs_data)
if xp.__name__ == "torch" and obs_data.is_cpu:
obs_data = obs_data.numpy()
obs_data = auto_cast(obs_data, self.cpx_dtype)
image_data = auto_cast(image_data, self.cpx_dtype)
self.check_shape(image=image_data, ksp=obs_data)
if self.uses_sense and is_host_array(image_data):
grad_func = self._dc_sense_host
elif self.uses_sense and is_cuda_array(image_data):
grad_func = self._dc_sense_device
elif not self.uses_sense and is_host_array(image_data):
grad_func = self._dc_calibless_host
elif not self.uses_sense and is_cuda_array(image_data):
grad_func = self._dc_calibless_device
else:
raise ValueError("No suitable gradient function found.")
ret = grad_func(image_data, obs_data)
ret = self._safe_squeeze(ret)
if xp.__name__ == "torch" and is_cuda_array(ret):
ret = xp.as_tensor(ret, device=image_data.device)
elif xp.__name__ == "torch":
ret = xp.from_numpy(ret)
return ret
def _dc_sense_host(self, image_data, obs_data):
"""Gradient computation when all data is on host."""
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
image_dataf = np.reshape(image_data, (B, *XYZ))
obs_dataf = np.reshape(obs_data, (B * C, K))
data_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
smaps_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
obs_batched = cp.empty((T, K), dtype=self.cpx_dtype)
        grad_d = cp.zeros((B, *XYZ), dtype=self.cpx_dtype)
for i in range(B * C // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
data_batched.set(image_dataf[idx_batch].reshape((T, *XYZ)))
obs_batched.set(obs_dataf[i * T : (i + 1) * T])
if not self.smaps_cached:
smaps_batched.set(self.smaps[idx_coils].reshape((T, *XYZ)))
else:
smaps_batched = self.smaps[idx_coils].reshape((T, *XYZ))
data_batched *= smaps_batched
self.__op(data_batched, ksp_batched)
ksp_batched /= self.norm_factor
ksp_batched -= obs_batched
if self.uses_density:
ksp_batched *= self.density
self.__adj_op(ksp_batched, data_batched)
for t, b in enumerate(idx_batch):
grad_d[b, :] += data_batched[t] * smaps_batched[t].conj()
grad_d /= self.norm_factor
grad = grad_d.get()
grad = grad.reshape((B, 1, *XYZ))
return grad
def _dc_sense_device(self, image_data, obs_data):
"""Gradient computation when all data is on device."""
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
image_data = cp.asarray(image_data)
obs_data = cp.asarray(obs_data)
image_dataf = cp.reshape(image_data, (B, *XYZ))
obs_dataf = cp.reshape(obs_data, (B * C, K))
data_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
smaps_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
grad = cp.zeros((B, *XYZ), dtype=self.cpx_dtype)
for i in range(B * C // T):
idx_coils = np.arange(i * T, (i + 1) * T) % C
idx_batch = np.arange(i * T, (i + 1) * T) // C
cp.copyto(data_batched, image_dataf[idx_batch])
if not self.smaps_cached:
smaps_batched.set(self.smaps[idx_coils].reshape((T, *XYZ)))
else:
smaps_batched = self.smaps[idx_coils].reshape((T, *XYZ))
data_batched *= smaps_batched
self.__op(data_batched, ksp_batched)
ksp_batched /= self.norm_factor
ksp_batched -= obs_dataf[i * T : (i + 1) * T]
if self.uses_density:
ksp_batched *= self.density
self.__adj_op(ksp_batched, data_batched)
for t, b in enumerate(idx_batch):
# TODO write a kernel for that.
grad[b] += data_batched[t] * smaps_batched[t].conj()
grad = grad.reshape((B, 1, *XYZ))
grad /= self.norm_factor
return grad
def _dc_calibless_host(self, image_data, obs_data):
"""Calibrationless Gradient computation when all data is on host."""
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
image_dataf = np.reshape(image_data, (B * C, *XYZ))
obs_dataf = np.reshape(obs_data, (B * C, K))
data_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
obs_batched = cp.empty((T, K), dtype=self.cpx_dtype)
grad = np.empty((B * C, *XYZ), dtype=self.cpx_dtype)
for i in range(B * C // T):
data_batched.set(image_dataf[i * T : (i + 1) * T])
obs_batched.set(obs_dataf[i * T : (i + 1) * T])
self.__op(data_batched, ksp_batched)
ksp_batched /= self.norm_factor
ksp_batched -= obs_batched
if self.uses_density:
ksp_batched *= self.density
self.__adj_op(ksp_batched, data_batched)
data_batched /= self.norm_factor
grad[i * T : (i + 1) * T] = data_batched.get()
grad = grad.reshape((B, C, *XYZ))
return grad
def _dc_calibless_device(self, image_data, obs_data):
"""Calibrationless Gradient computation when all data is on device."""
T, B, C = self.n_trans, self.n_batchs, self.n_coils
K, XYZ = self.n_samples, self.shape
image_data = cp.asarray(image_data).reshape(B * C, *XYZ)
obs_data = cp.asarray(obs_data).reshape(B * C, K)
data_batched = cp.empty((T, *XYZ), dtype=self.cpx_dtype)
ksp_batched = cp.empty((T, K), dtype=self.cpx_dtype)
grad = cp.empty((B * C, *XYZ), dtype=self.cpx_dtype)
for i in range(B * C // T):
cp.copyto(data_batched, image_data[i * T : (i + 1) * T])
self.__op(data_batched, ksp_batched)
ksp_batched /= self.norm_factor
ksp_batched -= obs_data[i * T : (i + 1) * T]
if self.uses_density:
ksp_batched *= self.density
self.__adj_op(ksp_batched, data_batched)
grad[i * T : (i + 1) * T] = data_batched
grad = grad.reshape((B, C, *XYZ))
grad /= self.norm_factor
return grad
def _safe_squeeze(self, arr):
"""Squeeze the first two dimensions of shape of the operator."""
if self.squeeze_dims:
try:
arr = arr.squeeze(axis=1)
except ValueError:
pass
try:
arr = arr.squeeze(axis=0)
except ValueError:
pass
return arr
@property
def eps(self):
"""Return the underlying precision parameter."""
return self.raw_op.eps
@property
def bsize_ksp(self):
"""Size in Bytes of the compute batch of samples."""
return self.n_trans * self.ksp_size
@property
def bsize_img(self):
"""Size in Bytes of the compute batch of images."""
return self.n_trans * self.img_size
@property
def img_size(self):
"""Image size in bytes."""
return int(np.prod(self.shape) * np.dtype(self.cpx_dtype).itemsize)
@property
def ksp_size(self):
"""k-space size in bytes."""
return int(self.n_samples * np.dtype(self.cpx_dtype).itemsize)
@property
def norm_factor(self):
"""Norm factor of the operator."""
return np.sqrt(np.prod(self.shape) * 2 ** len(self.shape))
def __repr__(self):
"""Return info about the MRICufiNUFFT Object."""
return (
"MRICufiNUFFT(\n"
f" shape: {self.shape}\n"
f" n_coils: {self.n_coils}\n"
f" n_samples: {self.n_samples}\n"
f" n_trans: {self.n_trans}\n"
f" n_batchs: {self.n_batchs}\n"
f" uses_density: {self.uses_density}\n"
f" uses_sense: {self.uses_sense}\n"
f" smaps_cached: {self.smaps_cached}\n"
f" eps:{self.raw_op.eps:.0e}\n"
")"
)
def _make_plan_grad(self, **kwargs):
self.raw_op.grad_plan = Plan(
2,
self.shape,
self.n_trans,
self.raw_op.eps,
dtype=DTYPE_R2C[str(self.samples.dtype)],
isign=1,
**kwargs,
)
self.raw_op._set_pts(typ="grad", samples=self.samples)
def get_lipschitz_cst(self, max_iter=10, **kwargs):
"""Return the Lipschitz constant of the operator.
Parameters
----------
        max_iter: int
            Number of power-method iterations used to estimate the Lipschitz
            constant.
kwargs:
Extra kwargs for the cufinufft operator.
Returns
-------
float
Lipschitz constant of the operator.
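
        Examples
        --------
        A usage sketch (``nufft`` being an instance of this operator); the
        estimate can be used to pick a gradient-descent step size::

            L = nufft.get_lipschitz_cst(max_iter=20)
            step_size = 1.0 / L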
"""
tmp_op = self.__class__(
self.samples,
self.shape,
density=self.density,
n_coils=1,
smaps=None,
squeeze_dims=True,
**kwargs,
)
x = 1j * np.random.random(self.shape).astype(self.cpx_dtype, copy=False)
x += np.random.random(self.shape).astype(self.cpx_dtype, copy=False)
x = cp.asarray(x)
return power_method(
max_iter, tmp_op, norm_func=lambda x: cp.linalg.norm(x.flatten()), x=x
)
def toggle_grad_traj(self):
"""Toggle between the gradient trajectory and the plan for type 1 transform."""
if self.uses_sense:
self.smaps = self.smaps.conj()
self.raw_op.toggle_grad_traj()