Source code for mrinufft.io.nsp

"""Read/Write trajectory for Neurospin sequences."""

from __future__ import annotations

import os
import warnings
from array import array
from datetime import datetime

import numpy as np

from mrinufft.trajectories.utils import (
    DEFAULT_GMAX,
    DEFAULT_RASTER_TIME,
    DEFAULT_SMAX,
    KMAX,
    Gammas,
    check_hardware_constraints,
    convert_gradients_to_slew_rates,
    convert_trajectory_to_gradients,
)

from .siemens import read_siemens_rawdat


[docs] def write_gradients( gradients: np.ndarray, initial_positions: np.ndarray, grad_filename: str, img_size: tuple[int, ...], FOV: tuple[float, ...], in_out: bool = True, min_osf: int = 5, gamma: float = Gammas.HYDROGEN, version: float = 4.2, recon_tag: float = 1.1, timestamp: float | None = None, keep_txt_file: bool = False, final_positions: np.ndarray | None = None, ): """Create gradient file from gradients and initial positions. Parameters ---------- gradients : np.ndarray Gradients. Shape (num_shots, num_samples_per_shot, dimension). initial_positions : np.ndarray Initial positions. Shape (num_shots, dimension). grad_filename : str Gradient filename. img_size : tuple[int, ...] Image size. FOV : tuple[float, ...] Field of view. in_out : bool, optional Whether it is In-Out trajectory?, by default True min_osf : int, optional Minimum oversampling factor needed at ADC, by default 5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 version : float, optional Trajectory versioning, by default 4.2 recon_tag : float, optional Reconstruction tag for online recon, by default 1.1 timestamp : float, optional Timestamp of trajectory, by default None keep_txt_file : bool, optional Whether to keep the text file used temporarily which holds data pushed to binary file, by default False final_positions : np.ndarray, optional Final positions. Shape (num_shots, dimension), by default None """ num_shots = gradients.shape[0] num_samples_per_shot = gradients.shape[1] dimension = initial_positions.shape[-1] if len(gradients.shape) == 3: gradients = gradients.reshape(-1, gradients.shape[-1]) # Convert gradients to mT/m gradients = gradients * 1e3 max_grad = np.max(np.abs(gradients)) file = open(grad_filename + ".txt", "w") if version >= 4.1: file.write(str(version) + "\n") # Write the dimension, num_samples_per_shot and num_shots file.write(str(dimension) + "\n") if version >= 4.1: img_size = img_size FOV = FOV if isinstance(img_size, int): img_size = (img_size,) * dimension if isinstance(FOV, float): FOV = (FOV,) * dimension for fov in FOV: file.write(str(fov) + "\n") for sz in img_size: file.write(str(sz) + "\n") file.write(str(min_osf) + "\n") file.write(str(gamma * 1000) + "\n") file.write(str(num_shots) + "\n") file.write(str(num_samples_per_shot) + "\n") if version >= 4.1: if not in_out: if np.sum(initial_positions) != 0: warnings.warn( "The initial positions are not all zero for center-out trajectory" ) file.write("0\n") else: file.write("0.5\n") # Write the maximum Gradient file.write(str(max_grad) + "\n") # Write recon Pipeline version tag file.write(str(recon_tag) + "\n") left_over = 10 if version >= 4.2: # Inset datetime tag if timestamp is None: timestamp = float(datetime.now().timestamp()) file.write(str(timestamp) + "\n") left_over -= 1 file.write(str("0\n" * left_over)) # Write all the k0 values file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in initial_positions ) + "\n" ) if version >= 5: if final_positions is None: warnings.warn( "Final positions not provided for version >= 5," "calculating final positions from gradients" ) final_positions = initial_positions + np.sum(gradients, axis=1) file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in final_positions ) + "\n" ) if version < 4.1: # Write the maximum Gradient file.write(str(max_grad) + "\n") # Normalize gradients gradients = gradients / max_grad file.write( "\n".join(" ".join([f"{iter2:5.6f}" for iter2 in iter1]) for iter1 in gradients) + "\n" ) file.close() y = [] with open(grad_filename + ".txt") as txtfile: for line in txtfile: x = line.split(" ") for val in x: y.append(float(val)) float_array = array("f", y) with open(grad_filename + ".bin", "wb") as binfile: float_array.tofile(binfile) if not keep_txt_file: os.remove(grad_filename + ".txt")
def _pop_elements(array, num_elements=1, type="float"): """Pop elements from an array. Parameters ---------- array : np.ndarray Array to pop elements from. num_elements : int, optional number of elements to pop, by default 1 type : str, optional Type of the element being popped, by default 'float'. Returns ------- element_popped: Element popped from array with type as specified. array: np.ndarray Array with elements popped. """ if num_elements == 1: return array[0].astype(type, copy=False), array[1:] else: return array[0:num_elements].astype(type, copy=False), array[num_elements:]
[docs] def write_trajectory( trajectory: np.ndarray, FOV: tuple[float, ...], img_size: tuple[int, ...], grad_filename: str, norm_factor: float = KMAX, gamma: float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, check_constraints: bool = True, gmax: float = DEFAULT_GMAX, smax: float = DEFAULT_SMAX, version: float = 5, **kwargs, ): """Calculate gradients from k-space points and write to file. Parameters ---------- trajectory : np.ndarray Trajectory in k-space points. Shape (num_shots, num_samples_per_shot, dimension). FOV : tuple Field of view img_size : tuple Image size grad_filename : str Gradient filename norm_factor : float, optional Trajectory normalization factor, by default 0.5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 raster_time : float, optional Gradient raster time in ms, by default 0.01 check_constraints : bool, optional Check scanner constraints, by default True gmax : float, optional Maximum gradient magnitude in T/m, by default 0.04 smax : float, optional Maximum slew rate in T/m/ms, by default 0.1 version: float, optional Trajectory versioning, by default 5 kwargs : dict, optional Additional arguments for writing the gradient file. These are arguments passed to write_gradients function above. """ # Convert normalized trajectory to gradients gradients, initial_positions, final_positions = convert_trajectory_to_gradients( trajectory, norm_factor=norm_factor, resolution=np.asarray(FOV) / np.asarray(img_size), raster_time=raster_time, gamma=gamma, get_final_positions=True, ) # Check constraints if requested if check_constraints: slewrates, _ = convert_gradients_to_slew_rates(gradients, raster_time) valid, maxG, maxS = check_hardware_constraints( gradients=gradients, slewrates=slewrates, gmax=gmax, smax=smax, ) if not valid: warnings.warn( "Hard constraints violated! " f"Maximum gradient amplitude: {maxG:.3f} > {gmax:.3f}" f"Maximum slew rate: {maxS:.3f} > {smax:.3f}" ) # Write gradients in file write_gradients( gradients=gradients, initial_positions=initial_positions, final_positions=final_positions, grad_filename=grad_filename, img_size=img_size, FOV=FOV, gamma=gamma, version=version, **kwargs, )
[docs] def read_trajectory( grad_filename: str, dwell_time: float | str = DEFAULT_RASTER_TIME, num_adc_samples: int = None, gamma: Gammas | float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, read_shots: bool = False, normalize_factor: float = KMAX, ): """Get k-space locations from gradient file. Parameters ---------- grad_filename : str Gradient filename. dwell_time : float | str, optional Dwell time of ADC in ms, by default 0.01 It can also be string 'min_osf' to select dwell time based on minimum OSF needed to get Nyquist sampling (This is obtained from SPARKLING trajectory header). num_adc_samples : int, optional Number of ADC samples, by default None gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 gradient_raster_time : float, optional Gradient raster time in ms, by default 0.01 read_shots : bool, optional Whether in read shots configuration which accepts an extra point at end, by default False normalize : float, optional Whether to normalize the k-space locations, by default 0.5 When None, normalization is not done. Returns ------- kspace_loc : np.ndarray K-space locations. Shape (num_shots, num_adc_samples, dimension). """ with open(grad_filename, "rb") as binfile: data = np.fromfile(binfile, dtype=np.float32) if float(data[0]) > 4: version, data = _pop_elements(data) version = np.around(version, 2) else: version = 1 dimension, data = _pop_elements(data, type="int") if version >= 4.1: fov, data = _pop_elements(data, dimension) img_size, data = _pop_elements(data, dimension, type="int") min_osf, data = _pop_elements(data, type="int") gamma, data = _pop_elements(data) gamma = gamma / 1000 if dwell_time == "min_osf": dwell_time = raster_time / min_osf (num_shots, num_samples_per_shot), data = _pop_elements(data, 2, type="int") if num_adc_samples is None: if read_shots: num_adc_samples = num_samples_per_shot + 1 else: num_adc_samples = int(num_samples_per_shot * (raster_time / dwell_time)) if version >= 4.1: TE, data = _pop_elements(data) grad_max, data = _pop_elements(data) recon_tag, data = _pop_elements(data) recon_tag = np.around(recon_tag, 2) left_over = 10 if version >= 4.2: timestamp, data = _pop_elements(data) timestamp = datetime.fromtimestamp(float(timestamp)) left_over -= 1 _, data = _pop_elements(data, left_over) initial_positions, data = _pop_elements(data, dimension * num_shots) initial_positions = np.reshape(initial_positions, (num_shots, dimension)) if version >= 5: final_positions, data = _pop_elements(data, dimension * num_shots) final_positions = np.reshape(final_positions, (num_shots, dimension)) dwell_time_ns = dwell_time * 1e6 gradient_raster_time_ns = raster_time * 1e6 if version < 4.1: grad_max, data = _pop_elements(data) gradients, data = _pop_elements( data, dimension * num_samples_per_shot * num_shots, ) gradients = np.reshape( grad_max * gradients, (num_shots * num_samples_per_shot, dimension) ) # Convert gradients from mT/m to T/m gradients = np.reshape(gradients * 1e-3, (-1, num_samples_per_shot, dimension)) kspace_loc = np.zeros((num_shots, num_adc_samples, dimension)) kspace_loc[:, 0, :] = initial_positions adc_times = dwell_time_ns * np.arange(1, num_adc_samples) Q, R = divmod(adc_times, gradient_raster_time_ns) Q = Q.astype("int") if not np.all( np.logical_or( Q < num_adc_samples, np.logical_and(Q == num_adc_samples, R == 0) ) ): warnings.warn("Binary file doesn't seem right! " "Proceeding anyway") grad_accumulated = np.cumsum(gradients, axis=1) * gradient_raster_time_ns for i, (q, r) in enumerate(zip(Q, R)): if q >= gradients.shape[1]: if q > gradients.shape[1]: warnings.warn( "Number of samples is more than what was " "obtained in binary file!\n" "Data will be extended" ) kspace_loc[:, i + 1, :] = ( initial_positions + ( grad_accumulated[:, gradients.shape[1] - 1, :] + gradients[:, gradients.shape[1] - 1, :] * r ) * gamma * 1e-6 ) else: if q == 0: kspace_loc[:, i + 1, :] = ( initial_positions + gradients[:, q, :] * r * gamma * 1e-6 ) else: kspace_loc[:, i + 1, :] = ( initial_positions + (grad_accumulated[:, q - 1, :] + gradients[:, q, :] * r) * gamma * 1e-6 ) params = { "version": version, "dimension": dimension, "num_shots": num_shots, "num_samples_per_shot": num_samples_per_shot, } if version >= 4.1: params["FOV"] = fov params["img_size"] = img_size params["min_osf"] = min_osf params["gamma"] = gamma params["recon_tag"] = recon_tag params["TE"] = TE if version >= 4.2: params["timestamp"] = timestamp if normalize_factor is not None: Kmax = img_size / 2 / fov kspace_loc = kspace_loc / Kmax * normalize_factor return kspace_loc, params
[docs] def read_arbgrad_rawdat( filename: str, removeOS: bool = False, doAverage: bool = True, squeeze: bool = True, slice_num: int | None = None, contrast_num: int | None = None, data_type: str = "ARBGRAD_VE11C", ): # pragma: no cover """Read raw data from a Siemens MRI file. Parameters ---------- filename : str The path to the Siemens MRI file. removeOS : bool, optional Whether to remove the oversampling, by default False. doAverage : bool, optional Whether to average the data acquired along NAve dimension, by default True. squeeze : bool, optional Whether to squeeze the dimensions of the data, by default True. slice_num : int, optional The slice to read, by default None. This applies for 2D data. contrast_num: int, optional The contrast to read, by default None. data_type : str, optional The type of data to read, by default 'ARBGRAD_VE11C'. Returns ------- data: ndarray Imported data formatted as n_coils X n_samples X n_slices X n_contrasts hdr: dict Extra information about the data parsed from the twix file Raises ------ ImportError If the mapVBVD module is not available. Notes ----- This function requires the mapVBVD module to be installed. You can install it using the following command: `pip install pymapVBVD` """ data, hdr, twixObj = read_siemens_rawdat( filename=filename, removeOS=removeOS, doAverage=doAverage, squeeze=squeeze, slice_num=slice_num, contrast_num=contrast_num, ) if "ARBGRAD_VE11C" in data_type: hdr["type"] = "ARBGRAD_GRE" hdr["shifts"] = () for s in [7, 6, 8]: shift = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "adFree", str(s)) ) hdr["shifts"] += (0,) if shift == [] else (shift[0],) hdr["oversampling_factor"] = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "alFree", "4") )[0] hdr["trajectory_name"] = twixObj.search_header_for_val( "Phoenix", ("sWipMemBlock", "tFree") )[0][1:-1] if hdr["n_contrasts"] > 1: hdr["turboFactor"] = twixObj.search_header_for_val( "Phoenix", ("sFastImaging", "lTurboFactor") )[0] hdr["type"] = "ARBGRAD_MP2RAGE" return data, hdr