Source code for snake.core.transform

"""Mathematical transformations of data."""

import base64
import logging
from collections.abc import Callable
from types import ModuleType
from typing import Any

import numpy as np
from numpy.typing import NDArray

from snake._meta import ThreeInts

from .parallel import run_parallel

log = logging.getLogger(__name__)


[docs] def effective_affine(old: NDArray, new: NDArray) -> NDArray: """Compute the effective affine transformation between two affine matrices.""" new_affine = np.asarray(new, dtype=np.float32) old_affine = np.asarray(old, dtype=np.float32) return np.linalg.inv(old_affine) @ new_affine
[docs] def _validate_gpu_affine(use_gpu: bool = True) -> tuple[bool, Callable, ModuleType]: """Check if we can use the affine_transform from cupy.""" if use_gpu: try: import cupy as xp from cupyx.scipy.ndimage import affine_transform as cu_affine_transform try: use_gpu = xp.cuda.is_available() except Exception as exc: use_gpu = False raise ImportError from exc def affine_transform( x: NDArray, *args: Any, output_shape: ThreeInts, output: NDArray[np.float32] = None, **kwargs: Any, ) -> NDArray: output_gpu = xp.zeros(output_shape, dtype=x.dtype) cu_affine_transform( x, *args, output_shape=output_shape, output=output_gpu, **kwargs, texture_memory=x.dtype == xp.float32, ) if output is not None: xp.copyto(output, output_gpu) return output else: return output_gpu.get() except ImportError: use_gpu = False if not use_gpu: import numpy as xp log.warning("Cupy not available, using CPU.") from scipy.ndimage import affine_transform return use_gpu, affine_transform, xp
[docs] def apply_affine( data: NDArray[np.float32], old_affine: NDArray[np.float32], new_affine: NDArray[np.float32], new_shape: ThreeInts, output: NDArray[np.float32] = None, transform_affine: NDArray[np.float32] = None, use_gpu: bool = True, **kwargs: Any, ) -> NDArray[np.float32]: """Apply the new affine on the data. Parameters ---------- data : NDArray Data to be transformed. 3D Array. old_affine : NDArray Affine of the original data new_affine : NDArray Affine of the new data new_shape : ThreeInts Shape of the new data transform_affine : NDArray, optional Transformation affine, by default None use_gpu : bool, optional Try to use GPU, by default True output: NDArray, optional Output array, by default None Returns ------- NDArray Resampled data with ``new_affine`` orientation and ``new_shape`` shape. """ use_gpu, affine_transform, xp = _validate_gpu_affine(use_gpu) if transform_affine is None: transform_affine = effective_affine(old_affine, new_affine) transform_affine = xp.asarray(transform_affine, dtype=xp.float32) data = xp.asarray(data) new_data = affine_transform( data, matrix=transform_affine, output_shape=new_shape, output=output, **kwargs, ) return new_data
[docs] def __apply_affine( x: NDArray, output: NDArray, i: int, *args: Any, **kwargs: Any ) -> NDArray: return apply_affine(x[i], *args, output=output[i], **kwargs)
[docs] def apply_affine4d( data: NDArray, old_affine: NDArray, new_affine: NDArray, new_shape: ThreeInts, use_gpu: bool = False, n_jobs: int = -1, axis: int = 0, **kwargs: Any, ) -> NDArray: """ Apply the new affine on 4D data. Parameters ---------- data : NDArray Data to be transformed. 3D Array. old_affine : NDArray Affine of the original data new_affine : NDArray Affine of the new data new_shape : ThreeInts Shape of the new data transform_affine : NDArray, optional Transformation affine, by default None use_gpu : bool, optional Returns ------- NDArray Resampled data with ``new_affine`` orientation and ``new_shape`` shape. See Also -------- apply_affine """ n_frames = data.shape[axis] new_array = np.zeros((n_frames, *new_shape), dtype=data.dtype) transform_affine = effective_affine(old_affine, new_affine) use_gpu = _validate_gpu_affine(use_gpu)[0] if not use_gpu: run_parallel( __apply_affine, data, new_array, old_affine=old_affine, new_affine=new_affine, use_gpu=False, transform_affine=transform_affine, new_shape=new_shape, n_jobs=n_jobs, parallel_axis=axis, **kwargs, ) else: for i in range(n_frames): new_array[i] = apply_affine( data[i], old_affine=old_affine, new_affine=new_affine, use_gpu=use_gpu, transform_affine=transform_affine, new_shape=new_shape, ) return new_array
[docs] def serialize_array(arr: NDArray) -> str: """Serialize the array for mrd compatible format.""" return "__".join( [ base64.b64encode(arr.tobytes()).decode(), str(arr.shape), str(arr.dtype), ] )
[docs] def unserialize_array(s: str) -> NDArray: """Unserialize the array for mrd compatible format.""" data, shape, dtype = s.split("__") shape = eval(shape) # FIXME return np.frombuffer(base64.b64decode(data.encode()), dtype=dtype).reshape(*shape)