-
Notifications
You must be signed in to change notification settings - Fork 171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bulksubscribe http #478
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #478 +/- ##
==========================================
+ Coverage 58.04% 58.63% +0.58%
==========================================
Files 55 55
Lines 3568 3701 +133
==========================================
+ Hits 2071 2170 +99
- Misses 1375 1399 +24
- Partials 122 132 +10 ☔ View full report in Codecov by Sentry. |
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Codes look good, please add unit tests. |
Signed-off-by: sadath-12 <[email protected]>
Added it 🙂 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although the limited tests are passing and likewise with the validator, I'd like to see a validated bulk subscription.
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Thank you for the review @mikeee . Added them |
Signed-off-by: sadath-12 <[email protected]>
Please fix conflicts. |
Signed-off-by: sadath-12 <[email protected]>
Done @daixiang0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are regressions with the tests moving back to assert
rather than require
when evaluating errors
Signed-off-by: sadath-12 <[email protected]>
Please make CI happy. |
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
Yup @daixiang0 it seems to be chilling now . 😅 Thanks |
Signed-off-by: sadath-12 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few quick-win tests can be implemented on the code to improve coverage on top of the identified which I've marked
Signed-off-by: sadath-12 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?
I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?
@@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE | |||
return s.topicRegistrar.AddSubscription(sub, fn) | |||
} | |||
|
|||
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what default values you suggest if nil values are given?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error-out if either are X <= 0
this would prevent a negative value, a nil value is not possible for the int type.
func TestTopicErrors(t *testing.T) { | ||
server := getTestServer() | ||
err := server.AddTopicEventHandler(nil, nil) | ||
require.Errorf(t, err, "expected error on nil sub") | ||
require.Error(t, err, "expected error on nil sub with AddTopicEventHandler") | ||
|
||
err = server.AddBulkTopicEventHandler(nil, nil, 0, 0) | ||
require.Error(t, err, "expected error on nil sub with AddBulkTopicEventHandler") | ||
|
||
sub := &common.Subscription{} | ||
err = server.AddTopicEventHandler(sub, nil) | ||
require.Errorf(t, err, "expected error on invalid sub") | ||
require.Error(t, err, "expected error on invalid sub with AddTopicEventHandler") | ||
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) | ||
require.Error(t, err, "expected error on invalid sub with AddBulkTopicEventHandler") | ||
|
||
sub.PubsubName = "messages" | ||
err = server.AddTopicEventHandler(sub, nil) | ||
require.Errorf(t, err, "expected error on sub without topic") | ||
require.Error(t, err, "expected error on sub without topic with AddTopicEventHandler") | ||
sub.PubsubName = "messages" | ||
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) | ||
require.Error(t, err, "expected error on sub without topic with AddBulkTopicEventHandler") | ||
|
||
sub.Topic = "test" | ||
err = server.AddTopicEventHandler(sub, nil) | ||
require.Errorf(t, err, "expected error on sub without handler") | ||
require.Error(t, err, "expected error on sub without handler") | ||
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) | ||
require.Error(t, err, "expected error on sub without handler") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Break this down into idiomatic sub-tests, you can share the setup steps here but it doesn't read well when running tests.
@@ -44,10 +48,9 @@ expected_stdout_lines: | |||
background: true | |||
sleep: 15 | |||
--> | |||
#### Note: pub/pub.go contains both PublishEvents (used for publish of messages) and PublishEvent (used for bulkPublish of messages) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
#### Note: sub/sub.go contains both AddTopicEventHandler (used for subscribe of messages) and AddBulkTopicEventHandler (used for bulksubscribe of messages) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
go-sdk/examples/pubsub/README.md
Line 3 in de720d9
This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API. |
Remove this and modify the above to include something along the lines of:
This folder contains a publisher and subscriber that demonstrates both standard pubsub and bulk-pubsub
@@ -35,12 +35,10 @@ var defaultSubscription = &common.Subscription{ | |||
Route: "/orders", | |||
} | |||
|
|||
var importantSubscription = &common.Subscription{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see the importantSubscription re-implemented as a way to validate the route
being correct for a message on the same topic alongside match
/priority
if err != nil { | ||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no unhandled error at this point, could you clarify that if a single event is dropped it will be replayed/retried at a later date?
func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) { | ||
if err := json.NewEncoder(w).Encode(s); err != nil { | ||
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine to create a new function as it is manifestly different or is idiomatic, do you think this is better or would it be better to pass the slice to your function as an argument and wrap it within the function?
if assert.Len(t, subs, 2, "unexpected subscription count") { | ||
assert.Equal(t, "messages", subs[0].PubsubName) | ||
assert.Equal(t, "errors", subs[0].Topic) | ||
|
||
assert.Equal(t, "messages", subs[1].PubsubName) | ||
assert.Equal(t, "test", subs[1].Topic) | ||
assert.Equal(t, "", subs[1].Route) | ||
assert.Equal(t, "/", subs[1].Routes.Default) | ||
if assert.Len(t, subs[1].Routes.Rules, 1, "unexpected rules count") { | ||
assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match) | ||
assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite confusing to read - would it be more idiomatic to create a known map and compare this to the subscriptions?
func TestAddingInvalidBulkEventHandlers(t *testing.T) { | ||
s := newServer("", nil) | ||
err := s.AddBulkTopicEventHandler(nil, testTopicFunc, 10, 1000) | ||
require.Error(t, err, "expected error adding no sub event handler") | ||
|
||
sub := &common.Subscription{Metadata: map[string]string{}} | ||
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) | ||
require.Error(t, err, "expected error adding empty sub event handler") | ||
|
||
sub.Topic = "test" | ||
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) | ||
require.Error(t, err, "expected error adding sub without component event handler") | ||
|
||
sub.PubsubName = "messages" | ||
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) | ||
require.Error(t, err, "expected error adding sub without route event handler") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please split this into subtests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provide negative tests
can we keep the migration in separate pr ? |
I have to answer that with another question - has the bulk pub/sub been validated elsewhere? All I'm seeing is fallbacks. |
sure no issues lets test it for others as well . Before I jump wanted to confirm what you mean by fallback here ? because when I see the demo of bulksubscribe in the dapr docs and how js-sdk has implemented bulksubscribe . I have made sure similar behaviour is achieved here . would you like to explain what do you expect clearly ? Happy to implement whatever works best for the project 😊 If possible maybe we could drive the talk on the discord as well since some discussion is required on this |
The issue is that since the broker used does not implement bulk pub/sub methods, we are effectively dropping down to single pub/sub. Looking at the validation run it is highlighted in the logs, likewise in the js-sdk I note this is an issue too as it uses rabbitmq but this is not validated so you'd run into the same result if run in debug mode. |
Ya the dapr itself sends back the response one by one . so with all those brokers we would get the same result right ? we run the callback for each entry and send response to dapr |
@@ -44,7 +44,7 @@ func main() { | |||
} | |||
|
|||
// Publish multiple events | |||
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil { | |||
if res := client.PublishEvents(ctx, pubsubName, bulkTopicName, publishEventsData); res.Error != nil { | |||
panic(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem entirely right, you're panicking here with an unrelated (potentially) nil error
@mikeee what you want to say about this approach are we going up with this pr after those things resolved ? |
I think we should definitely try to get this merged even as a fallback validation, as long as there is validation locally just to make sure it's working then that should be fine 👍 |
wondering if there's any progress on this PR? I'm happy to help finish this one. Would you please resolve those conflicts if you'd like me to keep your commits? @sadath-12 |
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: sadath-12 <[email protected]>
@mikeee any other feedback on this PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the tests implement a TopicEventBulkRequest?
Fixes #423 for http bulksubscribe