-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add component CSVDocumentCleaner for removing empty rows and columns #8816
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from io import StringIO | ||
from typing import Dict, List | ||
|
||
import pandas as pd | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we are probably removing pandas as a hard dependency, we should start importing it in a lazy way |
||
|
||
from haystack import Document, component, logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@component
class CSVDocumentCleaner:
    """
    A component for cleaning CSV documents by removing empty rows and columns.

    This component processes CSV content stored in Documents, allowing
    for the optional ignoring of a specified number of rows and columns before performing
    the cleaning operation.
    """

    def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None:
        """
        Initializes the CSVDocumentCleaner component.

        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.

        Rows and columns ignored using these parameters are preserved in the final output, meaning
        they are not considered when removing empty rows and columns.
        """
        self.ignore_rows = ignore_rows
        self.ignore_columns = ignore_columns

    @staticmethod
    def _save_ignored(df, count, axis, document_id):
        """
        Slice off the first `count` rows (axis=0) or columns (axis=1) of `df` for later reattachment.

        :param df: The full CSV table as a DataFrame.
        :param count: Number of leading rows/columns to set aside; 0 means nothing is ignored.
        :param axis: 0 to save rows from the top, 1 to save columns from the left.
        :param document_id: Id of the document being processed, used only for logging.
        :returns: Tuple `(ignored, ok)`. `ignored` is the saved slice, or None when `count <= 0`.
            `ok` is False when `count` exceeds the table size along `axis`; a warning is logged
            and the caller should keep the document unchanged.
        """
        if count <= 0:
            return None, True
        if count > df.shape[axis]:
            if axis == 0:
                logger.warning(
                    "Document {id} has fewer rows {shape} than the number of rows to ignore {rows}. "
                    "Keeping the entire document.",
                    id=document_id,
                    shape=df.shape[0],
                    rows=count,
                )
            else:
                logger.warning(
                    "Document {id} has fewer columns {shape} than the number of columns to ignore {columns}. "
                    "Keeping the entire document.",
                    id=document_id,
                    shape=df.shape[1],
                    columns=count,
                )
            return None, False
        ignored = df.iloc[:count, :] if axis == 0 else df.iloc[:, :count]
        return ignored, True

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.

        :param documents: List of Documents containing CSV-formatted content.
        :returns: A dictionary with key ``"documents"`` mapping to the list of cleaned Documents.

        Processing steps:
        1. Reads each document's content as a CSV table.
        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
        3. Drops any rows and columns that are entirely empty (all NaN values).
        4. Reattaches the ignored rows and columns to maintain their original positions.
        5. Returns the cleaned CSV content as a new `Document` object.
        """
        ignore_rows = self.ignore_rows
        ignore_columns = self.ignore_columns

        cleaned_documents = []
        for document in documents:
            try:
                # header=None treats the first row as data; dtype=object keeps cells as
                # strings and avoids numeric coercion of mixed columns.
                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)  # type: ignore
            except Exception as e:
                logger.error(
                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
                    id=document.id,
                    error=e,
                )
                cleaned_documents.append(document)
                continue

            # Set aside the ignored leading rows/columns; on failure keep the document as-is.
            ignored_rows, ok = self._save_ignored(df, ignore_rows, 0, document.id)
            if not ok:
                cleaned_documents.append(document)
                continue
            ignored_columns, ok = self._save_ignored(df, ignore_columns, 1, document.id)
            if not ok:
                cleaned_documents.append(document)
                continue

            # Drop rows and columns that are entirely empty (all NaN)
            remaining_df = df.iloc[ignore_rows:, ignore_columns:]
            final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all")

            # Reattach ignored rows, restricted to the columns that survived cleaning
            if ignored_rows is not None:
                final_df = pd.concat([ignored_rows.loc[:, final_df.columns], final_df], axis=0)

            # Reattach ignored columns, restricted to the rows that survived cleaning
            # (including any just-reattached ignored rows)
            if ignored_columns is not None:
                final_df = pd.concat([ignored_columns.loc[final_df.index, :], final_df], axis=1)

            cleaned_documents.append(
                Document(
                    content=final_df.to_csv(index=False, header=False, lineterminator="\n"),
                    meta=document.meta.copy(),
                )
            )
        return {"documents": cleaned_documents}
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,6 @@ | ||||||
--- | ||||||
features: | ||||||
- | | ||||||
Introduced CSVDocumentCleaner component for cleaning CSV documents. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
- Removes empty rows and columns, while preserving specified ignored rows and columns. | ||||||
- Customizable number of rows and columns to ignore during processing. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from haystack import Document | ||
|
||
from haystack.components.preprocessors.csv_document_cleaner import CSVDocumentCleaner | ||
|
||
|
||
def test_empty_column() -> None:
    """An all-empty leading column is removed."""
    content = """,A,B,C
