diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 050de467f2687..013333875d193 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -283,8 +283,8 @@ public final class DataNodePipeMessages { + "PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. " + "Returning true to ensure data integrity."; public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE = - "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count " - + "is {}, will keep retrying."; + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, " + + "will release parser memory and retry the TsFile event later."; public static final String FAILED_TO_BUILD_TABLET = "Failed to build tablet"; public static final String FAILED_TO_CHECK_NEXT = "Failed to check next"; public static final String FAILED_TO_CLOSE_TSFILEREADER = "Failed to close TsFileReader"; diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 147d977d57269..8ce5f83996717 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -272,8 +272,8 @@ public final class DataNodePipeMessages { "determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the " + "time range: [{}, {}]. Returning true to ensure data integrity 时发生异常"; public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE = - "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count " - + "is {}, will keep retrying."; + "{}:为解析 TsFile {} 分配内存失败,tablet 事件编号 {}," + + "将释放解析器内存并稍后重试该 TsFile 事件。"; public static final String FAILED_TO_BUILD_TABLET = "构建 tablet 失败"; public static final String FAILED_TO_CHECK_NEXT = "check next 失败"; public static final String FAILED_TO_CLOSE_TSFILEREADER = "关闭 TsFileReader 失败"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 3509e6b29cef9..21fc71da969e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -794,9 +794,8 @@ protected void calculateMemoryUsage( long needMemory = 0; - needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); - needMemory += calculateSinkBatchMemory(sinkParameters); - needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); + // TsFile parser, sink batch, and TsFile read buffer memory are allocated dynamically + // from PipeMemoryManager only while they are active. needMemory += calculateAssignerMemory(sourceParameters); PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index c37aab5af2c8d..ec6ae0e7fc797 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -177,6 +177,8 @@ protected boolean executeOnce() throws Exception { event1 -> { try { pipeProcessor.process(event1, outputEventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 6d4c3580bd281..33aa202b7f809 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -709,38 +709,37 @@ public interface TabletInsertionEventConsumer { public void consumeTabletInsertionEventsWithRetry( final TabletInsertionEventConsumer consumer, final String callerName) throws Exception { - final Iterable iterable = toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); int tabletEventCount = 0; - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - tabletEventCount++; - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads + try { + final Iterable iterable = toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + tabletEventCount++; try { consumer.consume((PipeRawTabletInsertionEvent) parsedEvent); - break; } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, - callerName, - getTsFile(), - tabletEventCount, - retryCount); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, - callerName, - getTsFile(), - tabletEventCount, - retryCount, - e); - } + releaseParsedTabletEvent(parsedEvent); + throw e; } } + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + close(); + LOGGER.warn( + DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, + callerName, + getTsFile(), + tabletEventCount, + e); + throw e; + } + } + + private void releaseParsedTabletEvent(final TabletInsertionEvent parsedEvent) { + if (parsedEvent instanceof PipeRawTabletInsertionEvent + && ((PipeRawTabletInsertionEvent) parsedEvent).getReferenceCount() == 0 + && !((PipeRawTabletInsertionEvent) parsedEvent).isReleased()) { + ((PipeRawTabletInsertionEvent) parsedEvent).clearReferenceCount(getClass().getName()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 627731fa7edc4..da5cc6b88dba1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; @@ -109,9 +108,7 @@ protected TsFileInsertionEventParser( this.sourceEvent = sourceEvent; this.allocatedMemoryBlockForTablet = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); LOGGER.debug( DataNodePipeMessages.TSFILE_HAS_INITIALIZED_PIPENAME_CREATION_TIME_PATTERN, @@ -180,6 +177,13 @@ protected void recordTabletMetrics(final Tablet tablet) { } } + protected void releaseTabletMemoryBlock() { + if (allocatedMemoryBlockForTablet != null + && allocatedMemoryBlockForTablet.getMemoryUsageInBytes() > 0) { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0); + } + } + @Override public void close() { tabletInsertionIterable = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 33652f3f3da69..ba840b11c32dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -524,6 +524,7 @@ public TabletInsertionEvent next() { final Tablet tablet = tabletIterator.next(); // Record tablet metrics recordTabletMetrics(tablet); + releaseTabletMemoryBlock(); final boolean isAligned = deviceIsAlignedMap.getOrDefault( IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index e3af5aaa0c1a0..3b99a549b4b20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -144,12 +143,9 @@ public TsFileInsertionEventScanParser( filter = Objects.nonNull(timeFilterExpression) ? timeFilterExpression.getFilter() : null; this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); try { currentModifications = @@ -235,6 +231,7 @@ public TabletInsertionEvent next() { final Tablet tablet = getNextTablet(); // Record tablet metrics recordTabletMetrics(tablet); + releaseTabletMemoryBlock(); final boolean isLast = isLastTabletWithoutDeferredException(); try { return sourceEvent == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 8ecdcc0cec5e9..5d81d6ae9021e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -22,11 +22,9 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -94,25 +92,14 @@ public TsFileInsertionEventTableParser( allocatedMemoryBlockForModifications = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); - long tableSize = - Math.min( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); - this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunkMeta = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForTableSchemas = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance() - .getConfig() - .getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.startTime = startTime; this.endTime = endTime; @@ -239,6 +226,7 @@ && hasTablePrivilege(entry.getKey()), final Tablet tablet = tabletIterator.next(); recordTabletMetrics(tablet); + releaseTabletMemoryBlock(); if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) { bufferedTablet = tablet; return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 6d27b9f7dd5b0..d7ba00eb18db0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -538,6 +539,8 @@ public void process( event -> { try { process(event, eventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index a8e0c270570bb..fcaa0feb05814 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -156,6 +157,8 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev event -> { try { process(event, eventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index c99efe5e3da3e..64844430a720b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -329,12 +329,6 @@ public synchronized void resize( } final long oldSize = block.getMemoryUsageInBytes(); - if (oldSize == 0) { - // If the memory block is not registered, we need to register it first. - // Otherwise, the memory usage will be inconsistent. - // See registerMemoryBlock for more details. - allocatedBlocks.add(block); - } if (oldSize >= targetSize) { memoryBlock.release(oldSize - targetSize); @@ -350,6 +344,8 @@ public synchronized void resize( if (targetSize == 0) { allocatedBlocks.remove(block); } + + this.notifyAll(); return; } @@ -359,6 +355,12 @@ public synchronized void resize( if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() >= sizeInBytes) { memoryBlock.forceAllocateWithoutLimitation(sizeInBytes); + if (oldSize == 0) { + // If the memory block is not registered, we need to register it first. + // Otherwise, the memory usage will be inconsistent. + // See registerMemoryBlock for more details. + allocatedBlocks.add(block); + } if (block instanceof PipeTabletMemoryBlock) { usedMemorySizeInBytesOfTablets += sizeInBytes; } @@ -495,6 +497,9 @@ public synchronized boolean tryAllocate( if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() >= memoryInBytesNeededToBeAllocated) { memoryBlock.forceAllocateWithoutLimitation(memoryInBytesNeededToBeAllocated); + if (block.getMemoryUsageInBytes() == 0) { + allocatedBlocks.add(block); + } if (block instanceof PipeTabletMemoryBlock) { usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index aede0e994d9a9..a751f798a0ccd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -61,8 +61,7 @@ protected PipeTabletEventBatch( // limit in buffer size this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes; - this.allocatedMemoryBlock = - PipeDataNodeResourceManager.memory().forceAllocate(requestMaxBatchSizeInBytes); + this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); if (recordMetric != null) { this.recordMetric = recordMetric; } else { @@ -97,6 +96,10 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) events.add((EnrichedEvent) event); } } catch (final Exception e) { + if (events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } // If the event is not added to the batch, we need to decrease the reference count. ((EnrichedEvent) event) .decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false); @@ -126,7 +129,28 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) protected abstract boolean constructBatch(final TabletInsertionEvent event) throws WALPipeException, IOException; + protected void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; + } + + final long newTotalBufferSize = totalBufferSize + bufferSize; + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, maxBatchSizeInBytes)); + totalBufferSize = newTotalBufferSize; + } + + protected void releaseAllocatedMemoryBlock() { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); + } + + protected void clearBatchData() {} + public boolean shouldEmit() { + if (events.isEmpty()) { + return false; + } + final long diff = System.currentTimeMillis() - firstEventProcessingTime; if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) { recordMetric.accept(diff, totalBufferSize, events.size()); @@ -138,23 +162,26 @@ public boolean shouldEmit() { public synchronized void onSuccess() { events.clear(); - totalBufferSize = 0; - - firstEventProcessingTime = Long.MIN_VALUE; + resetMemoryUsage(); } @Override public synchronized void close() { + if (isClosed) { + return; + } isClosed = true; clearEventsReferenceCount(PipeTabletEventBatch.class.getName()); events.clear(); + clearBatchData(); + resetMemoryUsage(); allocatedMemoryBlock.close(); } /** - * Discard all events of the given pipe. This method only clears the reference count of the events - * and discard them, but do not modify other objects (such as buffers) for simplicity. + * Discard all events of the given pipe. This method only clears the reference count of the + * events. If some events remain, cached batch data is kept unchanged for simplicity. */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { @@ -162,14 +189,27 @@ public synchronized void discardEventsOfPipe( } public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { - events.removeIf( - event -> { - if (isEventFromPipe(event, committerKey)) { - event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); - return true; - } - return false; - }); + final boolean hasDiscardedEvents = + events.removeIf( + event -> { + if (isEventFromPipe(event, committerKey)) { + event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return true; + } + return false; + }); + if (hasDiscardedEvents && events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } + } + + private void resetMemoryUsage() { + totalBufferSize = 0; + + releaseAllocatedMemoryBlock(); + + firstEventProcessingTime = Long.MIN_VALUE; } private static boolean isEventFromPipe( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index b32479e2f1a21..05b348f323791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -74,7 +74,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @Override protected boolean constructBatch(final TabletInsertionEvent event) throws IOException { final long bufferSize = buildTabletInsertionBuffer(event); - totalBufferSize += bufferSize; pipe2BytesAccumulated.compute( new Pair<>( ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) event).getCreationTime()), @@ -85,8 +84,13 @@ protected boolean constructBatch(final TabletInsertionEvent event) throws IOExce @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { insertNodeBuffers.clear(); tabletBuffers.clear(); @@ -161,24 +165,21 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); if (!(insertNode instanceof RelationalInsertTabletNode)) { buffer = insertNode.serializeToByteBuffer(); + final String databaseName = + pipeInsertNodeTabletInsertionEvent.isTableModelEvent() + ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); insertNodeBuffers.add(buffer); - if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - final String databaseName = - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } else { - final String databaseName = pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } - estimateSize += buffer.limit(); + insertNodeDataBases.add(databaseName); } else { - for (final Tablet tablet : - ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) { - estimateSize += - constructTabletBatch( - tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + final List tablets = pipeInsertNodeTabletInsertionEvent.convertToTablets(); + estimateSize = calculateTabletsSizeInBytes(tablets); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + for (final Tablet tablet : tablets) { + constructTabletBatchWithoutMemoryReservation( + tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); } } } else { @@ -198,6 +199,7 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws } final String databaseName = pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); tabletBuffers.add(buffer); tabletDataBases.add(databaseName); } @@ -207,12 +209,27 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws } private long constructTabletBatch(final Tablet tablet, final String databaseName) { + final long estimateSize = calculateTabletSizeInBytes(tablet); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + constructTabletBatchWithoutMemoryReservation(tablet, databaseName); + return estimateSize; + } + + private void constructTabletBatchWithoutMemoryReservation( + final Tablet tablet, final String databaseName) { final Pair> currentBatch = tableModelTabletMap .computeIfAbsent(databaseName, k -> new HashMap<>()) .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>())); currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize()); currentBatch.getRight().add(tablet); + } + + private long calculateTabletsSizeInBytes(final List tablets) { + return tablets.stream().mapToLong(PipeTabletEventPlainBatch::calculateTabletSizeInBytes).sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 7b511e23fc6c9..053b42b2c780d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -86,6 +86,7 @@ protected boolean constructBatch(final TabletInsertionEvent event) { (PipeInsertNodeTabletInsertionEvent) event; final boolean isTableModel = insertNodeTabletInsertionEvent.isTableModelEvent(); final List tablets = insertNodeTabletInsertionEvent.convertToTablets(); + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletsSizeInBytes(tablets)); for (int i = 0; i < tablets.size(); ++i) { final Tablet tablet = tablets.get(i); if (isTabletEmpty(tablet)) { @@ -114,6 +115,7 @@ protected boolean constructBatch(final TabletInsertionEvent event) { if (isTabletEmpty(tablet)) { return true; } + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletSizeInBytes(tablet)); if (rawTabletInsertionEvent.isTableModelEvent()) { // table Model bufferTableModelTablet( @@ -139,6 +141,17 @@ protected boolean constructBatch(final TabletInsertionEvent event) { return true; } + private long calculateTabletsSizeInBytes(final List tablets) { + return tablets.stream() + .filter(tablet -> !isTabletEmpty(tablet)) + .mapToLong(PipeTabletEventTsFileBatch::calculateTabletSizeInBytes) + .sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { + return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; + } + private void bufferTreeModelTablet( final String pipeName, final long creationTime, @@ -146,11 +159,6 @@ private void bufferTreeModelTablet( final boolean isAligned) { new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); - // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses PipeTreeModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -162,11 +170,6 @@ private void bufferTableModelTablet( final String pipeName, final long creationTime, final Tablet tablet, final String dataBase) { new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); - // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses PipeTableModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -209,8 +212,13 @@ public synchronized List> sealTsFiles() @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { pipeName2WeightMap.clear(); tableModeTsFileBuilder.onSuccess(); treeModeTsFileBuilder.onSuccess(); @@ -220,8 +228,6 @@ public synchronized void onSuccess() { public synchronized void close() { super.close(); - pipeName2WeightMap.clear(); - tableModeTsFileBuilder.close(); treeModeTsFileBuilder.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 4f2dab1bfa895..92a8c731fbe86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -33,6 +33,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; @@ -497,10 +499,13 @@ private void transferFilePieces( final AirGapSocket socket, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); @@ -532,6 +537,11 @@ private void transferFilePieces( } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min((long) PIPE_CONFIG.getPipeSinkReadFileBufferSize(), Math.max(file.length(), 1L)); + } + private boolean sendBatch( final AirGapSocket socket, byte[] bytes, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 481e340a739f3..9c3104c0ad17d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -39,6 +39,8 @@ import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; @@ -435,10 +437,13 @@ protected void transferFilePieces( final TCommitId tCommitId, final TConsensusGroupId tConsensusGroupId) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { final int readLength = reader.read(readBuffer); if (readLength == -1) { @@ -501,6 +506,13 @@ protected void transferFilePieces( } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + private TEndPoint getFollowerUrl() { // In current iotConsensusV2 design, one connector corresponds to one follower, so the peers is // actually a singleton list diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java index 4e269aaa7e82c..52815e645bf8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; @@ -70,7 +72,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler private final boolean transferMod; private final int readFileBufferSize; - private final byte[] readBuffer; + private PipeTsFileMemoryBlock memoryBlock; + private byte[] readBuffer; private long position; private RandomAccessFile reader; @@ -106,8 +109,15 @@ public IoTConsensusV2TsFileInsertionEventHandler( transferMod = event.isWithMod(); currentFile = transferMod ? modFile : tsFile; - readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - readBuffer = new byte[readFileBufferSize]; + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); + readFileBufferSize = + (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; reader = @@ -128,6 +138,12 @@ public void transfer(final AsyncIoTConsensusV2ServiceClient client) this.client = client; client.setShouldReturnSelf(false); + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + final int readLength = reader.read(readBuffer); if (readLength == -1) { if (currentFile == modFile) { @@ -246,6 +262,8 @@ public void onComplete(final TIoTConsensusV2TransferResp response) { client.returnSelf(); } + releaseReadBufferMemoryBlock(); + long duration = System.nanoTime() - createTime; metric.recordConnectorTsFileTransferTimer(duration); } @@ -330,10 +348,20 @@ public void onError(final Exception exception) { connector.addFailureEventToRetryQueue(event); metric.recordRetryCounter(); + releaseReadBufferMemoryBlock(); + if (client != null) { client.setShouldReturnSelf(true); client.returnSelf(); } } } + + private void releaseReadBufferMemoryBlock() { + if (memoryBlock != null) { + memoryBlock.close(); + memoryBlock = null; + readBuffer = null; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java index 677c77e0540f5..b55cb1233f557 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java @@ -70,6 +70,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose protected long firstEventProcessingTime = Long.MIN_VALUE; // limit in buffer size + protected final long maxBatchSizeInBytes; protected final PipeMemoryBlock allocatedMemoryBlock; protected long totalBufferSize = 0; @@ -92,37 +93,12 @@ protected IoTConsensusV2TransferBatchReqBuilder( this.consensusGroupId = consensusGroupId; this.thisDataNodeId = thisDataNodeId; - final long requestMaxBatchSizeInBytes = + maxBatchSizeInBytes = parameters.getLongOrDefault( Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - allocatedMemoryBlock = - PipeDataNodeResourceManager.memory() - .tryAllocate(requestMaxBatchSizeInBytes) - .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0)) - .setShrinkCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_SHRUNK_FROM, - oldMemory, - newMemory)) - .setExpandMethod( - oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes)) - .setExpandCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_EXPANDED_FROM, - oldMemory, - newMemory)); - - if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) { - LOGGER.info( - "IoTConsensusV2TransferBatchReqBuilder: the max batch size is adjusted from {} to {} due to the " - + "memory restriction", - requestMaxBatchSizeInBytes, - getMaxBatchSizeInBytes()); - } + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); } /** @@ -137,27 +113,80 @@ public synchronized boolean onEvent(TabletInsertionEvent event) return false; } - final long requestCommitId = ((EnrichedEvent) event).getReplicateIndexForIoTV2(); + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + final long requestCommitId = enrichedEvent.getReplicateIndexForIoTV2(); // The deduplication logic here is to avoid the accumulation of the same event in a batch when // retrying. if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { - events.add((EnrichedEvent) event); - requestCommitIds.add(requestCommitId); - final int bufferSize = buildTabletInsertionBuffer(event); - - ((EnrichedEvent) event) - .increaseReferenceCount(IoTConsensusV2TransferBatchReqBuilder.class.getName()); + if (!enrichedEvent.increaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName())) { + LOGGER.warn(DataNodePipeMessages.CANNOT_INCREASE_REFERENCE_COUNT_FOR_EVENT_IGNORE, event); + return shouldEmit(); + } - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); + final int previousEventsSize = events.size(); + final int previousRequestCommitIdsSize = requestCommitIds.size(); + final int previousBatchReqsSize = batchReqs.size(); + try { + events.add(enrichedEvent); + requestCommitIds.add(requestCommitId); + final int bufferSize = buildTabletInsertionBuffer(event); + increaseTotalBufferSizeAndUpdateMemoryBlock(bufferSize); + + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } + } catch (final Exception e) { + rollbackTo(previousEventsSize, previousRequestCommitIdsSize, previousBatchReqsSize); + if (events.isEmpty()) { + resetMemoryUsage(); + } + enrichedEvent.decreaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName(), false); + throw e; } + } + + return shouldEmit(); + } - totalBufferSize += bufferSize; + private boolean shouldEmit() { + return !events.isEmpty() + && (totalBufferSize >= getMaxBatchSizeInBytes() + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs); + } + + private void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; } - return totalBufferSize >= getMaxBatchSizeInBytes() - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + final long newTotalBufferSize = totalBufferSize + bufferSize; + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, getMaxBatchSizeInBytes())); + totalBufferSize = newTotalBufferSize; + } + + private void rollbackTo( + final int previousEventsSize, + final int previousRequestCommitIdsSize, + final int previousBatchReqsSize) { + while (events.size() > previousEventsSize) { + events.remove(events.size() - 1); + } + while (requestCommitIds.size() > previousRequestCommitIdsSize) { + requestCommitIds.remove(requestCommitIds.size() - 1); + } + while (batchReqs.size() > previousBatchReqsSize) { + batchReqs.remove(batchReqs.size() - 1); + } + } + + private void resetMemoryUsage() { + firstEventProcessingTime = Long.MIN_VALUE; + totalBufferSize = 0; + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); } public synchronized void onSuccess() { @@ -166,9 +195,7 @@ public synchronized void onSuccess() { events.clear(); requestCommitIds.clear(); - firstEventProcessingTime = Long.MIN_VALUE; - - totalBufferSize = 0; + resetMemoryUsage(); } public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() throws IOException { @@ -176,7 +203,7 @@ public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() throws I } protected long getMaxBatchSizeInBytes() { - return allocatedMemoryBlock.getMemoryUsageInBytes(); + return maxBatchSizeInBytes; } public boolean isEmpty() { @@ -220,6 +247,9 @@ public synchronized void close() { ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName()); } } + batchReqs.clear(); + events.clear(); + requestCommitIds.clear(); allocatedMemoryBlock.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index c0a9eb6a79d65..735e6c48dbb13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,8 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -417,8 +419,12 @@ private void transportSingleFilePieceByPiece(final File file) throws IOException long position = 0; // Try small piece to rebase the file position. - final byte[] buffer = new byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()]; - try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final byte[] buffer = new byte[readFileBufferSize]; while (true) { final int dataLength = randomAccessFile.read(buffer); if (dataLength == -1) { @@ -456,6 +462,13 @@ private void transportSingleFilePieceByPiece(final File file) throws IOException } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + @Override public void close() throws Exception { if (client != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 2467c3ce14336..27cb81d6cda29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -124,11 +124,15 @@ public PipeTransferTsFileHandler( // the memory of the TsFile event is not released, so the memory is not enough for slicing. This // will cause a deadlock. waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); // 20 - 40 seconds + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); readFileBufferSize = (int) Math.min( - PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), - transferMod ? Math.max(tsFile.length(), modFile.length()) : tsFile.length()); + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; isSealSignalSent = new AtomicBoolean(false); @@ -142,21 +146,6 @@ public void transfer( final IoTDBDataNodeAsyncClientManager clientManager, final AsyncPipeDataTransferServiceClient client) throws TException, IOException { - // Delay creation of resources to avoid OOM or too many open files - if (readBuffer == null) { - memoryBlock = - PipeDataNodeResourceManager.memory() - .forceAllocateForTsFileWithRetry( - PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled() - ? readFileBufferSize - : 0); - readBuffer = new byte[readFileBufferSize]; - } - - if (reader == null) { - reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); - } - this.clientManager = clientManager; this.client = client; @@ -173,6 +162,17 @@ public void transfer( return; } + // Delay creation of resources to avoid OOM or too many open files + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + + if (reader == null) { + reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); + } + client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); @@ -256,6 +256,7 @@ public void onComplete(final TPipeTransferResp response) { super.onComplete(response); } finally { if (sink.isClosed()) { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -319,6 +320,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { referenceCount); } + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } @@ -361,6 +363,7 @@ public void onError(final Exception exception) { try { super.onError(exception); } finally { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -412,6 +415,7 @@ protected void onErrorInternal(final Exception exception) { LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_FILE_READER_OR_DELETE, e); } finally { try { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { @@ -473,10 +477,14 @@ public void clearEventsReferenceCount() { @Override public void close() { super.close(); + releaseReadBufferMemoryBlock(); + } + private void releaseReadBufferMemoryBlock() { if (memoryBlock != null) { memoryBlock.close(); memoryBlock = null; + readBuffer = null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index eb7d39864c433..7775092c270ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -22,10 +22,12 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -36,6 +38,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; @@ -71,6 +75,7 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Collections; @@ -593,6 +598,89 @@ private void doTransfer( LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_FILE, tsFile); } + @Override + protected void transferFilePieces( + final Map, Double> pipe2WeightMap, + final File file, + final Pair clientAndStatus, + final boolean isMultiFile) + throws PipeException, IOException { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; + while (true) { + mayLimitRateAndRecordIO(readFileBufferSize); + final int readLength = reader.read(readBuffer); + if (readLength == -1) { + break; + } + + final byte[] payLoad = + readLength == readFileBufferSize + ? readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength); + final PipeTransferFilePieceResp resp; + try { + final TPipeTransferReq req = + compressIfNeeded( + isMultiFile + ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) + : getTransferSingleFilePieceReq(file.getName(), position, payLoad)); + pipe2WeightMap.forEach( + (namePair, weight) -> + rateLimitIfNeeded( + namePair.getLeft(), + namePair.getRight(), + clientAndStatus.getLeft().getEndPoint(), + (long) (req.getBody().length * weight))); + resp = + PipeTransferFilePieceResp.fromTPipeTransferResp( + clientAndStatus.getLeft().pipeTransfer(req)); + } catch (final Exception e) { + clientAndStatus.setRight(false); + throw new PipeConnectionException( + String.format( + "Network error when transfer file %s, because %s.", file, e.getMessage()), + e); + } + + position += readLength; + + final TSStatus status = resp.getStatus(); + if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { + position = resp.getEndWritingOffset(); + reader.seek(position); + LOGGER.info(DataNodePipeMessages.REDIRECT_FILE_POSITION_TO, position); + continue; + } + + if (status.getCode() + == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { + getClientManager().sendHandshakeReq(clientAndStatus); + } + + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + receiverStatusHandler.handle( + resp.getStatus(), + String.format("Transfer file %s error, result status %s.", file, resp.getStatus()), + file.getName()); + } + } + } + } + + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 91b6d7d56c532..3d81e143ac867 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -155,6 +156,87 @@ public void testScanParser() throws Exception { System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserReleasesTabletMemoryAfterRawTabletGenerated() throws Exception { + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned-release-tablet-memory.tsfile", 1, 1, 10, 0, 100, 10, 10); + + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + nonalignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + + Assert.assertTrue(iterator.hasNext()); + final TabletInsertionEvent event = iterator.next(); + Assert.assertTrue(event instanceof PipeRawTabletInsertionEvent); + Assert.assertEquals(0, getAllocatedTabletMemory(parser).getMemoryUsageInBytes()); + + ((PipeRawTabletInsertionEvent) event).clearReferenceCount(getClass().getName()); + } + } + + @Test + public void testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory() + throws Exception { + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned-consume-oom.tsfile", 1, 1, 10, 0, 100, 10, 10); + resource = new TsFileResource(nonalignedTsFile); + resource.setStatusForTest(TsFileResourceStatus.NORMAL); + + // The TsFile generator only creates the file, so mark the resource non-empty explicitly. + final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d0"); + resource.updateStartTime(deviceID, 0); + resource.updateEndTime(deviceID, 9); + + final PipeTsFileInsertionEvent event = + new PipeTsFileInsertionEvent( + false, + "root", + resource, + null, + false, + false, + false, + null, + null, + 0, + null, + new PrefixTreePattern("root"), + null, + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + final AtomicReference parsedEventReference = + new AtomicReference<>(); + + final PipeRuntimeOutOfMemoryCriticalException exception = + Assert.assertThrows( + PipeRuntimeOutOfMemoryCriticalException.class, + () -> + event.consumeTabletInsertionEventsWithRetry( + parsedEvent -> { + parsedEventReference.set(parsedEvent); + throw new PipeRuntimeOutOfMemoryCriticalException("expected oom"); + }, + "test")); + + Assert.assertEquals("expected oom", exception.getMessage()); + Assert.assertNotNull(parsedEventReference.get()); + Assert.assertTrue(parsedEventReference.get().isReleased()); + Assert.assertNull(getEventParser(event).get()); + } + @Test public void testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws Exception { final long originalPipeMaxReaderChunkSize = @@ -1995,6 +2077,22 @@ private PipeMemoryBlock getAllocatedBatchDataMemory(final TsFileInsertionEventSc return (PipeMemoryBlock) field.get(parser); } + private PipeMemoryBlock getAllocatedTabletMemory(final TsFileInsertionEventParser parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionEventParser.class.getDeclaredField("allocatedMemoryBlockForTablet"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } + + @SuppressWarnings("unchecked") + private AtomicReference getEventParser( + final PipeTsFileInsertionEvent event) throws NoSuchFieldException, IllegalAccessException { + final Field field = PipeTsFileInsertionEvent.class.getDeclaredField("eventParser"); + field.setAccessible(true); + return (AtomicReference) field.get(event); + } + private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final File tsFile) throws Exception { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {