Kafka microservice can process only one message by topic at the moment #12703

Klutrem opened this issue Nov 8, 2023 · 24 comments

@Klutrem

Klutrem commented Nov 8, 2023

Is there an existing issue for this?

  • I have searched the existing issues

Current behavior

When any message is sent to a Kafka topic, the application gets stuck and doesn't process other messages until processing of the previous message has finished.

Minimum reproduction code

https://github.com/Klutrem/kafka-issue

Steps to reproduce

  • Pull the repository
  • Run npm i
  • Run Kafka and set its host and port in the env variables if it isn't at localhost:9092
  • Run npm run start
  • Send a message to the Kafka topic test

Expected behavior

It was working on the previous versions:

{
    "name": "kafka-issue",
    "version": "1.0.0",
    "private": true,
    "license": "UNLICENSED",
    "scripts": {
      "build": "nest build",
      "start": "nest start",
      "start:dev": "nest start --watch",
      "start:prod": "node dist/main"
    },
    "dependencies": {
      "@nestjs/common": "^7.6.18",
      "@nestjs/config": "^1.0.0",
      "@nestjs/core": "^7.6.15",
      "@nestjs/mapped-types": "^1.2.2",
      "@nestjs/microservices": "^7.6.18",
      "kafkajs": "^1.15.0"
    },
    "devDependencies": {
      "@nestjs/cli": "^10.1.11",
      "@nestjs/schematics": "^7.3.0",
      "@nestjs/testing": "^7.6.18",
      "@swc/cli": "^0.1.62",
      "@swc/core": "1.3.75",
      "@types/node": "^14.17.5",
      "typescript": "^4.7.4",
      "unplugin-swc": "^1.3.2",
      "vitest": "^0.34.1"
    }
  }

With that package.json, the application can process any number of messages at the same time; it doesn't get stuck.

Package

  • I don't know. Or some 3rd-party package
  • @nestjs/common
  • @nestjs/core
  • @nestjs/microservices
  • @nestjs/platform-express
  • @nestjs/platform-fastify
  • @nestjs/platform-socket.io
  • @nestjs/platform-ws
  • @nestjs/testing
  • @nestjs/websockets
  • Other (see below)

Other package

No response

NestJS version

10.2.7

Packages versions

{
    "name": "kafka-issue",
    "version": "1.0.0",
    "private": true,
    "license": "UNLICENSED",
    "scripts": {
      "build": "nest build",
      "start": "nest start",
      "start:dev": "nest start --watch",
      "start:prod": "node dist/main"
    },
    "dependencies": {
      "@nestjs/common": "^10.2.7",
      "@nestjs/config": "^3.1.1",
      "@nestjs/core": "^10.2.7",
      "@nestjs/mapped-types": "^2.0.2",
      "@nestjs/microservices": "^10.2.7",
      "kafkajs": "^2.2.4"
    },
    "devDependencies": {
      "@nestjs/cli": "^10.2.1",
      "@nestjs/schematics": "^10.0.3",
      "@nestjs/testing": "^10.2.7",
      "@swc/cli": "^0.1.62",
      "@swc/core": "1.3.95",
      "@types/node": "^20.8.10",
      "typescript": "^5.2.2",
      "unplugin-swc": "^1.4.3",
      "vitest": "^0.34.6"
    }
  }

Node.js version

18.16.0

In which operating systems have you tested?

  • macOS
  • Windows
  • Linux

Other

No response

Klutrem added the "needs triage" (This issue has not been looked into) label on Nov 8, 2023
Klutrem changed the title from "With @MessagePattern decorator can process only one message by topic at the moment" to "Kafka microservice can process only one message by topic at the moment" on Nov 29, 2023
@MaxKoval1ov

Same here :( Are there any solutions for the latest versions?

@edeesis
Contributor

edeesis commented Jan 22, 2024

I think this issue is missing some detail needed to clarify whether this is expected behavior or not.

  1. Does your topic have multiple partitions?
  2. If there is more than one partition, have you tried setting partition-aware concurrency (https://kafka.js.org/docs/consuming#a-name-concurrent-processing-a-partition-aware-concurrency)?

Kafka is intended to process the messages in each partition in order, so it doesn't make sense to process later messages until you finish processing the ones in front of them.

@Klutrem
Author

Klutrem commented Jan 24, 2024

> I think this issue is missing some detail needed to clarify whether this is expected behavior or not.
>
>   1. Does your topic have multiple partitions?
>   2. If there is more than one partition, have you tried setting partition-aware concurrency (https://kafka.js.org/docs/consuming#a-name-concurrent-processing-a-partition-aware-concurrency)?
>
> Kafka is intended to process the messages in each partition in order, so it doesn't make sense to process later messages until you finish processing the ones in front of them.

We've updated the example repository and added a README:
https://github.com/Klutrem/kafka-issue

The problem is that the consumer doesn't process messages even on another topic while a message sent to one topic is still being processed (when we send a message to the test1 topic, the consumer doesn't process any messages on the test2 topic).

@edeesis
Contributor

edeesis commented Jan 24, 2024

That's just how eachMessage works in KafkaJS. It processes topic-partitions sequentially, one at a time. You need to set partitionsConsumedConcurrently to get what you want.

You're saying this only started happening after the upgrade to 10.x. Can you reproduce it in another example repository with a lower NestJS version?
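
As a rough illustration of the setting mentioned above, the sketch below shows where partitionsConsumedConcurrently would go in a NestJS Kafka microservice, assuming the transport forwards its `run` options to kafkajs's consumer.run(). The client id, broker address, group id and the value 2 are placeholders, not values taken from the reproduction repository.

```typescript
// Plain options object for the NestJS Kafka transport (a sketch).
// All concrete values below are illustrative placeholders.
const kafkaOptions = {
  client: { clientId: "kafka-issue", brokers: ["localhost:9092"] },
  consumer: { groupId: "test-group" },
  // Forwarded to kafkajs consumer.run(); NestJS supplies eachMessage itself.
  // Up to 2 topic-partitions are handled at once, while messages inside a
  // single partition are still handled in order.
  run: { partitionsConsumedConcurrently: 2 },
};

console.log(kafkaOptions.run);
```

In an app this object would be passed as `options` next to `transport: Transport.KAFKA` in `NestFactory.createMicroservice(...)`. With a raw kafkajs consumer the same key goes directly into `consumer.run({ partitionsConsumedConcurrently: 2, eachMessage })`. It can only help when the messages are spread over more than one topic-partition.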

@Klutrem
Author

Klutrem commented Jan 30, 2024

So, I made another branch, async-requests.
At first it worked the same way, even after the update. But then I registered our logger interceptor, which we use in our microservices, as APP_INTERCEPTOR. After that, on the old version (7.x) messages are handled asynchronously (you can see that in the screenshot), but on 10.x only one message is handled at a time. Maybe the problem is in our logger interceptor, or maybe in how the app interceptor is provided; I don't actually know.

@kamilmysliwiec
Member

AFAIR there was a bug in v7 (or v8) where, instead of awaiting the eachMessage callback (of kafkajs), we let the message be processed asynchronously, which caused the incorrect behavior. It's now fixed (hence the difference).
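
To make the difference concrete, here is a toy model (not the NestJS source) of the two dispatch styles described above: one awaits each handler before taking the next message, the other starts handlers without waiting for them. The function names and the 20 ms delay are made up for illustration.

```typescript
type Handler = (msg: string) => Promise<void>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Awaits each handler before taking the next message (the current behavior).
async function dispatchAwaited(messages: string[], handler: Handler): Promise<void> {
  for (const msg of messages) {
    await handler(msg);
  }
}

// Starts every handler without waiting for the previous one (the old behavior
// as described in the comment). The returned promise exists only so the demo
// can wait for the log to be complete.
function dispatchDetached(messages: string[], handler: Handler): Promise<void[]> {
  return Promise.all(messages.map((msg) => handler(msg)));
}

// Runs two messages through the given dispatcher and records start/end order.
async function demo(
  dispatch: (messages: string[], handler: Handler) => Promise<unknown>,
): Promise<string[]> {
  const log: string[] = [];
  await dispatch(["a", "b"], async (msg) => {
    log.push(`start ${msg}`);
    await sleep(20); // stand-in for slow I/O
    log.push(`end ${msg}`);
  });
  return log;
}

demo(dispatchAwaited).then((log) => console.log("awaited: ", log));
demo(dispatchDetached).then((log) => console.log("detached:", log));
```

In the awaited variant message b starts only after a has ended; in the detached variant both start before either ends, which is faster but gives up per-partition ordering.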

micalevisk removed the "needs triage" (This issue has not been looked into) label on Feb 7, 2024
@weissmall

> It processes each topic-partition sequentially one at a time.

The KafkaJS documentation doesn't mention anything about different topics. For sure, messages in the same partition should be processed in order, and partitionsConsumedConcurrently will make processing asynchronous across different partitions.

Kafka only implies ordering for partitions, not for topics. Imagine that we have two users:

  • The first one loads his profile page
  • The second one loads the main page of the site

If we process two different topics synchronously, the second user cannot get the main page of the site while the first one is still waiting for the response about his profile.

> AFAIR there was a bug in v7 (or v8)

Yes, but that was about partitions; different topics should be processed asynchronously, shouldn't they?

@kamilmysliwiec
Member

Who said you can't process two topics in parallel though?

@weissmall

> Who said you can't process two topics in parallel though?

I guess there is a misunderstanding, because we're not talking about parallel computation but about concurrency.

The guys provided a valid test case in the async-requests branch, where two requests should be processed asynchronously but aren't.

For CPU-bound tasks on single-threaded Node I cannot argue, of course, but in their case the handler returns a Promise that resolves to the response to the Kafka message:

return new Promise<string>((resolve) => {
      setTimeout(() => {
        resolve("pong1");
        const time2 = new Date()
        console.log(`test1 took ${getTimeDifference(time1, time2)} secs`)
      }, 5000);
});

If we have two different topics with the same behaviour, they should be processed concurrently, but they aren't.

According to the Node.js event loop, such things can run concurrently.
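
The event-loop point can be checked in isolation with a small sketch that has no Kafka involved. `handle` is a hypothetical stand-in for the timer-based handler above, with a shorter delay in place of the 5000 ms timer.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Stand-in for a handler that only waits on a timer (no CPU-bound work).
async function handle(topic: string, delayMs: number): Promise<string> {
  await sleep(delayMs);
  return `pong from ${topic}`;
}

// Returns how long the given async function took, in milliseconds.
async function measure(run: () => Promise<unknown>): Promise<number> {
  const started = Date.now();
  await run();
  return Date.now() - started;
}

async function compare(delayMs: number): Promise<{ sequentialMs: number; concurrentMs: number }> {
  // One handler after the other: total time is roughly 2 * delayMs.
  const sequentialMs = await measure(async () => {
    await handle("test1", delayMs);
    await handle("test2", delayMs);
  });
  // Both handlers started together: total time is roughly delayMs.
  const concurrentMs = await measure(() =>
    Promise.all([handle("test1", delayMs), handle("test2", delayMs)]),
  );
  return { sequentialMs, concurrentMs };
}

compare(100).then((result) => console.log(result));
```

Both timers wait at the same time on a single thread, so the concurrent run takes about as long as one handler rather than two.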

@kamilmysliwiec
Member

I'll investigate this as soon as I can, thank you @weissmall

@weissmall

weissmall commented Feb 15, 2024

Update after debugging

File: microservices/server/server-kafka.ts
Code:

  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: KafkaContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
    }
    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      await lastValueFrom(resultOrStream); // this await is the reason for the issue
    }
  }

It seems a little strange, but while we're awaiting lastValueFrom(resultOrStream) in the scenario described above, we cannot process the next message from any topic until we get the response.

I guess for this situation we need something like a message queue for each topic, so that different topics can be processed asynchronously.

If we don't await the response, a single partition will be processed asynchronously, which is bad. So we need some workaround, or maybe the core issue is somewhere deeper than what I've discovered.
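
One way to picture the "queue per topic" idea is a promise chain per topic: tasks for the same topic run one after another, while different topics progress independently. `PerTopicQueue` below is a hypothetical helper written for this illustration; it is not part of NestJS or kafkajs, and it ignores offset commits entirely.

```typescript
type Task = () => Promise<void>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: serializes tasks per topic, not across topics.
class PerTopicQueue {
  // Last scheduled task of each topic; stored so that it never rejects.
  private readonly tails = new Map<string, Promise<void>>();

  enqueue(topic: string, task: Task): Promise<void> {
    const previous = this.tails.get(topic) ?? Promise.resolve();
    // Start the task only after the previous task of the same topic settled.
    const next = previous.then(task);
    // Swallow errors in the stored tail so one failure does not block the topic.
    this.tails.set(topic, next.catch(() => {}));
    return next; // the caller still sees the task's own success or failure
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const queue = new PerTopicQueue();
  const work = (label: string): Task => async () => {
    log.push(`start ${label}`);
    await sleep(20); // stand-in for slow I/O
    log.push(`end ${label}`);
  };
  await Promise.all([
    queue.enqueue("test1", work("test1#1")),
    queue.enqueue("test1", work("test1#2")),
    queue.enqueue("test2", work("test2#1")),
  ]);
  return log;
}

demo().then((log) => console.log(log));
```

Keying the map by topic and partition together (for example `${topic}:${partition}`) would keep per-partition ordering for multi-partition topics. Note that returning from eachMessage before a queued task has finished would also change when offsets are considered processed, so this is a sketch of the scheduling idea only.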

@Klutrem
Author

Klutrem commented Mar 12, 2024

> I'll investigate this as soon as I can, thank you @weissmall

Hi, any updates?

@kamilmysliwiec
Member

I can reproduce this with a raw kafkajs setup, so it doesn't seem to be related to NestJS:

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "test1" });
  await consumer.subscribe({ topic: "test2" });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log("Received message", {
        topic,
        partition,
        key: message.value?.toString(), // value may be null, hence the optional chaining
      });
      await new Promise((resolve) => setTimeout(resolve, 5000));
      console.log("Processed message");
    },
  });
};

run().catch(console.error);

Result:

Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }

@falkorotter

falkorotter commented Mar 22, 2024

I'm running into the same situation... any updates?

@mkaufmaner
Contributor

Have you tried using an observable? #3954 (comment)

@goodhumored

> Have you tried using an observable? #3954 (comment)

Same result:
[image]
Am I doing something wrong?

@weissmall

With Blizzard's node-rdkafka everything works perfectly.

Code:

var Transform = require('stream').Transform;

const { randomInt } = require('crypto');
var Kafka = require('node-rdkafka');

var stream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test-group',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
  "offset.store.method": 'broker',
}, {
  'auto.offset.reset': "end",
  "offset.store.method": 'broker',
  "enable.auto.commit": true,
  "auto.commit.interval.ms": 5000,
  "offset.store.sync.interval.ms": 5000,
}, {
  topics: ['test1', 'test2'],
  waitInterval: 0,
  objectMode: false
});

stream.on('error', function(err) {
  console.log(err);
  process.exit(1);
});

stream.consumer.on('event.error', function(err) {
  console.log(err);
})

stream.consumer.on('ready', (_) => {
  console.log('Ready');
})

stream.consumer.on('subscribed', (_) => {
  console.log('Subscribed')
})

stream.consumer.on('data', async ({ topic, value, partition }) => {
  const id = randomInt(500);
  console.log(`Got message with [id ${id}] at [${new Date()}]\n`, {
    topic, value: value.toString(), partition,
  })

  await new Promise((resolve) => setTimeout(resolve, 5000));
  console.log(`Processed message with [id ${id}] at [${new Date()}]`);
})

Received logs:

Got message with [id 453] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 333] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 159] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 390] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 437] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 155] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 348] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 247] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 74] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 374] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 393] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 414] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 179] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 154] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 486] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 480] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 182] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 300] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Processed message with [id 453] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 333] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 159] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 390] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 437] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 155] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 348] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 247] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 74] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 374] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 393] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 414] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 179] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 154] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 486] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 480] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 182] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 300] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]

All messages are processed in the correct order, and we can receive new messages while previous ones are still being processed.

@kamilmysliwiec

@kamilmysliwiec
Member

Would you like to create a PR replacing kafkajs with node-rdkafka?

@mkaufmaner
Contributor

> Would you like to create a PR replacing kafkajs with node-rdkafka?

@kamilmysliwiec This will be a breaking change but I can take a swing at it, it has been a while.

@edeesis
Contributor

edeesis commented May 4, 2024

Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation?

@mkaufmaner
Contributor

> Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation?

It can be implemented as a new transport, which is what I will work towards first. @kamilmysliwiec let me know which direction you would want to take.

I am opening a new issue here: #13535

@kamilmysliwiec
Member

Sounds good @mkaufmaner

@stitch20

stitch20 commented Aug 7, 2024

I think awaiting combineStreamsAndThrowIfRetriable() in the handleMessage method of the ServerKafka class might be the issue.
When I changed that code to be asynchronous, I was able to receive multiple messages before processing of the previous message had ended.

Another problem is that the heartbeat cannot be sent because the consumer is blocked while processing one message, which leads to rebalancing.

Do you have any updates planned to address these issues?
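
On the heartbeat point, kafkajs passes a `heartbeat()` callback in the eachMessage payload, which a long-running handler can call between chunks of work. The sketch below assumes the work can be split into chunks; `HeartbeatPayload` is a trimmed local stand-in for the kafkajs payload type, and `processWithHeartbeats` is a hypothetical helper name.

```typescript
// Trimmed stand-in for the part of the kafkajs eachMessage payload used here.
interface HeartbeatPayload {
  heartbeat(): Promise<void>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `steps` chunks of work and calls heartbeat() after each chunk, so the
// consumer keeps signalling liveness during a long handler.
async function processWithHeartbeats(
  payload: HeartbeatPayload,
  steps: number,
  stepMs: number,
): Promise<void> {
  for (let i = 0; i < steps; i++) {
    await sleep(stepMs); // placeholder for one chunk of real work
    await payload.heartbeat();
  }
}

// Demo with a fake payload that only counts the calls.
let beats = 0;
processWithHeartbeats({ heartbeat: async () => { beats += 1; } }, 3, 10).then(() =>
  console.log(`heartbeats sent: ${beats}`),
);
```

With kafkajs this would be called from inside eachMessage with the real payload. Raising the consumer's sessionTimeout is another option for slow handlers, at the cost of slower detection of consumers that have actually died.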

@kamilmysliwiec
Member

combineStreamsAndThrowIfRetriable is not the issue; you can reproduce this with a raw kafkajs setup: #12703 (comment)
