# Source code for scilpy.stats.utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import csv
import json
import logging
import math
import os

from itertools import combinations
import matplotlib.pyplot as plt
import numpy as np


class data_for_stat(object):
    """
    Container for the statistical data built from a json metrics file and a
    participants file.

    Naming convention: methods containing 'init' in the name initialise an
    attribute of the object; methods containing 'get' in the name return an
    object generated from the stored data.
    """

    def __init__(self, json_file, participants):
        """
        Load both files, validate that they describe the same participants
        and build self.data_dictionnary.

        Parameters
        ----------
        json_file: str
            Path to a json file shaped as
            {participant_id: {bundle: {metric: {value: float}}}}.
        participants: str
            Path to the participants file. Must contain a 'participant_id'
            column.

        Raises
        ------
        ValueError
            If the two files do not describe the same participant ids.
        """
        # {participant_id: {'bundles': <json entry>, <attribute>: <value>}}
        self.data_dictionnary = {}

        with open(json_file) as f:
            json_info = json.load(f)

        # NOTE(review): the delimiter is a single space, not a tab, even
        # though participants files are usually .tsv — confirm against the
        # actual data files.
        with open(participants) as f:
            csv_reader = csv.DictReader(f, delimiter=' ')
            participants_info = [v for v in csv_reader]

        # Validate the files are compatible before merging them.
        self.validation_participant_id(json_info, participants_info)

        # Construct the data dictionary: bundles come from the json file,
        # every other column of the participants file is copied alongside.
        for participant in participants_info:
            pid = participant['participant_id']
            self.data_dictionnary[pid] = {'bundles': json_info[pid]}
            for variable in participant:
                if variable != 'participant_id':
                    self.data_dictionnary[pid][variable] = \
                        participant[variable]

        logging.info('Data_dictionnary')
        logging.info(self.data_dictionnary[self.get_first_participant()])

        # Side effect: dump the merged dictionary into the current
        # working directory for inspection.
        with open('data.json', 'w') as fp:
            json.dump(self.data_dictionnary, fp, indent=4)

    def validation_participant_id(self, json_info, participants_info):
        """
        Verify that the json and tsv files contain the same participant ids.

        Logs the ids missing on each side, then raises ValueError when the
        two sets differ.
        """
        # Participant ids found in the json dictionary.
        participants_from_json = list(json_info.keys())
        logging.info('participant list from json dictionnary:')
        logging.info(participants_from_json)

        # Participant ids found in the tsv rows.
        participants_from_tsv = []
        for participant in participants_info:
            participants_from_tsv.append(participant['participant_id'])
        logging.info('participant list from tsv file:')
        logging.info(participants_from_tsv)

        # Compare the two sorted lists.
        participants_from_json.sort()
        participants_from_tsv.sort()

        if not participants_from_json == participants_from_tsv:
            if not len(participants_from_json) == len(participants_from_tsv):
                logging.info('The number of participants from json file is '
                             'not the same as the one in the tsv file.')
            # np.isin replaces the deprecated np.in1d.
            is_in_tsv = np.isin(participants_from_json,
                                participants_from_tsv)
            is_in_json = np.isin(participants_from_tsv,
                                 participants_from_json)
            logging.info('participants list from json file missing in tsv '
                         'file :')
            logging.info(np.asarray(participants_from_json)[~is_in_tsv])
            logging.info('participants list from tsv file missing in json '
                         'file :')
            logging.info(np.asarray(participants_from_tsv)[~is_in_json])

            logging.error('The subjects from the json file does not fit '
                          'with the subjects of the tsv file. '
                          'Impossible to build the data_for_stat object')
            # ValueError instead of BaseException: still caught by any
            # pre-existing `except BaseException` caller.
            raise ValueError('The subjects from the json file does not fit '
                             'with the subjects of the tsv file. '
                             'Impossible to build the data_for_stat object')
        else:
            logging.info('The json and the tsv are compatible')

    def get_participants_list(self):
        # List of participant_id keys from the data dictionary.
        return list(self.data_dictionnary.keys())

    def get_first_participant(self):
        # First participant id (insertion order).
        return next(iter(self.data_dictionnary))

    def get_first_bundle(self, participant):
        # First bundle id of the given participant.
        return next(iter(self.data_dictionnary[participant]['bundles']))

    def get_first_metric(self, participant, bundle):
        # First metric key of the given participant/bundle.
        return next(iter(
            self.data_dictionnary[participant]['bundles'][bundle]))

    def get_participant_attributes_list(self):
        # Attribute names taken from the first participant
        # (assumed to be consistent across participants).
        first_participant = self.get_first_participant()
        attributes_list = \
            list(self.data_dictionnary[first_participant].keys())
        attributes_list.remove('bundles')
        return attributes_list

    def get_bundles_list(self):
        # Bundle ids taken from the first participant
        # (assumed to be consistent across participants).
        first_participant = self.get_first_participant()
        return list(
            self.data_dictionnary[first_participant]['bundles'].keys())

    def get_metrics_list(self):
        # Metric ids taken from the first participant/bundle
        # (assumed to be consistent across participants).
        first_participant = self.get_first_participant()
        first_bundle = self.get_first_bundle(first_participant)
        return list(self.data_dictionnary[first_participant]
                    ['bundles'][first_bundle].keys())

    def get_values_list(self):
        # Value ids taken from the first participant/bundle/metric
        # (assumed to be consistent across participants).
        first_participant = self.get_first_participant()
        first_bundle = self.get_first_bundle(first_participant)
        first_metric = self.get_first_metric(first_participant,
                                             first_bundle)
        return list(self.data_dictionnary[first_participant]
                    ['bundles'][first_bundle][first_metric].keys())

    def get_groups_dictionnary(self, group_by):
        """
        Parameters
        ----------
        group_by: string
            The attribute with which we generate our groups.

        Returns
        -------
        group_dict: dictionnary of groups
            keys: group id generated by group_by
            ('<group_by>_<attribute value>').
            values: dictionnary of participants of that specific group.

        Raises
        ------
        ValueError
            If group_by is not a participant attribute.
        """
        group_dict = {}
        # Verify the group_by attribute exists.
        if group_by not in self.get_participant_attributes_list():
            logging.error('Participants doesn\'t contain the attribute '
                          '{} necessary to generate groups.'
                          .format(group_by))
            raise ValueError('Object data_for_stat has no attribute '
                             '{}.'.format(group_by))

        # Separate the participants by group.
        for participant in self.data_dictionnary:
            curr_group_id = (group_by + '_'
                             + self.data_dictionnary[participant][group_by])
            if curr_group_id in group_dict:
                group_dict[curr_group_id][participant] = \
                    self.data_dictionnary[participant]
            else:
                group_dict[curr_group_id] = \
                    {participant: self.data_dictionnary[participant]}
        return group_dict

    def get_groups_list(self, group_by):
        """
        Parameters
        ----------
        group_by: string
            The attribute with which we generate our groups.

        Returns
        -------
        group_list: list of string
            list of group id generated by group_by variable.
        """
        return list(self.get_groups_dictionnary(group_by).keys())

    def get_data_sample(self, bundle, metric, value):
        """
        Parameters
        ----------
        bundle: string
            The specific bundle with which we generate our sample.
        metric: string
            The specific metric with which we generate our sample.
        value: string
            The specific value with which we generate our sample.

        Returns
        -------
        data_sample: list of float
            The sample values of every participant for the given
            bundle/metric/value.
        """
        data_sample = []
        for participant in self.data_dictionnary:
            data_sample.append(self.data_dictionnary[participant]
                               ['bundles'][bundle][metric][value])
        return data_sample
