Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-21997 Expose dump creating status into snapshot metrics #11305

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,21 @@ protected SnapshotStatusJob(@Nullable NoArg arg, boolean debug) {
else {
MetricRegistry mreg = ignite.context().metric().registry(SNAPSHOT_METRICS);

metrics = new T5<>(
mreg.<LongMetric>findMetric("CurrentSnapshotProcessedSize").value(),
mreg.<LongMetric>findMetric("CurrentSnapshotTotalSize").value(),
-1L, -1L, -1L);
metrics = req.dump()
? new T5<>(
mreg.<LongMetric>findMetric("CurrentDumpProcessedPartitions").value(),
mreg.<LongMetric>findMetric("CurrentDumpTotalPartitions").value(),
mreg.<LongMetric>findMetric("CurrentDumpProcessedEntries").value(),
-1L,
-1L
)
: new T5<>(
mreg.<LongMetric>findMetric("CurrentSnapshotProcessedSize").value(),
mreg.<LongMetric>findMetric("CurrentSnapshotTotalSize").value(),
-1L,
-1L,
-1L
);
}

return new SnapshotStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,24 @@ public static String partDeltaFileName(int partId) {
return task == null ? -1 : task.processedSize();
}, "Processed size of current cluster snapshot in bytes on this node.");

mreg.register("CurrentDumpTotalPartitions", () -> {
CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class);

return task == null ? -1 : task.totalPartitions();
}, "Total number of partitions to be processed on this node.");

mreg.register("CurrentDumpProcessedPartitions", () -> {
CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class);

return task == null ? -1 : task.processedPartitions();
}, "Total number of partitions that have been processed on this node.");

mreg.register("CurrentDumpProcessedEntries", () -> {
CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class);

return task == null ? -1 : task.storedEntries();
}, "Total number of processed entries on this node.");

MetricRegistry incSnpMReg = cctx.kernalContext().metric().registry(INCREMENTAL_SNAPSHOT_METRICS);

incSnpMReg.register("snapshotName",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
/** Processed dump size in bytes. */
private final AtomicLong processedSize = new AtomicLong();

/** Total partitions. */
private final AtomicLong totalParts = new AtomicLong();

/** Processed partitions. */
private final AtomicLong processedParts = new AtomicLong();

/** Processed entries. */
private final LongAdder storedEntries = new LongAdder();

/** If {@code null} then encryption disabled. */
private final @Nullable Serializable encKey;

Expand Down Expand Up @@ -272,6 +281,8 @@ private void prepare() throws IOException, IgniteCheckedException {
AtomicLong writtenEntriesCnt = new AtomicLong();
AtomicLong changedEntriesCnt = new AtomicLong();

totalParts.addAndGet(grpParts.size());

String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();

CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp);
Expand Down Expand Up @@ -318,6 +329,8 @@ private void prepare() throws IOException, IgniteCheckedException {
writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());

processedParts.incrementAndGet();

if (log.isDebugEnabled()) {
log.debug("Finish group partition dump [name=" + name +
", id=" + grp +
Expand Down Expand Up @@ -643,6 +656,8 @@ private void write(
throw new IgniteException("Can't write row");

processedSize.addAndGet(buf.limit());

storedEntries.increment();
}

/**
Expand Down Expand Up @@ -725,4 +740,19 @@ private File groupDirectory(CacheGroupContext grpCtx) throws IgniteCheckedExcept
public @Nullable Serializable encryptionKey() {
return encKey;
}

/** @return Total number of partitions to be dumped on this node (accumulated per cache group during prepare). */
public long totalPartitions() {
return totalParts.get();
}

/** @return Number of partitions whose dump has been fully written on this node so far. */
public long processedPartitions() {
return processedParts.get();
}

/** @return Total number of entries written to the dump on this node so far. */
public long storedEntries() {
return storedEntries.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanException;
Expand All @@ -29,6 +35,9 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
Expand Down Expand Up @@ -91,6 +100,60 @@ public void testCreateSnapshot() throws Exception {
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
}

/**
 * Checks that dump-specific metrics ({@code CurrentDumpTotalPartitions},
 * {@code CurrentDumpProcessedPartitions}, {@code CurrentDumpProcessedEntries})
 * are exposed while a cache dump is in progress.
 *
 * @throws Exception If fails.
 */
@Test
public void testCreateDump() throws Exception {
    IgniteEx ign = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);

    DynamicMBean snpMBean = metricRegistry(ign.name(), null, SNAPSHOT_METRICS);

    assertEquals("Snapshot end time must be undefined on first snapshot operation starts.",
        0, (long)getMetric("LastSnapshotEndTime", snpMBean));

    IgniteSnapshotManager mgr = ign.context().cache().context().snapshotMgr();

    // Counts dump file writes so the operation can be suspended mid-flight.
    AtomicInteger cntr = new AtomicInteger();

    CountDownLatch latch = new CountDownLatch(1);

    // Block dump writers after the first few entries so the dump stays in progress
    // while the in-flight metric values are checked.
    mgr.ioFactory(new RandomAccessFileIOFactory() {
        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
            return new RandomAccessFileIO(file, modes) {
                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
                    if (cntr.incrementAndGet() >= 10) {
                        try {
                            latch.await();
                        }
                        catch (InterruptedException e) {
                            throw new IOException(e);
                        }
                    }

                    return super.write(srcBuf, position);
                }
            };
        }
    });

    IgniteFuture<Void> fut = mgr.createDump("dump", null);

    // Metrics are updated asynchronously by the dump worker threads, so each one must
    // be awaited rather than read immediately — otherwise the test races the workers.
    assertTrue("Dump total partitions should be defined.",
        GridTestUtils.waitForCondition(() -> (long)getMetric("CurrentDumpTotalPartitions", snpMBean) > 0, TIMEOUT));

    assertTrue("Dump processed partitions should be defined.",
        GridTestUtils.waitForCondition(() -> (long)getMetric("CurrentDumpProcessedPartitions", snpMBean) > 0, TIMEOUT));

    assertTrue("Dump processed entries should be defined.",
        GridTestUtils.waitForCondition(() -> (long)getMetric("CurrentDumpProcessedEntries", snpMBean) > 0, TIMEOUT));

    // Unblock the writers and let the dump finish.
    latch.countDown();

    fut.get();

    assertTrue("Waiting for snapshot operation failed.",
        GridTestUtils.waitForCondition(() -> (long)getMetric("LastSnapshotEndTime", snpMBean) > 0, TIMEOUT));

    stopAllGrids();
}

/** @throws Exception If fails. */
@Test
public void testCancelSnapshot() throws Exception {
Expand Down