Skip to content

Commit

Permalink
Added CSVData tests for io streams (#327)
Browse files Browse the repository at this point in the history
* Added CSVData tests for io streams

* Change Data to CSVData

* Fixed parameter issue and checked tests

* Added change to fix 3.6 bug

* Made small changes
  • Loading branch information
gautomdas authored Jul 15, 2021
1 parent 14f96da commit 5116641
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 5 deletions.
16 changes: 15 additions & 1 deletion dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from builtins import next
import re
import json
from io import open, StringIO, BytesIO
from io import open, StringIO, BytesIO, TextIOWrapper
from collections import OrderedDict
import dateutil

Expand Down Expand Up @@ -272,9 +272,23 @@ def read_csv_df(file_path, delimiter, header, selected_columns=[],

if len(selected_columns) > 0:
args['usecols'] = selected_columns

# account for py3.6 requirement for pandas, can remove if >= py3.7
is_buf_wrapped = False
if isinstance(file_path, BytesIO):
# Wrap the BytesIO stream ourselves so that it can be detached afterwards;
# on py3.6 this keeps read_csv from wrapping the stream itself and closing
# it too early.
file_path = TextIOWrapper(file_path, encoding=encoding)
is_buf_wrapped = True

fo = pd.read_csv(file_path, **args)
data = fo.read()

# if the buffer was wrapped, detach it before returning
if is_buf_wrapped:
file_path.detach()
fo.close()

return data


Expand Down
190 changes: 186 additions & 4 deletions dataprofiler/tests/data_readers/test_csv_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def test_is_match_for_string_streams(self):
for input_file in self.input_file_names:
with open(input_file['path'], 'r',
encoding=input_file['encoding']) as fp:
byte_string = StringIO(fp.read())
self.assertTrue(CSVData.is_match(byte_string))
buffer = StringIO(fp.read())
self.assertTrue(CSVData.is_match(buffer))

def test_is_match_for_byte_streams(self):
"""
Expand All @@ -179,8 +179,8 @@ def test_is_match_for_byte_streams(self):
"""
for input_file in self.input_file_names:
with open(input_file['path'], 'rb') as fp:
byte_string = BytesIO(fp.read())
self.assertTrue(CSVData.is_match(byte_string))
buffer = BytesIO(fp.read())
self.assertTrue(CSVData.is_match(buffer))

def test_auto_file_identification(self):
"""
Expand Down Expand Up @@ -208,6 +208,31 @@ def test_specifying_data_type(self):
input_file['delimiter'],
input_file["path"])

def test_specifying_data_type_as_streams(self):
    """
    Determine if the csv file can be loaded with manual data_type setting
    for streams

    Each input file is checked twice: once as a BytesIO stream and once as
    a StringIO stream decoded with the encoding listed for that file.
    """
    for input_file in self.input_file_names:
        path = input_file['path']
        # (open mode, text encoding, in-memory stream class)
        stream_specs = (
            ('rb', None, BytesIO),
            ('r', input_file['encoding'], StringIO),
        )
        for mode, encoding, stream_cls in stream_specs:
            # copy the file into memory so the handle is closed before the
            # stream is handed to Data
            with open(path, mode, encoding=encoding) as fp:
                stream = stream_cls(fp.read())

            input_data_obj = Data(stream, data_type='csv')
            # the path is passed as the failure message so the offending
            # input file can be identified
            self.assertEqual(input_data_obj.data_type, 'csv', path)
            self.assertEqual(input_data_obj.delimiter,
                             input_file['delimiter'],
                             path)


def test_data_formats(self):
"""
Test the data format options.
Expand All @@ -228,6 +253,45 @@ def test_data_formats(self):
"['dataframe', 'records']"
)

def test_data_formats_as_streams(self):
    """
    Test the data format options for streams

    Runs the same checks against a BytesIO and a StringIO copy of every
    input file: default data is a DataFrame, "records" yields a list, and
    an unknown format raises ValueError with the expected message.
    """
    expected_error = ("The data format must be one of the following: " +
                      "['dataframe', 'records']")

    for input_file in self.input_file_names:
        # (open mode, text encoding, in-memory stream class)
        stream_specs = (
            ('rb', None, BytesIO),
            ('r', input_file['encoding'], StringIO),
        )
        for mode, encoding, stream_cls in stream_specs:
            with open(input_file['path'], mode, encoding=encoding) as source:
                stream = stream_cls(source.read())

            csv_obj = CSVData(stream)
            self.assertEqual(csv_obj.data_type, 'csv')
            self.assertIsInstance(csv_obj.data, pd.DataFrame)

            csv_obj.data_format = "records"
            self.assertIsInstance(csv_obj.data, list)

            with self.assertRaises(ValueError) as exc:
                csv_obj.data_format = "NON_EXISTENT"
            # checked after the context exits, once the exception is stored
            self.assertEqual(str(exc.exception), expected_error)

def test_reload_data(self):
"""
Determine if the csv file can be reloaded
Expand All @@ -240,6 +304,29 @@ def test_reload_data(self):
input_file['path'])
self.assertEqual(input_file['path'], input_data_obj.input_file_path)

def test_reload_data_as_streams(self):
    """
    Determine if the csv file can be reloaded for streams

    For a BytesIO and a StringIO copy of every input file, a CSVData object
    is built from the stream and then reloaded from that same stream.
    """
    for input_file in self.input_file_names:
        path = input_file['path']
        # (open mode, text encoding, in-memory stream class)
        stream_specs = (
            ('rb', None, BytesIO),
            ('r', input_file['encoding'], StringIO),
        )
        for mode, encoding, stream_cls in stream_specs:
            with open(path, mode, encoding=encoding) as source:
                stream = stream_cls(source.read())

            csv_obj = CSVData(stream)
            # reload from the very same stream object used at construction
            csv_obj.reload(stream)

            self.assertEqual(csv_obj.data_type, 'csv', path)
            self.assertEqual(csv_obj.delimiter, input_file['delimiter'],
                             path)
            # the stream itself is what gets recorded as the input path
            self.assertEqual(stream, csv_obj.input_file_path)

def test_allowed_data_formats(self):
"""
Determine if the csv file data_formats can be used
Expand All @@ -257,6 +344,39 @@ def test_allowed_data_formats(self):
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

def test_allowed_data_formats_as_streams(self):
    """
    Determine if the csv file data_formats can be used for streams

    For a BytesIO and a StringIO copy of every input file, each key of the
    object's `_data_formats` is set as `data_format` and the type of the
    resulting data is checked.
    """
    # imported once per test run instead of once per loop iteration
    import pandas as pd

    for input_file in self.input_file_names:
        # (open mode, text encoding, in-memory stream class)
        stream_specs = (
            ('rb', None, BytesIO),
            ('r', input_file['encoding'], StringIO),
        )
        for mode, encoding, stream_cls in stream_specs:
            with open(input_file['path'], mode, encoding=encoding) as fp:
                stream = stream_cls(fp.read())

            input_data_obj = CSVData(stream)
            # iterate over a copy of the keys, as the original test does
            for data_format in list(input_data_obj._data_formats.keys()):
                input_data_obj.data_format = data_format
                self.assertEqual(input_data_obj.data_format, data_format)

                data = input_data_obj.data
                if data_format == "dataframe":
                    self.assertIsInstance(data, pd.DataFrame)
                elif data_format in ["records", "json"]:
                    self.assertIsInstance(data, list)
                    self.assertIsInstance(data[0], str)

def test_set_header(self):
test_dir = os.path.join(test_root_path, 'data')
filename = 'csv/sparse-first-and-last-column-two-headers.txt'
Expand Down Expand Up @@ -310,6 +430,40 @@ def test_set_header(self):
self.assertEqual(1, csv_data.header)
self.assertEqual('1', first_value)

def test_set_header_as_streams(self):
    """
    Check the `header` option when the input is a stream.

    The two-header sample file is loaded as a BytesIO and as a StringIO
    stream, and for each the options 'auto', None, 0 and 1 are checked
    against the resulting `csv_data.header` and the first data value.
    """
    test_dir = os.path.join(test_root_path, 'data')
    file_path = os.path.join(
        test_dir, 'csv/sparse-first-and-last-column-two-headers.txt')

    # read through context managers so the file handles are closed rather
    # than left open until garbage collection
    with open(file_path, 'rb') as fp:
        byte_stream = BytesIO(fp.read())
    with open(file_path, 'r') as fp:
        text_stream = StringIO(fp.read())

    # (header option, expected csv_data.header, expected first value)
    header_cases = [
        ('auto', 1, '1'),
        (None, None, 'COUNT'),  # no header
        (0, 0, 'CONTAR'),
        (1, 1, '1'),
    ]

    for stream in (byte_stream, text_stream):
        # the same stream object is reused for every header case
        for header, expected_header, expected_first in header_cases:
            csv_data = CSVData(stream, options=dict(header=header))
            first_value = csv_data.data.loc[0][0]

            if expected_header is None:
                self.assertIsNone(csv_data.header)  # should be None
            else:
                self.assertEqual(expected_header, csv_data.header)
            self.assertEqual(expected_first, first_value)

def test_header_check_files(self):
"""
Determine if files with no header are properly determined.
Expand Down Expand Up @@ -398,6 +552,34 @@ def test_len_data(self):
data.length,
msg=input_file['path'])

def test_len_data_as_streams(self):
    """
    Validate that length called on CSVData is appropriately determining the
    length value for streams.

    Both `len(data)` and `data.length` are compared with the `count` listed
    for each input file, for a BytesIO and a StringIO copy of the file.
    """
    for input_file in self.input_file_names:
        path = input_file['path']
        expected_count = input_file['count']
        # (open mode, text encoding, in-memory stream class)
        stream_specs = (
            ('rb', None, BytesIO),
            ('r', input_file['encoding'], StringIO),
        )
        for mode, encoding, stream_cls in stream_specs:
            with open(path, mode, encoding=encoding) as source:
                stream = stream_cls(source.read())

            data = CSVData(stream)
            self.assertEqual(expected_count, len(data), msg=path)
            self.assertEqual(expected_count, data.length, msg=path)

def test_is_structured(self):
# Default construction
data = CSVData()
Expand Down

0 comments on commit 5116641

Please sign in to comment.