Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add component CSVDocumentCleaner for removing empty rows and columns #8816

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/pydoc/config/preprocessors_api.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../../../haystack/components/preprocessors]
modules: ["document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
modules: ["csv_document_cleaner", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
Expand Down
3 changes: 2 additions & 1 deletion haystack/components/preprocessors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
#
# SPDX-License-Identifier: Apache-2.0

from .csv_document_cleaner import CSVDocumentCleaner
from .document_cleaner import DocumentCleaner
from .document_splitter import DocumentSplitter
from .recursive_splitter import RecursiveDocumentSplitter
from .text_cleaner import TextCleaner

# Keep this list alphabetically sorted so components stay easy to locate as it grows.
__all__ = ["CSVDocumentCleaner", "DocumentCleaner", "DocumentSplitter", "RecursiveDocumentSplitter", "TextCleaner"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) suggestion: let's keep this list ordered alphabetically — as it grows, that makes it easier to locate components.

119 changes: 119 additions & 0 deletions haystack/components/preprocessors/csv_document_cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from io import StringIO
from typing import Dict, List

import pandas as pd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are probably removing pandas as a hard dependency, we should start importing it in a lazy way


from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class CSVDocumentCleaner:
    """
    A component for cleaning CSV documents by removing empty rows and columns.

    This component processes CSV content stored in Documents, allowing
    for the optional ignoring of a specified number of rows and columns before performing
    the cleaning operation.
    """

    def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None:
        """
        Initializes the CSVDocumentCleaner component.

        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.

        Rows and columns ignored using these parameters are preserved in the final output, meaning
        they are not considered when removing empty rows and columns.
        """
        self.ignore_rows = ignore_rows
        self.ignore_columns = ignore_columns

    @staticmethod
    def _split_off_ignored(df: "pd.DataFrame", num_to_ignore: int, axis: int, document_id: str) -> tuple:
        """
        Slice off the leading rows (axis=0) or columns (axis=1) that must be preserved untouched.

        :param df: The full CSV table as a DataFrame.
        :param num_to_ignore: How many leading rows/columns to set aside.
        :param axis: 0 to ignore rows from the top, 1 to ignore columns from the left.
        :param document_id: Id of the document being processed, used only for logging.
        :returns: Tuple ``(ignored_slice_or_None, ok)``. ``ok`` is False when ``num_to_ignore``
            exceeds the table size along ``axis``; in that case the caller should keep the
            document unchanged.
        """
        if num_to_ignore <= 0:
            return None, True
        if num_to_ignore > df.shape[axis]:
            dimension = "rows" if axis == 0 else "columns"
            logger.warning(
                "Document {id} has fewer {dimension} {shape} than the number of {dimension} to ignore {num}. "
                "Keeping the entire document.",
                id=document_id,
                dimension=dimension,
                shape=df.shape[axis],
                num=num_to_ignore,
            )
            return None, False
        ignored = df.iloc[:num_to_ignore, :] if axis == 0 else df.iloc[:, :num_to_ignore]
        return ignored, True

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.

        :param documents: List of Documents containing CSV-formatted content.

        Processing steps:
        1. Reads each document's content as a CSV table.
        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
        3. Drops any rows and columns that are entirely empty (all NaN values).
        4. Reattaches the ignored rows and columns to maintain their original positions.
        5. Returns the cleaned CSV content as a new `Document` object.
        """
        cleaned_documents = []
        for document in documents:
            try:
                # header=None keeps the first CSV line as data; dtype=object avoids type coercion.
                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)  # type: ignore
            except Exception as e:
                # Unparseable CSV: keep the document as-is rather than dropping it.
                logger.error(
                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
                    id=document.id,
                    error=e,
                )
                cleaned_documents.append(document)
                continue

            # Set aside the rows/columns the user asked to preserve untouched.
            ignored_rows, ok = self._split_off_ignored(df, self.ignore_rows, axis=0, document_id=document.id)
            if not ok:
                cleaned_documents.append(document)
                continue
            ignored_columns, ok = self._split_off_ignored(df, self.ignore_columns, axis=1, document_id=document.id)
            if not ok:
                cleaned_documents.append(document)
                continue

            # Drop rows and columns that are entirely empty (all NaN values).
            remaining_df = df.iloc[self.ignore_rows :, self.ignore_columns :]
            final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all")

            # Reattach ignored rows on top, restricted to the columns that survived cleaning.
            if ignored_rows is not None:
                final_df = pd.concat([ignored_rows.loc[:, final_df.columns], final_df], axis=0)

            # Reattach ignored columns on the left, restricted to the rows that survived cleaning.
            if ignored_columns is not None:
                final_df = pd.concat([ignored_columns.loc[final_df.index, :], final_df], axis=1)

            cleaned_documents.append(
                Document(
                    content=final_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy()
                )
            )
        return {"documents": cleaned_documents}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Introduced CSVDocumentCleaner component for cleaning CSV documents.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Introduced CSVDocumentCleaner component for cleaning CSV documents.
Introduced `CSVDocumentCleaner` component for cleaning CSV documents.

- Removes empty rows and columns, while preserving specified ignored rows and columns.
- Customizable number of rows and columns to ignore during processing.
134 changes: 134 additions & 0 deletions test/components/preprocessors/test_csv_document_cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from haystack import Document

from haystack.components.preprocessors.csv_document_cleaner import CSVDocumentCleaner


def test_empty_column() -> None:
    """A fully empty leading column is removed from the cleaned output."""
    content = """,A,B,C
,1,2,3
,4,5,6
"""
    output = CSVDocumentCleaner().run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_empty_row() -> None:
    """A fully empty middle row is removed from the cleaned output."""
    content = """A,B,C
1,2,3
,,
4,5,6
"""
    output = CSVDocumentCleaner().run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_empty_column_and_row() -> None:
    """An empty column and an empty row are both dropped in a single pass."""
    content = """,A,B,C
,1,2,3
,,,
,4,5,6
"""
    output = CSVDocumentCleaner().run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_ignore_rows() -> None:
    """An empty first row survives when it falls inside ignore_rows; meta is preserved."""
    content = """,,
A,B,C
4,5,6
7,8,9
"""
    output = CSVDocumentCleaner(ignore_rows=1).run([Document(content=content, meta={"name": "test.csv"})])
    result_doc = output["documents"][0]
    assert result_doc.content == ",,\nA,B,C\n4,5,6\n7,8,9\n"
    assert result_doc.meta == {"name": "test.csv"}


def test_ignore_rows_2() -> None:
    """An empty row below the ignored header row is still removed; meta is preserved."""
    content = """A,B,C
,,
4,5,6
7,8,9
"""
    output = CSVDocumentCleaner(ignore_rows=1).run([Document(content=content, meta={"name": "test.csv"})])
    result_doc = output["documents"][0]
    assert result_doc.content == "A,B,C\n4,5,6\n7,8,9\n"
    assert result_doc.meta == {"name": "test.csv"}


def test_ignore_rows_3() -> None:
    """An empty middle column is removed and the ignored header row is trimmed to match."""
    content = """A,B,C
4,,6
7,,9
"""
    output = CSVDocumentCleaner(ignore_rows=1).run([Document(content=content, meta={"name": "test.csv"})])
    result_doc = output["documents"][0]
    assert result_doc.content == "A,C\n4,6\n7,9\n"
    assert result_doc.meta == {"name": "test.csv"}


def test_ignore_columns() -> None:
    """An empty first column survives when it falls inside ignore_columns."""
    content = """,,A,B
,2,3,4
,7,8,9
"""
    output = CSVDocumentCleaner(ignore_columns=1).run([Document(content=content)])
    assert output["documents"][0].content == ",,A,B\n,2,3,4\n,7,8,9\n"


def test_too_many_ignore_rows() -> None:
    """When ignore_rows exceeds the row count, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    output = CSVDocumentCleaner(ignore_rows=4).run([Document(content=content)])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"


def test_too_many_ignore_columns() -> None:
    """When ignore_columns exceeds the column count, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    output = CSVDocumentCleaner(ignore_columns=4).run([Document(content=content)])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"


def test_ignore_rows_and_columns() -> None:
    """Row and column ignoring combine: the trailing empty column is removed while the ignored first row and column are preserved."""
    content = """,A,B,C
1,item,s,
2,item2,fd,
"""
    output = CSVDocumentCleaner(ignore_columns=1, ignore_rows=1).run([Document(content=content)])
    assert output["documents"][0].content == ",A,B\n1,item,s\n2,item2,fd\n"