# Platform v2: Distributed/External Timestamp Oracle

## Context

As part of the platform v2 work (specifically use-case isolation) we want to develop a scalable and isolated serving layer that is made up of multiple processes that interact with distributed primitives at the moments where coordination is required. In order to do that, we need to introduce strict boundaries (APIs) between the different responsibilities of the Coordinator and Adapter, where Adapter refers to those parts of the current `environmentd` process that comprise today's serving layer.

The distributed TimestampOracle is one of these distributed primitives/APIs that we want to extricate from the Coordinator.

Creating a distributed timestamp oracle is one of the necessary prerequisites for the full realization of use-case isolation, but it is not sufficient. As we will show below, though, implementing our proposal now can already improve Coordinator/serving performance today (both for throughput and latency), and we want to implement the necessary primitives incrementally and roll them out as early as we can, where it makes sense. This way, we can incrementally provide customer benefit on the path to implementing full use-case isolation.

## Goals

- implement a distributed TimestampOracle that is independent of the rest of the Coordinator/the Catalog/state stored in Stash, such that it _could_ be interacted with from multiple processes concurrently
- provide the same correctness guarantees as the current timestamp oracle: _strict serializable_ reads/writes when requested
- don't regress in performance (latency and throughput), within _reasonable_ bounds
- (side effect) actually improve performance (latency and throughput) for the types of queries that we care about in the context of use-case isolation

## Non-Goals

- the rest of the use-case isolation milestones, including an actually distributed serving layer
- linearization (or _strict serializable_ guarantees) around DDL beyond what we support currently
- getting rid of the (process-)global write lock for tables

## Overview

The [design doc for transactional consistency](./20220516_transactional_consistency.md) describes how the TimestampOracle is used in service of our consistency guarantees and what requirements we have for its behavior. The oracle, as currently implemented, cannot easily be used from multiple processes, so we have to implement a replacement for use in the distributed serving layer. Our proposal for this replacement is a distributed TimestampOracle that uses CRDB as the backing store.

Below, we first outline the expected behavior of any TimestampOracle (in the context of Materialize) at a high level, then we describe the current oracle, and finally we propose our distributed oracle. We also examine the implications of the new oracle for performance (both good and bad), the mitigations we have to apply so we don't regress in performance, and how we plan to roll out this change.

> [!NOTE]
> Keep in mind that whenever we write _reads_ below we are talking about
> _linearized reads_. For serializable reads we don't need to consult the
> timestamp oracle.

## TimestampOracle state and operations

A timestamp oracle logically has to keep this state (distributed or not):

- `read_ts`: the timestamp at which to perform reads
- `write_ts`: the most recent timestamp that has been allocated for a write

And it has to provide these operations:

- `allocate_write_ts()`: Allocates a new timestamp for writing. This must be strictly greater than all previous timestamps returned from `allocate_write_ts()` and `get_read_ts()`. For a real-time oracle, we additionally want allocated timestamps to closely track wall-clock time, so we pick a timestamp that obeys both requirements.
- `peek_write_ts()`: Returns the most recently allocated `write_ts`.
- `get_read_ts()`: Returns the current `read_ts`. This must be greater than or equal to all previously applied writes (via `apply_write()`) and strictly less than all future timestamps returned from `allocate_write_ts()`.
- `apply_write(write_ts)`: Marks a write at `write_ts` as completed. This has implications for what can be returned from the other operations in the future.

(A Rust sketch of this interface is included below, after the description of the current oracle.)

It is important to note that these operations have to be linearized across all interaction with the timestamp oracle, potentially from multiple processes. For the current implementation this is easier, because we only have a single serving-layer process, although there is some complexity around ensuring that there _is_ actually only one process serving queries. We will see below how the proposed distributed TimestampOracle solves this problem.

## The current in-memory/backed-by-stash TimestampOracle

The current TimestampOracle is an in-memory, single-process oracle that relies on the Stash for persisting the read/write timestamp. The Stash is used for failure recovery and correctness (read linearization) guarantees:

- the oracle reserves ranges of timestamps by writing a timestamp to the Stash
- timestamp operations are "served" out of memory
- periodically, a higher timestamp is written to the Stash to reserve more time
- there can only be one "leader" `environmentd` process, meaning one process that does serving work and interacts with the timestamp oracle
- the Stash is used to fence out other processes, both in case of failure/restart and during upgrades

The upside of this approach is that oracle operations are very fast, essentially "free", because they are served out of memory and only modify in-memory state.

The downsides are:

- it's not (easily) possible to extend the approach to multiple processes
- at some point between getting a read timestamp and returning a result, the serving process has to assert that it is still the leader
- asserting leadership incurs a costly Stash transaction, through `confirm_leadership()`

Those last two points are subtle; here is what could happen if we did not assert leadership:

1. the old leader reserves timestamps up until t1
2. a new leader (a new `environmentd` process) starts up
3. it reserves a new range of timestamps up until t2
4. a write happens at a timestamp higher than t1
5. the old leader could accidentally serve a read at a timestamp lower than t1 -> a linearization anomaly

We will see below how the cost of the Stash transaction is amortized, making it viable in practice, and how we have to do a similar thing to make the distributed TimestampOracle viable.
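
For orientation, the operations above can be summarized as a small, object-safe trait (extracting such a trait is also the first implementation step listed further below). The following is only an illustrative sketch, assuming the `async-trait` crate; the names, the `Timestamp` alias, and the exact signatures are placeholders, not the actual Materialize code.

```rust
use async_trait::async_trait;

/// Stand-in for the real timestamp type; an assumption for this sketch only.
pub type Timestamp = u64;

/// Illustrative sketch of the oracle interface described above.
#[async_trait]
pub trait TimestampOracle: Send + Sync {
    /// Allocates a timestamp for a write. Strictly greater than all timestamps
    /// previously returned from `allocate_write_ts()` and `get_read_ts()`; a
    /// real-time oracle additionally keeps this close to wall-clock time.
    async fn allocate_write_ts(&self) -> Timestamp;

    /// Returns the most recently allocated write timestamp without allocating
    /// a new one.
    async fn peek_write_ts(&self) -> Timestamp;

    /// Returns the current read timestamp: greater than or equal to all
    /// applied writes and strictly less than all future results of
    /// `allocate_write_ts()`.
    async fn get_read_ts(&self) -> Timestamp;

    /// Marks the write at `write_ts` as completed, which allows the read
    /// timestamp to advance up to it.
    async fn apply_write(&self, write_ts: Timestamp);
}
```

Both the current in-memory oracle and the proposed distributed oracle have to provide these operations with the same linearization guarantees; they only differ in how they achieve them.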

## The proposed distributed TimestampOracle backed by CRDB

The core idea of the distributed TimestampOracle is that the oracle state for a timeline is stored in a row in CRDB, and all timestamp operations become queries against CRDB. We outsource correctness/linearization to CRDB, which comes with the obvious cost of doing a CRDB query for every timestamp operation. We will see below what optimizations we have to apply to make this approach viable and not regress performance (latency and throughput).

The backing table looks like this:

```sql
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts bigint NOT NULL,
    write_ts bigint NOT NULL,
    PRIMARY KEY(timeline)
);
```

And for completeness we also spell out all the oracle operations.

`allocate_write_ts(timeline, wall_clock_ts)`:

```sql
UPDATE
    timestamp_oracle SET write_ts = GREATEST(write_ts + 1, $wall_clock_ts)
    WHERE timeline = $timeline
    RETURNING write_ts;
```

`peek_write_ts(timeline)`:

```sql
SELECT write_ts FROM timestamp_oracle
    WHERE timeline = $timeline;
```

`get_read_ts(timeline)`:

```sql
SELECT read_ts FROM timestamp_oracle
    WHERE timeline = $timeline;
```

`apply_write(timeline, write_ts)`:

```sql
UPDATE
    timestamp_oracle SET write_ts = GREATEST(write_ts, $write_ts),
    read_ts = GREATEST(read_ts, write_ts)
    WHERE timeline = $timeline;
```

> [!NOTE]
> All of these queries go through the 1-phase commit fast path in CRDB: the
> output of `EXPLAIN ANALYZE (VERBOSE)` contains `auto commit`.

An important thing to note here is that timestamp operations are now correct by themselves: we no longer need the `confirm_leadership()` call to prevent linearization anomalies.

## Optimizations that are required to make the distributed TimestampOracle viable

As part of query processing, we perform multiple oracle operations per query, so simply replacing the current in-memory TimestampOracle with the distributed TimestampOracle would not be viable: operations that were previously "free" would now take multiple milliseconds.

The solution is to batch oracle operations, both for read and write queries. We will examine below how this is similar to the current code, where we perform expensive `confirm_leadership()` calls but batch them to amortize their cost.

### Batching of `get_read_ts` operations

Currently, the pipeline for read queries (peeks, in internal parlance) looks like this:

1. peek comes in
2. determine read timestamp using `get_read_ts()`
3. render query graph and send off to compute
4. wait for results from compute
5. when results are ready, add them to a `pending_peek_results` buffer
6. whenever we are not busy and have pending query results: perform `confirm_leadership()` and send results back to the client

The salient points here are that the oracle operation in step #2 is "free", so we are doing one oracle operation per peek, and that the `confirm_leadership()` operation of step #6 translates to a CRDB transaction, which is expensive. Batching up those pending results and doing a single `confirm_leadership()` operation is what makes this approach viable.

We have to do a similar optimization for the distributed TimestampOracle, but instead of batching up results for `confirm_leadership()` we need to batch up incoming peeks and determine a timestamp for multiple peeks using one oracle operation.
We can do this because the distributed TimestampOracle operations provide linearization in and of themselves, and we no longer need the `confirm_leadership()` operation.

The modified pipeline for peeks will look like this:

1. peek comes in
2. add peek to `pending_peeks`
3. whenever we are not busy, determine a read timestamp for all pending peeks using `get_read_ts()`
4. render query graph and send off to compute
5. wait for results from compute
6. when results are ready, immediately send them back to the client

Looking at a processing trace for a single peek, we see that we are doing roughly the same work but at different moments in the pipeline. Additionally, the oracle `get_read_ts()` will be ever so slightly faster because it is a simple read query instead of the write transaction we have to do for `confirm_leadership()`.

### Batching of `allocate_write_ts` operations

Each write query needs at least an `allocate_write_ts()` and an `apply_write()` operation. We already have the concept of a _group commit_ for batching multiple incoming write queries together; see the [design doc for transactional consistency](./20220516_transactional_consistency.md). This makes it so that we a) need to perform fewer table writes and b) have to do fewer timestamp operations. With the current in-memory TimestampOracle this is a nice optimization, but with the distributed TimestampOracle it becomes necessary because, again, timestamp operations are much more expensive than "free".

We mention this here because, the way the coordinator main loop is currently structured, we barely batch up any writes, so the optimization does not kick in. This becomes more apparent with the distributed TimestampOracle. Before enabling the distributed TimestampOracle, we therefore have to make sure that batching of write operations into group commits works as expected.

## Optimizations that are enabled by the distributed TimestampOracle

The current in-memory TimestampOracle is not easily shareable (even amongst threads/tasks) and the `confirm_leadership()` operation has to be performed "on the coordinator main loop". The new durable TimestampOracle does not have these limitations: it can be shared within a process and by multiple processes. This enables some optimizations, even before we implement more of the use-case isolation milestones.

> [!NOTE]
> Some of these optimizations could be achieved by restructuring/reworking the
> existing abstractions, but we get them "for free" with the distributed
> TimestampOracle.

### Removal of `confirm_leadership()` for linearized reads

The distributed TimestampOracle is self-contained and relies on CRDB for linearizability. We therefore don't need the `confirm_leadership()` operations anymore. These operations were a) costly and b) blocking the coordinator main loop. Removing them will improve both latency and throughput for user queries because the coordinator is free to do other work more often.

### Moving TimestampOracle operations off the Coordinator main loop

Because the distributed TimestampOracle is easily shareable, we can move the `get_read_ts()` operations (and possibly other operations as well) off the coordinator main loop, again chipping away at things that block said main loop, which will again improve latency and throughput.
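
To make the previous two points concrete, the sketch below shows roughly how batched, off-main-loop read-timestamp assignment could look. It builds on the hypothetical `TimestampOracle` trait sketched earlier; `PendingPeek`, `stage_pending_peeks`, and the surrounding plumbing are placeholders for illustration, not the actual Coordinator types.

```rust
use std::sync::Arc;

/// Placeholder for a peek that is waiting for a read timestamp.
pub struct PendingPeek {
    // query plan, session/response channel, ...
}

/// Sketch: take all currently pending peeks and assign them a read timestamp
/// with a single oracle call, off the coordinator main loop.
pub fn stage_pending_peeks(
    oracle: Arc<dyn TimestampOracle>,
    pending_peeks: Vec<PendingPeek>,
) {
    // Because the oracle handle is shareable, this can run in its own task and
    // the coordinator main loop is not blocked on the CRDB round trip.
    tokio::spawn(async move {
        // One `get_read_ts()` call covers the whole batch of peeks.
        let read_ts = oracle.get_read_ts().await;
        for peek in pending_peeks {
            // Render the peek at `read_ts`, hand it off to compute, and send
            // the results straight back to the client once they are ready;
            // there is no `confirm_leadership()` step anymore.
            let _ = (peek, read_ts);
        }
    });
}
```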

## Implementation

We will not go into implementation details but instead list all sub-tasks, such that someone sufficiently familiar with the involved parts of the code base will be able to understand the required work.

1. Extract an object-safe TimestampOracle trait: During rollout we will keep both the legacy TimestampOracle and the new distributed TimestampOracle and have a LaunchDarkly (LD) flag for switching between them. We therefore need to abstract the concrete implementation away from the code that uses oracles.
2. Extract an `mz_postgres_client` from the persist postgres Consensus impl: We will re-use the code that persist already has for interacting with Postgres/CRDB using a thread pool.
3. Implement the new distributed TimestampOracle backed by CRDB: This uses the previously extracted `mz_postgres_client` and implements the previously extracted TimestampOracle trait.
4. Add the optimization for batching `get_read_ts()` calls for pending peeks: We can only do this optimization when the new oracle is used, because the in-memory one is not easily shareable.
5. Add the optimization for batching write queries: That is, make group commit batching more aggressive.
6. Add temporary code where on bootup we pick the highest timestamp of all oracles to initialize the configured oracle: We need this while still in the rollout phase, such that we can switch between oracle implementations when needed. More on this in Rollout, below.
7. Add LD machinery for switching between oracle implementations.
8. Expose new metrics (which we added where required, in the above tasks) in the relevant Grafana dashboards.

Intermission: We eventually flip the switch and enable the new oracle in production. See Rollout, below.

9. Remove the `confirm_leadership()` call.
10. Remove the now-unused in-memory TimestampOracle.
11. Various further refactorings that we can do now that _all_ oracle implementations are (easily) shareable: We will put the oracle behind an `Arc`. We will move the oracle code out into its own crate, if we didn't manage to do that before because of dependency shenanigans.

## Observability

We get observability for the thread-pool-enabled Postgres client by relying on `mz_postgres_client`, which we extracted from the persist Consensus implementation. We do, however, have to add specific metrics for the timestamp oracle itself, such that we can observe:

- the number of timestamp operations, per operation type: this way, we can see whether batching (both for read and write queries) is working as expected
- a latency histogram per operation type
- depending on whether we perform retries inside the TimestampOracle implementation itself, metrics for the number of failed/succeeded Postgres operations

## Rollout

We will make the TimestampOracle implementation configurable using LaunchDarkly and roll the distributed oracle out first to staging and then to production. The existing metrics around query latency, plus the new metrics for the oracle itself, will allow us to detect any bad performance impact. Additionally, we will use the `test/scalability` framework to test performance (latency and throughput) and compare the distributed TimestampOracle to the in-memory one. See also Appendix A for benchmark results from our current code/oracle implementation.

One detail is that we put code in place such that we can switch back and forth between the oracle implementations, as a safety precaution. This is the reason why we need to leave the `confirm_leadership()` operation in place, even when using the new distributed TimestampOracle: if we did not, we could produce linearization anomalies when switching in either direction.
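
The boot-time step from implementation item 6 above could look roughly like the sketch below: we initialize the configured oracle with the maximum timestamps that any oracle implementation has handed out so far, so that switching implementations can never move timestamps backwards. The function and parameter names are hypothetical, and the details of how the chosen oracle is then initialized are left out.

```rust
/// Sketch: compute the initial timestamps for whichever oracle implementation
/// is selected (via LaunchDarkly) by taking the maximum over all
/// implementations. Names are hypothetical.
async fn initial_oracle_timestamps(
    legacy_oracle: &dyn TimestampOracle,
    distributed_oracle: &dyn TimestampOracle,
) -> (Timestamp, Timestamp) {
    // Take the maximum read and write timestamps across both oracles, so that
    // neither implementation can observe time moving backwards after a switch.
    let read_ts = legacy_oracle
        .get_read_ts()
        .await
        .max(distributed_oracle.get_read_ts().await);
    let write_ts = legacy_oracle
        .peek_write_ts()
        .await
        .max(distributed_oracle.peek_write_ts().await);
    (read_ts, write_ts)
}
```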
Only once we have switched production over to the distributed TimestampOracle, and once we are certain that we do not have performance regressions, will we remove the in-memory TimestampOracle and apply the last refactorings and code cleanups that this enables.

## Alternatives

### Keep the current TimestampOracle in the singleton Coordinator process

We could keep relying on the Stash for storage and on the singleton Coordinator process fencing out other processes. We would also have to keep doing a `confirm_leadership()` at some point before returning query results.

The distributed serving-layer processes would do timestamp operations via RPC to the Coordinator.

### Add a separate (highly available) TimestampOracle service

We would essentially pull the current in-memory/backed-by-stash implementation out into its own service that clients talk to via (g)RPC. However, we would either have to accept that it is not highly available or do work similar to the current `confirm_leadership()` checks to ensure that operations return a linearizable timestamp. Operating a distributed, highly available service is hard, and with our main proposal we delegate that hard work to CRDB. Plus, we don't want to be operating yet more services as part of a customer deployment.

## Open questions

### What SQL type to use for the read/write timestamp?

- `bigint`, so a Rust `i64`: This goes until `+9223372036854775807`, which is roughly `AD 292278994`.
- `timestamp`, which goes until `AD 294276`: This would give us microsecond resolution, so we could squeeze in more than 1000 timestamps per second (which is what we currently get), if we ever wanted to.

When using a `timestamp` datatype, we would use `interval` types for our arithmetic, and we could even use `now()` instead of passing in the wall-clock time, though that latter part is not needed:

```sql
UPDATE
    timestamp_oracle SET write_ts = GREATEST(write_ts + '1 millisecond', now()::timestamp)
    WHERE timeline = $timeline
    RETURNING write_ts;
```

Preliminary tests on an M1 Max MacBook with an in-memory CRDB deployment in Docker show no discernible difference between the data types, which makes sense because their execution plans are largely the same and query execution time is dominated by reading from and writing to the underlying KV storage.

### A separate TimestampOracle for syncing Catalog state?

In the future, multiple serving-layer processes will listen to differential changes to Catalog state, and we need a way of knowing whether we are up to date. We can either use a `ReadHandle::fetch_recent_upper()` that guarantees linearizability, _or_ we could use a TimestampOracle.

If we use a TimestampOracle, we could either use a completely separate oracle for Catalog state or we could re-use the TimestampOracle that we use for the real-time timeline but add another time dimension to it. For example, `read_ts` would become `(table_read_ts, catalog_read_ts)`.

The benefit of the multi-dimensional timestamps approach is that we need fewer oracle operations and that the timestamps that are mostly used together are stored in one CRDB row. However, it does not provide good separation and does not seem like a good abstraction. Additionally, we would have to make that call now or add the second dimension later, with a migration. The risk with adding it now is that we might find out later that we added the wrong thing.

Storing Catalog timestamps in a separate oracle seems like the right abstraction, but we have to make more oracle operations. The good thing is that these operations can be pipelined: we can make requests to both oracles concurrently, as shown in the sketch below. It _does_ mean, however, that a single read now needs two oracle operations: one for getting the Catalog `read_ts` and one for getting the real-time `read_ts`.
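
To illustrate the pipelining point, a linearized read could fetch both read timestamps concurrently, so the second oracle operation does not simply double the latency. As before, this is only a sketch on top of the hypothetical trait from earlier; the oracle handles and names are made up.

```rust
/// Sketch: fetch the Catalog and real-time read timestamps from two separate
/// oracles concurrently. Handles and names are hypothetical.
async fn read_timestamps(
    catalog_oracle: &dyn TimestampOracle,
    realtime_oracle: &dyn TimestampOracle,
) -> (Timestamp, Timestamp) {
    // Both requests are in flight at the same time, so we pay roughly one
    // oracle round trip of latency instead of two.
    let (catalog_read_ts, table_read_ts) = tokio::join!(
        catalog_oracle.get_read_ts(),
        realtime_oracle.get_read_ts()
    );
    (catalog_read_ts, table_read_ts)
}
```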
I'm proposing that we store the Catalog timestamp in a separate TimestampOracle, if we decide to use oracles at all.

## Appendix A - Benchmark Results

Here we have a comparison between the in-memory TimestampOracle and the new distributed TimestampOracle. For both of these, the `test/scalability` benchmarks were executed against my staging environment. For the distributed TimestampOracle, I have a PoC-level branch that implements the oracle and all the required/enabled optimizations mentioned above, which I manually deployed to my environment.

For both peeks and inserts, performance (throughput and latency) is better when using the distributed TimestampOracle, because we block the coordinator loop less _and_ because the oracle operations are cheaper than the `confirm_leadership()` operation, which we no longer do with that oracle.

For updates, performance is worse with the distributed oracle, because `apply_write()` is now a CRDB operation where it was previously an in-memory operation. Updates are a particularly bad case because all updates are serialized by the (process-)global write lock, so we pay the full CRDB overhead _per_ update query and cannot amortize the cost.

### Peek/Select

![peek-benchmark](./static/distributed_ts_oracle/peek-benchmark.png)

### Insert

![insert-benchmark](./static/distributed_ts_oracle/insert-benchmark.png)

### Update

![update-benchmark](./static/distributed_ts_oracle/update-benchmark.png)