Source code for idtxl.active_information_storage

"""Analysis of AIS in a network of processes.

Analysis of active information storage (AIS) in individual processes of a
network. The algorithm uses non-uniform embedding as described in Faes et
al. (2011).

Note:
    Written for Python 3.4+
"""
import numpy as np
from . import stats
from .single_process_analysis import SingleProcessAnalysis
from .results import ResultsSingleProcessAnalysis
from . import idtxl_exceptions as ex


class ActiveInformationStorage(SingleProcessAnalysis):
    """Estimate active information storage in individual processes.

    Estimate active information storage (AIS) in individual processes of the
    network. To perform AIS estimation, call analyse_network() on the whole
    network or a set of nodes, or call analyse_single_process() to estimate
    AIS for a single process. See the docstrings of the two functions for
    more information.

    References:

    - Lizier, J. T., Prokopenko, M., & Zomaya, A. Y. (2012). Local measures
      of information storage in complex distributed computation. Inform Sci,
      208, 39–54. http://doi.org/10.1016/j.ins.2012.04.016
    - Wibral, M., Lizier, J. T., Vögler, S., Priesemann, V., & Galuske, R.
      (2014). Local active information storage as a tool to understand
      distributed neural information processing. Front Neuroinf, 8, 1.
      http://doi.org/10.3389/fninf.2014.00001
    - Faes, L., Nollo, G., & Porta, A. (2011). Information-based detection
      of nonlinear Granger causality in multivariate processes via a
      nonuniform embedding technique. Phys Rev E, 83, 1–15.
      http://doi.org/10.1103/PhysRevE.83.051112

    Attributes:
        process_set : list
            list with indices of analyzed processes
        settings : dict
            analysis settings
        current_value : tuple
            index of the current value in AIS estimation, (idx process,
            idx sample)
        selected_vars_full : list of tuples
            samples in the past state, (idx process, idx sample)
        ais : float
            raw AIS value
        sign : bool
            true if AIS is significant
        pvalue : float
            p-value of AIS
    """

    def __init__(self):
        self.process = None
        self.pvalue = None
        self.sign = False
        self.ais = None
        super().__init__()
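    # Editor's sketch: how candidate sample indices follow from 'max_lag' and
    # 'tau' in _include_process_candidates() below. The current value sits at
    # sample index max_lag, and candidates run backwards from its immediate
    # past in steps of tau (values here are illustrative):
    #
    # >>> import numpy as np
    # >>> max_lag, tau = 5, 2
    # >>> current_sample = max_lag
    # >>> np.arange(current_sample - 1, current_sample - max_lag - 1, -tau)
    # array([4, 2, 0])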
    def analyse_network(self, settings, data, processes="all"):
        """Estimate active information storage for multiple network processes.

        Estimate active information storage for all or a subset of processes
        in the network.

        Note:
            For a detailed description of the algorithm and settings see the
            documentation of the analyse_single_process() method and the
            references in the class docstring.

        Example:

            >>> data = Data()
            >>> data.generate_mute_data(100, 5)
            >>> settings = {
            >>>     'cmi_estimator': 'JidtKraskovCMI',
            >>>     'n_perm_max_stat': 200,
            >>>     'n_perm_min_stat': 200,
            >>>     'max_lag': 5,
            >>>     'tau': 1
            >>>     }
            >>> processes = [1, 2, 3]
            >>> network_analysis = ActiveInformationStorage()
            >>> results = network_analysis.analyse_network(settings, data,
            >>>                                            processes)

        Args:
            settings : dict
                parameters for estimation and statistical testing, see
                documentation of analyse_single_process() for details;
                settings can further contain

                - verbose : bool [optional] - toggle console output
                  (default=True)
                - fdr_correction : bool [optional] - correct results on the
                  network level, see documentation of stats.ais_fdr() for
                  details (default=True)

            data : Data instance
                raw data for analysis
            processes : list of int | 'all'
                indices of processes (default='all');
                if 'all', AIS is estimated for all processes;
                if list of int, AIS is estimated for the processes specified
                in the list.

        Returns:
            ResultsSingleProcessAnalysis instance
                results of network AIS estimation, see documentation of
                ResultsSingleProcessAnalysis()
        """
        # Set defaults for AIS estimation.
        settings.setdefault("verbose", True)
        settings.setdefault("fdr_correction", True)

        # Check provided processes for analysis.
        if processes == "all":
            processes = list(range(data.n_processes))
        if not (
            isinstance(processes, list)
            and all(isinstance(p, int) for p in processes)
        ):
            raise ValueError(
                f"Processes were not specified correctly: {processes}.")

        # Check and set defaults for checkpointing.
        self.settings = self._set_checkpointing_defaults(
            settings, data, [], processes)

        # Perform AIS estimation for each process individually.
        results = ResultsSingleProcessAnalysis(
            n_nodes=data.n_processes,
            n_realisations=data.n_realisations(),
            normalised=data.normalise,
        )
        for process in processes:
            if settings["verbose"]:
                print(f"\n####### analysing process {process} of {processes}")
            res_single = self.analyse_single_process(settings, data, process)
            results.combine_results(res_single)

        # Get the no. of realisations actually used for estimation from the
        # single-process analysis.
        results.data_properties.n_realisations = (
            res_single.data_properties.n_realisations
        )

        # Perform FDR-correction on the network level. Add FDR-corrected
        # results as an extra field. ais_fdr/combine_results internally
        # create a deep copy of the results.
        if settings["fdr_correction"]:
            results = stats.ais_fdr(settings, results)
        return results
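    # Editor's sketch: accessing network-level results returned by
    # analyse_network(). Accessor names (get_single_process(),
    # processes_analysed) follow IDTxl's results module; verify against the
    # installed version:
    #
    # >>> results = network_analysis.analyse_network(settings, data, processes)
    # >>> results.processes_analysed           # processes that were analysed
    # >>> res = results.get_single_process(process=1, fdr=True)
    # >>> res.ais, res.ais_pval, res.ais_sign  # FDR-corrected significance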
    def analyse_single_process(self, settings, data, process):
        """Estimate active information storage for a single process.

        Estimate active information storage for one process in the network.
        Uses a non-uniform embedding found through information maximisation.
        This is done in three steps (see Lizier and Faes for details):

        (1) Find all relevant samples in the process's own past, by
            iteratively adding candidate samples that have significant
            conditional mutual information (CMI) with the current value
            (conditional on all samples that were added previously)
        (2) Prune the final conditional set by testing the CMI between each
            sample in the final set and the current value, conditional on
            all other samples in the final set
        (3) Calculate AIS using the final set of candidates as the past
            state (calculate MI between samples in the past and the current
            value); test for statistical significance using a permutation
            test

        Note:
            For a further description of the algorithm see the references in
            the class docstring.

        Args:
            settings : dict
                parameters for estimator use and statistics:

                - cmi_estimator : str - estimator to be used for CMI and MI
                  calculation (for estimator settings see the documentation
                  in the estimators_* modules)
                - max_lag : int - maximum temporal search depth for
                  candidates in the process's past in samples
                - tau : int [optional] - spacing between candidates in the
                  process's past in samples (default=1)
                - n_perm_* : int [optional] - number of permutations, where
                  * can be 'max_stat', 'min_stat', 'mi' (default=500)
                - alpha_* : float [optional] - critical alpha level for
                  statistical significance, where * can be 'max_stat',
                  'min_stat', 'mi' (default=0.05)
                - add_conditionals : list of tuples | str [optional] - force
                  the estimator to add these conditionals when estimating
                  AIS; can either be a list of variables, where each
                  variable is described as (idx process, lag wrt current
                  value), or the string 'faes' for the Faes method (see
                  references)
                - permute_in_time : bool [optional] - force surrogate
                  creation by shuffling realisations in time instead of
                  shuffling replications; see documentation of
                  Data.permute_samples() for further settings
                  (default=False)
                - verbose : bool [optional] - toggle console output
                  (default=True)
                - write_ckp : bool [optional] - enable checkpointing, writes
                  the analysis state to disk every time a variable is
                  selected; resume a crashed analysis using
                  network_analysis.resume_checkpoint() (default=False)
                - filename_ckp : str [optional] - checkpoint file name
                  (without extension) (default='./idtxl_checkpoint')

            data : Data instance
                raw data for analysis
            process : int
                index of process

        Returns:
            ResultsSingleProcessAnalysis instance
                results of AIS estimation, see documentation of
                ResultsSingleProcessAnalysis()
        """
        # Check input and clean up the object if it was used before.
        self._initialise(settings, data, process)

        # Main algorithm.
        print("\n---------------------------- (1) include candidates")
        self._include_process_candidates(data)
        print("\n---------------------------- (2) prune source candidates")
        self._prune_candidates(data)
        print("\n---------------------------- (3) final statistics")
        self._test_final_conditional(data)

        # Clean up and return results.
        if self.settings["verbose"]:
            print(
                f"final conditional samples: "
                f"{self._idx_to_lag(self.selected_vars_full)}"
            )
        results = ResultsSingleProcessAnalysis(
            n_nodes=data.n_processes,
            n_realisations=data.n_realisations(self.current_value),
            normalised=data.normalise,
        )
        results._add_single_result(
            process=self.process,
            settings=self.settings,
            results={
                "current_value": self.current_value,
                "selected_vars": self._idx_to_lag(self.selected_vars_full),
                "ais": self.ais,
                "ais_pval": self.pvalue,
                "ais_sign": self.sign,
            },
        )
        self._reset()  # remove realisations and min_stats surrogate table
        return results
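    # Editor's sketch: a minimal single-process run and the fields stored
    # for it (a hypothetical configuration; estimator and settings names as
    # documented above, result accessors as in IDTxl's results module):
    #
    # >>> settings = {'cmi_estimator': 'JidtKraskovCMI', 'max_lag': 5}
    # >>> results = ActiveInformationStorage().analyse_single_process(
    # >>>     settings, data, process=0)
    # >>> res = results.get_single_process(process=0, fdr=False)
    # >>> res.ais, res.ais_pval, res.ais_sign  # raw AIS, p-value, significance
    # >>> res.selected_vars                    # past state, (process, lag) tuples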
    def _initialise(self, settings, data, process):
        """Check input, set initial or default values for analysis settings."""
        # Check analysis settings and set defaults.
        self.settings = settings.copy()
        self.settings.setdefault("verbose", True)
        self.settings.setdefault("add_conditionals", None)
        self.settings.setdefault("tau", 1)
        self.settings.setdefault("local_values", False)

        if not isinstance(self.settings["max_lag"], int) or (
            self.settings["max_lag"] < 0
        ):
            raise RuntimeError("max_lag has to be an integer >= 0.")
        if not isinstance(self.settings["tau"], int) or self.settings["tau"] <= 0:
            raise RuntimeError("tau has to be an integer > 0.")
        if self.settings["tau"] > self.settings["max_lag"]:
            raise RuntimeError(
                f"tau ({self.settings['tau']}) has to be equal to or smaller "
                f"than max_lag ({self.settings['max_lag']})."
            )

        # Set CMI estimator.
        self._set_cmi_estimator()

        # Initialise class attributes.
        self._min_stats_surr_table = None

        # Check the process to be analysed.
        if not isinstance(process, int) or process < 0:
            raise RuntimeError(
                f"The index of the process ({process}) has to be an int >= 0."
            )
        if process >= data.n_processes:
            raise RuntimeError(
                f"Trying to analyse process with index {process}, which is "
                f"greater than or equal to the number of processes in the "
                f"data ({data.n_processes})."
            )
        self.process = process

        # Check the provided search depth.
        assert data.n_samples >= self.settings["max_lag"] + 1, (
            f"Not enough samples in data ({data.n_samples}) to allow for the "
            f"chosen maximum lag ({self.settings['max_lag']})"
        )
        self.current_value = (process, self.settings["max_lag"])
        [cv_realisation, repl_idx] = data.get_realisations(
            current_value=self.current_value, idx_list=[self.current_value]
        )
        self._current_value_realisations = cv_realisation

        # Remember which realisations come from which replication. This may
        # be needed for surrogate creation at a later point.
        self._replication_index = repl_idx

        # Check the permutation type and no. permutations requested by the
        # user. This tests if there is sufficient data to do all tests.
        # surrogates.check_permutations(self, data)

        # Check and set defaults for checkpointing.
        self.settings = self._set_checkpointing_defaults(
            self.settings, data, [], process
        )

        # Reset all attributes to initial values if the instance has been
        # used before.
        if self.selected_vars_full:
            self.selected_vars_full = []
            self.selected_vars_sources = []
            self._selected_vars_realisations = None
            self.pvalue = None
            self.sign = False
            self.ais = None
            self._min_stats_surr_table = None

        # Check if the user provided a list of candidates that must go into
        # the conditioning set. These will be added and used for AIS
        # estimation, but never tested for significance.
        if self.settings["add_conditionals"] is not None:
            self._force_conditionals(self.settings["add_conditionals"], data)

    def _include_process_candidates(self, data):
        """Test candidates in the process's past."""
        process = [self.process]
        samples = np.arange(
            self.current_value[1] - 1,
            self.current_value[1] - self.settings["max_lag"] - 1,
            -self.settings["tau"],
        )
        candidates = self._define_candidates(process, samples)
        self._include_candidates(candidates, data)

    def _include_candidates(self, candidate_set, data):
        """Include informative candidates into the conditioning set.

        Loop over each candidate in the candidate set and test if it has
        significant mutual information with the current value, conditional
        on all samples that were informative in previous rounds and are
        already in the conditioning set.

        If this conditional mutual information is significant using maximum
        statistics, add the current candidate to the conditional set.

        Args:
            candidate_set : list of tuples
                candidate set to be tested, where each entry is a tuple
                (process index, sample index)
            data : Data instance
                raw data

        Returns:
            bool
                True if a significant variable was found in the process's
                past.
        """
        success = False
        if self.settings["verbose"]:
            print(f"testing candidate set: {self._idx_to_lag(candidate_set)}")
        while candidate_set:
            # Get realisations for all candidates.
            cand_real = data.get_realisations(self.current_value, candidate_set)[0]
            cand_real = cand_real.T.reshape(cand_real.size, 1)

            # Calculate the CMI between each candidate and the current value.
            try:
                temp_te = self._cmi_estimator.estimate_parallel(
                    n_chunks=len(candidate_set),
                    re_use=["var2", "conditional"],
                    var1=cand_real,
                    var2=self._current_value_realisations,
                    conditional=self._selected_vars_realisations,
                )
            except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll terminate the
                # search for more candidates, though those identified already
                # remain valid.
                print(
                    "AlgorithmExhaustedError encountered in estimations: "
                    + aee.message
                )
                print("Halting current estimation set.")
                # For now we don't need a stack trace:
                # traceback.print_tb(aee.__traceback__)
                break

            # Test the max CMI for significance with maximum statistics.
            te_max_candidate = max(temp_te)
            max_candidate = candidate_set[np.argmax(temp_te)]
            if self.settings["verbose"]:
                print(
                    f"testing candidate {self._idx_to_lag([max_candidate])[0]} ",
                    end="",
                )
            significant = False
            try:
                significant = stats.max_statistic(
                    self,
                    data,
                    candidate_set,
                    te_max_candidate,
                    conditional=self._selected_vars_realisations,
                )[0]
            except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll terminate the
                # check on the max stats and not let the source pass.
                print(
                    "AlgorithmExhaustedError encountered in estimations: "
                    + aee.message
                )
                print("Halting max stats and further selection for target.")
                # For now we don't need a stack trace:
                # traceback.print_tb(aee.__traceback__)
                break

            # If the max is significant, keep it and test the next candidate.
            # If it is not significant, break: there will be no further
            # significant sources b/c they all have lesser CMI.
            if significant:
                # if self.settings['verbose']:
                #     print(' -- significant')
                success = True
                # Remove the candidate from the candidate set and add it to
                # the selected variables (used as the conditioning set).
                candidate_set.pop(np.argmax(temp_te))
                self._append_selected_vars(
                    [max_candidate],
                    data.get_realisations(self.current_value, [max_candidate])[0],
                )
                if self.settings["write_ckp"]:
                    self._write_checkpoint()
            else:
                if self.settings["verbose"]:
                    print(" -- not significant")
                break
        return success

    def _prune_candidates(self, data):
        """Remove uninformative candidates from the final conditional set.

        For each sample in the final conditioning set, check if it is
        informative about the current value given all other samples in the
        final set. If a sample is not informative, it is removed from the
        final set.

        Args:
            data : Data instance
                raw data
        """
        # FOR LATER: we don't need to test the variable added last in the
        # first round.
        if self.settings["verbose"]:
            if self.selected_vars_sources:
                print(
                    f"testing candidate set: "
                    f"{self._idx_to_lag(self.selected_vars_sources)}",
                    end="",
                )
            else:
                print("no sources selected, nothing to prune ...")
        while self.selected_vars_sources:
            # Find the candidate with the minimum CMI with the current value.
            cond_dim = len(self.selected_vars_sources) - 1
            candidate_realisations = np.empty(
                (
                    data.n_realisations(self.current_value)
                    * len(self.selected_vars_sources),
                    1,
                )
            ).astype(data.data_type)
            conditional_realisations = np.empty(
                (
                    data.n_realisations(self.current_value)
                    * len(self.selected_vars_sources),
                    cond_dim,
                )
            ).astype(data.data_type)
            i_1 = 0
            i_2 = data.n_realisations(self.current_value)
            for candidate in self.selected_vars_sources:
                # Separate the candidate realisations and all other
                # realisations to test the candidate's individual
                # contribution.
                [temp_cond, temp_cand] = self._separate_realisations(
                    self.selected_vars_sources, candidate
                )
                if temp_cond is None:
                    conditional_realisations = None
                    re_use = ["var2", "conditional"]
                else:
                    conditional_realisations[i_1:i_2, :] = temp_cond
                    re_use = ["var2"]
                candidate_realisations[i_1:i_2, :] = temp_cand
                i_1 = i_2
                i_2 += data.n_realisations(self.current_value)

            try:
                temp_te = self._cmi_estimator.estimate_parallel(
                    n_chunks=len(self.selected_vars_sources),
                    re_use=re_use,
                    var1=candidate_realisations,
                    var2=self._current_value_realisations,
                    conditional=conditional_realisations,
                )
            except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll terminate the
                # pruning check, assuming that we need not prune any more.
                print(
                    "AlgorithmExhaustedError encountered in estimations: "
                    + aee.message
                )
                print("Halting current pruning and allowing others to remain.")
                # For now we don't need a stack trace:
                # traceback.print_tb(aee.__traceback__)
                break

            # Test the min CMI for significance with minimum statistics.
            te_min_candidate = min(temp_te)
            min_candidate = self.selected_vars_sources[np.argmin(temp_te)]
            if self.settings["verbose"]:
                print(f"testing candidate: {self._idx_to_lag([min_candidate])[0]}")
            remaining_candidates = set(self.selected_vars_sources).difference(
                set([min_candidate])
            )
            conditional_realisations = data.get_realisations(
                self.current_value, remaining_candidates
            )[0]
            try:
                [significant, p, surr_table] = stats.min_statistic(
                    analysis_setup=self,
                    data=data,
                    candidate_set=self.selected_vars_sources,
                    te_min_candidate=te_min_candidate,
                    conditional=conditional_realisations,
                )
            except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll terminate the
                # min statistics, assuming that we need not prune any more.
                print(
                    "AlgorithmExhaustedError encountered in estimations: "
                    + aee.message
                )
                print("Halting current pruning and allowing others to remain.")
                # For now we don't need a stack trace:
                # traceback.print_tb(aee.__traceback__)
                break

            # Remove the minimum if it is not significant and test the next
            # min. candidate. If the minimum is significant, break: all other
            # sources will be significant as well (b/c they have higher CMI).
            if not significant:
                # if self.settings['verbose']:
                #     print(' -- not significant')
                self._remove_selected_var(min_candidate)
                if self.settings["write_ckp"]:
                    self._write_checkpoint()
            else:
                if self.settings["verbose"]:
                    print(" -- significant")
                self._min_stats_surr_table = surr_table
                break

    def _test_final_conditional(self, data):
        """Perform a statistical test on AIS using the final conditional set."""
        if self._selected_vars_full:
            if self.settings["verbose"]:
                print(
                    f"selected sources: "
                    f"{self._idx_to_lag(self.selected_vars_full)}"
                )
            try:
                ais, s, p = stats.mi_against_surrogates(self, data)
            except ex.AlgorithmExhaustedError as aee:
                # The algorithm cannot continue here, so we'll set the
                # results to zero.
                print(
                    "AlgorithmExhaustedError encountered in estimations: "
                    + aee.message
                )
                print(
                    "Halting AIS final conditional test and setting it to "
                    "not significant."
                )
                # For now we don't need a stack trace:
                # traceback.print_tb(aee.__traceback__)
                ais = 0
                s = False
                p = 1

            # If a parallel estimator was used, an array of AIS estimates is
            # returned. Make the output uniform for both estimator types.
            if isinstance(ais, np.ndarray):
                assert ais.shape[0] == 1, "AIS result is not a scalar."
                ais = ais[0]

            if self.settings["local_values"]:
                replication_ind = data.get_realisations(
                    self.current_value, self._selected_vars_sources
                )[1]
                try:
                    local_ais = self._cmi_estimator_local.estimate(
                        var1=self._current_value_realisations,
                        var2=self._selected_vars_realisations,
                        conditional=None,
                    )
                except ex.AlgorithmExhaustedError as aee:
                    # The algorithm cannot continue here, so we'll set the
                    # results to zero.
                    print(
                        "AlgorithmExhaustedError encountered in final local "
                        "AIS estimations: " + aee.message
                    )
                    print(
                        "Setting all local results to zero (but leaving the "
                        "surrogate statistical test results)."
                    )
                    # For now we don't need a stack trace:
                    # traceback.print_tb(aee.__traceback__)
                    # Return local AIS values of all zeros (length gleaned
                    # from the reshape below):
                    local_ais = np.zeros(
                        (max(replication_ind) + 1) * sum(replication_ind == 0)
                    )
                # Reshape local AIS to a [replications x samples] matrix.
                self.ais = local_ais.reshape(
                    max(replication_ind) + 1, sum(replication_ind == 0)
                )
            else:
                self.ais = ais
            self.sign = s
            self.pvalue = p
        else:
            if self.settings["verbose"]:
                print("no sources selected")
            self.ais = np.nan
            self.sign = False
            self.pvalue = 1.0

    def _force_conditionals(self, cond, data):
        """Enforce a given conditioning set."""
        if type(cond) is tuple:  # easily add a single variable
            cond = [cond]
        print(f"Adding the following variables to the conditioning set: {cond}.")
        cond_idx = self._lag_to_idx(cond)
        self._append_selected_vars(
            cond_idx, data.get_realisations(self.current_value, cond_idx)[0]
        )

    def _reset(self):
        """Reset the instance after analysis."""
        self.__init__()
        del self.pvalue
        del self.sign
        del self.ais
        del self.settings
        del self._cmi_estimator
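

# Editor's sketch: forcing variables into the conditioning set via the
# 'add_conditionals' setting (handled by _force_conditionals() above).
# Variables are given as (process index, lag) tuples relative to the current
# value; a hypothetical configuration, assuming max_lag >= 3:
#
# settings = {
#     "cmi_estimator": "JidtKraskovCMI",
#     "max_lag": 5,
#     "add_conditionals": [(0, 1), (0, 3)],  # always condition on lags 1 and 3
# }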