,1,2,3
,4,5,6
"""
    result = CSVDocumentCleaner().run([Document(content=content)])
    assert result["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||
|
||
def test_empty_row() -> None:
    """An all-empty interior row is removed."""
    content = """A,B,C
1,2,3
,,
4,5,6
"""
    result = CSVDocumentCleaner().run([Document(content=content)])
    assert result["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||
|
||
def test_empty_column_and_row() -> None:
    """Empty rows and columns are removed together."""
    content = """,A,B,C
,1,2,3
,,,
,4,5,6
"""
    result = CSVDocumentCleaner().run([Document(content=content)])
    assert result["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"
|
||
|
||
def test_ignore_rows() -> None:
    """An ignored first row is preserved even when entirely empty."""
    content = """,,
A,B,C
4,5,6
7,8,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    cleaned = cleaner.run([Document(content=content, meta={"name": "test.csv"})])["documents"][0]
    assert cleaned.content == ",,\nA,B,C\n4,5,6\n7,8,9\n"
    assert cleaned.meta == {"name": "test.csv"}
|
||
|
||
def test_ignore_rows_2() -> None:
    """With the header row ignored, a later empty row is still dropped."""
    content = """A,B,C
,,
4,5,6
7,8,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    cleaned = cleaner.run([Document(content=content, meta={"name": "test.csv"})])["documents"][0]
    assert cleaned.content == "A,B,C\n4,5,6\n7,8,9\n"
    assert cleaned.meta == {"name": "test.csv"}
|
||
|
||
def test_ignore_rows_3() -> None:
    """A column empty below the ignored header row is dropped, trimming the header too."""
    content = """A,B,C
4,,6
7,,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    cleaned = cleaner.run([Document(content=content, meta={"name": "test.csv"})])["documents"][0]
    assert cleaned.content == "A,C\n4,6\n7,9\n"
    assert cleaned.meta == {"name": "test.csv"}
|
||
|
||
def test_ignore_columns() -> None:
    """An ignored first column is preserved even when entirely empty."""
    content = """,,A,B
,2,3,4
,7,8,9
"""
    result = CSVDocumentCleaner(ignore_columns=1).run([Document(content=content)])
    assert result["documents"][0].content == ",,A,B\n,2,3,4\n,7,8,9\n"
|
||
|
||
def test_too_many_ignore_rows() -> None:
    """When ignore_rows exceeds the row count, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    result = CSVDocumentCleaner(ignore_rows=4).run([Document(content=content)])
    assert result["documents"][0].content == ",,\nA,B,C\n4,5,6\n"
|
||
|
||
def test_too_many_ignore_columns() -> None:
    """When ignore_columns exceeds the column count, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    result = CSVDocumentCleaner(ignore_columns=4).run([Document(content=content)])
    assert result["documents"][0].content == ",,\nA,B,C\n4,5,6\n"
|
||
|
||
def test_ignore_rows_and_columns() -> None:
    """Row and column ignoring combine; the remaining empty column is removed."""
    content = """,A,B,C
1,item,s,
2,item2,fd,
"""
    result = CSVDocumentCleaner(ignore_columns=1, ignore_rows=1).run([Document(content=content)])
    assert result["documents"][0].content == ",A,B\n1,item,s\n2,item2,fd\n"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) suggestion: let's keep this list ordered alphabetically as it grows — that makes it easier to locate components