Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ public final class DataNodePipeMessages {
+ "PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. "
+ "Returning true to ensure data integrity.";
public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE =
"{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count "
+ "is {}, will keep retrying.";
"{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, "
+ "will release parser memory and retry the TsFile event later.";
public static final String FAILED_TO_BUILD_TABLET = "Failed to build tablet";
public static final String FAILED_TO_CHECK_NEXT = "Failed to check next";
public static final String FAILED_TO_CLOSE_TSFILEREADER = "Failed to close TsFileReader";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ public final class DataNodePipeMessages {
"determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the "
+ "time range: [{}, {}]. Returning true to ensure data integrity 时发生异常";
public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE =
"{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count "
+ "is {}, will keep retrying.";
"{}:为解析 TsFile {} 分配内存失败,tablet 事件编号 {},"
+ "将释放解析器内存并稍后重试该 TsFile 事件。";
public static final String FAILED_TO_BUILD_TABLET = "构建 tablet 失败";
public static final String FAILED_TO_CHECK_NEXT = "check next 失败";
public static final String FAILED_TO_CLOSE_TSFILEREADER = "关闭 TsFileReader 失败";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,9 +794,8 @@

long needMemory = 0;

needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters);
needMemory += calculateSinkBatchMemory(sinkParameters);
needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters);
// TsFile parser, sink batch, and TsFile read buffer memory are allocated dynamically
// from PipeMemoryManager only while they are active.
needMemory += calculateAssignerMemory(sourceParameters);

PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory();
Expand Down Expand Up @@ -886,7 +885,7 @@

isTSFileParser =
isTSFileParser
|| (isExtractorHistory

Check warning on line 888 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused private "calculateTsFileParserMemory" method.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8Rr3jGfwQ4mSZzoAL-&open=AZ8Rr3jGfwQ4mSZzoAL-&pullRequest=18051
&& sourceParameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY));

Expand Down Expand Up @@ -958,7 +957,7 @@
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);

Check warning on line 960 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused private "calculateSinkBatchMemory" method.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8Rr3jGfwQ4mSZzoAL_&open=AZ8Rr3jGfwQ4mSZzoAL_&pullRequest=18051

if (!needUseBatch) {
return 0;
Expand Down Expand Up @@ -987,7 +986,7 @@
needTransferTsFile =
needTransferTsFile
|| PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format)
|| PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format);

Check warning on line 989 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused private "calculateSendTsFileReadBufferMemory" method.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8Rr3jGfwQ4mSZzoAMA&open=AZ8Rr3jGfwQ4mSZzoAMA&pullRequest=18051

