[Draft] [RFC] Introduce shard_pg #7609
base: master
Conversation
@zzydxm For Key in [1..10000] you could use a solution like:

1> Key = 11.
11
2> GroupId = ((Key - 1) div 10) + 1.
2
3> GroupName = "group_" ++ erlang:integer_to_list(GroupId).
"group_2"

That avoids the need to store a mapping. Is the use-case of
"GroupId = ((Key - 1) div 10) + 1." is the mapping: fun(Key) -> GroupId What if the cluster need expand/shrink and group size need to be changed? How can we do the following change without modifying client code?
@zzydxm Erlang has hot code loading available, so there is no reason not to change the client source code. A new, separate mapping using a different hash function can always use a different way of creating group names, or a different scope process, to represent the same processes in process groups. The client source code change to use a different way of creating group names or a different scope process is minimal. That avoids the memory wasted on storing mappings that gradually become irrelevant as they are changed.
Doing a client code change is more complex than a single hot load; it has to happen in 3 steps: 1. All servers first replicate data so it exists under both hash functions, and join both groups. 2. Then we hot-load the new hash function on all clients. 3. Finally we leave all old groups and delete the old data on all servers. We used to do a 2x expansion every time, where a simple hash function might work. With a cluster of thousands of machines it is not realistic to do a double expansion. In that case we either need a full data copy to keep a simple hash function, or a complex hash function for minimal data movement and safe data migration, which is no different from storing the full shard mapping. If we use shard_pg, this can happen one node at a time, without any client change.
@zzydxm What you have described would have mapping lookup failures as the mapping data changes on a single node, which could be avoided by blocking the lookup for some timeout length of time (before it causes a lookup failure while the mapping data changes). Typically all nodes would not be blocked at once, to avoid failures on all nodes at the same time, so the mapping data would change gradually among the nodes (gradually causing delays and possibly failures). If you instead do a hot code load of the client source code after changing the hash function it uses, the hot code load is an atomic action on a single node and the delay associated with the change is limited to milliseconds. The only requirement is that both the old and new process groups coexist during the client source code update, which allows both the upgrade and a rollback if there is a reason to reverse the change. By minimizing the data being changed you minimize the downtime. With that perspective, the
@okeuday Hmmm, I don't quite get your point. A shard map and a hash function should be equivalent: they are both a projection from Key to ShardId. Both can be hot-loaded as compiled code, or as a dynamic config. You can literally write such a function, compile it, and hot-load it (see the sketch after this comment). With shard_pg, which uses ETS, why do you think lookup failures or downtime would be worse than what pg has? pg has an issue where, if a process leaves the group and immediately stops serving traffic without a grace period, some requests will be sent to a black hole. But if you keep serving traffic for a little while after leaving the group it is good enough, and the same applies to shard_pg.

Is your main concern that the shard map should be as simple as a hash function that can be described in a few bytes instead of the full map, or that the shard map change should happen transactionally on all nodes at the same time? Neither of these works for us, at least. My example in the beginning is simple, but we have a big cluster with 40k+ nodes. As described above, keeping the shard map simple is not very realistic: it needs significant extra capacity on shard remapping. In fact, we already store the full map on all our clients in an ordered_set ETS table and do exactly the same lookup as shard_pg to get the pg group name.

We also live in a world where cloud container restarts are relatively frequent (each container restarts several times a week), and each time one restarts we are not even promised the same hardware: we lose the extra capacity of the host if we do static mapping. Later we will move to a state where we don't even know how many containers we will have, only a total capacity, which means the shard map needs to be extremely dynamic, and a phased hot code load every time is not acceptable. Making the shard map that dynamic may be a special requirement of ours, but I would expect this to also save huge amounts of operational time for all sharded Erlang services: manual operation is no longer needed for shard map changes (simply deploy a new config on the server side, without multiple phases), and the extra host capacity needed to do a shard map change is also minimized (it can be 1 node at a time, and the node doesn't even need to serve double data).
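A minimal sketch of the kind of hot-loadable mapping module referred to above; the module name, function name and fixed group size are illustrative assumptions, not anything from the PR:

-module(shard_map).
-export([group_for_key/1]).

%% Current projection: 10 keys per group. Changing the projection amounts to
%% shipping and hot-loading a new version of this module.
-spec group_for_key(pos_integer()) -> string().
group_for_key(Key) ->
    GroupId = ((Key - 1) div 10) + 1,
    "group_" ++ integer_to_list(GroupId).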
As we see it, this is more or less the same as having a group for each shard in pg. Regardless, we feel that this is so close to pg that it makes more sense to extend pg itself. It feels relatively easy to extend the API so that shard ranges and pid metadata can be included. However, the protocol between nodes is problematic. @sverker introduced a new
Yes, exactly: this is mostly a performance improvement when the shard count is very large, plus a way of providing metadata. A few differences are that it supports re-joining with different shard info and metadata, instead of duplicated records in a list. Also, internally we are using something slightly different from this commit: returning the [{Pid, Metadata}] format instead of [Pid]. Yeah, OTP 28 seems very late. If it is acceptable to disable the feature by default, and warn that the new version should not be used with an old OTP (but that a new pg Scope can be started instead), then I think it works. Not sure if there is a better solution though.
I can work on a version that has the logic inside the pg module, but I think a few things need to be decided first (starred items are my personal preference):
What do you think?
We were thinking of extending the existing API more tightly with the new features (metadata and shards), since the old functionality must still be supported. Below is an example of how we think the API could look. This is of course up for discussion. We've added the shard information (range or shard id) between the scope and the group arguments, which makes it possible to distinguish between the different arguments in all functions. The optional arguments (scope and shard info) live at the beginning of the argument list. The shard information also ends up close to the group, which it is closely related to.
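A hedged sketch of the argument shape described above (shard info between scope and group, optional arguments first); the type names and stub bodies are illustrative assumptions, not the actual proposal:

-module(pg_api_sketch).
-export([join/2, join/3, join/4]).

-type shard_info() :: non_neg_integer()                        %% single shard id
                    | {non_neg_integer(), non_neg_integer()}.  %% inclusive range

%% Existing shapes stay as they are today.
-spec join(term(), pid() | [pid()]) -> ok.
join(Group, PidOrPids) -> join(pg, Group, PidOrPids).

-spec join(atom(), term(), pid() | [pid()]) -> ok.
join(_Scope, _Group, _PidOrPids) -> ok.                 %% body elided in this sketch

%% New shape: the shard info sits between the scope and the group.
-spec join(atom(), shard_info(), term(), pid() | [pid()]) -> ok.
join(_Scope, _ShardInfo, _Group, _PidOrPids) -> ok.     %% body elided in this sketch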
Thanks a lot! Let me think about the implementation. There is a limitation with the double join behavior: with a map (single join) we can do a metadata update in place, but with double join we have to join with the new metadata/ranges and remove the old ones separately, which leaves more for users to consider and opens up race conditions. Double join can also be represented as {Pid, Count} if the user wants. Do you think we can make the API better? Can we allow double join only for pid() but not for member_metadata()?

Also, about how to make it compatible with the old version: for hot code loading I think everything except swapping the ETS table from set to ordered_set can be done using the code_change callback in gen_server with version control: only accept the old API and send old sync messages when application:get_env(kernel, use_new_pg) is not true. However, I don't see a way to do the ETS migration yet.
Regarding upgrade and compatibility: There is no need to support upgrading via hot code loading. The kernel application requires a restart of the runtime system when upgrading. An upgraded system, however, needs to be able to communicate with a system that has not been upgraded. Code in one OTP release X needs to be able to communicate with OTP releases in the range [X-2, X+2].

The current implementation of

Up until now we have not decided what kind of term should be included in the new

A kernel parameter named

When a

In OTP 28 the kernel parameter(s) can be removed and the capability(ies) fixated to

Regarding double join: I tried to look a bit more at the implementation. To me it seems doable to support double join without too much hassle. However, since there are very few comments, the code is not trivial, and there is no documentation, I might be missing something. Currently, a

Assuming the above, I think it would be possible to have ranges repeated for double join in the groups map in the state, a join count in the pids map in the gb_tree, and such a pids map as values in the ets table as well (which needs to be converted to a list when presented to the user). Perhaps I misunderstand what you meant about using a pid count inside metadata for implementing double join, but I don't like using the metadata for the implementation of double join. I think that metadata should either be a service provided to the user where they can set whatever they want, or a completely internal thing not exposed to the user at all. Note that I have nothing against the single join semantics, but since we already have double join semantics in pg

Metadata can be specific to process and group as in this PR (otherwise it would be really complicated), and changed if new metadata is passed in a join.
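For reference, the double join semantics of today's pg that are being discussed; an illustrative shell session (scope, group name and pids are just examples):

1> pg:start_link(my_scope).
{ok,<0.90.0>}
2> pg:join(my_scope, my_group, self()).
ok
3> pg:join(my_scope, my_group, self()).
ok
4> pg:get_members(my_scope, my_group).
[<0.88.0>,<0.88.0>]   % one entry per join; pg:leave/3 removes one join at a time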
Got it, thanks for the feedback! I think it is enough for me to start implementing. For double join, my question is mostly about how to provide a user-friendly API:
Also, in the current PR I'm putting the metadata in the ETS table separately, but in our production we store
I shared a basically functioning commit; it is not fully tested, but I would like to share the API choices early on. For version compatibility, let's say we have 4 types of pg:

C currently can accept local shard/metadata join/leave requests, but will only propagate such updates to D, not to C (we can change this behavior to not accept such requests at all). When monitoring groups on C, it will return old update messages and only get updates for singleton groups. When monitoring groups on D, it will return new update messages. When monitoring a group on D, it will monitor both the singleton group and the sharded group. Note that monitoring for sharded groups is not user friendly; I may add monitoring of a specific shard later. Other decisions
Please let me know if there are any concerns, thanks!
@rickard-green
I thought metadata should be tied to
Force-pushed from ea0d24d to a0ad58d
Updated the documentation as discussed offline. Also a brief explanation of the range ETS update algorithm. Assume for

An update

The final map will be:

Internally, the update is split into 6 steps:

The algorithm works this way to make sure that during an update, an ets:prev lookup will either find the range with the old value or find the range with the new value; there is no point at which it returns empty. The reason is that the algorithm always inserts the split/merged entries first, and only then does the deletes. For example, a lookup of shard
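A minimal sketch of the ets:prev/2 based lookup that this guarantee is about, assuming (as in the PR description) an ordered_set table keyed by {Group, RangeStart, RangeEnd} with the member pids as the value; the actual table layout in the commit may differ:

-module(shard_lookup_sketch).
-export([lookup_shard/3]).

%% Find the members serving Shard in Group, or [] if no range currently covers it.
-spec lookup_shard(ets:tab(), term(), non_neg_integer()) -> [pid()].
lookup_shard(Tab, Group, Shard) ->
    %% 'infinity' (an atom) sorts after any integer, so ets:prev/2 returns the
    %% last key {Group, RangeStart, RangeEnd} with RangeStart =< Shard.
    case ets:prev(Tab, {Group, Shard, infinity}) of
        {Group, Start, End} = Key when Start =< Shard, Shard =< End ->
            case ets:lookup(Tab, Key) of
                [{Key, Pids}] -> Pids;
                [] -> []                 %% entry removed concurrently
            end;
        _Other ->
            []                           %% '$end_of_table', another group, or no covering range
    end.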
Some initial comments about the API documentation.
<name name="update" arity="1" since="OTP 27.0"/> | ||
<name name="update" arity="2" since="OTP 27.0"/> |
I thought update() was an internal function. How come it became a new API function?
Adding update allows the user to do a join and a leave at the same time (e.g. change metadata without having two tuples in the list for too long).
If added to the API, it should not have different return types depending on whether shards_and_metadata is enabled or not.
The documentation should also describe what kinds of guarantees this operation does or does not give regarding what clients can see while an update operation is ongoing.
@@ -162,13 +278,22 @@
<fsummary>Starts group membership monitoring for a scope.</fsummary>
monitor_scope() will be incompatible once shards_and_metadata becomes default with this approach. The arity 1 and 2 versions need to be compatible. An arity 3 version with an options argument where you can specify what you want to monitor would solve this.
I'm not too fond of the update lists either. The existing API returns the current state and sends messages for updates made, and I think the extended monitoring should do that too.
The update list should be [update()] where update() = {group(), single | shard_ranges(), members(), members()}. I should annotate this.

With shards and metadata the user will need to handle update() anyway, so it is much easier if the initial return is also a list of updates (note that returning a different data structure like a map won't save anything on actual size).

I will change the implementation to return only singleton groups in the old reply format by default, and the new format if specified in the options.
With shards and metadata the user will need to handle update() anyway, so it is much easier if the initial return is also a list of updates (note that returning a different data structure like a map won't save anything on actual size)

Yes, assuming that monitor messages include data of type updates(), but one could use an approach that looks more like the already existing API: return a map with the current state with extended info, and send join/leave messages with extended info.

If the update() function is introduced in the API, the updates() approach is probably best though.
local node in the singleton group <c>Group</c>. If <c>Shard</c> is
specified, returns all members running on the local node in the
<c>Shard</c> of sharded group <c>Group</c>. Members are returned in
no specific order. This function is optimised for speed.</p>
</desc>
</func>
What happened to the get_metadatas() functions? No way to get the metadata now?
The get_members() function now returns things like [Pid1, {Pid1, Metadata1}, Pid2, ...]. In my previous commit this was the return of the get_metadatas() function, and get_members() was purely doing [Pid || Pid <- get_metadatas(), is_pid(Pid)]. If this makes more sense I will add it back.
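Illustrative only (not part of the PR): a caller that receives such a mixed list would have to normalize it itself, e.g. with a helper along these lines:

%% Extract the plain pids from a mixed member list such as [Pid1, {Pid1, Meta1}, Pid2, ...].
PidsOf = fun(Members) ->
             [case M of
                  {Pid, _Meta} -> Pid;
                  Pid when is_pid(Pid) -> Pid
              end || M <- Members]
         end.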
If get_members() returns anything other than a list of pid()s, it will be incompatible, so get_metadatas() or something similar is needed. The name get_metadatas() is no good, though; "datas" is not the plural form of "data". Data can be used both as singular and plural, so it should be named get_metadata() (or perhaps something completely different).
I'm thinking of these options, and marking get_members as deprecated later:

which_members
members
get_all_members

I'm also thinking of names like the following, but they would confuse people and make them think these functions exclude plain Pid members and only return tuple members:

get_members_with_metadata
get_metadata_members

So far members is the one I like best.
Force-pushed from d35bc67 to be7916c
Some more comments about the API. I think I covered all incompatibilities, but check to be sure. Old code using pg needs to work without modifications also when shards_and_metadata is enabled. Focus on the documentation for now, so we can determine how the API should look.
<fsummary>Make a process leave a group.</fsummary>
<name name="leave" arity="4" since="OTP 27.0"/>
<fsummary>Make a member or a list of members leave a group.</fsummary>
<desc> |
leave() must continue to return ok | not_joined even with shards_and_metadata enabled, for compatibility reasons.
The leave behavior is a bit complex.

In the current pg, leave returns not_joined only if ALL of the leave requests fail; if any of the pids in the list was joined, it returns ok. I think this behavior is not good, or even bug-like, although it is documented. If the server can handle an unexpected leave, it should always return ok. If it cannot, it should fail if ANY pid was not joined.

In the shard case the behavior is even more confusing to the user: what if you only joined {2, 4} and then leave {3, 6}? Should it leave {3, 4} or not leave at all? I think the server should reply with what was not expected and not execute the operation at all (especially with the update() function). That's what I'm currently implementing.

Any choice here should be implementable, although the "let unexpected leaves silently work but only return (a simple) error when none of them succeed" option will introduce a lot of code. What do you suggest?
<name name="update" arity="1" since="OTP 27.0"/> | ||
<name name="update" arity="2" since="OTP 27.0"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If added to the API, it should not have different return types depending on whether or not shards_and_metadata
is enabled or not.
<name name="update" arity="1" since="OTP 27.0"/> | ||
<name name="update" arity="2" since="OTP 27.0"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation should also describe what kinds of guarantees this operation gives or not gives regarding what clients can see while an update operation is ongoing.
@@ -162,13 +278,22 @@ | |||
<fsummary>Starts group membership monitoring for a scope.</fsummary> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With shard and metadatas user will anyway need to handle update(), so much easier if initial return is also a list of update (note that returning a different data structure like map won't save anything on actual size)
Yes, assuming that monitor messages include data of type updates()
, but one could use an approach that looks more like the already existing API. Return a map with current state with extended info, and send join
/leave
messages with extended info.
If the update()
function is introduced in the API, the updates()
approach is probably best though.
update_ets(Group, Ranges, GR, GA, LR, LA, Tid).

do_update_ets(Group, {L, R}, GR, GA, LR, LA, Tid) ->
    % Find the right most range that overlap with {L, R}
There needs to be a comment in this function describing how the guarantees that update() will give are achieved, so that others maintaining this in the future can easily understand the code. For example, something like this.
lib/kernel/doc/src/pg.xml
Outdated
<desc><p>A monitor scope reply.</p></desc>
</datatype>
<datatype>
<name name="monitor_group_return_old"/> |
The "old" name issue here as well.
lib/kernel/doc/src/pg.xml
Outdated
<desc><p>A list of update requests.</p></desc>
</datatype>
<datatype>
<name name="update_return_old"/> |
Should probably be renamed to leave_return() and only used for the leave() function.
lib/kernel/doc/src/pg.xml
Outdated
<desc><p>A monitor group reply.</p></desc>
</datatype>
<datatype>
<name name="monitor_message_old"/> |
The "old" name issue here as well.
lib/kernel/doc/src/pg.xml
Outdated
notifications. The content is a list of processes by default; or
<c>updates()</c> if <c>shards_and_metadata</c> is true in
<c>Features</c>, the same as the input in
<seemfa marker="#update/2"><c>update/2</c></seemfa>. |
<seemfa marker="#update/2"><c>update/2</c></seemfa>. | |
<seemfa marker="#update/2"><c>update/2</c></seemfa>.</p> |
Doc build fails on at least this.
lib/kernel/doc/src/pg.xml
Outdated
or leave the groups.</p>

<p>In addition, when the feature is enabled, besides the singleton group
defined above, the module can also join or leave to a range of shards |
defined above, the module can also join or leave to a range of shards
defined above, the process can also join or leave to a range of shards |
defined above, the module can also join or leave to a range of shards
defined above, `pg` can also join or leave to a range of shards |
Perhaps it should be pg instead of process. Not sure. Module felt wrong though.
Thanks for the comments! I will do the updates based on the suggestions. Regarding the leave behavior discussed above: any choice should be implementable, although the "let unexpected leaves silently work but only return (a simple) error when none of them succeed" option will introduce a lot of code. What do you suggest?
Updated the commit, with
Hmm, while doing the implementation, I plan to change the leave API from
Updated the leave API documentation again, and fixed some other errors.
leave() with a singleton group should not return the new 'bad_leave' tuple. leave() with shard_ranges() should not return the old awkward 'not_joined'. Does the new update() function really have to inherit the old awkward 'not_joined' behavior from leave()? Can we not give update() clean, consistent semantics?

Regarding removing non-existing members: I think you should do what is most useful to the user. Rejecting the entire leave or update op and returning a 'bad_leave' tuple seems to me to put the user in a difficult situation. The caller then has to enter some recovery code to figure out what input to give in order to get the desired new state. But I assume you know the use case better and may have a different opinion.
Yep, the current implementation doesn't return
This is mostly for coding simplicity; we can certainly return not_joined only for leave, not for update.
This is mostly because of my previous question. For us, we will probably only do a leave when the shard mapping changes, in which case we know what to remove, and if it is not joined there is surely some problem that we want to know about.
Another issue would be if the above
Force-pushed from 6375d6a to 772131b
Quick question: what is the motivation for adding this to Erlang/OTP instead of maintaining it as a separate package? Is this something that could be used in other parts of OTP or inside Ericsson? Is it a limitation of pg? Note I am mostly being curious. :) Thank you and happy new year!
@josevalim That's actually a good question. I think we should consider taking a step back and thinking about whether this is better done as a separate module as a first step. In that way the interface can evolve and mature to best fit the actual use case. Then we also don't need to be constrained by the limitations/mistakes of pg.

As a future part of OTP I personally think something with a higher abstraction level would be more interesting: something where you get support for doing the mapping and remapping of the shard ranges. Maybe the individual shard ranges do not even need to be exposed at all in such an interface, just a mapping from key (hash) to pid.
Hmm, what do we do now? I'm OK with making it a new module.
I agree that sharding should not be an integral part of (basic) pg. To me, such a module could be part of a library implementing an "RPC at scale" solution (e.g. sharded RPC), something I've been entertaining for a few years (https://github.com/max-au/lambda). Hence, I'd think that
Good call! Since
Follow up after the offline discussion. The API:

-module(data_publisher).

This behavior module provides a server for publishing certain local data and its updates to all the connected nodes. A module implementing this behavior should define a data type local_data() and an update type local_update(). Whenever a local data change happens through the update_data/2 or replace_data/2 API, the change will also be propagated to all the connected peers where the Scope is started. Any server that receives the data update will run the apply_update/3 or apply_replace/3 callbacks to update the local cache of the data.

A typical usage of this behavior is the pg service discovery: every node can publish a (process group name) -> (local pids) map, so that on any node in the cluster one can get all the pids in the entire cluster that belong to a process group.

-type scope() :: atom().
-spec start(scope(), module()) -> gen_server:start_ret().
-spec update_data(scope(), local_update()) -> ok.
-callback init_data() -> local_data().
-callback apply_update(node(), local_data(), local_update()) -> local_data().

How pg works:
How pg works in the new world:
The API can also be made to support versioning, e.g.:

-type scope() :: atom().
-spec start(scope(), module()) -> gen_server:start_ret().
-callback supported() -> dynamic().
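To make the proposal more concrete, here is a hypothetical callback module for the proposed behavior, sketching how a pg-like registry could publish its local group membership; all names and the update format are illustrative assumptions, and data_publisher itself is only the proposal above, not an existing OTP module:

-module(pg_publisher_sketch).
-behaviour(data_publisher).   %% the proposed behavior, not an existing OTP module
-export([init_data/0, apply_update/3]).

%% local_data(): the #{Group => [pid()]} map this node publishes.
init_data() ->
    #{}.

%% local_update(): {join, Group, Pid} | {leave, Group, Pid}.
apply_update(_FromNode, Data, {join, Group, Pid}) ->
    maps:update_with(Group, fun(Pids) -> [Pid | Pids] end, [Pid], Data);
apply_update(_FromNode, Data, {leave, Group, Pid}) ->
    maps:update_with(Group, fun(Pids) -> lists:delete(Pid, Pids) end, [], Data).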
This commit introduces shard_pg, which is close to the current pg module but is designed for propagating sharding information for sharded services. This is also a very early draft with many APIs still to be changed; I just made some reasonable choices and would like to collect feedback.
Consider a service that hashes keys to 1000 shards, with one process serving shards 1..10, one serving 11..20, and so on. With the current pg, an obvious solution is to name shards 1..10 "group_1", 11..20 "group_2", and do pg:join(group_1, Pid1), etc.
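A small sketch of that "group per range" approach with today's pg; the scope, group-name format and fixed group size are illustrative, and pg:start_link(my_scope) is assumed to have been called:

-module(shard_by_group).
-export([join_range/2, whereis_shard/1]).

-define(SCOPE, my_scope).
-define(SHARDS_PER_GROUP, 10).

%% A server owning shards (GroupId - 1) * 10 + 1 .. GroupId * 10 joins its group.
join_range(GroupId, Pid) ->
    ok = pg:join(?SCOPE, {group, GroupId}, Pid).

%% A client maps a shard to its group and asks pg for the current members.
whereis_shard(Shard) ->
    GroupId = ((Shard - 1) div ?SHARDS_PER_GROUP) + 1,
    pg:get_members(?SCOPE, {group, GroupId}).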
However, with this approach the client side needs to maintain a copy of the entire shard map (either as an actual map or as a hash function). More importantly, if we want to change the shard map (for example, split group_1 into two: one group for shards 1..5 and one for shards 6..10), it needs a multi-phase change on both the server side and the client side to make it work safely.
shard_pg solves exactly this issue: it stores the shard mapping in an ordered_set ETS table with the mapping {GroupName, RangeStart, RangeEnd} => Pids, so that a client can easily find the pid serving a specific shard using binary search, without storing any extra shard mapping locally. A shard map change can happen purely on the server side.
The entire data syncing protocol of shard_pg is exactly the same as pg's, but there are still many design choices left to be decided:
Please let me know if there is any feedback, thanks a lot!