
[Draft] [RFC] Introduce shard_pg #7609

Open · wants to merge 1 commit into master
Conversation

zzydxm
Contributor

@zzydxm zzydxm commented Aug 30, 2023

This commit introduces shard_pg, which is close to the current pg module but is designed to propagate sharding information for sharded services. This is also a very early draft with many APIs still subject to change; I made some reasonable initial choices and would like to collect feedback.

Consider a service that hashes keys to 1000 shards, with one process serving shards 1..10, another serving 11..20, and so on. With the current pg, an obvious solution is to name shards 1..10 "group_1", shards 11..20 "group_2", and do pg:join(group_1, Pid1), etc.

However, with this approach the client side needs to maintain a copy of the entire shard map (as either an actual map or a hash function). More importantly, changing the shard map (for example, splitting group_1 into two groups, one for shards 1..5 and one for shards 6..10) requires a multi-phase change on both the server side and the client side to work safely.

shard_pg solves exactly this issue: it stores the shard mapping in an ordered_set ETS table with entries {GroupName, RangeStart, RangeEnd} => Pids, so that a client can find the pid serving a specific shard with a binary search and does not need to store any extra shard mapping locally. Shard map changes can happen purely on the server side.
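To illustrate the lookup idea, here is a minimal executable sketch (not the actual shard_pg code; the module name, key layout, and pids are illustrative assumptions based on the description above):

```erlang
%% Minimal sketch of the described lookup: an ordered_set ETS table keyed
%% by {Group, RangeStart, RangeEnd} lets ets:prev/2 find the covering
%% range in logarithmic time. All names here are illustrative only.
-module(shard_lookup_sketch).
-export([demo/0]).

lookup(Tab, Group, Shard) ->
    %% Probe just past the shard; the previous key, if it covers Shard,
    %% is the range serving it.
    case ets:prev(Tab, {Group, Shard + 1, Shard + 1}) of
        {Group, Start, End} when Start =< Shard, Shard =< End ->
            [{_, Pids}] = ets:lookup(Tab, {Group, Start, End}),
            Pids;
        _ ->
            []  %% no range covers this shard
    end.

demo() ->
    T = ets:new(sketch, [ordered_set]),
    ets:insert(T, {{group, 1, 5}, [pid_a]}),   %% shards 1..5  -> pid_a
    ets:insert(T, {{group, 6, 10}, [pid_b]}),  %% shards 6..10 -> pid_b
    [pid_a] = lookup(T, group, 3),
    [pid_b] = lookup(T, group, 9),
    [] = lookup(T, group, 11),
    ok.
```

The client needs no local copy of the shard map: only the ETS table, which the scope server maintains.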

The data syncing protocol of shard_pg is exactly the same as pg's, but there are still many design choices left to be decided:

  1. Shall we introduce this feature in the pg module or create a new module as in this commit? And if in pg, should it be a different start argument that does a different gen_server init, or the same gen_server supporting both APIs (with two ETS tables)?
  2. pg can join a pid to a group multiple times, and get_members will return a list containing multiple instances, but shard_pg only allows one; is that OK?
  3. I removed get_local_members in shard_pg compared to pg, because I did not find it very useful. Should we keep it instead?
  4. In pg there are 2 copies of the data: one in ETS, and one, essentially a reverse search map, in the gen_server state. In this commit I added one more copy of the ETS data in the gen_server using a gb_tree; the main purpose is to speed up some operations (avoiding lists ++, --, and comparisons), but the trade-off is memory usage. I'm testing different options in our production.
  5. The format of the get_shard_members return value: the current implementation matches pg and returns a list of pids. An alternative is to return #{Pid => true} or #{Pid => Metadata}. I'm also going to test the performance of these in our production.
  6. I added Metadata to the handler() spec, because I think it is a feature that was discussed for pg but never happened. How we store Metadata is a question, as described in (5).
  7. pg has a monitor_scope feature that I haven't added to shard_pg. I think it is needed, but I can't think of a user-friendly API; this will need discussion.

Please let me know if there is any feedback, thanks a lot!

@github-actions
Contributor

github-actions bot commented Aug 30, 2023

CT Test Results

2 files, 66 suites, 1h 1m 53s ⏱️
1 521 tests: 1 266 ✔️, 254 💤, 1 failed
1 714 runs: 1 411 ✔️, 302 💤, 1 failed

For more details on these failures, see this check.

Results for commit dbd5ded.

♻️ This comment has been updated with latest results.

To speed up review, make sure that you have read Contributing to Erlang/OTP and that all checks pass.

See the TESTING and DEVELOPMENT HowTo guides for details about how to run tests locally.

Artifacts

// Erlang/OTP Github Action Bot

@zzydxm zzydxm changed the title [Draft] Introduce shard_pg [Draft] [RFC] Introduce shard_pg Aug 30, 2023
@okeuday
Contributor

okeuday commented Aug 30, 2023

@zzydxm For Key in [1..10000] you could use a solution like:

1> Key = 11.
11
2> GroupId = ((Key - 1) div 10) + 1.
2
3> GroupName = "group_" ++ erlang:integer_to_list(GroupId).
"group_2"

That avoids the need to store a mapping. Is the use-case of shard_pg for keys that are not sequential, with large gaps? If that was the case a hash function could still be used, so it remains unclear how storing the mapping is justified.

@zzydxm
Contributor Author

zzydxm commented Aug 31, 2023

@zzydxm For Key in [1..10000] you could use a solution like:

1> Key = 11.
11
2> GroupId = ((Key - 1) div 10) + 1.
2
3> GroupName = "group_" ++ erlang:integer_to_list(GroupId).
"group_2"

That avoids the need to store a mapping. Is the use-case of shard_pg for keys that are not sequential, with large gaps? If that was the case a hash function could still be used, so it remains unclear how storing the mapping is justified.

"GroupId = ((Key - 1) div 10) + 1." is the mapping: fun(Key) -> GroupId

What if the cluster needs to expand/shrink and the group size needs to change? How can we do the following change without modifying client code?

for example split group_1 to two, one shard 1..5 group, and one shard 6..10 group

@okeuday
Contributor

okeuday commented Aug 31, 2023

@zzydxm Erlang has hot code loading available, so there is no reason to not change the client source code. A new separate mapping using a different hash function can always use a different way of creating group names, or it could use a different scope process, to represent the same processes in process groups. The client source code change to use a different way of creating group names or a different scope process is minimal. That helps to avoid the storage consumption if mappings were stored that gradually become irrelevant (as they are changed) because that would be a waste of memory.

@zzydxm
Contributor Author

zzydxm commented Aug 31, 2023

@okeuday

Doing a client code change is more complex than a single hot load; it has to happen in 3 steps: 1. All servers first replicate the data so it is present under both hash functions, joining both old and new groups. 2. Then we hot-load the new hash function to all clients. 3. Finally, we leave all old groups and delete the duplicated data on all servers.

We used to do a 2x expansion every time, where a simple hash function might work. With a cluster of thousands of machines it is not realistic to do a double expansion. In that case we either need a full data copy to keep a simple hash function, or a complex hash function for minimal data movement and safe data migration, which is no different from storing the full shard mapping.

If we use shard_pg, this can happen one node at a time, without any client change.

@okeuday
Contributor

okeuday commented Sep 1, 2023

@zzydxm What you have described should have mapping lookup failures as the mapping data changes on a single node, which could be avoided by blocking the lookup for some timeout length of time (before it causes a lookup failure while the mapping data changes). It seems typical that all nodes would not be blocked to avoid failures on all nodes at the same time, so the mapping data would change gradually among the nodes (gradually causing delays and possibly failures).

If you were to do a hot code load of the client source code after changing the hash function it uses, the hot code load is an atomic action on the single node, and the delay associated with the change is limited to milliseconds. The only requirement is that both the old and new process groups coexist during the client source code update, which allows both the upgrade and a rollback if there is a reason to reverse the change. By minimizing the data being changed, you minimize the downtime. From that perspective, the shard_pg approach to a process group key mapping would encourage developers to pursue the wrong solution for Erlang development.

@zzydxm
Contributor Author

zzydxm commented Sep 1, 2023

@okeuday Hmmm, I don't quite get your point. A shard map and a hash function should be equivalent: both are projections from Key to ShardId. Both can be hot-loaded as compiled code, or held as a dynamic config. You can literally write such a function, compile it, and hot-load it:
get_shard_id(Key) -> maps:get(Key, #{1=>group1, 2=>group1, 11=>group2, ...}).
Performance-wise there will be no real difference from putting this mapping in a persistent_term and changing it dynamically.
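The persistent_term variant mentioned here can be sketched as follows (a minimal illustration; the module name, the term key, and the tiny map are assumptions, not production code):

```erlang
%% Sketch: the shard map held as a dynamic term rather than compiled code.
%% persistent_term reads are cheap, and the map can be replaced at runtime
%% without a code load.
-module(dyn_shard_map_sketch).
-export([demo/0]).

get_shard_id(Key) ->
    maps:get(Key, persistent_term:get(shard_map)).

demo() ->
    persistent_term:put(shard_map, #{1 => group1, 2 => group1, 11 => group2}),
    group1 = get_shard_id(2),
    group2 = get_shard_id(11),
    %% "hot" change: just replace the term, no code load needed
    persistent_term:put(shard_map, #{1 => group2, 2 => group2, 11 => group2}),
    group2 = get_shard_id(1),
    ok.
```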

With shard_pg using ETS, why do you think lookup failures or downtime would be worse than what pg has? pg has an issue where, if a process leaves the group and immediately stops serving traffic without a grace period, some requests are sent to a black hole. But if you keep serving traffic for a little while after leaving the group, it is good enough; the same applies to shard_pg.

Is your main concern that the shard map should be as simple as a hash function that can be described in a few bytes instead of the full map, or that the shard map change should happen transactionally on all nodes at the same time?

Neither of these works for us, at least: my example in the beginning is simple, but we have a big cluster with 40k+ nodes. As described above, keeping the shard map simple is not very realistic: it needs significant extra capacity for shard remapping. In fact, we already store the full map on all our clients in an ordered_set ETS table and do exactly the same lookup as shard_pg to get the pg group name.

We also live in a world where cloud container restarts are relatively frequent (each container restarts several times a week), and each time one restarts we are not even guaranteed the same hardware: we lose the extra capacity of the host if we do static mapping. Later we will move to a state where we don't even know how many containers we will have, only a total capacity, which means the shard map needs to be super dynamic, and a phased hot code load every time is not acceptable.

This might be a special requirement on our side to make the shard map that dynamic, but I would expect it also saves huge operational time for all sharded Erlang services: manual operation is no longer needed for shard map changes (simply deploy a new config on the server side, without multiple phases), and the extra host capacity needed for a shard map change is also minimized (it can be one node at a time, and the node doesn't even need to serve double data).

@rickard-green rickard-green added the team:VM Assigned to OTP team VM label Sep 4, 2023
@rickard-green
Contributor

@zzydxm

As we see it, this is more or less the same as having a group for each shard in pg and joining a process in multiple groups, where each group represents a single shard, instead of joining a process in a range of shards within one group. At least when disregarding the metadata associated with pids. That is, instead of shard_pg:join(mygroup, {Pid, [1,10], []}) you would do lists:foreach(fun (Shard) -> pg:join({mygroup, Shard}, Pid) end, lists:seq(1,10)). Is this correct, or are we missing something? Assuming it is, this is more or less an optimization for the shard scenario.

Regardless, we feel that this is so close to pg's functionality that if introduced it should be introduced into pg and expand the pg API instead of introducing a new module.

It feels relatively easy to extend the API so that shard ranges and pid metadata can be included. However, the protocol between nodes is problematic. @sverker introduced a new discovery message that is intended to be used as of OTP 28. Depending on the new discovery message would, however, delay the introduction until OTP 28. Another solution could be to document that using the new functionality when connected to old nodes will result in crashes in the old nodes. Perhaps other solutions exist as well.

@zzydxm
Contributor Author

zzydxm commented Sep 20, 2023

@rickard-green

Yes, exactly; this is mostly a performance improvement when the shard count is very large, as well as providing metadata information.

A few differences are that it supports re-joining with different shard info and metadata, instead of keeping duplicated records in a list.

Also, internally we are using something slightly different from this commit: returning [{Pid, Metadata}] instead of [Pid].

Yeah, OTP 28 seems very late. If it is acceptable to disable the feature by default, and warn that the new version should not be used with old OTP (but one can start a new pg Scope instead), then I think it works. Not sure if there is a better solution, though.

@zzydxm
Contributor Author

zzydxm commented Sep 20, 2023

I can work on a version that has the logic inside the pg module, but I think a few things need to be decided first (starred options are my personal preference):

  1. ETS table: (1) one ordered_set for both old and new API, (2) two ETS tables with a user-specified name for sharded groups, different from the gen_server registered name, (3*) one table + one gen_server, working either only for the old API or only for the new API
  2. return of get_shard_members: (1) [Pid], (2*) [{Pid, Metadata}], (3) #{Pid => Metadata}
  3. allow double join: (1) yes (2*) no
  4. support (efficient) local_shard_member queries: (1) yes (2*) no, use gen_server:call or an ETS scan instead

What do you think?

@rickard-green
Contributor

We were thinking of integrating the new features (metadata and shards) more tightly with the existing API. Since the old functionality supports get_local_members(), we think this functionality should as well. The same goes for double join. We also think you should be able to attach metadata to members of non-sharded groups. Groups without sharding could also be considered groups with only one zero shard. I guess one server would be needed in order to integrate the new functionality like this. Perhaps other processes are needed to help it out.

Below is an example of how we think the API could look. This is of course up for discussion. We've added the shard information (range or shard id) between the scope and group arguments, which makes it possible to distinguish between the different arguments in all functions. The optional arguments (scope and shard info) live at the beginning of the argument list. The shard information also ends up close to group, which it is closely related to.

-type scope() :: atom().
-type group() :: any().
-type metadata() :: any().
-type member_metadata() :: {pid(), metadata()}.
-type member() :: pid() | member_metadata().
-type members() :: [member()].
-type shard() :: non_neg_integer().
-type shard_range() :: {shard(), shard()}.
-type shard_ranges() :: [shard_range()].

join(group(), members()) -> ok.
join(scope(), group(), members()) -> ok.
join(shard_ranges(), group(), members()) -> ok.
join(scope(), shard_ranges(), group(), members()) -> ok.

leave(group(), pid() | [pid()]) -> ok.
leave(scope(), group(), pid() | [pid()]) -> ok.
leave(shard_ranges(), group(), pid() | [pid()]) -> ok.
leave(scope(), shard_ranges(), group(), pid() | [pid()]) -> ok.

get_members(group()) -> [pid()].
get_members(scope(), group()) -> [pid()].
get_members(shard(), group()) -> [pid()].
get_members(scope(), shard(), group()) -> [pid()].

get_metadata(group()) -> members().
get_metadata(scope(), group()) -> members().
get_metadata(shard(), group()) -> members().
get_metadata(scope(), shard(), group()) -> members().

get_local_members(group()) -> [pid()].
get_local_members(scope(), group()) -> [pid()].
get_local_members(shard(), group()) -> [pid()].
get_local_members(scope(), shard(), group()) -> [pid()].

get_local_metadata(group()) -> members().
get_local_metadata(scope(), group()) -> members().
get_local_metadata(shard(), group()) -> members().
get_local_metadata(scope(), shard(), group()) -> members().

get_shard_ranges(group()) -> shard_ranges().
get_shard_ranges(scope(), group()) -> shard_ranges().

%% Other functions work as before...

@zzydxm
Contributor Author

zzydxm commented Sep 27, 2023

Thanks a lot! Let me think of the implementation.

There is a limitation with the double join behavior: with a map (single join) we can update metadata in place, but with double join we have to join the new metadata/ranges and remove the old ones separately, which leaves more for users to consider and introduces racing issues. Also, double join can be represented as {Pid, Count} if the user wants. Do you think we can make the API better? Could we allow double join only for pid() but not for member_metadata()?

Also, about how to make it compatible with the old version: for hot code loading, I think everything except swapping the ETS table from set to ordered_set can be done using the code_change callback in the gen_server with version control: only accept the old API and send old sync messages when application:get_env(kernel, use_new_pg) is not true. However, I don't see a way to do the ETS migration yet.

@rickard-green
Contributor

Regarding upgrade and compatibility:

There is no need to support upgrading via hot code loading. The kernel application requires restart of the runtime system when upgrading. An upgraded system, however, needs to be able to communicate with a system that has not been upgraded. Code in one OTP release X needs to be able to communicate with OTP releases in the range [X-2, X+2].

The current implementation of pg's protocol does not allow for modifications of the protocol. If a message is sent to a pg server that it does not understand, it will just crash. @sverker added support in OTP 26 for receiving a new discover message with an extra, currently undefined term as a third element. The new discover message cannot be sent until OTP 28, since OTP 27 needs to be able to communicate with OTP 25 nodes. However, it could be sent in OTP 27 if optionally enabled through, for example, setting a kernel parameter. If the user enables this feature, they need to ensure that they won't try to communicate with nodes of releases earlier than OTP 26. There needs to be a big warning about this in the documentation, since OTP 25 nodes will simply crash if communicated with.

Up until now we have not decided what kind of term should be included in the new discover message. It could for example be an integer representing protocol version, but we think it is better with a map used as a "capability map" where each key corresponds to a capability. When a new capability is introduced one also defines what type of value the capability should have. Often a boolean is sufficient, but perhaps not always.

A kernel parameter named pg_shards_and_metadata with a boolean value could be introduced, implying that we send the new discover message with a map containing the capability pg_shards_and_metadata. The name becomes a bit weird since it covers both features. Another approach could be two kernel parameters, pg_shards and pg_metadata, which would give nicer names. Yet another approach would be a better name than pg_shards_and_metadata covering both features, but we cannot come up with one. We dislike names like new_pg and such, since you easily end up with newer_pg, even_newer_pg, etc. I personally like the approach with two kernel parameters, even though the code supporting both of them is introduced at the same time, just because good naming becomes easier.

When a pg server receives the discover message with a capability map, it knows what types of messages it is allowed to send to the corresponding remote pg server. The capability information needs to be kept in the state, so the server knows what messages it may send to which remote pg server. It should also keep its own local capability map in its state, which it sends in discover messages to other pg servers and also uses to check whether API operations are allowed. When the local capability for a feature has not been enabled, usage of that feature in the API should fail, for example by raising an error exception with a notsup error reason.

In OTP 28 the kernel parameter(s) can be removed and the capability(ies) fixed to true. The server still needs to be prepared for old nodes that don't have those capabilities, though.
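As a rough illustration of the capability-map idea (a sketch with assumed names like pg_shards and node_a; this is not the actual pg state or protocol code):

```erlang
%% Sketch: remember each peer's capability map from its discover message
%% and consult it before sending a feature-specific message.
-module(pg_caps_sketch).
-export([demo/0]).

%% A missing peer entry or missing capability key defaults to false,
%% which models an old node that never sent a capability map.
may_send(Peer, Capability, PeerCaps) ->
    maps:get(Capability, maps:get(Peer, PeerCaps, #{}), false).

demo() ->
    PeerCaps = #{node_a => #{pg_shards => true, pg_metadata => true},
                 node_b => #{}},  %% old-style peer: no capabilities known
    true  = may_send(node_a, pg_shards, PeerCaps),
    false = may_send(node_b, pg_shards, PeerCaps),
    false = may_send(node_c, pg_shards, PeerCaps),  %% unknown peer
    ok.
```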

Regarding double join:

I tried to look a bit more at the implementation. To me it seems doable to support double join without too much hassle. However, since there are very few comments, non-trivial code, and no documentation, I might be missing something.

Currently, a join() of a process in ranges different from those it is already joined in will imply a leave in the ranges not given in the new join operation. That cannot be the case with double join semantics. For example, if process X joins in the range {4,7} but is already joined in {1,10}, it should as a result appear once in {1,3}, once in {8,10}, and twice in {4,7}. If the single join semantics is an important use-case, it could be possible to add a rejoin() function to the API which implies a leave in all ranges not part of the rejoin operation.

Assuming the above I think it would be possible to have ranges repeated for double join in the groups map in the state, join count in the pids map in the gb_tree, and such a pids map for values in the ets table as well (which needs to be converted to a list when presented to the user).
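The double join semantics above can be sketched as counting, per shard, how many of the joined (possibly overlapping) ranges cover it — illustrative code only, not the pg implementation:

```erlang
-module(double_join_sketch).
-export([demo/0]).

%% How many times does a member joined with the given ranges appear
%% in a specific shard?
count(Shard, Ranges) ->
    length([ok || {Lo, Hi} <- Ranges, Lo =< Shard, Shard =< Hi]).

demo() ->
    Joined = [{1, 10}, {4, 7}],  %% joined {1,10}, then again in {4,7}
    1 = count(2, Joined),   %% once in 1..3
    2 = count(5, Joined),   %% twice in 4..7
    1 = count(9, Joined),   %% once in 8..10
    ok.
```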

Perhaps I misunderstand what you meant about using a pid count inside metadata for implementing double join, but I don't like using the metadata to implement double join. I think that metadata should either be a service provided to the user, where they can set whatever they want, or a completely internal thing not exposed to the user at all.

Note that I have nothing against the single join semantics, but since we already got double join semantics in pg I think we should continue with that approach for new functionality as well otherwise the API becomes inconsistent.

Metadata can be specific to a process and group as in this PR (otherwise it would be really complicated), and changed if new metadata is passed in a join.

@zzydxm
Contributor Author

zzydxm commented Oct 23, 2023

@rickard-green

Got it, thanks for the feedback! I think this is enough for me to start implementing.

For double join, my question is mostly about how to provide a user-friendly API:
Assume one Pid already did join([{1,3}, {5,7}], Group, [{Pid, Data1}]) and join([{1,5}], Group, [{Pid, Data2}]).
When we call get_metadata(2, Group), should it return [{Pid, Data1}, {Pid, Data2}] or [{Pid, Data2}, {Pid, Data2}]?
In the first case, after we call leave([{2,3}], Group, Pid) and call get_metadata again, I'm not sure whether Data1 or Data2 should be removed from the return.
In the second case, we probably want a set_metadata(Scope, Group, Pid, Metadata) API, and make the join API support joining only [pid()] instead of [{pid(), metadata()}]. I will start implementing it this way if there are no objections.

rejoin is also an essential feature for us, so I will add it.

Also, in the current PR I'm putting the metadata in the ETS table separately, but in our production we store {Group, RangeStart, RangeEnd} => [{Pid1, Metadata1}, {Pid2, Metadata2}, ...] directly in the ETS table. This may cause metadata to be stored more than once and use more memory, but we think it is a better approach to avoid a huge number of ETS queries for the get_metadata API. Please let me know if there are concerns with this choice. Thanks!

@zzydxm
Contributor Author

zzydxm commented Nov 2, 2023

I shared a basically functioning commit. It is not fully tested, but I would like to share the API choices early:

For version compatibility, let's say we have 4 types of pg: A is what we have in OTP 25, B is what we have in OTP 26, C is the new one without the shard_and_metadata feature enabled, and D is the new one with the shard_and_metadata feature enabled.
A can only communicate with A, B, C in the old protocol, not with D
B can communicate with A, B, C, D in the old protocol
C sends old protocol messages to A, B, C, and new protocol messages to D
D sends old protocol messages to B, C, and new protocol messages to D

C currently accepts local shard/metadata join/leave requests, but only propagates such updates to D, not to C (we can change this behavior to not accept such requests at all)
D accepts shard/metadata join/leave requests; updates are propagated to all D peers, but only singleton groups and pids are propagated to B and C.

When monitoring groups on C, it returns old-style update messages and only gets updates for singleton groups.

When monitoring groups on D, it returns new-style update messages.

When monitoring a group on D, it monitors both the singleton group and the sharded group.

Note that monitoring sharded groups is not user friendly; I may add monitoring of a specific shard later.

Other decisions

  1. Metadata is stored together with the Pid in ETS. pg:leave must provide the same metadata as was given when joining for the change to be successful.
  2. A non-sharded group is stored in ETS with key {Group, single}, while a sharded group is stored with key {Group, BeginShard, EndShard}
  3. The pg:get_members API only returns pids that do not have metadata, in a not very efficient way. This is mostly to keep the API compatible with the old version. Changing current uses of pg:get_members to pg:get_metadatas shouldn't cause any issues if no one in the cluster joins pids with metadata

Please let me know if there are any concerns, thanks!

@zzydxm
Contributor Author

zzydxm commented Nov 9, 2023

@rickard-green
I added an end-to-end test, I think the server code is in a good shape for reviewing again.
I will work on documentation and more unit tests if everything looks good to you, thanks!

@rickard-green
Contributor

For double join, my question is mostly on how to provide a user friendly API: Assume one Pid already did join([{1,3}, {5,7}], Group, [{Pid, Data1}]) and join([{1,5}], Group, [{Pid, Data2}]) When we call get_metadata(2, Group) should it return [{Pid, Data1}, {Pid, Data2}] or [{Pid, Data2}, {Pid, Data2}]? If it is the first case, then after we call leave([{2,3}], Group, Pid) and call the get_metadata again I'm not sure whether Data1 or Data2 should be removed from the return If it is the second case, we probably want a set_metadata(Scope, Group, Pid, Metadata) API, and make join API only support joining [pid()] instead of joining [{pid(), metadata()}], I will start implement this way if no objections.

I thought metadata should be tied to pid()/group(). That is, when a pid joins a group with metadata, it overwrites the metadata saved for that pid() in that group(). It also feels a bit awkward to have to supply the metadata when leaving.

@zzydxm zzydxm force-pushed the pg branch 2 times, most recently from ea0d24d to a0ad58d Compare November 22, 2023 00:08
@zzydxm
Contributor Author

zzydxm commented Nov 22, 2023

Updated the documentation as discussed offline.

Also, a brief explanation of the range ETS update algorithm:
The algorithm guarantees that an update() :: {group(), single | shard_ranges(), Remove :: members(), Add :: members()} is an 'atomic' operation from a single shard's point of view (provided the shard ranges are disjoint).
A join is just an update with Remove = [], and a leave is just an update with Add = [].

Assume for Group we currently have this shard mapping:

{1, 5} => [Pid1, {Pid2, Meta2}]
{6,10} => [Pid1, Pid3]

An update
{Group, [{4,5}], [{Pid2, Meta2}], [Pid3]}
will remove {Pid2, Meta2} from range {4, 5} and add Pid3 to range {4, 5} at the same time.

The final map will be:

{1, 3} => [Pid1, {Pid2, Meta2}]
{4,10} => [Pid1, Pid3]

Internally, the update is split into 6 steps:
(split and update current ranges)

  1. insert {1, 3} => [Pid1, {Pid2, Meta2}]
  2. insert {4, 5} => [Pid1, Pid3]
  3. delete {1, 5} => [Pid1, {Pid2, Meta2}]
    (merge adjacent ranges with the same members)
  4. insert {4, 10} => [Pid1, Pid3]
  5. delete {4, 5} => [Pid1, Pid3]
  6. delete {6, 10} => [Pid1, Pid3]

The algorithm works this way to make sure that, during an update, an ets:prev lookup will either find the range with the old value or the range with the new value; there is never a point where it returns empty. The reason is that the algorithm always inserts the split/merged entries first, and only then deletes.

For example, a lookup of shard 4 will do ets:prev(Scope, {Group, 5, 5}), which still gives {1, 5} => [Pid1, {Pid2, Meta2}] after step 1, but gives {4, 5} => [Pid1, Pid3] once step 2 has inserted the new range.
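The insert-before-delete ordering can be demonstrated with a small executable sketch (illustrative keys without the Group element, and atoms standing in for pids; this is not the PR's actual code):

```erlang
-module(range_update_sketch).
-export([demo/0]).

lookup(Tab, Shard) ->
    case ets:prev(Tab, {Shard + 1, Shard + 1}) of
        {Lo, Hi} when Lo =< Shard, Shard =< Hi ->
            [{_, Members}] = ets:lookup(Tab, {Lo, Hi}),
            Members;
        _ -> []
    end.

demo() ->
    T = ets:new(sketch, [ordered_set]),
    %% Initial map: {1,5} => [pid1, {pid2, meta2}], {6,10} => [pid1, pid3]
    ets:insert(T, {{1, 5}, [pid1, {pid2, meta2}]}),
    ets:insert(T, {{6, 10}, [pid1, pid3]}),
    %% Update {[{4,5}], Remove = [{pid2, meta2}], Add = [pid3]},
    %% always inserting split/merged entries before deleting:
    ets:insert(T, {{1, 3}, [pid1, {pid2, meta2}]}),  %% step 1
    [pid1, {pid2, meta2}] = lookup(T, 4),            %% old range still wins
    ets:insert(T, {{4, 5}, [pid1, pid3]}),           %% step 2
    ets:delete(T, {1, 5}),                           %% step 3
    [pid1, pid3] = lookup(T, 4),                     %% new value, never empty
    ets:insert(T, {{4, 10}, [pid1, pid3]}),          %% step 4 (merge)
    ets:delete(T, {4, 5}),                           %% step 5
    ets:delete(T, {6, 10}),                          %% step 6
    [pid1, {pid2, meta2}] = lookup(T, 2),
    [pid1, pid3] = lookup(T, 7),
    ok.
```

At every intermediate point the lookup for shard 4 resolves to either the old or the new member list, never to an empty result.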

Contributor

@rickard-green rickard-green left a comment

Some initial comments about the API documentation.

Comment on lines +256 to +266
<name name="update" arity="1" since="OTP 27.0"/>
<name name="update" arity="2" since="OTP 27.0"/>
Contributor

I thought update() was an internal function. How come it became a new API function?

Contributor Author

Adding update allows the user to do a join and a leave at the same time (e.g., change metadata without having two tuples in the list for too long).

Contributor

If added to the API, it should not have different return types depending on whether or not shards_and_metadata is enabled.

Contributor

The documentation should also describe what guarantees this operation does or does not give regarding what clients can see while an update operation is ongoing.

@@ -162,13 +278,22 @@
<fsummary>Starts group membership monitoring for a scope.</fsummary>
Contributor

monitor_scope() will be incompatible once shards_and_metadata becomes the default with this approach. The arity 1 and 2 versions need to be compatible. An arity 3 version with an options argument, where you can specify what you want to monitor, would solve this.

I'm not too fond of the update lists either. The existing API returns the current state and sends messages for updates made, and I think the extended monitoring should do that too.

Contributor Author

@zzydxm zzydxm Dec 4, 2023

The update list should be [update()] where
update() = {group(), single | shard_ranges(), members(), members()}.
I should annotate this.

With shards and metadata, the user will need to handle update() anyway, so it is much easier if the initial return is also a list of updates (note that returning a different data structure like a map won't save anything in actual size).

I will change the implementation to return only singleton groups with the old reply format by default, and the new format if specified in an option.

Contributor

With shard and metadatas user will anyway need to handle update(), so much easier if initial return is also a list of update (note that returning a different data structure like map won't save anything on actual size)

Yes, assuming that monitor messages include data of type updates(); but one could use an approach that looks more like the already existing API: return a map with the current state with extended info, and send join/leave messages with extended info.

If the update() function is introduced in the API, the updates() approach is probably best, though.

local node in the singleton group <c>Group</c>. If <c>Shard</c> is
specified, returns all members running on the local node in the
<c>Shard</c> of sharded group <c>Group</c>. Members are returned in
no specific order. This function is optimised for speed.</p>
</desc>
</func>
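The lookup path this documentation describes (resolving a shard to its local members without any client-side shard map) can be sketched against an ordered_set ETS table. Everything below — the module name, the {{Group, RangeStart}, RangeEnd, Pids} table layout, and the use of atoms in place of real member pids — is an assumption for illustration, not the actual shard_pg internals:

```erlang
-module(shard_lookup).
-export([demo/0, get_members/3]).

%% Resolve Shard to the members of the covering range, if any.
get_members(Tab, Group, Shard) ->
    %% On an ordered_set, ets:prev/2 returns the largest key smaller than
    %% the probe, i.e. the range whose start is closest at or below Shard.
    case ets:prev(Tab, {Group, Shard + 1}) of
        {Group, Start} = Key when Start =< Shard ->
            [{Key, End, Pids}] = ets:lookup(Tab, Key),
            case Shard =< End of
                true  -> Pids;
                false -> []   % Shard falls in a gap between stored ranges
            end;
        _ ->
            []                % different group, or '$end_of_table'
    end.

demo() ->
    Tab = ets:new(shards, [ordered_set]),
    %% Atoms stand in for member pids here.
    true = ets:insert(Tab, {{my_group, 1}, 5, [member_a]}),
    true = ets:insert(Tab, {{my_group, 6}, 10, [member_b]}),
    {get_members(Tab, my_group, 4),    % -> [member_a]
     get_members(Tab, my_group, 7),    % -> [member_b]
     get_members(Tab, my_group, 11)}.  % -> []
```

The point of the ordered_set layout is that the client needs no local copy of the shard map: ets:prev/2 is a tree descent, so resolving a shard is O(log n) in the number of stored ranges.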

Contributor

What happened to the get_metadatas() functions? No way to get the metadata now?

Contributor Author

@zzydxm zzydxm Dec 4, 2023

get_members() now returns things like
[Pid1, {Pid1, Metadata1}, Pid2, ...]

In my previous commit this was the return value of get_metadatas(), and get_members() was simply doing
[Pid || Pid <- get_metadatas(), is_pid(Pid)]

If this makes more sense, I will add it back.

Contributor

If get_members() returns anything other than a list of pid()s, it will be incompatible, so get_metadatas() or something similar is needed. The name get_metadatas() is no good, though; "datas" is not the plural form of "data". Data can be used as both singular and plural, so it should be named get_metadata() (or perhaps something completely different).

Contributor Author

@zzydxm zzydxm Dec 6, 2023

I'm thinking of these options, marking get_members as deprecated later:
which_members
members
get_all_members

I also considered names like the following, but they would confuse people and make them think the functions exclude plain Pid members and only return tuple members:
get_members_with_metadata
get_metadata_members

So far, members is the one I like best.

@zzydxm zzydxm force-pushed the pg branch 2 times, most recently from d35bc67 to be7916c Compare December 5, 2023 01:08
Contributor

@rickard-green rickard-green left a comment

Some more comments about the API. I think I covered all incompatibilities, but check to be sure. Old code using pg needs to work without modifications also when shards_and_metadata is enabled. Focus on the documentation for now, so we can determine what the API should look like.

<fsummary>Make a process leave a group.</fsummary>
<name name="leave" arity="4" since="OTP 27.0"/>
<fsummary>Make a member or a list of members leave a group.</fsummary>
<desc>
Contributor

leave() must continue to return ok | not_joined even with shards_and_metadata enabled for compatibility reasons.

Contributor Author

The leave behavior is a bit complex.
In the current pg, leave returns not_joined only if ALL leave requests fail; if any of the pids in the list was joined, it returns ok.
I think this behavior is not good, even bug-like, although documented. If the server can handle an unexpected leave, it should always return ok. If it cannot, it should fail if ANY pid was not joined.

In the shard case the behavior will be even more confusing to the user: what if you only joined {2, 4} and then leave {3, 6}? Should it leave {3, 4} or not leave at all? I think the server should report what was unexpected and not execute the operation at all (especially with the update() function). That's what I'm currently implementing.

Any choice here is implementable, although the "let unexpected leaves silently work but only return a (simple) error when none of them succeeds" option would introduce a lot of code. What do you suggest?
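The {2, 4} versus {3, 6} question can be made concrete with a small sketch over inclusive {Start, End} ranges. Both candidate semantics are shown; all names and the range representation are hypothetical, chosen only to illustrate the strict-reject versus silent-partial-leave trade-off:

```erlang
-module(leave_semantics).
-export([strict_leave/2, lenient_leave/2]).

%% Strict: reject the whole operation unless some joined range fully
%% covers the request, e.g. joined [{2,4}], leave {3,6} -> bad_leave.
strict_leave(Joined, {L, R} = Requested) ->
    case [J || {JL, JR} = J <- Joined, JL =< L, R =< JR] of
        [] -> {bad_leave, Requested};
        _  -> {ok, subtract(Joined, Requested)}
    end.

%% Lenient: silently drop only the overlap, e.g. joined [{2,4}],
%% leave {3,6} -> remaining [{2,2}] (only {3,4} actually left).
lenient_leave(Joined, Requested) ->
    {ok, subtract(Joined, Requested)}.

%% Remove {L, R} from every joined range, keeping the leftovers.
subtract(Joined, {L, R}) ->
    lists:flatmap(
      fun({JL, JR}) ->
              [{JL, min(JR, L - 1)} || JL =< L - 1] ++
              [{max(JL, R + 1), JR} || R + 1 =< JR]
      end, Joined).
```

Under strict semantics the caller learns immediately that its view of the membership was wrong; under lenient semantics, monitors would additionally need the actually-left sub-ranges computed for them before notifications can be sent.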

Comment on lines +256 to +266
<name name="update" arity="1" since="OTP 27.0"/>
<name name="update" arity="2" since="OTP 27.0"/>
Contributor

If added to the API, it should not have different return types depending on whether or not shards_and_metadata is enabled.

update_ets(Group, Ranges, GR, GA, LR, LA, Tid).

do_update_ets(Group, {L, R}, GR, GA, LR, LA, Tid) ->
    %% Find the rightmost range that overlaps with {L, R}
Contributor

There needs to be a comment in this function describing how the guarantees that update() gives are achieved, so that others maintaining this in the future can easily understand the code.

For example, something like this.

<desc><p>A monitor scope reply.</p></desc>
</datatype>
<datatype>
<name name="monitor_group_return_old"/>
Contributor

The "old" name issue here as well.

<desc><p>A list of update requests.</p></desc>
</datatype>
<datatype>
<name name="update_return_old"/>
Contributor

Should probably be renamed to leave_return() and only used for the leave() function.

<desc><p>A monitor group reply.</p></desc>
</datatype>
<datatype>
<name name="monitor_message_old"/>
Contributor

The "old" name issue here as well.

notifications. The content is a list of processes by default; or
<c>updates()</c> if <c>shards_and_metadata</c> is true in
<c>Features</c>, the same as the input in
<seemfa marker="#update/2"><c>update/2</c></seemfa>.
Contributor

Suggested change
<seemfa marker="#update/2"><c>update/2</c></seemfa>.
<seemfa marker="#update/2"><c>update/2</c></seemfa>.</p>

Doc build fails on at least this.

or leave the groups.</p>

<p>In addition, when the feature is enabled, besides the singleton group
defined above, the module can also join or leave to a range of shards
Contributor

Suggested change
defined above, the module can also join or leave to a range of shards
defined above, the process can also join or leave to a range of shards

Contributor

Suggested change
defined above, the module can also join or leave to a range of shards
defined above, `pg` can also join or leave to a range of shards

Perhaps it should be pg instead of process. Not sure. Module felt wrong though.

@zzydxm
Contributor Author

zzydxm commented Dec 5, 2023

Thanks for the comments! I will do the updates based on suggestions.


@zzydxm
Contributor Author

zzydxm commented Dec 6, 2023

Updated the commit; the leave return value is not yet implemented exactly as described in the documentation.

@zzydxm
Contributor Author

zzydxm commented Dec 7, 2023

Hmmm, while doing the implementation, I plan to change the leave API from
"leaving a member with only a sub-portion of the requested shard ranges will result in the member leaving that sub-portion"
to
"leaving a member that didn't join all requested shard ranges will result in a no-op as well"
What do you think?

@zzydxm
Contributor Author

zzydxm commented Dec 8, 2023

Updated the leave API documentation again, and fixed some other errors.

@sverker
Contributor

sverker commented Dec 21, 2023

leave() with a singleton group should not return the new 'bad_leave' tuple.

leave() with shard_ranges() should not return the old awkward 'not_joined'.

Does the new update() function really have to inherit the old awkward behavior from leave() with 'not_joined'? Can we not give update() clean, consistent semantics?

Regarding removing non-existing members: I think you should do what is most useful to the user.
My guess is that silently doing nothing for non-existing members is most useful. That is also consistent with the current leave() behavior, except for the 'not_joined' return value.

Rejecting the entire leave or update op and returning a 'bad_leave' tuple seems to me to put the user in a difficult situation. The caller then has to enter some recovery code to figure out what input to give in order to get the desired new state.

But I assume you know the use case better and may have a different opinion.

@zzydxm
Contributor Author

zzydxm commented Dec 21, 2023

leave() with a singleton group should not return the new 'bad_leave' tuple.
leave() with shard_ranges() should not return the old awkward 'not_joined'.

Yep, the current implementation doesn't return bad_leave for a singleton group and doesn't return not_joined for a sharded group; I will change the spec.

Does the new update() function really have to inherit the old awkward behavior from leave() with 'not_joined'? Can we not give update() clean, consistent semantics?

This is mostly for coding simplicity; we can certainly return not_joined only for leave and not for update.

Regarding removing non existing members. I think you should do what is most useful to the user. My guess would be silently doing nothing for non existing members is most useful. And that is also consistent with the current leave() behavior except for the 'not_joined' return value.

Rejecting the entire leave or update op and returning a 'bad_leave' tuple seems to me to put the user in a difficult situation. The caller then has to enter some recovery code to figure out what input to give in order to get the desired new state.

But I assume you know the use case better and may have a different opinion.

This is mostly because of my previous question:
What if you do leave([[{3,6}], Group, [Pid1, Pid2]]) where Pid2 joined {3,6} but Pid1 only joined {2, 4}? Should it leave {3, 4} or not leave at all?
We claim one update() tuple is atomic, so we cannot split this into two operations (one leaving {3,6} for Pid2, one leaving {3, 4} for Pid1).

For us, we will probably only leave when the shard mapping changes, in which case we know what to remove, and if something was not joined there is surely a problem that we want to know about.

@zzydxm
Contributor Author

zzydxm commented Dec 21, 2023

Another issue: if the above leave([[{3,6}], Group, [Pid1, Pid2]]) happens, shall we send the same update to the monitoring processes, or send the pids/ranges that actually left? The latter would require much more computation before sending the message.

@zzydxm zzydxm force-pushed the pg branch 2 times, most recently from 6375d6a to 772131b Compare December 22, 2023 00:51
@zzydxm
Contributor Author

zzydxm commented Dec 22, 2023

I updated two versions: 6375d6a with only doc changes, where leave still returns an error, and dbd5ded, which makes leave/update silently succeed. I still prefer returning an error, though.

@josevalim
Contributor

Quick question: what is the motivation for adding this to Erlang/OTP instead of maintaining it as a separate package? Is this something that could be used in other parts of OTP or inside Ericsson? Is it a limitation of pg that must be addressed within Erlang/OTP itself?

Note I am mostly being curious. :) Thank you and happy new year!

@sverker
Contributor

sverker commented Jan 4, 2024

@josevalim That's actually a good question.

I think we should take a step back and consider whether this is better done as a separate module as a first step. That way the interface can evolve and mature to best fit the actual use case. We also wouldn't need to be constrained by the limitations/mistakes of pg, like the lacking upgrade support and the awkward double-join semantics.

As a future part of OTP, I personally think something with a higher abstraction level would be more interesting. Something where you get support for doing the mapping and remapping of the shard ranges. Maybe the individual shard ranges do not even need to be exposed at all in such an interface. Just a mapping from key (hash) to pid.

@zzydxm
Contributor Author

zzydxm commented Jan 5, 2024

Hmmm, what do we do now? I'm OK with making it a new module.

@max-au
Contributor

max-au commented Jan 6, 2024

I agree that sharding should not be an integral part of (basic) pg. A separate module could be a viable solution, although I'm not sure whether the suggested implementation is beneficial for all OTP users. It looks like a good fit for one specific implementation of remote procedure calls, rather than a generic module.

To me, such a module could be part of a library implementing an "RPC at scale" solution (e.g. sharded RPC), something I've been entertaining for a few years (https://github.com/max-au/lambda). Hence, I'd think shard_pg could be made part of a larger library supporting more than just sharded service discovery; likely as a separate repository, a new hex.pm package, etc. That can help with quicker iteration. If it proves to be useful for generic OTP, it can then be contributed as a whole. I've taken that approach in the past (e.g. spg -> pg, peer, argparse), and it worked well.

@max-au
Contributor

max-au commented Jan 6, 2024

we also don't need to be constrained by the limitations/mistakes of pg, like lacking upgrade support and awkward double join semantics.

Good call! Since pg was almost a drop-in replacement for pg2, it had to implement a fair share of the pg2 join semantics. For sharded RPC libraries, there is no need to follow anything pre-2000s.

@zzydxm
Contributor Author

zzydxm commented Jun 20, 2024

Follow-up after the offline discussion:
Can we separate the current pg into two parts, one for peer communication (data_publisher) and another for local caching, so that we can use the peer-communication part to implement shard_pg ourselves?

The API:

-module(data_publisher).

This behavior module provides a server for publishing certain local data and its updates to all the connected nodes.

A module implementing this behavior should define a data type local_data() and an update type local_update().

Whenever a local data change happens through the update_data/2 or replace_data/2 API, the change is also propagated to all the connected peers where the Scope is started.

Any server that receives the data update will run the apply_update/3 or apply_replace/3 callback to update the local cache of the data.

A typical usage of this behavior is the pg service discovery. Every node can publish a (process group name) -> (local pids) map, so that on any node in the cluster, one can get all the pids in the entire cluster that belong to this process group.

-type scope() :: atom().
-type local_data() :: dynamic().
-type local_update() :: dynamic().

-spec start(scope(), module()) -> gen_server:start_ret().
-spec start_link(scope(), module()) -> gen_server:start_ret().
Starts the data publisher scope with the implementation module specified

-spec update_data(scope(), local_update()) -> ok.
-spec replace_data(scope(), local_data()) -> ok.
Makes a change to the local data and sends the update to all connected peers

-callback init_data() -> local_data().
Initializes the data

-callback apply_update(node(), local_data(), local_update()) -> local_data().
-callback apply_replace(node(), Old :: local_data(), New :: local_data()) -> ok.

How pg works:

  1. start gen_server with name Scope
  2. send discover message to all {Node, Scope} in nodes()
  3. when receiving sync data (all local information on the peer node)
    1. cache it locally
  4. when join happens
    1. monitor pid
    2. broadcast update to all peers
  5. when leave/process_exit happens
    1. broadcast update to all peers
  6. when peer is down
    1. clear all local cache of the peer’s joined pids
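The steps above can be condensed into a bare-bones gen_server sketch. This is a deliberate simplification of the real pg server — state lives in the server map instead of ETS, remote pids are not monitored, and nodeup handling and conflict resolution are omitted — and all names are illustrative:

```erlang
-module(pg_sketch).
-behaviour(gen_server).
-export([start_link/1, join/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(Scope) ->                            % step 1
    gen_server:start_link({local, Scope}, ?MODULE, Scope, []).

join(Scope, Group, Pid) ->
    gen_server:call(Scope, {join, Group, Pid}).

init(Scope) ->
    _ = net_kernel:monitor_nodes(true),         % nodedown events, step 6
    %% Step 2: ask every peer scope process for its local state.
    [{Scope, Node} ! {discover, self()} || Node <- nodes()],
    {ok, #{scope => Scope, local => #{}, remote => #{}}}.

handle_call({join, Group, Pid}, _From,
            #{scope := Scope, local := Local} = State) ->
    _ = erlang:monitor(process, Pid),           % step 4.1
    %% Step 4.2: broadcast the update to all peers.
    [{Scope, Node} ! {join, node(), Group, Pid} || Node <- nodes()],
    {reply, ok, State#{local := append(Group, Pid, Local)}}.

handle_info({discover, Peer}, #{local := Local} = State) ->
    Peer ! {sync, node(), Local},               % full local state, step 3
    {noreply, State};
handle_info({sync, Node, Groups}, #{remote := Remote} = State) ->
    {noreply, State#{remote := Remote#{Node => Groups}}};     % step 3.1
handle_info({join, Node, Group, Pid}, #{remote := Remote} = State) ->
    Old = maps:get(Node, Remote, #{}),
    {noreply, State#{remote := Remote#{Node => append(Group, Pid, Old)}}};
handle_info({nodedown, Node}, #{remote := Remote} = State) ->
    {noreply, State#{remote := maps:remove(Node, Remote)}};   % step 6.1
handle_info(_Other, State) ->                   % e.g. 'DOWN' (step 5 elided)
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

append(Key, Value, Map) ->
    maps:update_with(Key, fun(Vs) -> [Value | Vs] end, [Value], Map).
```

The split proposed below moves the discover/sync/broadcast messaging out of this server into a reusable data_publisher, leaving only the local cache and monitoring here.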

How pg works in the new world:

  1. start gen_server with name Scope
  2. start another gen_server with name ‘$Scope’
  3. ‘$Scope’ send discover message to all {Node, ‘$Scope’} in nodes()
  4. when receiving sync data (all local information on the peer node)
    1. call pg:apply_replace(Peer, pg:init_data(), SyncedData)
      1. Scope gen_server will receive a message, and do a local cache update
  5. when some Pid joins Scope
    1. Scope gen_server monitor the Pid
    2. call data_publisher:update_data(Scope, Update)
      1. data_publisher broadcast the Update
  6. when receiving update from data_publisher:
    1. call pg:apply_update(Peer, Update)
      1. Scope gen_server will receive a message, and do a local cache update
  7. when leave/process_exit happens
    1. call data_publisher:update_data(Scope, Update)
      1. data_publisher broadcast the Update
  8. when Peer is down
    1. call pg:apply_replace(Peer, CurrentData, pg:init_data())
      1. Scope gen_server will receive a message, and do a local cache update

The API can also be made to support versioning, e.g.:
-module(data_publisher).

-type scope() :: atom().
-type supported() :: dynamic().
-type local_data() :: dynamic().
-type local_update() :: dynamic().

-spec start(scope(), module()) -> gen_server:start_ret().
-spec start_link(scope(), module()) -> gen_server:start_ret().
-spec update_data(scope(), local_update()) -> ok.
-spec replace_data(scope(), local_data()) -> ok.

-callback supported() -> dynamic().
-callback init_data() -> local_data().
-callback update_message(supported(), local_update()) -> local_update().
-callback replace_message(supported(), local_data()) -> local_data().
-callback apply_update(node(), local_data(), local_update()) -> local_data().
-callback apply_replace(node(), Old :: local_data(), New :: local_data()) -> local_data().
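Since data_publisher is only a proposal at this point, the following sketch shows what a pg-style callback module for it could look like, matching the versioned callback shapes above. The data and update representations are assumptions made up for the example:

```erlang
-module(pg_publisher).
-export([supported/0, init_data/0,
         update_message/2, replace_message/2,
         apply_update/3, apply_replace/3]).

%% Capabilities advertised to peers, for version-aware encoding.
supported() ->
    #{version => 1}.

%% local_data(): #{Group => [Pid]} for the processes joined on one node.
init_data() ->
    #{}.

%% With a single protocol version, messages are sent as-is.
update_message(_PeerSupported, Update) -> Update.
replace_message(_PeerSupported, Data) -> Data.

%% local_update(): {join | leave, Group, Pid}, applied to a peer's data.
apply_update(_Node, Data, {join, Group, Pid}) ->
    maps:update_with(Group, fun(Ps) -> [Pid | Ps] end, [Pid], Data);
apply_update(_Node, Data, {leave, Group, Pid}) ->
    case maps:get(Group, Data, []) -- [Pid] of
        []   -> maps:remove(Group, Data);
        Pids -> Data#{Group := Pids}
    end.

%% Full replacement: used for the initial sync, and for peer-down by
%% replacing with init_data().
apply_replace(_Node, _Old, New) ->
    New.
```

With this split, a shard_pg could define its own local_data() keyed on {Group, RangeStart, RangeEnd} while reusing the same propagation machinery unchanged.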

Labels
team:VM Assigned to OTP team VM
6 participants