From 6e8142d37c69178235fbb5f5e7e4a1015b38c651 Mon Sep 17 00:00:00 2001 From: Yuri Naryshkin Date: Fri, 29 Mar 2024 14:34:47 +0300 Subject: [PATCH] WIP --- .../snapshot/SnapshotStatusTask.java | 19 ++++-- .../snapshot/IgniteSnapshotManager.java | 18 ++++++ .../snapshot/dump/CreateDumpFutureTask.java | 30 +++++++++ .../snapshot/IgniteSnapshotMXBeanTest.java | 63 +++++++++++++++++++ 4 files changed, 126 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotStatusTask.java index 88fea0f7eab3f..cf36c04e76d7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotStatusTask.java @@ -123,10 +123,21 @@ protected SnapshotStatusJob(@Nullable NoArg arg, boolean debug) { else { MetricRegistry mreg = ignite.context().metric().registry(SNAPSHOT_METRICS); - metrics = new T5<>( - mreg.findMetric("CurrentSnapshotProcessedSize").value(), - mreg.findMetric("CurrentSnapshotTotalSize").value(), - -1L, -1L, -1L); + metrics = req.dump() + ? new T5<>( + mreg.findMetric("CurrentDumpProcessedPartitions").value(), + mreg.findMetric("CurrentDumpTotalPartitions").value(), + mreg.findMetric("CurrentDumpProcessedEntries").value(), + -1L, + -1L + ) + : new T5<>( + mreg.findMetric("CurrentSnapshotProcessedSize").value(), + mreg.findMetric("CurrentSnapshotTotalSize").value(), + -1L, + -1L, + -1L + ); } return new SnapshotStatus( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index f0bd25b5e7375..225877cd6dc9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -612,6 +612,24 @@ public static String partDeltaFileName(int partId) { return task == null ? -1 : task.processedSize(); }, "Processed size of current cluster snapshot in bytes on this node."); + mreg.register("CurrentDumpTotalPartitions", () -> { + CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class); + + return task == null ? -1 : task.totalPartitions(); + }, "Total number of partitions to be processed on this node."); + + mreg.register("CurrentDumpProcessedPartitions", () -> { + CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class); + + return task == null ? -1 : task.processedPartitions(); + }, "Total number of partitions that have been processed on this node."); + + mreg.register("CurrentDumpProcessedEntries", () -> { + CreateDumpFutureTask task = currentSnapshotTask(CreateDumpFutureTask.class); + + return task == null ? -1 : task.storedEntries(); + }, "Total number of processed entries on this node."); + MetricRegistry incSnpMReg = cctx.kernalContext().metric().registry(INCREMENTAL_SNAPSHOT_METRICS); incSnpMReg.register("snapshotName", diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java index 1267e18bf8849..b13da65c542a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java @@ -109,6 +109,15 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple /** Processed dump size in bytes. */ private final AtomicLong processedSize = new AtomicLong(); + /** Total partitions. */ + private final AtomicLong totalParts = new AtomicLong(); + + /** Processed partitions. */ + private final AtomicLong processedParts = new AtomicLong(); + + /** Processed entries. */ + private final LongAdder storedEntries = new LongAdder(); + /** If {@code null} then encryption disabled. */ private final @Nullable Serializable encKey; @@ -272,6 +281,8 @@ private void prepare() throws IOException, IgniteCheckedException { AtomicLong writtenEntriesCnt = new AtomicLong(); AtomicLong changedEntriesCnt = new AtomicLong(); + totalParts.addAndGet(grpParts.size()); + String name = cctx.cache().cacheGroup(grp).cacheOrGroupName(); CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp); @@ -318,6 +329,8 @@ private void prepare() throws IOException, IgniteCheckedException { writtenEntriesCnt.addAndGet(writtenEntriesCnt0); changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue()); + processedParts.incrementAndGet(); + if (log.isDebugEnabled()) { log.debug("Finish group partition dump [name=" + name + ", id=" + grp + @@ -643,6 +656,8 @@ private void write( throw new IgniteException("Can't write row"); processedSize.addAndGet(buf.limit()); + + storedEntries.increment(); } /** @@ -725,4 +740,19 @@ private File groupDirectory(CacheGroupContext grpCtx) throws IgniteCheckedExcept public @Nullable Serializable encryptionKey() { return encKey; } + + /** */ + public long totalPartitions() { + return totalParts.get(); + } + + /** */ + public long processedPartitions() { + return processedParts.get(); + } + + /** */ + public long storedEntries() { + return storedEntries.sum(); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java index 91b88a45461b0..c27b564136b05 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java @@ -17,7 +17,13 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.OpenOption; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.AttributeNotFoundException; import javax.management.DynamicMBean; import javax.management.MBeanException; @@ -29,6 +35,9 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -91,6 +100,60 @@ public void testCreateSnapshot() throws Exception { assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName())); } + /** @throws Exception If fails. */ + @Test + public void testCreateDump() throws Exception { + IgniteEx ign = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + + DynamicMBean snpMBean = metricRegistry(ign.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, (long)getMetric("LastSnapshotEndTime", snpMBean)); + + IgniteSnapshotManager mgr = ign.context().cache().context().snapshotMgr(); + + AtomicInteger cntr = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(1); + + mgr.ioFactory(new RandomAccessFileIOFactory(){ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + return new RandomAccessFileIO(file, modes) { + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + if (cntr.incrementAndGet() >= 10) { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new IOException(e); + } + } + + return super.write(srcBuf, position); + } + }; + } + }); + + IgniteFuture fut = mgr.createDump("dump", null); + + assertTrue("Dump total partitions should be defined.", + GridTestUtils.waitForCondition(() -> (long)getMetric("CurrentDumpTotalPartitions", snpMBean) > 0, TIMEOUT)); + + assertTrue("Dump processed partitions should be defined.", (long)getMetric("CurrentDumpProcessedPartitions", snpMBean) > 0); + + assertTrue("Dump processed entries should be defined.", (long)getMetric("CurrentDumpProcessedEntries", snpMBean) > 0); + + latch.countDown(); + + fut.get(); + + assertTrue("Waiting for snapshot operation failed.", + GridTestUtils.waitForCondition(() -> (long)getMetric("LastSnapshotEndTime", snpMBean) > 0, TIMEOUT)); + + stopAllGrids(); + } + /** @throws Exception If fails. */ @Test public void testCancelSnapshot() throws Exception {