# Source code for idtxl.idtxl_utils

"""Provide IDTxl utility functions."""
import copy as cp
import pprint
import threading

import numpy as np


def swap_chars(s, i_1, i_2):
    """Swap two characters in a string.

    Args:
        s : str
            string in which characters are swapped
        i_1 : int
            index of the first character
        i_2 : int
            index of the second character

    Returns:
        str
            copy of s with the characters at i_1 and i_2 exchanged

    Example:
        >>> print(swap_chars('heLlotHere', 2, 6))
        'heHlotLere'
    """
    if i_1 == i_2:
        # Nothing to swap. The previous implementation duplicated the
        # character in this case, returning a string one char too long.
        return s
    if i_1 > i_2:
        # Normalise order so the slicing below is valid.
        i_1, i_2 = i_2, i_1
    return "".join([s[0:i_1], s[i_2], s[i_1 + 1 : i_2], s[i_1], s[i_2 + 1 :]])
def standardise(a, dimension=0, df=1):
    """Z-standardise a numpy array along a given dimension.

    Standardise array along the axis defined in dimension using the
    denominator (N - df) for the calculation of the standard deviation.

    Constant (or near-constant) slices are only centred, not divided by
    their (near-zero) standard deviation. This is handled per slice, so
    multi-dimensional input with a mix of constant and varying slices is
    supported (the previous scalar check raised a ValueError for 2D input).

    Args:
        a : numpy array
            data to be standardised
        dimension : int [optional]
            dimension along which array should be standardised
        df : int [optional]
            degrees of freedom for the denominator of the standard
            deviation

    Returns:
        numpy array
            standardised data, same shape as a
    """
    # keepdims makes mean/sd broadcast correctly against a for any axis.
    a_mean = a.mean(axis=dimension, keepdims=True)
    a_sd = a.std(axis=dimension, ddof=df, keepdims=True)
    # Don't divide by standard deviation if a process is constant:
    # replace near-zero sd by 1 so those entries are only centred.
    sd_safe = np.where(np.isclose(a_sd, 0), 1.0, a_sd)
    return (a - a_mean) / sd_safe
def sort_descending(a):
    """Sort array in descending order."""
    # Reverse an ascending sort; see
    # http://stackoverflow.com/questions/26984414/
    # efficiently-sorting-a-numpy-array-in-descending-order
    ascending = np.sort(a)
    return ascending[::-1]
def argsort_descending(a):
    """Sort array in descending order and return the sorting indices."""
    # Indices of an ascending argsort, reversed; see
    # http://stackoverflow.com/questions/16486252/
    # is-it-possible-to-use-argsort-in-descending-order
    idx_ascending = np.array(a).argsort()
    return idx_ascending[::-1]
def remove_row(a, i):
    """Remove a row from a numpy array.

    This is faster than logical indexing ('25 times faster'), because it
    does not make copies, see
    http://scipy.github.io/old-wiki/pages/PerformanceTips

    Args:
        a : numpy array
            2-dimensional numpy array
        i : int
            row index to be removed

    Returns:
        numpy array
            copy of a without row i, preserving a's dtype
    """
    # Allocate the output with a's dtype directly. The previous
    # implementation filled a float64 buffer and cast back via
    # type(a[0][0]), which silently loses precision for integers
    # larger than 2**53.
    b = np.empty((a.shape[0] - 1, a.shape[1]), dtype=a.dtype)
    b[i:, :] = a[i + 1 :, :]
    b[:i, :] = a[:i, :]
    return b
def remove_column(a, j):
    """Remove a column from a numpy array.

    This is faster than logical indexing ('25 times faster'), because it
    does not make copies, see
    http://scipy.github.io/old-wiki/pages/PerformanceTips

    Args:
        a : numpy array
            2-dimensional numpy array
        j : int
            column index to be removed

    Returns:
        numpy array
            copy of a without column j, preserving a's dtype
    """
    # Allocate the output with a's dtype directly. The previous
    # implementation filled a float64 buffer and cast back via
    # type(a[0][0]), which silently loses precision for integers
    # larger than 2**53.
    b = np.empty((a.shape[0], a.shape[1] - 1), dtype=a.dtype)
    b[:, j:] = a[:, j + 1 :]
    b[:, :j] = a[:, :j]
    return b
