Source code for scilpy.tractograms.streamline_operations

# -*- coding: utf-8 -*-
import copy
import logging
from multiprocessing import Pool

import numpy as np
import scipy.ndimage as ndi
from dipy.io.stateful_tractogram import StatefulTractogram
from dipy.segment.clustering import qbx_and_merge
from dipy.tracking import metrics as tm
from dipy.tracking.streamlinespeed import (compress_streamlines,
                                           length,
                                           set_number_of_points)
from scipy.interpolate import splev, splprep
from scipy.spatial.transform import Rotation


def _get_streamline_pt_index(points_to_index, vox_index, from_start=True):
    """Get the index of the streamline point in the voxel.

    Parameters
    ----------
    points_to_index: np.ndarray
        The indices of the voxels in the streamline's voxel grid.
    vox_index: int
        The index of the voxel in the voxel grid.
    from_start: bool
        If True, will return the first streamline point in the voxel.
        If False, will return the last streamline point in the voxel.

    Returns
    -------
    index: int or None
        The index of the streamline point in the voxel.
        If None, there is no streamline point in the voxel.
    """

    cur_idx = np.where(points_to_index == vox_index)

    if not len(cur_idx[0]):
        return None

    if from_start:
        idx_to_take = 0
    else:
        idx_to_take = -1

    return cur_idx[0][idx_to_take]


def _get_point_on_line(first_point, second_point, vox_lower_corner):
    """ Get the point on a line that is in a voxel.

    To manage the case where there is no real streamline point in an
    intersected voxel, we need to generate an artificial point.
    We use line / cube intersections as presented in
    Physically Based Rendering, Second edition, pp. 192-195
    Some simplifications are made since we are sure that an intersection
    exists (else this function would not have been called).

    Arguments
    ---------
    first_point: np.ndarray
        The first point of the line.
    second_point: np.ndarray
        The second point of the line.
    vox_lower_corner: np.ndarray
        The lower corner coordinates of the voxel.

    Returns
    -------
    intersection_point: np.ndarray
        The point on the line that is in the voxel.
    """

    ray = second_point - first_point
    ray /= np.linalg.norm(ray)

    corners = np.array([vox_lower_corner, vox_lower_corner + 1])

    t0 = 0
    t1 = np.inf

    for i in range(3):
        if ray[i] != 0.:
            inv_ray = 1. / ray[i]
            v0 = (corners[0, i] - first_point[i]) * inv_ray
            v1 = (corners[1, i] - first_point[i]) * inv_ray
            t0 = max(t0, min(v0, v1))
            t1 = min(t1, max(v0, v1))

    return first_point + ray * (t0 + t1) / 2.


def _get_next_real_point(points_to_index, vox_index):
    """ Return the index after the point in points_to_index is smaller,
    presuming the indices are sorted.

    ~~~ HERE BE DRAGONS, modify with care. ~~~

    Although np.searchsorted may seem like a better option, it is not
    appropriate here since we need to return the first index encountered that
    fits our condition, regardless if there is a better option later.

    Parameters
    ----------
    points_to_index: list
        The indices to search in.
    vox_index: int
        The index to search for.
    """

    map_idx, next_point = -1, -1
    nb_points_to_index = len(points_to_index)
    internal_vox_index = vox_index
    pts_to_index_view = points_to_index

    while map_idx < internal_vox_index and next_point < nb_points_to_index - 1:
        next_point += 1
        map_idx = pts_to_index_view[next_point]

    return next_point


def _get_previous_real_point(points_to_index, vox_index):
    """ Return the index before the point in points_to_index is larger,
    presuming the indices are sorted.

    ~~~ HERE BE DRAGONS, modify with care. ~~~

    Although np.searchsorted may seem like a better option, it is not
    appropriate here since we need to return the first index encountered that
    fits our condition, regardless if there is a better option later.

    Parameters
    ----------
    points_to_index: list
        The indices to search in.
    vox_index: int
        The index to search for.
    """

    previous_point = len(points_to_index)
    internal_vox_index = vox_index

    map_idx = internal_vox_index + 1
    pts_to_index_view = points_to_index

    while map_idx > internal_vox_index and previous_point > 0:
        previous_point -= 1
        map_idx = pts_to_index_view[previous_point]

    return previous_point


