Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ _templates
*.pyc
.cache/

# venv
*_venv

# pip package metadata
*egg-info/

Expand Down
22 changes: 21 additions & 1 deletion dataset/table.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
import logging
import io

import pandas as pd

Expand Down Expand Up @@ -74,7 +75,26 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],

def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None):
    """Persist ``self.df`` into database table ``self.name``.

    Creates the (empty) table via pandas ``to_sql`` to get the schema,
    then bulk-loads the rows with PostgreSQL ``COPY ... FROM STDIN``
    through the raw DBAPI connection, which is much faster than the
    row-by-row INSERTs ``to_sql`` would issue.

    :param db_conn: SQLAlchemy engine/connection wrapping a psycopg2
        connection (``raw_connection()`` and ``copy_expert`` are used --
        TODO confirm the backend is always PostgreSQL).
    :param if_exists: passed to ``DataFrame.to_sql`` ('replace' drops any
        pre-existing table of the same name).
    :param index: whether to write the DataFrame index as a column.
    :param index_label: column label(s) for the index column(s).
    """
    # TODO: This version supports single session, single worker.

    # Create the table with the right columns but zero rows.
    self.df[:0].to_sql(self.name, db_conn, if_exists=if_exists,
                       index=index, index_label=index_label)

    # Serialize the data as CSV in memory for COPY.
    output = io.StringIO()
    self.df.to_csv(output, sep=',', quotechar='"', header=False, index=index)
    output.seek(0)

    # Bulk-load via the DBAPI connection. try/finally ensures the cursor
    # and connection are released even if COPY fails (the original leaked
    # both on error and also left a debug print() behind).
    connection = db_conn.raw_connection()
    try:
        cursor = connection.cursor()
        try:
            # Quote the identifier: pandas to_sql quotes the table name,
            # so an unquoted COPY target would fold to lowercase and miss
            # mixed-case table names.
            cursor.copy_expert('COPY "{}" FROM STDIN (FORMAT CSV)'.format(self.name),
                               output)
            connection.commit()
        finally:
            cursor.close()
    finally:
        connection.close()

def get_attributes(self):
"""
Expand Down
82 changes: 55 additions & 27 deletions domain/correlations.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,79 @@
from pyitlib import discrete_random_variable as drv

import time, logging
from utils import NULL_REPR

def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to):
"""
Computes the correlations between attributes by calculating
the normalized conditional entropy between them. The conditional
entropy is asymmetric, therefore we need pairwise computation.
from multiprocessing import Pool
from functools import partial
from tqdm import tqdm

The computed correlations are stored in a dictionary in the format:
{
attr_a: { cond_attr_i: corr_strength_a_i,
cond_attr_j: corr_strength_a_j, ... },
attr_b: { cond_attr_i: corr_strength_b_i, ...}
}

:return a dictionary of correlations
"""
corr = {}
# Compute pair-wise conditional entropy.
for x in attrs_from:
corr[x] = {}
for y in attrs_to:
def _compute_norm_cond_entropy_corr(x, attrs, df):
try:
corr = {}
for y in attrs:
# Set correlation to 1 for same attributes.
if x == y:
corr[x][y] = 1.0
corr[y] = 1.0
continue

xy_df = data_df[[x, y]]
xy_df = df[[x, y]]
xy_df = xy_df.loc[~(xy_df[x] == NULL_REPR) & ~(xy_df[y] == NULL_REPR)]
x_vals = xy_df[x]
x_domain_size = x_vals.nunique()

# Set correlation to 0.0 if entropy of x is 1 (only one possible value).
if x_domain_size == 1 or len(xy_df) == 0:
corr[x][y] = 0.0
corr[y] = 0.0
continue

# Compute the conditional entropy H(x|y) = H(x,y) - H(y).
# H(x,y) denotes H(x U y).
# If H(x|y) = 0, then y determines x, i.e., y -> x.
# Use the domain size of x as a log base for normalization.
y_vals = xy_df[y]

x_y_entropy = drv.entropy_conditional(x_vals, y_vals, base=x_domain_size).item()
x_y_entropy = drv.entropy_conditional(x_vals, y_vals, base=x_domain_size)

# The conditional entropy is 0 for strongly correlated attributes and 1 for
# completely independent attributes. We reverse this to reflect the correlation.
corr[x][y] = 1.0 - x_y_entropy
# corrs.append((x, y, 1.0 - x_y_entropy))
corr[y] = 1.0 - x_y_entropy
return (x,corr)
except:
logging.debug('Failed _compute_norm_cond_entropy_corr process: %s: %s' % (x, traceback.format_exc()))
return None

def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to, threads=1):
    """
    Computes the correlations between attributes by calculating
    the normalized conditional entropy between them. The conditional
    entropy is asymmetric, therefore we need pairwise computation.

    The computed correlations are stored in a dictionary in the format:
    {
      attr_a: { cond_attr_i: corr_strength_a_i,
                cond_attr_j: corr_strength_a_j, ... },
      attr_b: { cond_attr_i: corr_strength_b_i, ...}
    }

    :param data_df: DataFrame with a column per attribute.
    :param attrs_from: attributes whose correlations are computed.
    :param attrs_to: conditioning attributes.
    :param threads: number of worker processes; <= 1 runs serially.
    :return: a dictionary of correlations
    """
    tic = time.time()
    corr = {}

    # Each worker computes one attribute's correlations against all of
    # `attrs_to`, yielding (attr, {cond_attr: strength}) tuples.
    f = partial(_compute_norm_cond_entropy_corr, attrs=attrs_to, df=data_df)
    if threads > 1:
        pool = Pool(threads)
        try:
            results = list(tqdm(pool.imap_unordered(f, attrs_from),
                                total=len(attrs_from)))
        finally:
            # close()+join() lets in-flight workers finish cleanly; the
            # original terminate() could kill them mid-result, and the
            # pool leaked if iteration raised.
            pool.close()
            pool.join()
    else:
        # Only spawn a pool when it is actually needed.
        results = map(f, attrs_from)

    for row in results:
        # A worker that failed may yield None; skip it rather than crash
        # with TypeError on row[0].
        if row is None:
            continue
        corr[row[0]] = row[1]

    toc = time.time()
    logging.debug('Time taken in _compute_norm_cond_entropy_corr process: %.2f secs',
                  toc - tic)
    return corr
2 changes: 1 addition & 1 deletion domain/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def compute_correlations(self):
else self.ds.get_raw_data()
self.correlations = compute_norm_cond_entropy_corr(data_df,
self.ds.get_attributes(),
self.ds.get_attributes())
self.ds.get_attributes(), self.env['threads'])
corrs_df = pd.DataFrame.from_dict(self.correlations, orient='columns')
corrs_df.index.name = 'cond_attr'
corrs_df.columns.name = 'attr'
Expand Down