9 changes: 7 additions & 2 deletions application_sdk/common/file_converter.py
@@ -1,3 +1,4 @@
+import os
 from collections import namedtuple
 from enum import Enum
 from typing import List, Optional
@@ -75,9 +76,11 @@ def convert_json_to_parquet(file_path: str) -> Optional[str]:
     """Convert the downloaded files from json to parquet"""
     try:
         logger.info(f"Converting {file_path} to parquet")
+        base_dir = file_path.rsplit("/", 1)[0]
+        os.makedirs(f"{base_dir}/converted_files", exist_ok=True)
         df = pd.read_json(file_path, orient="records", lines=True)
         df = df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
-        parquet_file_path = file_path.replace(".json", ".parquet")
+        parquet_file_path = f"{base_dir}/converted_files/{file_path.rsplit('/', 1)[1].replace('.json', '.parquet')}"
         df.to_parquet(parquet_file_path)
         return parquet_file_path
     except Exception as e:
@@ -90,8 +93,10 @@ def convert_parquet_to_json(file_path: str) -> Optional[str]:
     """Convert the downloaded files from parquet to json"""
     try:
         logger.info(f"Converting {file_path} to json")
+        base_dir = file_path.rsplit("/", 1)[0]
+        os.makedirs(f"{base_dir}/converted_files", exist_ok=True)
         df = pd.read_parquet(file_path)
-        json_file_path = file_path.replace(".parquet", ".json")
+        json_file_path = f"{base_dir}/converted_files/{file_path.rsplit('/', 1)[1].replace('.parquet', '.json')}"
         df.to_json(json_file_path, orient="records", lines=True)
         return json_file_path
     except Exception as e:
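The new destination logic in both hunks is the same three-step pattern: split the input path, create a converted_files/ subdirectory next to the input, and swap the extension on the bare file name. Below is a minimal sketch of just that pattern, isolated from the read/write calls; the helper name and example path are hypothetical, not from this PR:

    import os

    def converted_path(file_path: str, src_ext: str, dst_ext: str) -> str:
        # Split ".../dir/name.ext" into the directory and the bare file name.
        base_dir, file_name = file_path.rsplit("/", 1)
        # Create the converted_files/ subdirectory next to the input (idempotent).
        os.makedirs(f"{base_dir}/converted_files", exist_ok=True)
        return f"{base_dir}/converted_files/{file_name.replace(src_ext, dst_ext)}"

    # converted_path("/tmp/raw/assets.json", ".json", ".parquet")
    # -> "/tmp/raw/converted_files/assets.parquet"

Note that str.replace swaps the extension anywhere it occurs in the file name and rsplit("/", 1) assumes POSIX-style separators; both details match the behavior of the patched functions.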
116 changes: 79 additions & 37 deletions tests/unit/common/test_file_converter.py
@@ -163,32 +163,42 @@ async def test_file_extension_detection_from_first_file(self, mock_registry):
 class TestConvertJsonToParquet:
     """Test suite for the convert_json_to_parquet function."""
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_json")
-    def test_successful_json_to_parquet_conversion(self, mock_read_json):
+    def test_successful_json_to_parquet_conversion(self, mock_read_json, mock_makedirs):
         """Test successful conversion from JSON to Parquet."""
         # Arrange
         mock_df = MagicMock()
-        mock_filtered_df = MagicMock()
 
-        # Mock the column filtering chain: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
-        mock_df.where.return_value = mock_df
-        mock_df.astype.return_value = mock_df
-        mock_df.isna.return_value = mock_df
-        mock_df.all.return_value = MagicMock()
-        mock_df.loc.__getitem__.return_value = mock_filtered_df
+        # Mock the column filtering chain properly
+        # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
+        mock_where_result = MagicMock()
+        mock_astype_result = MagicMock()
+        mock_isna_result = MagicMock()
+        mock_all_result = MagicMock()
+
+        mock_df.where.return_value = mock_where_result
+        mock_where_result.astype.return_value = mock_astype_result
+        mock_astype_result.isna.return_value = mock_isna_result
+        mock_isna_result.all.return_value = mock_all_result
+
+        # Mock the loc operation to return the same df for simplicity
+        mock_df.loc = MagicMock()
+        mock_df.loc.__getitem__ = MagicMock(return_value=mock_df)
 
         mock_read_json.return_value = mock_df
 
         file_path = "/path/input.json"
-        expected_output = "/path/input.parquet"
+        expected_output = "/path/converted_files/input.parquet"
 
         # Act
         result = convert_json_to_parquet(file_path)
 
         # Assert
         assert result == expected_output
         mock_read_json.assert_called_once_with(file_path, orient="records", lines=True)
-        mock_filtered_df.to_parquet.assert_called_once_with(expected_output)
+        mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True)
+        mock_df.to_parquet.assert_called_once_with(expected_output)
 
     @patch("application_sdk.common.file_converter.pd.read_json")
     def test_json_to_parquet_read_error_returns_none(self, mock_read_json):
@@ -206,22 +216,32 @@ def test_json_to_parquet_read_error_returns_none(self, mock_read_json):
         assert result is None
         mock_logger.assert_called_once()
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_json")
-    def test_json_to_parquet_write_error_returns_none(self, mock_read_json):
+    def test_json_to_parquet_write_error_returns_none(
+        self, mock_read_json, mock_makedirs
+    ):
         """Test that write errors return None and are logged."""
         # Arrange
         mock_df = MagicMock()
-        mock_filtered_df = MagicMock()
 
-        # Mock the column filtering chain
-        mock_df.where.return_value = mock_df
-        mock_df.astype.return_value = mock_df
-        mock_df.isna.return_value = mock_df
-        mock_df.all.return_value = MagicMock()
-        mock_df.loc.__getitem__.return_value = mock_filtered_df
+        # Mock the column filtering chain properly
+        mock_where_result = MagicMock()
+        mock_astype_result = MagicMock()
+        mock_isna_result = MagicMock()
+        mock_all_result = MagicMock()
+
+        mock_df.where.return_value = mock_where_result
+        mock_where_result.astype.return_value = mock_astype_result
+        mock_astype_result.isna.return_value = mock_isna_result
+        mock_isna_result.all.return_value = mock_all_result
+
+        # Mock the loc operation to return the same df
+        mock_df.loc = MagicMock()
+        mock_df.loc.__getitem__ = MagicMock(return_value=mock_df)
 
         # Make to_parquet raise an exception
-        mock_filtered_df.to_parquet.side_effect = Exception("File write error")
+        mock_df.to_parquet.side_effect = Exception("File write error")
 
         mock_read_json.return_value = mock_df
 
@@ -235,19 +255,31 @@ def test_json_to_parquet_write_error_returns_none(self, mock_read_json):
         assert result is None
         mock_logger.assert_called_once()
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_json")
-    def test_json_to_parquet_column_filtering(self, mock_read_json):
+    def test_json_to_parquet_column_filtering(self, mock_read_json, mock_makedirs):
         """Test that empty columns are filtered out during conversion."""
         # Arrange
         mock_df = MagicMock()
-        mock_filtered_df = MagicMock()
 
-        # Mock the column filtering chain
-        mock_df.where.return_value = mock_df
-        mock_df.astype.return_value = mock_df
-        mock_df.isna.return_value = mock_df
-        mock_df.all.return_value = MagicMock()
-        mock_df.loc.__getitem__.return_value = mock_filtered_df
+        # Mock the complete column filtering chain properly
+        # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
+        mock_astype_result = MagicMock()
+        mock_where_result = MagicMock()
+        mock_isna_result = MagicMock()
+        mock_all_result = MagicMock()
+
+        # df.astype(bool) is called on the original df
+        mock_df.astype.return_value = mock_astype_result
+        # df.where(df.astype(bool)) is called on the original df with astype result
+        mock_df.where.return_value = mock_where_result
+        # Then .isna() and .all() are called on the where result
+        mock_where_result.isna.return_value = mock_isna_result
+        mock_isna_result.all.return_value = mock_all_result
+
+        # Mock the loc operation
+        mock_df.loc = MagicMock()
+        mock_df.loc.__getitem__ = MagicMock(return_value=mock_df)
 
         mock_read_json.return_value = mock_df
 
@@ -257,33 +289,38 @@ def test_json_to_parquet_column_filtering(self, mock_read_json):
         convert_json_to_parquet(file_path)
 
         # Assert
-        # Verify that column filtering logic was applied
-        mock_df.where.assert_called_once()
+        # Verify that column filtering logic was applied in the correct order
+        # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
         mock_df.astype.assert_called_once_with(bool)
-        mock_df.isna.assert_called_once()
-        mock_df.all.assert_called_once_with(axis=0)
-        mock_filtered_df.to_parquet.assert_called_once()
+        mock_df.where.assert_called_once_with(mock_astype_result)
+        mock_where_result.isna.assert_called_once()
+        mock_isna_result.all.assert_called_once_with(axis=0)
+        mock_df.to_parquet.assert_called_once()
 
 
 class TestConvertParquetToJson:
     """Test suite for the convert_parquet_to_json function."""
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_parquet")
-    def test_successful_parquet_to_json_conversion(self, mock_read_parquet):
+    def test_successful_parquet_to_json_conversion(
+        self, mock_read_parquet, mock_makedirs
+    ):
         """Test successful conversion from Parquet to JSON."""
         # Arrange
         mock_df = MagicMock()
         mock_read_parquet.return_value = mock_df
 
         file_path = "/path/input.parquet"
-        expected_output = "/path/input.json"
+        expected_output = "/path/converted_files/input.json"
 
         # Act
         result = convert_parquet_to_json(file_path)
 
         # Assert
         assert result == expected_output
         mock_read_parquet.assert_called_once_with(file_path)
+        mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True)
         mock_df.to_json.assert_called_once_with(
             expected_output, orient="records", lines=True
         )
@@ -304,8 +341,11 @@ def test_parquet_to_json_read_error_returns_none(self, mock_read_parquet):
         assert result is None
         mock_logger.assert_called_once()
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_parquet")
-    def test_parquet_to_json_write_error_returns_none(self, mock_read_parquet):
+    def test_parquet_to_json_write_error_returns_none(
+        self, mock_read_parquet, mock_makedirs
+    ):
         """Test that write errors return None and are logged."""
         # Arrange
         mock_df = MagicMock()
@@ -322,8 +362,9 @@ def test_parquet_to_json_write_error_returns_none(self, mock_read_parquet):
         assert result is None
         mock_logger.assert_called_once()
 
+    @patch("application_sdk.common.file_converter.os.makedirs")
     @patch("application_sdk.common.file_converter.pd.read_parquet")
-    def test_parquet_to_json_proper_parameters(self, mock_read_parquet):
+    def test_parquet_to_json_proper_parameters(self, mock_read_parquet, mock_makedirs):
         """Test that JSON is written with correct parameters for line-delimited format."""
         # Arrange
         mock_df = MagicMock()
@@ -335,8 +376,9 @@ def test_parquet_to_json_proper_parameters(self, mock_read_parquet):
         convert_parquet_to_json(file_path)
 
         # Assert
+        mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True)
         mock_df.to_json.assert_called_once_with(
-            "/path/input.json", orient="records", lines=True
+            "/path/converted_files/input.json", orient="records", lines=True
         )


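The MagicMock chains above all stand in for one real pandas expression, df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)], which drops columns whose values are all falsy. As a sanity check of what the mocks are modeling, here is the expression applied to a small concrete frame (illustrative data, not taken from the test suite):

    import pandas as pd

    df = pd.DataFrame(
        {
            "name": ["a", "b"],     # kept: all values truthy
            "empty_str": ["", ""],  # dropped: all values falsy
            "zeros": [0, 0],        # dropped: all values falsy
            "mixed": [0, 1],        # kept: at least one truthy value
        }
    )

    # astype(bool) marks falsy cells False; where(...) turns those cells
    # into NaN; isna().all(axis=0) flags columns that are entirely falsy;
    # ~ inverts the mask so only non-empty columns survive the .loc select.
    filtered = df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)]
    print(list(filtered.columns))  # ['name', 'mixed']

One detail worth knowing when reading the updated signatures: stacked @patch decorators inject mocks bottom-up, so the decorator closest to the function (pd.read_json or pd.read_parquet) arrives first and os.makedirs arrives last, which is why every new signature reads (self, mock_read_..., mock_makedirs).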