def autocorrelation(x):
    """Calculate autocorrelation of a vector.

    Compute the autocorrelation of the mean-removed signal for all
    non-negative lags, normalised such that the zero-lag value is 1
    (unless the signal is constant, in which case all values are 0).

    Note: this function was previously an unimplemented stub returning
    None.

    Args:
        x : numpy array
            1-dimensional signal

    Returns:
        numpy array
            autocorrelation at lags 0 .. len(x) - 1
    """
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    # Full cross-correlation of the signal with itself; keep the
    # non-negative lags only.
    r = np.correlate(x, x, mode="full")[x.size - 1 :]
    if r[0] > 0:
        r = r / r[0]  # normalise so lag 0 equals 1
    return r
def discretise(a, numBins):
    """Discretise continuous data.

    Discretise continuous data into discrete values (with 0 as lowest) by
    evenly partitioning the range of the data, one dimension at a time.
    Adapted from infodynamics.utils.MatrixUtils.discretise() from JIDT by
    J. Lizier.

    Args:
        a : numpy array
            data to be discretised. Dimensions are realisations x variable
            dimension
        numBins : int
            number of discrete levels or bins to partition the data into

    Returns:
        numpy array
            discretised data
    """
    a = np.asarray(a)
    one_dimensional = a.ndim == 1
    if one_dimensional:
        # Treat a unidimensional array as a single-column 2D array.
        a = a[:, np.newaxis]
    discretised_values = np.zeros(a.shape, dtype=np.int_)
    for v in range(a.shape[1]):
        # Bin dimension v:
        the_min = a[:, v].min()
        the_max = a[:, v].max()
        bin_interval = (the_max - the_min) / numBins
        if bin_interval == 0:
            # Constant column: all samples belong to the lowest bin. The
            # previous implementation divided by zero here.
            continue
        bins = ((a[:, v] - the_min) / bin_interval).astype(np.int_)
        # The maximum value lands in bin numBins; put it in the largest
        # valid bin (numBins - 1) instead.
        discretised_values[:, v] = np.minimum(bins, numBins - 1)
    if one_dimensional:
        return discretised_values[:, 0]
    return discretised_values
def discretise_max_ent(a, numBins):
    """Discretise continuous data using maximum entropy partitioning.

    Discretise continuous data into discrete values (with 0 as lowest) by
    making a maximum entropy partitioning, one dimension at a time.
    Adapted from infodynamics.utils.MatrixUtils.discretiseMaxEntropy()
    from JIDT by J. Lizier.

    Args:
        a : numpy array
            data to be discretised. Dimensions are realisations x variable
            dimension
        numBins : int
            number of discrete levels or bins to partition the data into

    Returns:
        numpy array
            discretised data
    """
    num_samples = a.shape[0]

    def _cutoffs(column):
        # Upper cut-off value for each bin: evenly spaced ranks in the
        # sorted data, so every bin holds (roughly) the same number of
        # samples (maximum entropy partitioning).
        ordered = np.sort(column)
        edges = np.zeros(numBins)
        for b in range(numBins):
            rank = int((b + 1) * (num_samples) / numBins) - 1
            edges[b] = ordered[rank]
        return edges

    if a.ndim == 1:
        # Unidimensional array: each sample gets the first bin whose
        # cut-off is >= the sample value.
        return np.searchsorted(_cutoffs(a), a, side="left").astype(np.int_)

    # Multivariate array: bin each dimension independently.
    num_dimensions = a.shape[1]
    discretised = np.zeros([num_samples, num_dimensions], dtype=np.int_)
    for v in range(num_dimensions):
        discretised[:, v] = np.searchsorted(_cutoffs(a[:, v]), a[:, v], side="left")
    return discretised
def separate_arrays(idx_all, idx_single, a):
    """Separate a single column from all other columns in a 2D-array.

    Return the separated single column and the remaining columns of a 2D-
    array.

    Args:
        idx_all : list<Object>
            list of variables indicating the full set
        idx_single : <Object>
            single variable indicating the column to be separated, variable
            must be contained in idx_all
        a : numpy array
            2D-array with the same length along axis 1 as idx_all
            (.shape[1] == len(idx_all))

    Returns:
        numpy array
            remaining columns in full array
        numpy array
            column at single index
    """
    assert len(idx_all) == a.shape[1], (
        "Length of full index list does "
        "not correspond to array size "
        "along 1st axis."
    )
    # Map the requested variable to its column position in a.
    col = idx_all.index(idx_single)
    single_column = np.expand_dims(a[:, col], axis=1)
    remaining_columns = remove_column(a, col)
    return remaining_columns, single_column
