Source code for scilpy.tractanalysis.fixel_density

import itertools
import multiprocessing
import numpy as np

from dipy.io.streamline import load_tractogram
from scilpy.tractanalysis.voxel_boundary_intersection import\
    subdivide_streamlines_at_voxel_faces


def _fixel_density_parallel(args):
    (peaks, max_theta, dps_key, bundle) = args

    return _fixel_density_single_bundle(bundle, peaks, max_theta, dps_key)


def _fixel_density_single_bundle(bundle, peaks, max_theta, dps_key):
    sft = load_tractogram(bundle, 'same')
    sft.to_vox()
    sft.to_corner()

    fixel_density_maps = np.zeros((peaks.shape[:-1]) + (5,))

    min_cos_theta = np.cos(np.radians(max_theta))

    all_split_streamlines =\
        subdivide_streamlines_at_voxel_faces(sft.streamlines)
    for i, split_streamlines in enumerate(all_split_streamlines):
        segments = split_streamlines[1:] - split_streamlines[:-1]
        seg_lengths = np.linalg.norm(segments, axis=1)

        # Remove points where the segment is zero.
        # This removes numpy warnings of division by zero.
        non_zero_lengths = np.nonzero(seg_lengths)[0]
        segments = segments[non_zero_lengths]
        seg_lengths = seg_lengths[non_zero_lengths]

        # Those starting points are used for the segment vox_idx computations
        seg_start = split_streamlines[non_zero_lengths]
        vox_indices = (seg_start + (0.5 * segments)).astype(int)

        normalized_seg = np.reshape(segments / seg_lengths[..., None], (-1, 3))

        weight = 1
        if dps_key:
            weight = sft.data_per_streamline[dps_key][i]

        for vox_idx, seg_dir in zip(vox_indices, normalized_seg):
            vox_idx = tuple(vox_idx)
            peaks_at_idx = peaks[vox_idx].reshape((5, 3))

            cos_theta = np.abs(np.dot(seg_dir.reshape((-1, 3)),
                                      peaks_at_idx.T))

            if (cos_theta > min_cos_theta).any():
                lobe_idx = np.argmax(np.squeeze(cos_theta), axis=0)  # (n_segs)
                fixel_density_maps[vox_idx][lobe_idx] += weight

    return fixel_density_maps


[docs] def fixel_density(peaks, bundles, dps_key=None, max_theta=45, nbr_processes=None): """Compute the fixel density map per bundle. Can use parallel processing. Parameters ---------- peaks : np.ndarray (x, y, z, 15) Five principal fiber orientations for each voxel. bundles : list or np.array (N) List of (N) paths to bundles. dps_key : string, optional Key to the data_per_streamline to use as weight instead of the number of streamlines. max_theta : int, optional Maximum angle between streamline and peak to be associated. nbr_processes : int, optional The number of subprocesses to use. Default: multiprocessing.cpu_count() Returns ------- fixel_density : np.ndarray (x, y, z, 5, N) Density per fixel per bundle. """ nbr_processes = multiprocessing.cpu_count() \ if nbr_processes is None or nbr_processes <= 0 \ else nbr_processes # Separating the case nbr_processes=1 to help get good coverage metrics # (codecov does not deal well with multiprocessing) if nbr_processes == 1: results = [] for b in bundles: results.append( _fixel_density_single_bundle(b, peaks, max_theta, dps_key)) else: pool = multiprocessing.Pool(nbr_processes) results = pool.map(_fixel_density_parallel, zip(itertools.repeat(peaks), itertools.repeat(max_theta), itertools.repeat(dps_key), bundles)) pool.close() pool.join() fixel_density = np.moveaxis(np.asarray(results), 0, -1) return fixel_density
[docs] def fixel_maps_to_masks(maps, abs_thr, rel_thr, norm, nb_bundles): """Compute the fixel density masks from fixel density maps. Parameters ---------- maps : np.ndarray (x, y, z, 5, N) Density per fixel per bundle. abs_thr : float Value of density maps threshold to obtain density masks, in number of streamlines or streamline weighting. rel_thr : float Value of density maps threshold to obtain density masks, as a ratio of the normalized density. Must be between 0 and 1. norm : string, ["fixel", "voxel", "none"] Way of normalizing the density maps. If fixel, will normalize the maps per fixel, in each voxel. If voxel, will normalize the maps per voxel. If none, will not normalize the maps. nb_bundles : int (N) Number of bundles (N). Returns ------- masks : np.ndarray (x, y, z, 5, N) Density masks per fixel per bundle. maps : np.ndarray (x, y, z, 5, N) Normalized density maps per fixel per bundle. """ # Apply a threshold on the number of streamlines masks_abs = maps > abs_thr # Normalizing the density maps per voxel or fixel fixel_sum = np.sum(maps, axis=-1) voxel_sum = np.sum(fixel_sum, axis=-1) # Preventing division by 0 voxel_sum[voxel_sum == 0] = 1 fixel_sum[fixel_sum == 0] = 1 for i in range(nb_bundles): if norm == "voxel": maps[..., 0, i] /= voxel_sum maps[..., 1, i] /= voxel_sum maps[..., 2, i] /= voxel_sum maps[..., 3, i] /= voxel_sum maps[..., 4, i] /= voxel_sum elif norm == "fixel": maps[..., i] /= fixel_sum # Apply a threshold on the normalized density if norm == "voxel" or norm == "fixel": masks_rel = maps > rel_thr else: masks_rel = maps > 0 # Compute the fixel density masks from the rel and abs versions masks = masks_rel * masks_abs return masks.astype(np.uint8), maps