"""Provide results class for IDTxl network analysis."""
import copy as cp
import sys
import warnings
import numpy as np
from . import idtxl_utils as utils
warnings.simplefilter(action="ignore", category=FutureWarning)
MIN_INT = -sys.maxsize - 1 # minimum integer for initializing adj. matrix
class DotDict(dict):
"""Dictionary with dot-notation access to values.
Provides the same functionality as a regular dict, but also allows
accessing values using dot-notation.
Example:
>>> from idtxl.results import DotDict
>>> d = DotDict({'a': 1, 'b': 2})
>>> d.a
>>> # Out: 1
>>> d['a']
>>> # Out: 1
"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __dir__(self):
"""Return dictionary keys as list of attributes."""
return self.keys()
def __deepcopy__(self, memo):
"""Provide deep copy capabilities.
Following a fix described here:
https://github.com/aparo/pyes/pull/115/commits/d2076b385c38d6d00cebfe0df7b0d1ba8df934bc
"""
dot_dict_copy = DotDict(
[(cp.deepcopy(k, memo), cp.deepcopy(v, memo)) for k, v in self.items()]
)
return dot_dict_copy
def __getstate__(self):
# For pickling the object
return self
def __setstate__(self, state):
# For un-pickling the object
self.update(state)
# self.__dict__ = self
class AdjacencyMatrix:
    """Adjacency matrix representing inferred networks.
def __init__(self, n_nodes, weight_type):
self.edge_matrix = np.zeros((n_nodes, n_nodes), dtype=bool)
self.weight_matrix = np.zeros((n_nodes, n_nodes), dtype=weight_type)
self.shape = self.weight_matrix.shape
if np.issubdtype(weight_type, np.integer):
self._weight_type = np.integer
elif np.issubdtype(weight_type, np.floating):
self._weight_type = np.floating
elif weight_type is bool:
self._weight_type = weight_type
else:
raise RuntimeError("Unknown weight data type {0}.".format(weight_type))
def __array__(self):
return self.weight_matrix
    def n_nodes(self):
"""Return number of nodes."""
return self.edge_matrix.shape[0]
    def n_edges(self):
        """Return number of edges."""
        return self.edge_matrix.sum()
    def add_edge(self, i, j, weight):
"""Add weighted edge (i, j) to adjacency matrix."""
if not np.issubdtype(type(weight), self._weight_type):
raise TypeError(
"Can not add weight of type {0} to adjacency matrix of type "
"{1}.".format(type(weight), self._weight_type)
)
self.edge_matrix[i, j] = True
self.weight_matrix[i, j] = weight
    def add_edge_list(self, i_list, j_list, weights):
"""Add multiple weighted edges (i, j) to adjacency matrix."""
if len(i_list) != len(j_list):
raise RuntimeError("Lists with edge indices must be of same length.")
if len(i_list) != len(weights):
raise RuntimeError("Edge weights must have same length as edge indices.")
for i, j, weight in zip(i_list, j_list, weights):
self.add_edge(i, j, weight)
    def print_matrix(self):
"""Print weight and edge matrix."""
print(self.edge_matrix)
print(self.weight_matrix)
    def get_edge_list(self):
        """Return list of weighted edges.

        Returns:
            list of tuples
                each entry represents one edge in the graph: (i, j, weight)
        """
edge_list = np.zeros(self.n_edges(), dtype=object) # list of tuples
ind = 0
for i in range(self.n_nodes()):
for j in range(self.n_nodes()):
if self.edge_matrix[i, j]:
edge_list[ind] = (i, j, self.weight_matrix[i, j])
ind += 1
return edge_list
class Results:
"""Parent class for results of network analysis algorithms.
Provide a container for results of network analysis algorithms, e.g.,
MultivariateTE or ActiveInformationStorage.
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before the estimation
"""
def __init__(self, n_nodes, n_realisations, normalised):
self.settings = DotDict({})
self.data_properties = DotDict(
{
"n_nodes": n_nodes,
"n_realisations": n_realisations,
"normalised": normalised,
}
)
def _print_edge_list(self, adjacency_matrix, weights):
"""Print edge list to console."""
edge_list = adjacency_matrix.get_edge_list()
if edge_list.size > 0:
for e in edge_list:
if weights == "binary":
print("\t{0} -> {1}".format(e[0], e[1]))
else:
print("\t{0} -> {1}, {2}: {3}".format(e[0], e[1], weights, e[2]))
else:
print("No significant links found in the network.")
def _check_result(self, process, settings):
# Check if new result process is part of the network
if process > (self.data_properties.n_nodes - 1):
raise RuntimeError(
"Can not add single result - process {0} is not"
" in no. nodes in the data ({1}).".format(
process, self.data_properties.n_nodes
)
)
# Don't add duplicate processes
if self._is_duplicate_process(process):
raise RuntimeError(
"Can not add single result - results for target"
" or process {0} already exist.".format(process)
)
# Don't add results with conflicting settings
if utils.conflicting_entries(self.settings, settings):
raise RuntimeError(
"Can not add single result - analysis settings are not equal."
)
    def _is_duplicate_process(self, process):
        # Test if process is already present in object
        return process in self._processes_analysed
    def combine_results(self, *results):
"""Combine multiple (partial) results objects.
Combine a list of partial network analysis results into a single
results object (e.g., results from analysis parallelized over
processes). Raise an error if duplicate processes occur in partial
results, or if analysis settings are not equal.
Note that only conflicting settings cause an error (i.e., settings with
equal keys but different values). If additional settings are included
in partial results (i.e., settings with different keys) these settings
are added to the common settings dictionary.
Remove FDR-corrections from partial results before combining them. FDR-
correction performed on the basis of parts of the network is not valid
for the combined network.
Args:
results : list of Results objects
single process analysis results from .analyse_network or
.analyse_single_process methods, where each object contains
partial results for one or multiple processes
        Results are combined in place and added to the calling object; the
        method does not return a new results object.
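        Example (a sketch; assumes res_a and res_b hold partial results for
        disjoint processes from two parallel analysis runs):
            >>> res_a.combine_results(res_b)  # res_a now holds both sets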
"""
for r in results:
processes = r._processes_analysed
if utils.conflicting_entries(self.settings, r.settings):
                raise RuntimeError(
                    "Cannot combine results - analysis settings are not equal."
                )
            for p in processes:
                if self._is_duplicate_process(p):
                    raise RuntimeError(
                        "Cannot combine results - results for "
                        "process {0} already exist.".format(p)
                    )
                # Remove potential partial FDR-corrected results. These are
                # no longer valid for the combined network.
                try:
                    del r.fdr_corrected
                    print("Removing FDR-corrected results.")
                except AttributeError:
                    pass
try:
results_to_add = r._single_target[p]
except AttributeError:
try:
results_to_add = r._single_process[p]
except AttributeError:
                        raise AttributeError(
                            "Did not find any results attributes to combine "
                            "(._single_process or ._single_target)."
                        )
self._add_single_result(p, results_to_add, r.settings)
class ResultsSingleProcessAnalysis(Results):
"""Store results of single process analysis.
Provide a container for the results of algorithms for the analysis of
individual processes (nodes) in a multivariate stochastic process,
e.g., estimation of active information storage.
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation:
>>> res_network.settings.cmi_estimator
or
>>> res_network.settings['cmi_estimator'].
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before estimation
processes_analysed : list
list of analysed processes
"""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
self.processes_analysed = []
self._single_process = {}
self._single_process_fdr = DotDict()
@property
def processes_analysed(self):
"""Get index of the current_value."""
return self._processes_analysed
@processes_analysed.setter
def processes_analysed(self, processes):
self._processes_analysed = processes
def _add_single_result(self, process, results, settings):
"""Add analysis result for a single process."""
self._check_result(process, settings)
self.settings.update(DotDict(settings))
self._single_process[process] = DotDict(results)
self.processes_analysed = list(self._single_process.keys())
def _add_fdr(self, fdr, alpha=None, constant=None):
"""Add settings and results of FDR correction."""
# Add settings of FDR-correction
self.settings["alpha_fdr"] = alpha
self.settings["fdr_constant"] = constant
# Add results of FDR-correction. FDR-correction can be None if
# correction is impossible due to the number of permutations in
# individual analysis being too low to allow for individual p-values
# to reach the FDR-thresholds. Add empty results in that case.
if fdr is None:
self._single_process_fdr = DotDict()
else:
self._single_process_fdr = DotDict(fdr)
    def get_single_process(self, process, fdr=True):
"""Return results for a single process in the network.
        Results for individual processes include for each process
- ais : float - AIS-value for current process
- ais_pval : float - p-value of AIS estimate
        - ais_sign : bool - significance of AIS estimate with respect to
          the alpha_mi specified in the settings
- selected_var : list of tuples - variables with significant
information about the current value of the process that have
          been added to the process's past state; a variable is
described by the index of the process in the data and its lag
in samples
- current_value : tuple - current value used for analysis,
described by target and sample index in the data
Setting fdr to True returns FDR-corrected results (Benjamini, 1995).
Args:
process : int
process id
fdr : bool [optional]
return FDR-corrected results, see documentation of network
inference algorithms and stats.network_fdr (default=True)
Returns:
dict
results for single process. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars']) or via dot-notation
(result.selected_vars).
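        Example (a sketch; assumes AIS results were added for process 0):
            >>> res.get_single_process(process=0, fdr=False).ais
            >>> # Out: AIS estimate for process 0 (float)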
"""
        # Return the result for the requested process, handling FDR
        # correction at this level.
if process not in self.processes_analysed:
raise RuntimeError(f"No results for process {process}.")
        if fdr:
            try:
                return self._single_process_fdr[process]
            except (AttributeError, KeyError):
                raise RuntimeError(
                    f"No FDR-corrected results for process {process}. "
                    f"Set fdr=False for uncorrected results."
                )
else:
try:
return self._single_process[process]
except AttributeError:
raise RuntimeError("No results have been added.")
except KeyError:
raise RuntimeError("No results for process {0}.".format(process))
    def get_significant_processes(self, fdr=True):
"""Return statistically-significant processes.
Indicates for each process whether AIS is statistically significant
        (equivalent to the adjacency matrix returned for network inference).
Args:
fdr : bool [optional]
return FDR-corrected results, see documentation of network
inference algorithms and stats.network_fdr (default=True)
Returns:
numpy array
Statistical significance for each process
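        Example (a sketch; assumes AIS results for three analysed processes,
        of which the first and third are significant):
            >>> res.get_significant_processes(fdr=False)
            >>> # Out: array([ True, False,  True])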
"""
significant_processes = np.array(
[
self.get_single_process(process=p, fdr=fdr)["ais_sign"]
for p in self.processes_analysed
],
dtype=bool,
)
return significant_processes
class ResultsNetworkAnalysis(Results):
    """Provide a container for results of network analysis on multiple targets."""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
self._single_target = {}
self.targets_analysed = []
@property
def targets_analysed(self):
"""Get index of the current_value."""
return self._processes_analysed
@targets_analysed.setter
def targets_analysed(self, targets):
self._processes_analysed = targets
def _add_single_result(self, target, results, settings):
"""Add analysis result for a single target."""
self._check_result(target, settings)
# Add results
self.settings.update(DotDict(settings))
self._single_target[target] = DotDict(results)
self.targets_analysed = list(self._single_target.keys())
    def get_single_target(self, target, fdr=True):
"""Return results for a single target in the network.
        Results for single targets include for each target
- omnibus_te : float - TE-value for joint information transfer from all
sources into the target
- omnibus_pval : float - p-value of omnibus information transfer into
the target
        - omnibus_sign : bool - significance of omnibus information transfer
          with respect to the alpha_omnibus specified in the settings
- selected_vars_sources : list of tuples - source variables with
significant information about the current value
- selected_vars_target : list of tuples - target variables with
significant information about the current value
- selected_sources_pval : array of floats - p-value for each selected
variable
- selected_sources_te : array of floats - TE-value for each selected
variable
- sources_tested : list of int - list of sources tested for the current
target
- current_value : tuple - current value used for analysis, described by
target and sample index in the data
Setting fdr to True returns FDR-corrected results (Benjamini, 1995).
Args:
target : int
target id
fdr : bool [optional]
return FDR-corrected results, see documentation of network
inference algorithms and stats.network_fdr (default=True)
Returns:
dict
Results for single target. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars_sources']) or via dot-notation
(result.selected_vars_sources).
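        Example (a sketch; assumes network inference results were added for
        target 2):
            >>> res.get_single_target(target=2, fdr=False).omnibus_pval
            >>> # Out: p-value of the omnibus test for target 2 (float)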
"""
if target not in self.targets_analysed:
raise RuntimeError("No results for target {0}.".format(target))
        if fdr:
            try:
                return self._single_target_fdr[target]
            except AttributeError:
                raise RuntimeError(
                    "No FDR-corrected results have been added. Set fdr=False"
                    " to see uncorrected results."
                )
            except KeyError:
                raise RuntimeError(
                    "No FDR-corrected results for target {0}. Set fdr=False"
                    " to see uncorrected results.".format(target)
                )
else:
try:
return self._single_target[target]
except AttributeError:
raise RuntimeError("No results have been added.")
except KeyError:
raise RuntimeError("No results for target {0}.".format(target))
    def get_target_sources(self, target, fdr=True):
"""Return list of sources (parents) for given target.
Args:
target : int
target index
fdr : bool [optional]
if True, sources are returned for FDR-corrected results
(default=True)
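        Example (a sketch; assumes target 2 has inferred links from sources
        0 and 1):
            >>> res.get_target_sources(target=2, fdr=False)
            >>> # Out: array([0, 1])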
"""
v = self.get_single_target(target, fdr)["selected_vars_sources"]
return np.unique(np.array([s[0] for s in v]))
class ResultsNetworkInference(ResultsNetworkAnalysis):
"""Store results of network inference.
Provide a container for results of network inference algorithms, e.g.,
    MultivariateTE or BivariateTE.
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation:
>>> res_network.settings.cmi_estimator
or
>>> res_network.settings['cmi_estimator'].
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before estimation
targets_analysed : list
list of analysed targets
"""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
self._single_target_fdr = DotDict()
def _add_fdr(self, fdr, alpha=None, correct_by_target=None, constant=None):
"""Add settings and results of FDR correction."""
# Add settings of FDR-correction
self.settings["alpha_fdr"] = alpha
self.settings["fdr_correct_by_target"] = correct_by_target
self.settings["fdr_constant"] = constant
# Add results of FDR-correction. FDR-correction can be None if
# correction is impossible due to the number of permutations in
# individual analysis being too low to allow for individual p-values
# to reach the FDR-thresholds. Add empty results in that case.
if fdr is None:
self._single_target_fdr = DotDict()
else:
self._single_target_fdr = DotDict(fdr)
def _get_inference_measure(self, target):
if "selected_sources_te" in self._single_target[target]:
return self._single_target[target].selected_sources_te
elif "selected_sources_mi" in self._single_target[target]:
return self._single_target[target].selected_sources_mi
else:
raise KeyError(
"No entry with network inference measure found for " "current target"
)
    def get_source_variables(self, fdr=True):
"""Return list of inferred past source variables for all targets.
Return a list of dictionaries, where each dictionary holds the selected
past source variables for one analysed target. The list may be used as
        an input to significant subgraph mining in the postprocessing module.
Args:
fdr : bool [optional]
return FDR-corrected results (default=True)
Returns:
list of dicts
selected past source variables for each target
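        Example (a sketch; assumes one inferred source variable per target):
            >>> res.get_source_variables(fdr=False)
            >>> # Out: [{'target': 0, 'selected_vars_sources': [(1, 2)]}, ...]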
"""
source_variables = []
for target in self.targets_analysed:
source_variables.append(
{
"target": target,
"selected_vars_sources": self.get_single_target(
target=target, fdr=fdr
)["selected_vars_sources"],
}
)
return source_variables
    def get_target_delays(self, target, criterion="max_te", fdr=True):
"""Return list of information-transfer delays for a given target.
Return a list of information-transfer delays for a given target.
Information-transfer delays are determined by the lag of the variable
        in a source's past that has the highest information transfer into the
target process. There are two ways of identifying the variable with
maximum information transfer:
a) use the variable with the highest absolute TE value (highest
information transfer),
b) use the variable with the smallest p-value (highest statistical
significance).
Args:
target : int
target index
            criterion : str [optional]
                use the variable with the maximum TE value ('max_te') or the
                smallest p-value ('max_p') to determine the source-target
                delay (default='max_te')
fdr : bool [optional]
return FDR-corrected results (default=True)
Returns:
numpy array
information-transfer delays for each source
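        Example (a sketch; assumes a single source with a reconstructed
        information-transfer delay of 3 samples into target 2):
            >>> res.get_target_delays(target=2, criterion='max_te', fdr=False)
            >>> # Out: array([3])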
"""
sources = self.get_target_sources(target=target, fdr=fdr)
delays = np.zeros(sources.shape[0]).astype(int)
# Get the source index for each past source variable of the target
all_vars_sources = np.array(
[
x[0]
for x in self.get_single_target(target=target, fdr=fdr)[
"selected_vars_sources"
]
]
)
# Get the lag for each past source variable of the target
all_vars_lags = np.array(
[
x[1]
for x in self.get_single_target(target=target, fdr=fdr)[
"selected_vars_sources"
]
]
)
# Get p-values and TE-values for past source variable
pval = self.get_single_target(target=target, fdr=fdr)["selected_sources_pval"]
measure = self._get_inference_measure(target)
# Find delay for each source
        for ind, s in enumerate(sources):
            if criterion == "max_p":
                # Find the minimum p-value amongst the variables in source s
                delays_ind = np.argmin(pval[all_vars_sources == s])
            elif criterion == "max_te":
                # Find the maximum TE-value amongst the variables in source s
                delays_ind = np.argmax(measure[all_vars_sources == s])
            else:
                raise RuntimeError("Unknown criterion {0}.".format(criterion))
            delays[ind] = all_vars_lags[all_vars_sources == s][delays_ind]
return delays
    def get_adjacency_matrix(self, weights, fdr=True):
"""Return adjacency matrix.
Return adjacency matrix resulting from network inference. The adjacency
matrix can either be generated from FDR-corrected results or
uncorrected results. Multiple options for the weight are available.
Args:
weights : str
can either be
            - 'max_te_lag': the weights represent the source -> target
              lag corresponding to the maximum transfer entropy value
(see documentation for method get_target_delays for details)
            - 'max_p_lag': the weights represent the source -> target
              lag corresponding to the smallest p-value
(see documentation for method get_target_delays for details)
- 'vars_count': the weights represent the number of
statistically-significant source -> target lags
- 'binary': return unweighted adjacency matrix with binary
entries
- 1 = significant information transfer;
- 0 = no significant information transfer.
fdr : bool [optional]
return FDR-corrected results (default=True)
Returns:
AdjacencyMatrix instance
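        Example (a sketch; exports the inferred binary network as a numpy
        array via AdjacencyMatrix.__array__):
            >>> adj = res.get_adjacency_matrix(weights='binary', fdr=False)
            >>> np.asarray(adj)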
"""
adjacency_matrix = AdjacencyMatrix(self.data_properties.n_nodes, int)
if weights == "max_te_lag":
for t in self.targets_analysed:
sources = self.get_target_sources(target=t, fdr=fdr)
delays = self.get_target_delays(target=t, criterion="max_te", fdr=fdr)
adjacency_matrix.add_edge_list(
sources, np.ones(len(sources), dtype=int) * t, delays
)
elif weights == "max_p_lag":
for t in self.targets_analysed:
sources = self.get_target_sources(target=t, fdr=fdr)
delays = self.get_target_delays(target=t, criterion="max_p", fdr=fdr)
adjacency_matrix.add_edge_list(
sources, np.ones(len(sources), dtype=int) * t, delays
)
elif weights == "vars_count":
for t in self.targets_analysed:
single_result = self.get_single_target(target=t, fdr=fdr)
sources = np.zeros(len(single_result.selected_vars_sources))
weights = np.zeros(len(single_result.selected_vars_sources))
for i, s in enumerate(single_result.selected_vars_sources):
sources[i] = s[0]
weights[i] += 1
adjacency_matrix.add_edge_list(
sources, np.ones(len(sources), dtype=int) * t, weights
)
elif weights == "binary":
for t in self.targets_analysed:
single_result = self.get_single_target(target=t, fdr=fdr)
sources = np.zeros(len(single_result.selected_vars_sources), dtype=int)
weights = np.zeros(len(single_result.selected_vars_sources), dtype=int)
for i, s in enumerate(single_result.selected_vars_sources):
sources[i] = s[0]
weights[i] = 1
adjacency_matrix.add_edge_list(
sources, np.ones(len(sources), dtype=int) * t, weights
)
else:
raise RuntimeError("Invalid weights value")
return adjacency_matrix
    def print_edge_list(self, weights, fdr=True):
"""Print results of network inference to console.
Print edge list resulting from network inference to console.
Output may look like this:
>>> 0 -> 1, max_te_lag = 2
>>> 0 -> 2, max_te_lag = 3
>>> 0 -> 3, max_te_lag = 2
>>> 3 -> 4, max_te_lag = 1
>>> 4 -> 3, max_te_lag = 1
The edge list can either be generated from FDR-corrected results
or uncorrected results. Multiple options for the weight
are available (see documentation of method get_adjacency_matrix for
details).
Args:
weights : str
link weights (see documentation of method get_adjacency_matrix
for details)
fdr : bool [optional]
return FDR-corrected results (default=True)
"""
adjacency_matrix = self.get_adjacency_matrix(weights=weights, fdr=fdr)
self._print_edge_list(adjacency_matrix, weights=weights)
class ResultsPID(ResultsNetworkAnalysis):
"""Store results of Partial Information Decomposition (PID) analysis.
Provide a container for results of Partial Information Decomposition (PID)
algorithms.
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation:
    >>> res_pid._single_target[2].source_1
    or
    >>> res_pid._single_target[2]['source_1']
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before the estimation
targets_analysed : list
list of analysed targets
"""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
    def get_single_target(self, target):
"""Return results for a single target in the network.
Results for single targets include for each target
- source_1 : tuple - source variable 1
- source_2 : tuple - source variable 2
- selected_vars_sources : list of tuples - source variables used in PID
estimation
- s1_unq : float - unique information in source 1
- s2_unq : float - unique information in source 2
- syn_s1_s2 : float - synergistic information in sources 1 and 2
- shd_s1_s2 : float - shared information in sources 1 and 2
- current_value : tuple - current value used for analysis, described by
target and sample index in the data
- [estimator-specific settings]
Args:
target : int
target id
Returns:
dict
Results for single target. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars_sources']) or via dot-notation
(result.selected_vars_sources).
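        Example (a sketch; assumes PID results were added for target 2):
            >>> res_pid.get_single_target(2).syn_s1_s2
            >>> # Out: synergy of both sources about the target (float)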
"""
return super(ResultsPID, self).get_single_target(target, fdr=False)
class ResultsMultivariatePID(ResultsNetworkAnalysis):
"""Store results of Multivariate Partial Information Decomposition (PID)
analysis.
Provide a container for results of Multivariate Partial Information
Decomposition (PID) algorithms.
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation:
    >>> res_pid._single_target[2].source_1
    or
    >>> res_pid._single_target[2]['source_1']
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before the estimation
targets_analysed : list
list of analysed targets
"""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
    def get_single_target(self, target):
"""Return results for a single target in the network.
Results for single targets include for each target
- source_i : tuple - source variable i
- selected_vars_sources : list of tuples - source variables used in PID
estimation
- avg : dict - avg pid {alpha -> float} where alpha is a redundancy
lattice node
- ptw : dict of dicts - ptw pid {rlz -> {alpha -> float} } where rlz is
a single realisation of the random variables and alpha is a redundancy
lattice node
- current_value : tuple - current value used for analysis, described by
target and sample index in the data
- [estimator-specific settings]
Args:
target : int
target id
Returns:
dict
Results for single target. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars_sources']) or via dot-notation
(result.selected_vars_sources).
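        Example (a sketch; assumes PID results for target 2 and a
        hypothetical redundancy-lattice node key ((1,), (2,))):
            >>> res_pid.get_single_target(2).avg[((1,), (2,))]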
"""
return super(ResultsMultivariatePID, self).get_single_target(target, fdr=False)
class ResultsNetworkComparison(ResultsNetworkAnalysis):
"""Store results of network comparison.
Provide a container for results of network comparison algorithms.
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation: res_network.settings.cmi_estimator
or res_network.settings['cmi_estimator'].
Attributes:
settings : dict
settings used for estimation of information theoretic measures and
statistical testing
data_properties : dict
data properties, contains
- n_nodes : int - total number of nodes in the network
- n_realisations : int - number of samples available for
analysis given the settings (e.g., a high maximum lag used in
network inference results in fewer data points available for
estimation)
- normalised : bool - indicates if data were z-standardised
before the estimation
        surrogate_distributions : dict
for each target, surrogate distributions used for testing of each
link into the target
targets_analysed : list
list of analysed targets
ab : dict
for each target, list of comparison results for all links into the
target; True if link in condition A > link in condition B
pval : dict
for each target, list of p-values for all compared links
        cmi_diff_abs : dict
            for each target, list of absolute differences in the interaction
            measure for all compared links
"""
def __init__(self, n_nodes, n_realisations, normalised):
super().__init__(n_nodes, n_realisations, normalised)
def _add_results(self, union_network, results, settings):
# Check if results have already been added to this instance.
        if self.settings:
            warnings.warn("Overwriting existing results.", RuntimeWarning)
# Add results
self.settings = DotDict(settings)
self.targets_analysed = union_network["targets_analysed"]
for t in self.targets_analysed:
self._single_target[t] = DotDict(union_network._single_target[t])
# self.max_lag = union_network['max_lag']
self.surrogate_distributions = results["cmi_surr"]
self.ab = results["a>b"]
self.cmi_diff_abs = results["cmi_diff_abs"]
self.pval = results["pval"]
    def get_adjacency_matrix(self, weights="comparison"):
"""Return adjacency matrix.
Return adjacency matrix resulting from network inference.
Multiple options for the weights are available.
Args:
            weights : str [optional]
                can either be
            - 'union': all links in the union network, i.e., all links
              that were tested for a difference
            - 'comparison': True for links with a significant difference in
              inferred effective connectivity (default)
            - 'pvalue': p-values for links with a significant difference in
              inferred effective connectivity
            - 'diff_abs': absolute differences in inferred effective
              connectivity for links with a significant difference
Returns:
AdjacencyMatrix instance
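        Example (a sketch; assumes res_comp holds comparison results):
            >>> adj = res_comp.get_adjacency_matrix(weights='comparison')
            >>> adj.print_matrix()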
"""
        # Note: right now, the network comparison works on the uncorrected
# networks only. This may have to change in the future, in which case
# the value for 'fdr' when accessing single target results or adjacency
# matrices has to be taken from the analysis settings.
if weights == "comparison":
adjacency_matrix = AdjacencyMatrix(self.data_properties.n_nodes, int)
for t in self.targets_analysed:
sources = self.get_target_sources(t)
for i, s in enumerate(sources):
adjacency_matrix.add_edge(s, t, int(self.ab[t][i]))
elif weights == "union":
adjacency_matrix = AdjacencyMatrix(self.data_properties.n_nodes, int)
for t in self.targets_analysed:
sources = self.get_target_sources(t)
adjacency_matrix.add_edge_list(
sources,
np.ones(len(sources), dtype=int) * t,
np.ones(len(sources), dtype=int),
)
elif weights == "diff_abs":
adjacency_matrix = AdjacencyMatrix(self.data_properties.n_nodes, float)
for t in self.targets_analysed:
sources = self.get_target_sources(t)
for i, s in enumerate(sources):
print(self.cmi_diff_abs)
adjacency_matrix.add_edge(s, t, self.cmi_diff_abs[t][i])
elif weights == "pvalue":
adjacency_matrix = AdjacencyMatrix(self.data_properties.n_nodes, float)
for t in self.targets_analysed:
sources = self.get_target_sources(t)
for i, s in enumerate(sources):
adjacency_matrix.add_edge(s, t, self.pval[t][i])
else:
raise RuntimeError("Invalid weights value")
# self._print_edge_list(adjacency_matrix, weights=weights)
return adjacency_matrix
    def print_edge_list(self, weights="comparison"):
"""Print results of network comparison to console.
Print results of network comparison to console. Output looks like this:
>>> 0 -> 1, diff_abs = 0.2
>>> 0 -> 2, diff_abs = 0.5
>>> 0 -> 3, diff_abs = 0.7
>>> 3 -> 4, diff_abs = 1.3
>>> 4 -> 3, diff_abs = 0.4
indicating differences in the network inference measure for a link
source -> target.
Args:
weights : str [optional]
weights for the adjacency matrix (see documentation of method
get_adjacency_matrix for details)
"""
adjacency_matrix = self.get_adjacency_matrix(weights=weights)
self._print_edge_list(adjacency_matrix, weights=weights)
    def get_single_target(self, target):
"""Return results for a single target in the network.
Results for single targets include for each target
- sources : list of ints - list of sources inferred for the current
target (union of sources from both data sets entering the comparison)
- selected_vars_sources : list of tuples - source variables with
significant information about the current value (union of both
conditions)
- selected_vars_target : list of tuples - target variables with
significant information about the current value (union of both
conditions)
Args:
target : int
target id
Returns:
dict
Results for single target. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars_sources']) or via dot-notation
(result.selected_vars_sources).
"""
return super(ResultsNetworkComparison, self).get_single_target(
target, fdr=False
)
    def get_target_sources(self, target):
"""Return list of sources (parents) for given target.
Args:
target : int
target index
"""
v = self.get_single_target(target)["selected_vars_sources"]
return np.unique(np.array([s[0] for s in v]))
class ResultsSingleProcessRudelt:
"""Store results of single process analysis.
    Provides a container for the results of the Rudelt optimization algorithm. To
obtain results for individual processes, call the .get_single_process()
method (see docstring for details).
Note that for convenience all dictionaries in this class can additionally
be accessed using dot-notation:
>>> res_network.settings.estimation_method
or
>>> res_network.settings['estimation_method'].
Attributes:
settings : dict
settings used for estimation of information theoretic measures
data_properties : dict
data properties, contains
- n_processes : int - total number of processes analysed
processes_analysed : list
list of analysed processes
"""
def __init__(self, processes):
self.settings = DotDict({})
self.data_properties = DotDict({"n_processes": len(processes)})
        self.processes_analysed = np.zeros(shape=len(processes), dtype=int)
self._single_process = {}
for ii in processes:
self._single_process[ii] = {}
@property
def processes_analysed(self):
"""Get index of the current_value."""
return self._processes_analysed
@processes_analysed.setter
def processes_analysed(self, processes):
self._processes_analysed = processes
    def _add_single_result(self, process_count, process, results, settings):
        """Add analysis result for a single process."""
        # self._check_result(process, settings)
        self.settings.update(DotDict(settings))
        self._single_process[process] = DotDict(results)
        self.processes_analysed[process_count] = process
    def get_single_process(self, process):
"""Return results for a single process.
Args:
process : int
process id
Returns:
dict
results for single process. Note that for convenience
dictionary entries can either be accessed via keywords
(result['selected_vars']) or via dot-notation
(result.selected_vars). Contains keys
- Process : int
Process that was optimized
- estimation_method : String
Estimation method that was used for optimization
- T_D : float
Estimated optimal value for the temporal depth TD
            - tau_R : float
                Information timescale tau_R, a characteristic timescale of
                history dependence similar to an autocorrelation time
            - R_tot : float
                Estimated value for the total history dependence Rtot
- AIS_tot : float
Estimated value for the total active information storage
- opt_number_of_bins_d : int
Number of bins d for the embedding that yields (R̂tot ,T̂D)
- opt_scaling_k : int
Scaling exponent κ for the embedding that yields (R̂tot , T̂D)
- opt_first_bin_size : int
Size of the first bin τ1 for the embedding that yields (R̂tot , T̂D ),
- history_dependence : array with floating-point values
Estimated history dependence for each embedding
- firing_rate : float
                Firing rate of the neuron / spike train
- recording_length : float
Length of the recording (in seconds)
- H_spiking : float
Entropy of the spike times
            If analyse_auto_MI was set to True, additionally:
- auto_MI : dict
numpy array of MI values for each delay
- auto_MI_delays : list of int
list of delays depending on the given auto_MI_bin_sizes and auto_MI_max_delay
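        Example (a sketch; assumes optimization results were added for
        process 0):
            >>> res.get_single_process(0).R_tot
            >>> # Out: total history dependence of process 0 (float)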
"""
        # Return the result for the requested process
if process not in self.processes_analysed:
raise RuntimeError("No results for process {0}.".format(process))
try:
return self._single_process[process]
except AttributeError:
raise RuntimeError("No results have been added.")
except KeyError:
raise RuntimeError("No results for process {0}.".format(process))