Source code for mrinufft.operators.interfaces.bart
"""Interface for the BART NUFFT.BART uses a command line interfaces, and read/writes data to files.The file format is described here: https://bart-doc.readthedocs.io/en/latest/data.html#non-cartesian-datasets"""importosimportsubprocessassubpimporttempfilefrompathlibimportPathfrommrinufft._utilsimportproper_trajectoryfrommrinufft.operators.baseimportFourierOperatorCPUimportnumpyasnpfrommrinufft.io.cflimporttraj2cfl,_writecfl,_readcfl# available if return code is 0try:BART_AVAILABLE=notsubp.call(["which","bart"],stdout=subp.DEVNULL,stderr=subp.DEVNULL)exceptException:BART_AVAILABLE=False
[docs]classRawBartNUFFT:"""Wrapper around BART NUFFT CLI."""def__init__(self,samples,shape,extra_op_args=None,extra_adj_op_args=None):self.samples=samples# To normalize and send to fileself.shape=shapeself.shape_str=":".join([str(s)forsinshape])self.shape_str+=":1"iflen(shape)==2else""self._op_args=extra_op_argsor[]self._adj_op_args=extra_adj_op_argsor[]self._temp_dir=tempfile.TemporaryDirectory()# Write trajectory to temp filetmp_path=Path(self._temp_dir.name)self._traj_file=tmp_path/"traj"self._ksp_file=tmp_path/"ksp"self._grid_file=tmp_path/"grid"traj2cfl(self.samples,self.shape,self._traj_file)
[docs]def_tmp_file(self):"""Return a temporary file name."""returnos.path.join(self._temp_dir.name,next(tempfile._get_candidate_names()))
def__del__(self):"""Delete also the temporary files."""self._temp_dir.cleanup()
[docs]defop(self,coeffs_data,grid_data):"""Forward Operator."""grid_data_=grid_data.reshape(self.shape)_writecfl(grid_data_,self._grid_file)cmd=["bart","nufft","-d",self.shape_str,*self._op_args,str(self._traj_file),str(self._grid_file),str(self._ksp_file),]try:subp.run(cmd,check=True,capture_output=True)exceptsubp.CalledProcessErrorasexc:msg="Failed to run BART NUFFT\n"msg+=f"error code: {exc.returncode}\n"msg+="cmd: "+" ".join(cmd)+"\n"msg+=f"stdout: {exc.output}\n"msg+=f"stderr: {exc.stderr}"raiseRuntimeError(msg)fromexcksp_raw=_readcfl(self._ksp_file)np.copyto(coeffs_data,ksp_raw)returncoeffs_data
[docs]defadj_op(self,coeffs_data,grid_data):"""Adjoint Operator."""# Format grid data to cfl format, and write to file# Run bart nufft with argument in subprocesscoeffs_=coeffs_data.reshape(len(self.samples))_writecfl(coeffs_[None,...,None,None,None],self._ksp_file)cmd=["bart","nufft","-d",self.shape_str,"-a"if"-i"notinself._adj_op_argselse"",*self._adj_op_args,str(self._traj_file),str(self._ksp_file),str(self._grid_file),]try:subp.run(cmd,check=True,capture_output=True)exceptsubp.CalledProcessErrorasexc:msg="Failed to run BART NUFFT\n"msg+=f"error code: {exc.returncode}\n"msg+="cmd: "+" ".join(cmd)+"\n"msg+=f"stdout: {exc.output}\n"msg+=f"stderr: {exc.stderr}"raiseRuntimeError(msg)fromexcgrid_raw=_readcfl(self._grid_file)np.copyto(grid_data,grid_raw)returngrid_data
[docs]classMRIBartNUFFT(FourierOperatorCPU):"""BART implementation of MRI NUFFT transform."""# TODO override Data consistency function: use toeplizbackend="bart"available=BART_AVAILABLEdef__init__(self,samples,shape,density=False,n_coils=1,n_batchs=1,smaps=None,squeeze_dims=True,**kwargs,):samples_=proper_trajectory(samples,normalize="unit")ifdensityisTrue:density=Falseifgetattr(kwargs,"extra_adj_op_args",None):kwargs["extra_adj_op_args"]+=["-i"]else:kwargs["extra_adj_op_args"]=["-i"]self.raw_op=RawBartNUFFT(samples_,shape,**kwargs)super().__init__(samples_,shape,density,n_coils=n_coils,n_batchs=n_batchs,n_trans=1,smaps=smaps,raw_op=self.raw_op,squeeze_dims=squeeze_dims,)@propertydefnorm_factor(self):"""Normalization factor of the operator."""# return 1.0returnnp.sqrt(2**len(self.shape))