Source code for idtxl.estimators_multivariate_pid

"""Multivariate partial information decomposition for discrete random variables.

This module provides an estimator for multivariate partial information
decomposition as proposed in

- Makkeh, A. & Gutknecht, A. & Wibral, M. (2020). A Differentiable measure
  for shared information, 1-27. Retrieved from
  http://arxiv.org/abs/2002.03356
"""
import numpy as np

from . import lattices as lt
from . import pid_goettingen
from .estimator import Estimator
from .estimators_pid import _join_variables

# TODO add support for multivariate estimation for Tartu and Sydney estimator


class SxPID(Estimator):
    """Estimate partial information decomposition for multiple inputs.

    Implementation of the multivariate partial information decomposition
    (PID) estimator for discrete data with up to 4 inputs and one output.
    The estimator finds shared information, unique information and
    synergistic information between the multiple inputs s1, s2, ..., sn
    with respect to the output t for each realization (t, s1, ..., sn) and
    then averages them according to their distribution weights
    p(t, s1, ..., sn). Both the pointwise (on the realization level) PID
    and the averaged PID are returned (see the 'Returns' section of
    'estimate()').

    The algorithm uses recursion to compute the partial information
    decomposition.

    References:

    - Makkeh, A. & Wibral, M. (2020). A differentiable pointwise partial
      Information Decomposition estimator.
      https://github.com/Abzinger/SxPID.

    Args:
        settings : dict
            estimation parameters (with default parameters)

            - verbose : bool [optional] - print output to console
              (default=False)
    """

    def __init__(self, settings):
        # Work on a copy so that filling in defaults never alters the
        # dictionary owned by the caller.
        self.settings = settings.copy()
        self.settings.setdefault("verbose", False)

    def is_parallel(self):
        """Return False: this estimator does not run in parallel."""
        # Takes ``self`` like is_analytic_null_estimator() so that it can be
        # called on an instance; without it ``estimator.is_parallel()``
        # raises a TypeError.
        return False

    def is_analytic_null_estimator(self):
        """Return False: no analytic null distribution is provided."""
        return False

    def estimate(self, s, t):
        """Estimate SxPID from list of sources and a target

        Args:
            s : list of numpy arrays
                1D arrays containing realizations of a discrete random
                variable
            t : numpy array
                1D array containing realizations of a discrete random
                variable

        Returns:
            dict
                SxPID results, with entries

                - 'ptw' -> { realization -> {alpha -> [float, float, float]}}:
                  pointwise decomposition
                - 'avg' -> {alpha -> [float, float, float]}: average
                  decomposition

                the list of floats is ordered
                [informative, misinformative, informative - misinformative]
        """
        # Validate/normalise the inputs; the (possibly extended) settings
        # returned by the check replace the stored ones.
        s, t, self.settings = _check_input(s, t, self.settings)
        pdf = _get_pdf_dict(s, t)

        # Read lattices from a file
        # Stored as {
        #     n -> [{alpha -> children}, (alpha_1,...) ]
        # }
        # children is a list of tuples
        lattices = lt.lattices
        num_source_vars = len(s)
        lattice_children, antichains = lattices[num_source_vars][:2]
        retval_ptw, retval_avg = pid_goettingen.pid(
            num_source_vars,
            pdf_orig=pdf,
            chld=lattice_children,
            achain=antichains,
            printing=self.settings["verbose"],
        )

        # TODO AskM: Trivariate: does it make sense to name the alphas
        # for example shared_syn_s1_s2__syn_s1_s3 ?
        results = {
            "ptw": retval_ptw,
            "avg": retval_avg,
        }
        return results
