
WAL proposer-safekeeper communication consensus protocol.

General requirements and architecture

There is a single stateless master and several safekeepers; the number of safekeepers is determined by the redundancy level. To minimize the number of changes in the Postgres core, we use standard streaming replication from the master (through the WAL sender). This replication stream is initiated by the WAL proposer process, which runs inside the PostgreSQL server and broadcasts the WAL generated by PostgreSQL to the safekeepers. To provide durability we use synchronous replication at the master (the response to a commit statement is sent to the client only when it is acknowledged by the WAL receiver). The WAL proposer sends this acknowledgment only when the LSN of the commit record is confirmed by a quorum of safekeepers.

The WAL proposer tries to establish connections with the safekeepers. At any moment each safekeeper can serve exactly one proposer, but it can accept new connections.

Any of the safekeepers can be used as a WAL server producing a replication stream, so both Pagers and Replicas (read-only compute nodes) can connect to a safekeeper to receive the WAL stream. A safekeeper streams WAL until it reaches min(commitLSN, flushLSN); then replication is suspended until new data arrives from the master.
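
As a tiny illustration of this streaming cutoff, here is a Python sketch; the function name is an assumption for illustration, not the actual API:

def wal_stream_limit(commit_lsn: int, flush_lsn: int) -> int:
    # A safekeeper serves replicas and Pagers only WAL that is both flushed to
    # its local storage (flushLSN) and confirmed by a quorum (commitLSN);
    # past that point replication pauses until new data arrives from the master.
    return min(commit_lsn, flush_lsn)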

Handshake

The goal of the handshake is to collect a quorum (to be able to perform recovery) and to avoid split-brain caused by the simultaneous presence of an old and a new master. The handshake consists of the following steps:

  1. Broadcast information about the server to all safekeepers (WAL segment size, system_id, ...).
  2. Receive responses with information about the safekeepers.
  3. Once a quorum of handshake responses is received, propose a new NodeId (max(term)+1, server.uuid) to all of them.
  4. On receiving the proposed NodeId, each safekeeper compares it with its locally stored NodeId; if the proposed NodeId is greater than or equal to it, the safekeeper accepts the proposal and persists this choice in its local control file (see the sketch after this list).
  5. If a quorum of safekeepers approves the proposed NodeId, the server assumes that the handshake is successfully completed and switches to the recovery stage.
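
The comparison in step 4 can be sketched in Python as follows; the NodeId ordering (term first, UUID as tie-breaker) is as described above, while the type and function names are illustrative assumptions rather than the actual implementation:

from dataclasses import dataclass
from uuid import UUID

@dataclass(order=True, frozen=True)
class NodeId:
    # Compared lexicographically: the term is compared first,
    # the proposer's UUID only breaks ties between equal terms.
    term: int
    uuid: UUID

def handle_proposal(stored: NodeId, proposed: NodeId) -> bool:
    # Accept iff the proposal is not older than the locally stored NodeId.
    # On acceptance, the choice must be persisted to the control file
    # before the "accepted" response is sent back to the proposer.
    return proposed >= stored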

Recovery

The proposer computes max(restartLSN) and max(flushLSN) over the quorum of attached safekeepers. RestartLSN is the position in the WAL that is known to be delivered to all safekeepers; in other words, restartLSN can be considered a cut-off horizon (all preceding WAL segments can be removed). FlushLSN is the position flushed by a safekeeper to its local persistent storage.

If max(restartLSN) != max(flushLSN), then recovery has to be performed. The proposer creates a replication channel with the most advanced safekeeper (the one with the largest flushLSN) and downloads all WAL messages between max(restartLSN) and max(flushLSN). Messages are inserted into an L1-list ordered by LSN. Then the position of each safekeeper in this list is located according to its flushLSN. Safekeepers that are not yet connected (out of quorum) start from the beginning of the list (corresponding to restartLSN).

We need to choose max(flushLSN) because the voting quorum may differ from the quorum that committed the last message, so we do not know whether the records up to max(flushLSN) were committed by a quorum or not. We therefore have to consider them committed to avoid losing committed data.

The calculated max(flushLSN) is called the VCL (Volume Complete LSN). Since it is chosen among a quorum, there may be some other, currently offline safekeeper whose WAL extends beyond the VCL. Once it comes back online, we need to overwrite its WAL beyond the VCL. To support this, each safekeeper maintains an epoch number. The epoch plays almost the same role as the term, but the algorithm for bumping it is different. The VCL and the new epoch are received by a safekeeper from the proposer during voting, but the safekeeper does not switch to the new epoch immediately after voting. Instead, it waits until a record with LSN > max(flushLSN, VCL) is received, which means that all records of the old generation have been restored and it can switch to the new generation. When the proposer calculates max(flushLSN), it first compares epochs, so it actually compares (epoch, flushLSN) pairs.
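
In code terms, selecting the most advanced safekeeper and the VCL is simply a maximum over (epoch, flushLSN) pairs. A minimal Python sketch, with illustrative names not taken from the actual code:

from dataclasses import dataclass

@dataclass
class SafekeeperInfo:
    epoch: int
    flush_lsn: int

def choose_vcl(quorum: list[SafekeeperInfo]) -> tuple[int, int]:
    # A higher epoch always wins; flushLSN only breaks ties inside one epoch.
    leader = max(quorum, key=lambda sk: (sk.epoch, sk.flush_lsn))
    return leader.epoch, leader.flush_lsn

# A safekeeper on a newer epoch beats one with more WAL from an older epoch:
assert choose_vcl([SafekeeperInfo(2, 3), SafekeeperInfo(1, 4)]) == (2, 3)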

Let's look at some examples. Consider three safekeepers S1, S2, S3. Si(N) means that the i-th safekeeper has epoch=N, and Ri(x) is a WAL record for resource x with LSN=i. Assume we have the following state:

S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d)  - offline

The proposer chooses the quorum (S1,S2); the VCL for them is 2. We download WAL from S2 to the proposer and schedule its write to S1. After receiving a new record R3(e), the picture can be:

S1(2): R1(a),R2(b),R3(e)
S2(2): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(c),R4(d)  - offline

Now if the server crashes or restarts, we perform a new round of voting, and it does not matter which quorum we choose: (S1,S2), (S2,S3), ... In any case VCL=3, because S3 has a smaller epoch. R3(c) will be overwritten with R3(e):

S1(3): R1(a),R2(b),R3(e)
S2(3): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(e),R4(d)

The epoch of S3 will be adjusted once it overwrites R4:

S1(3): R1(a),R2(b),R3(e),R4(f)
S2(3): R1(a),R2(b),R3(e),R4(f)
S3(3): R1(a),R2(b),R3(e),R4(f)

A crash can also happen before the epoch is bumped. Let's return to the initial state:

S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d)  - offline

Assume that we start recovery and R2(b) has already been copied to S1:

S1(1): R1(a),R2(b)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d)  - offline

and then a crash happens. During voting we choose the quorum (S2,S3). Both belong to the same epoch and S3 is the most advanced of them, so the VCL is set to 4 and we recover S1 and S2 from S3:

S1(1): R1(a),R2(b),R3(c),R4(d)
S2(1): R1(a),R2(b),R3(c),R4(d)
S3(1): R1(a),R2(b),R3(c),R4(d)

Main loop

Once recovery is completed, the proposer switches to the normal processing loop: it receives the WAL stream from Postgres and appends WAL messages to the list, while at the same time trying to push messages to the safekeepers. Each safekeeper is associated with some element of the message list, and once it acknowledges receiving a message, its position is moved forward. Each queue element contains an acknowledgment mask whose bits correspond to the safekeepers. Once all safekeepers have acknowledged receiving a message (by setting the corresponding bit), the element can be removed from the queue and restartLSN is advanced.
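
A hedged sketch of this bookkeeping in Python, keeping a per-message set of acknowledging safekeepers instead of a bit mask (names are assumptions for illustration):

from collections import deque
from dataclasses import dataclass, field

@dataclass
class QueuedMessage:
    begin_lsn: int
    end_lsn: int
    data: bytes
    acked_by: set[int] = field(default_factory=set)  # safekeepers that acknowledged

def on_ack(queue: deque[QueuedMessage], msg: QueuedMessage,
           sk_index: int, n_safekeepers: int, restart_lsn: int) -> int:
    # Record the acknowledgment, then pop fully acknowledged messages from the
    # head of the queue; restartLSN advances to the end of the last message
    # known to be flushed by *all* safekeepers.
    msg.acked_by.add(sk_index)
    while queue and len(queue[0].acked_by) == n_safekeepers:
        restart_lsn = queue.popleft().end_lsn
    return restart_lsn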

The proposer maintains restartLSN and commitLSN based on the responses received from the safekeepers. RestartLSN equals the LSN of the head message in the list. CommitLSN is the element at index nSafekeepers-Quorum in the ascending-ordered array of the safekeepers' flushLSNs. CommitLSN and restartLSN are included in the requests sent from the proposer to the safekeepers and are stored in each safekeeper's control file. To avoid the overhead of an extra fsync, this control file is not fsynced on each request; it is flushed periodically, which means that the restartLSN/commitLSN stored by a safekeeper may lag slightly behind. This is not critical, because it may only cause redundant processing of some WAL records, and flushLSN is recalculated after a node restart by scanning the local WAL files.
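
The commitLSN rule above amounts to taking an order statistic of the safekeepers' flushLSN feedback. A small sketch with 0-based indexing (the function name is an assumption):

def compute_commit_lsn(flush_lsns: list[int], quorum: int) -> int:
    # After sorting in ascending order, the element at index n - quorum is the
    # largest LSN that at least `quorum` safekeepers have already flushed, so
    # everything up to it is durable on a quorum and can be reported as committed.
    ordered = sorted(flush_lsns)
    return ordered[len(ordered) - quorum]

# Three safekeepers, quorum of two: the second-highest flushLSN is the commitLSN.
assert compute_commit_lsn([10, 30, 20], quorum=2) == 20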

Fault tolerance

If the WAL proposer process loses its connection to a safekeeper, it tries to reestablish the connection using the same NodeId.

A restart of PostgreSQL initiates a new round of voting and a switch to a new epoch.

Limitations

Right now the message queue is maintained in main memory and is not spilled to disk, which can cause memory overflow in the presence of lagging safekeepers. It is assumed that if some safekeepers lose their local data, it should be recovered using some external mechanism.

Glossary

  • CommitLSN: position in the WAL confirmed by a quorum of safekeepers.
  • RestartLSN: position in the WAL confirmed by all safekeepers.
  • FlushLSN: position in the WAL up to which a safekeeper has persisted WAL to its local disk.
  • NodeId: pair (term, UUID).
  • Pager: Neon component restoring pages from the WAL stream.
  • Replica: read-only compute node.
  • VCL (Volume Complete LSN): the largest LSN for which we can guarantee availability of all prior records.

Algorithm

process WalProposer(safekeepers,server,curr_epoch,restart_lsn=0,message_queue={},feedbacks={})
    function do_recovery(epoch,restart_lsn,VCL)
        leader = i:safekeepers[i].state.epoch=epoch and safekeepers[i].state.flushLsn=VCL
        wal_stream = safekeepers[leader].start_replication(restart_lsn,VCL)
        do
            message = wal_stream.read()
            message_queue.append(message)
        while message.endLsn < VCL

        for i in 1..safekeepers.size()
            for message in message_queue
                if message.endLsn <= safekeepers[i].state.flushLsn
                    message.delivered += i
                else
                    send_message(i, message)
                    break
    end function

    function send_message(i,msg)
        msg.restartLsn = restart_lsn
        msg.commitLsn = get_commit_lsn()
        safekeepers[i].send(msg, response_handler)
    end function

    function do_broadcast(message)
        for i in 1..safekeepers.size()
            if not safekeepers[i].sending()
                send_message(i, message)
    end function

    function get_commit_lsn()
        sorted_feedbacks = feedbacks.sort()
        return sorted_feedbacks[safekeepers.size() - quorum]
    end function

    function response_handler(i,message,response)
        feedbacks[i] = if response.epoch=curr_epoch then response.flushLsn else VCL
        server.write(get_commit_lsn())

        message.delivered += i
        next_message = message_queue.next(message)
        if next_message
            send_message(i, next_message)

        while message_queue.head.delivered.size() = safekeepers.size()
            if restart_lsn < message_queue.head.beginLsn
                restart_lsn = message_queue.head.endLsn
            message_queue.pop_head()
    end function

    server_info = server.read()

    safekeepers.write(server_info)
    safekeepers.state = safekeepers.read()
    next_term = max(safekeepers.state.nodeId.term)+1
    restart_lsn = max(safekeepers.state.restartLsn)
    epoch,VCL = max(safekeepers.state.epoch,safekeepers.state.flushLsn)
    curr_epoch = epoch + 1

    proposal = Proposal(NodeId(next_term,server.id),curr_epoch,VCL)
    safekeepers.send(proposal)
    responses = safekeepers.read()
    if any responses.is_rejected()
        exit()

    for i in 1..safekeepers.size()
        feedbacks[i] = if epoch=safekeepers[i].state.epoch then safekeepers[i].state.flushLsn else restart_lsn

    if restart_lsn != VCL
        do_recovery(epoch,restart_lsn,VCL)

    wal_stream = server.start_replication(VCL)
    for ever
        message = wal_stream.read()
        message_queue.append(message)
        do_broadcast(message)
end process

process safekeeper(gateway,state)
    function handshake()
        proposer = gateway.accept()
        server_info = proposer.read()
        proposer.write(state)
        proposal = proposer.read()
        if proposal.nodeId < state.nodeId
            proposer.write(rejected)
            return null
        else
            state.nodeId = proposal.nodeId
            state.proposed_epoch = proposal.epoch
            state.VCL = proposal.VCL
            write_control_file(state)
            proposer.write(accepted)
            return proposer
    end function

    state = read_control_file()
    state.flushLsn = locate_end_of_wal()

    for ever
        proposer = handshake()
        if not proposer
            continue
        for ever
            req = proposer.read()
            if req.nodeId != state.nodeId
                break
            save_wal_file(req.data)
            state.restartLsn = req.restartLsn
            if state.epoch < state.proposed_epoch and req.endPos > max(state.flushLsn,state.VCL)
                state.epoch = state.proposed_epoch
            if req.endPos > state.flushLsn
                state.flushLsn = req.endPos
            write_control_file(state)
            resp = Response(state.epoch,req.endPos)
            proposer.write(resp)
            notify_wal_sender(min(req.commitLsn,req.endPos))
end process