From bc9cde2cd19a751a2c4172c74e5e3e648b35391c Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 12:49:12 +0100 Subject: [PATCH 1/3] Initial commit for csv cleaner --- docs/pydoc/config/preprocessors_api.yml | 2 +- haystack/components/preprocessors/__init__.py | 3 +- .../preprocessors/csv_document_cleaner.py | 117 +++++++++++++++ .../test_csv_document_cleaner.py | 134 ++++++++++++++++++ 4 files changed, 254 insertions(+), 2 deletions(-) create mode 100644 haystack/components/preprocessors/csv_document_cleaner.py create mode 100644 test/components/preprocessors/test_csv_document_cleaner.py diff --git a/docs/pydoc/config/preprocessors_api.yml b/docs/pydoc/config/preprocessors_api.yml index d5a0df24c6..abbf221239 100644 --- a/docs/pydoc/config/preprocessors_api.yml +++ b/docs/pydoc/config/preprocessors_api.yml @@ -1,7 +1,7 @@ loaders: - type: haystack_pydoc_tools.loaders.CustomPythonLoader search_path: [../../../haystack/components/preprocessors] - modules: ["document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"] + modules: ["csv_document_cleaner", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"] ignore_when_discovered: ["__init__"] processors: - type: filter diff --git a/haystack/components/preprocessors/__init__.py b/haystack/components/preprocessors/__init__.py index 26d30c1520..0bb2c88d75 100644 --- a/haystack/components/preprocessors/__init__.py +++ b/haystack/components/preprocessors/__init__.py @@ -2,9 +2,10 @@ # # SPDX-License-Identifier: Apache-2.0 +from .csv_document_cleaner import CSVDocumentCleaner from .document_cleaner import DocumentCleaner from .document_splitter import DocumentSplitter from .recursive_splitter import RecursiveDocumentSplitter from .text_cleaner import TextCleaner -__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner"] +__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner", "CSVDocumentCleaner"] diff --git a/haystack/components/preprocessors/csv_document_cleaner.py b/haystack/components/preprocessors/csv_document_cleaner.py new file mode 100644 index 0000000000..d93469cb75 --- /dev/null +++ b/haystack/components/preprocessors/csv_document_cleaner.py @@ -0,0 +1,117 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from io import StringIO +from typing import Dict, List + +import pandas as pd + +from haystack import Document, component, logging + +logger = logging.getLogger(__name__) + + +@component +class CSVDocumentCleaner: + """ + A component for cleaning CSV documents by removing empty rows and columns. + + This component processes CSV content stored in Documents, allowing + for the optional ignoring of a specified number of rows and columns before performing + the cleaning operation. + """ + + def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None: + """ + Initializes the CSVDocumentCleaner component. + + :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing. + :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing. + + Rows and columns ignored using these parameters are preserved in the final output, meaning + they are not considered when removing empty rows and columns. + """ + self.ignore_rows = ignore_rows + self.ignore_columns = ignore_columns + + @component.output_types(documents=List[Document]) + def run(self, documents: List[Document]) -> Dict[str, List[Document]]: + """ + Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns. + + :param documents: List of Documents containing CSV-formatted content. + + Processing steps: + 1. Reads each document's content as a CSV table. + 2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left. + 3. Drops any rows and columns that are entirely empty (all NaN values). + 4. Reattaches the ignored rows and columns to maintain their original positions. + 5. Returns the cleaned CSV content as a new `Document` object. + """ + ignore_rows = self.ignore_rows + ignore_columns = self.ignore_columns + + cleaned_documents = [] + for document in documents: + try: + df = pd.read_csv(StringIO(document.content), header=None, dtype=object) # type: ignore + except Exception as e: + logger.error( + "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}", + id=document.id, + error=e, + ) + cleaned_documents.append(document) + continue + + # Save ignored rows + ignored_rows = None + if ignore_rows > 0: + if ignore_rows > df.shape[0]: + logger.warning( + "Document {id} has fewer rows {shape} than the number of rows to ignore {rows}. " + "Keeping the entire document.", + id=document.id, + shape=df.shape[0], + rows=ignore_rows, + ) + cleaned_documents.append(document) + continue + ignored_rows = df.iloc[:ignore_rows, :] + + # Save ignored columns + ignored_columns = None + if ignore_columns > 0: + if ignore_columns > df.shape[1]: + logger.warning( + "Document {id} has fewer columns {shape} than the number of columns to ignore {columns}. " + "Keeping the entire document.", + id=document.id, + shape=df.shape[1], + columns=ignore_columns, + ) + cleaned_documents.append(document) + continue + ignored_columns = df.iloc[:, :ignore_columns] + + # Drop rows and columns that are entirely empty + remaining_df = df.iloc[ignore_rows:, ignore_columns:] + final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all") + + # Reattach ignored rows + if ignore_rows > 0 and ignored_rows is not None: + # Keep only relevant columns + ignored_rows = ignored_rows.loc[:, final_df.columns] + final_df = pd.concat([ignored_rows, final_df], axis=0) + + # Reattach ignored columns + if ignore_columns > 0 and ignored_columns is not None: + # Keep only relevant rows + ignored_columns = ignored_columns.loc[final_df.index, :] + final_df = pd.concat([ignored_columns, final_df], axis=1) + + cleaned_documents.append( + Document(content=final_df.to_csv(index=False, header=False), meta=document.meta.copy()) + ) + return {"documents": cleaned_documents} diff --git a/test/components/preprocessors/test_csv_document_cleaner.py b/test/components/preprocessors/test_csv_document_cleaner.py new file mode 100644 index 0000000000..06db2da31d --- /dev/null +++ b/test/components/preprocessors/test_csv_document_cleaner.py @@ -0,0 +1,134 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from haystack import Document + +from haystack.components.preprocessors.csv_document_cleaner import CSVDocumentCleaner + + +def test_empty_column() -> None: + csv_content = """,A,B,C +,1,2,3 +,4,5,6 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner() + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == "A,B,C\n1,2,3\n4,5,6\n" + + +def test_empty_row() -> None: + csv_content = """A,B,C +1,2,3 +,, +4,5,6 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner() + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == "A,B,C\n1,2,3\n4,5,6\n" + + +def test_empty_column_and_row() -> None: + csv_content = """,A,B,C +,1,2,3 +,,, +,4,5,6 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner() + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == "A,B,C\n1,2,3\n4,5,6\n" + + +def test_ignore_rows() -> None: + csv_content = """,, +A,B,C +4,5,6 +7,8,9 +""" + csv_document = Document(content=csv_content, meta={"name": "test.csv"}) + csv_document_cleaner = CSVDocumentCleaner(ignore_rows=1) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == ",,\nA,B,C\n4,5,6\n7,8,9\n" + assert cleaned_document.meta == {"name": "test.csv"} + + +def test_ignore_rows_2() -> None: + csv_content = """A,B,C +,, +4,5,6 +7,8,9 +""" + csv_document = Document(content=csv_content, meta={"name": "test.csv"}) + csv_document_cleaner = CSVDocumentCleaner(ignore_rows=1) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == "A,B,C\n4,5,6\n7,8,9\n" + assert cleaned_document.meta == {"name": "test.csv"} + + +def test_ignore_rows_3() -> None: + csv_content = """A,B,C +4,,6 +7,,9 +""" + csv_document = Document(content=csv_content, meta={"name": "test.csv"}) + csv_document_cleaner = CSVDocumentCleaner(ignore_rows=1) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == "A,C\n4,6\n7,9\n" + assert cleaned_document.meta == {"name": "test.csv"} + + +def test_ignore_columns() -> None: + csv_content = """,,A,B +,2,3,4 +,7,8,9 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner(ignore_columns=1) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == ",,A,B\n,2,3,4\n,7,8,9\n" + + +def test_too_many_ignore_rows() -> None: + csv_content = """,, +A,B,C +4,5,6 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner(ignore_rows=4) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == ",,\nA,B,C\n4,5,6\n" + + +def test_too_many_ignore_columns() -> None: + csv_content = """,, +A,B,C +4,5,6 +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner(ignore_columns=4) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == ",,\nA,B,C\n4,5,6\n" + + +def test_ignore_rows_and_columns() -> None: + csv_content = """,A,B,C +1,item,s, +2,item2,fd, +""" + csv_document = Document(content=csv_content) + csv_document_cleaner = CSVDocumentCleaner(ignore_columns=1, ignore_rows=1) + result = csv_document_cleaner.run([csv_document]) + cleaned_document = result["documents"][0] + assert cleaned_document.content == ",A,B\n1,item,s\n2,item2,fd\n" From db4154ca9d6106e6ae066e9d1ce46d110907fbee Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 13:11:58 +0100 Subject: [PATCH 2/3] Add release notes --- .../notes/csv-document-cleaner-8eca67e884684c56.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml diff --git a/releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml b/releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml new file mode 100644 index 0000000000..e4d34ad5d9 --- /dev/null +++ b/releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Introduced CSVDocumentCleaner component for cleaning CSV documents. + - Removes empty rows and columns, while preserving specified ignored rows and columns. + - Customizable number of rows and columns to ignore during processing. From 30573c8b44e0b74841e7373772fa09ecf5d2103e Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 13:24:38 +0100 Subject: [PATCH 3/3] Update lineterminator --- haystack/components/preprocessors/csv_document_cleaner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/haystack/components/preprocessors/csv_document_cleaner.py b/haystack/components/preprocessors/csv_document_cleaner.py index d93469cb75..94f05e649f 100644 --- a/haystack/components/preprocessors/csv_document_cleaner.py +++ b/haystack/components/preprocessors/csv_document_cleaner.py @@ -112,6 +112,8 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: final_df = pd.concat([ignored_columns, final_df], axis=1) cleaned_documents.append( - Document(content=final_df.to_csv(index=False, header=False), meta=document.meta.copy()) + Document( + content=final_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy() + ) ) return {"documents": cleaned_documents}