
GroupRowFilter buffer not forwarded when empty line(s) at the end of the csv file #667

Open
Belbli opened this issue Sep 17, 2024 · 0 comments · May be fixed by #681


Belbli commented Sep 17, 2024

Hello!

I'm using FilePulseSourceConnector with CSVFilter:

    "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
    "filters.ParseCSVLine.columns": "col1:STRING;col2:STRING",
    "filters.ParseCSVLine.separator": ";",

and GroupRowFilter:

    "filters.GroupByFirstColumn.type": "io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter",
    "filters.GroupByFirstColumn.fields": "col1",
    "filters.GroupByFirstColumn.max.buffered.records": "5",
    "filters.GroupByFirstColumn.target": "batch",

It looks like GroupRowFilter's last buffer is not forwarded when the processed CSV file ends with an empty line. Example:

val1;val1
val1;val2
val1;val3
val1;val4
val1;val5
val1;val6
val1;val7
<empty line>

So when I submit this file to a connector with max.buffered.records=5 for GroupRowFilter, I see only one record in the topic instead of two: val1;val6 and val1;val7 are never sent to Kafka.

Steps To Reproduce

  1. Create a FilePulse connector with the filters described above
  2. Submit a CSV file whose number of non-empty rows is not a multiple of filters.GroupByFirstColumn.max.buffered.records (5 in this case) and whose last line is empty (such a file can be generated with the snippet below)
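
For completeness, a one-liner that produces the columns.csv used above and copied into the container by the script below (the trailing \n\n is what yields the empty last line):

    # Seven data rows followed by one empty line (the file ends with "\n\n"):
    printf 'val1;val1\nval1;val2\nval1;val3\nval1;val4\nval1;val5\nval1;val6\nval1;val7\n\n' > columns.csv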

A bash script to reproduce it with docker-compose-debug.yml from the project:

#!/bin/bash

./debug.sh

curl -X PUT localhost:8083/connectors/group-filter-connector/config --header "Content-Type: application/json" \
  -d '{
        "schema.registry.url": "http://schema-registry:8085",
        "fs.listing.directory.path": "/tmp/kafka-connect/",
        "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
        "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
        "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy ",
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path": "/tmp/kafka-connect/",
        "fs.scan.interval.ms": "10000",
        "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern": ".*\\.csv$",
        "filters": "ParseCSVLine,GroupByFirstColumn",

        "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
        "filters.ParseCSVLine.columns": "col1:STRING;col2:STRING",
        "filters.ParseCSVLine.separator": ";",

        "filters.GroupByFirstColumn.type": "io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter",
        "filters.GroupByFirstColumn.fields": "col1",
        "filters.GroupByFirstColumn.max.buffered.records": "5",
        "filters.GroupByFirstColumn.target": "batch",

        "value.connect.schema":"{\"name\":\"com.example.Data\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"batch\":{\"type\":\"ARRAY\",\"isOptional\":false,\"valueSchema\":{\"name\":\"com.example.Batch\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"col1\":{\"type\":\"STRING\",\"isOptional\":false},\"col2\":{\"type\":\"STRING\",\"isOptional\":false}}}}},\"version\":1}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "topic": "grouped-data",
        "internal.kafka.reporter.bootstrap.servers": "kafka:29092",
        "internal.kafka.reporter.topic": "connect-file-pulse-status"
      }'

echo -e "\n\nCopying test file to kafka-connect-file-pulse-master-connect-1..."

docker cp ./columns.csv kafka-connect-file-pulse-master-connect-1:/tmp/kafka-connect/columns.csv

echo -e "\n\ncheck for messages here (need to wait a bit): http://localhost:8087/ui/docker-kafka-server/topic/grouped-data/data?sort=Oldest&partition=All"

exit
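
As an alternative to the UI link printed at the end of the script, the output topic can be read from the CLI. This is only a sketch: the container name is a guess following the same compose naming as the connect container above, and the schema registry port may differ in your setup (the values are Avro-encoded, hence kafka-avro-console-consumer):

    # Container name, broker address, and registry port are assumptions
    # based on the compose setup above; adjust them to your environment.
    docker exec -it kafka-connect-file-pulse-master-schema-registry-1 \
      kafka-avro-console-consumer \
        --bootstrap-server kafka:29092 \
        --topic grouped-data \
        --from-beginning \
        --property schema.registry.url=http://schema-registry:8085

With the file above, this shows only one record instead of the expected two described below.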

Expected behavior
2 records published to Kafka: the first containing 5 entries in the batch array, the second containing 2 entries.

Actual behavior
Only one record, containing 5 entries, is sent to Kafka.

Can you please check it out?
Thank you

Belbli pushed a commit to Belbli/kafka-connect-file-pulse that referenced this issue Oct 21, 2024