Source code for mrinufft.trajectories.tools

"""Functions to manipulate/modify trajectories."""

from typing import Any, Callable, Literal

import numpy as np
from numpy.typing import NDArray
from scipy.interpolate import CubicSpline, interp1d
from scipy.stats import norm

from .maths import Rv, Rx, Ry, Rz
from .utils import KMAX, initialize_tilt, VDSpdf, VDSorder

################
# DIRECT TOOLS #
################


[docs] def stack( trajectory: NDArray, nb_stacks: int, z_tilt: str | float | None = None, *, hard_bounded: bool = True, ) -> NDArray: """Stack 2D or 3D trajectories over the :math:`k_z`-axis. Parameters ---------- trajectory : NDArray Trajectory in 2D or 3D to stack. nb_stacks : int Number of stacks repeating the provided trajectory. z_tilt : str | float, optional Tilt of the stacks, by default `None`. hard_bounded : bool, optional Whether the stacks should be strictly within the limits of the k-space. Returns ------- NDArray Stacked trajectory. """ # Check dimensionality and initialize output Nc, Ns = trajectory.shape[:2] if trajectory.shape[-1] == 2: trajectory = np.concatenate([trajectory, np.zeros((Nc, Ns, 1))], axis=-1) trajectory = trajectory.reshape((Nc * Ns, 3)) new_trajectory = np.zeros((nb_stacks, Nc * Ns, 3)) # Initialize z-axis with boundaries, and z-rotation ub, lb = KMAX / nb_stacks, -KMAX / nb_stacks if hard_bounded: ub = max(np.max(trajectory[..., 2]), ub) lb = min(np.min(trajectory[..., 2]), lb) z_axis = np.linspace(-KMAX - lb, KMAX - ub, nb_stacks) z_rotation = Rz(initialize_tilt(z_tilt, nb_stacks)).T # Start stacking the trajectories new_trajectory[0] = trajectory new_trajectory[0, :, 2] += z_axis[0] for i in range(1, nb_stacks): new_trajectory[i] = new_trajectory[i - 1] @ z_rotation new_trajectory[i, :, 2] = z_axis[i] + trajectory[..., 2] return new_trajectory.reshape(nb_stacks * Nc, Ns, 3)
[docs] def rotate( trajectory: NDArray, nb_rotations: int, x_tilt: str | float | None = None, y_tilt: str | float | None = None, z_tilt: str | float | None = None, ) -> NDArray: """Rotate 2D or 3D trajectories over the different axes. Parameters ---------- trajectory : NDArray Trajectory in 2D or 3D to rotate. nb_rotations : int Number of rotations repeating the provided trajectory. x_tilt : str | float, optional Tilt of the trajectory over the :math:`k_x`-axis, by default `None`. y_tilt : str | float, optional Tilt of the trajectory over the :math:`k_y`-axis, by default `None`. z_tilt : str | float, optional Tilt of the trajectory over the :math:`k_z`-axis, by default `None`. Returns ------- NDArray Rotated trajectory. """ # Check dimensionality and initialize output Nc, Ns = trajectory.shape[:2] if trajectory.shape[-1] == 2: trajectory = np.concatenate([trajectory, np.zeros((Nc, Ns, 1))], axis=-1) trajectory = trajectory.reshape((Nc * Ns, 3)) new_trajectory = np.zeros((nb_rotations, Nc * Ns, 3)) # Start rotating the planes x_angle = initialize_tilt(x_tilt, nb_rotations) y_angle = initialize_tilt(y_tilt, nb_rotations) z_angle = initialize_tilt(z_tilt, nb_rotations) new_trajectory[0] = trajectory for i in range(1, nb_rotations): rotation = (Rx(i * x_angle) @ Ry(i * y_angle) @ Rz(i * z_angle)).T new_trajectory[i] = new_trajectory[0] @ rotation return new_trajectory.reshape(nb_rotations * Nc, Ns, 3)
[docs] def precess( trajectory: NDArray, nb_rotations: int, tilt: str | float = "golden", half_sphere: bool = False, partition: Literal["axial", "polar"] = "axial", axis: int | NDArray | None = None, ) -> NDArray: """Rotate trajectories as a precession around the :math:`k_z`-axis. Parameters ---------- trajectory : NDArray Trajectory in 2D or 3D to rotate. nb_rotations : int Number of rotations repeating the provided trajectory while precessing. tilt : str | float, optional Angle tilt between consecutive rotations around the :math:`k_z`-axis, by default "golden". half_sphere : bool, optional Whether the precession should be limited to the upper half of the k-space sphere. It is typically used for in-out trajectories or planes. partition : Literal["axial", "polar"], optional Partition type between an "axial" or "polar" split of the :math:`k_z`-axis, designating whether the axis should be fragmented by radius or angle respectively, by default "axial". axis : int, NDArray, optional Axis selected for alignment reference when rotating the trajectory around the :math:`k_z`-axis, generally corresponding to the shot direction for single shot ``trajectory`` inputs. It can either be an integer for one of the three k-space axes, or directly a 3D array. The default behavior when `None` is to select the last coordinate of the first shot as the axis, by default `None`. Returns ------- NDArray Precessed trajectory. """ # Check for partition option error if partition not in ["polar", "axial"]: raise NotImplementedError(f"Unknown partition type: {partition}") # Check dimensionality and initialize output Nc, Ns = trajectory.shape[:2] if trajectory.shape[-1] == 2: trajectory = np.concatenate([trajectory, np.zeros((Nc, Ns, 1))], axis=-1) trajectory = trajectory.reshape((Nc * Ns, 3)) new_trajectory = np.zeros((nb_rotations, Nc * Ns, 3)) # Determine direction vectors on a sphere vectors = np.zeros((nb_rotations, 3)) phi = initialize_tilt(tilt, nb_rotations) * np.arange(nb_rotations) vectors[:, 2] = np.linspace(-1 + half_sphere, 1, nb_rotations) if partition == "polar": vectors[:, 2] = np.sin(np.pi / 2 * vectors[:, 2]) radius = np.sqrt(1 - vectors[:, 2] ** 2) vectors[:, 0] = np.cos(phi) * radius vectors[:, 1] = np.sin(phi) * radius # Select rotation axis when axis is not already a vector if axis is None: axis_vector = np.copy(trajectory[Ns - 1]) axis_vector /= np.linalg.norm(axis_vector) elif isinstance(axis, int): axis_vector = np.zeros(3) axis_vector[axis] = 1 else: axis_vector = axis # Rotate initial trajectory for i in np.arange(nb_rotations): rotation = Rv(axis_vector, vectors[i], normalize=False).T new_trajectory[i] = trajectory @ rotation return new_trajectory.reshape((nb_rotations * Nc, Ns, 3))
[docs] def conify( trajectory: NDArray, nb_cones: int, z_tilt: str | float | None = None, in_out: bool = False, max_angle: float = np.pi / 2, borderless: bool = True, ) -> NDArray: """Distort 2D or 3D trajectories into cones along the :math:`k_z`-axis. Parameters ---------- trajectory : NDArray Trajectory to conify. nb_cones : int Number of cones repeating the provided trajectory. z_tilt : str | float, optional Tilt of the trajectory over the :math:`k_z`-axis, by default `None`. in_out : bool, optional Whether to account for the in-out nature of some trajectories to avoid hard angles around the center, by default False. max_angle : float, optional Maximum angle of the cones, by default pi / 2. borderless : bool, optional Whether the cones should reach `max_angle` or not, and avoid 1D cones if equal to pi / 2, by default True. Returns ------- NDArray Conified trajectory. """ # Check dimensionality and initialize output Nc, Ns = trajectory.shape[:2] if trajectory.shape[-1] == 2: trajectory = np.concatenate([trajectory, np.zeros((Nc, Ns, 1))], axis=-1) trajectory = trajectory.reshape((Nc * Ns, 3)) new_trajectory = np.zeros((nb_cones, Nc * Ns, 3)) # Initialize angles z_angle = initialize_tilt(z_tilt, nb_cones) alphas = np.linspace(-max_angle, +max_angle, nb_cones + 2 * borderless) if borderless: alphas = alphas[1:-1] # Remove partition borders # Start processing the trajectory new_trajectory[:] = trajectory for i, alpha in enumerate(alphas): # Apply tilt rotation = Rz(np.abs(i - nb_cones // 2) * z_angle).T # Symmetrical for in-out new_trajectory[i] = new_trajectory[i] @ rotation # Convert to spherical coordinates norms = np.linalg.norm(new_trajectory[i], axis=-1) polar_angles = np.arccos( new_trajectory[i, ..., 2] / np.where(norms == 0, 1, norms) ) # Conify by changing polar angle new_trajectory[i, :, 0] = ( new_trajectory[i, :, 0] / np.sin(polar_angles) * np.sin(polar_angles + alpha) ) new_trajectory[i, :, 1] = ( new_trajectory[i, :, 1] / np.sin(polar_angles) * np.sin(polar_angles + alpha) ) new_trajectory[i, :, 2] = norms * np.cos(polar_angles + alpha) new_trajectory = new_trajectory.reshape(nb_cones * Nc, Ns, 3) # Handle in-out trajectories to avoid hard transition at the center if in_out: new_trajectory[:, Ns // 2 :, 2] = -new_trajectory[:, Ns // 2 :, 2] return new_trajectory
[docs] def epify( trajectory: NDArray, Ns_transitions: int, nb_trains: int, *, reverse_odd_shots: bool = False, ) -> NDArray: """Create multi-readout shots from trajectory composed of single-readouts. Assemble multiple single-readout shots together by adding transition steps in the trajectory to create EPI-like multi-readout shots. Parameters ---------- trajectory : NDArray Trajectory to change by prolonging and merging the shots. Ns_transitions : int Number of samples/steps between the merged readouts. nb_trains : int Number of resulting multi-readout shots, or trains. reverse_odd_shots : bool, optional Whether to reverse every odd shots such that, as in most trajectories, even shots end up closer to the start of odd shots. Returns ------- NDArray Trajectory with fewer but longer multi-readout shots. """ Nc, Ns, Nd = trajectory.shape if Nc % nb_trains != 0: raise ValueError( "`nb_trains` should divide the number of shots in `trajectory`." ) nb_shot_per_train = Nc // nb_trains # Reverse odd shots to facilitate concatenation if requested trajectory = np.copy(trajectory) trajectory = trajectory.reshape((nb_trains, -1, Ns, Nd)) if reverse_odd_shots: trajectory[:, 1::2] = trajectory[:, 1::2, ::-1] # Assemble shots together per concatenation assembled_trajectory = [] source_sample_ids = np.concatenate( [np.arange(Ns) + i * (Ns_transitions + Ns) for i in range(nb_shot_per_train)] ) target_sample_ids = np.arange( nb_shot_per_train * Ns + (nb_shot_per_train - 1) * Ns_transitions ) for i_c in range(nb_trains): spline = CubicSpline(source_sample_ids, np.concatenate(trajectory[i_c], axis=0)) assembled_trajectory.append(spline(target_sample_ids)) return np.array(assembled_trajectory)
[docs] def unepify(trajectory: NDArray, Ns_readouts: int, Ns_transitions: int) -> NDArray: """Recover single-readout shots from multi-readout trajectory. Reformat an EPI-like trajectory with multiple readouts and transitions to more single-readout shots by discarding the transition parts. Note that it can also be applied to any array of shape (Nc, Ns_readouts + Ns_transitions, ...) such as acquired samples for example. Parameters ---------- trajectory : NDArray Trajectory to reduce by discarding transitions between readouts. Ns_readouts : int Number of samples within a single readout. Ns_transitions : int Number of samples/steps between the readouts. Returns ------- NDArray Trajectory with more but shorter single shots. """ Nc, Ns, Nd = trajectory.shape if Ns % (Ns_readouts + Ns_transitions) != Ns_readouts: raise ValueError( "`trajectory` shape does not match `Ns_readouts` or `Ns_transitions`." ) readout_mask = np.zeros(Ns).astype(bool) for i in range(1, Ns // (Ns_readouts + Ns_transitions) + 2): readout_mask[ (i - 1) * Ns_readouts + (i - 1) * Ns_transitions : i * Ns_readouts + (i - 1) * Ns_transitions ] = True trajectory = trajectory[:, readout_mask, :] trajectory = trajectory.reshape((-1, Ns_readouts, Nd)) return trajectory
[docs] def prewind(trajectory: NDArray, Ns_transitions: int) -> NDArray: """Add pre-winding/positioning to the trajectory. The trajectory is extended to start before the readout from the k-space center with null gradients and reach each shot position with the required gradient strength. Parameters ---------- trajectory : NDArray Trajectory to extend with rewind gradients. Ns_transitions : int Number of pre-winding/positioning steps used to leave the k-space center and prepare for each shot to start. Returns ------- NDArray Extended trajectory with pre-winding/positioning. """ Nc, Ns, Nd = trajectory.shape if Ns_transitions < 3: raise ValueError("`Ns_transitions` should be at least 2.") # Assemble shots together per concatenation assembled_trajectory = [] source_sample_ids = np.concatenate([[0, 1], Ns_transitions + np.arange(Ns)]) target_sample_ids = np.arange(Ns_transitions + Ns) for i_c in range(Nc): spline = CubicSpline( source_sample_ids, np.concatenate([np.zeros((2, Nd)), trajectory[i_c]], axis=0), ) assembled_trajectory.append(spline(target_sample_ids)) return np.array(assembled_trajectory)
[docs] def rewind(trajectory: NDArray, Ns_transitions: int) -> NDArray: """Add rewinding to the trajectory. The trajectory is extended to come back to the k-space center after the readouts with null gradients. Parameters ---------- trajectory : NDArray Trajectory to extend with rewind gradients. Ns_transitions : int Number of rewinding steps used to come back to the k-space center. Returns ------- NDArray Extended trajectory with rewinding. """ Nc, Ns, Nd = trajectory.shape if Ns_transitions < 3: raise ValueError("`Ns_transitions` should be at least 2.") # Assemble shots together per concatenation assembled_trajectory = [] source_sample_ids = np.concatenate( [np.arange(Ns), Ns + Ns_transitions - np.arange(3, 1, -1)] ) target_sample_ids = np.arange(Ns_transitions + Ns) for i_c in range(Nc): spline = CubicSpline( source_sample_ids, np.concatenate([trajectory[i_c], np.zeros((2, Nd))], axis=0), ) assembled_trajectory.append(spline(target_sample_ids)) return np.array(assembled_trajectory)
[docs] def oversample( trajectory: NDArray, new_Ns: int, kind: Literal["linear", "quadratic", "cubic"] = "cubic", ) -> NDArray: """ Resample a trajectory to increase the number of samples using interpolation. Parameters ---------- trajectory : NDArray The original trajectory array, where interpolation is applied along the second axis. new_Ns : int The desired number of samples in the resampled trajectory. kind : Literal, optional The type of interpolation to use, such as 'linear', 'quadratic', or 'cubic', by default "cubic". Returns ------- NDArray The resampled trajectory array with ``new_Ns`` points along the second axis. Notes ----- This function uses ``scipy.interpolate.interp1d`` to perform the interpolation along the second axis of the input `trajectory` array. Warnings -------- Using 'quadratic' or 'cubic' interpolations is likely to generate samples located slightly beyond the original k-space limits by making smooth transitions. See Also -------- scipy.interpolate.interp1d : The underlying interpolation function used for resampling. """ f = interp1d(np.linspace(0, 1, trajectory.shape[1]), trajectory, axis=1, kind=kind) return f(np.linspace(0, 1, new_Ns))
#################### # FUNCTIONAL TOOLS # ####################
[docs] def stack_spherically( trajectory_func: Callable[..., NDArray], Nc: int, nb_stacks: int, z_tilt: str | float | None = None, hard_bounded: bool = True, **traj_kwargs: Any, # noqa ANN401 ) -> NDArray: """Stack 2D or 3D trajectories over the :math:`k_z`-axis to make a sphere. Parameters ---------- trajectory_func : Callable[..., NDArray] Trajectory function that should return an array-like with the usual (Nc, Ns, Nd) size. Nc : int Number of shots to use for the whole spherically stacked trajectory. nb_stacks : int Number of stacks of trajectories. z_tilt : str | float, optional Tilt of the stacks, by default `None`. hard_bounded : bool, optional Whether the stacks should be strictly within the limits of the k-space, by default `True`. **traj_kwargs : Any Trajectory initialization parameters for the function provided with `trajectory_func`. Returns ------- NDArray Stacked trajectory. """ # Handle argument errors if Nc < nb_stacks: raise ValueError("Nc should be higher than nb_stacks.") # Initialize a plane to estimate potential thickness trajectory = trajectory_func(Nc=Nc // nb_stacks, **traj_kwargs) if trajectory.shape[-1] == 2: trajectory = np.concatenate( [trajectory, np.zeros((*(trajectory.shape[:2]), 1))], axis=-1 ) # Initialize z-axis with boundaries, and z-rotation ub, lb = KMAX / nb_stacks, -KMAX / nb_stacks if hard_bounded: ub = max(np.max(trajectory[..., 2]), ub) lb = min(np.min(trajectory[..., 2]), lb) z_axis = np.linspace(-KMAX - lb, KMAX - ub, nb_stacks) radii = np.cos(np.arcsin(z_axis / KMAX)) # Attribute shots to stacks following density proportional to surface Nc_per_stack = np.ones(nb_stacks).astype(int) density = radii**2 # simplified version for _ in range(Nc - nb_stacks): idx = np.argmax(density / Nc_per_stack) Nc_per_stack[idx] += 1 # Start stacking the trajectories new_trajectory = [] for i in range(nb_stacks): # Initialize a single stack stack = trajectory_func(Nc=Nc_per_stack[i], **traj_kwargs) if stack.shape[-1] == 2: stack = np.concatenate([stack, np.zeros((*(stack.shape[:2]), 1))], axis=-1) stack[..., :2] = radii[i] * stack[..., :2] stack[..., 2] = z_axis[i] + stack[..., 2] # Apply z tilt rotation = Rz(i * initialize_tilt(z_tilt, nb_stacks)).T stack = stack @ rotation new_trajectory.append(stack) # Concatenate or handle varying Ns value Ns_values = np.array([stk.shape[1] for stk in new_trajectory]) if (Ns_values == Ns_values[0]).all(): output = np.concatenate(new_trajectory, axis=0) return output.reshape(Nc, Ns_values[0], 3) return np.concatenate([stk.reshape((-1, 3)) for stk in new_trajectory], axis=0)
[docs] def shellify( trajectory_func: Callable[..., NDArray], Nc: int, nb_shells: int, z_tilt: str | float = "golden", hemisphere_mode: Literal["symmetric", "reversed"] = "symmetric", **traj_kwargs: Any, # noqa ANN401 ) -> NDArray: """Stack 2D or 3D trajectories over the :math:`k_z`-axis to make a sphere. Parameters ---------- trajectory_func : Callable[..., NDArray] Trajectory function that should return an array-like with the usual (Nc, Ns, Nd) size. Nc : int Number of shots to use for the whole spherically stacked trajectory. nb_shells : int Number of shells of distorted trajectories. z_tilt : str | float, optional Tilt of the shells, by default "golden". hemisphere_mode : Literal["symmetric", "reversed"], optional Define how the lower hemisphere should be oriented relatively to the upper one, with "symmetric" providing a :math:`k_x-k_y` planar symmetry by changing the polar angle, and with "reversed" promoting continuity (for example in spirals) by reversing the azimuth angle. The default is "symmetric". **traj_kwargs : Any Trajectory initialization parameters for the function provided with `trajectory_func`. Returns ------- NDArray Concentric shell trajectory. """ # Handle argument errors if hemisphere_mode not in ["symmetric", "reversed"]: raise ValueError(f"Unknown hemisphere_mode: `{hemisphere_mode}`.") if Nc < 2 * nb_shells: raise ValueError("Nc should be at least twice higher than nb_shells.") # Attribute shots to shells following a prescribed density Nc_per_shell = np.ones(nb_shells).astype(int) density = np.arange(1, nb_shells + 1) ** 2 # simplified version for _ in range((Nc - 2 * nb_shells) // 2): idx = np.argmax(density / Nc_per_shell) Nc_per_shell[idx] += 1 # Create shells one by one radii = (0.5 + np.arange(nb_shells)) / nb_shells new_trajectory = [] for i in range(nb_shells): # Initialize trajectory shell_upper = trajectory_func(Nc=Nc_per_shell[i], **traj_kwargs) if shell_upper.shape[-1] < 3: shell_upper = np.concatenate( [shell_upper, np.zeros((*(shell_upper.shape[:-1]), 1))], axis=-1 ) # Carve upper hemisphere from trajectory z_coords = KMAX**2 - shell_upper[..., 0] ** 2 - shell_upper[..., 1] ** 2 z_signs = np.sign(z_coords) shell_upper[..., 2] += z_signs * np.sqrt(np.abs(z_coords)) # Initialize lower hemisphere from upper shell_lower = np.copy(shell_upper) if hemisphere_mode in ["symmetric", "reversed"]: shell_lower[..., 2] = -shell_lower[..., :, 2] # Invert polar angle if hemisphere_mode in ["reversed"]: shell_lower[..., 1] = -shell_lower[..., :, 1] # Invert azimuthal angle # Apply shell tilt rotation = Rz(i * initialize_tilt(z_tilt, nb_shells)).T shell_upper = shell_upper @ rotation shell_lower = shell_lower @ rotation # Scale them and add them to the trajectory new_trajectory.append(radii[i] * shell_upper) new_trajectory.append(radii[i] * shell_lower) # Concatenate or handle varying Ns value Ns_values = np.array([hem.shape[1] for hem in new_trajectory]) if (Ns_values == Ns_values[0]).all(): output = np.concatenate(new_trajectory, axis=0) return output.reshape(Nc, Ns_values[0], 3) return np.concatenate([hem.reshape((-1, 3)) for hem in new_trajectory], axis=0)
######### # UTILS # #########
[docs] def duplicate_along_axes( trajectory: NDArray, axes: tuple[int, ...] = (0, 1, 2) ) -> NDArray: """ Duplicate a trajectory along the specified axes. The provided trajectories are replicated with different orientation, with the :math:`k_x`-axis being considered as the default orientation of the base trajectory. Parameters ---------- trajectory : NDArray Trajectory to duplicate. axes : tuple[int, ...], optional Axes along which to duplicate the trajectory, by default (0, 1, 2) Returns ------- NDArray Duplicated trajectory along the specified axes. """ # Copy input trajectory along other axes new_trajectory = [] if 0 in axes: new_trajectory.append(trajectory) if 1 in axes: dp_trajectory = np.copy(trajectory) dp_trajectory[..., [1, 2]] = dp_trajectory[..., [2, 1]] new_trajectory.append(dp_trajectory) if 2 in axes: dp_trajectory = np.copy(trajectory) dp_trajectory[..., [2, 0]] = dp_trajectory[..., [0, 2]] new_trajectory.append(dp_trajectory) return np.concatenate(new_trajectory, axis=0)
def _radialize_center_out(trajectory: NDArray, nb_samples: int) -> NDArray: """Radialize a trajectory from the center to the outside. Parameters ---------- trajectory : NDArray Trajectory to radialize. nb_samples : int Number of samples to radialize from the center. Returns ------- NDArray Radialized trajectory. """ Nc, Ns = trajectory.shape[:2] new_trajectory = np.copy(trajectory) for i in range(Nc): point = trajectory[i, nb_samples] new_trajectory[i, :nb_samples, 0] = np.linspace(0, point[0], nb_samples) new_trajectory[i, :nb_samples, 1] = np.linspace(0, point[1], nb_samples) new_trajectory[i, :nb_samples, 2] = np.linspace(0, point[2], nb_samples) return new_trajectory def _radialize_in_out(trajectory: NDArray, nb_samples: int) -> NDArray: """Radialize a trajectory from the inside to the outside. Parameters ---------- trajectory : NDArray Trajectory to radialize. nb_samples : int Number of samples to radialize from the inside out. Returns ------- NDArray Radialized trajectory. """ Nc, Ns = trajectory.shape[:2] new_trajectory = np.copy(trajectory) first, half, second = (Ns - nb_samples) // 2, Ns // 2, (Ns + nb_samples) // 2 for i in range(Nc): p1 = trajectory[i, first] new_trajectory[i, first:half, 0] = np.linspace(0, p1[0], nb_samples // 2) new_trajectory[i, first:half, 1] = np.linspace(0, p1[1], nb_samples // 2) new_trajectory[i, first:half, 2] = np.linspace(0, p1[2], nb_samples // 2) p2 = trajectory[i, second] new_trajectory[i, half:second, 0] = np.linspace(0, p2[0], nb_samples // 2) new_trajectory[i, half:second, 1] = np.linspace(0, p2[1], nb_samples // 2) new_trajectory[i, half:second, 2] = np.linspace(0, p2[2], nb_samples // 2) return new_trajectory
[docs] def radialize_center( trajectory: NDArray, nb_samples: int, in_out: bool = False ) -> NDArray: """Radialize a trajectory. Parameters ---------- trajectory : NDArray Trajectory to radialize. nb_samples : int Number of samples to keep. in_out : bool, optional Whether the radialization is from the inside to the outside, by default False Returns ------- NDArray Radialized trajectory. """ # Make nb_samples into straight lines around the center if in_out: return _radialize_in_out(trajectory, nb_samples) return _radialize_center_out(trajectory, nb_samples)
################# # Randomization # ################# def _flip2center(mask_cols: list[int], center_value: int) -> np.ndarray: """ Reorder a list by starting by a center_position and alternating left/right. Parameters ---------- mask_cols: list or np.array List of columns to reorder. center_pos: int Position of the center column. Returns ------- np.array: reordered columns. """ center_pos = np.argmin(np.abs(np.array(mask_cols) - center_value)) mask_cols = list(mask_cols) left = mask_cols[center_pos::-1] right = mask_cols[center_pos + 1 :] new_cols = [] while left or right: if left: new_cols.append(left.pop(0)) if right: new_cols.append(right.pop(0)) return np.array(new_cols)
[docs] def get_random_loc_1d( dim_size: int, center_prop: float | int, accel: float = 4, pdf: Literal["uniform", "gaussian", "equispaced"] | NDArray = "uniform", rng: int | np.random.Generator | None = None, order: Literal["center-out", "top-down", "random"] = "center-out", ) -> NDArray: """Get slice index at a random position. Parameters ---------- dim_size: int Dimension size center_prop: float or int Proportion of center of kspace to continuouly sample accel: float Undersampling/Acceleration factor pdf: str, optional Probability density function for the remaining samples. "gaussian" (default) or "uniform" or np.array rng: int or np.random.Generator random state order: str Order of the lines, "center-out" (default), "random" or "top-down" Returns ------- np.ndarray: array of size dim_size/accel. """ order = VDSorder(order) pdf = VDSpdf(pdf) if isinstance(pdf, str) else pdf if accel == 0 or accel == 1: return np.arange(dim_size) # type: ignore elif accel < 0: raise ValueError("acceleration factor should be positive.") elif isinstance(accel, float): raise ValueError("acceleration factor should be an integer.") indexes = list(range(dim_size)) if not isinstance(center_prop, int): center_prop = int(center_prop * dim_size) center_start = (dim_size - center_prop) // 2 center_stop = (dim_size + center_prop) // 2 center_indexes = indexes[center_start:center_stop] borders = np.asarray([*indexes[:center_start], *indexes[center_stop:]]) n_samples_borders = (dim_size - len(center_indexes)) // accel if n_samples_borders < 1: raise ValueError( "acceleration factor, center_prop and dimension not compatible." "Edges will not be sampled. " ) rng = np.random.default_rng(rng) # get RNG from a seed or existing rng. def _get_samples(p: np.typing.ArrayLike) -> list[int]: p = p / np.sum(p) # automatic casting if needed return list(rng.choice(borders, size=n_samples_borders, replace=False, p=p)) if isinstance(pdf, np.ndarray): if len(pdf) == dim_size: # extract the borders p = pdf[borders] elif len(pdf) == len(borders): p = pdf else: raise ValueError("Invalid size for probability.") sampled_in_border = _get_samples(p) elif pdf == VDSpdf.GAUSSIAN: p = norm.pdf(np.linspace(norm.ppf(0.001), norm.ppf(0.999), len(borders))) sampled_in_border = _get_samples(p) elif pdf == VDSpdf.UNIFORM: p = np.ones(len(borders)) sampled_in_border = _get_samples(p) elif pdf == VDSpdf.EQUISPACED: sampled_in_border = list(borders[::accel]) else: raise ValueError("Unsupported value for pdf use any of . ") # TODO: allow custom pdf as argument (vector or function.) line_locs = np.array(sorted(center_indexes + sampled_in_border)) # apply order of lines if order == VDSorder.CENTER_OUT: line_locs = _flip2center(sorted(line_locs), dim_size // 2) elif order == VDSorder.RANDOM: line_locs = rng.permutation(line_locs) elif order == VDSorder.TOP_DOWN: line_locs = np.array(sorted(line_locs)) else: raise ValueError(f"Unknown direction '{order}'.") return (line_locs / dim_size) * 2 * KMAX - KMAX # rescale to [-0.5,0.5]
[docs] def stack_random( trajectory: NDArray, dim_size: int, center_prop: float | int = 0.0, accel: float | int = 4, pdf: Literal["uniform", "gaussian", "equispaced"] | NDArray = "uniform", rng: int | np.random.Generator | None = None, order: Literal["center-out", "top-down", "random"] = "center-out", ): """Stack a 2D trajectory with random location. Parameters ---------- traj: np.ndarray Existing 2D trajectory. dim_size: int Size of the k_z dimension center_prop: int or float Number of line or proportion of slice to sample in the center of the k-space accel: int Undersampling/Acceleration factor pdf: str or np.array Probability density function for the remaining samples. "uniform" (default), "gaussian" or np.array rng: random state order: str Order of the lines, "center-out" (default), "random" or "top-down" Returns ------- numpy.ndarray The 3D trajectory stacked along the :math:`k_z` axis. """ line_locs = get_random_loc_1d(dim_size, center_prop, accel, pdf, rng, order) if len(trajectory.shape) == 2: Nc, Ns = 1, trajectory.shape[0] else: Nc, Ns = trajectory.shape[:2] new_trajectory = np.zeros((len(line_locs), Nc, Ns, 3)) for i, loc in enumerate(line_locs): new_trajectory[i, :, :, :2] = trajectory[..., :2] if trajectory.shape[-1] == 3: new_trajectory[i, :, :, 2] = trajectory[..., 2] + loc else: new_trajectory[i, :, :, 2] = loc return new_trajectory.reshape(-1, Ns, 3)