diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java index ecd4c297e5729..976888959aa53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java @@ -26,6 +26,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.Map; @@ -115,9 +116,19 @@ int initOrInvalidateLastCache(final String[] measurements, final boolean isInval int tryUpdateLastCache( final String[] measurements, final TimeValuePair[] timeValuePairs, boolean invalidateNull) { + return tryUpdateLastCache(measurements, null, timeValuePairs, invalidateNull); + } + + int tryUpdateLastCache( + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs, + boolean invalidateNull) { final DeviceLastCache cache = lastCache.get(); final int result = - Objects.nonNull(cache) ? cache.tryUpdate(measurements, timeValuePairs, invalidateNull) : 0; + Objects.nonNull(cache) + ? cache.tryUpdate(measurements, measurementSchemas, timeValuePairs, invalidateNull) + : 0; return Objects.nonNull(lastCache.get()) ? result : 0; } @@ -125,6 +136,13 @@ int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] timeVa return tryUpdateLastCache(measurements, timeValuePairs, false); } + int tryUpdateLastCache( + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs) { + return tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs, false); + } + int invalidateLastCache(final String measurement) { final DeviceLastCache cache = lastCache.get(); final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java index 9ebc1fc413cb1..78e6cdfb20bd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java @@ -23,6 +23,7 @@ import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -122,15 +123,27 @@ int tryUpdate( final @Nonnull String[] measurements, final @Nonnull TimeValuePair[] timeValuePairs, final boolean invalidateNull) { + return tryUpdate(measurements, null, timeValuePairs, invalidateNull); + } + + int tryUpdate( + final @Nonnull String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final @Nonnull TimeValuePair[] timeValuePairs, + final boolean invalidateNull) { final AtomicInteger diff = new AtomicInteger(0); long lastTime = Long.MIN_VALUE; for (int i = 0; i < measurements.length; ++i) { + final String measurement = getRawMeasurement(measurements, measurementSchemas, i); + if (Objects.isNull(measurement)) { + continue; + } if (Objects.isNull(timeValuePairs[i])) { if (invalidateNull) { diff.addAndGet( - -((int) RamUsageEstimator.sizeOf(measurements[i]) - + getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i])))); + -((int) RamUsageEstimator.sizeOf(measurement) + + getTvPairEntrySize(measurement2CachedLastMap.remove(measurement)))); } continue; } @@ -140,8 +153,8 @@ int tryUpdate( lastTime = timeValuePairs[i].getTimestamp(); } measurement2CachedLastMap.computeIfPresent( - measurements[i], - (measurement, tvPair) -> { + measurement, + (measurementName, tvPair) -> { if (tvPair.getTimestamp() <= timeValuePairs[finalI].getTimestamp()) { diff.addAndGet(getDiffSize(tvPair, timeValuePairs[finalI])); return timeValuePairs[finalI]; @@ -159,6 +172,21 @@ int tryUpdate( return diff.get(); } + @Nullable + private static String getRawMeasurement( + final @Nonnull String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final int index) { + if (Objects.isNull(measurements[index])) { + return null; + } + return Objects.nonNull(measurementSchemas) + && index < measurementSchemas.length + && Objects.nonNull(measurementSchemas[index]) + ? measurementSchemas[index].getMeasurementId() + : measurements[index]; + } + @GuardedBy("DataRegionInsertLock#writeLock") int invalidate(final String measurement) { final AtomicInteger diff = new AtomicInteger(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java index 6d50905bbcac3..fece095f54ac1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java @@ -66,11 +66,18 @@ public int update(final String[] measurements, final IMeasurementSchema[] schema final int length = measurements.length; for (int i = 0; i < length; ++i) { + final String inputMeasurement = measurements[i]; + if (Objects.isNull(inputMeasurement)) { + continue; + } + final IMeasurementSchema schema = i < schemas.length ? schemas[i] : null; + final String measurement = + Objects.nonNull(schema) ? schema.getMeasurementId() : inputMeasurement; // Skip this to avoid instance creation/gc for writing performance - if (measurements[i] == null || measurementMap.containsKey(measurements[i])) { + if (schema == null || measurementMap.containsKey(measurement)) { continue; } - diff += putEntry(measurements[i], schemas[i], null); + diff += putEntry(measurement, schema, null); } return diff; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java index 9bcb020b72fc8..d9ce4982447a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java @@ -143,7 +143,7 @@ void updateLastCache( : entry -> entry.setMeasurementSchema( database2Use, isAligned, measurements, measurementSchemas) - + entry.tryUpdateLastCache(measurements, timeValuePairs), + + entry.tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs), Objects.isNull(timeValuePairs)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index aee5cc1ec9384..29651e9455c5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -399,13 +399,19 @@ public boolean allMeasurementFailed() { public String[] getRawMeasurements() { String[] measurements = getMeasurements(); MeasurementSchema[] measurementSchemas = getMeasurementSchemas(); - String[] rawMeasurements = new String[measurements.length]; + String[] rawMeasurements = measurements; for (int i = 0; i < measurements.length; i++) { - if (measurementSchemas[i] != null) { + if (measurementSchemas != null + && i < measurementSchemas.length + && measurementSchemas[i] != null) { // get raw measurement rather than alias - rawMeasurements[i] = measurementSchemas[i].getMeasurementId(); - } else { - rawMeasurements[i] = measurements[i]; + String rawMeasurement = measurementSchemas[i].getMeasurementId(); + if (!Objects.equals(rawMeasurement, measurements[i])) { + if (rawMeasurements == measurements) { + rawMeasurements = Arrays.copyOf(measurements, measurements.length); + } + rawMeasurements[i] = rawMeasurement; + } } } return rawMeasurements; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 23c17808d493c..19d4ee6da0a71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -863,18 +863,12 @@ public TimeValuePair composeTimeValuePair(int columnIndex) { } public void updateLastCache(String databaseName) { - String[] rawMeasurements = getRawMeasurements(); - TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeTimeValuePair(i); } DataNodeSchemaCache.getInstance() .updateLastCacheIfExists( - databaseName, - devicePath, - rawMeasurements, - timeValuePairs, - isAligned, - measurementSchemas); + databaseName, devicePath, measurements, timeValuePairs, isAligned, measurementSchemas); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 472c1e79d7e47..9440baf580c14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -1191,18 +1191,12 @@ public void updateLastCache(final String databaseName) { } public void updateLastCache(final String databaseName, final TSStatus[] results) { - final String[] rawMeasurements = getRawMeasurements(); - final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount); } DataNodeSchemaCache.getInstance() .updateLastCacheIfExists( - databaseName, - devicePath, - rawMeasurements, - timeValuePairs, - isAligned, - measurementSchemas); + databaseName, devicePath, measurements, timeValuePairs, isAligned, measurementSchemas); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java index bf15d216e67cb..57d70853be3e5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java @@ -27,6 +27,8 @@ import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.SchemaCacheEntry; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; import org.apache.iotdb.db.schemaengine.template.Template; @@ -267,6 +269,43 @@ public void testUpdateLastCache() throws IllegalPathException { Assert.assertEquals(0, dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage()); } + @Test + public void testUpdateLastCacheWithAliasDoesNotCopyMeasurements() throws IllegalPathException { + final String database = "root.db"; + final PartialPath device = new PartialPath("root.db.d_alias"); + final MeasurementSchema s1 = new MeasurementSchema("s1", TSDataType.INT32); + final MeasurementPath s1Path = new MeasurementPath(device.concatNode("s1"), s1); + + dataNodeSchemaCache.declareLastCache(database, s1Path); + + final InsertRowNode insertRowNode = + new InsertRowNode( + new PlanNodeId("testUpdateLastCacheWithAliasDoesNotCopyMeasurements"), + device, + false, + new String[] {"alias"}, + new TSDataType[] {TSDataType.INT32}, + new MeasurementSchema[] {s1}, + 1L, + new Object[] {1}, + false) { + @Override + public String[] getRawMeasurements() { + throw new AssertionError("Last cache update should not copy raw measurements"); + } + }; + + insertRowNode.updateLastCache(database); + + Assert.assertEquals( + new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), + dataNodeSchemaCache.getLastCache(new MeasurementPath(device.concatNode("s1"), s1))); + Assert.assertNull( + dataNodeSchemaCache.getLastCache( + new MeasurementPath( + device.concatNode("alias"), new MeasurementSchema("alias", TSDataType.INT32)))); + } + @Test public void testInvalidateLastCacheByWildcardDevicePath() throws IllegalPathException { final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32);