import sys
import logging
from pkg_resources import resource_filename
from scipy.special import digamma
import numpy as np
from idtxl.estimator import Estimator
from . import idtxl_exceptions as ex
try:
import pyopencl as cl
except ImportError as err:
ex.package_missing(err, 'PyOpenCl is not available on this system. Install'
' it using pip or the package manager to use '
'OpenCL-powered CMI estimation.')
logger = logging.getLogger(__name__)
C = 1024**2
[docs]class OpenCLKraskov(Estimator):
"""Abstract class for implementation of OpenCL estimators.
Abstract class for implementation of OpenCL estimators, child classes
implement estimators for mutual information (MI) and conditional mutual
information (CMI) using the Kraskov-Grassberger-Stoegbauer estimator for
continuous data.
References:
- Kraskov, A., Stoegbauer, H., & Grassberger, P. (2004). Estimating mutual
information. Phys Rev E, 69(6), 066138.
- Lizier, Joseph T., Mikhail Prokopenko, and Albert Y. Zomaya. (2012).
Local measures of information storage in complex distributed computation.
Inform Sci, 208, 39-54.
- Schreiber, T. (2000). Measuring information transfer. Phys Rev Lett,
85(2), 461.
Estimators can be used to perform multiple, independent searches in
parallel. Each of these parallel searches is called a 'chunk'. To search
multiple chunks, provide point sets as 2D arrays, where the first
dimension represents samples or points, and the second dimension
represents the points' dimensions. Concatenate chunk data in the first
dimension and pass the number of chunks to the estimators. Chunks must be
of equal size.
Set common estimation parameters for OpenCL estimators. For usage of these
estimators see documentation for the child classes.
Args:
settings : dict [optional]
set estimator parameters:
- gpuid : int [optional] - device ID used for estimation (if more
than one device is available on the current platform) (default=0)
- kraskov_k : int [optional] - no. nearest neighbours for KNN
search (default=4)
- normalise : bool [optional] - z-standardise data (default=False)
- theiler_t : int [optional] - no. next temporal neighbours ignored
in KNN and range searches (default=0)
- noise_level : float [optional] - random noise added to the data
(default=1e-8)
- padding : bool [optional] - pad data to a length that is a
multiple of 1024, workaround for a
- debug : bool [optional] - calculate intermediate results, i.e.
neighbour counts from range searches and KNN distances, print
debug output to console (default=False)
- return_counts : bool [optional] - return intermediate results,
i.e. neighbour counts from range searches and KNN distances
(default=False)
"""
def __init__(self, settings=None):
# Get defaults for estimator settings
settings = self._check_settings(settings)
self.settings = settings.copy()
self.settings.setdefault('gpuid', int(0))
self.settings.setdefault('kraskov_k', int(4))
self.settings.setdefault('theiler_t', int(0))
self.settings.setdefault('noise_level', np.float32(1e-8))
self.settings.setdefault('local_values', False)
self.settings.setdefault('padding', True)
self.settings.setdefault('debug', False)
self.settings.setdefault('return_counts', False)
self.settings.setdefault('verbose', True)
self.sizeof_float = int(np.dtype(np.float32).itemsize)
self.sizeof_int = int(np.dtype(np.int32).itemsize)
if self.settings['return_counts'] and not self.settings['debug']:
raise RuntimeError(
'Set debug option to True to return neighbor counts.')
# Get kernel and devices.
self.devices, self.context, self.queue = self._get_device(
self.settings['gpuid'])
self.kernel_location = resource_filename(__name__,
'gpuKnnKernelNoIdx.cl')
self.kNN_kernel, self.RS_kernel = self._get_kernels()
[docs] def is_parallel(self):
return True
[docs] def is_analytic_null_estimator(self):
return False
def _get_device(self, gpuid):
"""Return GPU devices, context, and queue."""
all_platforms = cl.get_platforms()
platform = next((p for p in all_platforms if
p.get_devices(device_type=cl.device_type.GPU) != []),
None)
if platform is None:
raise RuntimeError('No OpenCL GPU device found.')
my_gpu_devices = platform.get_devices(device_type=cl.device_type.GPU)
context = cl.Context(devices=my_gpu_devices)
if gpuid > len(my_gpu_devices)-1:
raise RuntimeError(
'No device with gpuid {0} (available device IDs: {1}).'.format(
gpuid, np.arange(len(my_gpu_devices))))
queue = cl.CommandQueue(context, my_gpu_devices[gpuid])
logger.debug("Selected Device: {}".format(my_gpu_devices[gpuid].name))
return my_gpu_devices, context, queue
def _get_kernels(self):
"""Return KNN and range search OpenCL kernels."""
kernel_source = open(self.kernel_location).read()
program = cl.Program(self.context, kernel_source).build()
kNN_kernel = program.kernelKNNshared
kNN_kernel.set_scalar_arg_dtypes([None, None, None, np.int32,
np.int32, np.int32, np.int32,
np.int32, np.int32, None]) # MW: added one int32 argument
RS_kernel = program.kernelBFRSAllshared
RS_kernel.set_scalar_arg_dtypes([None, None, None, None,
np.int32, np.int32, np.int32,
np.int32, np.int32, None]) # MW: added one int32 argument
return (kNN_kernel, RS_kernel)
def _get_max_mem(self):
"""Return max. GPU main memory available for computation."""
if 'max_mem' in self.settings:
return self.settings['max_mem']
elif 'max_mem_frac' in self.settings:
return self.settings['max_mem_frac'] * self.devices[
self.settings['gpuid']].global_mem_size
else:
return 0.9 * self.devices[self.settings['gpuid']].global_mem_size
[docs]class OpenCLKraskovMI(OpenCLKraskov):
"""Calculate mutual information with OpenCL Kraskov implementation.
Calculate the mutual information (MI) between two variables using OpenCL
GPU-code. See parent class for references.
Results are returned in nats.
Args:
settings : dict [optional]
set estimator parameters:
- gpuid : int [optional] - device ID used for estimation (if more
than one device is available on the current platform) (default=0)
- kraskov_k : int [optional] - no. nearest neighbours for KNN
search (default=4)
- normalise : bool [optional] - z-standardise data (default=False)
- theiler_t : int [optional] - no. next temporal neighbours ignored
in KNN and range searches (default=0)
- noise_level : float [optional] - random noise added to the data
(default=1e-8)
- debug : bool [optional] - return intermediate results, i.e.
neighbour counts from range searches and KNN distances
(default=False)
- return_counts : bool [optional] - return intermediate results,
i.e. neighbour counts from range searches and KNN distances
(default=False)
- lag_mi : int [optional] - time difference in samples to calculate
the lagged MI between processes (default=0)
"""
def __init__(self, settings=None):
# Set default estimator settings.
super().__init__(settings)
self.settings.setdefault('lag_mi', 0)
[docs] def estimate(self, var1, var2, n_chunks=1):
"""Estimate mutual information.
Args:
var1 : numpy array
realisations of first variable, either a 2D numpy array where
array dimensions represent [(realisations * n_chunks) x
variable dimension] or a 1D array representing [realisations],
array type should be int32
var2 : numpy array
realisations of the second variable (similar to var1)
n_chunks : int
number of data chunks, no. data points has to be the same for
each chunk
Returns:
float | numpy array
average MI over all samples or local MI for individual
samples if 'local_values'=True
numpy arrays
distances and neighborhood counts for var1 and var2 if
debug=True and return_counts=True
"""
# Prepare data: check if variable realisations are passed as 1D or 2D
# arrays and have equal no. observations.
var1 = self._ensure_two_dim_input(var1)
var2 = self._ensure_two_dim_input(var2)
assert var1.shape[0] == var2.shape[0]
assert var1.shape[0] % n_chunks == 0
# Shift variables to calculate a lagged MI.
if self.settings['lag_mi'] > 0:
var1 = var1[:-self.settings['lag_mi'], :]
var2 = var2[self.settings['lag_mi']:, :]
self._check_number_of_points(var1.shape[0])
signallength = var1.shape[0]
chunklength = signallength // n_chunks
var1dim = var1.shape[1]
var2dim = var2.shape[1]
pointdim = var1dim + var2dim
kraskov_k = self.settings['kraskov_k']
mem_data = self.sizeof_float * chunklength * pointdim
mem_dist = self.sizeof_float * chunklength * kraskov_k
mem_ncnt = 2 * self.sizeof_int * chunklength
mem_chunk = mem_data + mem_dist + mem_ncnt
max_mem = self._get_max_mem()
max_chunks_per_run = np.floor(max_mem/mem_chunk).astype(int)
chunks_per_run = min(max_chunks_per_run, n_chunks)
logger.debug(
'Memory per chunk: {0:.5f} MB, GPU global memory: {1} MB, chunks '
'per run: {2}.'.format(
mem_chunk / C, max_mem / C, chunks_per_run))
if mem_chunk > max_mem:
raise RuntimeError('Size of single chunk exceeds GPU global '
'memory.')
mi_array = np.array([])
if self.settings['debug']:
distances = np.array([])
count_var1 = np.array([])
count_var2 = np.array([])
for r in range(0, n_chunks, chunks_per_run):
startidx = r*chunklength
stopidx = min(r+chunks_per_run, n_chunks)*chunklength
subset1 = var1[startidx:stopidx, :]
subset2 = var2[startidx:stopidx, :]
n_chunks_current_run = subset1.shape[0] // chunklength
results = self._estimate_single_run(subset1, subset2,
n_chunks_current_run)
if self.settings['debug']:
mi_array = np.concatenate((mi_array, results[0]))
distances = np.concatenate((distances, results[1]))
count_var1 = np.concatenate((count_var1, results[2]))
count_var2 = np.concatenate((count_var2, results[3]))
else:
mi_array = np.concatenate((mi_array, results))
if self.settings['return_counts']:
return mi_array, distances, count_var1, count_var2
else:
return mi_array
def _estimate_single_run(self, var1, var2, n_chunks=1):
"""Estimate mutual information in a single GPU run.
This method should not be called directly, only inside estimate()
after memory bounds have been checked.
Args:
var1 : numpy array
realisations of first variable, either a 2D numpy array where
array dimensions represent [(realisations * n_chunks) x
variable dimension] or a 1D array representing [realisations],
array type should be int32
var2 : numpy array
realisations of the second variable (similar to var1)
n_chunks : int
number of data chunks, no. data points has to be the same for
each chunk
Returns:
float | numpy array
average MI over all samples or local MI for individual
samples if 'local_values'=True
"""
# Prepare data and add noise: check if variable realisations are passed
# as 1D or 2D arrays and have equal no. observations.
var1 = self._ensure_two_dim_input(var1)
var2 = self._ensure_two_dim_input(var2)
assert var1.shape[0] == var2.shape[0]
assert var1.shape[0] % n_chunks == 0
self._check_number_of_points(var1.shape[0])
signallength = var1.shape[0]
chunklength = signallength // n_chunks
assert signallength % n_chunks == 0
var1dim = var1.shape[1]
var2dim = var2.shape[1]
pointdim = var1dim + var2dim
# prepare for the padding
signallength_orig = signallength # used for clarity at present
if self.settings['padding']:
# Pad time series to make GPU memory regions a multiple of 1024
# This value of 1024 should be replaced by
# self.devices[self.settings['gpuid']].CL_DEVICE_MEM_BASE_ADDR_ALIGN
# or something similar, as professional cards are known to have
# base adress alignment of 4096 sometimes
pad_target = 4096
pad_size = (int(np.ceil(signallength/pad_target)) * pad_target -
signallength)
pad_var1 = np.vstack(
[var1, 999999 + 0.1 * np.random.rand(pad_size, var1dim)])
pad_var2 = np.vstack(
[var2, 999999 + 0.1 * np.random.rand(pad_size, var2dim)])
pointset = np.hstack((pad_var1, pad_var2)).T.copy()
signallength_padded = signallength + pad_size
else:
pad_size = 0
pointset = np.hstack((var1, var2)).T.copy()
signallength_padded = signallength
if not pointset.dtype == np.float32:
pointset = pointset.astype(np.float32)
if self.settings['noise_level'] > 0:
pointset += np.random.normal(
scale=self.settings['noise_level'],
size=pointset.shape).astype(np.float32)
if self.settings['debug']:
# Print memory requirements after padding
mem_data_pad = (self.sizeof_float *
pointset.shape[0] * pointset.shape[1])
mem_dist = (self.sizeof_float * signallength_padded *
self.settings['kraskov_k'])
mem_ncnt = 2 * self.sizeof_int * signallength_padded
mem_total = mem_data_pad + mem_dist + mem_ncnt
logger.debug(
'Memory req. after padding: {0:.2f} MB ({1} elements, shape: '
'{2}, {3} chunks, chunksize: {4}) -- Padding: {5}'.format(
mem_total / C, pointset.size, pointset.shape,
n_chunks, chunklength, pad_size))
assert (pointset.shape[1] - pad_size) % n_chunks == 0
# Set OpenCL kernel launch parameters
if chunklength < self.devices[
self.settings['gpuid']].max_work_group_size:
workitems_x = 8
elif self.devices[self.settings['gpuid']].max_work_group_size < 256:
workitems_x = self.devices[
self.settings['gpuid']].max_work_group_size
else:
workitems_x = 256
NDRange_x = (workitems_x *
(int((signallength_padded - 1)/workitems_x) + 1))
logger.debug('NDRange_x: {}, workitems_x: {}'.format(
NDRange_x, workitems_x))
# Allocate and copy memory to device
kraskov_k = self.settings['kraskov_k']
d_pointset = cl.Buffer(
self.context,
cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR,
hostbuf=pointset)
d_var1 = d_pointset.get_sub_region(
0,
self.sizeof_float * signallength_padded * var1dim,
cl.mem_flags.READ_ONLY)
d_var2 = d_pointset.get_sub_region(
self.sizeof_float * signallength_padded * var1dim,
self.sizeof_float * signallength_padded * var2dim,
cl.mem_flags.READ_ONLY)
d_distances = cl.Buffer(
self.context, cl.mem_flags.READ_WRITE,
self.sizeof_float * kraskov_k * signallength_padded)
d_vecradius = d_distances.get_sub_region(
signallength_padded * (kraskov_k - 1) * self.sizeof_float,
signallength_padded * self.sizeof_float)
d_npointsrange_x = cl.Buffer(self.context, cl.mem_flags.READ_WRITE,
self.sizeof_int * signallength_padded)
d_npointsrange_y = cl.Buffer(self.context, cl.mem_flags.READ_WRITE,
self.sizeof_int * signallength_padded)
# Neighbour search
theiler_t = np.int32(self.settings['theiler_t'])
localmem = cl.LocalMemory(self.sizeof_float * kraskov_k * workitems_x)
self.kNN_kernel(self.queue, (NDRange_x,), (workitems_x,), d_pointset,
d_pointset, d_distances, np.int32(pointdim),
np.int32(chunklength), np.int32(signallength_padded),
np.int32(signallength_orig),
np.int32(kraskov_k), theiler_t, localmem)
distances = np.zeros(signallength_padded * kraskov_k, dtype=np.float32)
try:
cl.enqueue_copy(self.queue, distances, d_distances)
except cl._cl.RuntimeError as e:
print(e)
# Print memory requirements after padding
mem_data_pad = (self.sizeof_float *
pointset.shape[0] * pointset.shape[1])
mem_dist = (self.sizeof_float * signallength_padded *
self.settings['kraskov_k'])
mem_ncnt = 2 * self.sizeof_int * signallength_padded
mem_total = mem_data_pad + mem_dist + mem_ncnt
print(
'Memory req. after padding: {0:.2f} MB ({1} elements, shape: '
'{2}, {3} chunks, chunksize: {4}) -- Padding: {5}'.format(
mem_total / C, pointset.size, pointset.shape,
n_chunks, chunklength, pad_size))
assert (pointset.shape[1] - pad_size) % n_chunks == 0
sys.exit(1)
self.queue.finish()
# Range search in var1
localmem = cl.LocalMemory(self.sizeof_int * workitems_x)
self.RS_kernel(
self.queue, (NDRange_x,), (workitems_x,), d_var1,
d_var1, d_vecradius, d_npointsrange_x,
var1dim, chunklength, signallength_padded, signallength_orig,
theiler_t, localmem) # MW: added signallength_orig
count_var1 = np.zeros(signallength_padded, dtype=np.int32)
cl.enqueue_copy(self.queue, count_var1, d_npointsrange_x)
# Range search in var2
self.RS_kernel(
self.queue, (NDRange_x,), (workitems_x,), d_var2,
d_var2, d_vecradius, d_npointsrange_y,
var2dim, chunklength, signallength_padded, signallength_orig,
theiler_t, localmem) # MW: added signallength_orig
count_var2 = np.zeros(signallength_padded, dtype=np.int32)
cl.enqueue_copy(self.queue, count_var2, d_npointsrange_y)
d_pointset.release()
d_distances.release()
d_npointsrange_x.release()
d_npointsrange_y.release()
d_var1.release()
d_var2.release()
d_vecradius.release()
# Calculate and sum digammas
if self.settings['local_values']:
mi_array = -np.inf * np.ones(chunklength * n_chunks,
dtype=np.float64)
idx = 0
for c in range(n_chunks):
mi = (digamma(kraskov_k) + digamma(chunklength) -
digamma(count_var1[c*chunklength:(c+1)*chunklength]+1) -
digamma(count_var2[c*chunklength:(c+1)*chunklength]+1))
mi_array[idx:idx+chunklength] = mi
idx += chunklength
else:
mi_array = -np.inf * np.ones(n_chunks, dtype=np.float64)
for c in range(n_chunks):
mi = (digamma(kraskov_k) + digamma(chunklength) - np.mean(
digamma(count_var1[c*chunklength:(c+1)*chunklength]+1) +
digamma(count_var2[c*chunklength:(c+1)*chunklength]+1)))
mi_array[c] = mi
assert signallength_orig == (c+1)*chunklength, 'Original signal length does not match no. processed points.'
if self.settings['debug']:
return (mi_array,
distances[:signallength_orig],
count_var1[:signallength_orig],
count_var2[:signallength_orig])
else:
return mi_array
[docs]class OpenCLKraskovCMI(OpenCLKraskov):
"""Calculate conditional mutual inform with OpenCL Kraskov implementation.
Calculate the conditional mutual information (CMI) between three variables
using OpenCL GPU-code. If no conditional is given (is None), the function
returns the mutual information between var1 and var2. See parent class for
references.
Results are returned in nats.
Args:
settings : dict [optional]
set estimator parameters:
- gpuid : int [optional] - device ID used for estimation (if more
than one device is available on the current platform) (default=0)
- kraskov_k : int [optional] - no. nearest neighbours for KNN
search (default=4)
- normalise : bool [optional] - z-standardise data (default=False)
- theiler_t : int [optional] - no. next temporal neighbours ignored
in KNN and range searches (default=0)
- noise_level : float [optional] - random noise added to the data
(default=1e-8)
- debug : bool [optional] - return intermediate results, i.e.
neighbour counts from range searches and KNN distances
(default=False)
- return_counts : bool [optional] - return intermediate results,
i.e. neighbour counts from range searches and KNN distances
(default=False)
"""
def __init__(self, settings=None):
super().__init__(settings)
[docs] def estimate(self, var1, var2, conditional=None, n_chunks=1):
"""Estimate conditional mutual information.
If conditional is None, the mutual information between var1 and var2 is
calculated.
Args:
var1 : numpy array
realisations of first variable, either a 2D numpy array where
array dimensions represent [(realisations * n_chunks) x
variable dimension] or a 1D array representing [realisations],
array type should be int32
var2 : numpy array
realisations of the second variable (similar to var1)
conditional : numpy array
realisations of conditioning variable (similar to var1)
n_chunks : int
number of data chunks, no. data points has to be the same for
each chunk
Returns:
float | numpy array
average CMI over all samples or local CMI for individual
samples if 'local_values'=True
numpy arrays
distances and neighborhood counts for var1 and var2 if
debug=True and return_counts=True
"""
# Return MI if no conditional is provided
if conditional is None:
est_mi = OpenCLKraskovMI(self.settings)
return est_mi.estimate(var1, var2, n_chunks)
# Prepare data: check if variable realisations are passed as 1D or 2D
# arrays and have equal no. observations.
var1 = self._ensure_two_dim_input(var1)
var2 = self._ensure_two_dim_input(var2)
conditional = self._ensure_two_dim_input(conditional)
assert var1.shape[0] == var2.shape[0]
assert var1.shape[0] == conditional.shape[0]
assert var1.shape[0] % n_chunks == 0
self._check_number_of_points(var1.shape[0])
signallength = var1.shape[0]
chunklength = signallength // n_chunks
var1dim = var1.shape[1]
var2dim = var2.shape[1]
conddim = conditional.shape[1]
pointdim = var1dim + var2dim + conddim
kraskov_k = self.settings['kraskov_k']
mem_data = self.sizeof_float * chunklength * pointdim
mem_dist = self.sizeof_float * chunklength * kraskov_k
mem_ncnt = 2 * self.sizeof_int * chunklength
mem_chunk = mem_data + mem_dist + mem_ncnt
max_mem = self._get_max_mem()
max_chunks_per_run = np.floor(max_mem/mem_chunk).astype(int)
chunks_per_run = min(max_chunks_per_run, n_chunks)
logger.debug(
'Memory per chunk: {0:.5f} MB, GPU global memory: {1} MB, chunks '
'per run: {2}.'.format(
mem_chunk / C, max_mem / C, chunks_per_run))
if mem_chunk > max_mem:
raise RuntimeError('Size of single chunk exceeds GPU global '
'memory.')
cmi_array = np.array([])
if self.settings['debug']:
distances = np.array([])
count_var1 = np.array([])
count_var2 = np.array([])
count_cond = np.array([])
for r in range(0, n_chunks, chunks_per_run):
startidx = r*chunklength
stopidx = min(r+chunks_per_run, n_chunks)*chunklength
subset1 = var1[startidx:stopidx, :]
subset2 = var2[startidx:stopidx, :]
subset3 = conditional[startidx:stopidx, :]
n_chunks_current_run = subset1.shape[0] // chunklength
results = self._estimate_single_run(subset1, subset2, subset3,
n_chunks_current_run)
if self.settings['debug']:
cmi_array = np.concatenate((cmi_array, results[0]))
distances = np.concatenate((distances, results[1]))
count_var1 = np.concatenate((count_var1, results[2]))
count_var2 = np.concatenate((count_var2, results[3]))
count_cond = np.concatenate((count_cond, results[4]))
else:
cmi_array = np.concatenate((cmi_array, results))
if self.settings['return_counts']:
return cmi_array, distances, count_var1, count_var2, count_cond
else:
return cmi_array
def _estimate_single_run(self, var1, var2, conditional=None, n_chunks=1):
"""Estimate conditional mutual information in a single GPU run.
This method should not be called directly, only inside estimate()
after memory bounds have been checked.
If conditional is None, the mutual information between var1 and var2 is
calculated.
Args:
var1 : numpy array
realisations of first variable, either a 2D numpy array where
array dimensions represent [(realisations * n_chunks) x
variable dimension] or a 1D array representing [realisations],
array type should be int32
var2 : numpy array
realisations of the second variable (similar to var1)
conditional : numpy array
realisations of conditioning variable (similar to var1)
n_chunks : int
number of data chunks, no. data points has to be the same for
each chunk
Returns:
float | numpy array
average CMI over all samples or local CMI for individual
samples if 'local_values'=True
"""
# Return MI if no conditional is provided
if conditional is None:
est_mi = OpenCLKraskovMI(self.settings)
return est_mi.estimate(var1, var2, n_chunks)
# Prepare data and add noise: check if variable realisations are passed
# as 1D or 2D arrays and have equal no. observations.
var1 = self._ensure_two_dim_input(var1)
var2 = self._ensure_two_dim_input(var2)
conditional = self._ensure_two_dim_input(conditional)
assert var1.shape[0] == var2.shape[0]
assert var1.shape[0] == conditional.shape[0]
assert var1.shape[0] % n_chunks == 0
self._check_number_of_points(var1.shape[0])
signallength = var1.shape[0]
chunklength = signallength // n_chunks
var1dim = var1.shape[1]
var2dim = var2.shape[1]
conddim = conditional.shape[1]
pointdim = var1dim + var2dim + conddim
# prepare padding
signallength_orig = signallength
if self.settings['padding']:
# Pad time series to make GPU memory regions a multiple of 4096
# 4096 is the largestknown value for opencl subbuffer alignment targets
# but see comment in MI estimator above
pad_target = 4096
pad_size = (int(np.ceil(signallength/pad_target)) * pad_target -
signallength)
pad_var1 = np.vstack(
[var1, 999999 + 0.1 * np.random.rand(pad_size, var1dim)])
pad_var2 = np.vstack(
[var2, 999999 + 0.1 * np.random.rand(pad_size, var2dim)])
pad_conditional = np.vstack(
[conditional, 999999 + 0.1 * np.random.rand(pad_size, conddim)])
pointset = np.hstack((pad_var1, pad_conditional, pad_var2)).T.copy()
signallength_padded = signallength + pad_size
else:
pad_size = 0
pointset = np.hstack((var1, conditional, var2)).T.copy()
signallength_padded = signallength
if not pointset.dtype == np.float32:
pointset = pointset.astype(np.float32)
if self.settings['noise_level'] > 0:
pointset += np.random.normal(
scale=self.settings['noise_level'],
size=pointset.shape).astype(np.float32)
if self.settings['debug']:
# Print memory requirements after padding
mem_data_pad = (self.sizeof_float *
pointset.shape[0] * pointset.shape[1])
mem_dist = (self.sizeof_float * signallength_padded *
self.settings['kraskov_k'])
mem_ncnt = 2 * self.sizeof_int * signallength_padded
mem_total = mem_data_pad + mem_dist + mem_ncnt
logger.debug(
'Memory req. after padding: {0:.2f} MB ({1} elements) -- Padding: {2}.'.format(
mem_total / C, pointset.size, pad_size))
# Set OpenCL kernel launch parameters
if chunklength < self.devices[
self.settings['gpuid']].max_work_group_size:
workitems_x = 8
elif self.devices[self.settings['gpuid']].max_work_group_size < 256:
workitems_x = self.devices[
self.settings['gpuid']].max_work_group_size
else:
workitems_x = 256
NDRange_x = (workitems_x *
(int((signallength_padded - 1)/workitems_x) + 1))
# Allocate and copy memory to device
kraskov_k = self.settings['kraskov_k']
d_pointset = cl.Buffer(
self.context,
cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR,
hostbuf=pointset)
d_src = d_pointset.get_sub_region(
0,
self.sizeof_float * signallength_padded * var1dim,
cl.mem_flags.READ_ONLY)
d_cnd = d_pointset.get_sub_region(
self.sizeof_float * signallength_padded * var1dim,
self.sizeof_float * signallength_padded * conddim,
cl.mem_flags.READ_ONLY)
d_distances = cl.Buffer(
self.context, cl.mem_flags.READ_WRITE,
self.sizeof_float * kraskov_k * signallength_padded)
d_vecradius = d_distances.get_sub_region(
signallength_padded * (kraskov_k - 1) * self.sizeof_float,
signallength_padded * self.sizeof_float)
d_npointsrange_x = cl.Buffer(self.context,
cl.mem_flags.READ_WRITE,
self.sizeof_int * signallength_padded)
d_npointsrange_y = cl.Buffer(self.context, cl.mem_flags.READ_WRITE,
self.sizeof_int * signallength_padded)
d_npointsrange_z = cl.Buffer(self.context, cl.mem_flags.READ_WRITE,
self.sizeof_int * signallength_padded)
# Neighbour search in full space
theiler_t = np.int32(self.settings['theiler_t'])
localmem = cl.LocalMemory(self.sizeof_float * kraskov_k * workitems_x)
self.kNN_kernel(self.queue, (NDRange_x,), (workitems_x,), d_pointset,
d_pointset, d_distances, np.int32(pointdim),
np.int32(chunklength), np.int32(signallength_padded),
np.int32(signallength_orig),
np.int32(kraskov_k),
theiler_t, localmem) # MW: added signallength_orig
distances = np.zeros(signallength_padded * kraskov_k, dtype=np.float32)
cl.enqueue_copy(self.queue, distances, d_distances)
self.queue.finish()
# Range search in source and conditional
localmem = cl.LocalMemory(self.sizeof_int * workitems_x)
self.RS_kernel(self.queue, (NDRange_x,), (workitems_x,), d_src, d_src,
d_vecradius, d_npointsrange_x, var1dim + conddim,
chunklength, signallength_padded, signallength_orig,
theiler_t, localmem) # MW: added signallength_orig
count_src = np.zeros(signallength_padded, dtype=np.int32)
cl.enqueue_copy(self.queue, count_src, d_npointsrange_x)
# Range search in target and conditional
self.RS_kernel(self.queue, (NDRange_x,), (workitems_x,), d_cnd, d_cnd,
d_vecradius, d_npointsrange_y, var2dim + conddim,
chunklength, signallength_padded, signallength_orig,
theiler_t, localmem) # MW: added signallength_orig
count_tgt = np.zeros(signallength_padded, dtype=np.int32)
cl.enqueue_copy(self.queue, count_tgt, d_npointsrange_y)
# Range search in conditional
self.RS_kernel(self.queue, (NDRange_x,), (workitems_x,), d_cnd, d_cnd,
d_vecradius, d_npointsrange_z, conddim, chunklength,
signallength_padded, signallength_orig,
theiler_t, localmem) # MW: added signallength_orig
count_cnd = np.zeros(signallength_padded, dtype=np.int32)
cl.enqueue_copy(self.queue, count_cnd, d_npointsrange_z)
d_pointset.release()
d_distances.release()
d_npointsrange_x.release()
d_npointsrange_y.release()
d_npointsrange_z.release()
d_src.release()
d_cnd.release()
d_vecradius.release()
# Calculate and sum digammas
if self.settings['local_values']:
cmi_array = -np.inf * np.ones(n_chunks * chunklength,
dtype=np.float64)
idx = 0
for c in range(n_chunks):
cmi = (digamma(kraskov_k) +
digamma(count_cnd[c*chunklength:(c+1)*chunklength]+1) -
digamma(count_src[c*chunklength:(c+1)*chunklength]+1) -
digamma(count_tgt[c*chunklength:(c+1)*chunklength]+1))
cmi_array[idx:idx+chunklength] = cmi
idx += chunklength
else:
cmi_array = -np.inf * np.ones(n_chunks, dtype=np.float64)
for c in range(n_chunks):
cmi = (digamma(kraskov_k) + np.mean(
digamma(count_cnd[c*chunklength:(c+1)*chunklength]+1) -
digamma(count_src[c*chunklength:(c+1)*chunklength]+1) -
digamma(count_tgt[c*chunklength:(c+1)*chunklength]+1)))
cmi_array[c] = cmi
assert signallength_orig == (c+1)*chunklength, 'Original signal length does not match no. processed points.'
if self.settings['debug']:
return (cmi_array,
distances[:signallength_orig],
count_src[:signallength_orig],
count_tgt[:signallength_orig],
count_cnd[:signallength_orig])
else:
return cmi_array