diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 5edd5b3e8c92..3407189fcf6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -283,4 +283,11 @@ public int getTimeout() { * @throws IllegalStateException if this service's state isn't FAILED. */ Throwable failureCause(); + + /** + * Hook invoked before persisting replication offsets. Eg: Buffered endpoints can flush/close WALs + * here. + */ + default void beforePersistingReplicationOffset() throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 4e122ef5e8b9..6d60e4272ff5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -866,4 +866,24 @@ public long getTotalReplicatedEdits() { long getSleepForRetries() { return sleepForRetries; } + + void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) { + workerThreads.compute(walGroupId, (key, current) -> { + if (current != oldWorker) { + return current; // already replaced + } + + LOG.warn("Restarting shipper for walGroupId={}", walGroupId); + + try { + ReplicationSourceShipper newWorker = createNewShipper(walGroupId); + startShipper(newWorker); + return newWorker; + } catch (Exception e) { + LOG.error("Failed to restart shipper for walGroupId={}", walGroupId, e); + return current; // retry later + } + }); + } + } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index d05e4fed045b..be36814b893e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -74,6 +75,19 @@ public enum WorkerState { private final int DEFAULT_TIMEOUT = 20000; private final int getEntriesTimeout; private final int shipEditsTimeout; + private long accumulatedSizeSinceLastUpdate = 0L; + private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); + private long offsetUpdateIntervalMs; + private long offsetUpdateSizeThresholdBytes; + private WALEntryBatch lastShippedBatch; + private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>(); + + private static final String OFFSET_UPDATE_INTERVAL_MS_KEY = + "hbase.replication.shipper.offset.update.interval.ms"; + private static final String OFFSET_UPDATE_SIZE_THRESHOLD_KEY = + "hbase.replication.shipper.offset.update.size.threshold"; + private static final long DEFAULT_OFFSET_UPDATE_INTERVAL_MS = Long.MAX_VALUE; + private static final long DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD = -1L; public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source, ReplicationSourceWALReader walReader) { @@ -90,6 +104,10 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); this.shipEditsTimeout =
this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); + this.offsetUpdateIntervalMs = + conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, DEFAULT_OFFSET_UPDATE_INTERVAL_MS); + this.offsetUpdateSizeThresholdBytes = + conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD); } @Override @@ -106,9 +124,27 @@ public final void run() { continue; } try { - WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); + // check time-based offset persistence + if (shouldPersistLogPosition()) { + // Trigger offset persistence via existing retry/backoff mechanism in shipEdits() + WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush(); + if (emptyBatch != null) { + shipEdits(emptyBatch); + } + } + + long pollTimeout = getEntriesTimeout; + if (offsetUpdateIntervalMs != Long.MAX_VALUE) { + long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime; + long remaining = offsetUpdateIntervalMs - elapsed; + if (remaining > 0) { + pollTimeout = Math.min(getEntriesTimeout, remaining); + } + } + WALEntryBatch entryBatch = entryReader.poll(pollTimeout); LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(), entryBatch); + if (entryBatch == null) { continue; } @@ -118,11 +154,24 @@ public final void run() { } else { shipEdits(entryBatch); } - } catch (InterruptedException | ReplicationRuntimeException e) { - // It is interrupted and needs to quit. 
- LOG.warn("Interrupted while waiting for next replication entry batch", e); + } catch (InterruptedException e) { + // Normal shutdown + if (!isActive()) { + Thread.currentThread().interrupt(); + break; + } + + // Unexpected interruption → restart + abortAndRestart(e); + break; + + } catch (ReplicationRuntimeException e) { + // Already handled upstream (or future safety) + LOG.warn("Shipper encountered fatal error", e); Thread.currentThread().interrupt(); + break; + } + } // If the worker exits run loop without finishing its task, mark it as stopped. if (!isFinished()) { @@ -133,6 +182,16 @@ } } + private WALEntryBatch createEmptyBatchForTimeBasedFlush() { + // Reuse last shipped WAL position with 0 entries + if (lastShippedBatch == null) { + return null; + } + WALEntryBatch batch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath()); + batch.setLastWalPosition(lastShippedBatch.getLastWalPosition()); + return batch; + } + private void noMoreData() { if (source.isRecovered()) { LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, @@ -154,15 +213,18 @@ protected void postFinish() { private void shipEdits(WALEntryBatch entryBatch) { List<Entry> entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; - if (entries.isEmpty()) { - updateLogPosition(entryBatch); - return; - } int currentSize = (int) entryBatch.getHeapSize(); - source.getSourceMetrics() - .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); + MetricsSource metrics = source.getSourceMetrics(); + if (metrics != null && !entries.isEmpty()) { + metrics.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); + } while (isActive()) { try { + if (entries.isEmpty()) { + lastShippedBatch = entryBatch; + persistLogPosition(); + return; + } try { source.tryThrottle(currentSize); } catch (InterruptedException e) { @@ -190,13 +252,13 @@ private void shipEdits(WALEntryBatch entryBatch) { } else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - // Clean up hfile references - for (Entry entry : entries) { - cleanUpHFileRefs(entry.getEdit()); - LOG.trace("shipped entry {}: ", entry); + + accumulatedSizeSinceLastUpdate += currentSize; + entriesForCleanUpHFileRefs.addAll(entries); + lastShippedBatch = entryBatch; + if (shouldPersistLogPosition()) { + persistLogPosition(); } - // Log and clean up WAL logs - updateLogPosition(entryBatch); // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) // this sizeExcludeBulkLoad has to use same calculation that when calling @@ -215,6 +277,12 @@ private void shipEdits(WALEntryBatch entryBatch) { entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); } break; + } catch (IOException ioe) { + // Offset-Persist failure is treated as fatal to this shipper since it might come from + // beforePersistingReplicationOffset. So abort and restart the Shipper, and WAL reading + // will resume from the last successfully persisted offset + abortAndRestart(ioe); + return; } catch (Exception ex) { source.getSourceMetrics().incrementFailedBatches(); LOG.warn("{} threw unknown exception:", @@ -229,6 +297,41 @@ private void shipEdits(WALEntryBatch entryBatch) { } } + private boolean shouldPersistLogPosition() { + if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) { + return false; + } + + // Default behaviour to update offset immediately after replicate() + if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) { + return true; + } + + // Size clause must be skipped when the threshold is disabled (-1); otherwise any + // accumulated size trivially satisfies it and the interval setting never takes effect. + return (offsetUpdateSizeThresholdBytes != -1 && accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes) + || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs); + } + + private void persistLogPosition() throws IOException { + if (lastShippedBatch == null) { + return; + } + + ReplicationEndpoint endpoint = source.getReplicationEndpoint(); + endpoint.beforePersistingReplicationOffset(); + + // Clean up hfile
references + for (Entry entry : entriesForCleanUpHFileRefs) { + cleanUpHFileRefs(entry.getEdit()); + } + entriesForCleanUpHFileRefs.clear(); + + accumulatedSizeSinceLastUpdate = 0; + lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); + + // Log and clean up WAL logs + updateLogPosition(lastShippedBatch); + } + private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = source.getPeerId(); if (peerId.contains("-")) { @@ -359,4 +462,13 @@ void clearWALEntryBatch() { long getSleepForRetries() { return sleepForRetries; } + + // Restart from last persisted offset + void abortAndRestart(Throwable cause) { + LOG.warn("Shipper for walGroupId={} aborting due to fatal error, will restart", walGroupId, + cause); + // Ask source to replace this worker + source.restartShipper(walGroupId, this); + Thread.currentThread().interrupt(); + } }