def _get_pdf_dict(s, t):
    """Write probability mass function estimated via counting to a dict.

    Args:
        s : list of numpy arrays
            source realizations, indexed per sample as ``s[j][i]``; the
            length of ``s[0]`` determines the number of samples
        t : numpy array
            target realizations, indexed per sample as ``t[i]``

    Returns:
        dict
            maps every observed realization ``(s1, ..., sn, t)`` to its
            relative frequency (count divided by the number of samples)
    """
    n_samples = s[0].shape[0]

    # Count occurrences of each joint realization.
    counts = dict()
    for i in range(n_samples):
        key = tuple(source[i] for source in s) + (t[i],)
        counts[key] = counts.get(key, 0) + 1

    # Create PMF from counts.
    return {
        realization: count / float(n_samples)
        for realization, count in counts.items()
    }


def _check_input(s, t, settings):
    """Check input to PID estimators.

    Sources and target are reduced to 1D arrays where possible: 2D arrays
    with a single column are squeezed, and sources with several columns are
    combined column by column into one joint variable via
    ``_join_variables`` (the resulting alphabet size is stored in
    ``settings['alph_s<i>']``).

    Args:
        s : list of numpy arrays
            realizations of the source variables; the passed list itself is
            left untouched, a new list is returned
        t : numpy array
            realizations of the target variable
        settings : dict
            estimation settings; may be extended in place with 'alph_s<i>'
            entries for joined sources

    Returns:
        list of numpy arrays
            checked (and possibly squeezed/joined) sources
        numpy array
            checked (and possibly squeezed) target
        dict
            the settings dictionary

    Raises:
        TypeError
            if t or any source is not a numpy array, if settings is not a
            dict, or if t or any source is not of integer dtype
        ValueError
            if a source or the target has an unsupported shape, or if the
            number of samples differs between sources and target
    """
    # Work on a shallow copy so that squeezing/joining sources below does not
    # replace entries in the list owned by the caller.
    s = list(s)

    # Check if inputs are numpy arrays (exact type, subclasses are rejected).
    if type(t) is not np.ndarray:
        raise TypeError("Input t must be a numpy array.")
    for i, source in enumerate(s):
        if type(source) is not np.ndarray:
            raise TypeError("All inputs s{0} must be numpy arrays.".format(i + 1))

    # In general, IDTxl expects 2D inputs because JIDT/JPYPE only accepts those
    # and we have a multivariate approach, i.e., a vector is a special case of
    # 2D-data. The PID estimators on the other hand, expect 1D data. Squeeze 2D
    # arrays if the dimension of the second axis is 1. Otherwise combine
    # multivariate sources into a single variable for estimation.
    for i, source in enumerate(s):
        if source.ndim == 1:
            continue
        if source.shape[1] == 1:
            s[i] = np.squeeze(source)
        elif source.ndim == 2 and source.shape[1] > 1:
            joint = source[:, 0]
            alph_joint = len(np.unique(joint))
            for col in range(1, source.shape[1]):
                alph_col = len(np.unique(source[:, col]))
                joint, alph_joint = _join_variables(
                    joint, source[:, col], alph_joint, alph_col
                )
            settings["alph_s" + str(i + 1)] = alph_joint
            # Store the joined variable; otherwise the source would stay 2D
            # and the result of the join would be discarded.
            s[i] = joint
        else:
            raise ValueError(
                "Input source {0} s{0} has to be a 1D or 2D "
                "numpy array.".format(i + 1)
            )

    if t.ndim != 1:
        if t.shape[1] == 1:
            t = np.squeeze(t)
        else:  # For now we only allow 1D-targets
            raise ValueError("Input target t has to be a vector " "(t.shape[1]=1).")

    # Check types of remaining inputs.
    if type(settings) is not dict:
        raise TypeError("The settings argument should be a dictionary.")
    for i, source in enumerate(s):
        if not issubclass(source.dtype.type, np.integer):
            raise TypeError(
                "Input s{0} (source {0}) must be an integer numpy "
                "array.".format(i + 1)
            )
    if not issubclass(t.dtype.type, np.integer):
        raise TypeError("Input t (target) must be an integer numpy array.")

    # Check if variables have equal length.
    for source in s:
        if len(t) != len(source):
            raise ValueError("Number of samples s and t must be equal")

    return s, t, settings