Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.eventlistener;

import java.io.IOException;

/**
* Interface for implementations which load/save the current checkpoint
* transaction log index used by an event poller.
*/
public interface NotificationCheckpointStrategy {

String load() throws IOException;

void save(String val) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ public interface OMEventListenerPluginContext {

// XXX: this probably doesn't belong here
String getThreadNamePrefix();

NotificationCheckpointStrategy getNotificationCheckpointStrategy();

/**
* Prunes records from completedRequestInfoTable using a hybrid soft and hard retention limit.
*
* @param checkpointKey the current consumer checkpoint (startKey).
* @param softLimit the soft retention keep limit behind the checkpoint.
* @param hardLimit the hard retention cap limit behind the latest transaction in database.
* @throws IOException if any database error occurs.
*/
void pruneCompletedRequestInfo(long checkpointKey, long softLimit, long hardLimit) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.exceptions;

import java.io.IOException;

/**
* Exception thrown when a requested completed request info record
* is not found because it has already been pruned by the retention policy.
*/
public class OMCompletedRequestPrunedException extends IOException {

public OMCompletedRequestPrunedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext plu
exception.initCause(ex);
throw exception;
}
this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(
pluginContext.getNotificationCheckpointStrategy());

LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
"serviceTimeout={}, seekPosition={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.ozone.om.exceptions.OMCompletedRequestPrunedException;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,6 +50,8 @@ public class OMEventListenerLedgerPoller extends BackgroundService {
private final OMEventListenerPluginContext pluginContext;
private final OMEventListenerLedgerPollerSeekPosition seekPosition;
private final Consumer<OmCompletedRequestInfo> callback;
private final long softRetentionLimit;
private final long hardRetentionLimit;

@SuppressWarnings("checkstyle:ParameterNumber")
public OMEventListenerLedgerPoller(long interval, TimeUnit unit,
Expand All @@ -70,6 +73,10 @@ public OMEventListenerLedgerPoller(long interval, TimeUnit unit,
this.pluginContext = pluginContext;
this.seekPosition = seekPosition;
this.callback = callback;
this.softRetentionLimit = configuration.getLong(
"ozone.om.plugin.kafka.ledger.retention.soft.limit", 100_000L);
this.hardRetentionLimit = configuration.getLong(
"ozone.om.plugin.kafka.ledger.retention.hard.limit", 1_000_000L);
}

private boolean shouldRun() {
Expand Down Expand Up @@ -121,14 +128,21 @@ public BackgroundTaskResult call() {
}
getRunCount().incrementAndGet();

String startKeyStr = seekPosition.get();
try {
String startKeyStr = seekPosition.get();
Long startKey = StringUtils.isNotBlank(startKeyStr) ? Long.valueOf(startKeyStr) : null;
for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo(
startKey, MAX_RESULTS)) {
callback.accept(requestInfo);
}
successRunCount.incrementAndGet();

if (startKey != null) {
pluginContext.pruneCompletedRequestInfo(startKey, softRetentionLimit, hardRetentionLimit);
}
} catch (OMCompletedRequestPrunedException e) {
LOG.warn("Consumer checkpoint {} has been hard-pruned. Fast-forwarding and self-healing...", startKeyStr);
seekPosition.set(null); // Clear checkpoint to fast-forward
} catch (IOException e) {
LOG.error("Error while running completed operation consumer " +
"background task. Will retry at next run.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,26 @@
/**
* This is a helper class to get/set the seek position used by the
* OMEventListenerLedgerPoller.
*
* XXX: the seek position should be persisted (and ideally distributed to
* all OMs) but at the moment it only lives in memory
*/
public class OMEventListenerLedgerPollerSeekPosition {
public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class);

private final AtomicReference<String> seekPosition;
private final NotificationCheckpointStrategy checkpointStrategy;

public OMEventListenerLedgerPollerSeekPosition() {
this.seekPosition = new AtomicReference(initSeekPosition());
public OMEventListenerLedgerPollerSeekPosition(NotificationCheckpointStrategy checkpointStrategy) {
this.checkpointStrategy = checkpointStrategy;
this.seekPosition = new AtomicReference<>(initSeekPosition());
}

// TODO: load this from persistent storage
public String initSeekPosition() {
try {
if (checkpointStrategy != null) {
return checkpointStrategy.load();
}
} catch (Exception ex) {
LOG.error("Failed to load initial seek position from checkpoint strategy", ex);
}
return null;
}

Expand All @@ -48,10 +53,17 @@ public String get() {

public void set(String val) {
LOG.debug("Setting seek position {}", val);
// NOTE: this in-memory view of the seek position needs to be kept
// up to date because the OMEventListenerLedgerPoller has a
// reference to it
seekPosition.set(val);
try {
if (checkpointStrategy != null) {
checkpointStrategy.save(val);
}
// NOTE: this in-memory view of the seek position must only be kept
// up to date after we successfully persist it, so that any save
// failures prevent the poller from advancing and running away.
seekPosition.set(val);
} catch (Exception ex) {
LOG.error("Failed to save seek position checkpoint {}. Progress will not be advanced in-memory.", val, ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.eventlistener;

import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
import org.junit.jupiter.api.Test;

/**
* Tests for {@link OMEventListenerLedgerPoller}.
*/
public class TestOMEventListenerLedgerPoller {

@Test
public void testPollerPrunesCompletedRequestsCorrectly() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
// Configure soft retention to keep at most 5 records, hard retention 10 records
conf.setLong("ozone.om.plugin.kafka.ledger.retention.soft.limit", 5L);
conf.setLong("ozone.om.plugin.kafka.ledger.retention.hard.limit", 10L);

OMEventListenerPluginContext pluginContext = mock(OMEventListenerPluginContext.class);
when(pluginContext.isLeaderReady()).thenReturn(true);
when(pluginContext.getThreadNamePrefix()).thenReturn("test-poller-");

OMEventListenerLedgerPollerSeekPosition seekPosition = mock(OMEventListenerLedgerPollerSeekPosition.class);
// Seek position is 10
when(seekPosition.get()).thenReturn("10");

@SuppressWarnings("unchecked")
Consumer<OmCompletedRequestInfo> callback = mock(Consumer.class);

// List completes returns empty list for simplicity
when(pluginContext.listCompletedRequestInfo(eq(10L), anyInt()))
.thenReturn(Collections.emptyList());

OMEventListenerLedgerPoller poller = new OMEventListenerLedgerPoller(
1000, TimeUnit.MILLISECONDS, 1, 1000,
pluginContext, conf, seekPosition, callback);

BackgroundTaskQueue queue = poller.getTasks();
BackgroundTask task = queue.poll();

task.call();

// Verify task runs and calls listCompletedRequestInfo with current seek position (10)
verify(pluginContext, times(1)).listCompletedRequestInfo(eq(10L), anyInt());

// Verify pruning is triggered for startKey = 10, softLimit = 5, hardLimit = 10
verify(pluginContext, times(1)).pruneCompletedRequestInfo(eq(10L), eq(5L), eq(10L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.exceptions.OMCompletedRequestPrunedException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
Expand Down Expand Up @@ -1380,7 +1381,7 @@ public List<OmCompletedRequestInfo> listCompletedRequestInfo(final Long startKey
// TODO: we should throw a custom exception here (instead of
// IOException) that needs to be handled appropriately by
// callers
throw new IOException(
throw new OMCompletedRequestPrunedException(
"Missing rows - start key not found (startKey=" + startKey
+ ", foundKey=" + completedRequestInfoRow.getKey() + ")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,6 @@ private void instantiateServices(boolean withNewSnapshot) throws IOException {
prefixManager = new PrefixManagerImpl(this, metadataManager, true);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
perfMetrics);
eventListenerPluginManager = new OMEventListenerPluginManager(this,
configuration);
// If authorizer is not initialized or the authorizer is Native
// re-initialize the authorizer, else for non-native authorizer
// like ranger we can reuse previous value if it is initialized
Expand All @@ -996,6 +994,9 @@ public void close() {
}
};

eventListenerPluginManager = new OMEventListenerPluginManager(this,
configuration);

// Reload snapshot feature config flag
fsSnapshotEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A narrow set of functionality we are ok with exposing to plugin
* implementations.
*/
public final class OMEventListenerPluginContextImpl implements OMEventListenerPluginContext {
private static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginContextImpl.class);

private final OzoneManager ozoneManager;
private final NotificationCheckpointStrategy checkpointStrategy;

public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) {
public OMEventListenerPluginContextImpl(OzoneManager ozoneManager,
NotificationCheckpointStrategy checkpointStrategy) {
this.ozoneManager = ozoneManager;
this.checkpointStrategy = checkpointStrategy;
}

@Override
Expand All @@ -50,4 +59,40 @@ public List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int
public String getThreadNamePrefix() {
return ozoneManager.getThreadNamePrefix();
}

@Override
public NotificationCheckpointStrategy getNotificationCheckpointStrategy() {
return checkpointStrategy;
}

@Override
public void pruneCompletedRequestInfo(long checkpointKey, long softLimit, long hardLimit) throws IOException {
Table<Long, OmCompletedRequestInfo> table = ozoneManager.getMetadataManager().getCompletedRequestInfoTable();
try {
long softPruneBeforeKey = checkpointKey - softLimit;

long latestKey = -1;
try (TableIterator<Long, ? extends Table.KeyValue<Long, OmCompletedRequestInfo>>
iterator = table.iterator()) {
iterator.seekToLast();
if (iterator.hasNext()) {
latestKey = iterator.next().getKey();
}
}

long hardPruneBeforeKey = latestKey - hardLimit;

long beforeKey = Math.max(softPruneBeforeKey, hardPruneBeforeKey);
if (beforeKey > 0) {
table.deleteRange(0L, beforeKey);
LOG.info("Pruned records from completedRequestInfoTable older than trxLogIndex {} " +
"(checkpointKey={}, softLimit={}, latestKey={}, hardLimit={})",
beforeKey, checkpointKey, softLimit, latestKey, hardLimit);
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException("Failed to prune completedRequestInfoTable range", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfigurati
}
}

OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager);
NotificationCheckpointStrategy checkpointStrategy = null;
if (ozoneManager != null && ozoneManager.getOmMetadataReader() != null) {
checkpointStrategy = new OzoneFileCheckpointStrategy(ozoneManager,
ozoneManager.getOmMetadataReader().get(), conf);
}
OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager, checkpointStrategy);

for (String destName : destNameList) {
try {
Expand Down
Loading