def get_group_data_sample(group_dict, group_id, bundle, metric, value):
    """
    Parameters
    ----------
    group_dict: dictionnary of groups
        keys: group id generated by group_by.
        values: dictionnary of participants of that specific group.
    group_id: string
        The name of the group with which we generate our sample.
    bundle: string
        The specific bundle with which we generate our sample.
    metric: string
        The specific metric with which we generate our sample.
    value: string
        The specific value with which we generate our sample.

    Returns
    -------
    data_sample: ndarray of float
        The sample array associated with the parameters. Participants
        missing the bundle, metric or value keep the 0.0 placeholder
        (array is pre-filled with np.zeros).
    """
    sample_size = len(group_dict[group_id].keys())
    data_sample = np.zeros(sample_size)
    for index, participant in enumerate(group_dict[group_id].keys()):
        bundles = group_dict[group_id][participant]['bundles']
        # Only fill the slot when the participant actually has the
        # bundle and metric in the database.
        if bundle in bundles and metric in bundles[bundle]:
            try:
                data_sample[index] = bundles[bundle][metric][value]
            except KeyError:
                # Report through logging like the rest of the module
                # (was a bare print in the original).
                logging.error('{} is not a valid key.'.format(value))
    return data_sample
def write_current_dictionnary(metric, normality, variance_equality,
                              diff_result, diff_2_by_2):
    """
    Parameters
    ----------
    metric: string
        The name of the metric in which the group comparison was made on.
    normality: dictionnary of groups
        keys: group id
        values: (result, p-value)
    variance_equality: (string, bool, float)
        The result of the equality of variance test.
        - 1st dimension: Name of the equal variance test done.
        - 2nd dimension: Whether or not equality of variance can be
          assumed.
        - 3rd dimension: p-value result.
    diff_result: (string, bool, float)
        The result of the groups difference analysis on the metric.
        - 1st dimension: Name of the test done.
        - 2nd dimension: Whether or not we detect a group difference on
          the metric.
        - 3rd dimension: p-value result.
    diff_2_by_2: (string, list of (string, string, bool, float))
        The result of the pairwise groups difference a posteriori
        analysis: name of the test done, then the result of every
        pairwise combination of the groups (name of first group, name of
        second group, result, p-value). May be empty when no pairwise
        analysis was run.

    Returns
    -------
    curr_dict: dictionnary of test
        keys: The category of test done (Normality,
        Homoscedasticity, ...)
        values: The result of those tests. Significant p-values are
        suffixed with '*'.
    """
    # Fixed structure of the result; pairwise entry added only on demand.
    curr_dict = {
        'Normality': {'Test': 'Shapiro-Wilk', 'P-value': {}},
        'Homoscedasticity': {'Test': variance_equality[0]},
        'Group difference': {'Test': diff_result[0]}
    }

    # Normality: one entry per group, '*' marks a rejected null.
    for group in normality:
        if normality[group][0]:
            curr_dict['Normality']['P-value'][group] = \
                'Normal (' + str(normality[group][1]) + ')'
        else:
            curr_dict['Normality']['P-value'][group] = \
                'Not Normal (' + str(normality[group][1]) + '*)'

    # Equality of variance.
    if variance_equality[1]:
        curr_dict['Homoscedasticity']['P-value'] = \
            'Equal variance (' + str(variance_equality[2]) + ')'
    else:
        curr_dict['Homoscedasticity']['P-value'] = \
            'Not equal variance (' + str(variance_equality[2]) + '*)'

    # Main test; pairwise results only when a difference was detected
    # and a pairwise analysis is available.
    if diff_result[1] and len(diff_2_by_2) != 0:
        curr_dict['Group difference']['P-value'] = str(diff_result[2]) + '*'

        curr_dict['Pairwise group difference'] = {'Test': diff_2_by_2[0],
                                                  'P-value': {}}
        for pair in diff_2_by_2[1]:
            # Normalise the comparison label so the smaller group name
            # always comes first.
            if pair[0] < pair[1]:
                curr_comparison = '{} vs {}'.format(pair[0], pair[1])
            else:
                curr_comparison = '{} vs {}'.format(pair[1], pair[0])
            if pair[2]:
                curr_dict['Pairwise group difference']['P-value'] \
                    [curr_comparison] = str(pair[3]) + '*'
            else:
                curr_dict['Pairwise group difference']['P-value'] \
                    [curr_comparison] = str(pair[3])
    elif diff_result[1]:
        curr_dict['Group difference']['P-value'] = str(diff_result[2]) + '*'
    else:
        curr_dict['Group difference']['P-value'] = str(diff_result[2])

    return curr_dict
