# -*- coding: utf-8 -*-
from concurrent.futures import ProcessPoolExecutor, as_completed
from copy import deepcopy, copy
import logging
import warnings
from dipy.data import get_sphere
from dipy.reconst.shm import sh_to_sf_matrix
from dipy.segment.clustering import qbx_and_merge
from dipy.segment.fss import FastStreamlineSearch
from dipy.tracking.distances import bundles_distances_mdf
import numpy as np
from numpy.random import RandomState
from scipy.spatial import cKDTree
from scipy.spatial.distance import directed_hausdorff
from sklearn.metrics import cohen_kappa_score
from sklearn.neighbors import KDTree
from tqdm import tqdm
from scilpy.tractanalysis.streamlines_metrics import compute_tract_counts_map
from scilpy.tractanalysis.todi import TrackOrientationDensityImaging, \
get_sh_from_todi
from scilpy.tractograms.streamline_operations import generate_matched_points
from scilpy.tractograms.tractogram_operations import (difference_robust,
intersection_robust,
union_robust)
from scilpy.image.volume_operations import (normalize_metric, merge_metrics)
from scilpy.image.volume_math import neighborhood_correlation_
[docs]
def binary_classification(segmentation_indices,
gold_standard_indices,
original_count,
mask_count=0):
"""
Compute all the binary classification measures using only indices from
a dataset and its ground truth in any representation (voxels
or streamlines).
Parameters
----------
segmentation_indices: list of int
Indices of the data that are part of the segmentation.
gold_standard_indices: list of int
Indices of the ground truth.
original_count: int
Total size of the original dataset (before segmentation),
e.g len(streamlines) or np.prod(mask.shape).
mask_count: int
Number of non-zeros voxels in the original dataset.
Needed in order to get a valid true positive count for the voxel
representation.
Returns
-------
A tuple containing
- float: Value between 0 and 1 that represent the spatial aggrement
between both bundles.
- list of ndarray: intersection_robust of streamlines in both bundle
- list of ndarray: union_robust of streamlines in both bundle
"""
tp = len(np.intersect1d(segmentation_indices, gold_standard_indices))
fp = len(np.setdiff1d(segmentation_indices, gold_standard_indices))
fn = len(np.setdiff1d(gold_standard_indices, segmentation_indices))
tn = len(np.setdiff1d(
range(original_count), np.union1d(
segmentation_indices, gold_standard_indices)))
if mask_count > 0:
tn = tn - original_count + mask_count
# Extreme that is not covered, all indices are in the gold standard
# and the segmentation indices got them 100% right
if tp == 0:
sensitivity = 0
specificity = 0
precision = 0
accuracy = 0
dice = 0
kappa = 0
youden = -1
else:
sensitivity = tp / float(tp + fn)
if np.isclose(tn + fp, 0):
specificity = 1
else:
specificity = tn / float(tn + fp)
precision = tp / float(tp + fp)
accuracy = (tp + tn) / float(tp + fp + fn + tn)
dice = 2 * tp / float(2 * tp + fp + fn)
seg_arr = np.zeros((original_count,))
gs_arr = np.zeros((original_count,))
seg_arr[segmentation_indices] = 1
gs_arr[gold_standard_indices] = 1
# To make sure the amount of indices within the WM mask is consistent
if mask_count > 0:
empty_indices = np.where(seg_arr + gs_arr < 1)[0]
indices_to_removes = original_count - mask_count
seg_arr = np.delete(seg_arr, empty_indices[0:indices_to_removes])
gs_arr = np.delete(gs_arr, empty_indices[0:indices_to_removes])
kappa = cohen_kappa_score(seg_arr, gs_arr)
youden = sensitivity + specificity - 1
return sensitivity, specificity, precision, accuracy, dice, kappa, youden
[docs]
def approximate_surface_node(roi):
"""
Compute the number of surface voxels (i.e. nodes connected to at least one
zero-valued neighboring voxel)
Parameters
----------
roi: ndarray
A ndarray where voxel values represent the density of a bundle and
it is binarized.
Returns
-------
int: the number of surface voxels
"""
ind = np.argwhere(roi > 0)
tree = KDTree(ind)
count = np.sum(7 - tree.query_radius(ind, r=1.0,
count_only=True))
return count
[docs]
def compute_fractal_dimension(density, n_steps=10, box_size_min=1.0,
box_size_max=2.0, threshold=0.0, box_size=None):
"""
Compute the fractal dimension of a bundle to measure the roughness.
The code is extracted from https://github.com/FBK-NILab/fractal_dimension
Parameters. The result is dependent on voxel size and the number of voxels.
If data comparison is performed, the bundles MUST be in same resolution.
Parameters
----------
density: ndarray
A ndarray where voxel values represent the density of a bundle. This
function computes the fractal dimension of the bundle.
n_steps: int
The number of box sizes used to approximate fractal dimension. A larger
number of steps will increase the accuracy of the approximation, but
will also take more time. The default number of boxes sizes is 10.
box_size_min: float
The minimum size of boxes. This number should be larger than or equal
to 1.0 and is defaulted to 1.0.
box_size_max: float
The maximum size of boxes. This number should be larger than the
minimum size of boxes.
threshold: float
The threshold to filter the voxels in the density array. The default is
set to 0, so only nonzero voxels will be considered.
box_size: ndarray
A ndarray of different sizes of boxes in a linear space in an ascending
order.
Returns
-------
float: fractal dimension of a bundle
"""
pixels = np.array(np.where(density > threshold)).T
if box_size is None:
box_size = np.linspace(box_size_min, box_size_max, n_steps)
counts = np.zeros(len(box_size))
for i, bs in enumerate(box_size):
bins = \
[np.arange(0, image_side + bs, bs) for image_side in density.shape]
H, edges = np.histogramdd(pixels, bins=bins)
counts[i] = (H > 0).sum()
if (counts < 1).any():
fractal_dimension = 0.0
else:
# linear regression:
coefficients = np.polyfit(np.log(box_size), np.log(counts), 1)
fractal_dimension = -coefficients[0]
return fractal_dimension
[docs]
def compute_bundle_adjacency_streamlines(bundle_1, bundle_2, non_overlap=False,
centroids_1=None, centroids_2=None):
"""
Compute the distance in millimeters between two bundles. Uses centroids
to limit computation time. Each centroid of the first bundle is matched
to the nearest centroid of the second bundle and vice-versa.
Distance between matched paired is averaged for the final results.
References
----------
[1] Garyfallidis et al. Robust and efficient linear
registration of white-matter fascicles in the space of streamlines,
Neuroimage, 2015.
Parameters
----------
bundle_1: list of ndarray
First set of streamlines.
bundle_2: list of ndarray
Second set of streamlines.
non_overlap: bool
Exclude overlapping streamlines from the computation.
centroids_1: list of ndarray
Pre-computed centroids for the first bundle.
centroids_2: list of ndarray
Pre-computed centroids for the second bundle.
Returns
-------
float: Distance in millimeters between both bundles.
"""
if not bundle_1 or not bundle_2:
return -1
thresholds = [32, 24, 12, 6]
# Intialize the clusters
if centroids_1 is None:
centroids_1 = qbx_and_merge(bundle_1, thresholds, rng=RandomState(0),
verbose=False).centroids
if centroids_2 is None:
centroids_2 = qbx_and_merge(bundle_2, thresholds, rng=RandomState(0),
verbose=False).centroids
if non_overlap:
non_overlap_1, _ = difference_robust([bundle_1, bundle_2])
non_overlap_2, _ = difference_robust([bundle_2, bundle_1])
if non_overlap_1:
non_overlap_centroids_1 = qbx_and_merge(non_overlap_1, thresholds,
rng=RandomState(0),
verbose=False).centroids
distance_matrix_1 = bundles_distances_mdf(non_overlap_centroids_1,
centroids_2)
min_b1 = np.min(distance_matrix_1, axis=0)
distance_b1 = np.average(min_b1)
else:
distance_b1 = 0
if non_overlap_2:
non_overlap_centroids_2 = qbx_and_merge(non_overlap_2, thresholds,
rng=RandomState(0),
verbose=False).centroids
distance_matrix_2 = bundles_distances_mdf(centroids_1,
non_overlap_centroids_2)
min_b2 = np.min(distance_matrix_2, axis=1)
distance_b2 = np.average(min_b2)
else:
distance_b2 = 0
else:
distance_matrix = bundles_distances_mdf(centroids_1, centroids_2)
min_b1 = np.min(distance_matrix, axis=0)
min_b2 = np.min(distance_matrix, axis=1)
distance_b1 = np.average(min_b1)
distance_b2 = np.average(min_b2)
return (distance_b1 + distance_b2) / 2.0
[docs]
def compute_bundle_adjacency_voxel(binary_1, binary_2, non_overlap=False):
"""
Compute the distance in millimeters between two bundles in the voxel
representation. Convert the bundles to binary masks. Each voxel of the
first bundle is matched to the the nearest voxel of the second bundle and
vice-versa.
Distance between matched paired is averaged for the final results.
Parameters
----------
binary_1: ndarray
Binary mask computed from the first bundle
binary_2: ndarray
Binary mask computed from the second bundle
non_overlap: bool
Exclude overlapping voxels from the computation.
Returns
-------
float: Distance in millimeters between both bundles.
"""
if isinstance(binary_1, np.ndarray) and isinstance(binary_2, np.ndarray) \
and binary_1.ndim == 3 and binary_2.ndim == 3:
b1_ind = np.argwhere(binary_1 > 0)
b2_ind = np.argwhere(binary_2 > 0)
else:
b1_ind = binary_1
b2_ind = binary_2
b1_tree = cKDTree(b1_ind)
b2_tree = cKDTree(b2_ind)
distance_1, _ = b1_tree.query(b2_ind)
distance_2, _ = b2_tree.query(b1_ind)
if non_overlap:
non_zeros_1 = np.nonzero(distance_1)
non_zeros_2 = np.nonzero(distance_2)
if not non_zeros_1[0].size == 0:
distance_b1 = np.mean(distance_1[non_zeros_1])
else:
distance_b1 = 0
if not non_zeros_2[0].size == 0:
distance_b2 = np.mean(distance_2[non_zeros_2])
else:
distance_b2 = 0
else:
distance_b1 = np.mean(distance_1)
distance_b2 = np.mean(distance_2)
return (distance_b1 + distance_b2) / 2.0
[docs]
def compute_dice_voxel(density_1, density_2):
"""
Compute the overlap (dice coefficient) between two
density maps (or binary).
Parameters
----------
density_1: ndarray
Density (or binary) map computed from the first bundle
density_2: ndarray
Density (or binary) map computed from the second bundle
Returns
-------
A tuple containing:
- float: Value between 0 and 1 that represent the spatial aggrement
between both bundles.
- float: Value between 0 and 1 that represent the spatial aggrement
between both bundles, weighted by streamlines density.
"""
overlap_idx = np.nonzero(density_1 * density_2)
numerator = 2 * len(overlap_idx[0])
denominator = np.count_nonzero(density_1) + np.count_nonzero(density_2)
if denominator > 0:
dice = numerator / float(denominator)
else:
dice = np.nan
overlap_1 = density_1[overlap_idx]
overlap_2 = density_2[overlap_idx]
w_dice = np.sum(overlap_1) + np.sum(overlap_2)
denominator = np.sum(density_1) + np.sum(density_2)
if denominator > 0:
w_dice /= denominator
else:
w_dice = np.nan
return dice, w_dice
[docs]
def compute_hausdorff_voxel(map_1, map_2):
"""
Compute the Hausdorff distance between two sets of coordinates,
representing density or binary maps.
Parameters
----------
map_1: ndarray
Density (or binary) map computed from the first bundle
map_2: ndarray
Density (or binary) map computed from the second bundle
Returns
-------
distance: float
Value representing the Hausdorff distance in voxels. Should
be multiplied afterwards by the voxel size to get the distance in mm.
"""
coords_1 = np.argwhere(map_1)
coords_2 = np.argwhere(map_2)
dist, _, _ = directed_hausdorff(coords_1, coords_2)
return dist
[docs]
def compute_correlation(density_1, density_2):
"""
Compute the overlap (dice coefficient) between two density
maps (or binary). Correlation being less robust to extreme
case (no overlap, identical array), a lot of check a needed to prevent NaN.
Parameters
----------
density_1: ndarray
Density (or binary) map computed from the first bundle
density_2: ndarray
Density (or binary) map computed from the second bundle
Returns
-------
float: Value between 0 and 1 that represent the spatial aggrement
between both bundles taking into account density.
"""
indices = np.where(density_1 + density_2 > 0)
if np.array_equal(density_1, density_2):
density_correlation = 1
elif (np.sum(density_1) > 0 and np.sum(density_2) > 0) \
and np.count_nonzero(density_1 * density_2):
density_correlation = np.corrcoef(density_1[indices],
density_2[indices])[0, 1]
else:
density_correlation = 0
return max(0, density_correlation)
[docs]
def compute_dice_streamlines(bundle_1, bundle_2):
"""
Compute the overlap (dice coefficient) between two bundles.
Both bundles need to come from the exact same tractogram.
Parameters
----------
bundle_1: list of ndarray
First set of streamlines.
bundle_2: list of ndarray
Second set of streamlines.
Returns
-------
A tuple containing
- float: Value between 0 and 1 that represent the spatial aggrement
between both bundles.
- list of ndarray: intersection_robust of streamlines in both bundle
- list of ndarray: union_robust of streamlines in both bundle
"""
streamlines_intersect, _ = intersection_robust([bundle_1, bundle_2])
streamlines_union_robust, _ = union_robust([bundle_1, bundle_2])
numerator = 2 * len(streamlines_intersect)
denominator = len(bundle_1) + len(bundle_2)
if denominator > 0:
dice = numerator / float(denominator)
else:
dice = np.nan
return dice, streamlines_intersect, streamlines_union_robust
def _compute_difference_for_voxel(chunk_indices,
skip_streamlines_distance=False):
"""
Compute the difference between two sets of streamlines for a given voxel.
This function uses global variable to avoid duplicating the data for each
chunk of voxels.
Use the function tractogram_pairwise_comparison() as an entry point.
To differentiate empty voxels from voxels with no data, the function
returns NaN if no data is found.
Parameters
----------
chunk_indices: list
List of indices of the voxel to process.
skip_streamlines_distance: bool
If true, skip the computation of the distance between streamlines.
Returns
-------
results: list
List of the computed differences in the same order as the input voxels.
"""
global sft_1, sft_2, matched_points_1, matched_points_2, tree_1, tree_2, \
sh_data_1, sh_data_2
results = []
for vox_ind in chunk_indices:
vox_ind = tuple(vox_ind)
global B
has_data = sh_data_1[vox_ind].any() and sh_data_2[vox_ind].any()
if has_data:
sf_1 = np.dot(sh_data_1[vox_ind], B)
sf_2 = np.dot(sh_data_2[vox_ind], B)
acc = np.corrcoef(sf_1, sf_2)[0, 1]
else:
acc = np.nan
if skip_streamlines_distance:
results.append([np.nan, acc])
continue
# Get the streamlines in the first SFT neighborhood (i.e., 1.5mm away)
pts_ind_1 = tree_1.query_ball_point(vox_ind, 1.5)
if not pts_ind_1:
results.append([np.nan, acc])
continue
strs_ind_1 = np.unique(matched_points_1[pts_ind_1])
neighb_streamlines_1 = sft_1.streamlines[strs_ind_1]
# Get the streamlines in the second SFT neighborhood (i.e., 1.5mm away)
pts_ind_2 = tree_2.query_ball_point(vox_ind, 1.5)
if not pts_ind_2:
results.append([np.nan, acc])
continue
strs_ind_2 = np.unique(matched_points_2[pts_ind_2])
neighb_streamlines_2 = sft_2.streamlines[strs_ind_2]
# Using neighb_streamlines (all streamlines in the neighborhood of our
# voxel), we can compute the distance between the two sets of
# streamlines using FSS (FastStreamlineSearch).
with warnings.catch_warnings(record=True) as _:
fss = FastStreamlineSearch(neighb_streamlines_1, 10, resampling=12)
dist_mat = fss.radius_search(neighb_streamlines_2, 10)
sparse_dist_mat = np.abs(dist_mat.tocsr()).toarray()
sparse_ma_dist_mat = np.ma.masked_where(sparse_dist_mat < 1e-3,
sparse_dist_mat)
sparse_ma_dist_vec = np.squeeze(np.min(sparse_ma_dist_mat,
axis=0))
# dists will represent the average distance between the two sets of
# streamlines in the neighborhood of the voxel.
dist = np.average(sparse_ma_dist_vec)
results.append([dist, acc])
return results
def _compare_tractogram_wrapper(mask, nbr_cpu, skip_streamlines_distance):
"""
Wrapper for the comparison of two tractograms. This function uses
multiprocessing to compute the difference between two sets of streamlines
for each voxel.
This function simply calls the function _compute_difference_for_voxel(),
which expect chunks of indices to process and use global variables to avoid
duplicating the data for each chunk of voxels.
Use the function tractogram_pairwise_comparison() as an entry point.
Parameters
----------
mask: np.ndarray
Mask of the data to compare.
nbr_cpu: int
Number of CPU to use.
skip_streamlines_distance: bool
If true, skip the computation of the distance between streamlines.
Returns
-------
diff_data: np.ndarray
Array containing the computed differences (mm).
acc_data: np.ndarray
Array containing the computed angular correlation.
"""
dimensions = mask.shape
# Initialize multiprocessing
indices = np.argwhere(mask > 0)
diff_data = np.zeros(dimensions)
diff_data[:] = np.nan
acc_data = np.zeros(dimensions)
acc_data[:] = np.nan
def chunked_indices(indices, chunk_size=1000):
"""Yield successive chunk_size chunks from indices."""
for i in range(0, len(indices), chunk_size):
yield indices[i:i + chunk_size]
# Initialize tqdm progress bar
progress_bar = tqdm(total=len(indices))
# Create chunks of indices
np.random.shuffle(indices)
index_chunks = list(chunked_indices(indices))
with ProcessPoolExecutor(max_workers=nbr_cpu) as executor:
futures = {executor.submit(
_compute_difference_for_voxel, chunk,
skip_streamlines_distance): chunk for chunk in index_chunks}
for future in as_completed(futures):
chunk = futures[future]
try:
results = future.result()
except Exception as exc:
print(f'Generated an exception: {exc}')
else:
results = np.array(results)
diff_data[tuple(chunk.T)] = results[:, 0]
acc_data[tuple(chunk.T)] = results[:, 1]
# Update tqdm progress bar
progress_bar.update(len(chunk))
return diff_data, acc_data
[docs]
def tractogram_pairwise_comparison(sft_one, sft_two, mask, nbr_cpu=1,
skip_streamlines_distance=True):
"""
Compute the difference between two sets of streamlines for each voxel in
the mask. This function uses multiprocessing to compute the difference
between two sets of streamlines for each voxel.
Parameters
----------
sft_one: StatefulTractogram
First tractogram to compare.
sft_two: StatefulTractogram
Second tractogram to compare.
mask: np.ndarray
Mask of the data to compare (optional).
nbr_cpu: int
Number of CPU to use (default: 1).
skip_streamlines_distance: bool
If true, skip the computation of the distance between streamlines.
(default: True)
Returns
-------
acc_norm: np.ndarray
Angular correlation coefficient.
corr_norm: np.ndarray
Correlation coefficient of density maps.
diff_norm: np.ndarray
Voxelwise distance between sets of streamlines.
heatmap: np.ndarray
Merged heatmap of the three metrics using harmonic mean.
mask: np.ndarray
Final mask. Intersection of given mask (if any) and density masks of
both tractograms.
"""
global sft_1, sft_2
sft_1, sft_2 = sft_one, sft_two
sft_1.to_vox()
sft_2.to_vox()
sft_1.streamlines._data = sft_1.streamlines._data.astype(np.float16)
sft_2.streamlines._data = sft_2.streamlines._data.astype(np.float16)
dimensions = tuple(sft_1.dimensions)
global matched_points_1, matched_points_2
matched_points_1 = generate_matched_points(sft_1)
matched_points_2 = generate_matched_points(sft_2)
logging.info('Computing KDTree...')
global tree_1, tree_2
tree_1 = cKDTree(sft_1.streamlines._data)
tree_2 = cKDTree(sft_2.streamlines._data)
# Limits computation to mask AND streamlines (using density)
if mask is None:
mask = np.ones(dimensions)
logging.info('Computing density maps...')
sft_1.to_corner()
sft_2.to_corner()
density_1 = compute_tract_counts_map(sft_1.streamlines,
dimensions).astype(float)
density_2 = compute_tract_counts_map(sft_2.streamlines,
dimensions).astype(float)
mask = density_1 * density_2 * mask
mask[mask > 0] = 1
# Stop now if no overlap
if np.count_nonzero(mask) == 0:
logging.info("Bundles not overlapping! Not computing metrics.")
acc_data = np.zeros(mask.shape) * np.nan
corr_data = acc_data.copy()
diff_data_norm = acc_data.copy()
heatmap = acc_data.copy()
return acc_data, corr_data, diff_data_norm, heatmap, mask
logging.info('Computing correlation map... May be slow')
corr_data = neighborhood_correlation_([density_1, density_2])
corr_data[mask == 0] = np.nan
logging.info('Computing TODI from tractogram #1...')
global sh_data_1, sh_data_2
sh_data_1 = get_sh_from_todi(sft_1, mask)
sft_1.to_center()
logging.info('Computing TODI from tractogram #2...')
sh_data_2 = get_sh_from_todi(sft_2, mask)
sft_2.to_center()
global B
B, _ = sh_to_sf_matrix(get_sphere(name='repulsion724'), 8, 'descoteaux07')
diff_data, acc_data = _compare_tractogram_wrapper(
mask, nbr_cpu, skip_streamlines_distance)
# Normalize metrics and merge into a single heatmap
diff_data_norm = normalize_metric(diff_data, reverse=True)
heatmap = merge_metrics(acc_data, corr_data, diff_data_norm)
return acc_data, corr_data, diff_data_norm, heatmap, mask
[docs]
def compare_volume_wrapper(data_1, data_2, voxel_size=1, ratio=False,
adjency_no_overlap=False, labels_to_mask=False,
one_sided=False):
"""
Compute the similarity between binary mask or labels maps in the voxel
representation.
This function conviently computes the similarity between two volumes using
different metrics. The function returns a dictionary containing the
computed measures.
Similar to compare_bundle_wrapper (but just for Nifti volumes). If coming
from `scil_volume_pairwise_comparison` with '--single_compare', data_1 is
a candidate volume and data_2 is the reference.
Parameters
----------
data_1: ndarray
First volume to compare.
data_2: ndarray
Second volume to compare.
voxel_size: float
Size of the voxel in mm.
ratio: bool
If true, the measures are normalized by the total number of voxels
in the first volume.
adjency_no_overlap: bool
If true, exclude overlapping voxels (0mm) from the computation.
labels_to_mask: bool
If true, explicitely compare every labels in the first image to
the ROI in the second image. Otherwise, the computation presumes
both images are either label maps or binary masks.
one_sided: bool
If true, compute the measures presuming the second image is a
"ground truth".
Returns
-------
dict: Dictionary containing the computed measures.
"""
# Exclude 0 (background)
unique_values_1 = np.unique(data_1)[1:]
unique_values_2 = np.unique(data_2)[1:]
union_values = np.union1d(unique_values_1, unique_values_2)
measures_name = ['adjacency_voxels',
'dice_voxels',
'hausdorff',
'volume_overlap',
'volume_overreach']
dict_measures = {}
for val in union_values:
binary_1 = np.zeros(data_1.shape, dtype=np.uint8)
binary_1[data_1 == val] = 1
binary_2 = np.zeros(data_2.shape, dtype=np.uint8)
if labels_to_mask:
binary_2[data_2 == unique_values_2[0]] = 1
else:
binary_2[data_2 == val] = 1
if np.count_nonzero(binary_1) == 0 or np.count_nonzero(binary_2) == 0:
tmp_dict = dict.fromkeys(measures_name, np.nan)
for measure_name in measures_name:
if measure_name not in dict_measures:
dict_measures[measure_name] = {}
dict_measures[measure_name].update(
{int(val): tmp_dict[measure_name]})
continue
# These measures are in mm^3
volume_overlap = np.count_nonzero(binary_1 * binary_2)
# If comparison is performed against a reference volume
if one_sided:
# we compute the overreach wrt the second
# volume. The one-sided overreach is the number of
# voxels in the first volume that are not in the second
diff = (binary_1 - (binary_1 * binary_2)) > 0
volume_overreach = np.count_nonzero(diff)
# Otherwise, we compute the overreach between both volumes
# ie the number of voxels in either volume but not in both
else:
volume_overreach = np.abs(np.count_nonzero(
binary_1 + binary_2) - volume_overlap)
if ratio:
if one_sided:
count = np.count_nonzero(binary_2) # wrt reference
else:
count = np.count_nonzero(binary_1) # wrt first volume
if count > 0:
volume_overlap /= count
volume_overreach /= count
else:
volume_overreach = 0
volume_overreach = 0
# These measures are in mm
bundle_adjacency_voxel = compute_bundle_adjacency_voxel(
binary_1, binary_2,
non_overlap=adjency_no_overlap)
# These measures are between 0 and 1
dice_vox, _ = compute_dice_voxel(binary_1,
binary_2)
hausdorff_vox = compute_hausdorff_voxel(binary_1, binary_2)
# If computing ratio, voxel size does not make sense
if ratio:
voxel_size = 1.
measures = [bundle_adjacency_voxel,
dice_vox, hausdorff_vox * voxel_size,
volume_overlap * voxel_size,
volume_overreach * voxel_size]
curr_dict = dict(zip(measures_name, measures))
for measure_name, measure in curr_dict.items():
if measure_name not in dict_measures:
dict_measures[measure_name] = {}
dict_measures[measure_name].update({int(val): float(measure)})
return dict_measures
[docs]
def compare_bundle_wrapper(density_1, density_2, endpoints_density_1,
endpoints_density_2, bundle_1, bundle_2,
centroids_1=None, centroids_2=None, voxel_size=1,
ratio=False, streamline_dice=False,
disable_streamline_distance=False,
bundle_adjency_no_overlap=False):
"""
Compute the similarity between two bundles in the voxel representation and
streamline representation.
This function conviently computes the similarity between two bundles using
different metrics. The function returns a dictionary containing the
computed measures.
Density and endpoints density maps, and centroids are pre-computed to
speed up the computation (the script using this function does multiple
comparisons with the same data).
Similar to compare_volume_wrapper (but for data from streamlines).
Parameters
----------
density_1: ndarray
Density map computed from the first bundle.
density_2: ndarray
Density map computed from the second bundle.
endpoints_density_1: ndarray
Density map computed from the endpoints of the first bundle.
endpoints_density_2: ndarray
Density map computed from the endpoints of the second bundle.
bundle_1: list of ndarray
First set of streamlines.
bundle_2: list of ndarray
Second set of streamlines.
centroids_1: list of ndarray
Pre-computed centroids for the first bundle.
centroids_2: list of ndarray
Pre-computed centroids for the second bundle.
voxel_size: float
Size of the voxel in mm.
ratio: bool
If true, the measures are normalized by the total number of voxels
in the first volume.
streamline_dice: bool
If true, compute the dice coefficient between the two sets of
streamlines.
disable_streamline_distance: bool
If true, skip the computation of the distance between streamlines
bundle_adjency_no_overlap: bool
If true, exclude overlapping voxels (0mm) from the computation.
Returns
-------
dict: Dictionary containing the computed measures
"""
# These measures are in mm^3
binary_1 = copy(density_1)
binary_1[binary_1 > 0] = 1
binary_2 = copy(density_2)
binary_2[binary_2 > 0] = 1
volume_overlap = np.count_nonzero(binary_1 * binary_2)
volume_overlap_endpoints = np.count_nonzero(
endpoints_density_1 * endpoints_density_2)
volume_overreach = np.abs(np.count_nonzero(
binary_1 + binary_2) - volume_overlap)
volume_overreach_endpoints = np.abs(np.count_nonzero(
endpoints_density_1 + endpoints_density_2) - volume_overlap_endpoints)
if ratio:
count = np.count_nonzero(binary_1)
volume_overlap /= count
volume_overlap_endpoints /= count
volume_overreach /= count
volume_overreach_endpoints /= count
# These measures are in mm
bundle_adjacency_voxel = compute_bundle_adjacency_voxel(
density_1, density_2,
non_overlap=bundle_adjency_no_overlap)
if streamline_dice and not disable_streamline_distance:
bundle_adjacency_streamlines = \
compute_bundle_adjacency_streamlines(
bundle_1, bundle_2,
non_overlap=bundle_adjency_no_overlap)
elif not disable_streamline_distance:
bundle_adjacency_streamlines = \
compute_bundle_adjacency_streamlines(
bundle_1, bundle_2,
centroids_1=centroids_1,
centroids_2=centroids_2,
non_overlap=bundle_adjency_no_overlap)
# These measures are between 0 and 1
dice_vox, w_dice_vox = compute_dice_voxel(density_1, density_2)
dice_vox_endpoints, w_dice_vox_endpoints = compute_dice_voxel(
endpoints_density_1,
endpoints_density_2)
density_correlation = compute_correlation(density_1, density_2)
density_correlation_endpoints = compute_correlation(endpoints_density_1,
endpoints_density_2)
measures_name = ['bundle_adjacency_voxels',
'dice_voxels', 'w_dice_voxels',
'volume_overlap',
'volume_overreach',
'dice_voxels_endpoints',
'w_dice_voxels_endpoints',
'volume_overlap_endpoints',
'volume_overreach_endpoints',
'density_correlation',
'density_correlation_endpoints']
# If computing ratio, voxel size does not make sense
if ratio:
voxel_size = 1.
measures = [bundle_adjacency_voxel,
dice_vox, w_dice_vox,
volume_overlap * voxel_size,
volume_overreach * voxel_size,
dice_vox_endpoints,
w_dice_vox_endpoints,
volume_overlap_endpoints * voxel_size,
volume_overreach_endpoints * voxel_size,
density_correlation,
density_correlation_endpoints]
if not disable_streamline_distance:
measures_name += ['bundle_adjacency_streamlines']
measures += [bundle_adjacency_streamlines]
# Only when the tractograms are exactly the same
if streamline_dice:
dice_streamlines, streamlines_intersect, streamlines_union = \
compute_dice_streamlines(bundle_1, bundle_2)
streamlines_count_overlap = len(streamlines_intersect)
streamlines_count_overreach = len(
streamlines_union) - len(streamlines_intersect)
measures_name += ['dice_streamlines',
'streamlines_count_overlap',
'streamlines_count_overreach']
measures += [dice_streamlines,
streamlines_count_overlap,
streamlines_count_overreach]
return dict(zip(measures_name, measures))