"""Provide statistics functions."""
# pylint: disable=protected-access
import copy as cp
import math

import numpy as np

from . import idtxl_exceptions as ex
from . import idtxl_utils as utils


def ais_fdr(settings=None, *results):
"""Perform FDR-correction on results of network AIS estimation.
Perform correction of the false discovery rate (FDR) after estimation of
active information storage (AIS) for all processes in the network. FDR
    correction is applied by correcting the p-values of the AIS estimates of
    individual processes/nodes in the network.
Input can be a list of partial results to combine results from parallel
analysis.
References:
- Genovese, C.R., Lazar, N.A., & Nichols, T. (2002). Thresholding of
statistical maps in functional neuroimaging using the false discovery
rate. Neuroimage, 15(4), 870-878.
Args:
settings : dict [optional]
parameters for statistical testing with entries:
- alpha_fdr : float [optional] - critical alpha level
(default=0.05)
- fdr_constant : int [optional] - choose one of two constants used
for calculating the FDR-thresholds according to Genovese (2002):
1 will divide alpha by 1, 2 will divide alpha by the sum_i(1/i);
see the paper for details on the assumptions (default=2)
results : instances of ResultsSingleProcessAnalysis
results of network AIS estimation, see documentation of
ResultsSingleProcessAnalysis()
Returns:
ResultsSingleProcessAnalysis instance
input results objects pruned of non-significant estimates
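
    Example:
        A usage sketch; 'res1' and 'res2' stand for hypothetical
        ResultsSingleProcessAnalysis instances obtained from two parallel AIS
        analyses:

            settings = {'alpha_fdr': 0.05, 'fdr_constant': 2}
            res_pruned = ais_fdr(settings, res1, res2)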
"""
if settings is None:
settings = {}
# Set defaults and get parameters from settings dictionary
alpha = settings.get("alpha_fdr", 0.05)
constant = settings.get("fdr_constant", 2)
# Combine results into single results dict.
    results_comb = cp.deepcopy(results[0])
    if len(results) > 1:
        results_comb.combine_results(*results[1:])
    # Collect p-values of whole processes (one AIS p-value per process).
pval = np.arange(0)
process_idx = np.arange(0).astype(int)
n_perm = np.arange(0).astype(int)
for process in results_comb.processes_analysed:
next_pval = results_comb._single_process[process].ais_pval
pval = np.append(pval, next_pval if next_pval is not None else 1)
process_idx = np.append(process_idx, process)
n_perm = np.append(n_perm, results_comb.settings.n_perm_mi)
if pval.size == 0:
print("FDR correction: no links in final results ...\n")
results_comb._add_fdr(fdr=None, alpha=alpha, constant=constant)
return results_comb
    sign, thresh = _perform_fdr_correction(
pval, constant, alpha, len(results_comb.processes_analysed)
)
# If the number of permutations for calculating p-values for individual
# variables is too low, return without performing any correction.
if (1 / min(n_perm)) > thresh[0]:
        print(
            "WARNING: Number of permutations ('n_perm_mi') for at least one process is too low to allow for "
            f"FDR correction (FDR-threshold: {thresh[0]:.4f}, min. theoretically possible p-value: {1 / min(n_perm)})."
        )
results_comb._add_fdr(fdr=None, alpha=alpha, constant=constant)
return results_comb
# Go over list of all candidates and remove non-significant results from
# the results object. Create a copy of the results object to leave the
# original intact.
fdr = cp.deepcopy(results_comb._single_process)
for s in range(sign.shape[0]):
if not sign[s]:
t = process_idx[s]
fdr[t].selected_vars = []
fdr[t].ais_pval = 1
fdr[t].ais_sign = False
results_comb._add_fdr(fdr, alpha, constant)
return results_comb


def network_fdr(settings=None, *results):
"""Perform FDR-correction on results of network inference.
Perform correction of the false discovery rate (FDR) after network
analysis. FDR correction can either be applied at the target level
(by correcting omnibus p-values) or at the single-link level (by correcting
p-values of individual links between single samples and the target).
Input can be a list of partial results to combine results from parallel
analysis.
References:
- Genovese, C.R., Lazar, N.A., & Nichols, T. (2002). Thresholding of
statistical maps in functional neuroimaging using the false discovery
rate. Neuroimage, 15(4), 870-878.
Args:
settings : dict [optional]
parameters for statistical testing with entries:
- alpha_fdr : float [optional] - critical alpha level
(default=0.05)
            - correct_by_target : bool [optional] - if True, correct p-values
              on the target level (omnibus test p-values), otherwise correct
              p-values for individual variables (sequential max stats
              p-values) (default=True)
- fdr_constant : int [optional] - choose one of two constants used
for calculating the FDR-thresholds according to Genovese (2002):
1 will divide alpha by 1, 2 will divide alpha by the sum_i(1/i);
see the paper for details on the assumptions (default=2)
results : instances of ResultsNetworkInference
results of network inference, see documentation of
ResultsNetworkInference()
Returns:
ResultsNetworkInference instance
input object pruned of non-significant links
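
    Example:
        A usage sketch; 'res1' and 'res2' stand for hypothetical
        ResultsNetworkInference instances obtained from two parallel
        analyses:

            settings = {'alpha_fdr': 0.05, 'correct_by_target': True}
            res_pruned = network_fdr(settings, res1, res2)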
"""
if settings is None:
settings = {}
# Set defaults and get parameters from settings dictionary
alpha = settings.get("alpha_fdr", 0.05)
correct_by_target = settings.get("correct_by_target", True)
constant = settings.get("fdr_constant", 2)
# Combine results into single results dict.
    results_comb = cp.deepcopy(results[0])
    if len(results) > 1:
        results_comb.combine_results(*results[1:])
# Collect significant source variables for all targets. Either correct
# p-value of whole target (all candidates), or correct p-value of
# individual source variables. Use targets with significant input only
# (determined by the omnibus test).
pval = np.arange(0)
target_idx = np.arange(0).astype(int)
n_perm = np.arange(0).astype(int)
cands = []
if correct_by_target: # whole target
# The total number of tests is the number of targets
n_tests = len(results_comb.targets_analysed)
for target in results_comb.targets_analysed:
next_pval = results_comb._single_target[target].omnibus_pval
pval = np.append(pval, next_pval if next_pval is not None else 1)
target_idx = np.append(target_idx, target)
n_perm = np.append(n_perm, results_comb.settings.n_perm_omnibus)
else: # individual variables
# The total number of tests is the number of targets times the number
# of source candidates (i.e. source processes * time lags) analyzed for each target
n_tests = sum(
len(results_comb._single_target[target].sources_tested)
for target in results_comb.targets_analysed
) * (settings["max_lag_sources"] - settings["min_lag_sources"] + 1)
for target in results_comb.targets_analysed:
if results_comb._single_target[target].selected_sources_pval is None:
continue
n_sign = results_comb._single_target[target].selected_sources_pval.size
pval = np.append(
pval,
[
next_pval if next_pval is not None else 1
for next_pval in results_comb._single_target[
target
].selected_sources_pval
],
)
target_idx = np.append(target_idx, np.ones(n_sign) * target).astype(int)
cands = cands + (results_comb._single_target[target].selected_vars_sources)
n_perm = np.append(n_perm, results_comb.settings.n_perm_max_seq)
if pval.size == 0:
print("No links in final results ...")
results_comb._add_fdr(
fdr=None,
alpha=alpha,
correct_by_target=correct_by_target,
constant=constant,
)
return results_comb
    sign, thresh = _perform_fdr_correction(pval, constant, alpha, n_tests)
# If the number of permutations for calculating p-values for individual
# variables is too low, return without performing any correction.
if (1 / min(n_perm)) > thresh[0]:
print(
"WARNING: Number of permutations ('n_perm_max_seq') for at least one target is too low to allow for "
f"FDR correction (FDR-threshold: {thresh[0]:.4f}, min. theoretically possible p-value: {1 / min(n_perm)})."
)
results_comb._add_fdr(
fdr=None,
alpha=alpha,
correct_by_target=correct_by_target,
constant=constant,
)
return results_comb
# Go over list of all candidates and remove non-significant results from
# the results object. Create a copy of the results object to leave the
# original intact.
fdr = cp.deepcopy(results_comb._single_target)
for i, t in enumerate(target_idx):
if not sign[i]:
if correct_by_target:
fdr[t].selected_vars_full = cp.deepcopy(
results_comb._single_target[t].selected_vars_target
)
fdr[t].selected_sources_te = None
fdr[t].selected_sources_pval = None
fdr[t].selected_vars_sources = []
fdr[t].omnibus_pval = 1
fdr[t].omnibus_sign = False
else:
cand_ind = fdr[t].selected_vars_sources.index(cands[i])
fdr[t].selected_vars_sources.remove(cands[i])
fdr[t].selected_sources_pval = np.delete(
fdr[t].selected_sources_pval, cand_ind
)
fdr[t].selected_sources_te = np.delete(
fdr[t].selected_sources_te, cand_ind
)
fdr[t].selected_vars_full.remove(cands[i])
results_comb._add_fdr(fdr, alpha, correct_by_target, constant)
return results_comb


def _perform_fdr_correction(pval, constant, alpha, n_tests):
"""Calculate sequential threshold for FDR-correction.
Calculate sequential thresholds for FDR-correction of p-values. The
constant defines how the threshold is calculated. See Genovese (2002) for
details.
References:
- Genovese, C.R., Lazar, N.A., & Nichols, T. (2002). Thresholding of
statistical maps in functional neuroimaging using the false discovery
rate. Neuroimage, 15(4), 870-878.
    Args:
        pval : numpy array
            p-values to be corrected
        constant : int
            one of two constants used for calculating the FDR-thresholds
            according to Genovese (2002): 1 will divide alpha by 1, 2 will
            divide alpha by the sum_i(1/i); see the paper for details on the
            assumptions
        alpha : float
            critical alpha level
        n_tests : int
            total number of tests performed for calculating the FDR-thresholds
Returns:
array of bools
significance of p-values in the order of the input array
array of floats
FDR-thresholds for each p-value in increasing order
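
    Example:
        An illustrative call with made-up p-values (four tests, constant 2):

            import numpy as np
            pval = np.array([0.001, 0.03, 0.2, 0.5])
            sign, thresh = _perform_fdr_correction(pval, 2, 0.05, 4)
            # sign -> array([True, False, False, False]); only the smallest
            # p-value falls below its rank-wise threshold of ~0.006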
"""
# Sort all p-values in ascending order.
sort_idx = np.argsort(pval)
pval_sorted = np.sort(pval)
# Calculate threshold
if constant == 2: # pick the requested constant (see Genovese, p.872)
if n_tests < 1000:
const = np.sum(1 / np.arange(1, n_tests + 1))
else:
            # Approximate the harmonic sum by ln(n) plus the Euler-Mascheroni
            # constant.
            const = np.log(n_tests) + np.euler_gamma
    elif constant == 1:
        # This is less strict than the harmonic-sum constant and corresponds
        # to a Bonferroni correction for the first p-value. However, it makes
        # stricter assumptions on the distribution of p-values, while
        # constant 2 works for any joint distribution of the p-values.
        const = 1
# Calculate threshold for each p-value.
thresh = (np.arange(1, len(pval_sorted) + 1) / n_tests) * alpha / const
# Compare data to threshold.
sign = pval_sorted <= thresh
if np.invert(sign).any():
first_false = np.where(np.invert(sign))[0][0]
sign[first_false:] = False # avoids false positives due to equal pvals
sign[sort_idx] = sign.copy() # restore original ordering of significance values
return sign, thresh


def omnibus_test(analysis_setup, data):
"""Perform an omnibus test on identified conditional variables.
Test the joint information transfer from all identified sources to the
current value conditional on candidates in the target's past. To test for
significance, this is repeated for shuffled realisations of the sources.
The distribution of values from shuffled data is then used as test
distribution.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
'settings', a dictionary with parameters for statistical testing:
- n_perm_omnibus : int [optional] - number of permutations
(default=500)
- alpha_omnibus : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
bool
statistical significance
float
the test's p-value
float
the estimated test statistic, i.e., the information transfer from
all sources into the target
Raises:
ex.AlgorithmExhaustedError
Raised from estimate() calls when calculation cannot be made
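
    Example:
        A usage sketch; called internally during network inference, assuming
        'analysis_setup' is a MultivariateTE instance mid-analysis and 'data'
        the corresponding Data instance:

            significant, pvalue, statistic = omnibus_test(analysis_setup, data)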
"""
# Set defaults and get parameters from settings dictionary
analysis_setup.settings.setdefault("n_perm_omnibus", 500)
n_permutations = analysis_setup.settings["n_perm_omnibus"]
analysis_setup.settings.setdefault("alpha_omnibus", 0.05)
alpha = analysis_setup.settings["alpha_omnibus"]
permute_in_time = _check_permute_in_time(analysis_setup, data, n_permutations)
assert analysis_setup.selected_vars_sources, "No sources to test."
# Create temporary variables b/c realisations for sources and targets are
# created on the fly, which is costly, so we want to re-use them after
# creation. (This does not apply to the current value realisations).
# If there was no target variable selected (e.g., if MI is used for network
# inference), set conditional to None such that the MI instead of the CMI
# estimator is used when calculating the statistic.
cond_source_realisations = analysis_setup._selected_vars_sources_realisations
if analysis_setup._selected_vars_target:
cond_target_realisations = analysis_setup._selected_vars_target_realisations
else:
cond_target_realisations = None
statistic = analysis_setup._cmi_estimator.estimate(
var1=cond_source_realisations,
var2=analysis_setup._current_value_realisations,
conditional=cond_target_realisations,
)
# Create the surrogate distribution by permuting the conditional sources.
if analysis_setup.settings["verbose"]:
print("omnibus test, n_perm: {0}".format(n_permutations))
if analysis_setup._cmi_estimator.is_analytic_null_estimator() and permute_in_time:
# Generate the surrogates analytically
analysis_setup.settings["analytical_surrogates"] = True
surr_distribution = analysis_setup._cmi_estimator.estimate_surrogates_analytic(
n_perm=n_permutations,
var1=cond_source_realisations,
var2=analysis_setup._current_value_realisations,
conditional=cond_target_realisations,
)
else:
analysis_setup.settings["analytical_surrogates"] = False
surr_cond_real = _get_surrogates(
data,
analysis_setup.current_value,
analysis_setup.selected_vars_sources,
n_permutations,
analysis_setup.settings,
)
surr_distribution = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=n_permutations,
re_use=["var2", "conditional"],
var1=surr_cond_real,
var2=analysis_setup._current_value_realisations,
conditional=cond_target_realisations,
)
[significance, pvalue] = _find_pvalue(
statistic, surr_distribution, alpha, "one_bigger"
)
if analysis_setup.settings["verbose"]:
if significance:
print(" -- significant\n")
else:
print(" -- not significant\n")
return significance, pvalue, statistic


def max_statistic(analysis_setup, data, candidate_set, te_max_candidate, conditional):
"""Perform maximum statistics for one candidate source.
    Test if a transfer entropy value is significantly bigger than the maximum
    values obtained from surrogates of all remaining candidates.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
'settings', a dictionary with parameters for statistical testing:
- n_perm_max_stat : int [optional] - number of permutations
(default=200)
- alpha_max_stat : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
        candidate_set : list of tuples
            list of indices of remaining candidates
te_max_candidate : float
transfer entropy value to be tested
conditional : numpy array
realisations of conditional, 2D numpy array where array dimensions
represent [realisations x variable dimension]
Returns:
bool
statistical significance
float
the test's p-value
numpy array
surrogate table
Raises:
ex.AlgorithmExhaustedError
Raised from _create_surrogate_table() when calculation cannot be
made
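
    Example:
        A standalone numpy sketch of the test's logic (not the estimator
        pipeline): the candidate's value is compared against the distribution
        of column-wise maxima of the surrogate table:

            import numpy as np
            surr_table = np.random.rand(4, 200)  # 4 candidates x 200 perms
            max_distribution = surr_table.max(axis=0)
            pvalue = np.mean(max_distribution >= 0.8)  # test value of 0.8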
"""
# Set defaults and get parameters from settings dictionary
analysis_setup.settings.setdefault("n_perm_max_stat", 200)
n_perm = analysis_setup.settings["n_perm_max_stat"]
analysis_setup.settings.setdefault("alpha_max_stat", 0.05)
alpha = analysis_setup.settings["alpha_max_stat"]
_check_permute_in_time(analysis_setup, data, n_perm)
assert candidate_set, "The candidate set is empty."
if analysis_setup.settings["verbose"]:
print(
"maximum statistic, n_perm: {0}".format(
analysis_setup.settings["n_perm_max_stat"]
)
)
# todo pass correct conditioning set
surr_table = _create_surrogate_table(
analysis_setup, data, candidate_set, n_perm, conditional
)
max_distribution = _find_table_max(surr_table)
[significance, pvalue] = _find_pvalue(
statistic=te_max_candidate,
distribution=max_distribution,
alpha=alpha,
tail="one_bigger",
)
return significance, pvalue, surr_table


def max_statistic_sequential(analysis_setup, data):
"""Perform sequential maximum statistics for a set of candidate sources.
Test multivariate/bivariate MI/TE values against surrogates. Test highest
TE/MI value against distribution of highest surrogate values, second
highest against distribution of second highest, and so forth. Surrogates
are created from each candidate in the candidate set, including the
candidate that is currently tested. Surrogates are then sorted over
candidates. This is repeated n_perm_max_seq times. Stop comparison if a
TE/MI value is not significant compared to the distribution of surrogate
values of the same rank. All smaller values are considered non-significant
as well.
The conditional for estimation of MI/TE is taken from the current set of
conditional variables in the analysis setup. For multivariate MI or TE
surrogate creation, the full set of conditional variables is used. For
bivariate MI or TE surrogate creation, the conditioning set has to be
restricted to a subset of the current set of conditional variables: for
bivariate MI no conditioning set is required, for bivariate TE only the
past variables from the target are required (not the variables selected
from other relevant sources).
This function will re-use the surrogate table created in the last min-stats
round if that table is in the analysis_setup. This saves the complete
calculation of surrogates for this statistic.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
settings, a dictionary with parameters for statistical testing:
- n_perm_max_seq : int [optional] - number of permutations
(default=n_perm_min_stat|500)
- alpha_max_seq : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
numpy array, bool
statistical significance of each source
numpy array, float
the test's p-values for each source
numpy array, float
TE values for individual sources
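
    Example:
        A standalone numpy sketch of the rank-wise comparison (not the
        estimator pipeline):

            import numpy as np
            stats = np.array([0.9, 0.5, 0.1])  # estimates, sorted descending
            surr = np.sort(np.random.rand(3, 500), axis=0)[::-1]  # rank-sorted
            # compare stats[0] to surr[0, :], stats[1] to surr[1, :], ...,
            # stopping at the first rank that is not significant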
"""
# Set defaults and get test parameters.
analysis_setup.settings.setdefault("n_perm_max_seq", 500)
n_permutations = analysis_setup.settings["n_perm_max_seq"]
analysis_setup.settings.setdefault("alpha_max_seq", 0.05)
alpha = analysis_setup.settings["alpha_max_seq"]
_check_permute_in_time(analysis_setup, data, n_permutations)
permute_in_time = analysis_setup.settings["permute_in_time"]
if analysis_setup.settings["verbose"]:
print(
f"sequential maximum statistic, n_perm: {n_permutations}, testing {len(analysis_setup.selected_vars_sources)} selected sources"
)
assert analysis_setup.selected_vars_sources, "No sources to test."
idx_conditional = analysis_setup.selected_vars_full
conditional_realisations = np.empty(
(
data.n_realisations(analysis_setup.current_value)
* len(analysis_setup.selected_vars_sources),
len(idx_conditional) - 1,
)
).astype(data.data_type)
candidate_realisations = np.empty(
(
data.n_realisations(analysis_setup.current_value)
* len(analysis_setup.selected_vars_sources),
1,
)
).astype(data.data_type)
# Calculate TE for each candidate in the conditional source set, i.e.,
# calculate the conditional MI between each candidate and the current
# value, conditional on all selected variables in the conditioning set,
# excluding the current source. Calculate surrogates for each candidate by
# shuffling the candidate realisations n_perm times. Afterwards, sort the
# estimated TE values.
i_1 = 0
i_2 = data.n_realisations(analysis_setup.current_value)
surr_table = np.zeros((len(analysis_setup.selected_vars_sources), n_permutations))
# Collect data for each candidate and the corresponding conditioning set.
# Use realisations for parallel estimation of the test statistic later.
for idx_c, candidate in enumerate(analysis_setup.selected_vars_sources):
[
conditional_realisations_current,
candidate_realisations_current,
] = analysis_setup._separate_realisations(idx_conditional, candidate)
        # The following may happen if either the requested conditioning is
        # 'none' or if the conditioning set that is tested consists only of a
        # single candidate.
if conditional_realisations_current is None:
conditional_realisations = None
re_use = ["var2", "conditional"]
else:
conditional_realisations[i_1:i_2,] = conditional_realisations_current
re_use = ["var2"]
candidate_realisations[i_1:i_2,] = candidate_realisations_current
i_1 = i_2
i_2 += data.n_realisations(analysis_setup.current_value)
# Generate surrogates for the current candidate.
if (
analysis_setup._cmi_estimator.is_analytic_null_estimator()
and permute_in_time
):
# Generate the surrogates analytically
surr_table[
idx_c, :
] = analysis_setup._cmi_estimator.estimate_surrogates_analytic(
n_perm=n_permutations,
var1=data.get_realisations(analysis_setup.current_value, [candidate])[
0
],
var2=analysis_setup._current_value_realisations,
conditional=conditional_realisations_current,
)
else:
analysis_setup.settings["analytical_surrogates"] = False
surr_candidate_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[candidate],
n_permutations,
analysis_setup.settings,
)
try:
surr_table[idx_c, :] = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=n_permutations,
re_use=["var2", "conditional"],
var1=surr_candidate_realisations,
var2=analysis_setup._current_value_realisations,
conditional=conditional_realisations_current,
)
except ex.AlgorithmExhaustedError as aee:
            # The algorithm cannot continue here, so we'll terminate the
            # sequential max stats test and declare all candidates not
            # significant.
print(
f"AlgorithmExhaustedError encountered in estimations: {aee.message}. "
"Stopping sequential max stats at candidate with rank 0"
)
return (
np.zeros(len(analysis_setup.selected_vars_sources)).astype(bool),
np.ones(len(analysis_setup.selected_vars_sources)),
np.zeros(len(analysis_setup.selected_vars_sources)),
)
# Calculate original statistic (multivariate/bivariate TE/MI)
try:
individual_stat = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=len(analysis_setup.selected_vars_sources),
re_use=re_use,
var1=candidate_realisations,
var2=analysis_setup._current_value_realisations,
conditional=conditional_realisations,
)
except ex.AlgorithmExhaustedError as aee:
        # The algorithm cannot continue here, so we'll terminate the
        # sequential max stats test and declare all candidates not
        # significant.
print(
f"AlgorithmExhaustedError encountered in estimations: {aee.message}. "
"Stopping sequential max stats at candidate with rank 0"
)
# For now we don't need a stack trace:
# traceback.print_tb(aee.__traceback__)
        # Return (significance, pvalue, TEs):
return (
np.zeros(len(analysis_setup.selected_vars_sources)).astype(bool),
np.ones(len(analysis_setup.selected_vars_sources)),
np.zeros(len(analysis_setup.selected_vars_sources)),
)
selected_vars_order = utils.argsort_descending(individual_stat)
individual_stat_sorted = utils.sort_descending(individual_stat)
max_distribution = _sort_table_max(surr_table)
# Compare each original value with the distribution of the same rank,
# starting with the highest value.
significance = np.zeros(individual_stat.shape[0]).astype(bool)
pvalue = np.ones(individual_stat.shape[0])
for c in range(individual_stat.shape[0]):
[s, p] = _find_pvalue(
individual_stat_sorted[c], max_distribution[c,], alpha, tail="one_bigger"
)
significance[c] = s
pvalue[c] = p
if not s: # break as soon as a candidate is no longer significant
if analysis_setup.settings["verbose"]:
print(f"\nStopping sequential max stats at candidate with rank {c}.")
break
# Get back original order and return results.
significance[selected_vars_order] = significance.copy()
pvalue[selected_vars_order] = pvalue.copy()
return significance, pvalue, individual_stat


def max_statistic_sequential_bivariate(analysis_setup, data):
"""Perform sequential maximum statistics for a set of candidate sources.
Test multivariate/bivariate MI/TE values against surrogates. Test highest
TE/MI value against distribution of highest surrogate values, second
highest against distribution of second highest, and so forth. Surrogates
are created from each candidate in the candidate set, including the
candidate that is currently tested. Surrogates are then sorted over
candidates. This is repeated n_perm_max_seq times. Stop comparison if a
TE/MI value is not significant compared to the distribution of surrogate
values of the same rank. All smaller values are considered non-significant
as well.
The conditional for estimation of MI/TE is taken from the current set of
conditional variables in the analysis setup. For multivariate MI or TE
surrogate creation, the full set of conditional variables is used. For
bivariate MI or TE surrogate creation, the conditioning set has to be
restricted to a subset of the current set of conditional variables: for
bivariate MI no conditioning set is required, for bivariate TE only the
past variables from the target are required (not the variables selected
from other relevant sources).
This function will re-use the surrogate table created in the last min-stats
round if that table is in the analysis_setup. This saves the complete
calculation of surrogates for this statistic.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
settings, a dictionary with parameters for statistical testing:
- n_perm_max_seq : int [optional] - number of permutations
(default=n_perm_min_stat|500)
- alpha_max_seq : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
numpy array, bool
statistical significance of each source
numpy array, float
the test's p-values for each source
numpy array, float
TE values for individual sources
"""
# Set defaults and get test parameters.
analysis_setup.settings.setdefault("n_perm_max_seq", 500)
n_permutations = analysis_setup.settings["n_perm_max_seq"]
analysis_setup.settings.setdefault("alpha_max_seq", 0.05)
alpha = analysis_setup.settings["alpha_max_seq"]
_check_permute_in_time(analysis_setup, data, n_permutations)
permute_in_time = analysis_setup.settings["permute_in_time"]
if analysis_setup.settings["verbose"]:
print(
f"sequential maximum statistic, n_perm: {n_permutations}, testing {len(analysis_setup.selected_vars_sources)} selected sources"
)
assert analysis_setup.selected_vars_sources, "No sources to test."
# Check if target variables were selected to distinguish between TE and MI
# analysis.
if len(analysis_setup._selected_vars_target) == 0:
conditional_realisations_target = None
else:
conditional_realisations_target = (
analysis_setup._selected_vars_target_realisations
)
# Test all selected sources separately. This way, the conditioning
# uses past variables from the current source only (opposed to past
# variables from all sources as in multivariate network inference).
    tested_sources = np.unique([s[0] for s in analysis_setup.selected_vars_sources])
significance = np.zeros(len(analysis_setup.selected_vars_sources)).astype(bool)
pvalue = np.ones(len(analysis_setup.selected_vars_sources))
individual_stat = np.zeros(len(analysis_setup.selected_vars_sources))
    for source in tested_sources:
# Find selected past variables for current source
source_vars = [
s for s in analysis_setup.selected_vars_sources if s[0] == source
]
source_vars_idx = [
i
for i, s in enumerate(analysis_setup.selected_vars_sources)
if s[0] == source
]
# Determine length of conditioning set and allocate memory.
idx_conditional = source_vars.copy()
if conditional_realisations_target is not None:
idx_conditional += analysis_setup.selected_vars_target
conditional_realisations = np.empty(
(
data.n_realisations(analysis_setup.current_value) * len(source_vars),
len(idx_conditional) - 1,
)
).astype(data.data_type)
candidate_realisations = np.empty(
(data.n_realisations(analysis_setup.current_value) * len(source_vars), 1)
).astype(data.data_type)
# Calculate TE/MI for each candidate in the conditional source set,
# i.e., calculate the conditional MI between each candidate and the
# current value, conditional on all selected variables in the
# conditioning set. Then sort the estimated TE/MI values.
i_1 = 0
i_2 = data.n_realisations(analysis_setup.current_value)
surr_table = np.zeros((len(source_vars), n_permutations))
# Collect data for each candidate and the corresponding conditioning set.
for idx_c, candidate in enumerate(source_vars):
temp_cond = data.get_realisations(
analysis_setup.current_value,
set(source_vars).difference(set([candidate])),
)[0]
temp_cand = data.get_realisations(
analysis_setup.current_value, [candidate]
)[0]
# The following may happen if either the requested conditioning is
# 'none' or if the conditioning set that is tested consists only of
# a single candidate.
if temp_cond is None:
conditional_realisations = conditional_realisations_target
re_use = ["var2", "conditional"]
else:
re_use = ["var2"]
if conditional_realisations_target is None:
conditional_realisations[i_1:i_2,] = temp_cond
else:
conditional_realisations[i_1:i_2,] = np.hstack(
(temp_cond, conditional_realisations_target)
)
candidate_realisations[i_1:i_2,] = temp_cand
i_1 = i_2
i_2 += data.n_realisations(analysis_setup.current_value)
# Generate surrogates for the current candidate.
if (
analysis_setup._cmi_estimator.is_analytic_null_estimator()
and permute_in_time
):
# Generate the surrogates analytically
surr_table[
idx_c, :
] = analysis_setup._cmi_estimator.estimate_surrogates_analytic(
n_perm=n_permutations,
var1=data.get_realisations(
analysis_setup.current_value, [candidate]
)[0],
var2=analysis_setup._current_value_realisations,
conditional=temp_cond,
)
else:
analysis_setup.settings["analytical_surrogates"] = False
surr_candidate_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[candidate],
n_permutations,
analysis_setup.settings,
)
try:
surr_table[
idx_c, :
] = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=n_permutations,
re_use=["var2", "conditional"],
var1=surr_candidate_realisations,
var2=analysis_setup._current_value_realisations,
conditional=temp_cond,
)
except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll terminate the
                # sequential max stats test and declare all candidates not
                # significant.
print(
f"AlgorithmExhaustedError encountered in estimations: {aee.message}. "
"Stopping sequential max stats at candidate with rank 0"
)
return (
np.zeros(len(analysis_setup.selected_vars_sources)).astype(
bool
),
np.ones(len(analysis_setup.selected_vars_sources)),
np.zeros(len(analysis_setup.selected_vars_sources)),
)
# Calculate original statistic (bivariate TE/MI)
try:
individual_stat_source = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=len(source_vars),
re_use=re_use,
var1=candidate_realisations,
var2=analysis_setup._current_value_realisations,
conditional=conditional_realisations,
)
except ex.AlgorithmExhaustedError as aee:
            # The algorithm cannot continue here, so we'll terminate the
            # sequential max stats test and declare all candidates not
            # significant.
print(
f"AlgorithmExhaustedError encountered in estimations: {aee.message}. "
"Stopping sequential max stats at candidate with rank 0"
)
# For now we don't need a stack trace:
# traceback.print_tb(aee.__traceback__)
            # Return (significance, pvalue, TEs):
return (
np.zeros(len(analysis_setup.selected_vars_sources)).astype(bool),
np.ones(len(analysis_setup.selected_vars_sources)),
np.zeros(len(analysis_setup.selected_vars_sources)),
)
selected_vars_order = utils.argsort_descending(individual_stat_source)
individual_stat_source_sorted = utils.sort_descending(individual_stat_source)
max_distribution = _sort_table_max(surr_table)
significance_source = np.zeros(individual_stat_source.shape[0], dtype=bool)
pvalue_source = np.ones(individual_stat_source.shape[0])
# Compare each original value with the distribution of the same rank,
# starting with the highest value.
for c in range(individual_stat_source.shape[0]):
s, p = _find_pvalue(
individual_stat_source_sorted[c],
max_distribution[c,],
alpha,
tail="one_bigger",
)
significance_source[c] = s
pvalue_source[c] = p
if not s: # break as soon as a candidate is no longer significant
if analysis_setup.settings["verbose"]:
print(
f"\nStopping sequential max stats at candidate with rank {c}."
)
break
# Get back original order of variables within the current source. Write
# re-ordered results into global results array at the respective
# variable locations.
significance_source[selected_vars_order] = significance_source.copy()
pvalue_source[selected_vars_order] = pvalue_source.copy()
significance[source_vars_idx] = significance_source
pvalue[source_vars_idx] = pvalue_source
individual_stat[source_vars_idx] = individual_stat_source
return significance, pvalue, individual_stat


def min_statistic(
analysis_setup, data, candidate_set, te_min_candidate, conditional=None
):
"""Perform minimum statistics for one candidate source.
    Test if a transfer entropy value is significantly bigger than the minimum
    values obtained from surrogates of all remaining candidates.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
'settings', a dictionary with parameters for statistical testing:
- n_perm_min_stat : int [optional] - number of permutations
(default=500)
- alpha_min_stat : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
        candidate_set : list of tuples
            list of indices of remaining candidates
te_min_candidate : float
transfer entropy value to be tested
conditional : numpy array [optional]
realisations of conditional, 2D numpy array where array dimensions
represent [realisations x variable dimension] (default=None, no
conditioning performed)
Returns:
bool
statistical significance
float
the test's p-value
numpy array
surrogate table
Raises:
ex.AlgorithmExhaustedError
Raised from _create_surrogate_table() when calculation cannot be
made
"""
# Set defaults and get parameters from settings dictionary
analysis_setup.settings.setdefault("n_perm_min_stat", 500)
n_perm = analysis_setup.settings["n_perm_min_stat"]
analysis_setup.settings.setdefault("alpha_min_stat", 0.05)
alpha = analysis_setup.settings["alpha_min_stat"]
_check_permute_in_time(analysis_setup, data, n_perm)
if analysis_setup.settings["verbose"]:
print(
"minimum statistic, n_perm: {0}".format(
analysis_setup.settings["n_perm_min_stat"]
)
)
assert candidate_set, "The candidate set is empty."
surr_table = _create_surrogate_table(
analysis_setup, data, candidate_set, n_perm, conditional
)
min_distribution = _find_table_min(surr_table)
[significance, pvalue] = _find_pvalue(
statistic=te_min_candidate,
distribution=min_distribution,
alpha=alpha,
tail="one_bigger",
)
return significance, pvalue, surr_table


def mi_against_surrogates(analysis_setup, data):
"""Test estimated mutual information for significance against surrogate data.
Shuffle realisations of the current value (point to be predicted) and re-
calculate mutual information (MI) for shuffled data. The actual estimated
MI is then compared against this distribution of MI values from surrogate
data.
Args:
analysis_setup : MultivariateTE instance
information on the current analysis, can have an optional attribute
'settings', a dictionary with parameters for statistical testing:
- n_perm_mi : int [optional] - number of permutations
(default=500)
- alpha_mi : float [optional] - critical alpha level
(default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
float
estimated MI value
bool
statistical significance
float
p_value for estimated MI value
Raises:
ex.AlgorithmExhaustedError
Raised from estimate() methods when calculation cannot be made
"""
analysis_setup.settings.setdefault("n_perm_mi", 500)
n_perm = analysis_setup.settings["n_perm_mi"]
analysis_setup.settings.setdefault("alpha_mi", 0.05)
alpha = analysis_setup.settings["alpha_mi"]
permute_in_time = _check_permute_in_time(analysis_setup, data, n_perm)
if analysis_setup.settings["verbose"]:
print(
f"mi permutation test against surrogates, n_perm: {analysis_setup.settings['n_perm_mi']}"
)
if analysis_setup._cmi_estimator.is_analytic_null_estimator() and permute_in_time:
# Generate the surrogates analytically
analysis_setup.settings["analytical_surrogates"] = True
surr_dist = analysis_setup._cmi_estimator.estimate_surrogates_analytic(
n_perm=n_perm,
var1=analysis_setup._current_value_realisations,
var2=analysis_setup._selected_vars_realisations,
conditional=None,
)
else:
analysis_setup.settings["analytical_surrogates"] = False
surr_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[analysis_setup.current_value],
n_perm,
analysis_setup.settings,
)
surr_dist = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=n_perm,
re_use=["var2", "conditional"],
var1=surr_realisations,
var2=analysis_setup._selected_vars_realisations,
conditional=None,
)
orig_mi = analysis_setup._cmi_estimator.estimate(
var1=analysis_setup._current_value_realisations,
var2=analysis_setup._selected_vars_realisations,
conditional=None,
)
[significance, p_value] = _find_pvalue(
statistic=orig_mi, distribution=surr_dist, alpha=alpha, tail="one_bigger"
)
return [orig_mi, significance, p_value]


def unq_against_surrogates(analysis_setup, data):
"""Test the unique information in the PID estimate against surrogate data.
Shuffle realisations of both sources individually and re-calculate PID,
in particular the unique information from shuffled data. The original
unique information is then compared against the distribution of values
calculated from surrogate data.
Args:
analysis_setup : Partial_information_decomposition instance
            information on the current analysis, should have an attribute
'settings', a dict with optional fields
- n_perm : int [optional] - number of permutations (default=500)
- alpha : float [optional] - critical alpha level (default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
dict
PID estimate from original data
bool
statistical significance of the unique information in source 1
float
p-value of the unique information in source 1
bool
statistical significance of the unique information in source 2
float
p-value of the unique information in source 2
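
    Example:
        A usage sketch, assuming 'pid_setup' is a hypothetical
        Partial_information_decomposition analysis instance mid-run:

            pid, sign_s1, p_s1, sign_s2, p_s2 = unq_against_surrogates(
                pid_setup, data)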
"""
# Get analysis settings and defaults.
analysis_setup.settings.setdefault("n_perm", 500)
n_perm = analysis_setup.settings["n_perm"]
analysis_setup.settings.setdefault("alpha", 0.05)
alpha = analysis_setup.settings["alpha"]
_check_permute_in_time(analysis_setup, data, n_perm)
    # Get realisations and estimate PID for original data
target_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.current_value]
)[0]
source_1_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.sources[0]]
)[0]
source_2_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.sources[1]]
)[0]
orig_pid = analysis_setup._pid_estimator.estimate(
settings=analysis_setup.settings,
s1=source_1_realisations,
s2=source_2_realisations,
t=target_realisations,
)
# Test unique information from source 1
surr_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[analysis_setup.sources[0]],
n_perm,
analysis_setup.settings,
)
# Calculate surrogate distribution for unique information of source 1.
    # Note: calling .estimate_parallel does not work here because the PID
    # estimator returns a dictionary, not a single value. We have to extract
    # the unique information from the dictionary manually.
surr_dist_s1 = np.empty(n_perm)
chunk_size = int(surr_realisations.shape[0] / n_perm)
i_1 = 0
i_2 = chunk_size
if analysis_setup.settings["verbose"]:
print("\nTesting unq information in s1")
for p in range(n_perm):
if analysis_setup.settings["verbose"]:
print("\tperm {0} of {1}".format(p, n_perm))
pid_est = analysis_setup._pid_estimator.estimate(
settings=analysis_setup.settings,
s1=surr_realisations[i_1:i_2, :],
s2=source_2_realisations,
t=target_realisations,
)
surr_dist_s1[p] = pid_est["unq_s1"]
i_1 = i_2
i_2 += chunk_size
# Test unique information from source 2
surr_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[analysis_setup.sources[1]],
n_perm,
analysis_setup.settings,
)
# Calculate surrogate distribution for unique information of source 2.
surr_dist_s2 = np.empty(n_perm)
chunk_size = int(surr_realisations.shape[0] / n_perm)
i_1 = 0
i_2 = chunk_size
if analysis_setup.settings["verbose"]:
print("\nTesting unq information in s2")
for p in range(n_perm):
if analysis_setup.settings["verbose"]:
print("\tperm {0} of {1}".format(p, n_perm))
pid_est = analysis_setup._pid_estimator.estimate(
settings=analysis_setup.settings,
s1=source_1_realisations,
s2=surr_realisations[i_1:i_2, :],
t=target_realisations,
)
surr_dist_s2[p] = pid_est["unq_s2"]
i_1 = i_2
i_2 += chunk_size
[sign_1, p_val_1] = _find_pvalue(
statistic=orig_pid["unq_s1"],
distribution=surr_dist_s1,
alpha=alpha,
tail="one_bigger",
)
[sign_2, p_val_2] = _find_pvalue(
statistic=orig_pid["unq_s2"],
distribution=surr_dist_s2,
alpha=alpha,
tail="one_bigger",
)
return [orig_pid, sign_1, p_val_1, sign_2, p_val_2]


def syn_shd_against_surrogates(analysis_setup, data):
"""Test the shared/synergistic information in the PID estimate.
Shuffle realisations of the target and re-calculate PID, in particular the
synergistic and shared information from shuffled data. The original
shared and synergistic information are then compared against the
distribution of values calculated from surrogate data.
Args:
analysis_setup : Partial_information_decomposition instance
            information on the current analysis, should have an attribute
'settings', a dict with optional fields
- n_perm : int [optional] - number of permutations (default=500)
- alpha : float [optional] - critical alpha level (default=0.05)
- permute_in_time : bool [optional] - generate surrogates by
shuffling samples in time instead of shuffling whole replications
(default=False)
data : Data instance
raw data
Returns:
dict
PID estimate from original data
bool
statistical significance of the shared information
float
p-value of the shared information
bool
statistical significance of the synergistic information
float
p-value of the synergistic information
"""
# Get analysis settings and defaults.
analysis_setup.settings.setdefault("n_perm", 500)
n_perm = analysis_setup.settings["n_perm"]
analysis_setup.settings.setdefault("alpha", 0.05)
alpha = analysis_setup.settings["alpha"]
_check_permute_in_time(analysis_setup, data, n_perm)
# Get realisations and estimate PID for original data
target_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.current_value]
)[0]
source_1_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.sources[0]]
)[0]
source_2_realisations = data.get_realisations(
analysis_setup.current_value, [analysis_setup.sources[1]]
)[0]
orig_pid = analysis_setup._pid_estimator.estimate(
settings=analysis_setup.settings,
s1=source_1_realisations,
s2=source_2_realisations,
t=target_realisations,
)
# Test shared and synergistic information from both sources
surr_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[analysis_setup.current_value],
n_perm,
analysis_setup.settings,
)
# Calculate surrogate distribution for shd/syn information of both sources.
    # Note: calling .estimate_parallel does not work here because the PID
    # estimator returns a dictionary, not a single value. We have to extract
    # the shared information and synergy from the dictionary manually.
surr_dist_shd = np.empty(n_perm)
surr_dist_syn = np.empty(n_perm)
chunk_size = int(surr_realisations.shape[0] / n_perm)
i_1 = 0
i_2 = chunk_size
if analysis_setup.settings["verbose"]:
print("\nTesting shd and syn information in both sources")
for p in range(n_perm):
if analysis_setup.settings["verbose"]:
print("\tperm {0} of {1}".format(p, n_perm))
pid_est = analysis_setup._pid_estimator.estimate(
settings=analysis_setup.settings,
s1=source_1_realisations,
s2=source_2_realisations,
t=surr_realisations[i_1:i_2, :],
)
surr_dist_shd[p] = pid_est["shd_s1_s2"]
surr_dist_syn[p] = pid_est["syn_s1_s2"]
i_1 = i_2
i_2 += chunk_size
[sign_shd, p_val_shd] = _find_pvalue(
statistic=orig_pid["shd_s1_s2"],
distribution=surr_dist_shd,
alpha=alpha,
tail="one_bigger",
)
[sign_syn, p_val_syn] = _find_pvalue(
statistic=orig_pid["syn_s1_s2"],
distribution=surr_dist_syn,
alpha=alpha,
tail="one_bigger",
)
return [orig_pid, sign_shd, p_val_shd, sign_syn, p_val_syn]


def check_n_perm(n_perm, alpha):
"""Check if no. permutations is big enough to obtain the requested alpha.
Note:
The no. permutations must be big enough to theoretically allow for the
detection of a p-value that is smaller than the critical alpha level.
        Otherwise the permutation test is pointless. The smallest possible
        p-value is 1/n_perm.
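
    Example:
        check_n_perm(500, alpha=0.05) passes silently, since 1/500 = 0.002 <
        0.05, while check_n_perm(10, alpha=0.05) raises a RuntimeError, since
        1/10 = 0.1 >= 0.05.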
"""
if not 1.0 / n_perm < alpha:
        raise RuntimeError(
            "The number of permutations {0} is too small to test"
            " the requested alpha level {1}. The number of "
            "permutations must be greater than 1/alpha.".format(n_perm, alpha)
        )


def _create_surrogate_table(
analysis_setup, data, idx_test_set, n_perm, conditional=None
):
"""Create a table of surrogate MI/CMI/TE values.
Calculate MI/CMI/TE between surrogates for each source variable in the test
set and the target in the analysis setup. The conditional is taken from the
current set of conditional variables in the analysis setup. For
multivariate MI or TE surrogate creation, the full set of conditional
variables is used. For bivariate MI or TE surrogate creation, the
conditioning set has to be restricted to a subset of the current set of
conditional variables: for bivariate MI no conditioning set is required,
for bivariate TE only the past variables from the target are required (not
the variables selected from other relevant sources).
Args:
analysis_setup : instance of NetworkAnalysis or child class
information on the current analysis, must contain an attribute
settings with entry 'permute_in_time'
data : Data instance
raw data
idx_test_set : list of tuples
list of indices indicating samples to be used as sources
n_perm : int
number of permutations for testing
conditional : numpy array [optional]
realisations of conditional, 2D numpy array where array dimensions
represent [realisations x variable dimension] (default=None, no
conditioning performed)
Returns:
numpy array
surrogate MI/CMI/TE values, dimensions: (length test set, number of
surrogates)
Raises:
ex.AlgorithmExhaustedError
Raised from estimate_parallel() when calculation cannot be made
"""
# Check which permutation type is requested by the calling function.
permute_in_time = analysis_setup.settings["permute_in_time"]
# Create surrogate table.
surr_table = np.zeros((len(idx_test_set), n_perm))
current_value_realisations = analysis_setup._current_value_realisations
idx_c = 0
for candidate in idx_test_set:
if (
analysis_setup._cmi_estimator.is_analytic_null_estimator()
and permute_in_time
):
# Generate the surrogates analytically
analysis_setup.settings["analytical_surrogates"] = True
surr_table[
idx_c, :
] = analysis_setup._cmi_estimator.estimate_surrogates_analytic(
n_perm=n_perm,
var1=data.get_realisations(analysis_setup.current_value, [candidate])[
0
],
var2=current_value_realisations,
conditional=conditional,
)
else:
analysis_setup.settings["analytical_surrogates"] = False
surr_candidate_realisations = _get_surrogates(
data,
analysis_setup.current_value,
[candidate],
n_perm,
analysis_setup.settings,
)
surr_table[idx_c, :] = analysis_setup._cmi_estimator.estimate_parallel(
n_chunks=n_perm,
re_use=["var2", "conditional"],
var1=surr_candidate_realisations,
var2=current_value_realisations,
conditional=conditional,
)
idx_c += 1
return surr_table


def _find_table_max(table):
"""Find maximum for each column of a table."""
return np.max(table, axis=0)


def _find_table_min(table):
"""Find minimum for each column of a table."""
return np.min(table, axis=0)


def _sort_table_min(table):
    """Sort each column in a table in ascending order.

    Note: sorting is done in place, i.e., the input table is modified.
    """
    for permutation in range(table.shape[1]):
        table[:, permutation].sort()
    return table


def _sort_table_max(table):
"""Sort each column in a table in descending order."""
table_sorted = np.empty(table.shape)
for permutation in range(0, table.shape[1]):
table_sorted[:, permutation] = utils.sort_descending(table[:, permutation])
return table_sorted


def _find_pvalue(statistic, distribution, alpha, tail):
"""Find p-value of a test statistic under some distribution.
Args:
statistic : numeric
value to be tested against distribution
distribution : numpy array
1-dimensional distribution of values, test distribution
alpha : float
critical alpha level for statistical significance
        tail : str
            'one' or 'one_bigger' for one-tailed testing H1 > H0,
            'one_smaller' for one-tailed testing H1 < H0, or 'two' for
            two-tailed testing
Returns:
bool
statistical significance
float
the test's p-value
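
    Example:
        A sketch with a synthetic surrogate distribution:

            import numpy as np
            distribution = np.random.default_rng(0).random(200)
            sig, p = _find_pvalue(0.99, distribution, 0.05, tail="one_bigger")
            # p is the fraction of surrogates >= 0.99, floored at 1/200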
"""
    assert alpha <= 1.0, "Critical alpha level must not be larger than 1."
assert distribution.ndim == 1, "Test distribution must be 1D."
check_n_perm(distribution.shape[0], alpha)
if tail in ["one_bigger", "one"]:
pvalue = sum(distribution >= statistic) / distribution.shape[0]
elif tail == "one_smaller":
pvalue = sum(distribution <= statistic) / distribution.shape[0]
elif tail == "two":
p_bigger = sum(distribution >= statistic) / distribution.shape[0]
p_smaller = sum(distribution <= statistic) / distribution.shape[0]
pvalue = min(p_bigger, p_smaller)
alpha = alpha / 2
else:
        raise ValueError(
            f"Unknown value for tail: {tail}, should be 'one', 'one_bigger', 'one_smaller', or 'two'"
        )
# If the statistic is larger than all values in the test distribution, set
# the p-value to the smallest possible value 1/n_perm.
if pvalue == 0:
pvalue = 1.0 / distribution.shape[0]
significance = pvalue < alpha
return significance, pvalue


def _sufficient_replications(data, n_perm):
"""Test if no. replications is high enough for surrogate creation.
Test if the number of replications is high enough to allow for the required
number of permutations.
"""
    return math.factorial(data.n_replications) > n_perm


def _get_surrogates(data, current_value, idx_list, n_perm, perm_settings):
"""Return surrogate data for statistical testing.
Calls surrogate generation methods of the data instance. The method for
surrogate generation depends on whether sufficient replications of the data
exists. If the number of replications is high enough (reps! >
n_permutations), surrogates are created by shuffling data over replications
(while keeping the temporal order of samples intact). If the number of
replications is too low, samples are shuffled over time (while keeping the
order of replications intact). The latter method can be forced by setting
'permute_in_time' to True in 'perm_settings'.
Args:
data : Data instance
raw data for analysis
current_value : tuple
index of the current value in current analysis, has to have the
form (idx process, idx sample)
idx_list : list of tuples
list of variables, for which surrogates have to be created
n_perm : int
number of permutations
perm_settings : dict
settings for surrogate creation by shuffling samples over time, set
'permute_in_time' to True to create surrogates by shuffling data
over time. See Data.permute_samples() for settings for surrogate
creation.
Returns:
numpy array
surrogate data with dimensions
(realisations * n_perm) x len(idx_list)
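
    Example:
        A sketch using the package's MUTE example data (assuming the
        signature of Data.generate_mute_data()):

            from idtxl.data import Data
            data = Data()
            data.generate_mute_data(n_samples=100, n_replications=5)
            surr = _get_surrogates(
                data, current_value=(0, 5), idx_list=[(1, 1), (2, 2)],
                n_perm=10,
                perm_settings={'permute_in_time': True, 'perm_type': 'random'})
            # surr.shape == (data.n_realisations((0, 5)) * 10, 2)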
"""
# Allocate memory for surrogates
n_realisations = data.n_realisations(current_value)
surrogates = np.empty((n_realisations * n_perm, len(idx_list))).astype(
data.data_type
)
# Check if the user requested to permute samples in time and not over
# replications
permute_in_time = perm_settings["permute_in_time"]
# Generate surrogates by permuting over replications if possible (no.
# replications needs to be sufficient); else permute samples over time.
i_1 = 0
i_2 = n_realisations
# permute samples
if permute_in_time:
for perm in range(n_perm):
surrogates[i_1:i_2,] = data.permute_samples(
current_value, idx_list, perm_settings
)[0]
i_1 = i_2
i_2 += n_realisations
else: # permute replications
assert _sufficient_replications(
data, n_perm
), "Not enough replications for surrogate creation."
for perm in range(n_perm):
surrogates[i_1:i_2,] = data.permute_replications(current_value, idx_list)[0]
i_1 = i_2
i_2 += n_realisations
return surrogates


def _generate_spectral_surrogates(data, scale, n_perm, perm_settings):
"""Generate surrogate data for statistical testing of spectral TE.
The method for surrogate generation depends on whether sufficient
replications of the data exists. If the number of replications is high
enough (reps! > n_permutations), surrogates are created by shuffling data
over replications (while keeping the temporal order of samples intact). If
the number of replications is too low, samples are shuffled over time
(while keeping the order of replications intact).
Args:
data : Data instance
raw data for analysis
scale : int
index of the scale to be shuffled
n_perm : int
number of permutations
perm_settings : dict
settings for surrogate creation by shuffling samples over time
Returns:
numpy array
surrogate data with dimensions
(realisations * n_perm) x len(idx_list)
"""
# Allocate memory for surrogates
surrogates = np.empty((data.n_samples, data.n_replications, n_perm)).astype(
data.data_type
)
permute_in_time = perm_settings["permute_in_time"]
# Generate surrogates by permuting over replications if possible (no.
# replications needs to be sufficient); else permute samples over time.
if permute_in_time:
for perm in range(n_perm):
surrogates[:, :, perm] = data.slice_permute_samples(scale, perm_settings)[0]
else:
assert _sufficient_replications(data, n_perm)
for perm in range(n_perm):
surrogates[:, :, perm] = data.slice_permute_replications(scale)[0]
return surrogates


def _check_permute_in_time(analysis_setup, data, n_perm):
"""Set defaults for permuting samples in time.
The default for creating surrogate data is the permutation of original data
over replications (such that the temporal ordering of samples stays
intact). The function checks if this default can be used given the
requested number of permutations and the number of replications in the
data.
    The function tries to set the setting 'permute_in_time' to its default
    'False' if no value for 'permute_in_time' was provided by the user. If the
    number of replications is insufficient to generate the requested number of
    permutations, the function sets 'permute_in_time' to True such that
    surrogates are created by permuting samples in time (if not requested
    otherwise, 'perm_type' is set to 'random'; see documentation of
    Data().permute_samples() for further settings).
"""
analysis_setup.settings.setdefault("permute_in_time", False)
if not analysis_setup.settings["permute_in_time"] and not _sufficient_replications(
data, n_perm
):
print(
"\nWARNING: Number of replications is not sufficient to "
"generate the desired number of surrogates. Permuting samples "
"in time instead."
)
analysis_setup.settings["permute_in_time"] = True
if analysis_setup.settings["permute_in_time"]:
analysis_setup.settings.setdefault("perm_type", "random")
return analysis_setup.settings["permute_in_time"]