def write_csv_from_json(writer, json_dict):
    """
    Write the statistics stored in json_dict (one entry per metric, as
    produced by write_current_dictionnary) as csv rows through `writer`.

    Parameters
    ----------
    writer: csv.writer
        Target csv writer.
    json_dict: dict
        keys: metric names.
        values: dict with 'Normality', 'Homoscedasticity',
        'Group difference' and optionally 'Pairwise group difference'
        entries.
    """
    # Header: group names are taken from the first metric entry
    # (assumed identical across metrics).
    first = next(iter(json_dict))
    groups_list = list(json_dict[first]['Normality']['P-value'].keys())
    groups_list.sort()
    nb_group = len(groups_list)

    n_blank = [''] * nb_group
    n_pvalue = ['p-value ' + x for x in groups_list]

    if nb_group > 2:
        # Number of unordered pairs, C(n, 2). Integer division is
        # required: the original true division produced a float and
        # made `[''] * nb_pairwise` raise TypeError.
        nb_pairwise = math.factorial(nb_group) \
            // (2 * math.factorial(nb_group - 2))
        pairwise_list = []
        for x, y in combinations(range(nb_group), 2):
            pairwise_list.append(groups_list[x] + ' vs ' + groups_list[y])
        pairwise_list.sort()
        pd_blank = [''] * nb_pairwise
        pd_pvalue = ['p-value ' + x for x in pairwise_list]

        writer.writerow(['Test: ', 'Normality'] + n_blank
                        + ['Homoscedasticity', '', 'Group difference', '',
                           'Pairwise difference'] + pd_blank)
        writer.writerow(['Metric', 'Test name'] + n_pvalue
                        + ['Test name', 'p-value', 'Test name', 'p-value',
                           'Test name'] + pd_pvalue)
    else:
        writer.writerow(['Test: ', 'Normality'] + n_blank
                        + ['Homoscedasticity', '', 'Group difference', ''])
        # Trailing 'p-value' added for consistency with the >2-group
        # branch (the original header was one column short of the data
        # rows).
        writer.writerow(['Metric', 'Test name'] + n_pvalue
                        + ['Test name', 'p-value', 'Test name',
                           'p-value'])

    # One data row per metric, sorted by metric name.
    metrics = list(json_dict.keys())
    metrics.sort()
    for metric in metrics:
        entry = json_dict[metric]

        # Normality: test name then one p-value cell per group.
        curr_n = [entry['Normality']['Test']] \
            + [entry['Normality']['P-value'][g] for g in groups_list]

        # Homoscedasticity.
        curr_h = [entry['Homoscedasticity']['Test'],
                  entry['Homoscedasticity']['P-value']]

        # Group difference.
        curr_gd = [entry['Group difference']['Test'],
                   entry['Group difference']['P-value']]

        # Pairwise difference (only present when >2 groups and a
        # difference was detected).
        if 'Pairwise group difference' in entry.keys():
            curr_pd = [entry['Pairwise group difference']['Test']] \
                + [entry['Pairwise group difference']['P-value'][p]
                   for p in pairwise_list]
            writer.writerow([metric] + curr_n + curr_h + curr_gd + curr_pd)
        else:
            writer.writerow([metric] + curr_n + curr_h + curr_gd)
