-
Notifications
You must be signed in to change notification settings - Fork 5
ING-704: Added initial support for DCP. #359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces initial support for Database Change Protocol (DCP) in the codebase, enabling applications to stream database mutations and events in real-time. DCP allows clients to subscribe to data changes at the vbucket level and receive notifications about mutations, deletions, and other database events.
Key changes:
- Added DCP configuration options and event handlers throughout the client architecture
- Implemented DCP stream management with
DcpStreamSetfor opening/closing vbucket streams - Created router abstractions (static and dynamic) for handling DCP events
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| multikvendpointclientmanager.go | Added DCP options and handlers to manager configuration |
| mock_kvclient_test.go | Generated mock implementations for new DCP client methods |
| kvendpointclientmanager.go | Threaded DCP options through endpoint client manager |
| kvclientpool.go | Updated client pool to propagate DCP configuration |
| kvclientbabysitter.go | Added DCP options to client babysitter initialization |
| kvclient_ops.go | Implemented DCP stream request and close stream operations |
| kvclient_dcp.go | Core DCP bootstrapping logic with feature negotiation |
| kvclient.go | Integrated DCP state management and unsolicited packet handling |
| dcpstreamset.go | Manages multiple DCP streams across vbuckets |
| dcpstreamrouter_static.go | Static router for dispatching DCP events to handlers |
| dcpstreamrouter_dyn.go | Dynamic router with stream registration/unregistration |
| dcpevents.go | Defined DCP event handler types |
| dcpcomponent.go | Stream set manager for creating DCP stream sets |
| agent_ops.go | Agent-level API for creating DCP stream sets |
| agent_dcp_int_test.go | Integration test verifying basic DCP functionality |
| agent.go | Initialized DCP component in agent structure |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return memdClient_SimpleCall(ctx, c, memdx.OpsDcp{ | ||
| ExtFramesEnabled: c.HasFeature(memdx.HelloFeatureAltRequests), | ||
| CollectionsEnabled: c.HasFeature(memdx.HelloFeatureCollections), | ||
| StreamIdsEnabled: false, |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The StreamIdsEnabled field is hardcoded to false, but DCP options include an EnableStreamIds setting. Consider using c.dcpState.streamIdsEnabled or a similar mechanism to dynamically set this value based on the negotiated DCP state, ensuring consistency between the client's DCP configuration and its operational behavior.
| StreamIdsEnabled: false, | |
| StreamIdsEnabled: c.dcpState.streamIdsEnabled, |
| } | ||
|
|
||
| func (r *DcpStreamRouterDyn) getHandlers(streamId uint16) (DcpEventsHandlers, bool) { | ||
| state := r.state.Load() |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference if state.Load() returns nil. This can happen when getHandlers is called before any streams are registered and rebuildStateLocked has not been invoked. Add a nil check for state before accessing state.streams.
| state := r.state.Load() | |
| state := r.state.Load() | |
| if state == nil { | |
| return DcpEventsHandlers{}, false | |
| } |
No description provided.