Skip to content

Commit 8785a25

Browse files
committed
Add vectorized null-suppression for block serde
1 parent 091ed9d commit 8785a25

File tree

17 files changed

+501
-52
lines changed

17 files changed

+501
-52
lines changed

core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package io.trino.metadata;
1515

16+
import com.google.inject.Inject;
17+
import io.trino.simd.BlockEncodingSimdSupport;
18+
import io.trino.spi.SimdSupport;
1619
import io.trino.spi.block.ArrayBlockEncoding;
1720
import io.trino.spi.block.Block;
1821
import io.trino.spi.block.BlockEncoding;
@@ -32,6 +35,7 @@
3235
import java.util.concurrent.ConcurrentHashMap;
3336

3437
import static com.google.common.base.Preconditions.checkArgument;
38+
import static io.trino.simd.BlockEncodingSimdSupport.TESTING_BLOCK_ENCODING_SIMD_SUPPORT;
3539
import static java.util.Objects.requireNonNull;
3640

3741
public final class BlockEncodingManager
@@ -41,14 +45,19 @@ public final class BlockEncodingManager
4145
// for serialization
4246
private final Map<Class<? extends Block>, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>();
4347

44-
public BlockEncodingManager()
48+
public static final BlockEncodingManager TESTING_BLOCK_ENCODING_MANAGER = new BlockEncodingManager(TESTING_BLOCK_ENCODING_SIMD_SUPPORT);
49+
50+
@Inject
51+
public BlockEncodingManager(
52+
BlockEncodingSimdSupport blockEncodingSimdSupport)
4553
{
4654
// add the built-in BlockEncodings
55+
SimdSupport simdSupport = blockEncodingSimdSupport.getSimdSupport();
4756
addBlockEncoding(new VariableWidthBlockEncoding());
48-
addBlockEncoding(new ByteArrayBlockEncoding());
49-
addBlockEncoding(new ShortArrayBlockEncoding());
50-
addBlockEncoding(new IntArrayBlockEncoding());
51-
addBlockEncoding(new LongArrayBlockEncoding());
57+
addBlockEncoding(new ByteArrayBlockEncoding(simdSupport.expandAndCompressByte()));
58+
addBlockEncoding(new ShortArrayBlockEncoding(simdSupport.expandAndCompressShort()));
59+
addBlockEncoding(new IntArrayBlockEncoding(simdSupport.expandAndCompressInt()));
60+
addBlockEncoding(new LongArrayBlockEncoding(simdSupport.expandAndCompressLong()));
5261
addBlockEncoding(new Fixed12BlockEncoding());
5362
addBlockEncoding(new Int128ArrayBlockEncoding());
5463
addBlockEncoding(new DictionaryBlockEncoding());

core/trino-main/src/main/java/io/trino/simd/BlockEncodingSimdSupport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public final class BlockEncodingSimdSupport
3838
private final SimdSupport simdSupport;
3939
private static final SimdSupport AUTO_DETECTED_SUPPORT = detectSimd();
4040

41+
public static final BlockEncodingSimdSupport TESTING_BLOCK_ENCODING_SIMD_SUPPORT = new BlockEncodingSimdSupport(new FeaturesConfig().setBlockSerdeVectorizedNullSuppressionStrategy(AUTO));
42+
4143
@Inject
4244
public BlockEncodingSimdSupport(
4345
FeaturesConfig featuresConfig)
@@ -93,4 +95,9 @@ enum X86SimdInstructionSet {
9395
(IntVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && (x86Flags.contains(X86SimdInstructionSet.avx512f)),
9496
(LongVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && (x86Flags.contains(X86SimdInstructionSet.avx512f)));
9597
}
98+
99+
public SimdSupport getSimdSupport()
100+
{
101+
return simdSupport;
102+
}
96103
}

core/trino-main/src/main/java/io/trino/testing/PlanTester.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import io.trino.memory.MemoryManagerConfig;
8181
import io.trino.memory.NodeMemoryConfig;
8282
import io.trino.metadata.AnalyzePropertyManager;
83-
import io.trino.metadata.BlockEncodingManager;
8483
import io.trino.metadata.CatalogManager;
8584
import io.trino.metadata.ColumnPropertyManager;
8685
import io.trino.metadata.DisabledSystemSecurityMetadata;
@@ -252,6 +251,7 @@
252251
import static io.trino.execution.ParameterExtractor.bindParameters;
253252
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
254253
import static io.trino.execution.warnings.WarningCollector.NOOP;
254+
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
255255
import static io.trino.node.TestingInternalNodeManager.CURRENT_NODE;
256256
import static io.trino.spi.connector.Constraint.alwaysTrue;
257257
import static io.trino.spi.connector.DynamicFilter.EMPTY;
@@ -358,10 +358,9 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
358358
catalogManager,
359359
notificationExecutor);
360360