[docs] def get_angles(sft, degrees=True, add_zeros=False): """ Returns the angle between each segment of the streamlines. Parameters ---------- sft: StatefulTractogram The tractogram, with N streamlines. degrees: bool If True, returns angles in degree. Else, in radian. add_zeros: bool For a streamline of length M, there are M-1 segments, and M-2 angles. If add_zeros is set to True, a 0 angle is added at both ends of the returned values, to get M values. Returns ------- angles: list[np.ndarray] List of N numpy arrays. The angles per streamline, in degree. """ angles = [] for i in range(len(sft.streamlines)): dirs = np.diff(sft.streamlines[i], axis=0) dirs /= np.linalg.norm(dirs, axis=-1, keepdims=True) cos_angles = np.sum(dirs[:-1, :] * dirs[1:, :], axis=1) # Resolve numerical instability cos_angles = np.minimum(np.maximum(-1.0, cos_angles), 1.0) line_angles = list(np.arccos(cos_angles)) if add_zeros: line_angles = [0.0] + line_angles + [0.0] if degrees: line_angles = np.rad2deg(line_angles) angles.append(line_angles) return angles
[docs] def get_streamlines_as_linspaces(sft): """ For each streamline, returns a list of M values ranging between 0 and 1, where M is the number of points in the streamline. This gives the position of each coordinate per respect to the streamline's length. Parameters ---------- sft: StatefulTractogram The tractogram that contains the list of streamlines to be colored Returns ------- positions: list[list] For each streamline, the linear distribution of its length. """ positions = [] for s in sft.streamlines: positions.append(list(np.linspace(0, 1, len(s)))) return positions
[docs] def compress_sft(sft, tol_error=0.01): """ Compress a stateful tractogram. Uses Dipy's compress_streamlines, but deals with space better. Dipy's description: The compression consists in merging consecutive segments that are nearly collinear. The merging is achieved by removing the point the two segments have in common. The linearization process [Presseau15] ensures that every point being removed are within a certain margin (in mm) of the resulting streamline. Recommendations for setting this margin can be found in [Presseau15] (in which they called it tolerance error). The compression also ensures that two consecutive points won't be too far from each other (precisely less or equal than *max_segment_length* mm). This is a tradeoff to speed up the linearization process [Rheault15]. A low value will result in a faster linearization but low compression, whereas a high value will result in a slower linearization but high compression. [Presseau C. et al., A new compression format for fiber tracking datasets, NeuroImage, no 109, 73-83, 2015.] Parameters ---------- sft: StatefulTractogram The sft to compress. tol_error: float (optional) Tolerance error in mm (default: 0.01). A rule of thumb is to set it to 0.01mm for deterministic streamlines and 0.1mm for probabilitic streamlines. Returns ------- compressed_sft: StatefulTractogram """ # Go to world space orig_space = sft.space sft.to_rasmm() # Compress streamlines compressed_streamlines = compress_streamlines(sft.streamlines, tol_error=tol_error) if sft.data_per_point is not None and sft.data_per_point.keys(): logging.warning("Initial StatefulTractogram contained data_per_point. " "This information will not be carried in the final " "tractogram.") compressed_sft = StatefulTractogram.from_sft( compressed_streamlines, sft, data_per_streamline=sft.data_per_streamline) # Return to original space compressed_sft.to_space(orig_space) return compressed_sft
[docs] def cut_invalid_streamlines(sft, epsilon=0.001): """ Cut streamlines so their longest segment are within the bounding box. This function keeps the data_per_point and data_per_streamline. Parameters ---------- sft: StatefulTractogram The sft to remove invalid points from. epsilon: float Error allowed when verifying the bounding box. Returns ------- new_sft : StatefulTractogram New object with the invalid points removed from each streamline. cutting_counter : int Number of streamlines that were cut. """ if not len(sft): return sft, 0 # Keep track of the streamlines' original space/origin space = sft.space origin = sft.origin sft.to_vox() sft.to_corner() copy_sft = copy.deepcopy(sft) indices_to_cut, _ = copy_sft.remove_invalid_streamlines() new_streamlines = [] new_data_per_point = {} new_data_per_streamline = {} for key in sft.data_per_point.keys(): new_data_per_point[key] = [] for key in sft.data_per_streamline.keys(): new_data_per_streamline[key] = [] cutting_counter = 0 for ind in range(len(sft.streamlines)): if ind in indices_to_cut: in_vol = np.logical_and( sft.streamlines[ind] >= epsilon, sft.streamlines[ind] < sft.dimensions - epsilon).all(axis=1) # Get segments in the streamline that are within the volume using # ndi.label blobs, _ = ndi.label(in_vol) # Get the largest blob bins = np.bincount(blobs.ravel())[1:] if len(bins) > 0: largest_blob = np.argmax(bins) + 1 # Get the indices of the points in the largest blob ind_in_vol = np.where(blobs == largest_blob)[0] else: logging.warning('Streamline entirely out of the volume.') continue # Get the streamline segment that is within the volume new_streamline = sft.streamlines[ind][ind_in_vol] new_streamlines.append(new_streamline) for key in sft.data_per_streamline.keys(): new_data_per_streamline[key].append( sft.data_per_streamline[key][ind]) for key in sft.data_per_point.keys(): new_data_per_point[key].append( sft.data_per_point[key][ind][ ind_in_vol]) cutting_counter += 1 else: # No reason to try to cut if all points are within the volume new_streamlines.append(sft.streamlines[ind]) for key in sft.data_per_streamline.keys(): new_data_per_streamline[key].append( sft.data_per_streamline[key][ind]) for key in sft.data_per_point.keys(): new_data_per_point[key].append(sft.data_per_point[key][ind]) new_sft = StatefulTractogram.from_sft( new_streamlines, sft, data_per_streamline=new_data_per_streamline, data_per_point=new_data_per_point) # Move the streamlines back to the original space/origin sft.to_space(space) sft.to_origin(origin) new_sft.to_space(space) new_sft.to_origin(origin) return new_sft, cutting_counter
[docs] def filter_streamlines_by_nb_points(sft, min_nb_points=2): """ Remove streamlines from a StatefulTractogram with fewer nb_points. Parameters ---------- sft: StatefulTractogram The sft to remove single point streamlines from. min_nb_points: int Minimum number of point a streamline needs to have to kept. Returns ------- new_sft : StatefulTractogram New object with the single point streamlines removed. """ if min_nb_points <= 1: raise ValueError("The value of min_nb_points " "should be greater than 1!") indices = [i for i in range(len(sft)) if len(sft.streamlines[i]) > min_nb_points - 1] if len(indices): new_sft = sft[indices] else: new_sft = StatefulTractogram.from_sft([], sft) return new_sft
[docs] def remove_overlapping_points_streamlines(sft, threshold=0.001): """ Remove overlapping points from streamlines in a StatefulTractogram, i.e. points forming a segment of length < threshold in a given streamline. Parameters ---------- sft: StatefulTractogram The sft to remove overlapping points from. threshold: float (optional) Maximum distance between two points to be considered overlapping. Default: 0.001 mm. Returns ------- new_sft : StatefulTractogram New object with the overlapping points removed from each streamline. """ new_streamlines = [] for streamline in sft.streamlines: norm = np.linalg.norm(np.diff(streamline, axis=0), axis=1) indices = np.where(norm < threshold)[0] if len(indices) == 0: new_streamlines.append(streamline.tolist()) else: new_streamline = np.delete(streamline.tolist(), indices, axis=0) new_streamlines.append(new_streamline) new_sft = StatefulTractogram.from_sft(new_streamlines, sft) return new_sft
[docs] def filter_streamlines_by_length(sft, min_length=0., max_length=np.inf, return_rejected=False): """ Filter streamlines using minimum and max length. Parameters ---------- sft: StatefulTractogram SFT containing the streamlines to filter. min_length: float Minimum length of streamlines, in mm. max_length: float Maximum length of streamlines, in mm. return_rejected: bool If true, also returns the sft of rejected streamlines. Return ------ filtered_sft : StatefulTractogram A tractogram without short/long streamlines. valid_length_ids: list The ids of kept streamlines. rejected_sft: StatefulTractogram The rejected (short/long) streamlines. (If return_rejected) A tractogram without short streamlines. """ # Make sure we are in world space orig_space = sft.space sft.to_rasmm() rejected_sft = None if len(sft.streamlines) > 0: # Compute streamlines lengths lengths = length(sft.streamlines) # Filter lengths valid_length_ids = np.logical_and(lengths >= min_length, lengths <= max_length) filtered_sft = sft[valid_length_ids] else: valid_length_ids = np.array([], dtype=bool) filtered_sft = sft # Return to original space sft.to_space(orig_space) filtered_sft.to_space(orig_space) if return_rejected: rejected_sft = sft[~valid_length_ids] return filtered_sft, valid_length_ids, rejected_sft else: return filtered_sft, valid_length_ids
[docs] def filter_streamlines_by_total_length_per_dim( sft, limits_x, limits_y, limits_z, use_abs, save_rejected): """ Filter streamlines using sum of abs length per dimension. Note: we consider that x, y, z are the coordinates of the streamlines; we do not verify if they are aligned with the brain's orientation. Parameters ---------- sft: StatefulTractogram SFT containing the streamlines to filter. limits_x: [float float] The list of [min, max] for the x coordinates. limits_y: [float float] The list of [min, max] for the y coordinates. limits_z: [float float] The list of [min, max] for the z coordinates. use_abs: bool If True, will use the total of distances in absolute value (ex, coming back on yourself will contribute to the total distance instead of cancelling it). save_rejected: bool If true, also returns the SFT of rejected streamlines. Else, returns None. Return ------ filtered_sft : StatefulTractogram A tractogram of accepted streamlines. ids: list The list of good ids. rejected_sft: StatefulTractogram or None A tractogram of rejected streamlines. """ # Make sure we are in world space orig_space = sft.space sft.to_rasmm() # Compute directions all_dirs = [np.diff(s, axis=0) for s in sft.streamlines] if use_abs: total_per_orientation = np.asarray( [np.sum(np.abs(d), axis=0) for d in all_dirs]) else: # We add the abs on the total length, not on each small movement. total_per_orientation = np.abs(np.asarray( [np.sum(d, axis=0) for d in all_dirs])) logging.info("Total length per orientation is:\n" "Average: x: {:.2f}, y: {:.2f}, z: {:.2f} \n" "Min: x: {:.2f}, y: {:.2f}, z: {:.2f} \n" "Max: x: {:.2f}, y: {:.2f}, z: {:.2f} \n" .format(np.mean(total_per_orientation[:, 0]), np.mean(total_per_orientation[:, 1]), np.mean(total_per_orientation[:, 2]), np.min(total_per_orientation[:, 0]), np.min(total_per_orientation[:, 1]), np.min(total_per_orientation[:, 2]), np.max(total_per_orientation[:, 0]), np.max(total_per_orientation[:, 1]), np.max(total_per_orientation[:, 2]))) # Find good ids mask_good_x = np.logical_and(limits_x[0] < total_per_orientation[:, 0], total_per_orientation[:, 0] < limits_x[1]) mask_good_y = np.logical_and(limits_y[0] < total_per_orientation[:, 1], total_per_orientation[:, 1] < limits_y[1]) mask_good_z = np.logical_and(limits_z[0] < total_per_orientation[:, 2], total_per_orientation[:, 2] < limits_z[1]) mask_good_ids = np.logical_and(mask_good_x, mask_good_y) mask_good_ids = np.logical_and(mask_good_ids, mask_good_z) filtered_sft = sft[mask_good_ids] rejected_sft = None if save_rejected: rejected_sft = sft[~mask_good_ids] # Return to original space filtered_sft.to_space(orig_space) return filtered_sft, np.nonzero(mask_good_ids), rejected_sft
[docs] def resample_streamlines_num_points(sft, num_points): """ Resample streamlines using number of points per streamline Parameters ---------- sft: StatefulTractogram SFT containing the streamlines to subsample. num_points: int Number of points per streamline in the output. Return ------ resampled_sft: StatefulTractogram The resampled streamlines as a sft. """ # Checks if num_points <= 1: raise ValueError("The value of num_points should be greater than 1!") # Resampling lines = set_number_of_points(sft.streamlines, num_points) # Creating sft # CAREFUL. Data_per_point will be lost. resampled_sft = _warn_and_save(lines, sft) return resampled_sft
[docs] def resample_streamlines_step_size(sft, step_size): """ Resample streamlines using a fixed step size. Parameters ---------- sft: StatefulTractogram SFT containing the streamlines to subsample. step_size: float Size of the new steps, in mm. Return ------ resampled_sft: StatefulTractogram The resampled streamlines as a sft. """ # Checks if step_size == 0: raise ValueError("Step size can't be 0!") elif step_size < 0.1: logging.info("The value of your step size seems suspiciously low. " "Please check.") elif step_size > np.max(sft.voxel_sizes): logging.info("The value of your step size seems suspiciously high. " "Please check.") # Make sure we are in world space orig_space = sft.space sft.to_rasmm() # Resampling lengths = length(sft.streamlines) nb_points = np.ceil(lengths / step_size).astype(int) if np.any(nb_points == 1): logging.warning("Some streamlines are shorter than the provided " "step size...") nb_points[nb_points == 1] = 2 resampled_streamlines = [set_number_of_points(s, n) for s, n in zip(sft.streamlines, nb_points)] # Creating sft resampled_sft = _warn_and_save(resampled_streamlines, sft) # Return to original space resampled_sft.to_space(orig_space) resampled_sft.streamlines._data = resampled_sft.streamlines._data.astype( np.float32) return resampled_sft
def _warn_and_save(new_streamlines, sft): """Last step of the two resample functions: Warn that we loose data_per_point, then create resampled SFT.""" if sft.data_per_point is not None and sft.data_per_point.keys(): logging.info("Initial StatefulTractogram contained data_per_point. " "This information will not be carried in the final " "tractogram.") new_sft = StatefulTractogram.from_sft( new_streamlines, sft, data_per_streamline=sft.data_per_streamline) return new_sft
[docs] def smooth_line_gaussian(streamline, sigma): """ Smooths a streamline using a gaussian filter. Enforces the endpoints to remain the same. Parameters ---------- streamline: np.ndarray The streamline to smooth. sigma: float The sigma of the gaussian filter. Returns ------- smoothed_streamline: np.ndarray The smoothed streamline. """ if sigma < 0.00001: raise ValueError('Cant have a 0 sigma with gaussian.') if length(streamline) < 1: logging.info('Streamline shorter than 1mm, corner cases possible.') # Smooth each dimension separately x, y, z = streamline.T x3 = ndi.gaussian_filter1d(x, sigma) y3 = ndi.gaussian_filter1d(y, sigma) z3 = ndi.gaussian_filter1d(z, sigma) smoothed_streamline = np.asarray([x3, y3, z3], dtype=float).T # Ensure first and last point remain the same smoothed_streamline[0] = streamline[0] smoothed_streamline[-1] = streamline[-1] return smoothed_streamline
[docs] def smooth_line_spline(streamline, smoothing_parameter, nb_ctrl_points): """ Smooths a streamline using a spline. The number of control points can be specified, but must be at least 3. The final streamline will have the same number of points as the input streamline. Enforces the endpoints to remain the same. Parameters ---------- streamline: np.ndarray The streamline to smooth. smoothing_parameter: float The sigma of the spline. nb_ctrl_points: int The number of control points. Returns ------- smoothed_streamline: np.ndarray The smoothed streamline. """ if smoothing_parameter < 0.00001: raise ValueError('Cant have a 0 sigma with spline.') if length(streamline) < 1: logging.info('Streamline shorter than 1mm, corner cases possible.') if nb_ctrl_points < 3: nb_ctrl_points = 3 initial_nb_of_points = len(streamline) # Resample the streamline to have the desired number of points # which will be used as control points for the spline sampled_streamline = set_number_of_points(streamline, nb_ctrl_points) # Fit the spline using the control points tck, u = splprep(sampled_streamline.T, s=smoothing_parameter) # Evaluate the spline smoothed_streamline = splev(np.linspace(0, 1, initial_nb_of_points), tck) smoothed_streamline = np.squeeze(np.asarray([smoothed_streamline]).T) # Ensure first and last point remain the same smoothed_streamline[0] = streamline[0] smoothed_streamline[-1] = streamline[-1] return smoothed_streamline
[docs] def generate_matched_points(sft): """ Generates an array where each element i is set to the index of the streamline to which it belongs Parameters ----------- sft : StatefulTractogram The stateful tractogram containing the streamlines. Returns -------- matched_points : ndarray An array where each element is set to the index of the streamline to which it belongs """ tmp_len = [len(s) for s in sft.streamlines] total_points = np.sum(tmp_len) offsets = np.insert(np.cumsum(tmp_len), 0, 0) matched_points = np.zeros(total_points, dtype=np.uint64) for i in range(len(offsets) - 1): matched_points[offsets[i]:offsets[i+1]] = i matched_points[offsets[-1]:] = len(offsets) - 1 return matched_points
[docs] def parallel_transport_streamline(streamline, nb_streamlines, radius, rng=None): """ Generate new streamlines by parallel transport of the input streamline. See [0] and [1] for more details. [0]: Hanson, A.J., & Ma, H. (1995). Parallel Transport Approach to Curve Framing. # noqa E501 [1]: TD Essentials: Parallel Transport. https://www.youtube.com/watch?v=5LedteSEgOE Parameters ---------- streamline: ndarray (N, 3) The streamline to transport. nb_streamlines: int The number of streamlines to generate. radius: float The radius of the circle around the original streamline in which the new streamlines will be generated. rng: numpy.random.Generator, optional The random number generator to use. If None, the default numpy random number generator will be used. Returns ------- new_streamlines: list of ndarray (N, 3) The generated streamlines. """ if rng is None: rng = np.random.default_rng(0) # Compute the tangent at each point of the streamline T = np.gradient(streamline, axis=0) # Normalize the tangents T = T / np.linalg.norm(T, axis=1)[:, None] # Placeholder for the normal vector at each point V = np.zeros_like(T) # Set the normal vector at the first point to kind of perpendicular to # the first direction vector V[0] = np.roll(streamline[0] - streamline[1], 1) V[0] = V[0] / np.linalg.norm(V[0]) # For each point for i in range(0, T.shape[0] - 1): # Compute the torsion vector B = np.cross(T[i], T[i+1]) # If the torsion vector is 0, the normal vector does not change if np.linalg.norm(B) < 1e-3: V[i+1] = V[i] # Else, the normal vector is rotated around the torsion vector by # the torsion. else: B = B / np.linalg.norm(B) theta = np.arccos(np.dot(T[i], T[i+1])) # Rotate the vector V[i] around the vector B by theta # radians. rotation = Rotation.from_rotvec(B * theta).as_matrix() V[i+1] = np.dot(rotation, V[i]) # Compute the binormal vector at each point W = np.cross(T, V, axis=1) # Generate the new streamlines # TODO?: This could easily be optimized to avoid the for loop, we have to # see if this becomes a bottleneck. new_streamlines = [] for i in range(nb_streamlines): # Get a random number between -1 and 1 rand_v = rng.uniform(-1, 1) rand_w = rng.uniform(-1, 1) # Compute the norm of the "displacement" norm = np.sqrt(rand_v**2 + rand_w**2) # Displace the normal and binormal vectors by a random amount V_mod = V * rand_v W_mod = W * rand_w # Compute the displacement vector VW = (V_mod + W_mod) # Displace the streamline around the original one following the # parallel frame. Make sure to normalize the displacement vector # so that the new streamline is in a circle around the original one. new_s = streamline + (rng.uniform(0, 1) * VW / norm) * radius new_streamlines.append(new_s) return new_streamlines
[docs] def remove_loops(streamlines, max_angle, num_processes=1): """ Remove loops from a list of streamlines. Parameters ---------- streamlines: list or ndarray The list of streamlines from which to remove loops and sharp turns. max_angle: float Maximal winding angle a streamline can have before being classified as a loop (in degree). num_processes : int Split the calculation to a pool of children processes. Returns ------- ids: list Ids of the streamlines with no loop. streamlines_clean: list or ndarray The remaining streamlines. """ pool = Pool(num_processes) windings = pool.map(tm.winding, streamlines) pool.close() streamlines_clean = streamlines[np.array(windings) < max_angle] ids = list(np.where(np.array(windings) < max_angle)[0]) return ids, streamlines_clean
[docs] def remove_sharp_turns_qb(streamlines, qb_threshold=15.0, qb_seed=0): """ Remove sharp turns from a list of streamlines. Should only be used on bundled streamlines, not on whole-brain tractograms. Parameters ---------- streamlines: list or ndarray The list of streamlines from which to remove loops and sharp turns. qb_threshold: float The Quickbundles distance threshold. qb_seed: int Seed to initialize randomness in QuickBundles Returns ------- ids: list Ids of good streamlines. """ ids = [] if len(streamlines) > 1: curvature = [] rng = np.random.RandomState(qb_seed) clusters = qbx_and_merge(streamlines, [40, 30, 20, qb_threshold], rng=rng, verbose=False) for cc in clusters.centroids: curvature.append(tm.mean_curvature(cc)) mean_curvature = sum(curvature) / len(curvature) for i in range(len(clusters.centroids)): if tm.mean_curvature(clusters.centroids[i]) <= mean_curvature: ids.extend(clusters[i].indices) else: logging.info("Impossible to remove sharp turns using Quickbundles " "because the tractogram does not contain at least 2 " "streamlines.") ids = [0] return ids
[docs] def remove_loops_and_sharp_turns(streamlines, max_angle, qb_threshold=None, qb_seed=0, num_processes=1): """ Remove loops and sharp turns from a list of streamlines. Parameters ---------- streamlines: list or ndarray The list of streamlines from which to remove loops and sharp turns. max_angle: float Maximal winding angle a streamline can have before being classified as a loop (in degree) qb_threshold: float, optional If not None, do the additional QuickBundles pass. This will help remove sharp turns. Should only be used on bundled streamlines, not on whole-brain tractograms. If set, value is the Quickbundles distance threshold. Suggested default: 15. qb_seed: int Seed to initialize randomness in QuickBundles num_processes : int Split the calculation to a pool of children processes. Returns ------- ids: list Ids of good streamlines. Only the ids are returned so proper filtering can be done afterwards. """ ids, streamlines_clean = remove_loops(streamlines, max_angle, num_processes) if qb_threshold is not None: ids = remove_sharp_turns_qb(streamlines_clean, qb_threshold, qb_seed) return ids
[docs] def remove_streamlines_with_overlapping_points(sft, eps=0.001): """ Remove streamlines with overlapping points (step size < eps) Parameters ---------- sft: StatefulTractogram The tractogram. eps: float The minimal step size. Returns ------- sft: StatefulTractogram """ indices = [] for i in range(len(sft)): norm = np.linalg.norm( np.gradient(sft.streamlines[i], axis=0), axis=1) if (norm < eps).any(): # or len(sft.streamlines[i]) <= 1: indices.append(i) indices = np.setdiff1d(range(len(sft)), indices).astype(np.uint32) sft = sft[indices] return sft
[docs] def get_streamlines_bounding_box(streamlines): """ Classify inliers and outliers from a list of streamlines. Parameters ---------- streamlines: list of ndarray The list of streamlines from which inliers and outliers are separated. Returns ------- tuple: Minimum and maximum corner coordinate of the streamlines bounding box """ box_min = np.array([np.inf, np.inf, np.inf]) box_max = -np.array([np.inf, np.inf, np.inf]) for s in streamlines: box_min = np.minimum(box_min, np.min(s, axis=0)) box_max = np.maximum(box_max, np.max(s, axis=0)) return box_min, box_max
[docs] def get_streamlines_as_fixed_array(streamlines): """ Obtain streamlines as a fixed array of shape (nbr of streamlines, max streamline length, 3). Useful for optimization with code precompiling. (See Numba) Parameters ---------- streamlines: list The list of streamlines to convert into a fixed length array Return ------ streamlines_fixed: ndarray Streamlines as a fixed length array, padded with 0. lengths: ndarray Single dimensional array of all the streamline lengths. """ lengths = [len(streamline) for streamline in streamlines] streamlines_fixed = np.zeros((len(streamlines), max(lengths), 3)) for i, f in enumerate(streamlines_fixed): for j, c in enumerate(streamlines[i]): f[j] = c return streamlines_fixed, np.array(lengths)
[docs] def find_seed_indexes_on_streamlines(seeds, streamlines, atol=1.e-8): """ Given a list of seeds and a corresponding list of streamlines, finds the index of each seed on its respective streamline. Parameters ---------- seeds: list List of seeds to locate on streamlines streamlines: list List of streamlines produced from seeds atol: float Absolute tolerance of the comparison between a seed and each of the streamline coordinates. Return ------ seed_indexes: list A list containing the index of each seed on its streamline. """ seed_indexes = [] for seed, streamline in zip(seeds, streamlines): seed_index = -1 for i, point in enumerate(streamline): if np.allclose(point, seed, rtol=0, atol=atol): seed_index = i break if seed_index == -1: raise ValueError('A seed coordinate was not found on streamline.') seed_indexes.append(seed_index) return np.array(seed_indexes, dtype=np.uint32)