if (!needTransferTsFile) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ protected boolean executeOnce() throws Exception {
event1 -> {
try {
pipeProcessor.process(event1, outputEventCollector);
} catch (PipeRuntimeOutOfMemoryCriticalException e) {
throw e;
} catch (Exception e) {
ex.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@
LOGGER.info(DataNodePipeMessages.TEMPORARY_TSFILE_DETECTED_WILL_SKIP_ITS_TRANSFER, tsFile);
return;
}
if (isTableModelEvent()) {

Check warning on line 529 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 100 to 64, Complexity from 21 to 14, Nesting Level from 5 to 2, Number of Variables from 13 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRY&open=AZ8bkCfStSRAbF5ZBDRY&pullRequest=18051
for (final String table : tableNames) {
if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
|| !tablePattern.matchesTable(table)) {
Expand Down Expand Up @@ -709,38 +709,37 @@

public void consumeTabletInsertionEventsWithRetry(
final TabletInsertionEventConsumer consumer, final String callerName) throws Exception {
final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
int tabletEventCount = 0;
while (iterator.hasNext()) {
final TabletInsertionEvent parsedEvent = iterator.next();
tabletEventCount++;
int retryCount = 0;
while (true) {
// If failed due do insufficient memory, retry until success to avoid race among multiple
// processor threads
try {
final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
while (iterator.hasNext()) {
final TabletInsertionEvent parsedEvent = iterator.next();
tabletEventCount++;
try {
consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
break;
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
if (retryCount++ % 100 == 0) {
LOGGER.warn(
DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE,
callerName,
getTsFile(),
tabletEventCount,
retryCount);
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE,
callerName,
getTsFile(),
tabletEventCount,
retryCount,
e);
}
releaseParsedTabletEvent(parsedEvent);
throw e;
}
}
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
close();
LOGGER.warn(
DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE,
callerName,

Check warning on line 730 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested try block into a separate method.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRT&open=AZ8bkCfStSRAbF5ZBDRT&pullRequest=18051
getTsFile(),
tabletEventCount,
e);
throw e;
}
}

Check warning on line 737 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Either log this exception and handle it, or rethrow it with some contextual information.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRU&open=AZ8bkCfStSRAbF5ZBDRU&pullRequest=18051
private void releaseParsedTabletEvent(final TabletInsertionEvent parsedEvent) {
if (parsedEvent instanceof PipeRawTabletInsertionEvent
&& ((PipeRawTabletInsertionEvent) parsedEvent).getReferenceCount() == 0
&& !((PipeRawTabletInsertionEvent) parsedEvent).isReleased()) {
((PipeRawTabletInsertionEvent) parsedEvent).clearReferenceCount(getClass().getName());
}
}

Expand All @@ -748,7 +747,7 @@
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws PipeException {
// 20 - 40 seconds for waiting
// Can not be unlimited or will cause deadlock
return toTabletInsertionEvents((long) ((1 + Math.random()) * 20 * 1000));

Check warning on line 750 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof PipeRawTabletInsertionEvent piperawtabletinsertionevent'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRW&open=AZ8bkCfStSRAbF5ZBDRW&pullRequest=18051

Check warning on line 750 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof PipeRawTabletInsertionEvent piperawtabletinsertionevent'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRV&open=AZ8bkCfStSRAbF5ZBDRV&pullRequest=18051

Check warning on line 750 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof PipeRawTabletInsertionEvent piperawtabletinsertionevent'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCfStSRAbF5ZBDRX&open=AZ8bkCfStSRAbF5ZBDRX&pullRequest=18051
}

public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long timeoutMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
Expand Down Expand Up @@ -109,9 +108,7 @@ protected TsFileInsertionEventParser(
this.sourceEvent = sourceEvent;

this.allocatedMemoryBlockForTablet =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);

LOGGER.debug(
DataNodePipeMessages.TSFILE_HAS_INITIALIZED_PIPENAME_CREATION_TIME_PATTERN,
Expand Down Expand Up @@ -180,6 +177,13 @@ protected void recordTabletMetrics(final Tablet tablet) {
}
}

protected void releaseTabletMemoryBlock() {
if (allocatedMemoryBlockForTablet != null
&& allocatedMemoryBlockForTablet.getMemoryUsageInBytes() > 0) {
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0);
}
}

@Override
public void close() {
tabletInsertionIterable = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ public TabletInsertionEvent next() {
final Tablet tablet = tabletIterator.next();
// Record tablet metrics
recordTabletMetrics(tablet);
releaseTabletMemoryBlock();
final boolean isAligned =
deviceIsAlignedMap.getOrDefault(
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
Expand Down Expand Up @@ -144,12 +143,9 @@
filter = Objects.nonNull(timeFilterExpression) ? timeFilterExpression.getFilter() : null;

this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);

try {
currentModifications =
Expand Down Expand Up @@ -235,6 +231,7 @@
final Tablet tablet = getNextTablet();
// Record tablet metrics
recordTabletMetrics(tablet);
releaseTabletMemoryBlock();
final boolean isLast = isLastTabletWithoutDeferredException();
try {
return sourceEvent == null
Expand Down Expand Up @@ -449,7 +446,7 @@
}
}

private boolean putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) {

Check warning on line 449 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 134 to 64, Complexity from 34 to 14, Nesting Level from 4 to 2, Number of Variables from 8 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8Rr3kEfwQ4mSZzoAMB&open=AZ8Rr3kEfwQ4mSZzoAMB&pullRequest=18051
boolean isNeedFillTime = false;
if (data.getDataType() == TSDataType.VECTOR) {
for (int i = 0; i < tablet.getSchemas().size(); ++i) {
Expand Down Expand Up @@ -586,7 +583,7 @@
return isNeedFillTime;
}

private void moveToNextChunkReader()

Check warning on line 586 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 123 to 64, Complexity from 25 to 14, Nesting Level from 4 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8Rr3kEfwQ4mSZzoAMC&open=AZ8Rr3kEfwQ4mSZzoAMC&pullRequest=18051
throws IOException, IllegalStateException, IllegalPathException {
ChunkHeader chunkHeader;
currentMeasurements.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
Expand Down Expand Up @@ -94,25 +92,14 @@ public TsFileInsertionEventTableParser(
allocatedMemoryBlockForModifications =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
long tableSize =
Math.min(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());

this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeMaxReaderChunkSize());
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
this.allocatedMemoryBlockForChunkMeta =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
this.allocatedMemoryBlockForTableSchemas =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
IoTDBDescriptor.getInstance()
.getConfig()
.getPipeDataStructureTabletSizeInBytes());
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);

this.startTime = startTime;
this.endTime = endTime;
Expand Down Expand Up @@ -239,6 +226,7 @@ && hasTablePrivilege(entry.getKey()),

final Tablet tablet = tabletIterator.next();
recordTabletMetrics(tablet);
releaseTabletMemoryBlock();
if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) {
bufferedTablet = tablet;
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down Expand Up @@ -424,7 +425,7 @@
}
}

private Map<String, Pair<Long, ByteBuffer>> processRow(

Check warning on line 428 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 91 to 64, Complexity from 17 to 14, Nesting Level from 4 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCf0tSRAbF5ZBDRZ&open=AZ8bkCf0tSRAbF5ZBDRZ&pullRequest=18051
final Row row, final RowCollector rowCollector, final AtomicReference<Exception> exception) {
final Map<String, Pair<Long, ByteBuffer>> resultMap = new HashMap<>();

Expand Down Expand Up @@ -538,6 +539,8 @@
event -> {
try {
process(event, eventCollector);
} catch (PipeRuntimeOutOfMemoryCriticalException e) {
throw e;
} catch (Exception e) {
ex.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.processor.downsampling;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
Expand Down Expand Up @@ -145,7 +146,7 @@
* usage.
*/
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)

Check failure on line 149 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8bkCgLtSRAbF5ZBDRa&open=AZ8bkCgLtSRAbF5ZBDRa&pullRequest=18051
throws Exception {
if (shouldSplitFile) {
try {
Expand All @@ -156,6 +157,8 @@
event -> {
try {
process(event, eventCollector);
} catch (PipeRuntimeOutOfMemoryCriticalException e) {
throw e;
} catch (Exception e) {
ex.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,6 @@ public synchronized void resize(
}

final long oldSize = block.getMemoryUsageInBytes();
if (oldSize == 0) {
// If the memory block is not registered, we need to register it first.
// Otherwise, the memory usage will be inconsistent.
// See registerMemoryBlock for more details.
allocatedBlocks.add(block);
}

if (oldSize >= targetSize) {
memoryBlock.release(oldSize - targetSize);
Expand All @@ -350,6 +344,8 @@ public synchronized void resize(
if (targetSize == 0) {
allocatedBlocks.remove(block);
}

this.notifyAll();
return;
}

Expand All @@ -359,6 +355,12 @@ public synchronized void resize(
if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes()
>= sizeInBytes) {
memoryBlock.forceAllocateWithoutLimitation(sizeInBytes);
if (oldSize == 0) {
// If the memory block is not registered, we need to register it first.
// Otherwise, the memory usage will be inconsistent.
// See registerMemoryBlock for more details.
allocatedBlocks.add(block);
}
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += sizeInBytes;
}
Expand Down Expand Up @@ -495,6 +497,9 @@ public synchronized boolean tryAllocate(
if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes()
>= memoryInBytesNeededToBeAllocated) {
memoryBlock.forceAllocateWithoutLimitation(memoryInBytesNeededToBeAllocated);
if (block.getMemoryUsageInBytes() == 0) {
allocatedBlocks.add(block);
}
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated;
}
Expand Down
Loading
Loading