Replies: 11 comments 32 replies
-
Hi! Thanks for this description. I have some thoughts, consider them below.
On the one hand it's ok; on the other hand, a temporary space + uuid.bin() as the identifier could protect from accidental reconnects.
It's better to keep it configurable. I heard a lot of complaints after "smart discovery" was introduced - it made tests slower (e.g. discovering 30k buckets takes several seconds, which was quite unexpected for some users) and it also affects RTO. It can be tuned by changing "vshard.const" values, but that is not a public interface.
Sometimes it could be useful to know the source (replicaset_uuid) of a result. Maybe it's better to keep the result in a map. It would be great to support the "buffer" option as well - it could be useful for tuple-merger usage. Shouldn't we split "pin_timeout" and the map "timeout"?
Not sure "r" is needed here. Reduce stage is performed on router side by user. I propose "map_callrw". |
-
A space (even without WAL) is an order of magnitude slower than a Lua table. I remember Alexander L. had benchmarks in one of the closed tickets in tarantool/tarantool, but I don't remember the issue number. So I would rather avoid a space. A heap implementation is super easy, so it is not a big deal to implement it. Talking of UUIDs - their usage leaves a non-zero possibility that they will clash. I know it is extremely unlikely, but it is possible. So I would rather avoid any global storage for all pins. Also UUIDs are heavier - >16 bytes for each network message.
If a master is switched, you simply won't see the needed sync on the new master in a new session, and the request will fail. A master switch is no different from a restart, because I use session-local storage. Basically, I have the same guarantees as TCP - no duplicates, no messages from 'foreign' connections whatsoever. I will add this to the RFC if it is not obvious.
Yeah, I will think about it. If I don't find a way to make it good and automatic, then I will expose a config option. I don't like new config options though, because they complicate the config and tend to become outdated.
Sounds sane, should be good.
The same as the other functions - nil + error object. I won't return many errors.
Indeed. I will add it to the RFC.
It means you are ready to wait for pin_timeout + timeout seconds. So why not just pass them as one timeout? It can easily happen that pinning takes longer than the request itself when rebalancing is in progress, so these two timeouts won't be very stable as separate values. Also I am worried that pin_timeout breaks encapsulation, because I am not 100% sure pins will always exist and that I won't come up with something smarter in the future.
True, 'r' is excess. Maybe |
-
Ok, agreed. Also, the issue you mentioned is tarantool/tarantool#458
It's obvious, but maybe we can provide some more complex mechanisms that would consider such cases.
It would be great to return the problematic replicaset_uuid. It will help users investigate problems.
My concern is about conditions when I am ready to wait 1 second for a successful pin and e.g. 5 seconds for a successful map call.
Ok. I can't insist here) Also I've missed one idea in the first message: a "partial map call". Sometimes I have some set of bucket_ids and choose a subset of replicasets to perform the map call on. I think it could be easily implemented under some option, e.g. …
-
There is a comment from @R-omk which I paste below as is and respond in reply. We need to provide the ability to take a pin not only when calling from the router side, but also directly from the replicaset itself. The scenario could be like this: …
-
There is a comment from @R-omk which I paste below as is and respond in reply. Another usage example: as for the straightforward map-reduce solution, it is not quite clear how to use a cursor when working with mcallrw, since the pins on the shards have to be held the whole time until the timeout expires and the cursor is closed; say, how would a client with an already opened cursor make sequential requests to the router to get the next portion of data... In the example I described above there are no such problems at all: it works around the merger's problem with multikey, allows a cursor to any position, and holds no extra locks while the client fetches the data; the only downside is the temporary table.
-
There is a comment from @R-omk which I paste below as is and respond in reply. Besides, it would be nice to have a system of triggers or hooks for the case when the scheduler has planned bucket moves, so that it could signal and "softly" ask background tasks to finish, if possible, and release the pins. Wishlist: at the moment all rw-ro switches and shutdowns are in fact a hard cut-off, instead of rejecting newly arrived requests and waiting for the current ones to finish.
-
I agree that it's not good.
-
What if one replicaset has time to perform rebalancing, but all possible target replicasets are still locked? It seems to me that a robust centralized state machine would solve many borderline issues.
-
Maybe a bit of an offtopic question, but should "interactive transactions" (AFAIU they are planned for the current quarter) ease the pain when we move buckets from one storage to another? Maybe we should add some note about it in the "Future work" section?
-
I want more comments on the storage API part. Now it looks like this:

    ref, err = vshard.storage.ref(timeout, {expire = seconds})
    ref:delete()

If you don't delete manually, Lua GC will do it when the ref object is collected. But there are questions to this API.

Question 1

What if …

Question 2

When I designed the ref objects, I thought that when you implement your own map-reduce, you will probably do it like the built-in map-reduce with some insignificant differences. It means you would need to create ref objects on all storages, and then send map requests which should execute some function and delete the refs. For the map step you would need to be able to get the ref object after it was created by some previous request. It means you would need to store the refs somewhere globally in addition to them being stored in vshard.storage.internal. So this means kind of a duplication, and also probably means we don't need auto GC of refs. The question: what is better - ref objects, or returned ref_ids? Numbers are easier to store, faster, and consume less memory, and they would also eliminate the first question about ref lifetime.

Question 3

As you know, map-reduce internally will use refs with expiration timeouts, identified by a {session_id, ref_id} pair. It means the only difference between the internal refs API and the public … You will need session ids if you implement your own map-reduce, for the reasons I specified in the RFC. It means you will need to keep your own session triggers and ref groups by session somewhere. Besides, you will need to keep them as 'weak' tables, because if a ref expires and is deleted, you won't get any triggers and won't be able to delete it from your tables manually. The question is: should we also expose session_id to the …

Summary

If we apply all the things proposed above, we end up using exactly the same function both for builtin map-reduces and for the public API:

vshard.storage.ref(session_id, ref_id, {
timeout = seconds, -- How long to wait to get the ref.
expire = seconds, -- Expiration timeout.
})
vshard.storage.unref(session_id, ref_id)

This is super flexible, but hardly usable for anything simple. On the other hand, the original API with ref objects, proposed in the RFC, is super easy, but hardly applicable to anything complex like your own map-reduce - not without a lot of complications, way harder than the API above. If somebody has ideas on how they would implement map-reduce and what helper API they would prefer, let's discuss. Otherwise I probably won't expose any public API for refs at all in scope of this ticket, until I know better what it is going to be used for. I will have something exposed in …
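For comparison, a rough usage sketch of the two styles being contrasted here; the exact names, option sets and return values are not final:

    -- Ref-object style from the RFC: the object owns the lifetime.
    local ref, err = vshard.storage.ref(5, {expire = 30})
    if ref == nil then
        error(err)
    end
    -- ... do the work while buckets cannot move ...
    ref:delete()

    -- Low-level style discussed above: the caller owns the identifiers.
    local session_id, ref_id = box.session.id(), 1
    local ok, err2 = vshard.storage.ref(session_id, ref_id,
                                        {timeout = 5, expire = 30})
    -- ... do the work ...
    vshard.storage.unref(session_id, ref_id)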
-
Just want to add an idea/test case. If a storage has weight=0 (or maybe bucket_count=0), it should be ignored by map_call. (Seems quite obvious, but it was missed in one project.)
-
The related issue is #147. The discussion starts with a description of how the task looks in my understanding. Then I provide my vision of the API and behaviour, some insights into the internals, open and frequently asked questions, and alternatives.
Problems with existing functionality
At this moment the router offers only methods for going to a certain node by bucket_id; these methods all have the call prefix: callro, callrw, etc. To scan the whole cluster it was necessary to use the routeall() method and call a function on each returned replicaset.
Unfortunately, a map-reduce implemented via routeall() can't be done in a consistent way - at least not without significant issues with rebalancing.
The meaning of consistency depends on context, but in vshard, in the scope of map-reduce, it means that it should be possible to call a function on each bucket in the cluster, without duplicate buckets.
If map-reduce were blindly done via routeall(), both a bucket miss and a bucket duplicate could happen. A bucket can be missed if it was on one storage node where map is not called yet. Then it is moved to a second node, where map was already called. Then map is called on the first node - the bucket wasn't seen at all.
A bucket can be 'duplicated' if map was called on the node where the bucket is stored. Then the bucket is moved to a node where map is not called yet. And now map is called there too. Now the same bucket was scanned twice.
Algorithm of consistent map-reduce
It is easy to see that the problems above arise from the rebalancer working during map-reduce calls. To fix it, map-reduce can be turned into a 3-stage process: ref-map-reduce.
The 'ref' stage reserves time on the storages for the 'map' execution, during which the rebalancer is not allowed to work. After all refs are done, the cluster won't rebalance until the refs expire or the maps are done. The full name of the stage is 'storage ref'. It is similar to the existing concept of individual bucket refs, but works as if all the buckets on the storage were referenced.
'Map' executes the needed function while being sure the rebalancer won't interfere, because it is done only after 'ref' succeeded on all storages.
'Reduce' is done on the router side, outside of its API, by the user. It operates on whatever 'map' returns.
Request lifecycle
Here is the step-by-step algorithm of what happens on each stage of a map-reduce request.
Step 1
Router generates a unique identifier ref_id, like sync in IProto. Just a number. It is needed to create refs and match the following maps with the refs by the ref_ids.
Step 2
Router sends a ref request with ref_id and a timeout to all master nodes. It must use master nodes, because ref request is supposed to prevent rebalancing and rebalancer talks only with masters (so far).
Step 3
On receipt of a ref request, the storage checks if there is any inactive bucket or rebalancing in progress. If any of this is true, the ref request is saved while the storage tries to delete all garbage buckets and pause the rebalancing.
If none of that was true, or it stopped being true before the timeout expired, the ref request is successful. It is kept on the storage to prevent bucket sending and receiving for the rest of the timeout, while the router gets a successful result.
The ref request is saved into session-local storage, because otherwise ref_ids from different routers could clash.
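A minimal sketch of how the storage side could keep refs session-local; the table and function names here are illustrative, not the actual vshard internals:

    local fiber = require('fiber')

    -- Registry of refs, grouped by IProto session id:
    -- refs_by_session[session_id][ref_id] = {deadline = <event loop time>}
    local refs_by_session = {}

    -- Drop all refs of a session when it is closed, so a reconnected
    -- client can never collide with refs of an old session.
    box.session.on_disconnect(function()
        refs_by_session[box.session.id()] = nil
    end)

    local function storage_ref(ref_id, timeout)
        local sid = box.session.id()
        local session_refs = refs_by_session[sid] or {}
        refs_by_session[sid] = session_refs
        if session_refs[ref_id] ~= nil then
            return nil, 'duplicate ref_id'
        end
        session_refs[ref_id] = {deadline = fiber.clock() + timeout}
        return true
    end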
Step 4
Router collects ref results from all storages. If all successful, it can be sure the rebalancer won't interfere during the specified timeout. It sends 'map' requests with the same ref_id to all master nodes.
Step 5
Master nodes receive the map requests. They check if there is still a pending ref for the given ref_id. If not, it means the ref has expired and the rebalancer could already have interfered; an error is returned.
If the ref is still here, it is removed, and a counter of active map requests is incremented to prevent any rebalancing during the request, without any timeout. The map request is executed, then the counter is decremented. Whatever the map function returned is forwarded to the router.
There is no timeout while the map request is being executed, because it is already in progress and can't be stopped anyway. The only choice the rebalancer has is to wait until the existing map requests are done.
Step 6
Router collects map results from all storages, and returns them to the user.
This scheme ensures that, in case of success on all steps, the user function was executed on each bucket (i.e. it had access to the data of each bucket without any explicit pins, individual bucket refs, or checks).
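A condensed sketch of the router-side flow described above; the service-function names ('storage_ref', 'storage_map') are assumptions, and the sequential loops are a simplification - the real flow would use is_async to parallelize:

    local vshard = require('vshard')

    local ref_id_counter = 0

    local function map_callrw_sketch(func, args, timeout)
        ref_id_counter = ref_id_counter + 1
        local rid = ref_id_counter
        local replicasets = vshard.router.routeall()
        -- Stage 1: take a ref on every master within the timeout.
        for uuid, rs in pairs(replicasets) do
            local ok, err = rs:callrw('vshard.storage._call',
                                      {'storage_ref', rid, timeout},
                                      {timeout = timeout})
            if not ok then
                return nil, {uuid = uuid, error = err}
            end
        end
        -- Stage 2: send the map requests; each one consumes its ref.
        local results = {}
        for uuid, rs in pairs(replicasets) do
            local res, err = rs:callrw('vshard.storage._call',
                                       {'storage_map', rid, func, args},
                                       {timeout = timeout})
            if res == nil and err ~= nil then
                return nil, {uuid = uuid, error = err}
            end
            results[uuid] = res
        end
        return results
    end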
Scheduler
Just like refs will take the moving buckets into account, the rebalancer should start taking existing refs and running maps into account. Or rather not the rebalancer itself, but the individual master nodes - they should apply rebalancer routes more carefully. The same applies to manual bucket moves (bucket_send).
The work of deciding what should run when - a storage ref or a bucket move - is going to be done by a scheduler. It is an algorithm described by a few numbers representing its state machine in the code, with some functions to switch the states depending on the events that happen.
Dumb scheduler
A dumb but simple solution would be to make a master node neither send nor receive any buckets while there are refs or maps, and not accept new refs while rebalancing is working.
This unfortunately may easily lead to starvation. Under big map-reduce load rebalancing will basically stop. And vice versa, with aggressive rebalancing map-reduce requests won't be able to start.
Simple scheduler
The next best solution in terms of fairness vs simplicity is to give preference to map-reduce requests, but if there are pending bucket moves, execute no more than N map-reduces and then give time to the bucket moves.
N can be, for example, 1000 by default and be configurable. Its exact value should not matter much if the bucket count is big, the buckets are small, and the map-reduces are not too long. This should work especially well if the parallel rebalancer is enabled (the rebalancer_max_sending setting).
The problem with this scheduler is that it is useless if a user wants to execute more bucket moves than storage refs. That can easily be necessary if refs are used in long map-reduce requests involving heavy SQL queries running for seconds, while the buckets are small and can be moved fast.
Fair scheduler
Another algorithm, not much harder but more flexible and smart: allow specifying how many moves and refs can be executed sequentially before they can be interrupted. For example, you specify that the bucket move quota is 2 and the storage ref quota is 15. It would mean that at most 2 bucket moves can be done sequentially if there are pending refs, and at most 15 refs can be done if there are pending moves. At the same time, if one of the request types is never or rarely used, the other one is allowed to work until the first one appears. So the quota can be overused if it won't harm the other request type.
This looks a bit similar to the CFS process scheduler in Linux and 'nice' values: in CFS, processes can also use more CPU than their priority allows if other processes with higher priority don't need it right now.
With this algorithm it is possible to specify more bucket moves than storage refs if necessary.
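A toy illustration of the quota idea; the structure and function names are invented for this sketch, not the actual scheduler:

    -- Decide whether the next storage ref or bucket move may start now.
    local sched = {
        move_quota = 2,    -- moves allowed in a row while refs are waiting
        ref_quota = 15,    -- refs allowed in a row while a move is waiting
        moves_in_row = 0,  -- reset when a ref gets its turn
        refs_in_row = 0,   -- reset when a move gets its turn
    }

    local function may_start_ref(moves_waiting)
        -- Refs run freely while no move is waiting; otherwise obey the quota.
        return not moves_waiting or sched.refs_in_row < sched.ref_quota
    end

    local function may_start_move(refs_waiting)
        return not refs_waiting or sched.moves_in_row < sched.move_quota
    end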
Technicals
It is not enough to have a scheduler - it is also necessary to patch garbage collection and recovery, because even if there are no bucket moves in progress, pending refs can't be satisfied until all buckets are active. It means recovery and garbage collection should be reworked to process the buckets faster, in a reactive manner rather than the proactive one used now in the master branch.
For instance, if bucket_send() happened, it leaves a SENT bucket on the storage. It must be deleted with all its content ASAP if there are pending refs. Otherwise the map requests can't count on being able to access any data on the storage safely.
Another technical issue - someone should expire the refs. VShard can't create an ev_timer per ref, because it is not accessible from Lua. Therefore the refs must be explicitly waited for by some fiber. The best option I see so far is to make the garbage collector fiber do that, because expired refs are garbage.
In order to wait for the refs correctly, the GC fiber must be able to find the closest deadline in an efficient way. The proposal is to store all the timed refs in a binary heap implemented in Lua. This is exactly what ev_timer does, so it looks more or less consistent with the event loop tech. The next deadline search would cost O(1), and a heap update (adding a new ref or deleting one) would cost O(log(N)), which is blazing fast for any sane N value.
API and behaviour
Functions
vshard.storage.ref()/unref()
The concept of refs invented for map-reduce appears to be useful not only for this. For instance, it looks useful to ref the storage to perform some format migrations for all the data here, if you don't want to pin/unpin individual buckets or stop rebalancing in the whole cluster.
It could also be used for a custom implementation of your own map-reduce, if the built-in one for some reason does not fit your needs by design.
The function refs the node against any bucket moves, incoming and outgoing. It will block the fiber until the timeout is reached or until the ref is successful. If it is successful, it means the storage does not have any moving buckets and won't allow moving them until it is unreferenced explicitly or until the specified expire time passes.
Returns a reference object on success, and nil + an error object on error.
So far it works only on the master node, because the rebalancer only talks to masters, and the masters don't respect replicas when it comes to bucket or storage refs.
The unref function releases the storage previously referenced by ref(). It returns immediately, without yields. Returns true on success, and nil + an error object on error.
If the ref had an expiration timeout, it won't expire after being deleted explicitly. Also, the refs are auto-deleted when their Lua object is collected by the Lua GC. The manual deletion won't do anything if the ref has already expired, and will return an error in that case.
Internally these manual refs work exactly like map-reduce refs, and will be used by routers.
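A usage sketch of such a manual ref for the format-migration case mentioned above; the space, timeouts, and format are illustrative:

    local vshard = require('vshard')
    local log = require('log')

    -- Take a storage ref so no bucket can leave or arrive while the
    -- migration below runs. Values are illustrative.
    local ref, err = vshard.storage.ref(10, {expire = 60})
    if ref == nil then
        log.error('could not ref the storage: %s', err)
        return
    end
    -- All buckets are now stable; run the migration over the local data.
    box.space.customers:format({
        {'id', 'unsigned'},
        {'bucket_id', 'unsigned'},
        {'name', 'string'},
    })
    ref:delete()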
vshard.router.map_callrw
The router will use vshard.storage._call, which allows adding new service functions without touching _func. So the vshard schema won't change in any way. It can't use vshard.storage.ref() directly, because it needs to assign IDs to the refs and keep them session-local. But the created refs won't be any different. The same goes for unref() - map requests will be sent as follow-ups and will need to find the ref by ref_id, which is not going to be a part of the public refs API.
The signature:
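A plausible shape of it, inferred from the description below (the parameter and return names are assumptions):

    res, err = vshard.router.map_callrw(func, args, opts)
    -- func - the function to call on every master storage.
    -- args - arguments passed to func as is.
    -- opts - only {timeout = <seconds>} is supported so far.
    -- res  - {[replicaset_uuid] = {<func return values>}, ...} on success.
    -- err  - {uuid = replicaset_uuid, error = error_object} on failure.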
It looks very much like callrw(), but does not take a bucket_id, because func is called with each bucket visible and safe to access for any read and write operations. func is your function on the storage, which can be sure _bucket is stable while the function runs. args are passed to the function as is. opts only supports timeout so far.
The passed timeout restricts the time of the ref and map stages in total. The refs will have this timeout on the storages, and will expire after it passes if they are not consumed by map requests for some reason.
Note that too big a timeout plus a single map-reduce failing in the middle will lead to refs being kept on the storages for this whole timeout, preventing the rebalancer from working.
The function returns a dictionary of what each storage returned, with the replicaset UUID as a key. For example, if storage1 returned 'abc', storage2 returned {1, 2, 3}, and storage3 returned 4, 5, 6, then the result will be {[storage1_uuid] = {'abc'}, [storage2_uuid] = {{1, 2, 3}}, [storage3_uuid] = {4, 5, 6}}.
Note how all results are wrapped into an additional {}. This is for the sake of multireturn. Multireturn will work like in the call*() functions - at most 3 values at once.
In case of an error the function returns nil + an error object. The error has the format {uuid = replicaset_uuid, error = error_object}. The error means that map_callrw() faced an error while trying to work with the replicaset having the returned UUID. The error object indicates what exactly happened.
The function has the rw suffix, because it can only work on masters so far. Perhaps it will be possible to improve this when #173 is fixed. Now there is no way to prevent a bucket move from a replica.
The function will use is_async internally so as to send all refs at once, collect their results, then send all maps at once, and collect them too. So its time overhead should be ~2 network exchanges to the farthest storage: 1 to send ref requests in parallel and get the results, 1 to send map requests in parallel and get the results.
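A usage sketch under the behaviour described above, assuming the storage function is referenced by name; the space and function are illustrative:

    -- On every storage: a function that counts local customers. It can
    -- rely on _bucket being stable for the duration of the call.
    function count_customers()
        return box.space.customers:count()
    end

    -- On the router: run it on all masters and reduce the results.
    local vshard = require('vshard')
    local log = require('log')
    local res, err = vshard.router.map_callrw('count_customers', {},
                                              {timeout = 10})
    if res == nil then
        log.error('map failed on %s: %s', err.uuid, err.error)
    else
        local total = 0
        for _, ret in pairs(res) do
            total = total + ret[1]  -- each result is wrapped into {}
        end
        log.info('total customers: %d', total)
    end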
Configuration
The only global configuration update is the fair scheduler settings: how many refs to execute in a row before giving time to the bucket moves, and vice versa.
The default config, sketched below, would say that it is allowed to make at most 1 bucket move in a row if there are pending refs, and to make 1000 refs in a row if there is a pending move, after which control is given to this move.
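A sketch of how such settings might look in the storage configuration; the option names are assumptions, not a settled interface:

    local vshard = require('vshard')

    local cfg = {
        sharding = { --[[ replicaset description as usual ]] },
        bucket_count = 30000,
        -- Hypothetical fair-scheduler quotas:
        sched_move_quota = 1,    -- bucket moves in a row while refs wait
        sched_ref_quota = 1000,  -- storage refs in a row while a move waits
    }
    vshard.storage.cfg(cfg, box.info.uuid)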
Future work
There are features that are not going to be implemented in the first version of the map-reduce functionality. They will be introduced on demand, since they seem to be independent from the core of the main feature.
Option is_async
An is_async option for vshard.router.map_callrw(). map_callrw() uses is_async internally, but it seems tricky to make the whole function work asynchronously, because it consists of multiple steps.
Option buffer
A buffer option for vshard.router.map_callrw(). It should be useful when the result won't be used right away, but will be passed to a merger or sent further.
Master switch bug
There is an issue with master switch: it allows bypassing the refs. The problem is exactly the same as with the bucket refs described in #173. If another node becomes the master during execution of a map function, it won't have the refs and may accept rebalancer requests, which will be applied and replicated to the old master without any protection. This must be fixed in scope of #173, when I find an algorithm for moving buckets with all nodes of a replicaset agreeing with the moves, or at least with some of them not applying the moves until it is ok.
FAQ
What are the guarantees?
With this scheme it is worth mentioning that some map requests may still fail while others succeed. Obviously, successful refs do not guarantee that the network won't break right after, or that some storages won't crash. It means it is not ok to rely on map-reduce working 'atomically', i.e. being executed either everywhere or nowhere. It can be executed partially and return an error.
The only guarantee here is that if it is executed successfully, it has seen all the buckets. This makes it unsafe to use for arbitrary DML/DDL without additional protection. For instance, if you deploy a new space, you can retry the map-reduce with the if_not_exists option in create_space.
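For example, a map function along these lines can be retried safely; the space name is illustrative:

    -- Storage-side map function: idempotent thanks to if_not_exists,
    -- so a partially executed map-reduce can simply be retried.
    function deploy_new_space()
        local s = box.schema.create_space('events', {if_not_exists = true})
        s:create_index('pk', {if_not_exists = true})
        return true
    end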
It also does not protect from data being changed inside the buckets while a map-reduce is being executed. The only 'consistency' vshard can ensure is that the buckets themselves won't move.
Why do you need map and ref separated?
Why can't 'map' simply wait on each storage until it has no inactive buckets and block sending/receiving until 'map' is done? Why not merge the ref and map stages into one? It would be a good optimization.
No, it would be a bug. It could still happen that a bucket is missed. For example, consider this test: there are 2 storages. A 'map' request is sent to both. On the first storage it is executed fine. Now the second storage sends a bucket to the first storage. Now 'map' reaches the second storage, and the just-sent bucket will never be seen by this map-reduce request. This proves 'refs' are necessary.
Why not split ref and map timeouts?
Assume you did it, and now there are ref_timeout and map_timeout in the vshard.router.map_callrw() function. It means you are ready to wait for ref_timeout + map_timeout anyway. So why do you need to split them? Their individual values don't really matter if you are ready to wait for the total timeout.
Why session local storage? Why not global storage with UUIDs?
In the design it is stated that internally ref_ids will be session-local and generated on the client. This allows avoiding any possible ref clashes between different connections, if the client simply uses a global or per-connection monotonic counter for ref_ids.
If the storage were not session-local, 2 clients could generate the same ref_id, and one of them would clash with an error. Why not use global storage but with UUIDs? Because they can still clash. It is much less likely than with integers, but the probability is not 0.
How is master switch handled?
There is no handling; it is a bug - see #173, which is basically about the same thing, but applied to bucket refs instead of storage refs.
Until the bug is fixed, it can happen that the master has some refs and running map requests; then it is switched to a replica and another node becomes the master. The new master does not have the refs, so the rebalancer is able to work with it. It can move buckets from/to the storage; this change is replicated to the old master, where the map requests may still be running and may see artifacts of the rebalancing.
Why is there no partial map-reduce call, for a subset of replicasets?
Because it does not make much sense, really. It wouldn't be a map-reduce anymore. The only reason you would want to ref certain nodes is to access a bunch of buckets at once, but not all buckets of the cluster. It means that on the router you had some data, you extracted bucket_ids, matched them to replicasets using vshard.router.route(), removed duplicates, and got several replicasets you want to visit. But by the time you ref them and visit, the buckets could have already moved away.
It means that for a partial map-reduce you still need proper individual bucket refs if you don't want to ref the whole cluster like map_callrw() does.
Open questions
Function to wipe all refs
Do we need a function to wipe all refs from a storage? In case someone used too big a timeout for a map-reduce call, and then the map wasn't done but the ref was. It may block the rebalancer for too long, and the only fix would be a restart. Something like vshard.storage.unref_all()?
Timeout split
I thought more about the timeout split. I still don't like it, because it exposes a detail of the implementation, and the ref time is going to be very different with rebalancing in progress vs not in progress, so it will be hard to set this time universally. However, I left this as an open question, because I still think about it. The latest idea I had is to add timeout as the total time and map_timeout as the time for the map calls. For everything else the router would use timeout - map_timeout. This would allow not exposing that there is a preparation/finalization stage, how many of them there are, and their names.
RW vs RO refs
I am not sure if it is necessary to have 2 types of refs: RO and RW. Individual buckets have that separation, but I don't know if it makes much sense for RO. Here is why.
A full-storage RO ref could be taken on SENDING buckets, and wouldn't prevent bucket moves. Usually (almost always, I assume) you want an RO ref on all storages in the cluster for a map-reduce request. However, if there is a SENDING bucket on one storage, there is a RECEIVING one on another, and on that other one the RO ref won't work. Therefore the map request does not need RO refs.
So it looks to me that having just a 'ref' is enough, if it is going to be used for a full-cluster ref in scope of map-reduce requests or other global things.
Of course, an RO ref would make sense if you wanted to ref only a subset of storages. But I can't imagine such a case. In the FAQ I described one of the examples and showed that it is wrong. I can't find a valid and needed one.
Alternatives
This section is similar to the open questions, except that the topics here are rather closed, and are kept for history and a deeper understanding of the decisions made.
Space instead of a heap?
It is possible to store ref_ids in a space instead of a heap. It could have a compound primary key {session_id, ref_id} and would store the timeout in its tuples. This would allow not implementing a heap.
Even though the space obviously should be temporary (no work with the WAL), it is still going to be much less performant than a plain Lua table with a heap on top of it. See tarantool/tarantool#458 for benchmarks. Besides, a heap implementation is super easy to do.
Therefore it is better to use Lua, or to implement it all in plain C/C++ without the Tarantool storage functionality at all, when vshard is ported to C/C++.
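For reference, a minimal sketch of such a binary min-heap in Lua, ordered by ref deadline (simplified: no removal of arbitrary elements, which the real implementation would need):

    -- Binary min-heap of {deadline = <number>, ...} entries.
    local heap = {}

    local function heap_push(ref)
        table.insert(heap, ref)
        local i = #heap
        while i > 1 do
            local parent = math.floor(i / 2)
            if heap[parent].deadline <= heap[i].deadline then
                break
            end
            heap[parent], heap[i] = heap[i], heap[parent]
            i = parent
        end
    end

    -- O(1) lookup of the closest deadline for the GC fiber to sleep until.
    local function heap_top()
        return heap[1]
    end

    local function heap_pop()
        local top = heap[1]
        heap[1] = heap[#heap]
        heap[#heap] = nil
        local i = 1
        while true do
            local l, r, smallest = 2 * i, 2 * i + 1, i
            if heap[l] and heap[l].deadline < heap[smallest].deadline then
                smallest = l
            end
            if heap[r] and heap[r].deadline < heap[smallest].deadline then
                smallest = r
            end
            if smallest == i then
                break
            end
            heap[i], heap[smallest] = heap[smallest], heap[i]
            i = smallest
        end
        return top
    end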