diff --git a/application_sdk/common/file_converter.py b/application_sdk/common/file_converter.py index ff9731ff8..6a7e031a5 100644 --- a/application_sdk/common/file_converter.py +++ b/application_sdk/common/file_converter.py @@ -1,3 +1,4 @@ +import os from collections import namedtuple from enum import Enum from typing import List, Optional @@ -75,9 +76,11 @@ def convert_json_to_parquet(file_path: str) -> Optional[str]: """Convert the downloaded files from json to parquet""" try: logger.info(f"Converting {file_path} to parquet") + base_dir = file_path.rsplit("/", 1)[0] + os.makedirs(f"{base_dir}/converted_files", exist_ok=True) df = pd.read_json(file_path, orient="records", lines=True) df = df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)] - parquet_file_path = file_path.replace(".json", ".parquet") + parquet_file_path = f"{base_dir}/converted_files/{file_path.rsplit('/', 1)[1].replace('.json', '.parquet')}" df.to_parquet(parquet_file_path) return parquet_file_path except Exception as e: @@ -90,8 +93,10 @@ def convert_parquet_to_json(file_path: str) -> Optional[str]: """Convert the downloaded files from parquet to json""" try: logger.info(f"Converting {file_path} to json") + base_dir = file_path.rsplit("/", 1)[0] + os.makedirs(f"{base_dir}/converted_files", exist_ok=True) df = pd.read_parquet(file_path) - json_file_path = file_path.replace(".parquet", ".json") + json_file_path = f"{base_dir}/converted_files/{file_path.rsplit('/', 1)[1].replace('.parquet', '.json')}" df.to_json(json_file_path, orient="records", lines=True) return json_file_path except Exception as e: diff --git a/tests/unit/common/test_file_converter.py b/tests/unit/common/test_file_converter.py index e5896784f..4d839944d 100644 --- a/tests/unit/common/test_file_converter.py +++ b/tests/unit/common/test_file_converter.py @@ -163,24 +163,33 @@ async def test_file_extension_detection_from_first_file(self, mock_registry): class TestConvertJsonToParquet: """Test suite for the convert_json_to_parquet function.""" + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_json") - def test_successful_json_to_parquet_conversion(self, mock_read_json): + def test_successful_json_to_parquet_conversion(self, mock_read_json, mock_makedirs): """Test successful conversion from JSON to Parquet.""" # Arrange mock_df = MagicMock() - mock_filtered_df = MagicMock() - # Mock the column filtering chain: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)] - mock_df.where.return_value = mock_df - mock_df.astype.return_value = mock_df - mock_df.isna.return_value = mock_df - mock_df.all.return_value = MagicMock() - mock_df.loc.__getitem__.return_value = mock_filtered_df + # Mock the column filtering chain properly + # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)] + mock_where_result = MagicMock() + mock_astype_result = MagicMock() + mock_isna_result = MagicMock() + mock_all_result = MagicMock() + + mock_df.where.return_value = mock_where_result + mock_where_result.astype.return_value = mock_astype_result + mock_astype_result.isna.return_value = mock_isna_result + mock_isna_result.all.return_value = mock_all_result + + # Mock the loc operation to return the same df for simplicity + mock_df.loc = MagicMock() + mock_df.loc.__getitem__ = MagicMock(return_value=mock_df) mock_read_json.return_value = mock_df file_path = "/path/input.json" - expected_output = "/path/input.parquet" + expected_output = "/path/converted_files/input.parquet" # Act result = convert_json_to_parquet(file_path) @@ -188,7 +197,8 @@ def test_successful_json_to_parquet_conversion(self, mock_read_json): # Assert assert result == expected_output mock_read_json.assert_called_once_with(file_path, orient="records", lines=True) - mock_filtered_df.to_parquet.assert_called_once_with(expected_output) + mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True) + mock_df.to_parquet.assert_called_once_with(expected_output) @patch("application_sdk.common.file_converter.pd.read_json") def test_json_to_parquet_read_error_returns_none(self, mock_read_json): @@ -206,22 +216,32 @@ def test_json_to_parquet_read_error_returns_none(self, mock_read_json): assert result is None mock_logger.assert_called_once() + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_json") - def test_json_to_parquet_write_error_returns_none(self, mock_read_json): + def test_json_to_parquet_write_error_returns_none( + self, mock_read_json, mock_makedirs + ): """Test that write errors return None and are logged.""" # Arrange mock_df = MagicMock() - mock_filtered_df = MagicMock() - # Mock the column filtering chain - mock_df.where.return_value = mock_df - mock_df.astype.return_value = mock_df - mock_df.isna.return_value = mock_df - mock_df.all.return_value = MagicMock() - mock_df.loc.__getitem__.return_value = mock_filtered_df + # Mock the column filtering chain properly + mock_where_result = MagicMock() + mock_astype_result = MagicMock() + mock_isna_result = MagicMock() + mock_all_result = MagicMock() + + mock_df.where.return_value = mock_where_result + mock_where_result.astype.return_value = mock_astype_result + mock_astype_result.isna.return_value = mock_isna_result + mock_isna_result.all.return_value = mock_all_result + + # Mock the loc operation to return the same df + mock_df.loc = MagicMock() + mock_df.loc.__getitem__ = MagicMock(return_value=mock_df) # Make to_parquet raise an exception - mock_filtered_df.to_parquet.side_effect = Exception("File write error") + mock_df.to_parquet.side_effect = Exception("File write error") mock_read_json.return_value = mock_df @@ -235,19 +255,31 @@ def test_json_to_parquet_write_error_returns_none(self, mock_read_json): assert result is None mock_logger.assert_called_once() + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_json") - def test_json_to_parquet_column_filtering(self, mock_read_json): + def test_json_to_parquet_column_filtering(self, mock_read_json, mock_makedirs): """Test that empty columns are filtered out during conversion.""" # Arrange mock_df = MagicMock() - mock_filtered_df = MagicMock() - # Mock the column filtering chain - mock_df.where.return_value = mock_df - mock_df.astype.return_value = mock_df - mock_df.isna.return_value = mock_df - mock_df.all.return_value = MagicMock() - mock_df.loc.__getitem__.return_value = mock_filtered_df + # Mock the complete column filtering chain properly + # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)] + mock_astype_result = MagicMock() + mock_where_result = MagicMock() + mock_isna_result = MagicMock() + mock_all_result = MagicMock() + + # df.astype(bool) is called on the original df + mock_df.astype.return_value = mock_astype_result + # df.where(df.astype(bool)) is called on the original df with astype result + mock_df.where.return_value = mock_where_result + # Then .isna() and .all() are called on the where result + mock_where_result.isna.return_value = mock_isna_result + mock_isna_result.all.return_value = mock_all_result + + # Mock the loc operation + mock_df.loc = MagicMock() + mock_df.loc.__getitem__ = MagicMock(return_value=mock_df) mock_read_json.return_value = mock_df @@ -257,26 +289,30 @@ def test_json_to_parquet_column_filtering(self, mock_read_json): convert_json_to_parquet(file_path) # Assert - # Verify that column filtering logic was applied - mock_df.where.assert_called_once() + # Verify that column filtering logic was applied in the correct order + # The actual code: df.loc[:, ~df.where(df.astype(bool)).isna().all(axis=0)] mock_df.astype.assert_called_once_with(bool) - mock_df.isna.assert_called_once() - mock_df.all.assert_called_once_with(axis=0) - mock_filtered_df.to_parquet.assert_called_once() + mock_df.where.assert_called_once_with(mock_astype_result) + mock_where_result.isna.assert_called_once() + mock_isna_result.all.assert_called_once_with(axis=0) + mock_df.to_parquet.assert_called_once() class TestConvertParquetToJson: """Test suite for the convert_parquet_to_json function.""" + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_parquet") - def test_successful_parquet_to_json_conversion(self, mock_read_parquet): + def test_successful_parquet_to_json_conversion( + self, mock_read_parquet, mock_makedirs + ): """Test successful conversion from Parquet to JSON.""" # Arrange mock_df = MagicMock() mock_read_parquet.return_value = mock_df file_path = "/path/input.parquet" - expected_output = "/path/input.json" + expected_output = "/path/converted_files/input.json" # Act result = convert_parquet_to_json(file_path) @@ -284,6 +320,7 @@ def test_successful_parquet_to_json_conversion(self, mock_read_parquet): # Assert assert result == expected_output mock_read_parquet.assert_called_once_with(file_path) + mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True) mock_df.to_json.assert_called_once_with( expected_output, orient="records", lines=True ) @@ -304,8 +341,11 @@ def test_parquet_to_json_read_error_returns_none(self, mock_read_parquet): assert result is None mock_logger.assert_called_once() + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_parquet") - def test_parquet_to_json_write_error_returns_none(self, mock_read_parquet): + def test_parquet_to_json_write_error_returns_none( + self, mock_read_parquet, mock_makedirs + ): """Test that write errors return None and are logged.""" # Arrange mock_df = MagicMock() @@ -322,8 +362,9 @@ def test_parquet_to_json_write_error_returns_none(self, mock_read_parquet): assert result is None mock_logger.assert_called_once() + @patch("application_sdk.common.file_converter.os.makedirs") @patch("application_sdk.common.file_converter.pd.read_parquet") - def test_parquet_to_json_proper_parameters(self, mock_read_parquet): + def test_parquet_to_json_proper_parameters(self, mock_read_parquet, mock_makedirs): """Test that JSON is written with correct parameters for line-delimited format.""" # Arrange mock_df = MagicMock() @@ -335,8 +376,9 @@ def test_parquet_to_json_proper_parameters(self, mock_read_parquet): convert_parquet_to_json(file_path) # Assert + mock_makedirs.assert_called_once_with("/path/converted_files", exist_ok=True) mock_df.to_json.assert_called_once_with( - "/path/input.json", orient="records", lines=True + "/path/converted_files/input.json", orient="records", lines=True )