Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Map;
Expand Down Expand Up @@ -115,16 +116,33 @@ int initOrInvalidateLastCache(final String[] measurements, final boolean isInval

int tryUpdateLastCache(
final String[] measurements, final TimeValuePair[] timeValuePairs, boolean invalidateNull) {
return tryUpdateLastCache(measurements, null, timeValuePairs, invalidateNull);
}

int tryUpdateLastCache(
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs,
boolean invalidateNull) {
final DeviceLastCache cache = lastCache.get();
final int result =
Objects.nonNull(cache) ? cache.tryUpdate(measurements, timeValuePairs, invalidateNull) : 0;
Objects.nonNull(cache)
? cache.tryUpdate(measurements, measurementSchemas, timeValuePairs, invalidateNull)
: 0;
return Objects.nonNull(lastCache.get()) ? result : 0;
}

int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] timeValuePairs) {
return tryUpdateLastCache(measurements, timeValuePairs, false);
}

int tryUpdateLastCache(
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs) {
return tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs, false);
}

int invalidateLastCache(final String measurement) {
final DeviceLastCache cache = lastCache.get();
final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -122,15 +123,27 @@ int tryUpdate(
final @Nonnull String[] measurements,
final @Nonnull TimeValuePair[] timeValuePairs,
final boolean invalidateNull) {
return tryUpdate(measurements, null, timeValuePairs, invalidateNull);
}

int tryUpdate(
final @Nonnull String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final @Nonnull TimeValuePair[] timeValuePairs,
final boolean invalidateNull) {
final AtomicInteger diff = new AtomicInteger(0);
long lastTime = Long.MIN_VALUE;

for (int i = 0; i < measurements.length; ++i) {
final String measurement = getRawMeasurement(measurements, measurementSchemas, i);
if (Objects.isNull(measurement)) {
continue;
}
if (Objects.isNull(timeValuePairs[i])) {
if (invalidateNull) {
diff.addAndGet(
-((int) RamUsageEstimator.sizeOf(measurements[i])
+ getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
-((int) RamUsageEstimator.sizeOf(measurement)
+ getTvPairEntrySize(measurement2CachedLastMap.remove(measurement))));
}
continue;
}
Expand All @@ -140,8 +153,8 @@ int tryUpdate(
lastTime = timeValuePairs[i].getTimestamp();
}
measurement2CachedLastMap.computeIfPresent(
measurements[i],
(measurement, tvPair) -> {
measurement,
(measurementName, tvPair) -> {
if (tvPair.getTimestamp() <= timeValuePairs[finalI].getTimestamp()) {
diff.addAndGet(getDiffSize(tvPair, timeValuePairs[finalI]));
return timeValuePairs[finalI];
Expand All @@ -159,6 +172,21 @@ int tryUpdate(
return diff.get();
}

@Nullable
private static String getRawMeasurement(
final @Nonnull String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final int index) {
if (Objects.isNull(measurements[index])) {
return null;
}
return Objects.nonNull(measurementSchemas)
&& index < measurementSchemas.length
&& Objects.nonNull(measurementSchemas[index])
? measurementSchemas[index].getMeasurementId()
: measurements[index];
}

@GuardedBy("DataRegionInsertLock#writeLock")
int invalidate(final String measurement) {
final AtomicInteger diff = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ public int update(final String[] measurements, final IMeasurementSchema[] schema
final int length = measurements.length;

for (int i = 0; i < length; ++i) {
final String inputMeasurement = measurements[i];
if (Objects.isNull(inputMeasurement)) {
continue;
}
final IMeasurementSchema schema = i < schemas.length ? schemas[i] : null;
final String measurement =
Objects.nonNull(schema) ? schema.getMeasurementId() : inputMeasurement;
// Skip this to avoid instance creation/gc for writing performance
if (measurements[i] == null || measurementMap.containsKey(measurements[i])) {
if (schema == null || measurementMap.containsKey(measurement)) {
continue;
}
diff += putEntry(measurements[i], schemas[i], null);
diff += putEntry(measurement, schema, null);
}
return diff;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void updateLastCache(
: entry ->
entry.setMeasurementSchema(
database2Use, isAligned, measurements, measurementSchemas)
+ entry.tryUpdateLastCache(measurements, timeValuePairs),
+ entry.tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs),
Objects.isNull(timeValuePairs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,19 @@ public boolean allMeasurementFailed() {
public String[] getRawMeasurements() {
String[] measurements = getMeasurements();
MeasurementSchema[] measurementSchemas = getMeasurementSchemas();
String[] rawMeasurements = new String[measurements.length];
String[] rawMeasurements = measurements;
for (int i = 0; i < measurements.length; i++) {
if (measurementSchemas[i] != null) {
if (measurementSchemas != null
&& i < measurementSchemas.length
&& measurementSchemas[i] != null) {
// get raw measurement rather than alias
rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
} else {
rawMeasurements[i] = measurements[i];
String rawMeasurement = measurementSchemas[i].getMeasurementId();
if (!Objects.equals(rawMeasurement, measurements[i])) {
if (rawMeasurements == measurements) {
rawMeasurements = Arrays.copyOf(measurements, measurements.length);
}
rawMeasurements[i] = rawMeasurement;
}
}
}
return rawMeasurements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,18 +863,12 @@ public TimeValuePair composeTimeValuePair(int columnIndex) {
}

public void updateLastCache(String databaseName) {
String[] rawMeasurements = getRawMeasurements();
TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeTimeValuePair(i);
}
DataNodeSchemaCache.getInstance()
.updateLastCacheIfExists(
databaseName,
devicePath,
rawMeasurements,
timeValuePairs,
isAligned,
measurementSchemas);
databaseName, devicePath, measurements, timeValuePairs, isAligned, measurementSchemas);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1191,18 +1191,12 @@ public void updateLastCache(final String databaseName) {
}

public void updateLastCache(final String databaseName, final TSStatus[] results) {
final String[] rawMeasurements = getRawMeasurements();
final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
}
DataNodeSchemaCache.getInstance()
.updateLastCacheIfExists(
databaseName,
devicePath,
rawMeasurements,
timeValuePairs,
isAligned,
measurementSchemas);
databaseName, devicePath, measurements, timeValuePairs, isAligned, measurementSchemas);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.SchemaCacheEntry;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;

Expand Down Expand Up @@ -267,6 +269,43 @@ public void testUpdateLastCache() throws IllegalPathException {
Assert.assertEquals(0, dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
}

@Test
public void testUpdateLastCacheWithAliasDoesNotCopyMeasurements() throws IllegalPathException {
final String database = "root.db";
final PartialPath device = new PartialPath("root.db.d_alias");
final MeasurementSchema s1 = new MeasurementSchema("s1", TSDataType.INT32);
final MeasurementPath s1Path = new MeasurementPath(device.concatNode("s1"), s1);

dataNodeSchemaCache.declareLastCache(database, s1Path);

final InsertRowNode insertRowNode =
new InsertRowNode(
new PlanNodeId("testUpdateLastCacheWithAliasDoesNotCopyMeasurements"),
device,
false,
new String[] {"alias"},
new TSDataType[] {TSDataType.INT32},
new MeasurementSchema[] {s1},
1L,
new Object[] {1},
false) {
@Override
public String[] getRawMeasurements() {
throw new AssertionError("Last cache update should not copy raw measurements");
}
};

insertRowNode.updateLastCache(database);

Assert.assertEquals(
new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)),
dataNodeSchemaCache.getLastCache(new MeasurementPath(device.concatNode("s1"), s1)));
Assert.assertNull(
dataNodeSchemaCache.getLastCache(
new MeasurementPath(
device.concatNode("alias"), new MeasurementSchema("alias", TSDataType.INT32))));
}

@Test
public void testInvalidateLastCacheByWildcardDevicePath() throws IllegalPathException {
final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32);
Expand Down
Loading