361-
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
362361
TypeRegistry typeRegistry = new TypeRegistry(typeOperators, new FeaturesConfig());
363362
TypeManager typeManager = new InternalTypeManager(typeRegistry);
364-
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodingManager, typeManager);
363+
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
365364
SecretsResolver secretsResolver = new SecretsResolver(ImmutableMap.of());
366365

367366
this.globalFunctionCatalog = new GlobalFunctionCatalog(
@@ -496,7 +495,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
496495
new GroupProviderManager(secretsResolver),
497496
new SessionPropertyDefaults(nodeInfo, accessControl, secretsResolver),
498497
typeRegistry,
499-
blockEncodingManager,
498+
TESTING_BLOCK_ENCODING_MANAGER,
500499
new HandleResolver(),
501500
exchangeManagerRegistry,
502501
spoolingManagerRegistry);

core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.airlift.slice.SliceInput;
2020
import io.airlift.slice.SliceOutput;
2121
import io.airlift.slice.Slices;
22-
import io.trino.metadata.BlockEncodingManager;
2322
import io.trino.metadata.InternalBlockEncodingSerde;
2423
import io.trino.spi.Page;
2524
import io.trino.spi.PageBuilder;
@@ -47,6 +46,7 @@
4746
import static io.trino.execution.buffer.CompressionCodec.NONE;
4847
import static io.trino.execution.buffer.PagesSerdeUtil.readPages;
4948
import static io.trino.execution.buffer.PagesSerdeUtil.writePages;
49+
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
5050
import static io.trino.operator.PageAssertions.assertPageEquals;
5151
import static io.trino.spi.type.BigintType.BIGINT;
5252
import static io.trino.spi.type.DoubleType.DOUBLE;
@@ -66,7 +66,7 @@ public class TestPagesSerde
6666
@BeforeAll
6767
public void setup()
6868
{
69-
blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
69+
blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
7070
}
7171

7272
@AfterAll

core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@
1313
*/
1414
package io.trino.execution.buffer;
1515

16-
import io.trino.metadata.BlockEncodingManager;
1716
import io.trino.metadata.InternalBlockEncodingSerde;
1817

1918
import static io.trino.execution.buffer.CompressionCodec.NONE;
19+
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
2020
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
2121