def visualise_distribution(data_by_group, participants_id, bundle, metric,
                           value, oFolder, groups_list):
    """
    Parameters
    ----------
    data_by_group: list of array_like
        The sample data separated by groups.
        Possibly of different lengths per group.
    participants_id: list of string
        Names of the participants id "name".
    bundle: string
        The bundle the data belongs to (used in the title and save path).
    metric: string
        The name of the measurement we want to look at across groups.
    value: string
        Unused here; kept for interface compatibility with callers.
    oFolder: path-like object
        Emplacement in which we want to save the graph of the
        distribution of the measurement across groups.
    groups_list: list of string
        The names of each group.

    Returns
    -------
    outliers: list of (string, string)
        The list of participants considered outliers for their group
        (participant_id, group_id).
    """
    nb_group = len(data_by_group)
    outliers = []

    fig, ax = plt.subplots()
    ls = np.asarray(data_by_group, dtype=object)
    # Single boxplot call (the original drew it twice and discarded the
    # first result).
    boxdict = ax.boxplot(ls)
    fliers = boxdict['fliers']

    # Loop over boxes in the x direction.
    for j in range(len(fliers)):
        # The y and x positions of the fliers of this box.
        yfliers = boxdict['fliers'][j].get_ydata()
        xfliers = boxdict['fliers'][j].get_xdata()
        # The unique flier values in y.
        ufliers = set(yfliers)
        # NOTE(review): indexing xfliers/yfliers with the enumeration
        # index of a set relies on their lengths matching — confirm
        # against matplotlib's flier layout.
        for i, uf in enumerate(ufliers):
            # Recover the group and the subject id(s) matching the
            # flier value.
            curr_group = int(round(xfliers[i])) - 1
            subjects_id = np.nonzero(np.isclose(data_by_group[curr_group],
                                                yfliers[i]))[0].tolist()
            tag = ""
            for e in subjects_id:
                tag += " " + participants_id[e] + " "
                outliers.append((participants_id[e],
                                 groups_list[curr_group]))
            # Annotate the flier with the subject id(s).
            ax.text(1.005 * xfliers[i], 1.002 * uf, tag)

    # Name the x-axis ticks after the groups.
    labels = [item.get_text() for item in ax.get_xticklabels()]
    for k in range(nb_group):
        labels[k] = groups_list[k]
    ax.set_xticklabels(labels)

    plt.ylabel(metric)
    plt.title("Distribution of the {} data set in bundle {}.".format(
        metric, bundle))

    save_path = os.path.join(oFolder, 'Graph', bundle, 'plot_' + metric)
    # exist_ok=True replaces the original try/except which referenced a
    # nonexistent `exc.EEXIST` attribute (AttributeError on any OSError).
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # NOTE(review): the figure is saved under `metric`, not the
    # `plot_<metric>` name computed above — confirm which is intended.
    fig.savefig(os.path.join(oFolder, 'Graph', bundle, metric))

    logging.info('outliers:[(id, group)]')
    logging.info(outliers)

    return outliers