
Offset-commit issue with multiple Processors in kq #28

Open
wanzirong opened this issue Jun 13, 2022 · 6 comments


@wanzirong

Thanks for wrapping kafka-go as a service; it really is simpler to use!

I have a question about the consumption code and hope you can clarify it:

for i := 0; i < q.c.Processors; i++ {
	q.consumerRoutines.Run(func() {
		for msg := range q.channel {
			if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
				logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
			}
			q.consumer.CommitMessages(context.Background(), msg)
		}
	})
}

Is there a problem with multiple goroutines committing offsets in parallel? For example, committing a larger offset too early could lose messages, and committing a smaller offset too late could cause duplicate consumption.
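
For illustration, a toy model of the concern in plain Go (not the go-queue or kafka-go API): several workers drain sequential "offsets" from one unbuffered channel, spend a variable amount of time on each, and then "commit" by printing. The offsets are handed out in order, but the commit order usually is not.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	offsets := make(chan int) // unbuffered, like q.channel
	var wg sync.WaitGroup

	// Several "processors" pulling from the same channel, like the loop above.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for off := range offsets {
				// Simulate consumeOne taking a variable amount of time.
				time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
				// "Commit"; in the real code this is CommitMessages.
				fmt.Println("committed offset", off)
			}
		}()
	}

	// Offsets are handed out strictly in order...
	for off := 1; off <= 10; off++ {
		offsets <- off
	}
	close(offsets)
	wg.Wait()
	// ...but the printed commit order is usually not 1..10.
}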

@colinrs

colinrs commented Oct 19, 2023

I've read the code and I think so too: it can lead to both lost messages and duplicate consumption.

@dinofei

dinofei commented Mar 29, 2024

No, it won't, because the channel is unbuffered; even with multiple goroutines consuming together, commits are still made in order.

@1426919587

Why? consumeOne takes an unpredictable amount of time, so even if the consumers receive messages in order, the commits don't necessarily happen in order.

@dinofei

dinofei commented Apr 10, 2024

kafka-go merges the commits, and in that merge it orders them by the message offset.

@1426919587

1426919587 commented Apr 10, 2024

I took a look at the source code, and there is indeed a merge operation:
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}

		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

But the merge above updates whenever the new offset is larger; it doesn't require the offsets to be contiguous. Say messages with offsets 2, 3 and 4 are being processed and 4 is committed first: 2 and 3 haven't been committed yet, so if they then fail, isn't that effectively treated as if they had succeeded, with the next fetch starting from offset 5?
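
To make that concrete, here is a self-contained reproduction of the merge behavior (the commit and offsetStash types below are copied in shape from the snippet above, not imported from kafka-go): once offset 4 has been merged, later commits of 2 and 3 are dropped by the "only if greater" check, so the stash ends up treating 4 as committed even though 2 and 3 never completed.

package main

import "fmt"

// Local stand-ins mirroring the shapes in the snippet above.
type commit struct {
	topic     string
	partition int
	offset    int64
}

type offsetStash map[string]map[int]int64

func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}
		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

func main() {
	stash := offsetStash{}
	// Offset 4 finishes first and gets merged first.
	stash.merge([]commit{{topic: "demo", partition: 0, offset: 4}})
	// Offsets 2 and 3 arrive later; the "only if greater" check discards them.
	stash.merge([]commit{
		{topic: "demo", partition: 0, offset: 2},
		{topic: "demo", partition: 0, offset: 3},
	})
	fmt.Println(stash["demo"][0]) // prints 4: the stash never records 2 or 3
}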

@why2go

why2go commented Jun 5, 2024

I have the same question. It looks like the only way to guarantee correct commits is to configure a single processor. Shouldn't this be designed like spring-kafka, where each consumer has its own consuming thread, so commits can never get out of order?
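
For what it's worth, a rough sketch of that layout in plain Go (illustrative types only, not the go-queue or kafka-go API): route every message to a worker chosen by its partition, so each partition is handled and committed by exactly one goroutine. Commits then stay in offset order within a partition while partitions still run in parallel.

package main

import (
	"fmt"
	"sync"
)

// msg is an illustrative stand-in for a Kafka message.
type msg struct {
	partition int
	offset    int64
}

func main() {
	const partitions = 3
	var wg sync.WaitGroup

	// One channel and one goroutine per partition: within a partition,
	// handling and committing happen strictly in offset order.
	workers := make([]chan msg, partitions)
	for p := 0; p < partitions; p++ {
		workers[p] = make(chan msg)
		wg.Add(1)
		go func(ch chan msg) {
			defer wg.Done()
			for m := range ch {
				// Handle the message, then "commit" it; no other goroutine
				// can commit a later offset of this partition first.
				fmt.Printf("partition %d: handled and committed offset %d\n", m.partition, m.offset)
			}
		}(workers[p])
	}

	// A single reader loop fans messages out by partition.
	input := []msg{
		{partition: 0, offset: 1}, {partition: 1, offset: 1},
		{partition: 0, offset: 2}, {partition: 2, offset: 1},
		{partition: 1, offset: 2}, {partition: 0, offset: 3},
	}
	for _, m := range input {
		workers[m.partition] <- m
	}
	for _, ch := range workers {
		close(ch)
	}
	wg.Wait()
}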