def combine_discrete_dimensions(a, numBins):
    """Combine multi-dimensional discrete variable into a single dimension.

    Combine all dimensions for a discrete variable down into a single
    dimensional value for each sample. This is done basically by
    multiplying each dimension by a different power of the base (numBins).
    Adapted from infodynamics.utils.MatrixUtils.computeCombinedValues()
    from JIDT by J.Lizier.

    Args:
        a : numpy array
            data to be combined across all variable dimensions. Dimensions
            are realisations (samples) x variable dimension
        numBins : int
            number of discrete levels or bins for each variable dimension

    Returns:
        numpy array
            a univariate array -- one entry now for each sample, with all
            dimensions of the data now combined for that sample

    Raises:
        ArithmeticError
            if the combined value does not fit into the output integer
            type (overflow)
    """
    if len(a.shape) == 1:
        # It's already a unidimensional array
        return a

    # Else, 2D array assumed
    num_samples = a.shape[0]
    dimensions = a.shape[1]
    combined_values = np.zeros(num_samples, dtype=np.int_)
    max_int = np.iinfo(np.int_).max
    for t in range(num_samples):
        combined_value = 0
        multiplier = 1
        for c in range(dimensions - 1, -1, -1):
            # Work in Python ints: numpy ints would wrap around silently
            # on overflow, while the previous guard (multiplier <= 0)
            # could never fire because a Python-int multiplier does not
            # overflow.
            combined_value += int(a[t][c]) * multiplier
            multiplier *= numBins
        if combined_value > max_int:
            # Combined value has overflown the output dtype
            raise ArithmeticError(
                "Combination of numBins and number of dimensions of a "
                "leads to overflow in making unidimensional array"
            )
        combined_values[t] = combined_value
    return combined_values
def equal_dicts(dict_1, dict_2):
    """Test two dictionaries for equality.

    Dictionaries are equal if they have the same keys and all values
    compare equal. List and numpy-array values are compared element-wise;
    a shape/length mismatch counts as unequal.

    Args:
        dict_1 : dict
            first dictionary
        dict_2 : dict
            second dictionary

    Returns:
        bool
            True if both dictionaries are equal
    """
    if dict_1.keys() != dict_2.keys():
        return False
    for k in dict_1.keys():
        if isinstance(dict_1[k], (list, np.ndarray)):
            # np.array_equal returns False on shape mismatch instead of
            # raising. The previous element-wise comparison crashed for
            # plain lists (a bool has no .any()) and for arrays of
            # different length.
            if not np.array_equal(dict_1[k], dict_2[k]):
                return False
        elif dict_1[k] != dict_2[k]:
            return False
    return True
def conflicting_entries(dict_1, dict_2):
    """Test two dictionaries for unequal entries.

    Note that only keys that are present in both dicts are compared. If
    one dictionary contains an entry not present in the other dictionary,
    the test passes.
    """
    # Only keys shared by both dictionaries are compared.
    shared_keys = set(dict_1.keys()).intersection(set(dict_2.keys()))
    for k in shared_keys:
        # Wrapping in np.array makes .any() work for scalars, lists, and
        # numpy arrays alike.
        if np.array(dict_1[k] != dict_2[k]).any():
            print(
                f"Unequal entries for key {k}: dict_1: {dict_1[k]}, dict_2: {dict_2[k]}"
            )
            return True
    return False
def calculate_mi(corr):
    """Calculate mutual information from correlation coefficient."""
    # MI = -0.5 * ln(1 - r**2); presumably assumes linearly (Gaussian)
    # coupled variables -- confirm against callers.
    corr_squared = corr**2
    return -0.5 * np.log(1 - corr_squared)
class timeout(object):
    """Context manager for a timeout using threading module.

    Args:
        timeout_duration: float
            number of seconds to wait before timeout is triggered
        exception_message : string
            message to put in the exception
    """

    def __init__(self, timeout_duration, exception_message="Timeout"):
        # Store configuration; the timer itself is created in __enter__.
        self.timeout_duration = timeout_duration
        self.exception_message = exception_message

    def __enter__(self):
        # Start a one-shot timer that calls timeout_handler after
        # timeout_duration seconds unless cancelled in __exit__.
        self.timer = threading.Timer(self.timeout_duration, self.timeout_handler)
        self.timer.start()
        return self.timer

    def __exit__(self, exc_type, exc_value, traceback):
        # Cancel the pending timer if the managed block finished (or
        # raised) before the timeout fired.
        self.timer.cancel()

    def timeout_handler(self):
        # NOTE(review): this raises TimeoutError in the *timer* thread,
        # not in the thread executing the with-block, so it will not
        # interrupt the managed code -- confirm intended semantics.
        raise TimeoutError(self.exception_message)