Source code for mrinufft._array_compat

"""Array libraries compatibility utils."""

from typing import overload
import warnings
from functools import wraps, partial
from inspect import cleandoc
from numpy.typing import NDArray, DTypeLike
import numpy as np

ARRAY_LIBS = {
    "numpy": (np, np.ndarray),
    "cupy": (None, None),
    "torch": (None, None),
    "tensorflow": (None, None),
}

# Test import of array libraries.
TENSORFLOW_AVAILABLE = True
try:
    import tensorflow as tf
except ImportError:
    TENSORFLOW_AVAILABLE = False


CUPY_AVAILABLE = True
try:
    import cupy as cp

    ARRAY_LIBS["cupy"] = (cp, cp.ndarray)
except ImportError:
    CUPY_AVAILABLE = False

AUTOGRAD_AVAILABLE = True
TORCH_AVAILABLE = True
try:
    import torch

    ARRAY_LIBS["torch"] = (torch, torch.Tensor)
except ImportError:
    AUTOGRAD_AVAILABLE = False
    TORCH_AVAILABLE = False
    NP2TORCH = {}
else:
    NP2TORCH = {
        np.dtype("float64"): torch.float64,
        np.dtype("float32"): torch.float32,
        np.dtype("complex64"): torch.complex64,
        np.dtype("complex128"): torch.complex128,
    }
try:
    from tensorflow.experimental import numpy as tnp

    ARRAY_LIBS["tensorflow"] = (tnp, tnp.ndarray)
except ImportError:
    pass


def get_array_module(array: NDArray) -> np:  # type: ignore
    """Get the module of the array."""
    for lib, array_type in ARRAY_LIBS.values():
        if lib is not None and isinstance(array, array_type):
            return lib
    raise ValueError(f"Unknown array library (={type(array)}).")
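
# Illustrative usage of ``get_array_module`` (a sketch; the cupy call
# assumes cupy is installed with a visible GPU):
#
#     >>> get_array_module(np.zeros(3)) is np
#     True
#     >>> get_array_module(cp.zeros(3)) is cp
#     True
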
def auto_cast(array, dtype: DTypeLike):
    """Cast an array or a tensor to the desired dtype.

    This automatically converts the requested numpy dtype to the matching
    torch dtype when the input is a torch tensor.
    """
    module = get_array_module(array)
    if module.__name__ == "torch":
        return array.to(NP2TORCH[np.dtype(dtype)], copy=False)
    else:
        return array.astype(dtype, copy=False)
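
# Illustrative usage of ``auto_cast`` (a sketch; the torch line assumes
# torch is installed). Numpy arrays go through ``astype``, while torch
# tensors use ``Tensor.to`` with the matching dtype from NP2TORCH:
#
#     >>> auto_cast(np.ones(3), "complex64").dtype
#     dtype('complex64')
#     >>> auto_cast(torch.ones(3), "float64").dtype
#     torch.float64
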
def _tf_cuda_is_available():
    """Check whether Tensorflow has CUDA support or not."""
    if TENSORFLOW_AVAILABLE:
        devices = tf.config.list_physical_devices()
        device_type = [device.device_type for device in devices]
        return "GPU" in device_type
    else:
        return False


TF_CUDA_AVAILABLE = _tf_cuda_is_available()
def is_cuda_array(var) -> bool:
    """Check if var implements the CUDA Array Interface."""
    try:
        return hasattr(var, "__cuda_array_interface__")
    except Exception:
        return False
def is_cuda_tensor(var) -> bool:
    """Check if var is a CUDA tensor."""
    return TORCH_AVAILABLE and isinstance(var, torch.Tensor) and var.is_cuda
def is_host_array(var) -> bool:
    """Check if var is a C-contiguous numpy array on the host."""
    try:
        if isinstance(var, np.ndarray):
            if not var.flags.c_contiguous:
                warnings.warn("The input is a CPU array but is not C-contiguous.")
                return False
            return True
    except Exception:
        pass
    return False
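
# Quick sanity checks for the predicates above (a sketch; the last line
# assumes torch is installed with CUDA support):
#
#     >>> is_host_array(np.ones(3))
#     True
#     >>> is_host_array(np.ones((4, 4)).T)  # non C-contiguous view: warns
#     False
#     >>> is_cuda_tensor(torch.ones(3, device="cuda"))
#     True
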
def pin_memory(array: NDArray) -> NDArray:
    """Create a copy of the array in pinned memory."""
    mem = cp.cuda.alloc_pinned_memory(array.nbytes)
    ret = np.frombuffer(mem, array.dtype, array.size).reshape(array.shape)
    ret[...] = array
    return ret
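
# Illustrative usage of ``pin_memory`` (a sketch, assuming cupy is installed
# with a working CUDA runtime). Pinned (page-locked) host memory allows
# asynchronous host<->device transfers:
#
#     >>> data = np.random.rand(1024).astype(np.float32)
#     >>> pinned = pin_memory(data)
#     >>> np.array_equal(pinned, data)
#     True
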
NUMPY_NOTE = """
.. note::

    This function uses ``numpy`` internally, and will convert all its array
    arguments to numpy arrays. The outputs will be converted back to the
    original array module and device.
"""
NUMPY_CUPY_NOTE = """
.. note::

    This function uses ``numpy`` for all CPU arrays, and ``cupy`` for all
    on-GPU arrays. It will convert all its array arguments to the respective
    array library. The outputs will be converted back to the original array
    module and device.
"""
TORCH_NOTE = """
.. note::

    This function uses ``torch`` internally, and will convert all its array
    arguments to torch tensors, while respecting the device they are
    allocated on. The outputs will be converted back to the original array
    module and device.
"""
TF_NOTE = """
.. note::

    This function uses ``tensorflow`` internally, and will convert all its
    array arguments to tensorflow tensors, while respecting the device they
    are allocated on. The outputs will be converted back to the original
    array module and device.
"""


def _with(fun, _to_inner_xp, note):
    """Create a decorator to convert to a given array interface."""

    @wraps(fun)
    def wrapper(*args, **kwargs):
        leading_arg = _get_leading_argument(args, kwargs)

        # get array module and device from leading argument
        xp = get_array_module(leading_arg)
        device = _get_device(leading_arg)

        # convert all array arguments to the inner interface
        args, kwargs = _to_inner_xp(args, kwargs, device=device)

        # run function
        ret_ = fun(*args, **kwargs)

        # convert output to original array module and device
        return _to_interface(ret_, xp, device)

    if wrapper.__doc__:
        wrapper.__doc__ = cleandoc(wrapper.__doc__) + "\n\n" + note
    return wrapper
def with_numpy(fun):
    """Ensure the function works internally with numpy arrays."""
    return _with(fun, _to_numpy, NUMPY_NOTE)
def with_tensorflow(fun):
    """Ensure the function works internally with tensorflow tensors."""
    return _with(fun, _to_tensorflow, TF_NOTE)
def with_numpy_cupy(fun):
    """Ensure the function works internally with numpy or cupy arrays."""
    return _with(fun, _to_numpy_cupy, NUMPY_CUPY_NOTE)
def with_torch(fun):
    """Ensure the function works internally with Torch."""
    return _with(fun, _to_torch, TORCH_NOTE)
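
# Illustrative usage of the ``with_*`` decorators (a sketch; the torch call
# assumes torch is installed). ``double`` is a hypothetical function used
# only for this example: its body always sees numpy arrays, yet each caller
# gets back the array type (and device) it passed in.
#
#     >>> @with_numpy
#     ... def double(x):
#     ...     """Double the input."""
#     ...     return 2 * x
#     ...
#     >>> isinstance(double(np.arange(3)), np.ndarray)
#     True
#     >>> isinstance(double(torch.arange(3)), torch.Tensor)
#     True
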
def _get_device(input):
    """Determine the computational device from the input array."""
    try:
        return input.device
    except AttributeError:
        return "cpu"


def _get_leading_argument(args, kwargs):
    """Find the first array argument."""
    for arg in args:
        try:
            get_array_module(arg)
            return arg
        except Exception:
            pass
    for arg in kwargs.values():
        try:
            get_array_module(arg)
            return arg
        except Exception:
            pass
    return np.asarray(0.0)  # by default, use a dummy numpy leading argument


def _array_to_numpy(_arg):
    """Convert array to numpy."""
    if is_cuda_array(_arg):
        warnings.warn("data is on gpu, it will be moved to CPU.")
    xp = get_array_module(_arg)
    if xp.__name__ == "torch":
        _arg = _arg.numpy(force=True)
    elif xp.__name__ == "cupy":
        _arg = cp.asnumpy(_arg)
    elif "tensorflow" in xp.__name__:
        _arg = _arg.numpy()
    return _arg


def _array_to_cupy(_arg, device=None):
    """Convert array to cupy."""
    xp = get_array_module(_arg)
    if xp.__name__ == "numpy":
        with cp.cuda.Device(device):
            _arg = cp.asarray(_arg)
    elif xp.__name__ == "torch":
        if _arg.requires_grad:
            _arg = _arg.detach()
        if _arg.is_cpu:
            with cp.cuda.Device(device):
                _arg = cp.asarray(_arg.numpy())
        else:
            _arg = cp.from_dlpack(_arg)
    elif "tensorflow" in xp.__name__:
        if "CPU" in _arg.device:
            with cp.cuda.Device(device):
                _arg = cp.asarray(_arg.numpy())
        else:
            _arg = cp.from_dlpack(tf.experimental.dlpack.to_dlpack(_arg))
    return _arg


def _array_to_torch(_arg, device=None):
    """Convert array to torch."""
    xp = get_array_module(_arg)
    if xp.__name__ == "numpy":
        _arg = torch.as_tensor(_arg, device=device)
    elif xp.__name__ == "cupy":
        if torch.cuda.is_available():
            _arg = torch.from_dlpack(_arg)
        else:
            warnings.warn("data is on gpu, it will be moved to CPU.")
            _arg = torch.as_tensor(cp.asnumpy(_arg))
    elif "tensorflow" in xp.__name__:
        if "CPU" in _arg.device:
            _arg = torch.as_tensor(_arg.numpy(), device=device)
        else:
            _arg = torch.from_dlpack(tf.experimental.dlpack.to_dlpack(_arg))
    return _arg


def _array_to_tensorflow(_arg):
    """Convert array to tensorflow."""
    xp = get_array_module(_arg)
    if xp.__name__ == "numpy":
        with tf.device("CPU"):
            _arg = tf.convert_to_tensor(_arg)
    elif xp.__name__ == "cupy":
        if TF_CUDA_AVAILABLE:
            _arg = tf.experimental.dlpack.from_dlpack(_arg.toDlpack())
        else:
            warnings.warn("data is on gpu, it will be moved to CPU.")
            _arg = tf.convert_to_tensor(cp.asnumpy(_arg))
    elif xp.__name__ == "torch":
        if _arg.requires_grad:
            _arg = _arg.detach()
        if _arg.is_cpu:
            _arg = tf.convert_to_tensor(_arg)
        elif TF_CUDA_AVAILABLE:
            _arg = tf.experimental.dlpack.from_dlpack(
                torch.utils.dlpack.to_dlpack(_arg)
            )
        else:
            _arg = tf.convert_to_tensor(_arg.numpy(force=True))
    return _arg


def _convert(_array_to_xp, args, kwargs=None):
    # convert positional arguments
    args = list(args)
    for n in range(len(args)):
        _arg = args[n]
        if hasattr(_arg, "__array__"):
            args[n] = _array_to_xp(_arg)
        elif isinstance(_arg, (tuple, list)):
            args[n], _ = _convert(_array_to_xp, _arg)
        # objects with attributes that are arrays are also converted
        elif hasattr(_arg, "__dict__"):
            process_dict_vals, _ = _convert(
                _array_to_xp, list(_arg.__dict__.values())
            )
            for k, v in zip(_arg.__dict__.keys(), process_dict_vals):
                try:
                    setattr(_arg, k, v)
                except Exception:
                    pass

    # convert keyword arguments
    if kwargs:
        process_kwargs_vals, _ = _convert(_array_to_xp, list(kwargs.values()))
        kwargs = {k: v for k, v in zip(kwargs.keys(), process_kwargs_vals)}

    return args, kwargs


def _to_numpy(args, kwargs=None, device=None):
    """Convert a sequence of arguments to numpy.

    Non-arrays are ignored.
    """
    return _convert(_array_to_numpy, args, kwargs)


def _to_cupy(args, kwargs=None, device=None):
    """Convert a sequence of arguments to cupy.

    Non-arrays are ignored. This avoids transfers between different
    devices (e.g., different GPUs).
    """
    # normalize device to a cupy device index
    if str(device) == "cpu":
        device = 0
    else:
        try:
            device = device.index
        except AttributeError:
            pass
    return _convert(partial(_array_to_cupy, device=device), args, kwargs)


def _to_numpy_cupy(args, kwargs=None, device="cpu"):
    """Convert a sequence of arguments to numpy or cupy.

    Non-arrays are ignored. This avoids transfers between different
    devices (e.g., CPU->GPU, GPU->CPU or different GPUs).
    """
    if CUPY_AVAILABLE and str(device) != "cpu":
        return _to_cupy(args, kwargs, device)
    else:
        return _to_numpy(args, kwargs)


def _to_torch(args, kwargs=None, device=None):
    """Convert a sequence of arguments to Pytorch tensors.

    Non-arrays are ignored. This avoids transfers between different
    devices (e.g., CPU->GPU, GPU->CPU or different GPUs).
    """
    return _convert(partial(_array_to_torch, device=device), args, kwargs)


def _to_tensorflow(args, kwargs=None, device=None):
    """Convert a sequence of arguments to Tensorflow tensors.

    Non-arrays are ignored. This avoids transfers between different
    devices (e.g., CPU->GPU, GPU->CPU or different GPUs).
    """
    return _convert(_array_to_tensorflow, args, kwargs)


def _to_interface(args, array_interface, device=None):
    """Convert a list of arguments to a given array interface.

    The user may provide the desired computational device.
    Non-arrays are ignored.

    Parameters
    ----------
    args : list[object]
        List of objects to be converted.
    array_interface : ModuleType
        Desired array backend (e.g., numpy).
    device : Device, optional
        Desired computational device (e.g., "cpu" or Device('cuda')).
        The default is None (i.e., maintain the same device as the
        input argument).

    Returns
    -------
    list[object]
        List of converted objects.
    """
    # enforce iterable
    if not isinstance(args, (list, tuple)):
        args = [args]

    # convert to target interface
    if array_interface.__name__ == "numpy":
        args, _ = _to_numpy(args)
    elif array_interface.__name__ == "cupy":
        args, _ = _to_cupy(args, device=device)
    elif array_interface.__name__ == "torch":
        args, _ = _to_torch(args, device=device)
    elif "tensorflow" in array_interface.__name__:
        args, _ = _to_tensorflow(args)

    if len(args) == 1:
        return args[0]
    return tuple(args)
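
# Illustrative round trip through ``_to_interface`` (a sketch, assuming
# torch is installed): arrays come back under the requested interface,
# regardless of the library they started in.
#
#     >>> isinstance(_to_interface(torch.ones(3), np), np.ndarray)
#     True
#     >>> isinstance(_to_interface(np.ones(3), torch), torch.Tensor)
#     True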