Source code for mrinufft.io.nsp

"""Read/Write trajectory for Neurospin sequences."""

from __future__ import annotations

import os
import warnings
from array import array
from datetime import datetime

import numpy as np
from typing import Optional

from mrinufft.trajectories.utils import (
    DEFAULT_GMAX,
    DEFAULT_RASTER_TIME,
    DEFAULT_SMAX,
    KMAX,
    Gammas,
    check_hardware_constraints,
    convert_gradients_to_slew_rates,
    convert_trajectory_to_gradients,
)
from mrinufft.trajectories.tools import get_gradient_amplitudes_to_travel_for_set_time

from .siemens import read_siemens_rawdat


[docs] def write_gradients( gradients: np.ndarray, initial_positions: np.ndarray, grad_filename: str, img_size: tuple[int, ...], FOV: tuple[float, ...], TE_pos: float = 0.5, min_osf: int = 5, gamma: float = Gammas.HYDROGEN, version: float = 4.2, recon_tag: float = 1.1, timestamp: float | None = None, keep_txt_file: bool = False, final_positions: np.ndarray | None = None, start_skip_samples: int = 0, end_skip_samples: int = 0, ): """Create gradient file from gradients and initial positions. Parameters ---------- gradients : np.ndarray Gradients. Shape (num_shots, num_samples_per_shot, dimension). initial_positions : np.ndarray Initial positions. Shape (num_shots, dimension). grad_filename : str Gradient filename. img_size : tuple[int, ...] Image size. FOV : tuple[float, ...] Field of view. TE_pos : float, optional The ratio of trajectory when TE occurs, with 0 as start of trajectory and 1 as end. By default 0.5, which is the center of the trajectory (in-out trajectory). min_osf : int, optional Minimum oversampling factor needed at ADC, by default 5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 version : float, optional Trajectory versioning, by default 4.2 recon_tag : float, optional Reconstruction tag for online recon, by default 1.1 timestamp : float, optional Timestamp of trajectory, by default None keep_txt_file : bool, optional Whether to keep the text file used temporarily which holds data pushed to binary file, by default False final_positions : np.ndarray, optional Final positions. Shape (num_shots, dimension), by default None start_skip_samples : int, optional Number of samples to skip in ADC at start of each shot, by default 0 This works only for version >= 5.1. end_skip_samples : int, optional Number of samples to skip in ADC at end of each shot, by default 0 This works only for version >= 5.1. """ num_shots = gradients.shape[0] num_samples_per_shot = gradients.shape[1] dimension = initial_positions.shape[-1] if len(gradients.shape) == 3: gradients = gradients.reshape(-1, gradients.shape[-1]) # Convert gradients to mT/m gradients = gradients * 1e3 max_grad = np.max(np.abs(gradients)) file = open(grad_filename + ".txt", "w") if version >= 4.1: file.write(str(version) + "\n") # Write the dimension, num_samples_per_shot and num_shots file.write(str(dimension) + "\n") if version >= 4.1: img_size = img_size FOV = FOV if isinstance(img_size, int): img_size = (img_size,) * dimension if isinstance(FOV, float): FOV = (FOV,) * dimension for fov in FOV: file.write(str(fov) + "\n") for sz in img_size: file.write(str(sz) + "\n") file.write(str(min_osf) + "\n") file.write(str(gamma * 1000) + "\n") file.write(str(num_shots) + "\n") file.write(str(num_samples_per_shot) + "\n") if version >= 4.1: if TE_pos == 0: if np.sum(initial_positions) != 0: warnings.warn( "The initial positions are not all zero for center-out trajectory" ) file.write(str(TE_pos) + "\n") # Write the maximum Gradient file.write(str(max_grad) + "\n") # Write recon Pipeline version tag file.write(str(recon_tag) + "\n") left_over = 10 if version >= 4.2: # Inset datetime tag if timestamp is None: timestamp = float(datetime.now().timestamp()) file.write(str(timestamp) + "\n") left_over -= 1 if version >= 5.1: file.write(str(start_skip_samples) + "\n") file.write(str(end_skip_samples) + "\n") left_over -= 2 file.write(str("0\n" * left_over)) # Write all the k0 values file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in initial_positions ) + "\n" ) if version >= 5: if final_positions is None: warnings.warn( "Final positions not provided for version >= 5," "calculating final positions from gradients" ) final_positions = initial_positions + np.sum(gradients, axis=1) file.write( "\n".join( " ".join([f"{iter2:5.4f}" for iter2 in iter1]) for iter1 in final_positions ) + "\n" ) if version < 4.1: # Write the maximum Gradient file.write(str(max_grad) + "\n") # Normalize gradients gradients = gradients / max_grad file.write( "\n".join(" ".join([f"{iter2:5.6f}" for iter2 in iter1]) for iter1 in gradients) + "\n" ) file.close() y = [] with open(grad_filename + ".txt") as txtfile: for line in txtfile: x = line.split(" ") for val in x: y.append(float(val)) float_array = array("f", y) with open(grad_filename + ".bin", "wb") as binfile: float_array.tofile(binfile) if not keep_txt_file: os.remove(grad_filename + ".txt")
def _pop_elements(array, num_elements=1, type=np.float32): """Pop elements from an array. Parameters ---------- array : np.ndarray Array to pop elements from. num_elements : int, optional number of elements to pop, by default 1 type : str, optional Type of the element being popped, by default 'float'. Returns ------- element_popped: Element popped from array with type as specified. array: np.ndarray Array with elements popped. """ if num_elements == 1: return array[0].astype(type, copy=False), array[1:] else: return array[0:num_elements].astype(type, copy=False), array[num_elements:]
[docs] def write_trajectory( trajectory: np.ndarray, FOV: tuple[float, ...], img_size: tuple[int, ...], grad_filename: str, norm_factor: float = KMAX, gamma: float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, check_constraints: bool = True, TE_pos: float = 0.5, gmax: float = DEFAULT_GMAX, smax: float = DEFAULT_SMAX, pregrad: str | None = None, postgrad: str | None = None, version: float = 5, **kwargs, ): """Calculate gradients from k-space points and write to file. Parameters ---------- trajectory : np.ndarray Trajectory in k-space points. Shape (num_shots, num_samples_per_shot, dimension). FOV : tuple Field of view img_size : tuple Image size grad_filename : str Gradient filename norm_factor : float, optional Trajectory normalization factor, by default 0.5 gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 raster_time : float, optional Gradient raster time in ms, by default 0.01 check_constraints : bool, optional Check scanner constraints, by default True TE_pos : float, optional The ratio of trajectory when TE occurs, with 0 as start of trajectory and 1 as end. By default 0.5, which is the center of the trajectory (in-out trajectory). gmax : float, optional Maximum gradient magnitude in T/m, by default 0.04 smax : float, optional Maximum slew rate in T/m/ms, by default 0.1 pregrad : str, optional Pregrad method, by default `prephase` `prephase` will add a prephasing gradient to the start of the trajectory. postgrad : str, optional Postgrad method, by default 'slowdown_to_edge' `slowdown_to_edge` will add a gradient to slow down to the edge of the k-space along x-axis for all the shots i.e. go to (Kmax, 0, 0). This is useful for sequences needing a spoiler at the end of the trajectory. However, spoiler is still not added, it is expected that the sequence handles the spoilers, which can be variable. `slowdown_to_center` will add a gradient to slow down to the center of the k-space. version: float, optional Trajectory versioning, by default 5 kwargs : dict, optional Additional arguments for writing the gradient file. These are arguments passed to write_gradients function above. """ # Convert normalized trajectory to gradients gradients, initial_positions, final_positions = convert_trajectory_to_gradients( trajectory, norm_factor=norm_factor, resolution=np.asarray(FOV) / np.asarray(img_size), raster_time=raster_time, gamma=gamma, get_final_positions=True, ) Ns_to_skip_at_start = 0 Ns_to_skip_at_end = 0 scan_consts = { "gamma": gamma, "gmax": gmax, "smax": smax, "raster_time": raster_time, } if pregrad == "prephase": if version < 5.1: raise ValueError( "pregrad is only supported for version >= 5.1, " "please set version to 5.1 or higher." ) start_gradients = get_gradient_amplitudes_to_travel_for_set_time( kspace_end_loc=initial_positions, end_gradients=gradients[:, 0], **scan_consts, ) initial_positions = np.zeros_like(initial_positions) gradients = np.hstack([start_gradients, gradients]) Ns_to_skip_at_start = start_gradients.shape[1] if postgrad: if version < 5.1: raise ValueError( "postgrad is only supported for version >= 5.1, " "please set version to 5.1 or higher." ) edge_locations = np.zeros_like(final_positions) if postgrad == "slowdown_to_edge": # Always end at KMax, the spoilers can be handeled by the sequence. edge_locations[..., 0] = img_size[0] / FOV[0] / 2 end_gradients = get_gradient_amplitudes_to_travel_for_set_time( kspace_end_loc=edge_locations, start_gradients=gradients[:, -1], kspace_start_loc=final_positions, **scan_consts, ) gradients = np.hstack([gradients, end_gradients]) Ns_to_skip_at_end = end_gradients.shape[1] # Check constraints if requested if check_constraints: slewrates, _ = convert_gradients_to_slew_rates(gradients, raster_time) valid, maxG, maxS = check_hardware_constraints( gradients=gradients, slewrates=slewrates, gmax=gmax, smax=smax, ) if not valid: warnings.warn( "Hard constraints violated! " f"Maximum gradient amplitude: {maxG:.3f} > {gmax:.3f}" f"Maximum slew rate: {maxS:.3f} > {smax:.3f}" ) if pregrad != "prephase": border_slew_rate = gradients[:, 0] / raster_time if np.any(np.abs(border_slew_rate) > smax): warnings.warn( "Slew rate at start of trajectory exceeds maximum slew rate!" f"Maximum slew rate: {np.max(np.abs(border_slew_rate)):.3f}" f" > {smax:.3f}. Please use prephase gradient to avoid this " " issue." ) # Write gradients in file write_gradients( gradients=gradients, initial_positions=initial_positions, final_positions=final_positions, grad_filename=grad_filename, img_size=img_size, FOV=FOV, TE_pos=TE_pos, gamma=gamma, version=version, start_skip_samples=Ns_to_skip_at_start, end_skip_samples=Ns_to_skip_at_end, **kwargs, )
[docs] def read_trajectory( grad_filename: str, dwell_time: float | str = DEFAULT_RASTER_TIME, num_adc_samples: int | None = None, gamma: Gammas | float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, read_shots: bool = False, normalize_factor: float = KMAX, pre_skip: int = 0, ): """Get k-space locations from gradient file. Parameters ---------- grad_filename : str Gradient filename. dwell_time : float | str, optional Dwell time of ADC in ms, by default 0.01 It can also be string 'min_osf' to select dwell time based on minimum OSF needed to get Nyquist sampling (This is obtained from SPARKLING trajectory header). num_adc_samples : int, optional Number of ADC samples, by default None gamma : float, optional Gyromagnetic ratio in kHz/T, by default 42.576e3 gradient_raster_time : float, optional Gradient raster time in ms, by default 0.01 read_shots : bool, optional Whether in read shots configuration which accepts an extra point at end, by default False normalize_factor : float, optional Whether to normalize the k-space locations, by default 0.5 When None, normalization is not done. pre_skip: int, optional Number of samples to skip from the start of each shot, by default 0. This is useful when we want to avoid artifacts from ADC switching in UTE sequences. Returns ------- kspace_loc : np.ndarray K-space locations. Shape (num_shots, num_adc_samples, dimension). """ with open(grad_filename, "rb") as binfile: data = np.fromfile(binfile, dtype=np.float32) if float(data[0]) > 4: version, data = _pop_elements(data) version = np.around(version, 2) else: version = 1 dimension, data = _pop_elements(data, type="int") if version >= 4.1: fov, data = _pop_elements(data, dimension) img_size, data = _pop_elements(data, dimension, type="int") min_osf, data = _pop_elements(data, type="int") gamma, data = _pop_elements(data) gamma = gamma / 1000 if dwell_time == "min_osf": dwell_time = raster_time / min_osf (num_shots, num_samples_per_shot), data = _pop_elements(data, 2, type="int") if version > 4: TE, data = _pop_elements(data) grad_max, data = _pop_elements(data) recon_tag, data = _pop_elements(data) recon_tag = np.around(recon_tag, 2) left_over = 10 if version > 4.1: timestamp, data = _pop_elements(data) timestamp = datetime.fromtimestamp(float(timestamp)) left_over -= 1 if version > 5: packed_skips, data = _pop_elements(data, num_elements=2, type="int") start_skip_samples, end_skip_samples = packed_skips left_over -= 2 else: start_skip_samples = 0 end_skip_samples = 0 _, data = _pop_elements(data, left_over) initial_positions, data = _pop_elements(data, dimension * num_shots) initial_positions = np.reshape(initial_positions, (num_shots, dimension)) if version > 4.5: final_positions, data = _pop_elements(data, dimension * num_shots) final_positions = np.reshape(final_positions, (num_shots, dimension)) dwell_time_ns = dwell_time * 1e6 gradient_raster_time_ns = raster_time * 1e6 if version < 4.1: grad_max, data = _pop_elements(data) gradients, data = _pop_elements( data, dimension * num_samples_per_shot * num_shots, ) # Convert gradients to T/m gradients = np.reshape( grad_max * gradients * 1e-3, (num_shots, num_samples_per_shot, dimension) ) # Handle skipped samples if start_skip_samples > 0: start_location_updates = ( np.sum(gradients[:, :start_skip_samples], axis=1) * raster_time * gamma ) initial_positions += start_location_updates gradients = gradients[:, start_skip_samples:, :] num_samples_per_shot -= start_skip_samples if end_skip_samples > 0: gradients = gradients[:, :-end_skip_samples, :] num_samples_per_shot -= end_skip_samples if num_adc_samples is None: if read_shots: # Acquire one extra sample at the end of each shot in read_shots mode num_adc_samples = num_samples_per_shot + 1 else: num_adc_samples = int(num_samples_per_shot * (raster_time / dwell_time)) kspace_loc = np.zeros((num_shots, num_adc_samples, dimension), dtype=np.float32) kspace_loc[:, 0, :] = initial_positions adc_times = dwell_time_ns * np.arange(1, num_adc_samples) Q, R = divmod(adc_times, gradient_raster_time_ns) Q = Q.astype("int") if not np.all( np.logical_or( Q < num_adc_samples, np.logical_and(Q == num_adc_samples, R == 0) ) ): warnings.warn("Binary file doesn't seem right! Proceeding anyway") grad_accumulated = np.cumsum(gradients, axis=1) * gradient_raster_time_ns for i, (q, r) in enumerate(zip(Q, R)): if q >= gradients.shape[1]: if q > gradients.shape[1]: warnings.warn( "Number of samples is more than what was " "obtained in binary file!\n" "Data will be extended" ) kspace_loc[:, i + 1, :] = ( initial_positions + ( grad_accumulated[:, gradients.shape[1] - 1, :] + gradients[:, gradients.shape[1] - 1, :] * r ) * gamma * 1e-6 ) else: if q == 0: kspace_loc[:, i + 1, :] = ( initial_positions + gradients[:, q, :] * r * gamma * 1e-6 ) else: kspace_loc[:, i + 1, :] = ( initial_positions + (grad_accumulated[:, q - 1, :] + gradients[:, q, :] * r) * gamma * 1e-6 ) params = { "version": version, "dimension": dimension, "num_shots": num_shots, "num_samples_per_shot": num_samples_per_shot, } if version >= 4.1: params["FOV"] = fov params["img_size"] = img_size params["min_osf"] = min_osf params["gamma"] = gamma params["recon_tag"] = recon_tag params["TE"] = TE if version >= 4.2: params["timestamp"] = timestamp if normalize_factor is not None: Kmax = img_size / 2 / fov kspace_loc = kspace_loc / Kmax * normalize_factor if pre_skip > 0: if pre_skip >= num_samples_per_shot: raise ValueError( "skip_first_Nsamples should be less than num_adc_samples" ) oversample_factor = num_adc_samples / num_samples_per_shot skip_samples = pre_skip * int(oversample_factor) kspace_loc = kspace_loc[:, skip_samples:] params["num_adc_samples"] = num_adc_samples - skip_samples return kspace_loc, params
[docs] def read_arbgrad_rawdat( filename: str, removeOS: bool = False, doAverage: bool = True, squeeze: bool = True, slice_num: int | None = None, contrast_num: int | None = None, pre_skip: int = 0, data_type: str = "ARBGRAD_VE11C", ): # pragma: no cover """Read raw data from a Siemens MRI file. Parameters ---------- filename : str The path to the Siemens MRI file. removeOS : bool, optional Whether to remove the oversampling, by default False. doAverage : bool, optional Whether to average the data acquired along NAve dimension, by default True. squeeze : bool, optional Whether to squeeze the dimensions of the data, by default True. slice_num : int, optional The slice to read, by default None. This applies for 2D data. contrast_num: int, optional The contrast to read, by default None. pre_skip : int, optional Number of samples to skip from the start of each shot, by default 0. This is useful when we want to avoid artifacts from ADC switching in UTE sequences. data_type : str, optional The type of data to read, by default 'ARBGRAD_VE11C'. Returns ------- data: ndarray Imported data formatted as n_coils X n_samples X n_slices X n_contrasts hdr: dict Extra information about the data parsed from the twix file Raises ------ ImportError If the mapVBVD module is not available. Notes ----- This function requires the mapVBVD module to be installed. You can install it using the following command: `pip install pymapVBVD` """ data, hdr, twixObj = read_siemens_rawdat( filename=filename, removeOS=removeOS, doAverage=doAverage, squeeze=squeeze, slice_num=slice_num, contrast_num=contrast_num, ) if "ARBGRAD_VE11C" in data_type: hdr["type"] = "ARBGRAD_GRE" hdr["shifts"] = () for s in [6, 7, 8]: shift = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "adFree", str(s)) ) hdr["shifts"] += (0,) if shift == [] else (shift[0],) hdr["oversampling_factor"] = twixObj.search_header_for_val( "Phoenix", ("sWiPMemBlock", "alFree", "4") )[0] hdr["trajectory_name"] = twixObj.search_header_for_val( "Phoenix", ("sWipMemBlock", "tFree") )[0][1:-1] if hdr["n_contrasts"] > 1: hdr["turboFactor"] = twixObj.search_header_for_val( "Phoenix", ("sFastImaging", "lTurboFactor") )[0] hdr["type"] = "ARBGRAD_MP2RAGE" if pre_skip > 0: samples_to_skip = int(hdr["oversampling_factor"] * pre_skip) if samples_to_skip >= hdr["n_adc_samples"]: raise ValueError( "Samples to skip should be less than n_samples in the data" ) data = data[:, :, samples_to_skip:] hdr["n_adc_samples"] -= samples_to_skip return data, hdr