Skip to content

Commit

Permalink
Enable hot standby dispatch
Browse files Browse the repository at this point in the history
This is the initial commit to support hot standby dispatch in GPDB. In this
commit, hot standby dispatch is enabled when the hot_standby GUC is set to ON,
and the standby coordinator accepts connections and can run queries. Basic query
dispatching and error handling cases are covered; please see the
isolation2/hot_standby tests for those cases.

Current limitations that will be addressed in upcoming work:
* No read-committed isolation from global transaction, so e.g. a SELECT on
  standby QD could see partial INSERT results on the primary QD.
* No repeatable-read isolation, so e.g., a UDF that runs multiple SELECTs on the
  standby QD could see different results from the SELECTs even if they are the same.
* No support for transaction blocks (BEGIN ... END), and as a result, no cursor
  support or other things that depend on BEGIN ... END.
* Query conflict between primary and standby has not been tested yet. This will
  be done with/after the isolation work.

Co-authored-by: Soumyadeep Chakraborty <[email protected]>
Co-authored-by: Jimmy Yih <[email protected]>
  • Loading branch information
3 people committed Jan 8, 2024
1 parent 9c5fee9 commit 35e95b9
Show file tree
Hide file tree
Showing 22 changed files with 822 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -8154,6 +8154,8 @@ StartupXLOG(void)
*/
InRecovery = false;

SIMPLE_FAULT_INJECTOR("out_of_recovery_in_startupxlog");

/*
* If we are a standby with contentid -1 and undergoing promotion,
* update ourselves as the new coordinator in catalog. This does not
Expand Down
4 changes: 4 additions & 0 deletions src/backend/cdb/cdbfts.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ FtsNotifyProber(void)
int32 started;
int32 done;

/* Ignore if we don't have a FTS probe process, like a standby QD in a mirrored cluster. */
if (FtsProbePID() == 0)
return;

if (am_ftsprobe)
return;

Expand Down
1 change: 1 addition & 0 deletions src/backend/cdb/cdbtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2130,6 +2130,7 @@ performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound)
sendWaitGxidsToQD(waitGxids);

finishDistributedTransactionContext("performDtxProtocolCommitPrepared -- Commit Prepared", false);
SIMPLE_FAULT_INJECTOR("finish_commit_prepared");
}

/**
Expand Down
34 changes: 27 additions & 7 deletions src/backend/cdb/cdbutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ getCdbComponentInfo(void)
{
cdbInfo = &component_databases->segment_db_info[i];

if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
if (!IS_HOT_STANDBY_QD() && cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
continue;

hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
Expand All @@ -555,7 +555,7 @@ getCdbComponentInfo(void)
{
cdbInfo = &component_databases->entry_db_info[i];

if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
if (!IS_HOT_STANDBY_QD() && cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
continue;

hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
Expand Down Expand Up @@ -1011,7 +1011,16 @@ cdbcomponent_getComponentInfo(int contentId)
/* entry db */
if (contentId == -1)
{
cdbInfo = &cdbs->entry_db_info[0];
Assert(cdbs->total_entry_dbs == 1 || cdbs->total_entry_dbs == 2);
/*
* For a standby QD, get the last entry db which can be the first (on
* a replica cluster) or the second (on a mirrored cluster) entry.
*/
if (IS_HOT_STANDBY_QD())
cdbInfo = &cdbs->entry_db_info[cdbs->total_entry_dbs - 1];
else
cdbInfo = &cdbs->entry_db_info[0];

return cdbInfo;
}

Expand All @@ -1028,10 +1037,10 @@ cdbcomponent_getComponentInfo(int contentId)
Assert(cdbs->total_segment_dbs == cdbs->total_segments * 2);
cdbInfo = &cdbs->segment_db_info[2 * contentId];

if (!SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo))
{
/* use the other segment if it is not what the QD wants */
if ((IS_HOT_STANDBY_QD() && SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo))
|| (!IS_HOT_STANDBY_QD() && !SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo)))
cdbInfo = &cdbs->segment_db_info[2 * contentId + 1];
}

