Source code for mrinufft.io.nsp

"""Read/Write trajectory for Neurospin sequences."""

from __future__ import annotations

import os
import warnings
from array import array
from datetime import datetime

import numpy as np

from mrinufft.trajectories.utils import (
    DEFAULT_GMAX,
    DEFAULT_RASTER_TIME,
    DEFAULT_SMAX,
    KMAX,
    Gammas,
    check_hardware_constraints,
    convert_gradients_to_slew_rates,
    convert_trajectory_to_gradients,
)

from .siemens import read_siemens_rawdat


[docs] def write_gradients( gradients: np.ndarray, initial_positions: np.ndarray, grad_filename: str, img_size: tuple[int, ...], FOV: tuple[float, ...], in_out: bool = True, min_osf: int = 5, gamma: float = Gammas.HYDROGEN, version: float = 4.2, recon_tag: float = 1.1, timestamp: float | None = None, keep_txt_file: bool = False, final_positions: np.ndarray | None = None, ): """Create gradient file from gradients and initial positions. Parameters ---------- gradients : np.ndarray Gradients. Shape (num_shots, num_samples_per_shot, dimension). initial_positions : np.ndarray Initial positions. Shape (num_shots, dimension). grad_filename : str Gradient filename. img_size : tuple[int, ...] Image size. FOV : tuple[float, ...] Field of view. in_out : bool, optional Whether it is In-Out trajectory?, by default True min_osf : int, optional Minimum oversampling factor needed at ADC, by default 5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 version : float, optional Trajectory versioning, by default 4.2 recon_tag : float, optional Reconstruction tag for online recon, by default 1.1 timestamp : float, optional Timestamp of trajectory, by default None keep_txt_file : bool, optional Whether to keep the text file used temporarily which holds data pushed to binary file, by default False final_positions : np.ndarray, optional Final positions. Shape (num_shots, dimension), by default None """ num_shots = gradients.shape[0] num_samples_per_shot = gradients.shape[1] dimension = initial_positions.shape[-1] if len(gradients.shape) == 3: gradients = gradients.reshape(-1, gradients.shape[-1]) # Convert gradients to mT/m gradients = gradients * 1e3 max_grad = np.max(np.abs(gradients)) file = open(grad_filename + ".txt", "w") if version >= 4.1: file.write(str(version) + "\n") # Write the dimension, num_samples_per_shot and num_shots file.write(str(dimension) + "\n") if version >= 4.1: img_size = img_size FOV = FOV if isinstance(img_size, int): img_size = (img_size,) * dimension if isinstance(FOV, float): FOV = (FOV,) * dimension for fov in FOV: file.write(str(fov) + "\n") for sz in img_size: file.write(str(sz) + "\n") file.write(str(min_osf) + "\n") file.write(str(gamma * 1000) + "\n") file.write(str(num_shots) + "\n") file.write(str(num_samples_per_shot) + "\n") if version >= 4.1: if not in_out: if np.sum(initial_positions) != 0: warnings.warn( "The initial positions are not all zero for center-out trajectory" ) file.write("0\n") else: file.write("0.5\n") # Write the maximum Gradient file.write(str(max_grad) + "\n") # Write recon Pipeline version tag file.write(str(recon_tag) + "\n") left_over = 10 if version >= 4.2: # Inset datetime tag if timestamp is None: timestamp = float(datetime.now().timestamp()) file.write(str(timestamp) + "\n") left_over -= 1 file.write(str("0\n" * left_over)) # Write all the k0 values file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in initial_positions ) + "\n" ) if version >= 5: if final_positions is None: warnings.warn( "Final positions not provided for version >= 5," "calculating final positions from gradients" ) final_positions = initial_positions + np.sum(gradients, axis=1) file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in final_positions ) + "\n" ) if version < 4.1: # Write the maximum Gradient file.write(str(max_grad) + "\n") # Normalize gradients gradients = gradients / max_grad file.write( "\n".join(" ".join([f"{iter2:5.6f}" for iter2 in iter1]) for iter1 in gradients) + "\n" ) file.close() y = [] with open(grad_filename + ".txt") as txtfile: for line in txtfile: x = line.split(" ") for val in x: y.append(float(val)) float_array = array("f", y) with open(grad_filename + ".bin", "wb") as binfile: float_array.tofile(binfile) if not keep_txt_file: os.remove(grad_filename + ".txt")
def _pop_elements(array, num_elements=1, type="float"): """Pop elements from an array. Parameters ---------- array : np.ndarray Array to pop elements from. num_elements : int, optional number of elements to pop, by default 1 type : str, optional Type of the element being popped, by default 'float'. Returns ------- element_popped: Element popped from array with type as specified. array: np.ndarray Array with elements popped. """ if num_elements == 1: return array[0].astype(type, copy=False), array[1:] else: return array[0:num_elements].astype(type, copy=False), array[num_elements:]
[docs] def write_trajectory( trajectory: np.ndarray, FOV: tuple[float, ...], img_size: tuple[int, ...], grad_filename: str, norm_factor: float = KMAX, gamma: float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, check_constraints: bool = True, gmax: float = DEFAULT_GMAX, smax: float = DEFAULT_SMAX, version: float = 5, **kwargs, ): """Calculate gradients from k-space points and write to file. Parameters ---------- trajectory : np.ndarray Trajectory in k-space points. Shape (num_shots, num_samples_per_shot, dimension). FOV : tuple Field of view img_size : tuple Image size grad_filename : str Gradient filename norm_factor : float, optional Trajectory normalization factor, by default 0.5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 raster_time : float, optional Gradient raster time in ms, by default 0.01 check_constraints : bool, optional Check scanner constraints, by default True gmax : float, optional Maximum gradient magnitude in T/m, by default 0.04 smax : float, optional Maximum slew rate in T/m/ms, by default 0.1 version: float, optional Trajectory versioning, by default 5 kwargs : dict, optional Additional arguments for writing the gradient file. These are arguments passed to write_gradients function above. """ # Convert normalized trajectory to gradients gradients, initial_positions, final_positions = convert_trajectory_to_gradients( trajectory, norm_factor=norm_factor, resolution=np.asarray(FOV) / np.asarray(img_size), raster_time=raster_time, gamma=gamma, get_final_positions=True, ) # Check constraints if requested if check_constraints: slewrates, _ = convert_gradients_to_slew_rates(gradients, raster_time) valid, maxG, maxS = check_hardware_constraints( gradients=gradients, slewrates=slewrates, gmax=gmax, smax=smax, ) if not valid: warnings.warn( "Hard constraints violated! " f"Maximum gradient amplitude: {maxG:.3f} > {gmax:.3f}" f"Maximum slew rate: {maxS:.3f} > {smax:.3f}" ) # Write gradients in file write_gradients( gradients=gradients, initial_positions=initial_positions, final_positions=final_positions, grad_filename=grad_filename, img_size=img_size, FOV=FOV, gamma=gamma, version=version, **kwargs, )
[docs] def read_trajectory( grad_filename: str, dwell_time: float | str = DEFAULT_RASTER_TIME, num_adc_samples: int = None, gamma: Gammas | float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, read_shots: bool = False, normalize_factor: float = KMAX, pre_skip: int = 0, ): """Get k-space locations from gradient file. Parameters ---------- grad_filename : str Gradient filename. dwell_time : float | str, optional Dwell time of ADC in ms, by default 0.01 It can also be string 'min_osf' to select dwell time based on minimum OSF needed to get Nyquist sampling (This is obtained from SPARKLING trajectory header). num_adc_samples : int, optional Number of ADC samples, by default None gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 gradient_raster_time : float, optional Gradient raster time in ms, by default 0.01 read_shots : bool, optional Whether in read shots configuration which accepts an extra point at end, by default False normalize_factor : float, optional Whether to normalize the k-space locations, by default 0.5 When None, normalization is not done. pre_skip: int, optional Number of samples to skip from the start of each shot, by default 0. This is useful when we want to avoid artifacts from ADC switching in UTE sequences. Returns ------- kspace_loc : np.ndarray K-space locations. Shape (num_shots, num_adc_samples, dimension). """ with open(grad_filename, "rb") as binfile: data = np.fromfile(binfile, dtype=np.float32) if float(data[0]) > 4: version, data = _pop_elements(data) version = np.around(version, 2) else: version = 1 dimension, data = _pop_elements(data, type="int") if version >= 4.1: fov, data = _pop_elements(data, dimension) img_size, data = _pop_elements(data, dimension, type="int") min_osf, data = _pop_elements(data, type="int") gamma, data = _pop_elements(data) gamma = gamma / 1000 if dwell_time == "min_osf": dwell_time = raster_time / min_osf (num_shots, num_samples_per_shot), data = _pop_elements(data, 2, type="int") if num_adc_samples is None: if read_shots: num_adc_samples = num_samples_per_shot + 1 else: num_adc_samples = int(num_samples_per_shot * (raster_time / dwell_time)) if version >= 4.1: TE, data = _pop_elements(data) grad_max, data = _pop_elements(data) recon_tag, data = _pop_elements(data) recon_tag = np.around(recon_tag, 2) left_over = 10 if version >= 4.2: timestamp, data = _pop_elements(data) timestamp = datetime.fromtimestamp(float(timestamp)) left_over -= 1 _, data = _pop_elements(data, left_over) initial_positions, data = _pop_elements(data, dimension * num_shots) initial_positions = np.reshape(initial_positions, (num_shots, dimension)) if version >= 5: final_positions, data = _pop_elements(data, dimension * num_shots) final_positions = np.reshape(final_positions, (num_shots, dimension)) dwell_time_ns = dwell_time * 1e6 gradient_raster_time_ns = raster_time * 1e6 if version < 4.1: grad_max, data = _pop_elements(data) gradients, data = _pop_elements( data, dimension * num_samples_per_shot * num_shots, ) gradients = np.reshape( grad_max * gradients, (num_shots * num_samples_per_shot, dimension) ) # Convert gradients from mT/m to T/m gradients = np.reshape(gradients * 1e-3, (-1, num_samples_per_shot, dimension)) kspace_loc = np.zeros((num_shots, num_adc_samples, dimension)) kspace_loc[:, 0, :] = initial_positions adc_times = dwell_time_ns * np.arange(1, num_adc_samples) Q, R = divmod(adc_times, gradient_raster_time_ns) Q = Q.astype("int") if not np.all( np.logical_or( Q < num_adc_samples, np.logical_and(Q == num_adc_samples, R == 0) ) ): warnings.warn("Binary file doesn't seem right! Proceeding anyway") grad_accumulated = np.cumsum(gradients, axis=1) * gradient_raster_time_ns for i, (q, r) in enumerate(zip(Q, R)): if q >= gradients.shape[1]: if q > gradients.shape[1]: warnings.warn( "Number of samples is more than what was " "obtained in binary file!\n" "Data will be extended" ) kspace_loc[:, i + 1, :] = ( initial_positions + ( grad_accumulated[:, gradients.shape[1] - 1, :] + gradients[:, gradients.shape[1] - 1, :] * r ) * gamma * 1e-6 ) else: if q == 0: kspace_loc[:, i + 1, :] = ( initial_positions + gradients[:, q, :] * r * gamma * 1e-6 ) else: kspace_loc[:, i + 1, :] = ( initial_positions + (grad_accumulated[:, q - 1, :] + gradients[:, q, :] * r) * gamma * 1e-6 ) params = { "version": version, "dimension": dimension, "num_shots": num_shots, "num_samples_per_shot": num_samples_per_shot, } if version >= 4.1: params["FOV"] = fov params["img_size"] = img_size params["min_osf"] = min_osf params["gamma"] = gamma params["recon_tag"] = recon_tag params["TE"] = TE if version >= 4.2: params["timestamp"] = timestamp if normalize_factor is not None: Kmax = img_size / 2 / fov kspace_loc = kspace_loc / Kmax * normalize_factor if pre_skip > 0: if pre_skip >= num_samples_per_shot: raise ValueError( "skip_first_Nsamples should be less than num_adc_samples" ) oversample_factor = num_adc_samples / num_samples_per_shot skip_samples = pre_skip * int(oversample_factor) kspace_loc = kspace_loc[:, skip_samples:] params["num_adc_samples"] = num_adc_samples - skip_samples return kspace_loc, params
[docs] def read_arbgrad_rawdat( filename: str, removeOS: bool = False, doAverage: bool = True, squeeze: bool = True, slice_num: int | None = None, contrast_num: int | None = None, pre_skip: int = 0, data_type: str = "ARBGRAD_VE11C", ): # pragma: no cover """Read raw data from a Siemens MRI file. Parameters ---------- filename : str The path to the Siemens MRI file. removeOS : bool, optional Whether to remove the oversampling, by default False. doAverage : bool, optional Whether to average the data acquired along NAve dimension, by default True. squeeze : bool, optional Whether to squeeze the dimensions of the data, by default True. slice_num : int, optional The slice to read, by default None. This applies for 2D data. contrast_num: int, optional The contrast to read, by default None. pre_skip : int, optional Number of samples to skip from the start of each shot, by default 0. This is useful when we want to avoid artifacts from ADC switching in UTE sequences. data_type : str, optional The type of data to read, by default 'ARBGRAD_VE11C'. Returns ------- data: ndarray Imported data formatted as n_coils X n_samples X n_slices X n_contrasts hdr: dict Extra information about the data parsed from the twix file Raises ------ ImportError If the mapVBVD module is not available. Notes ----- This function requires the mapVBVD module to be installed. You can install it using the following command: `pip install pymapVBVD` """ data, hdr, twixObj = read_siemens_rawdat( filename=filename, removeOS=removeOS, doAverage=doAverage, squeeze=squeeze, slice_num=slice_num, contrast_num=contrast_num, ) if "ARBGRAD_VE11C" in data_type: hdr["type"] = "ARBGRAD_GRE" hdr["shifts"] = () for s in [6, 7, 8]: shift = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "adFree", str(s)) ) hdr["shifts"] += (0,) if shift == [] else (shift[0],) hdr["oversampling_factor"] = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "alFree", "4") )[0] hdr["trajectory_name"] = twixObj.search_header_for_val( "Phoenix", ("sWipMemBlock", "tFree") )[0][1:-1] if hdr["n_contrasts"] > 1: hdr["turboFactor"] = twixObj.search_header_for_val( "Phoenix", ("sFastImaging", "lTurboFactor") )[0] hdr["type"] = "ARBGRAD_MP2RAGE" if pre_skip > 0: samples_to_skip = int(hdr["oversampling_factor"] * pre_skip) if samples_to_skip >= hdr["n_adc_samples"]: raise ValueError( "Samples to skip should be less than n_samples in the data" ) data = data[:, :, samples_to_skip:] hdr["n_adc_samples"] -= samples_to_skip return data, hdr