Source code for scilpy.dwi.utils

# -*- coding: utf-8 -*-
import logging

from dipy.core.gradients import get_bval_indices
import numpy as np

from scilpy.gradients.bvec_bval_tools import B0ExtractionStrategy
from scilpy.image.utils import volume_iterator


def extract_dwi_shell(dwi, bvals, bvecs, bvals_to_extract, tol=20,
                      block_size=None):
    """Extracts the DWI volumes that are on specific b-value shells. Many
    shells can be extracted at once by specifying multiple b-values. The
    extracted volumes are in the same order as in the original file.

    If the b-values of a shell are not all identical, use the tol argument to
    adjust the accepted interval. For example, a b-value of 2000 and a
    tolerance of 20 will extract all volumes with a b-value from 1980 to 2020.

    Files that are too large to be loaded in memory can still be processed by
    setting the block_size argument. A block size of X means that X DWI
    volumes are loaded at a time for processing.

    Parameters
    ----------
    dwi : nib.Nifti1Image
        Original multi-shell volume.
    bvals : ndarray
        The b-values in FSL format.
    bvecs : ndarray
        The b-vectors in FSL format.
    bvals_to_extract : list of int
        The list of b-values to extract.
    tol : int
        The tolerated gap between the b-values to extract and the actual
        b-values.
    block_size : int, optional
        Load the data using this block size. Useful when the data is too
        large to be loaded in memory.

    Returns
    -------
    indices : ndarray
        Indices of the volumes corresponding to the provided b-values.
    shell_data : ndarray
        Volumes corresponding to the provided b-values.
    output_bvals : ndarray
        Selected b-values (raw values, not rounded values, even when tol is
        given).
    output_bvecs : ndarray
        Selected b-vectors.
    """
    indices = [get_bval_indices(bvals, shell, tol=tol)
               for shell in bvals_to_extract]
    indices = np.unique(np.sort(np.hstack(indices)))

    if len(indices) == 0:
        raise ValueError("There are no volumes that have the supplied "
                         "b-values: {}".format(bvals_to_extract))

    logging.info(
        "Extracting shells [{}], with number of images per shell [{}], "
        "from {} images from {}."
        .format(" ".join([str(b) for b in bvals_to_extract]),
                " ".join([str(len(get_bval_indices(bvals, shell, tol=tol)))
                          for shell in bvals_to_extract]),
                len(bvals), dwi.get_filename()))

    if block_size is None:
        block_size = dwi.shape[-1]

    # Load the shells by iterating through blocks of volumes. This approach
    # is slower for small files, but allows very big files to be split
    # with less memory usage.
    shell_data = np.zeros((dwi.shape[:-1] + (len(indices),)))
    for vi, data in volume_iterator(dwi, block_size):
        in_volume = np.array([i in vi for i in indices])
        in_data = np.array([i in indices for i in vi])
        shell_data[..., in_volume] = data[..., in_data]

    output_bvals = bvals[indices].astype(int)
    output_bvecs = bvecs[indices, :]

    return indices, shell_data, output_bvals, output_bvecs
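
A minimal usage sketch (illustrative only, not part of the scilpy source above): it assumes a hypothetical dataset saved as dwi.nii.gz with FSL-style dwi.bval / dwi.bvec files, and extracts the b=0 and b=1000 shells with the default tolerance.

    import nibabel as nib
    import numpy as np

    from scilpy.dwi.utils import extract_dwi_shell

    # Hypothetical inputs. FSL stores bvecs as 3 rows x N columns, so they
    # are transposed to N x 3 before being passed in.
    dwi_img = nib.load("dwi.nii.gz")
    bvals = np.loadtxt("dwi.bval")
    bvecs = np.loadtxt("dwi.bvec").T

    indices, shell_data, out_bvals, out_bvecs = extract_dwi_shell(
        dwi_img, bvals, bvecs, bvals_to_extract=[0, 1000], tol=20)
    print("Kept {} volumes at indices {}".format(len(indices), list(indices)))
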
def extract_b0(dwi, b0_mask, extract_in_cluster=False,
               strategy=B0ExtractionStrategy.MEAN, block_size=None):
    """
    Extract a set of b0 volumes from a dwi dataset.

    Parameters
    ----------
    dwi : nib.Nifti1Image
        Original multi-shell volume.
    b0_mask: ndarray of bool
        Mask over the time dimension (4th) identifying b0 volumes.
    extract_in_cluster: bool
        Specify to extract b0's in each continuous set of b0 volumes
        appearing in the input data.
    strategy: enum
        The extraction strategy. Must be one choice available in our enum
        B0ExtractionStrategy: either select the first b0 found, select them
        all or average them. When used in conjunction with the
        extract_in_cluster parameter set to True, the strategy is applied
        individually on each continuous set found.
    block_size : int, optional
        Load the data using this block size. Useful when the data is too
        large to be loaded in memory.

    Returns
    -------
    b0_data : ndarray
        Extracted b0 volumes. 3D with one b0, else 4D.
    """
    indices = np.where(b0_mask)[0]

    if block_size is None:
        block_size = dwi.shape[-1]

    if not extract_in_cluster and strategy == B0ExtractionStrategy.FIRST:
        idx = np.min(indices)
        output_b0 = dwi.dataobj[..., idx:idx + 1].squeeze()
    else:
        # Generate the list of contiguous b0 clusters in the data.
        mask = np.ma.masked_array(b0_mask)
        mask[~b0_mask] = np.ma.masked
        b0_clusters = np.ma.notmasked_contiguous(mask, axis=0)

        if extract_in_cluster or strategy == B0ExtractionStrategy.ALL:
            if strategy == B0ExtractionStrategy.ALL:
                time_d = len(indices)
            else:
                time_d = len(b0_clusters)

            output_b0 = np.zeros(dwi.shape[:-1] + (time_d,))

            for idx, cluster in enumerate(b0_clusters):
                if strategy == B0ExtractionStrategy.FIRST:
                    data = dwi.dataobj[..., cluster.start:cluster.start + 1]
                    output_b0[..., idx] = data.squeeze()
                else:
                    vol_it = volume_iterator(dwi, block_size,
                                             cluster.start, cluster.stop)
                    for vi, data in vol_it:
                        if strategy == B0ExtractionStrategy.ALL:
                            in_volume = np.array([i in vi for i in indices])
                            output_b0[..., in_volume] = data
                        elif strategy == B0ExtractionStrategy.MEAN:
                            output_b0[..., idx] += np.sum(data, -1)

                    if strategy == B0ExtractionStrategy.MEAN:
                        output_b0[..., idx] /= cluster.stop - cluster.start
        else:
            output_b0 = np.zeros(dwi.shape[:-1])

            for cluster in b0_clusters:
                vol_it = volume_iterator(dwi, block_size,
                                         cluster.start, cluster.stop)
                for _, data in vol_it:
                    output_b0 += np.sum(data, -1)

            output_b0 /= len(indices)

    return output_b0
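
A similar sketch for extract_b0 (again with hypothetical file names): the b0 mask is derived from the b-values with an assumed threshold of 20, and one averaged b0 is produced per contiguous run of b0 volumes.

    import nibabel as nib
    import numpy as np

    from scilpy.dwi.utils import extract_b0
    from scilpy.gradients.bvec_bval_tools import B0ExtractionStrategy

    # Hypothetical inputs; any volume whose b-value is at most 20 is treated
    # as a b0 here.
    dwi_img = nib.load("dwi.nii.gz")
    bvals = np.loadtxt("dwi.bval")
    b0_mask = bvals <= 20

    # With extract_in_cluster=True and the MEAN strategy, the result is 4D
    # with one averaged b0 per contiguous cluster of b0 volumes. With
    # extract_in_cluster=False it would instead be a single 3D mean b0.
    mean_b0_per_cluster = extract_b0(dwi_img, b0_mask,
                                     extract_in_cluster=True,
                                     strategy=B0ExtractionStrategy.MEAN)
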