return cdbInfo;
}
Expand Down Expand Up @@ -1124,10 +1133,21 @@ cdb_setup(void)
*
* Ignore background worker because bgworker_should_start_mpp() already did
* the check.
*
* Ignore if we are the standby coordinator started in hot standby mode.
* We don't expect dtx recovery to have finished, as dtx recovery is
* performed at the end of startup. In hot standby, we are recovering
* continuously and should allow queries much earlier. Since a hot standby
* won't proceed with dtx, it is not required to wait for recovery of the dtx
* that has been prepared but not committed (i.e. to commit them); on the
* other hand, the recovery of any in-doubt transactions (i.e. not prepared)
* won't bother a hot standby either, just like they can be recovered in the
* background when a primary instance is running.
*/
if (!IsBackgroundWorker &&
Gp_role == GP_ROLE_DISPATCH &&
!*shmDtmStarted)
!*shmDtmStarted &&
!IS_HOT_STANDBY_QD())
{
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
Expand Down
6 changes: 6 additions & 0 deletions src/backend/cdb/dispatcher/cdbdisp_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
{
const char *command = pQueryParms->strCommand;
int command_len;
int is_hs_dispatch = IS_HOT_STANDBY_QD() ? 1 : 0;
const char *plantree = pQueryParms->serializedPlantree;
int plantree_len = pQueryParms->serializedPlantreelen;
const char *sddesc = pQueryParms->serializedQueryDispatchDesc;
Expand Down Expand Up @@ -912,6 +913,7 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
sizeof(outerUserId) /* outerUserIsSuper */ +
sizeof(currentUserId) +
sizeof(n32) * 2 /* currentStatementStartTimestamp */ +
sizeof(is_hs_dispatch) +
sizeof(command_len) +
sizeof(plantree_len) +
sizeof(sddesc_len) +
Expand Down Expand Up @@ -967,6 +969,10 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
memcpy(pos, &n32, sizeof(n32));
pos += sizeof(n32);

tmp = htonl(is_hs_dispatch);
memcpy(pos, &tmp, sizeof(is_hs_dispatch));
pos += sizeof(is_hs_dispatch);

tmp = htonl(command_len);
memcpy(pos, &tmp, sizeof(command_len));
pos += sizeof(command_len);
Expand Down
3 changes: 1 addition & 2 deletions src/backend/cdb/dispatcher/cdbgang.c
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,7 @@ getCdbProcessesForQD(int isPrimary)

qdinfo = cdbcomponent_getComponentInfo(COORDINATOR_CONTENT_ID);

Assert(qdinfo->config->segindex == -1);
Assert(SEGMENT_IS_ACTIVE_PRIMARY(qdinfo));
Assert((qdinfo->config->segindex == -1 && SEGMENT_IS_ACTIVE_PRIMARY(qdinfo)) || IS_HOT_STANDBY_QD());
Assert(qdinfo->config->hostip != NULL);

proc = makeNode(CdbProcess);
Expand Down
4 changes: 3 additions & 1 deletion src/backend/cdb/dispatcher/test/cdbdisp_query_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,15 @@ test__CdbDispatchPlan_may_be_interrupted(void **state)

queryDesc->estate = CreateExecutorState();

/* will be called multiple times in e.g. FtsNotifyProber/getCdbComponentInfo */
will_return_count(RecoveryInProgress, false, -1);

/* cdbcomponent_getCdbComponents() mocks */
will_be_called(FtsNotifyProber);
will_return(getFtsVersion, 1);
will_return(GetGpExpandVersion, 1);

/* StartTransactionCommand() mocks */
will_return(RecoveryInProgress, false);
will_be_called(__wrap_VirtualXactLockTableInsert);
will_be_called(__wrap_AcceptInvalidationMessages);
will_be_called(initialize_wal_bytes_written);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/fts/fts.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ sigIntHandler(SIGNAL_ARGS)
pid_t
FtsProbePID(void)
{
return *shmFtsProbePID;
return shmFtsProbePID ? *shmFtsProbePID : 0;
}

bool
Expand Down
10 changes: 1 addition & 9 deletions src/backend/storage/lmgr/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,9 @@ InitProcess(void)
* WAL sender, etc are marked as GP_ROLE_UTILITY to prevent unwanted
* GP_ROLE_DISPATCH MyProc settings such as mppSessionId being valid and
* mppIsWriter set to true.
*
* RecoveryInProgress() to see if we are in hot standby, because
* HotStandbyActive() is still true after promotion.
*/
if (am_walsender || am_ftshandler || am_faulthandler ||
(GpIdentity.segindex == -1 && RecoveryInProgress()))
{
if (am_walsender || am_ftshandler || am_faulthandler)
Gp_role = GP_ROLE_UTILITY;
if (GpIdentity.segindex == -1 && RecoveryInProgress())
elog(WARNING, "Force to run in utility mode in hot standby");
}

/*
* ProcGlobal should be set up already (if we are a backend, we inherit
Expand Down
15 changes: 15 additions & 0 deletions src/backend/tcop/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -5408,6 +5408,7 @@ PostgresMain(int argc, char *argv[],
const char *serializedQueryDispatchDesc = NULL;
const char *resgroupInfoBuf = NULL;

int is_hs_dispatch;
int query_string_len = 0;
int serializedDtxContextInfolen = 0;
int serializedPlantreelen = 0;
Expand Down Expand Up @@ -5444,6 +5445,20 @@ PostgresMain(int argc, char *argv[],
cuid = pq_getmsgint(&input_message, 4);

statementStart = pq_getmsgint64(&input_message);

/* check if the message is from standby QD and is expected */
is_hs_dispatch = pq_getmsgint(&input_message, 4);
if (is_hs_dispatch == 0 && IS_STANDBY_QE())
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("mirror segments can only process MPP protocol messages from standby QD"),
errhint("Exit the current session and re-connect.")));
else if (is_hs_dispatch != 0 && !IS_STANDBY_QE())
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("primary segments can only process MPP protocol messages from primary QD"),
errhint("Exit the current session and re-connect.")));

