1 change: 1 addition & 0 deletions .gitignore
@@ -9,6 +9,7 @@ _templates
.DS_store
.venv
*.swp
*.swn
*.pyc

# pip package metadata
215 changes: 167 additions & 48 deletions dataset/dataset.py
@@ -1,9 +1,8 @@
import logging
import time
from enum import Enum
import logging
import os
import time
import pandas as pd
import time

from .dbengine import DBengine
from .table import Table, Source
@@ -49,9 +48,7 @@ def __init__(self, name, env):
self.raw_data = None
self.repaired_data = None
self.constraints = None
self.aux_table = {}
for tab in AuxTables:
self.aux_table[tab] = None
self.aux_tables = {}
# start dbengine
self.engine = DBengine(
env['db_user'],
@@ -119,7 +116,6 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
df.fillna('_nan_', inplace=True)

# Call to store to database

self.raw_data.store_to_db(self.engine.engine)
status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))

@@ -143,6 +139,25 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
def set_constraints(self, constraints):
self.constraints = constraints

def aux_table_exists(self, aux_table):
"""
aux_table_exists returns True if :param aux_table: has been generated.

:param aux_table: (AuxTables(Enum)) auxiliary table to check
"""
return aux_table in self.aux_tables

def get_aux_table(self, aux_table):
"""
get_aux_table returns the Table associated with :param aux_table:.

:param aux_table: (AuxTables(Enum)) auxiliary table to retrieve
"""
if not self.aux_table_exists(aux_table):
raise Exception("{} auxiliary table has not been generated".format(aux_table))
return self.aux_tables[aux_table]
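A quick usage sketch of the new accessor pair; the import path and the ready-made `ds` instance are assumptions for illustration:

```python
# Hypothetical usage; assumes `ds` is a Dataset on which
# generate_aux_table(...) has already been run for cell_domain.
from dataset.dataset import AuxTables  # import path assumed

def show_cell_domain(ds):
    if ds.aux_table_exists(AuxTables.cell_domain):
        # get_aux_table now raises on a missing table instead of handing
        # back the None placeholder the old dict initialization held.
        print(ds.get_aux_table(AuxTables.cell_domain).df.head())
    else:
        print("cell_domain has not been generated yet")
```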


def generate_aux_table(self, aux_table, df, store=False, index_attrs=False):
"""
generate_aux_table writes/overwrites the auxiliary table specified by
@@ -160,13 +175,13 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False):
also creates indexes on Postgres table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df=df)
self.aux_tables[aux_table] = Table(aux_table.name, Source.DF, df=df)
if store:
self.aux_table[aux_table].store_to_db(self.engine.engine)
self.aux_tables[aux_table].store_to_db(self.engine.engine)
if index_attrs:
self.aux_table[aux_table].create_df_index(index_attrs)
self.aux_tables[aux_table].create_df_index(index_attrs)
if store and index_attrs:
self.aux_table[aux_table].create_db_index(self.engine, index_attrs)
self.aux_tables[aux_table].create_db_index(self.engine, index_attrs)
except Exception:
logging.error('generating aux_table %s', aux_table.name)
raise
@@ -177,10 +192,10 @@ def generate_aux_table_sql(self, aux_table, query, index_attrs=False):
:param query: (str) SQL query whose result is used for generating the auxiliary table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine)
self.aux_tables[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine)
if index_attrs:
self.aux_table[aux_table].create_df_index(index_attrs)
self.aux_table[aux_table].create_db_index(self.engine, index_attrs)
self.aux_tables[aux_table].create_df_index(index_attrs)
self.aux_tables[aux_table].create_db_index(self.engine, index_attrs)
except Exception:
logging.error('generating aux_table %s', aux_table.name)
raise
@@ -225,14 +240,14 @@ def get_statistics(self):
<count>: frequency (# of entities) where attr1: val1 AND attr2: val2
"""
if not self.stats_ready:
self.collect_stats()
self.collect_init_stats()
stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats)
self.stats_ready = True
return stats

def collect_stats(self):
def collect_init_stats(self):
"""
collect_stats memoizes:
collect_init_stats calculates and memoizes (based on the RAW/INITIAL data):
1. self.single_attr_stats ({ attribute -> { value -> count } })
the frequency (# of entities) of a given attribute-value
2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
@@ -243,35 +258,112 @@ def collect_stats(self):
Also known as co-occurrence count.
"""

self.total_tuples = self.get_raw_data().shape[0]
self.total_tuples = self.get_raw_data()['_tid_'].nunique()
# Single attribute-value frequency
for attr in self.get_attributes():
self.single_attr_stats[attr] = self._get_init_stats_single(attr)
# Co-occurrence frequency
for first_attr in self.get_attributes():
self.pair_attr_stats[first_attr] = {}
for second_attr in self.get_attributes():
if second_attr != first_attr:
self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr)

def collect_current_stats(self):
"""
collect_current_stats calculates and memoizes frequency and co-occurrence
statistics based on the CURRENT values/data.

See collect_init_stats for which member variables are memoized/overwritten.
Does NOT overwrite self.total_tuples.
"""
# Single attribute-value frequency
for attr in self.get_attributes():
self.single_attr_stats[attr] = self.get_stats_single(attr)
self.single_attr_stats[attr] = self._get_current_stats_single(attr)
# Co-occurrence frequency
for cond_attr in self.get_attributes():
self.pair_attr_stats[cond_attr] = {}
for trg_attr in self.get_attributes():
if trg_attr != cond_attr:
self.pair_attr_stats[cond_attr][trg_attr] = self.get_stats_pair(cond_attr,trg_attr)
for first_attr in self.get_attributes():
self.pair_attr_stats[first_attr] = {}
for second_attr in self.get_attributes():
if second_attr != first_attr:
self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr)

def get_stats_single(self, attr):
def _get_init_stats_single(self, attr):
"""
_get_init_stats_single returns a dictionary where the keys are possible
values for :param attr: and the values contain the frequency count in
the RAW/INITIAL data of that value for this attribute.
"""
# We iterate in a for loop instead of using groupby & size
# since our values may be '|||'-separated
freq_count = {}
for (cell,) in self.get_raw_data()[[attr]].itertuples(index=False):
vals = cell.split('|||')
for val in vals:
# Correct for multiple values: weight each value equally in its
# contribution to the counts
freq_count[val] = freq_count.get(val, 0) + 1. / len(vals)
return freq_count
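To make the equal-weighting scheme concrete, here is a minimal standalone sketch of the same counting loop on toy data (no Dataset dependency):

```python
import pandas as pd

# Toy column where one cell carries two '|||'-separated candidates.
df = pd.DataFrame({'city': ['NYC', 'NYC|||LA', 'LA']})

freq_count = {}
for (cell,) in df[['city']].itertuples(index=False):
    vals = cell.split('|||')
    for val in vals:
        # Each candidate contributes 1/len(vals), so every cell still
        # adds exactly 1 to the total frequency mass.
        freq_count[val] = freq_count.get(val, 0) + 1. / len(vals)

print(freq_count)  # {'NYC': 1.5, 'LA': 1.5}
```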

def _get_current_stats_single(self, attr):
"""
Returns a dictionary where the keys possible values for :param attr: and
the values contain the frequency count of that value for this attribute.
_get_current_stats_single returns a dictionary where the keys are
possible values for :param attr: and the values contain the frequency
count in the CURRENT data of that value for this attribute.
"""
# Retrieve statistics on current value from cell_domain
df_domain = self.get_aux_table(AuxTables.cell_domain).df
df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts()
# We do not store attributes with only NULL values in cell_domain,
# but we still require _nan_ in our single stats
if df_count.empty:
return {'_nan_': self.total_tuples}
return df_count.to_dict()

def _get_init_stats_pair(self, first_attr, second_attr):
"""
_get_init_stats_pair returns a dictionary {first_val -> {second_val ->
count } } where (based on RAW/INITIAL dataset):
<first_val>: all possible values for first_attr
<second_val>: all values for second_attr that appeared at least once with <first_val>
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
"""
# need to decode values into unicode strings since we do lookups via
# unicode strings from Postgres
return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()

def get_stats_pair(self, first_attr, second_attr):
# We iterate in a for loop instead of using groupby & size
# since our values may be '|||'-separated
cooccur_freq_count = {}
for cell1, cell2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False):
vals1 = cell1.split('|||')
vals2 = cell2.split('|||')
for val1 in vals1:
cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {})
for val2 in vals2:
# Correct for multiple values: weight each co-occurring pair equally
# in its contribution to the co-occurrence counts
cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1. / (len(vals1) * len(vals2))
return cooccur_freq_count
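The pairwise variant spreads one unit of mass over the cross product of candidates; a standalone check of the 1/(len(vals1) * len(vals2)) weighting for a single row:

```python
# One row whose two cells are 'A|||B' and 'X|||Y': each of the
# four candidate pairs receives weight 1/4.
cell1, cell2 = 'A|||B', 'X|||Y'
vals1, vals2 = cell1.split('|||'), cell2.split('|||')

cooccur = {}
for v1 in vals1:
    cooccur.setdefault(v1, {})
    for v2 in vals2:
        cooccur[v1][v2] = cooccur[v1].get(v2, 0) + 1. / (len(vals1) * len(vals2))

# The row contributes exactly one unit of co-occurrence mass in total.
assert sum(c for d in cooccur.values() for c in d.values()) == 1.0
print(cooccur)  # {'A': {'X': 0.25, 'Y': 0.25}, 'B': {'X': 0.25, 'Y': 0.25}}
```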

def _get_current_stats_pair(self, first_attr, second_attr):
"""
Returns a dictionary {first_val -> {second_val -> count } } where:
_get_current_stats_pair returns a dictionary {first_val -> {second_val ->
count } } where (based on CURRENT dataset):
<first_val>: all possible values for first_attr
<second_val>: all values for second_attr that appeared at least once with <first_val>
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
"""
tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
return _dictify(tmp_df)
# Retrieve pairwise statistics on current value from cell_domain
df_domain = self.get_aux_table(AuxTables.cell_domain).df
# Filter cell_domain for only the attributes we care about
df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
# Convert to wide form so we have our :param first_attr:
# and :param second_attr: as columns along with the _tid_ column
df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
# We do not store cells for attributes consisting of only NULL values in cell_domain.
# We require this for pair stats though.
if first_attr not in df_domain.columns:
df_domain[first_attr] = '_nan_'
if second_attr not in df_domain.columns:
df_domain[second_attr] = '_nan_'
return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count"))
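The long-to-wide pivot on cell_domain is easier to see on a toy frame; the column names below match the diff, the data is fabricated:

```python
import pandas as pd

# Fabricated miniature cell_domain in long form.
df_domain = pd.DataFrame({
    '_tid_':         [0, 0, 1, 1],
    'attribute':     ['city', 'state', 'city', 'state'],
    'current_value': ['NYC', 'NY', 'LA', 'CA'],
})

# Same pivot as in _get_current_stats_pair: one row per _tid_,
# one column per attribute, cells holding current_value.
wide = df_domain.pivot(index='_tid_', columns='attribute',
                       values='current_value')

# Pair counts then fall out of a plain groupby + size.
print(wide.groupby(['city', 'state']).size().reset_index(name='count'))
```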

def get_domain_info(self):
"""
@@ -284,30 +376,39 @@ def get_domain_info(self):
classes = int(res[0][1])
return total_vars, classes

def get_inferred_values(self):
def generate_inferred_values(self):
tic = time.clock()
query = "SELECT t1._tid_, t1.attribute, domain[inferred_assignment + 1] as rv_value " \
"FROM " \
"(SELECT _tid_, attribute, " \
"_vid_, init_value, string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') as domain " \
"FROM %s) as t1, %s as t2 " \
"WHERE t1._vid_ = t2._vid_"%(AuxTables.cell_domain.name, AuxTables.inf_values_idx.name)
query = """
SELECT t1._tid_,
t1.attribute,
domain[inferred_assignment + 1] AS rv_value
FROM (
SELECT _tid_,
attribute,
_vid_,
current_value,
string_to_array(regexp_replace(domain, \'[{{\"\"}}]\', \'\', \'gi\'), \'|||\') AS domain
FROM {cell_domain}) AS t1,
{inf_values_idx} AS t2
WHERE t1._vid_ = t2._vid_
""".format(cell_domain=AuxTables.cell_domain.name,
inf_values_idx=AuxTables.inf_values_idx.name)
self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_'])
self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute'])
self.get_aux_table(AuxTables.inf_values_dom).create_db_index(self.engine, ['attribute'])
status = "DONE collecting the inferred values."
toc = time.clock()
total_time = toc - tic
return status, total_time
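The regexp_replace/string_to_array pair simply undoes the text serialization of the domain column; a Python-side mirror of that parsing and of the 1-based array lookup, with an assumed serialized form:

```python
import re

# Assumed serialization of a domain value in cell_domain.
domain_text = '{"NYC|||LA|||SF"}'

# Mirror of: string_to_array(regexp_replace(domain, '[{""}]', '', 'gi'), '|||')
domain = re.sub(r'[{"}]', '', domain_text).split('|||')

inferred_assignment = 1  # 0-based index from inference
# Postgres arrays are 1-based, hence domain[inferred_assignment + 1] in SQL;
# in Python the same lookup is simply domain[inferred_assignment].
rv_value = domain[inferred_assignment]
print(rv_value)  # LA
```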

def get_repaired_dataset(self):
def generate_repaired_dataset(self):
tic = time.clock()
init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
t = self.aux_table[AuxTables.inf_values_dom]
records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
t = self.aux_tables[AuxTables.inf_values_dom]
repaired_vals = _dictify(t.df.reset_index())
for tid in repaired_vals:
for attr in repaired_vals[tid]:
init_records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(init_records)
records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(records)
name = self.raw_data.name+'_repaired'
self.repaired_data = Table(name, Source.DF, df=repaired_df)
self.repaired_data.store_to_db(self.engine.engine)
@@ -316,4 +417,22 @@ def get_repaired_dataset(self):
total_time = toc - tic
return status, total_time


def update_current_values(self):
"""
update_current_values takes the inferred values from inf_values_dom
(auxiliary table) and updates them in the current_value column in
cell_domain (auxiliary table).
"""
df_inferred = self.get_aux_table(AuxTables.inf_values_dom).df
df_cell_domain = self.get_aux_table(AuxTables.cell_domain).df

# Update current_value column with rv_values for inferred TIDs
df_updated = df_cell_domain.reset_index().merge(df_inferred, on=['_tid_', 'attribute'], how='left')
update_filter = df_updated['rv_value'].notnull()
df_updated.loc[update_filter, 'current_value'] = df_updated.loc[update_filter, 'rv_value']
df_updated.drop('rv_value', axis=1, inplace=True)

self.generate_aux_table(AuxTables.cell_domain, df_updated, store=True, index_attrs=['_vid_'])
# self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_tid_'])
# self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_cid_'])
logging.info('DONE updating current values with inferred values')
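A standalone illustration of the left-merge update pattern used by update_current_values (fabricated frames; only the column names match the diff):

```python
import pandas as pd

df_cell_domain = pd.DataFrame({
    '_tid_': [0, 1], 'attribute': ['city', 'city'],
    'current_value': ['NYC', 'LA'],
})
df_inferred = pd.DataFrame({
    '_tid_': [1], 'attribute': ['city'], 'rv_value': ['SF'],
})

# Left merge keeps every cell; rv_value is NaN wherever nothing was inferred.
df_updated = df_cell_domain.merge(df_inferred,
                                  on=['_tid_', 'attribute'], how='left')
mask = df_updated['rv_value'].notnull()
df_updated.loc[mask, 'current_value'] = df_updated.loc[mask, 'rv_value']
df_updated = df_updated.drop('rv_value', axis=1)
print(df_updated)  # the row for _tid_ 1 now carries current_value 'SF'
```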
2 changes: 1 addition & 1 deletion dataset/table.py
@@ -1,6 +1,6 @@
from enum import Enum
import os
import pandas as pd
from enum import Enum

class Source(Enum):
FILE = 1
7 changes: 4 additions & 3 deletions detect/__init__.py
@@ -1,6 +1,7 @@
from .detect import DetectEngine
from .detector import Detector
from .nulldetector import NullDetector
from .violationdetector import ViolationDetector
from .multi_init_detector import MultiInitDetector
from .null_detector import NullDetector
from .violation_detector import ViolationDetector

__all__ = ['DetectEngine', 'Detector', 'NullDetector', 'ViolationDetector']
__all__ = ['DetectEngine', 'Detector', 'MultiInitDetector', 'NullDetector', 'ViolationDetector']
2 changes: 1 addition & 1 deletion detect/detect.py
@@ -34,5 +34,5 @@ def store_detected_errors(self, errors_df):
if errors_df.empty:
raise Exception("ERROR: Detected errors dataframe is empty.")
self.ds.generate_aux_table(AuxTables.dk_cells, errors_df, store=True)
self.ds.aux_table[AuxTables.dk_cells].create_db_index(self.ds.engine, ['_cid_'])
self.ds.get_aux_table(AuxTables.dk_cells).create_db_index(self.ds.engine, ['_cid_'])

33 changes: 33 additions & 0 deletions detect/multi_init_detector.py
@@ -0,0 +1,33 @@
import pandas as pd
from .detector import Detector

class MultiInitDetector(Detector):
"""
MultiInitDetector detects every cell with multiple initial values as errors.
"""
def __init__(self, name='MultiInitDetector'):
super(MultiInitDetector, self).__init__(name)

def setup(self, dataset, env):
self.ds = dataset
self.env = env
self.df = self.ds.get_raw_data()

def detect_noisy_cells(self):
"""
detect_noisy_cells returns a pandas.DataFrame containing all cells with
multiple '|||' separated values.

:return: pandas.DataFrame with columns:
_tid_: entity ID
attribute: attribute with multiple initial values for this entity
"""
attributes = self.ds.get_attributes()
errors = []
for attr in attributes:
tmp_df = self.df[self.df[attr].str.contains(r'\|\|\|')]['_tid_'].to_frame()
tmp_df.insert(1, "attribute", attr)
errors.append(tmp_df)
errors_df = pd.concat(errors, ignore_index=True)
return errors_df
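For context, str.contains needs the escaped pattern because '|' is a regex alternation operator; a quick standalone check on toy data:

```python
import pandas as pd

df = pd.DataFrame({'_tid_': [0, 1, 2],
                   'city': ['NYC', 'NYC|||LA', 'LA']})

# r'\|\|\|' matches the literal '|||' separator.
multi = df[df['city'].str.contains(r'\|\|\|')]['_tid_'].to_frame()
multi.insert(1, 'attribute', 'city')
print(multi)  # flags only _tid_ 1
```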

3 changes: 2 additions & 1 deletion detect/nulldetector.py → detect/null_detector.py
@@ -22,7 +22,8 @@ def detect_noisy_cells(self):
attributes = self.ds.get_attributes()
errors = []
for attr in attributes:
tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame()
# self.df i.e. raw_data has all NULL values converted to '_nan_'
tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame()
tmp_df.insert(1, "attribute", attr)
errors.append(tmp_df)
errors_df = pd.concat(errors, ignore_index=True)
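The switch from isnull() to an equality test matters because load_data's fillna('_nan_', inplace=True) has already rewritten real NULLs; a minimal before/after sketch on toy data:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'_tid_': [0, 1], 'city': [np.nan, 'LA']})
df.fillna('_nan_', inplace=True)

# The old check finds nothing once NULLs have been rewritten ...
assert df[df['city'].isnull()].empty
# ... while the new sentinel comparison recovers the missing cell.
print(df[df['city'] == '_nan_']['_tid_'].to_frame())  # flags _tid_ 0
```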