diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java new file mode 100644 index 000000000000..b91966462ce8 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; + +/** + * Interface for implementations which load/save the current checkpoint + * transaction log index used by an event poller. 
+ */ +public interface NotificationCheckpointStrategy { + + String load() throws IOException; + + void save(String val) throws IOException; +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java index f61b5efa6120..ea6844063d96 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java @@ -35,4 +35,16 @@ public interface OMEventListenerPluginContext { // XXX: this probably doesn't belong here String getThreadNamePrefix(); + + NotificationCheckpointStrategy getNotificationCheckpointStrategy(); + + /** + * Prunes records from completedRequestInfoTable using a hybrid soft and hard retention limit. + * + * @param checkpointKey the current consumer checkpoint (startKey). + * @param softLimit the soft retention keep limit behind the checkpoint. + * @param hardLimit the hard retention cap limit behind the latest transaction in database. + * @throws IOException if any database error occurs. + */ + void pruneCompletedRequestInfo(long checkpointKey, long softLimit, long hardLimit) throws IOException; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMCompletedRequestPrunedException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMCompletedRequestPrunedException.java new file mode 100644 index 000000000000..2b1ae7209368 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMCompletedRequestPrunedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.exceptions; + +import java.io.IOException; + +/** + * Exception thrown when a requested completed request info record + * is not found because it has already been pruned by the retention policy. + */ +public class OMCompletedRequestPrunedException extends IOException { + + public OMCompletedRequestPrunedException(String message) { + super(message); + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java index f42524ba98ad..96513188c731 100644 --- a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java @@ -84,7 +84,8 @@ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext plu exception.initCause(ex); throw exception; } - this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition( + pluginContext.getNotificationCheckpointStrategy()); 
LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + "serviceTimeout={}, seekPosition={}", diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java index 2ae3a2ff70cc..026abf6a921b 100644 --- a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.exceptions.OMCompletedRequestPrunedException; import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,8 @@ public class OMEventListenerLedgerPoller extends BackgroundService { private final OMEventListenerPluginContext pluginContext; private final OMEventListenerLedgerPollerSeekPosition seekPosition; private final Consumer callback; + private final long softRetentionLimit; + private final long hardRetentionLimit; @SuppressWarnings("checkstyle:ParameterNumber") public OMEventListenerLedgerPoller(long interval, TimeUnit unit, @@ -70,6 +73,10 @@ public OMEventListenerLedgerPoller(long interval, TimeUnit unit, this.pluginContext = pluginContext; this.seekPosition = seekPosition; this.callback = callback; + this.softRetentionLimit = configuration.getLong( + "ozone.om.plugin.kafka.ledger.retention.soft.limit", 100_000L); + this.hardRetentionLimit = configuration.getLong( + "ozone.om.plugin.kafka.ledger.retention.hard.limit", 1_000_000L); } private boolean shouldRun() { @@ -121,14 +128,21 @@ public BackgroundTaskResult 
call() { } getRunCount().incrementAndGet(); + String startKeyStr = seekPosition.get(); try { - String startKeyStr = seekPosition.get(); Long startKey = StringUtils.isNotBlank(startKeyStr) ? Long.valueOf(startKeyStr) : null; for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo( startKey, MAX_RESULTS)) { callback.accept(requestInfo); } successRunCount.incrementAndGet(); + + if (startKey != null) { + pluginContext.pruneCompletedRequestInfo(startKey, softRetentionLimit, hardRetentionLimit); + } + } catch (OMCompletedRequestPrunedException e) { + LOG.warn("Consumer checkpoint {} has been hard-pruned. Fast-forwarding and self-healing...", startKeyStr); + seekPosition.set(null); // Clear checkpoint to fast-forward } catch (IOException e) { LOG.error("Error while running completed operation consumer " + "background task. Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java index bccbda1e2a7e..2c35dcc82f18 100644 --- a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java @@ -24,21 +24,26 @@ /** * This is a helper class to get/set the seek position used by the * OMEventListenerLedgerPoller. 
- * - * XXX: the seek position should be persisted (and ideally distributed to - * all OMs) but at the moment it only lives in memory */ public class OMEventListenerLedgerPollerSeekPosition { public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class); private final AtomicReference seekPosition; + private final NotificationCheckpointStrategy checkpointStrategy; - public OMEventListenerLedgerPollerSeekPosition() { - this.seekPosition = new AtomicReference(initSeekPosition()); + public OMEventListenerLedgerPollerSeekPosition(NotificationCheckpointStrategy checkpointStrategy) { + this.checkpointStrategy = checkpointStrategy; + this.seekPosition = new AtomicReference<>(initSeekPosition()); } - // TODO: load this from persistent storage public String initSeekPosition() { + try { + if (checkpointStrategy != null) { + return checkpointStrategy.load(); + } + } catch (Exception ex) { + LOG.error("Failed to load initial seek position from checkpoint strategy", ex); + } return null; } @@ -48,10 +53,17 @@ public String get() { public void set(String val) { LOG.debug("Setting seek position {}", val); - // NOTE: this in-memory view of the seek position needs to be kept - // up to date because the OMEventListenerLedgerPoller has a - // reference to it - seekPosition.set(val); + try { + if (checkpointStrategy != null) { + checkpointStrategy.save(val); + } + // NOTE: this in-memory view of the seek position must only be kept + // up to date after we successfully persist it, so that any save + // failures prevent the poller from advancing and running away. + seekPosition.set(val); + } catch (Exception ex) { + LOG.error("Failed to save seek position checkpoint {}. 
Progress will not be advanced in-memory.", val, ex); + } } @Override diff --git a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerLedgerPoller.java b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerLedgerPoller.java new file mode 100644 index 000000000000..e388277b9953 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerLedgerPoller.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link OMEventListenerLedgerPoller}. + */ +public class TestOMEventListenerLedgerPoller { + + @Test + public void testPollerPrunesCompletedRequestsCorrectly() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + // Configure soft retention to keep at most 5 records, hard retention 10 records + conf.setLong("ozone.om.plugin.kafka.ledger.retention.soft.limit", 5L); + conf.setLong("ozone.om.plugin.kafka.ledger.retention.hard.limit", 10L); + + OMEventListenerPluginContext pluginContext = mock(OMEventListenerPluginContext.class); + when(pluginContext.isLeaderReady()).thenReturn(true); + when(pluginContext.getThreadNamePrefix()).thenReturn("test-poller-"); + + OMEventListenerLedgerPollerSeekPosition seekPosition = mock(OMEventListenerLedgerPollerSeekPosition.class); + // Seek position is 10 + when(seekPosition.get()).thenReturn("10"); + + @SuppressWarnings("unchecked") + Consumer callback = mock(Consumer.class); + + // List completes returns empty list for simplicity + when(pluginContext.listCompletedRequestInfo(eq(10L), anyInt())) + .thenReturn(Collections.emptyList()); + + OMEventListenerLedgerPoller poller = new OMEventListenerLedgerPoller( + 1000, TimeUnit.MILLISECONDS, 1, 1000, + pluginContext, conf, seekPosition, callback); + + BackgroundTaskQueue 
queue = poller.getTasks(); + BackgroundTask task = queue.poll(); + + task.call(); + + // Verify task runs and calls listCompletedRequestInfo with current seek position (10) + verify(pluginContext, times(1)).listCompletedRequestInfo(eq(10L), anyInt()); + + // Verify pruning is triggered for startKey = 10, softLimit = 5, hardLimit = 10 + verify(pluginContext, times(1)).pruneCompletedRequestInfo(eq(10L), eq(5L), eq(10L)); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index a09cc3a8a2b6..0dce9a221dd3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -95,6 +95,7 @@ import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeletedBlock; import org.apache.hadoop.ozone.om.codec.OMDBDefinition; +import org.apache.hadoop.ozone.om.exceptions.OMCompletedRequestPrunedException; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -1380,7 +1381,7 @@ public List listCompletedRequestInfo(final Long startKey // TODO: we should throw a custom exception here (instead of // IOException) that needs to be handled appropriately by // callers - throw new IOException( + throw new OMCompletedRequestPrunedException( "Missing rows - start key not found (startKey=" + startKey + ", foundKey=" + completedRequestInfoRow.getKey() + ")"); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 93a601f7d08f..7313aba9b9b7 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -971,8 +971,6 @@ private void instantiateServices(boolean withNewSnapshot) throws IOException { prefixManager = new PrefixManagerImpl(this, metadataManager, true); keyManager = new KeyManagerImpl(this, scmClient, configuration, perfMetrics); - eventListenerPluginManager = new OMEventListenerPluginManager(this, - configuration); // If authorizer is not initialized or the authorizer is Native // re-initialize the authorizer, else for non-native authorizer // like ranger we can reuse previous value if it is initialized @@ -996,6 +994,9 @@ public void close() { } }; + eventListenerPluginManager = new OMEventListenerPluginManager(this, + configuration); + // Reload snapshot feature config flag fsSnapshotEnabled = configuration.getBoolean( OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java index e0a805f0afc7..c3997b86df8f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java @@ -19,18 +19,27 @@ import java.io.IOException; import java.util.List; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A narrow set of functionality we are ok with exposing to plugin * implementations. 
*/ public final class OMEventListenerPluginContextImpl implements OMEventListenerPluginContext { + private static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginContextImpl.class); + private final OzoneManager ozoneManager; + private final NotificationCheckpointStrategy checkpointStrategy; - public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) { + public OMEventListenerPluginContextImpl(OzoneManager ozoneManager, + NotificationCheckpointStrategy checkpointStrategy) { this.ozoneManager = ozoneManager; + this.checkpointStrategy = checkpointStrategy; } @Override
@@ -50,4 +59,61 @@ public List listCompletedRequestInfo(Long startKey, int
   public String getThreadNamePrefix() {
     return ozoneManager.getThreadNamePrefix();
   }
+
+  @Override
+  public NotificationCheckpointStrategy getNotificationCheckpointStrategy() {
+    return checkpointStrategy;
+  }
+
+  /**
+   * Prunes completedRequestInfoTable: every row whose key is below
+   * {@code max(checkpointKey - softLimit, latestKey - hardLimit)} is deleted,
+   * where latestKey is the last key currently in the table. Because of the
+   * hard limit this can remove rows at or beyond the checkpoint.
+   *
+   * @param checkpointKey the current consumer checkpoint.
+   * @param softLimit keys to keep behind the checkpoint; negative values are
+   *     treated as 0 so the soft limit never moves the cut-off past the checkpoint.
+   * @param hardLimit keys to keep behind the latest key; negative values are
+   *     treated as 0.
+   * @throws IOException if reading from or deleting from the table fails.
+   */
+  @Override
+  public void pruneCompletedRequestInfo(long checkpointKey, long softLimit, long hardLimit) throws IOException {
+    // NOTE(review): the type arguments on Table/TableIterator are reconstructed; confirm against
+    // getCompletedRequestInfoTable() before applying.
+    Table<Long, OmCompletedRequestInfo> table = ozoneManager.getMetadataManager().getCompletedRequestInfoTable();
+    try {
+      long oldestKey;
+      long latestKey;
+      try (TableIterator<Long, ? extends Table.KeyValue<Long, OmCompletedRequestInfo>>
+          iterator = table.iterator()) {
+        iterator.seekToFirst();
+        if (!iterator.hasNext()) {
+          // Empty table: nothing to prune.
+          return;
+        }
+        oldestKey = iterator.next().getKey();
+        iterator.seekToLast();
+        latestKey = iterator.hasNext() ? iterator.next().getKey() : oldestKey;
+      }
+
+      // Clamp the limits so a negative (misconfigured) value cannot push the cut-off forward.
+      long softPruneBeforeKey = checkpointKey - Math.max(0L, softLimit);
+      long hardPruneBeforeKey = latestKey - Math.max(0L, hardLimit);
+
+      long beforeKey = Math.max(softPruneBeforeKey, hardPruneBeforeKey);
+      // Only issue the range delete (and log) when at least one row is older than the cut-off.
+      if (beforeKey > 0 && oldestKey < beforeKey) {
+        table.deleteRange(0L, beforeKey);
+        LOG.info("Pruned records from completedRequestInfoTable older than trxLogIndex {} "
+            + "(checkpointKey={}, softLimit={}, latestKey={}, hardLimit={})",
+            beforeKey, checkpointKey, softLimit, latestKey, hardLimit);
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to prune completedRequestInfoTable range", e);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java index 79674dc20ff8..6e5b6c992866 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java @@ -92,7 +92,12 @@ static List loadAll(OzoneManager ozoneManager, OzoneConfigurati } } - OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + NotificationCheckpointStrategy checkpointStrategy = null; + if (ozoneManager != null && ozoneManager.getOmMetadataReader() != null) { + checkpointStrategy = new OzoneFileCheckpointStrategy(ozoneManager, + ozoneManager.getOmMetadataReader().get(), conf); + } + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager, checkpointStrategy); for (String destName : destNameList) { try {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java
new file mode 100644
index 000000000000..da8c387dd31b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of NotificationCheckpointStrategy which loads/saves
+ * the last known notification sent by an event notification plugin
+ * directly as metadata on the checkpoint bucket itself.
+ *
+ * This allows another OM to pick up from the appropriate place in the
+ * event of a leadership change.
+ *
+ * NOTE: The current approach is to store the current checkpoint as a
+ * bucket metadata property. This has the virtue of requiring only a
+ * single request (the first implementation of this used a two-phase
+ * CreateKeyRequest / CommitKeyRequest approach with the checkpoint
+ * value being stored as a KeyArgs metadata property, but the two-phase
+ * nature increased complexity).
+ *
+ * Persisting is throttled: only every {@code saveInterval}-th call to
+ * {@link #save(String)} submits a request, so the stored checkpoint can
+ * lag behind the most recent value passed to {@code save}.
+ */
+public class OzoneFileCheckpointStrategy implements NotificationCheckpointStrategy {
+
+  public static final Logger LOG = LoggerFactory.getLogger(OzoneFileCheckpointStrategy.class);
+
+  public static final String OZONE_OM_PLUGIN_CHECKPOINT_VOLUME = "ozone.om.plugin.kafka.checkpoint.volume";
+  public static final String OZONE_OM_PLUGIN_CHECKPOINT_VOLUME_DEFAULT = "notifications";
+
+  public static final String OZONE_OM_PLUGIN_CHECKPOINT_BUCKET = "ozone.om.plugin.kafka.checkpoint.bucket";
+  public static final String OZONE_OM_PLUGIN_CHECKPOINT_BUCKET_DEFAULT = "checkpoint";
+
+  public static final String OZONE_OM_PLUGIN_CHECKPOINT_SAVE_INTERVAL =
+      "ozone.om.plugin.kafka.checkpoint.save.interval";
+  public static final int OZONE_OM_PLUGIN_CHECKPOINT_SAVE_INTERVAL_DEFAULT = 100;
+
+  // Bucket metadata key under which the checkpoint value is stored.
+  private static final String METADATA_KEY = "notification-checkpoint";
+
+  private final AtomicLong callId = new AtomicLong(0);
+  private final ClientId clientId = ClientId.randomId();
+  private final OzoneManager ozoneManager;
+  // Number of save() calls seen since the throttle was last (re)armed.
+  private final AtomicLong saveCount = new AtomicLong(0);
+
+  private final String volume;
+  private final String bucket;
+  private final int saveInterval;
+
+  /**
+   * Creates a strategy whose volume, bucket and save interval come from
+   * {@code conf}; a save interval below 1 falls back to the default.
+   *
+   * @param ozoneManager OM used to read bucket info and submit requests.
+   * @param omMetadataReader not used by this class at present.
+   * @param conf configuration supplying volume, bucket and save interval.
+   */
+  public OzoneFileCheckpointStrategy(OzoneManager ozoneManager, final IOmMetadataReader omMetadataReader,
+      OzoneConfiguration conf) {
+    this.ozoneManager = ozoneManager;
+    this.volume = conf.get(OZONE_OM_PLUGIN_CHECKPOINT_VOLUME, OZONE_OM_PLUGIN_CHECKPOINT_VOLUME_DEFAULT);
+    this.bucket = conf.get(OZONE_OM_PLUGIN_CHECKPOINT_BUCKET, OZONE_OM_PLUGIN_CHECKPOINT_BUCKET_DEFAULT);
+
+    int interval = conf.getInt(OZONE_OM_PLUGIN_CHECKPOINT_SAVE_INTERVAL,
+        OZONE_OM_PLUGIN_CHECKPOINT_SAVE_INTERVAL_DEFAULT);
+    if (interval < 1) {
+      LOG.warn("Configured save interval {} is invalid. Defaulting to 100.", interval);
+      interval = OZONE_OM_PLUGIN_CHECKPOINT_SAVE_INTERVAL_DEFAULT;
+    }
+    this.saveInterval = interval;
+  }
+
+  /**
+   * Reads the checkpoint from the metadata of the configured bucket.
+   *
+   * @return the stored checkpoint, or {@code null} when the bucket info
+   *     cannot be read or holds no checkpoint entry.
+   */
+  @Override
+  public String load() throws IOException {
+    try {
+      OmBucketInfo bucketInfo = ozoneManager.getBucketInfo(volume, bucket);
+      if (bucketInfo != null && bucketInfo.getMetadata() != null) {
+        return bucketInfo.getMetadata().get(METADATA_KEY);
+      }
+    } catch (IOException ex) {
+      // Deliberately best effort: a read failure is reported as "no checkpoint".
+      LOG.info("Error loading notification checkpoint from bucket /{}/{} - {}", volume, bucket, ex.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Persists {@code val}, throttled to one request per {@code saveInterval}
+   * calls (the first call always persists). Blank values are ignored.
+   *
+   * @param val checkpoint value to store.
+   * @throws IOException if the persist request fails. The throttle is re-armed
+   *     in that case so the next call retries rather than waiting an interval.
+   */
+  @Override
+  public void save(String val) throws IOException {
+    if (StringUtils.isBlank(val)) {
+      return;
+    }
+    long previousSaveCount = saveCount.getAndIncrement();
+    // Throttle database commits: persist checkpoint based on configured interval to avoid write storms
+    if (previousSaveCount % saveInterval == 0) {
+      try {
+        saveImpl(val);
+      } catch (IOException e) {
+        // Re-arm the throttle (0 % saveInterval == 0) so the very next call attempts the persist again.
+        saveCount.set(0);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Submits a SetBucketProperty request storing {@code val} under
+   * {@code METADATA_KEY} on the checkpoint bucket.
+   *
+   * @throws IOException if submitting the request fails.
+   */
+  private void saveImpl(String val) throws IOException {
+    BucketArgs bucketArgs = BucketArgs.newBuilder()
+        .setVolumeName(volume)
+        .setBucketName(bucket)
+        .addMetadata(HddsProtos.KeyValue.newBuilder()
+            .setKey(METADATA_KEY)
+            .setValue(val)
+            .build())
+        .build();
+
+    SetBucketPropertyRequest setBucketPropertyRequest = SetBucketPropertyRequest.newBuilder()
+        .setBucketArgs(bucketArgs)
+        .build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.SetBucketProperty)
+        .setClientId(clientId.toString())
+        .setSetBucketPropertyRequest(setBucketPropertyRequest)
+        .setUserInfo(getUserInfo())
+        .build();
+
+    // The success message below is reached only when submitRequest did not throw.
+    submitRequest(omRequest);
+    LOG.info("Persisted {} = {} directly as metadata on bucket /{}/{}", METADATA_KEY, val, volume, bucket);
+  }
+
+  /**
+   * Builds the UserInfo sent with the request: the current user's short name
+   * (falling back to "om") plus the OM RPC server host/address when available.
+   */
+  private UserInfo getUserInfo() {
+    UserInfo.Builder userInfo = UserInfo.newBuilder();
+    try {
+      userInfo.setUserName(UserGroupInformation.getCurrentUser().getShortUserName());
+    } catch (IOException e) {
+      LOG.warn("Failed to get current login user name", e);
+      userInfo.setUserName("om");
+    }
+
+    if (ozoneManager.getOmRpcServerAddr() != null) {
+      InetAddress remoteAddress = ozoneManager.getOmRpcServerAddr().getAddress();
+      if (remoteAddress != null) {
+        userInfo.setHostName(remoteAddress.getHostName());
+        userInfo.setRemoteAddress(remoteAddress.getHostAddress());
+      }
+    }
+    return userInfo.build();
+  }
+
+  /**
+   * Submits {@code omRequest} via OzoneManagerRatisUtils.
+   *
+   * @throws IOException wrapping the ServiceException when submission fails, so
+   *     that {@link #save(String)} reports the checkpoint as not stored.
+   */
+  private OMResponse submitRequest(OMRequest omRequest) throws IOException {
+    try {
+      // NOTE(review): the OMResponse status is not inspected here - confirm a non-OK response cannot occur.
+      return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet());
+    } catch (ServiceException e) {
+      throw new IOException("Set bucket metadata " + omRequest.getCmdType() + " request failed for /"
+          + volume + "/" + bucket, e);
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java new file mode 100644 index 000000000000..9a7c92d2978f --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.google.protobuf.ServiceException; +import java.io.IOException; +import org.apache.hadoop.ozone.om.IOmMetadataReader; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.ratis.protocol.ClientId; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OzoneFileCheckpointStrategy}. 
+ */ +@ExtendWith(MockitoExtension.class) +public class TestOzoneFileCheckpointStrategy { + + @Mock + private OzoneManager mockOzoneManager; + @Mock + private IOmMetadataReader mockOmMetadataReader; + @Mock + private OzoneManagerProtocolProtos.OMResponse mockOmResponse; + @Mock + private OmBucketInfo mockBucketInfo; + + private OzoneFileCheckpointStrategy ozoneFileCheckpointStrategy; + + @BeforeEach + public void setup() { + ozoneFileCheckpointStrategy = new OzoneFileCheckpointStrategy(mockOzoneManager, mockOmMetadataReader, + new org.apache.hadoop.hdds.conf.OzoneConfiguration()); + } + + @Test + public void testSaveStrategy() throws IOException, ServiceException { + try (MockedStatic utils = mockStatic(OzoneManagerRatisUtils.class)) { + utils.when(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class), + any(OzoneManagerProtocolProtos.OMRequest.class), + any(ClientId.class), any(Long.class))).thenReturn(mockOmResponse); + + // Check its saved on first iteration + ozoneFileCheckpointStrategy.save("00000000000000000001"); + utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class), + any(OzoneManagerProtocolProtos.OMRequest.class), + any(ClientId.class), any(Long.class)), Mockito.times(1)); + + // But not on second + ozoneFileCheckpointStrategy.save("0000000000000000002"); + utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class), + any(OzoneManagerProtocolProtos.OMRequest.class), + any(ClientId.class), any(Long.class)), Mockito.times(1)); + + for (int i = 0; i <= 100; i++) { + String val = String.format("%020d", i); + ozoneFileCheckpointStrategy.save(val); + } + + // Check submit has only ran twice in total (first save + save at 100th iteration) + utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class), + any(OzoneManagerProtocolProtos.OMRequest.class), + any(ClientId.class), any(Long.class)), Mockito.times(2)); + } + } + + @Test + public void testLoadStrategyWhenMetadataNotSet() 
throws IOException { + when(mockOzoneManager.getBucketInfo(any(), any())).thenReturn(mockBucketInfo); + when(mockBucketInfo.getMetadata()).thenReturn(com.google.common.collect.ImmutableMap.of()); + Assertions.assertNull(ozoneFileCheckpointStrategy.load()); + } + + @Test + public void testLoadStrategyWhenBucketDoesNotExist() throws IOException { + when(mockOzoneManager.getBucketInfo(any(), any())).thenThrow(IOException.class); + Assertions.assertNull(ozoneFileCheckpointStrategy.load()); + } + + @Test + public void testLoadStrategyWithValidMetaData() throws IOException { + when(mockOzoneManager.getBucketInfo(any(), any())).thenReturn(mockBucketInfo); + when(mockBucketInfo.getMetadata()).thenReturn( + com.google.common.collect.ImmutableMap.of("notification-checkpoint", "00000000000000000017")); + Assertions.assertEquals("00000000000000000017", ozoneFileCheckpointStrategy.load()); + } +}