query_string_len = pq_getmsgint(&input_message, 4);
serializedPlantreelen = pq_getmsgint(&input_message, 4);
serializedQueryDispatchDesclen = pq_getmsgint(&input_message, 4);
Expand Down
2 changes: 2 additions & 0 deletions src/include/access/xlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef XLOG_H
#define XLOG_H

#include "postgres.h" /* for Datum */

#include "access/rmgr.h"
#include "access/xlogdefs.h"
#include "access/xloginsert.h"
Expand Down
3 changes: 3 additions & 0 deletions src/include/cdb/cdbvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef CDBVARS_H
#define CDBVARS_H

#include "access/xlog.h" /*RecoveryInProgress*/
#include "access/xlogdefs.h" /*XLogRecPtr*/
#include "catalog/gp_segment_configuration.h" /* COORDINATOR_CONTENT_ID */

Expand Down Expand Up @@ -749,8 +750,10 @@ extern GpId GpIdentity;

#define UNINITIALIZED_GP_IDENTITY_VALUE (-10000)
#define IS_QUERY_DISPATCHER() (GpIdentity.segindex == COORDINATOR_CONTENT_ID)
#define IS_HOT_STANDBY_QD() (EnableHotStandby && IS_QUERY_DISPATCHER() && RecoveryInProgress())

#define IS_QUERY_EXECUTOR_BACKEND() (Gp_role == GP_ROLE_EXECUTE && gp_session_id > 0)
#define IS_STANDBY_QE() (EnableHotStandby && IS_QUERY_EXECUTOR_BACKEND() && RecoveryInProgress())

/* Stores the listener port that this process uses to listen for incoming
* Interconnect connections from other Motion nodes.
Expand Down
3 changes: 3 additions & 0 deletions src/test/isolation2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ installcheck-parallel-retrieve-cursor: install
installcheck-mirrorless: install
$(pg_isolation2_regress_installcheck) $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_isolation2 --schedule=$(srcdir)/mirrorless_schedule --dbname=isolation2-mirrorless

installcheck-hot-standby: install
$(pg_isolation2_regress_installcheck) $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_isolation2 --schedule=$(srcdir)/hot_standby_schedule --dbname=isolation2-hot-standby

installcheck-ic-tcp: install
ifeq ($(findstring gp_interconnect_type=tcp,$(PGOPTIONS)),gp_interconnect_type=tcp)
$(pg_isolation2_regress_installcheck) $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_isolation2 --bindir='$(bindir)' --inputdir=$(srcdir) --schedule=$(srcdir)/isolation2_ic_tcp_schedule
Expand Down
Loading

0 comments on commit 35e95b9

Please sign in to comment.