Source code for idtxl.data

"""Provide data structures for IDTxl analysis."""
import numpy as np

from . import idtxl_utils as utils

VERBOSE = False


class Data:
    """Store data for information dynamics estimation.

    Data takes a 1- to 3-dimensional array representing realisations of
    random variables in dimensions: processes, samples (over time), and
    replications. If necessary, provided realisations are reshaped to fit the
    format expected by IDTxl, which is a 3-dimensional array with axes
    representing (process index, sample index, replication index). Indicate
    the actual order of dimensions in the provided array in a three-character
    string, e.g. 'spr' for an array with realisations over (1) samples in
    time, (2) processes, (3) replications.

    Example:

        >>> data_mute = Data()              # initialise empty data object
        >>> data_mute.generate_mute_data()  # simulate data from MuTE paper
        >>>
        >>> # Create data objects with data of various sizes
        >>> d = np.arange(10000).reshape((2, 1000, 5))  # 2 procs.,
        >>> data_1 = Data(d, dim_order='psr')           # 1000 samples, 5 repl.
        >>>
        >>> d = np.arange(3000).reshape((3, 1000))  # 3 procs.,
        >>> data_2 = Data(d, dim_order='ps')        # 1000 samples
        >>>
        >>> # Overwrite data in existing object with new data
        >>> d = np.arange(5000)
        >>> data_2.set_data(d, 's')

    Note:
        Realisations are stored as attribute 'data'. This can only be set via
        the 'set_data()' method.

    Args:
        data : numpy array [optional]
            1/2/3-dimensional array with raw data
        dim_order : string [optional]
            order of dimensions, accepts any combination of the characters
            'p', 's', and 'r' for processes, samples, and replications; must
            have the same length as the data dimensionality, e.g., 'ps' for a
            two-dimensional array of data from several processes over time
            (default='psr')
        normalise : bool [optional]
            if True, data gets normalised per process (default=True)
        seed : int [optional]
            can be set to a fixed integer to get repeatable results on the
            same data with multiple runs of analyses; otherwise a random seed
            is set as default

    Attributes:
        data : numpy array
            realisations, can only be set via 'set_data' method
        n_processes : int
            number of processes
        n_replications : int
            number of replications
        n_samples : int
            number of samples in time
        normalise : bool
            if True, all data gets z-standardised per process
        initial_state : array
            initial state of the seed for shuffled permutations
    """

    def __init__(self, data=None, dim_order="psr", normalise=True, seed=None):
        np.random.seed(seed)
        self.initial_state = np.random.get_state()
        self.normalise = normalise
        self.n_processes = 0
        self.n_samples = 0
        self.n_replications = 0
        if data is not None:
            self.set_data(data, dim_order)
    def n_realisations(self, current_value=None):
        """Number of realisations over samples and replications.

        Args:
            current_value : tuple [optional]
                reference point for calculation of number of realisations
                (e.g. when using an embedding of length k, we count
                realisations from the (k+1)th sample because we lose the
                first k samples to the embedding); if no current_value is
                provided, the number of all samples is used
        """
        return self.n_realisations_samples(current_value) * self.n_realisations_repl()
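    # Usage sketch for the realisation counts (assuming MuTE example data as
    # generated by generate_mute_data() below); the numbers follow from
    # n_realisations = (n_samples - current_value[1]) * n_replications:
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=100, n_replications=5)
    #     >>> data.n_realisations()        # all samples: 100 * 5 = 500
    #     >>> data.n_realisations((0, 3))  # (100 - 3) * 5 = 485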
    def n_realisations_samples(self, current_value=None):
        """Number of realisations over samples.

        Args:
            current_value : tuple [optional]
                reference point for calculation of number of realisations
                (e.g. when using an embedding of length k, the current value
                is at sample k + 1; we thus count realisations from the
                (k+1)th sample because we lose the first k samples to the
                embedding)
        """
        if current_value is None:
            return self.n_samples
        if current_value[1] >= self.n_samples:
            raise RuntimeError(
                f"The sample index of the current value ({current_value}) is "
                f"larger than the number of samples in the data set "
                f"({self.n_samples})."
            )
        return self.n_samples - current_value[1]
    def n_realisations_repl(self):
        """Number of realisations over replications."""
        return self.n_replications
    @property
    def data(self):
        """Return data array."""
        return self._data

    @data.setter
    def data(self, d):
        if hasattr(self, "data"):
            raise AttributeError(
                "You can not assign a value to this attribute"
                " directly, use the set_data method instead."
            )
        self._data = d

    @data.deleter
    def data(self):
        print("overwriting existing data")
        del self._data
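    # Sketch of the guarded property (assuming a freshly created object):
    # once realisations have been added, the setter blocks direct
    # assignment; use set_data() instead.
    #
    #     >>> data = Data(np.arange(3000).reshape((3, 1000)), dim_order='ps')
    #     >>> data.data = np.zeros((3, 1000, 1))           # raises AttributeError
    #     >>> data.set_data(np.random.rand(3, 500), 'ps')  # this works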
    def set_data(self, data, dim_order):
        """Overwrite data in an existing Data object.

        Args:
            data : numpy array
                1- to 3-dimensional array of realisations
            dim_order : string
                order of dimensions, accepts any combination of the characters
                'p', 's', and 'r' for processes, samples, and replications;
                must have the same length as the number of dimensions in data
        """
        if len(dim_order) > 3:
            raise RuntimeError("dim_order can not have more than three entries")
        if len(dim_order) != data.ndim:
            raise RuntimeError(
                f"Data array dimension ({data.ndim}) and length of "
                f"dim_order ({len(dim_order)}) are not equal."
            )

        # Bring data into the order processes x samples x replications and
        # set data.
        data_ordered = self._reorder_data(data, dim_order)
        self._set_data_size(data_ordered)
        print(
            f"Adding data with properties: {self.n_processes} processes, "
            f"{self.n_samples} samples, {self.n_replications} replications"
        )
        try:
            delattr(self, "data")
        except AttributeError:
            pass
        if self.normalise:
            self.data = self._normalise_data(data_ordered)
        else:
            self.data = data_ordered
        self.data_type = type(self.data[0, 0, 0])
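    # Sketch of the dim_order handling: a two-dimensional array with samples
    # in the first and processes in the second dimension is brought into the
    # internal (process, sample, replication) order, with a singleton
    # replication axis added for the missing 'r' dimension.
    #
    #     >>> d = np.random.rand(1000, 3)  # samples x processes
    #     >>> data = Data(normalise=False)
    #     >>> data.set_data(d, 'sp')
    #     >>> data.data.shape              # (3, 1000, 1)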
    def _normalise_data(self, d):
        """Z-standardise data separately for each process."""
        d_standardised = np.empty(d.shape)
        for process in range(self.n_processes):
            s = utils.standardise(
                d[process, :, :].reshape(1, self.n_realisations()), dimension=1
            )
            d_standardised[process, :, :] = s.reshape(
                self.n_samples, self.n_replications
            )
        return d_standardised

    def _reorder_data(self, data, dim_order):
        """Reorder data dimensions to processes x samples x replications."""
        # Add singletons for missing dimensions.
        missing_dims = "psr"
        for dim in dim_order:
            missing_dims = missing_dims.replace(dim, "")
        for dim in missing_dims:
            data = np.expand_dims(data, data.ndim)
            dim_order += dim

        # Reorder array dims if necessary.
        if dim_order[0] != "p":
            ind_p = dim_order.index("p")
            data = data.swapaxes(0, ind_p)
            dim_order = utils.swap_chars(dim_order, 0, ind_p)
        if dim_order[1] != "s":
            data = data.swapaxes(1, dim_order.index("s"))
        return data

    def _set_data_size(self, data):
        """Set the data size."""
        self.n_processes = data.shape[0]
        self.n_samples = data.shape[1]
        self.n_replications = data.shape[2]
    def get_seed(self):
        """Return the initial seed of the data."""
        return self.initial_state
    def get_state(self):
        """Return the current state of the random seed."""
        return np.random.get_state()
    def get_realisations(self, current_value, idx_list, shuffle=False):
        """Return realisations for a list of indices.

        Return realisations for indices in list. Optionally, realisations can
        be shuffled to create surrogate data for statistical testing. For
        shuffling, data blocks are permuted over replications while their
        temporal order stays intact within replications:

        Original data:
            +--------------+---------+---------+---------+---------+---------+-----+
            | repl. ind.   | 1 1 1 1 | 2 2 2 2 | 3 3 3 3 | 4 4 4 4 | 5 5 5 5 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+
            | sample index | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+

        Shuffled data:
            +--------------+---------+---------+---------+---------+---------+-----+
            | repl. ind.   | 3 3 3 3 | 1 1 1 1 | 4 4 4 4 | 2 2 2 2 | 5 5 5 5 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+
            | sample index | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+

        Args:
            current_value : tuple
                index of the current value in the current analysis, has to
                have the form (idx process, idx sample); if current_value ==
                idx, all samples for a process are returned
            idx_list : list of tuples
                variable indices
            shuffle : bool
                if True, permute blocks of replications over trials

        Returns:
            numpy array
                realisations with dimensions (no. samples * no. replications)
                x number of indices
            numpy array
                replication index for each realisation with length
                (no. samples * no. replications)
        """
        if not hasattr(self, "data"):
            raise AttributeError("No data has been added to this Data() instance.")
        # Return None if index list is empty.
        if not idx_list:
            return None, None
        # Check if requested indices are smaller than the current_value.
        if not all(np.array([x[1] for x in idx_list]) <= current_value[1]):
            print(f"Index list: {idx_list}\ncurrent value: {current_value}")
            raise RuntimeError(
                "All indices for which data is retrieved must be smaller than "
                "the current value."
            )

        # Allocate memory.
        n_real_time = self.n_realisations_samples(current_value)
        n_real_repl = self.n_realisations_repl()
        realisations = np.empty((n_real_time * n_real_repl, len(idx_list))).astype(
            self.data_type
        )

        # Shuffle the replication order if requested. This creates surrogate
        # data by permuting replications while keeping the order of samples
        # intact.
        if shuffle:
            replications_order = np.random.permutation(self.n_replications)
        else:
            replications_order = np.arange(self.n_replications)

        # Retrieve data.
        i = 0
        for idx in idx_list:
            r = 0
            # Get the last sample as a negative value, i.e., no. samples from
            # the end of the array; indexing is much faster than looping over
            # time!
            last_sample = idx[1] - current_value[1]
            if last_sample == 0:
                last_sample = None
            for replication in replications_order:
                try:
                    realisations[r : r + n_real_time, i] = self.data[
                        idx[0], idx[1] : last_sample, replication
                    ]
                except IndexError as e:
                    raise IndexError(
                        f"You tried to access variable {idx} in a data set "
                        f"with {self.n_processes} processes and "
                        f"{self.n_samples} samples."
                    ) from e
                r += n_real_time
            assert not np.isnan(
                realisations[:, i]
            ).any(), "There are nans in the retrieved realisations."
            i += 1

        # For each realisation keep the index of the replication it came from.
        replications_index = np.repeat(replications_order, n_real_time)
        assert (
            replications_index.shape[0] == realisations.shape[0]
        ), "There seems to be a problem with the replications index."
        return realisations, replications_index
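    # Usage sketch (assuming MuTE example data): retrieve realisations of two
    # past variables of process 0 relative to a current value at
    # (process 0, sample 5); rows stack (n_samples - 5) realisations per
    # replication.
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=100, n_replications=5)
    #     >>> real, repl_idx = data.get_realisations(
    #     ...     current_value=(0, 5), idx_list=[(0, 4), (0, 3)])
    #     >>> real.shape      # ((100 - 5) * 5, 2) = (475, 2)
    #     >>> repl_idx.shape  # (475,)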
    def _get_data_slice(self, process, offset_samples=0, shuffle=False):
        """Return data slice for a single process.

        Return data slice for process. Optionally, an offset can be provided
        such that data are returned from sample 'offset_samples' onwards.
        Also, data can be shuffled over replications to create surrogate data
        for statistical testing. For shuffling, data blocks are permuted over
        replications while their temporal order stays intact within
        replications:

        Original data:
            +---------------+---------+---------+---------+---------+---------+-----+
            | repl. index:  | 1 1 1 1 | 2 2 2 2 | 3 3 3 3 | 4 4 4 4 | 5 5 5 5 | ... |
            +---------------+---------+---------+---------+---------+---------+-----+
            | sample index: | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +---------------+---------+---------+---------+---------+---------+-----+

        Shuffled data:
            +---------------+---------+---------+---------+---------+---------+-----+
            | repl. index:  | 3 3 3 3 | 1 1 1 1 | 4 4 4 4 | 2 2 2 2 | 5 5 5 5 | ... |
            +---------------+---------+---------+---------+---------+---------+-----+
            | sample index: | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +---------------+---------+---------+---------+---------+---------+-----+

        If the current_value is provided, data are returned from an offset
        specified by the index wrt the current_value.

        Args:
            process : int
                process index
            offset_samples : int
                offset in samples
            shuffle : bool
                if True, permute blocks of data over trials

        Returns:
            numpy array
                data slice with dimensions (no. samples - offset) x no.
                replications
            numpy array
                list of replication indices
        """
        # Check if the requested offset is smaller than the number of samples.
        if offset_samples > self.n_samples:
            print(
                f"Offset {offset_samples} must be smaller than number of "
                f"samples in the data ({self.n_samples})"
            )
            raise RuntimeError("Offset must be smaller than no. samples.")

        # Shuffle the replication order if requested. This creates surrogate
        # data by permuting replications while keeping the order of samples
        # intact.
        if shuffle:
            replication_index = np.random.permutation(self.n_replications)
        else:
            replication_index = np.arange(self.n_replications)

        try:
            data_slice = self.data[process, offset_samples:, replication_index]
        except IndexError as e:
            raise IndexError(
                f"You tried to access process {process} with an offset of "
                f"{offset_samples} in a data set of {self.n_processes} "
                f"processes and {self.n_samples} samples."
            ) from e
        assert not np.isnan(
            data_slice
        ).any(), "There are nans in the retrieved data slice."
        return data_slice.T, replication_index
    def slice_permute_replications(self, process):
        """Return data slice with permuted replications (time stays intact).

        Create surrogate data by permuting realisations over replications
        while keeping the temporal structure (order of samples) intact.
        Return the data slice for the specified process. Realisations are
        permuted block-wise by permuting the order of replications.
        """
        return self._get_data_slice(process, shuffle=True)
    def slice_permute_samples(self, process, perm_settings):
        """Return slice of data with permuted samples (repl. stays intact).

        Create surrogate data by permuting data in a slice over samples
        (time) while keeping the order of replications intact. Return slice
        for the entry specified by 'process'. Realisations are permuted
        according to the settings specified in perm_settings:

        Original data:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 1 2 3 4 5 6 7 8 | 1 2 3 4 5 6 7 8 | 1 2 3 4 5 6 7 8 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Circular shift by 2, 6, and 4 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 7 8 1 2 3 4 5 6 | 3 4 5 6 7 8 1 2 | 5 6 7 8 1 2 3 4 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Permute blocks of 3 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 4 5 6 7 8 1 2 3 | 1 2 3 7 8 4 5 6 | 7 8 4 5 6 1 2 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Permute data locally within a range of 4 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 1 2 4 3 8 5 6 7 | 4 1 2 3 5 7 8 6 | 3 1 2 4 8 5 6 7 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Random permutation:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 4 2 5 7 1 3 2 6 | 7 5 3 4 2 1 8 5 | 1 2 4 3 6 8 7 5 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Permuting samples is the fall-back option for surrogate creation if
        the number of replications is too small to allow for a sufficient
        number of permutations for the generation of surrogate data.

        Args:
            process : int
                process for which to return data slice
            perm_settings : dict
                settings specifying the allowed permutations:

                - perm_type : str
                  permutation type, can be

                    - 'circular': shifts time series by a random number of
                      samples
                    - 'block': swaps blocks of samples
                    - 'local': swaps samples within a given range
                    - 'random': swaps samples at random

                - additional settings depending on the perm_type (n is the
                  number of samples):

                    - if perm_type == 'circular':
                      'max_shift' : int
                          the maximum number of samples for shifting
                          (default=n/2)
                    - if perm_type == 'block':
                      'block_size' : int
                          no. samples per block (default=n/10)
                      'perm_range' : int
                          range in which blocks can be swapped (default=max)
                    - if perm_type == 'local':
                      'perm_range' : int
                          range in samples over which realisations can be
                          permuted (default=n/10)

        Returns:
            numpy array
                data slice with data permuted over samples with dimensions
                samples x number of replications
            numpy array
                index of permuted samples

        Note:
            This permutation scheme is the fall-back option if the number of
            replications is too small to allow a sufficient number of
            permutations for the generation of surrogate data.
        """
        data_slice = self._get_data_slice(process, shuffle=True)[0]
        data_slice_perm = np.empty(data_slice.shape).astype(self.data_type)
        perm = self._get_permutation_samples(data_slice.shape[0], perm_settings)
        for r in range(self.n_replications):
            data_slice_perm[:, r] = data_slice[perm, r]
        return data_slice_perm, perm
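    # Usage sketch (assuming MuTE example data): permute samples of process 0
    # in blocks of 10 samples; with perm_range equal to the number of blocks,
    # blocks may be swapped over the whole slice.
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=100, n_replications=5)
    #     >>> perm_settings = {
    #     ...     'perm_type': 'block', 'block_size': 10, 'perm_range': 10}
    #     >>> sl, perm = data.slice_permute_samples(0, perm_settings)
    #     >>> sl.shape  # (100, 5), samples x replications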
    def permute_replications(self, current_value, idx_list):
        """Return realisations with permuted replications (time stays intact).

        Create surrogate data by permuting realisations over replications
        while keeping the temporal structure (order of samples) intact.
        Return realisations for all indices in the list, where an index is
        expected to have the form (process index, sample index). Realisations
        are permuted block-wise by permuting the order of replications:

        Original data:
            +--------------+---------+---------+---------+---------+---------+-----+
            | repl. ind.   | 1 1 1 1 | 2 2 2 2 | 3 3 3 3 | 4 4 4 4 | 5 5 5 5 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+
            | sample index | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+

        Permuted data:
            +--------------+---------+---------+---------+---------+---------+-----+
            | repl. ind.   | 3 3 3 3 | 1 1 1 1 | 4 4 4 4 | 2 2 2 2 | 5 5 5 5 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+
            | sample index | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | 1 2 3 4 | ... |
            +--------------+---------+---------+---------+---------+---------+-----+

        Args:
            current_value : tuple
                index of the current_value in the data
            idx_list : list of tuples
                indices of variables

        Returns:
            numpy array
                permuted realisations with dimensions (no. samples * no.
                replications) x number of indices
            numpy array
                replication index for each realisation

        Raises:
            TypeError if idx_realisations is not a list
        """
        if not isinstance(idx_list, list):
            raise TypeError("idx needs to be a list of tuples.")
        return self.get_realisations(current_value, idx_list, shuffle=True)
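    # Usage sketch (assuming MuTE example data): create surrogates by
    # shuffling whole replications for two past variables of process 1.
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=100, n_replications=5)
    #     >>> surrogates, repl_idx = data.permute_replications(
    #     ...     current_value=(1, 5), idx_list=[(1, 4), (1, 3)])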
    def permute_samples(self, current_value, idx_list, perm_settings):
        """Return realisations with permuted samples (repl. stays intact).

        Create surrogate data by permuting realisations over samples (time)
        while keeping the order of replications intact. Surrogates can be
        created for multiple variables in parallel, where variables are
        provided as a list of indices. An index is expected to have the form
        (process index, sample index).

        Permuting samples in time is the fall-back option for surrogate data
        creation. The default method for surrogate data creation is the
        permutation of replications, while keeping the order of samples in
        time intact. If the number of replications is too small to allow for
        a sufficient number of permutations for the generation of surrogate
        data, permutation of samples in time is chosen instead.

        Different permutation strategies can be chosen to permute
        realisations in time. Note that if data consists of multiple
        replications, within each replication, samples are shuffled following
        the same permutation pattern:

        Original data:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 1 2 3 4 5 6 7 8 | 1 2 3 4 5 6 7 8 | 1 2 3 4 5 6 7 8 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Circular shift by a random number of samples, e.g. 4 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 5 6 7 8 1 2 3 4 | 5 6 7 8 1 2 3 4 | 5 6 7 8 1 2 3 4 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Permute blocks of 3 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 4 5 6 7 8 1 2 3 | 4 5 6 7 8 1 2 3 | 4 5 6 7 8 1 2 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Permute data locally within a range of 4 samples:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 1 2 4 3 8 5 6 7 | 1 2 4 3 8 5 6 7 | 1 2 4 3 8 5 6 7 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Random permutation:
            +--------------+-----------------+-----------------+-----------------+-----+
            | repl. ind.   | 1 1 1 1 1 1 1 1 | 2 2 2 2 2 2 2 2 | 3 3 3 3 3 3 3 3 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+
            | sample index | 4 2 5 7 1 3 2 6 | 4 2 5 7 1 3 2 6 | 4 2 5 7 1 3 2 6 | ... |
            +--------------+-----------------+-----------------+-----------------+-----+

        Args:
            current_value : tuple
                index of the current_value in the data
            idx_list : list of tuples
                indices of variables
            perm_settings : dict
                settings specifying the allowed permutations:

                - perm_type : str
                  permutation type, can be

                    - 'random': swaps samples at random
                    - 'circular': shifts time series by a random number of
                      samples
                    - 'block': swaps blocks of samples
                    - 'local': swaps samples within a given range

                - additional settings depending on the perm_type (n is the
                  number of samples):

                    - if perm_type == 'circular':
                      'max_shift' : int
                          the maximum number of samples for shifting
                          (e.g., number of samples / 2)
                    - if perm_type == 'block':
                      'block_size' : int
                          no. samples per block (e.g., number of samples / 10)
                      'perm_range' : int
                          range in which blocks can be swapped (e.g., number
                          of samples / block_size)
                    - if perm_type == 'local':
                      'perm_range' : int
                          range in samples over which realisations can be
                          permuted (e.g., number of samples / 10)

        Returns:
            numpy array
                permuted realisations with dimensions (no. samples * no.
                replications) x number of indices
            numpy array
                sample index for each realisation

        Raises:
            TypeError if idx_realisations is not a list

        Note:
            This permutation scheme is the fall-back option if surrogate data
            can not be created by shuffling replications because the number
            of replications is too small to generate the requested number of
            permutations.
        """
        realisations, replication_idx = self.get_realisations(current_value, idx_list)
        n_samples = sum(replication_idx == 0)
        perm = self._get_permutation_samples(n_samples, perm_settings)
        # Apply the permutation to data from each replication.
        realisations_perm = np.empty(realisations.shape).astype(self.data_type)
        perm_idx = np.empty(realisations_perm.shape[0])
        for r in range(max(replication_idx) + 1):
            mask = replication_idx == r
            data_temp = realisations[mask, :]
            realisations_perm[mask, :] = data_temp[perm, :]
            perm_idx[mask] = perm
        return realisations_perm, perm_idx
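    # Usage sketch (assuming MuTE example data): create surrogates by
    # circularly shifting samples within each replication; max_shift must
    # not exceed the number of retrieved samples per replication (here
    # 100 - 5 = 95).
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=100, n_replications=5)
    #     >>> surrogates, perm_idx = data.permute_samples(
    #     ...     (0, 5), [(0, 4)], {'perm_type': 'circular', 'max_shift': 47})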
    def _get_permutation_samples(self, n_samples, perm_settings):
        """Generate permutation of n samples.

        Generate a permutation of n samples under various possible
        restrictions. Permuting samples is the fall-back option for surrogate
        creation if the number of replications is too small to allow for a
        sufficient number of permutations for the generation of surrogate
        data. For a detailed description of permutation strategies see the
        documentation of permute_samples().

        Args:
            n_samples : int
                length of the permutation
            perm_settings : dict
                settings specifying the allowed permutations, see
                documentation of permute_samples()

        Returns:
            numpy array
                permuted indices of samples with length n_samples
        """
        perm_type = perm_settings["perm_type"]

        # Get the permutation 'mask' for one replication (the same mask is
        # then applied to each replication).
        if perm_type == "random":
            perm = np.random.permutation(n_samples)
        elif perm_type == "circular":
            max_shift = perm_settings["max_shift"]
            if not isinstance(max_shift, int) or max_shift < 1:
                raise TypeError("'max_shift' has to be an int > 0.")
            perm = self._circular_shift(n_samples, max_shift)[0]
        elif perm_type == "block":
            block_size = perm_settings["block_size"]
            perm_range = perm_settings["perm_range"]
            if not isinstance(block_size, int) or block_size < 1:
                raise TypeError("'block_size' has to be an int > 0.")
            if not isinstance(perm_range, int) or perm_range < 1:
                raise TypeError("'perm_range' has to be an int > 0.")
            perm = self._swap_blocks(n_samples, block_size, perm_range)
        elif perm_type == "local":
            perm_range = perm_settings["perm_range"]
            if not isinstance(perm_range, int) or perm_range < 1:
                raise TypeError("'perm_range' has to be an int > 0.")
            perm = self._swap_local(n_samples, perm_range)
        else:
            raise ValueError(f"Unknown permutation type ({perm_type}).")
        return perm

    def _swap_local(self, n, perm_range):
        """Permute n samples within blocks of length 'perm_range'.

        Args:
            n : int
                number of samples
            perm_range : int
                range over which realisations are permuted

        Returns:
            numpy array
                permuted indices with length n
        """
        assert perm_range > 1, (
            "Permutation range has to be larger than 1, "
            "otherwise there is nothing to permute."
        )
        assert n >= perm_range, (
            f"Not enough realisations per replication ({n}) to allow for "
            f"the requested 'perm_range' of {perm_range}."
        )

        if perm_range == n:  # permute all n samples randomly
            perm = np.random.permutation(n)
        else:  # build a permutation that permutes only within the perm_range
            perm = np.empty(n, dtype=int)
            remainder = n % perm_range
            i = 0
            for _ in range(n // perm_range):
                perm[i : i + perm_range] = np.random.permutation(perm_range) + i
                i += perm_range
            if remainder > 0:
                perm[-remainder:] = np.random.permutation(remainder) + i
        return perm

    def _swap_blocks(self, n, block_size, perm_range):
        """Permute blocks of samples in a time series within a given range.

        Permute n samples by swapping blocks of samples within a given range.

        Args:
            n : int
                number of samples
            block_size : int
                number of samples in a block
            perm_range : int
                range over which blocks can be swapped

        Returns:
            numpy array
                permuted indices with length n
        """
        n_blocks = np.ceil(n / block_size).astype(int)
        rem_samples = n % block_size
        rem_blocks = n_blocks % perm_range
        if rem_samples == 0:
            rem_samples = block_size

        # First permute block(!) indices.
        if perm_range == n_blocks:  # permute all blocks randomly
            perm_blocks = np.random.permutation(n_blocks)
        else:  # build a permutation that permutes only within the perm_range
            perm_blocks = np.empty(n_blocks, dtype=int)
            i = 0
            for _ in range(n_blocks // perm_range):
                perm_blocks[i : i + perm_range] = (
                    np.random.permutation(perm_range) + i
                )
                i += perm_range
            if rem_blocks > 0:
                perm_blocks[-rem_blocks:] = np.random.permutation(rem_blocks) + i

        # Get the block index for each sample index, taking into account that
        # the last block may have fewer samples if n % block_size isn't 0.
        idx_blocks = np.hstack(
            (
                np.repeat(np.arange(n_blocks - 1), block_size),
                np.repeat(n_blocks - 1, rem_samples),
            )
        )

        # Permute sample indices according to the permuted block indices.
        perm = np.zeros(n).astype(int)  # allocate memory
        idx_orig = np.arange(n)  # original order of sample indices
        i_0 = 0
        for b in perm_blocks:  # loop over permuted blocks
            idx = idx_blocks == b  # indices of samples in curr. permuted block
            i_1 = i_0 + sum(idx)
            perm[i_0:i_1] = idx_orig[idx]  # append samples to permutation
            i_0 = i_1
        return perm

    def _circular_shift(self, n, max_shift):
        """Permute samples through shifting by a random number of samples.

        A time series is shifted circularly by a random number of samples. A
        circular shift of m means that the last m samples are included at the
        beginning of the time series and all other sample indices are
        increased by m steps. max_shift is an upper limit for m.

        Args:
            n : int
                number of samples
            max_shift : int
                maximum possible shift (at most n)

        Returns:
            numpy array
                permuted indices with length n
            int
                no. samples by which the time series was shifted
        """
        assert max_shift <= n, (
            f"max_shift ({max_shift}) has to be equal to or smaller than the "
            f"number of samples in the time series ({n})."
        )
        shift = np.random.randint(low=1, high=max_shift + 1)
        if VERBOSE:
            print(f"replications are shifted by {shift} samples")
        return np.hstack((np.arange(n - shift, n), np.arange(n - shift))), shift
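    # Sketch of the helpers' output (internal API, shown for illustration
    # only): the permutation masks can be inspected directly, e.g. a circular
    # shift of at most 3 samples over 8 indices.
    #
    #     >>> data = Data(seed=0)
    #     >>> perm, shift = data._circular_shift(8, 3)
    #     >>> perm  # e.g. array([6, 7, 0, 1, 2, 3, 4, 5]) for shift == 2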
    def generate_mute_data(self, n_samples=1000, n_replications=10):
        """Generate example data for a 5-process network.

        Generate example data and overwrite the instance's current data. The
        network is used as an example in the paper on the MuTE toolbox
        (Montalto, PLOS ONE, 2014, eq. 14) and was originally proposed by
        Baccala & Sameshima (2001). The network consists of five
        auto-regressive (AR) processes with model order 2 and the following
        (non-linear) couplings:

            0 -> 1, u = 2 (non-linear)
            0 -> 2, u = 3
            0 -> 3, u = 2 (non-linear)
            3 -> 4, u = 1
            4 -> 3, u = 1

        References:

        - Montalto, A., Faes, L., & Marinazzo, D. (2014). MuTE: A MATLAB
          toolbox to compare established and novel estimators of the
          multivariate transfer entropy. PLoS ONE 9(10): e109462.
          https://doi.org/10.1371/journal.pone.0109462
        - Baccala, L.A. & Sameshima, K. (2001). Partial directed coherence: a
          new concept in neural structure determination. Biol Cybern 84:
          463–474. https://doi.org/10.1007/PL00007990

        Args:
            n_samples : int
                number of samples simulated for each process and replication
            n_replications : int
                number of replications
        """
        n_processes = 5
        x = np.zeros((n_processes, n_samples + 3, n_replications))
        x[:, 0:3, :] = np.random.normal(size=(n_processes, 3, n_replications))
        term_1 = 0.95 * np.sqrt(2)
        term_2 = 0.25 * np.sqrt(2)
        term_3 = -0.25 * np.sqrt(2)
        for r in range(n_replications):
            for n in range(3, n_samples + 3):
                x[0, n, r] = (
                    term_1 * x[0, n - 1, r]
                    - 0.9025 * x[0, n - 2, r]
                    + np.random.normal()
                )
                x[1, n, r] = 0.5 * x[0, n - 2, r] ** 2 + np.random.normal()
                x[2, n, r] = -0.4 * x[0, n - 3, r] + np.random.normal()
                x[3, n, r] = (
                    -0.5 * x[0, n - 2, r] ** 2
                    + term_2 * x[3, n - 1, r]
                    + term_2 * x[4, n - 1, r]
                    + np.random.normal()
                )
                x[4, n, r] = (
                    term_3 * x[3, n - 1, r]
                    + term_2 * x[4, n - 1, r]
                    + np.random.normal()
                )
        self.set_data(x[:, 3:, :], "psr")
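    # Usage sketch: simulate the five-process MuTE network and inspect the
    # resulting data dimensions.
    #
    #     >>> data = Data(seed=0)
    #     >>> data.generate_mute_data(n_samples=1000, n_replications=10)
    #     >>> (data.n_processes, data.n_samples, data.n_replications)
    #     ... # (5, 1000, 10)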
    def generate_var_data(
        self,
        n_samples=1000,
        n_replications=10,
        coefficient_matrices=np.array([[[0.5, 0], [0.4, 0.5]]]),
        noise_std=0.1,
    ):
        """Generate discrete-time VAR (vector auto-regressive) time series.

        Generate data and overwrite the instance's current data.

        Args:
            n_samples : int [optional]
                number of samples simulated for each process and replication
            n_replications : int [optional]
                number of replications
            coefficient_matrices : numpy array [optional]
                coefficient matrices: numpy array with dimensions
                (VAR order, number of processes, number of processes). Each
                square coefficient matrix corresponds to a lag, starting from
                lag=1. The total number of provided matrices implicitly
                determines the order of the VAR process.
                (default=np.array([[[0.5, 0], [0.4, 0.5]]]))
            noise_std : float [optional]
                standard deviation of uncorrelated Gaussian noise
                (default=0.1)
        """
        order = np.shape(coefficient_matrices)[0]
        n_processes = np.shape(coefficient_matrices)[1]
        samples_transient = n_processes * 10

        # Check stability of the VAR process, which is a sufficient condition
        # for stationarity.
        var_reduced_form = np.zeros((n_processes * order, n_processes * order))
        var_reduced_form[0:n_processes, :] = np.reshape(
            np.transpose(coefficient_matrices, (1, 0, 2)),
            [n_processes, n_processes * order],
        )
        var_reduced_form[n_processes:, 0 : n_processes * (order - 1)] = np.eye(
            n_processes * (order - 1)
        )
        # Condition for stability: the absolute values of all the eigenvalues
        # of the reduced-form coefficient matrix are smaller than 1. A stable
        # VAR process is also stationary.
        is_stable = max(np.abs(np.linalg.eigvals(var_reduced_form))) < 1
        if not is_stable:
            raise RuntimeError("VAR process is not stable and may be nonstationary.")

        # Initialise time series matrix. The 3 dimensions represent
        # (processes, samples, replications). Only the last n_samples will be
        # kept, in order to discard transient effects.
        x = np.zeros(
            (n_processes, order + samples_transient + n_samples, n_replications)
        )

        # Generate (different) initial conditions for each replication:
        # uniformly sample from the [0, 1] interval and tile as many times as
        # the VAR process order along the second dimension.
        x[:, 0:order, :] = np.tile(
            np.random.rand(n_processes, 1, n_replications), (1, order, 1)
        )

        # Compute time series.
        for i_repl in range(0, n_replications):
            for i_sample in range(order, order + samples_transient + n_samples):
                for i_delay in range(1, order + 1):
                    x[:, i_sample, i_repl] += np.dot(
                        coefficient_matrices[i_delay - 1, :, :],
                        x[:, i_sample - i_delay, i_repl],
                    )
                # Add uncorrelated Gaussian noise vector (mean 0).
                x[:, i_sample, i_repl] += np.random.normal(
                    0, noise_std, x[:, i_sample, i_repl].shape
                )

        # Discard transient effects (only take the end of the time series).
        self.set_data(x[:, -(n_samples + 1) : -1, :], "psr")
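    # Usage sketch: a bivariate VAR(2) process, where each lag contributes
    # one 2 x 2 coefficient matrix. The coefficients below are hypothetical
    # (chosen to be stable, with all companion-matrix eigenvalues inside the
    # unit circle) and couple process 0 into process 1 at lag 1.
    #
    #     >>> coeffs = np.array([[[0.4, 0.0],
    #     ...                     [0.3, 0.4]],
    #     ...                    [[0.2, 0.0],
    #     ...                     [0.0, 0.2]]])
    #     >>> data = Data(seed=0)
    #     >>> data.generate_var_data(
    #     ...     n_samples=500, n_replications=5,
    #     ...     coefficient_matrices=coeffs, noise_std=0.1)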
    def generate_logistic_maps_data(
        self,
        n_samples=1000,
        n_replications=10,
        coefficient_matrices=np.array([[[0.5, 0], [0.4, 0.5]]]),
        noise_std=0.1,
    ):
        """Generate discrete-time coupled-logistic-maps time series.

        Generate data and overwrite the instance's current data.

        The implemented logistic map function is f(x) = 4 * x * (1 - x).

        Args:
            n_samples : int [optional]
                number of samples simulated for each process and replication
            n_replications : int [optional]
                number of replications
            coefficient_matrices : numpy array [optional]
                coefficient matrices: numpy array with dimensions
                (order, number of processes, number of processes). Each
                square coefficient matrix corresponds to a lag, starting from
                lag=1. The total number of provided matrices implicitly
                determines the order of the stochastic process.
                (default=np.array([[[0.5, 0], [0.4, 0.5]]]))
            noise_std : float [optional]
                standard deviation of uncorrelated Gaussian noise
                (default=0.1)
        """
        order = np.shape(coefficient_matrices)[0]
        n_processes = np.shape(coefficient_matrices)[1]
        samples_transient = n_processes * 10

        # Define activation function (logistic map).
        def f(x):
            return 4 * x * (1 - x)

        # Initialise time series matrix. The 3 dimensions represent
        # (processes, samples, replications). Only the last n_samples will be
        # kept, in order to discard transient effects.
        x = np.zeros(
            (n_processes, order + samples_transient + n_samples, n_replications)
        )

        # Generate (different) initial conditions for each replication:
        # uniformly sample from the [0, 1] interval and tile as many times as
        # the stochastic process order along the second dimension.
        x[:, 0:order, :] = np.tile(
            np.random.rand(n_processes, 1, n_replications), (1, order, 1)
        )

        # Compute time series.
        for i_repl in range(0, n_replications):
            for i_sample in range(order, order + samples_transient + n_samples):
                for i_delay in range(1, order + 1):
                    x[:, i_sample, i_repl] += np.dot(
                        coefficient_matrices[i_delay - 1, :, :],
                        x[:, i_sample - i_delay, i_repl],
                    )
                # Compute activation function.
                x[:, i_sample, i_repl] = f(x[:, i_sample, i_repl])
                # Add uncorrelated Gaussian noise vector (mean 0).
                x[:, i_sample, i_repl] += np.random.normal(
                    0, noise_std, x[:, i_sample, i_repl].shape
                )
                # Ensure values are in the [0, 1] range.
                x[:, i_sample, i_repl] = x[:, i_sample, i_repl] % 1

        # Discard transient effects (only take the end of the time series).
        self.set_data(x[:, -(n_samples + 1) : -1, :], "psr")
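    # Usage sketch: two coupled logistic maps using the documented default
    # coefficient matrix (unidirectional coupling 0 -> 1 at lag 1); the
    # modulo step above keeps all values in [0, 1].
    #
    #     >>> coeffs = np.array([[[0.5, 0.0],
    #     ...                     [0.4, 0.5]]])
    #     >>> data = Data(seed=0)
    #     >>> data.generate_logistic_maps_data(
    #     ...     n_samples=500, n_replications=5,
    #     ...     coefficient_matrices=coeffs, noise_std=0.01)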