
[Bug]: fluvio produce is not batching when using interactive mode and --delivery-semantic at-least-once #3143

Open

morenol opened this issue Apr 14, 2023 · 10 comments

morenol (Contributor) commented Apr 14, 2023

No description provided.

morenol added the "bug" (Something isn't working) label on Apr 14, 2023
TanNgocDo (Contributor) commented:
Hi @morenol, I'm looking into this with the fluvio CLI.
I saw that a batch is created with at-least-once, and multiple backoffs after that:

TRACE send{topic=greetings}: fluvio::producer::accumulator: Batch is full. Creating a new batch for partition partition_id=0
2023-05-16T05:09:09.716210Z DEBUG run: fluvio::producer::partition_producer: new batch event
2023-05-16T05:09:09.732963Z TRACE run:send_receive_with_retry{retries=Take { iter: ExponentialBackoff(ExponentialBackoff { base_millis: 20, current_millis: 20, max_delay: Some(200s) }), n: 4 }}:send_and_receive{self=MultiplexerSocket 10}:send_request{self=fd(10) req=RequestMessage { header: RequestHeader { api_key: 0, api_version: 8, correlation_id: 1, client_id: "FLUVIO_CLI" }, request: ProduceRequest { transactional_id: None, isolation: ReadUncommitted, timeout: 1.5s, topics: [TopicProduceData { name: "greetings", partitions: [PartitionProduceData { partition_index: 0, records: RecordSet { batches: [Batch { base_offset: 0, batch_len: 14, header: BatchHeader { partition_leader_epoch: -1, magic: 2, crc: 0, attributes: 0, last_offset_delta: 0, first_timestamp: 1684213749715, max_time_stamp: 1684213749716, producer_id: -1, producer_epoch: -1, first_sequence: -1 }, records: RawRecords(b"\0\0\0\x01\x12\0\x02\0\0\x06Tan\0") }] } }], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> }], smartmodules: [], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> } }}: fluvio_protocol::codec: size="encoding data with write size"

I'm not sure if I understood it correctly; could you give a more detailed description?
Thanks

morenol (Contributor, Author) commented May 16, 2023

@TanNgocDo from what I remember: when we use at-least-once, after every producer push in the CLI we wait for the record send output. Therefore we wait for the record to be successfully produced before adding more records to the batch:

produce_output.wait().await?;

So I think this is mostly an issue in the CLI implementation.
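The loop described above can be sketched as follows (a minimal simulation with hypothetical types, not the real fluvio API): waiting on every record drains the accumulator's batch before the next record can join it, so each flushed batch carries exactly one record.

```rust
// Hypothetical stand-in for the producer's batch accumulator.
struct Batch {
    records: Vec<String>,
}

impl Batch {
    fn new() -> Self {
        Batch { records: Vec::new() }
    }

    fn push(&mut self, record: &str) {
        self.records.push(record.to_string());
    }

    // Stand-in for waiting on the broker response: drains the batch
    // and reports how many records it carried.
    fn flush(&mut self) -> usize {
        std::mem::take(&mut self.records).len()
    }
}

// Mirrors the CLI loop: push one record, then wait immediately.
// Every flushed batch ends up holding exactly one record.
fn wait_after_each(inputs: &[&str]) -> Vec<usize> {
    let mut batch = Batch::new();
    inputs
        .iter()
        .map(|r| {
            batch.push(r);
            batch.flush()
        })
        .collect()
}
```

In this model, three input records produce three one-record batches instead of one three-record batch, which is the behavior the issue describes.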

TanNgocDo (Contributor) commented:

Thanks @morenol, it seems to be related to #2808?
The throughput of at-least-once is not good compared with at-most-once.
Of course, we will need to test more.

morenol (Contributor, Author) commented May 16, 2023

For sure, it should be related to that.

TanNgocDo (Contributor) commented:

Looking into that 👍

TanNgocDo (Contributor) commented May 19, 2023

@morenol, as I debug, the wait() method goes to

let base_offset = self.batch_metadata.base_offset().await?;

which blocks and is notified by:

if let Err(_e) = batch_notifier.send(partition_response_fut).await {

This means wait() can only be notified by flush():

pub(crate) async fn flush(&self, force: bool) -> Result<()> {

And to trigger flush(), linger_sleep must elapse:

_ = async { linger_sleep.as_mut().expect("unexpected failure").await }, if linger_sleep.is_some() => {

So whatever value of linger_time we set, we have to wait until the interval ends, which makes linger_time useless here.
To sum up, with the current implementation: wait() -> block -> linger_time ends -> flush the batch with only one record -> get the response from the socket -> end of wait().
I'm not sure this is the right behavior: with at-least-once, should we really send only one record per batch, i.e. no batching? At the very least, linger_time is not useful with the current at-least-once implementation (the bigger the linger, the longer the blocking time before the one-record batch is sent out). So do we need to update the documentation here: https://www.fluvio.io/docs/concepts/delivery-semantics/ ?
If we fix it by batching multiple records in at-least-once, should we change "at-least-once delivery means that for each record handed to the producer potentially multiple attempts are made at delivering it, such that at least one succeeds" to "at-least-once delivery means that for each batch ..."?
Or should we keep the current implementation and disable linger_time for at-least-once?
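The chain summarized above suggests a rough latency model (a sketch with hypothetical numbers, not derived from the fluvio code): since each record blocks in wait() until linger elapses and its one-record batch then round-trips to the server, total time grows linearly in both the record count and linger_time.

```rust
// Back-of-the-envelope lower bound on total produce time under the
// current at-least-once CLI behavior described in this thread:
// wait() -> block -> linger ends -> flush one-record batch -> response.
// All parameter values are hypothetical.
fn min_total_latency_ms(records: u64, linger_ms: u64, rtt_ms: u64) -> u64 {
    // Each record pays the full linger interval plus one round trip,
    // because nothing else can enter the batch while wait() blocks.
    records * (linger_ms + rtt_ms)
}
```

For example, 10 records with a 100 ms linger and a 5 ms round trip cannot finish in under roughly a second in this model, which matches the observation that a larger linger only makes things worse.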

morenol (Contributor, Author) commented May 19, 2023

@TanNgocDo I think the ideal solution is to fix it by batching. Regarding the documentation update: that wording seems accurate to me, though the fact that every record is retried may already imply it. Whether delivery happens per batch or per record seems like an implementation detail that doesn't need to be in the general documentation.

TanNgocDo (Contributor) commented:

Thanks @morenol,
wait() is the locking point that prevents other records from being added to a batch. Should we do one of these options?

  1. Remove wait(): but this would prevent us from getting the future metadata.
  2. Store a list of outputs and wait() for all of them (similar to reading from a file with -f).
  3. Some other solution?
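Option 2 can be sketched like this (hypothetical stand-ins for producer.send and ProduceOutput, using threads in place of async futures, not the fluvio API): collect a handle per record and wait on all of them only after input ends, leaving the accumulator free to pack multiple records per batch in the meantime.

```rust
use std::thread;

// Sketch of option 2: push every record first, collecting one handle per
// record, and only wait on all of the handles once input is exhausted.
fn produce_then_wait_all(records: Vec<String>) -> Vec<usize> {
    // Each spawn stands in for a `producer.send(...)` call that returns
    // a ProduceOutput-like handle; here it just reports the record length.
    let handles: Vec<_> = records
        .into_iter()
        .map(|r| thread::spawn(move || r.len()))
        .collect();

    // Single wait point at the end, analogous to
    // `for out in outputs { out.wait().await?; }`.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The design point is that the wait moves from inside the produce loop to after it, so the loop can hand records to the producer back-to-back, the same shape as reading from a file with -f.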

morenol (Contributor, Author) commented May 22, 2023

I think option 2 should be OK.

TanNgocDo (Contributor) commented:

I have just added a small draft change for stdin (to check I'm going in the right direction; it still needs a refactor to reduce redundant code). In interactive mode, it seems we don't know when the input ends. Maybe a special key? Or keep a list of records and add a flag to specify the number of records? Or control the batch size to maintain a list of records per batch?
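For ending interactive input, one conventional approach (an assumption, not part of the draft change) is to treat EOF (Ctrl-D on Unix terminals) as the end of the record stream, matching how file and pipe input already terminate:

```rust
use std::io::BufRead;

// Read records until EOF; in an interactive terminal the user signals
// EOF with Ctrl-D, so no special key or record-count flag is needed.
// This is a sketch, not the fluvio CLI's actual input handling.
fn read_records<R: BufRead>(input: R) -> Vec<String> {
    input
        .lines()
        .filter_map(Result::ok)
        .filter(|line| !line.is_empty())
        .collect()
}
```

With this shape, the "wait for all outputs" step from option 2 has a natural trigger point: it runs once read_records returns.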
