# -*- coding: utf-8 -*-
import itertools
from dipy.io.stateful_tractogram import StatefulTractogram
from dipy.tracking.metrics import length
from dipy.tracking.streamline import set_number_of_points
from dipy.tracking.vox2track import _streamlines_in_mask
from nibabel.affines import apply_affine
from scipy.ndimage import (map_coordinates, generate_binary_structure,
binary_dilation)
import numpy as np
from scilpy.tractanalysis.streamlines_metrics import compute_tract_counts_map
[docs]
def streamlines_in_mask(sft, target_mask, all_in=False):
"""
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
target_mask : numpy.ndarray
Binary mask in which the streamlines should pass.
Returns
-------
ids : list
Ids of the streamlines passing through the mask.
"""
sft.to_vox()
sft.to_corner()
# Copy-Paste from Dipy to get indices
if all_in:
target_mask = np.array(target_mask, dtype=bool, copy=True)
target_mask = np.invert(target_mask)
tractogram_mask = compute_tract_counts_map(sft.streamlines,
target_mask.shape)
tractogram_mask[tractogram_mask > 0] = 1
tmp_mask = tractogram_mask.astype(
np.uint8)*target_mask.astype(np.uint8)
streamlines_case = _streamlines_in_mask(list(sft.streamlines),
tmp_mask,
np.eye(3), [0, 0, 0])
return np.where(streamlines_case == [0, 1][False])[0].tolist()
else:
target_mask = np.array(target_mask, dtype=np.uint8, copy=True)
streamlines_case = _streamlines_in_mask(list(sft.streamlines),
target_mask,
np.eye(3), [0, 0, 0])
return np.where(streamlines_case == [0, 1][True])[0].tolist()
[docs]
def filter_grid_roi_both(sft, mask_1, mask_2):
""" Filters streamlines with one end in a mask and the other in
another mask.
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
mask_1: numpy.ndarray
Binary mask in which the streamlines should start or end.
mask_2: numpy.ndarray
Binary mask in which the streamlines should start or end.
Returns
-------
new_sft: StatefulTractogram
Filtered sft.
ids: list
Ids of the streamlines passing through the mask.
"""
sft.to_vox()
sft.to_corner()
streamline_vox = sft.streamlines
# For endpoint filtering, we need to keep 2 separately
# Could be faster for either end, but the code look cleaner like this
voxel_beg = np.asarray([s[0] for s in streamline_vox],
dtype=np.int16).transpose(1, 0)
voxel_end = np.asarray([s[-1] for s in streamline_vox],
dtype=np.int16).transpose(1, 0)
map1_beg = map_coordinates(mask_1, voxel_beg, order=0, mode='nearest')
map2_beg = map_coordinates(mask_2, voxel_beg, order=0, mode='nearest')
map1_end = map_coordinates(mask_1, voxel_end, order=0, mode='nearest')
map2_end = map_coordinates(mask_2, voxel_end, order=0, mode='nearest')
line_based_indices = np.logical_or(
np.logical_and(map1_beg, map2_end), np.logical_and(map1_end, map2_beg))
line_based_indices = \
np.arange(len(line_based_indices))[line_based_indices].astype(np.int32)
# From indices to sft
streamlines = sft.streamlines[line_based_indices]
data_per_streamline = sft.data_per_streamline[line_based_indices]
data_per_point = sft.data_per_point[line_based_indices]
new_sft = StatefulTractogram.from_sft(
streamlines, sft,
data_per_streamline=data_per_streamline,
data_per_point=data_per_point)
return new_sft, line_based_indices
[docs]
def filter_grid_roi(sft, mask, filter_type, is_exclude, filter_distance=0):
"""
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
mask : numpy.ndarray
Binary mask in which the streamlines should pass.
filter_type: str
One of the 4 following choices, 'any', 'all', 'either_end', 'both_ends'.
is_exclude: bool
Value to indicate if the ROI is an AND (false) or a NOT (true).
Returns
-------
new_sft: StatefulTractogram
Filtered sft.
ids: list
Ids of the streamlines passing through the mask.
"""
if filter_distance != 0:
bin_struct = generate_binary_structure(3, 2)
mask = binary_dilation(mask, bin_struct,
iterations=filter_distance)
line_based_indices = []
if filter_type in ['any', 'all']:
line_based_indices = streamlines_in_mask(sft, mask,
all_in=filter_type == 'all')
else:
sft.to_vox()
sft.to_corner()
streamline_vox = sft.streamlines
# For endpoint filtering, we need to keep 2 separately
# Could be faster for either end, but the code look cleaner like this
line_based_indices_1 = []
line_based_indices_2 = []
for i, line_vox in enumerate(streamline_vox):
voxel_1 = line_vox[0].astype(np.int16)[:, None]
voxel_2 = line_vox[-1].astype(np.int16)[:, None]
if map_coordinates(mask, voxel_1, order=0, mode='nearest'):
line_based_indices_1.append(i)
if map_coordinates(mask, voxel_2, order=0, mode='nearest'):
line_based_indices_2.append(i)
# Both endpoints need to be in the mask (AND)
if filter_type == 'both_ends':
line_based_indices = np.intersect1d(line_based_indices_1,
line_based_indices_2)
# Only one endpoint need to be in the mask (OR)
elif filter_type == 'either_end':
line_based_indices = np.union1d(line_based_indices_1,
line_based_indices_2)
# If the 'exclude' option is used, the selection is inverted
if is_exclude:
line_based_indices = np.setdiff1d(range(len(sft)),
np.unique(line_based_indices))
line_based_indices = np.asarray(line_based_indices, dtype=np.int32)
# From indices to sft
streamlines = sft.streamlines[line_based_indices]
data_per_streamline = sft.data_per_streamline[line_based_indices]
data_per_point = sft.data_per_point[line_based_indices]
new_sft = StatefulTractogram.from_sft(
streamlines, sft,
data_per_streamline=data_per_streamline,
data_per_point=data_per_point)
return new_sft, line_based_indices
[docs]
def pre_filtering_for_geometrical_shape(sft, size,
center, filter_type,
is_in_vox):
"""
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
size : numpy.ndarray (3)
Size in mm, x/y/z of the ROI.
center: numpy.ndarray (3)
Center x/y/z of the ROI.
filter_type: str
One of the 3 following choices, 'any', 'all', 'either_end', 'both_ends'.
is_in_vox: bool
Value to indicate if the ROI is in voxel space.
Returns
-------
ids : tuple
Filtered sft.
Ids of the streamlines passing through the mask.
"""
transfo, dim, _, _ = sft.space_attributes
inv_transfo = np.linalg.inv(transfo)
# Create relevant info about the ellipsoid in vox/world space
if is_in_vox:
center = np.asarray(apply_affine(transfo, center), dtype=float)
bottom_corner = center - size
top_corner = center + size
x_val = [bottom_corner[0], top_corner[0]]
y_val = [bottom_corner[1], top_corner[1]]
z_val = [bottom_corner[2], top_corner[2]]
corner_world = list(itertools.product(x_val, y_val, z_val))
corner_vox = apply_affine(inv_transfo, corner_world)
# Since the filtering using a grid is so fast, we pre-filter
# using a BB around the ellipsoid
min_corner = np.min(corner_vox, axis=0) - 1.0
max_corner = np.max(corner_vox, axis=0) + 1.5
pre_mask = np.zeros(dim)
min_x, max_x = int(max(min_corner[0], 0)), int(min(max_corner[0], dim[0]))
min_y, max_y = int(max(min_corner[1], 0)), int(min(max_corner[1], dim[1]))
min_z, max_z = int(max(min_corner[2], 0)), int(min(max_corner[2], dim[2]))
pre_mask[min_x:max_x, min_y:max_y, min_z:max_z] = 1
return filter_grid_roi(sft, pre_mask, filter_type, False)
[docs]
def filter_ellipsoid(sft, ellipsoid_radius, ellipsoid_center,
filter_type, is_exclude, is_in_vox=False):
"""
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
ellipsoid_radius : numpy.ndarray (3)
Size in mm, x/y/z of the ellipsoid.
ellipsoid_center: numpy.ndarray (3)
Center x/y/z of the ellipsoid.
filter_type: str
One of the 3 following choices, 'any', 'all', 'either_end', 'both_ends'.
is_exclude: bool
Value to indicate if the ROI is an AND (false) or a NOT (true).
is_in_vox: bool
Value to indicate if the ROI is in voxel space.
Returns
-------
ids : tuple
Filtered sft.
Ids of the streamlines passing through the mask.
"""
pre_filtered_sft, pre_filtered_indices = \
pre_filtering_for_geometrical_shape(sft, ellipsoid_radius,
ellipsoid_center, filter_type,
is_in_vox)
pre_filtered_sft.to_rasmm()
pre_filtered_sft.to_center()
pre_filtered_streamlines = pre_filtered_sft.streamlines
transfo, _, res, _ = sft.space_attributes
if is_in_vox:
ellipsoid_center = np.asarray(apply_affine(transfo,
ellipsoid_center),
dtype=float)
selected_by_ellipsoid = []
line_based_indices_1 = []
line_based_indices_2 = []
# This is still point based (but resampled), I had a ton of problems trying
# to use something with intersection, but even if I could do it :
# The result won't be identical to MI-Brain since I am not using the
# vtkPolydata. Also it won't be identical to TrackVis either,
# because TrackVis is point-based for Spherical ROI...
ellipsoid_radius = np.asarray(ellipsoid_radius, dtype=float)
ellipsoid_center = np.asarray(ellipsoid_center, dtype=float)
for i, line in enumerate(pre_filtered_streamlines):
if filter_type in ['any', 'all']:
# Resample to 1/10 of the voxel size
nb_points = max(int(length(line) / np.average(res) * 10), 2)
line = set_number_of_points(line, nb_points)
points_in_ellipsoid = np.sum(
((line - ellipsoid_center) / ellipsoid_radius) ** 2,
axis=1)
if filter_type == 'any' \
and np.argwhere(points_in_ellipsoid <= 1).any():
# If at least one point was in the ellipsoid
selected_by_ellipsoid.append(pre_filtered_indices[i])
elif filter_type == 'all' \
and len(np.argwhere(points_in_ellipsoid <= 1)) == len(line):
# If all points were in the ellipsoid
selected_by_ellipsoid.append(pre_filtered_indices[i])
else:
points_in_ellipsoid = np.sum(
((line[0] - ellipsoid_center) / ellipsoid_radius) ** 2)
if points_in_ellipsoid <= 1.0:
line_based_indices_1.append(pre_filtered_indices[i])
points_in_ellipsoid = np.sum(
((line[-1] - ellipsoid_center) / ellipsoid_radius) ** 2)
if points_in_ellipsoid <= 1.0:
line_based_indices_2.append(pre_filtered_indices[i])
# Both endpoints need to be in the mask (AND)
if filter_type == 'both_ends':
selected_by_ellipsoid = np.intersect1d(line_based_indices_1,
line_based_indices_2)
# Only one endpoint needs to be in the mask (OR)
elif filter_type == 'either_end':
selected_by_ellipsoid = np.union1d(line_based_indices_1,
line_based_indices_2)
# If the 'exclude' option is used, the selection is inverted
if is_exclude:
selected_by_ellipsoid = np.setdiff1d(range(len(sft)),
np.unique(selected_by_ellipsoid))
line_based_indices = np.asarray(selected_by_ellipsoid, dtype=np.int32)
# From indices to sft
streamlines = sft.streamlines[line_based_indices]
data_per_streamline = sft.data_per_streamline[line_based_indices]
data_per_point = sft.data_per_point[line_based_indices]
new_sft = StatefulTractogram.from_sft(
streamlines, sft,
data_per_streamline=data_per_streamline,
data_per_point=data_per_point)
return new_sft, line_based_indices
[docs]
def filter_cuboid(sft, cuboid_radius, cuboid_center,
filter_type, is_exclude):
"""
Parameters
----------
sft : StatefulTractogram
StatefulTractogram containing the streamlines to segment.
cuboid_radius : numpy.ndarray (3)
Size in mm, x/y/z of the cuboid.
cuboid_center: numpy.ndarray (3)
Center x/y/z of the cuboid.
filter_type: str
One of the 3 following choices, 'any', 'all', 'either_end', 'both_ends'.
is_exclude: bool
Value to indicate if the ROI is an AND (false) or a NOT (true).
is_in_vox: bool
Value to indicate if the ROI is in voxel space.
Returns
-------
ids : tuple
Filtered sft.
Ids of the streamlines passing through the mask.
"""
pre_filtered_sft, pre_filtered_indices = \
pre_filtering_for_geometrical_shape(sft, cuboid_radius,
cuboid_center, filter_type,
False)
pre_filtered_sft.to_rasmm()
pre_filtered_sft.to_center()
pre_filtered_streamlines = pre_filtered_sft.streamlines
_, _, res, _ = sft.space_attributes
selected_by_cuboid = []
line_based_indices_1 = []
line_based_indices_2 = []
# Also here I am not using a mathematical intersection and
# I am not using vtkPolyData like in MI-Brain, so not exactly the same
cuboid_radius = np.asarray(cuboid_radius, dtype=float)
cuboid_center = np.asarray(cuboid_center, dtype=float)
for i, line in enumerate(pre_filtered_streamlines):
if filter_type in ['any', 'all']:
# Resample to 1/10 of the voxel size
nb_points = max(int(length(line) / np.average(res) * 10), 2)
line = set_number_of_points(line, nb_points)
points_in_cuboid = np.abs(line - cuboid_center) / cuboid_radius
points_in_cuboid = np.sum(np.where(points_in_cuboid <= 1, 1, 0),
axis=1)
if filter_type == 'any' \
and np.argwhere(points_in_cuboid == 3).any():
# If at least one point was in the cuboid in x/y/z
selected_by_cuboid.append(pre_filtered_indices[i])
elif filter_type == 'all' \
and len(np.argwhere(points_in_cuboid == 3)) == len(line):
# If all points were in the cuboid in x/y/z
selected_by_cuboid.append(pre_filtered_indices[i])
else:
# Faster to do it twice than trying to do in using an array of 2
points_in_cuboid = np.abs(line[0] - cuboid_center) / cuboid_radius
points_in_cuboid = np.sum(np.where(points_in_cuboid <= 1, 1, 0))
if points_in_cuboid == 3:
line_based_indices_1.append(pre_filtered_indices[i])
points_in_cuboid = np.abs(line[-1] - cuboid_center) / cuboid_radius
points_in_cuboid = np.sum(np.where(points_in_cuboid <= 1, 1, 0))
if points_in_cuboid == 3:
line_based_indices_2.append(pre_filtered_indices[i])
# Both endpoints need to be in the mask (AND)
if filter_type == 'both_ends':
selected_by_cuboid = np.intersect1d(line_based_indices_1,
line_based_indices_2)
# Only one endpoint need to be in the mask (OR)
elif filter_type == 'either_end':
selected_by_cuboid = np.union1d(line_based_indices_1,
line_based_indices_2)
# If the 'exclude' option is used, the selection is inverted
if is_exclude:
selected_by_cuboid = np.setdiff1d(range(len(sft)),
np.unique(selected_by_cuboid))
line_based_indices = np.asarray(selected_by_cuboid, dtype=np.int32)
# From indices to sft
streamlines = sft.streamlines[line_based_indices]
data_per_streamline = sft.data_per_streamline[line_based_indices]
data_per_point = sft.data_per_point[line_based_indices]
new_sft = StatefulTractogram.from_sft(
streamlines, sft,
data_per_streamline=data_per_streamline,
data_per_point=data_per_point)
return new_sft, line_based_indices