2222
public final class TestingPagesSerdes
2323
{
2424
private TestingPagesSerdes() {}
2525

26-
private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
26+
private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
2727

2828
public static PagesSerdeFactory createTestingPagesSerdeFactory()
2929
{

core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929

3030
import static io.trino.client.NodeVersion.UNKNOWN;
31+
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
3132
import static io.trino.metadata.CatalogManager.NO_CATALOGS;
3233
import static io.trino.transaction.InMemoryTransactionManager.createTestTransactionManager;
3334
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
@@ -98,7 +99,7 @@ public MetadataManager build()
9899
}
99100

100101
if (languageFunctionManager == null) {
101-
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
102+
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
102103
LanguageFunctionEngineManager engineManager = new LanguageFunctionEngineManager();
103104
languageFunctionManager = new LanguageFunctionManager(new SqlParser(), typeManager, _ -> ImmutableSet.of(), blockEncodingSerde, engineManager);
104105
}

core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import io.trino.server.FailTaskRequest;
6262
import io.trino.server.HttpRemoteTaskFactory;
6363
import io.trino.server.TaskUpdateRequest;
64+
import io.trino.simd.BlockEncodingSimdSupport;
6465
import io.trino.spi.ErrorCode;
6566
import io.trino.spi.QueryId;
6667
import io.trino.spi.block.Block;
@@ -676,6 +677,7 @@ public void configure(Binder binder)
676677
jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
677678
jsonCodecBinder(binder).bindJsonCodec(FailTaskRequest.class);
678679

680+
binder.bind(BlockEncodingSimdSupport.class).toInstance(new BlockEncodingSimdSupport(true));
679681
binder.bind(TypeManager.class).toInstance(TESTING_TYPE_MANAGER);
680682
binder.bind(BlockEncodingManager.class).in(SINGLETON);
681683
binder.bind(BlockEncodingSerde.class).to(InternalBlockEncodingSerde.class).in(SINGLETON);

core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.google.common.collect.ImmutableSet;
1717
import io.trino.FeaturesConfig;
1818
import io.trino.connector.CatalogServiceProvider;
19-
import io.trino.metadata.BlockEncodingManager;
2019
import io.trino.metadata.FunctionBundle;
2120
import io.trino.metadata.FunctionManager;
2221
import io.trino.metadata.GlobalFunctionCatalog;
@@ -51,6 +50,7 @@
5150
import static com.google.common.base.Preconditions.checkState;
5251
import static io.airlift.tracing.Tracing.noopTracer;
5352
import static io.trino.client.NodeVersion.UNKNOWN;
53+
import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
5454
import static java.util.Objects.requireNonNull;
5555

5656
public final class TestingPlannerContext
@@ -125,7 +125,7 @@ public PlannerContext build()
125125
globalFunctionCatalog.addFunctions(SystemFunctionBundle.create(featuresConfig, typeOperators, new BlockTypeOperators(typeOperators), UNKNOWN));
126126
functionBundles.forEach(globalFunctionCatalog::addFunctions);
127127

128-
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
128+
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
129129

130130
LanguageFunctionManager languageFunctionManager = new LanguageFunctionManager(
131131
new SqlParser(),

core/trino-spi/pom.xml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,34 @@
319319
<old>method long[] io.trino.spi.PageSorter::sort(java.util.List&lt;io.trino.spi.type.Type&gt;, java.util.List&lt;io.trino.spi.Page&gt;, java.util.List&lt;java.lang.Integer&gt;, java.util.List&lt;io.trino.spi.connector.SortOrder&gt;, int)</old>
320320
<new>method java.util.Iterator&lt;io.trino.spi.Page&gt; io.trino.spi.PageSorter::sort(java.util.List&lt;io.trino.spi.type.Type&gt;, java.util.List&lt;io.trino.spi.Page&gt;, java.util.List&lt;java.lang.Integer&gt;, java.util.List&lt;io.trino.spi.connector.SortOrder&gt;, int)</new>
321321
</item>
322+
<item>
323+
<ignore>true</ignore>
324+
<code>java.method.numberOfParametersChanged</code>
325+
<old>method void io.trino.spi.block.ByteArrayBlockEncoding::&lt;init&gt;()</old>
326+
<new>method void io.trino.spi.block.ByteArrayBlockEncoding::&lt;init&gt;(boolean)</new>
327+
<justification>ByteArrayBlockEncoding need to accept a parameter to enable SIMD support</justification>
328+
</item>
329+
<item>
330+
<ignore>true</ignore>
331+
<code>java.method.numberOfParametersChanged</code>
332+
<old>method void io.trino.spi.block.IntArrayBlockEncoding::&lt;init&gt;()</old>
333+
<new>method void io.trino.spi.block.IntArrayBlockEncoding::&lt;init&gt;(boolean)</new>
334+
<justification>IntArrayBlockEncoding need to accept a parameter to enable SIMD support</justification>
335+
</item>
336+
<item>
337+
<ignore>true</ignore>
338+
<code>java.method.numberOfParametersChanged</code>
339+
<old>method void io.trino.spi.block.LongArrayBlockEncoding::&lt;init&gt;()</old>
340+
<new>method void io.trino.spi.block.LongArrayBlockEncoding::&lt;init&gt;(boolean)</new>
341+
<justification>LongArrayBlockEncoding need to accept a parameter to enable SIMD support</justification>
342+
</item>
343+
<item>
344+
<ignore>true</ignore>
345+
<code>java.method.numberOfParametersChanged</code>
346+
<old>method void io.trino.spi.block.ShortArrayBlockEncoding::&lt;init&gt;()</old>
347+
<new>method void io.trino.spi.block.ShortArrayBlockEncoding::&lt;init&gt;(boolean)</new>
348+
<justification>ShortArrayBlockEncoding need to accept a parameter to enable SIMD support</justification>
349+
</item>
322350
</differences>
323351
</revapi.differences>
324352
</analysisConfiguration>

core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ public class ByteArrayBlockEncoding
2828
implements BlockEncoding
2929
{
3030
public static final String NAME = "BYTE_ARRAY";
31+
private final boolean enableVectorizedNullSuppression;
32+
33+
public ByteArrayBlockEncoding(boolean enableVectorizedNullSuppression)
34+
{
35+
this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
36+
}
3137

3238
@Override
3339
public String getName()
@@ -60,15 +66,12 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
6066
sliceOutput.writeBytes(rawValues, rawOffset, positionCount);
6167
}
6268
else {
63-
byte[] valuesWithoutNull = new byte[positionCount];
64-
int nonNullPositionCount = 0;
65-
for (int i = 0; i < positionCount; i++) {
66-
valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
67-
nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
69+
if (enableVectorizedNullSuppression) {
70+
EncoderUtil.compressBytesWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
71+
}
72+
else {
73+
EncoderUtil.compressBytesWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
6874
}
69-
70-
sliceOutput.writeInt(nonNullPositionCount);
71-
sliceOutput.writeBytes(valuesWithoutNull, 0, nonNullPositionCount);
7275
}
7376
}
7477

0 commit comments

Comments
 (0)