Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ private boolean processStarTrees(File indexDir,
StarTreeBuilderUtils.removeStarTrees(indexDir);
} else {
// NOTE: Always use OFF_HEAP mode on server side.
// Pass _indexLoadingConfig so downstream readers can resolve table-level configs we set
MultipleTreesBuilder builder = new MultipleTreesBuilder(starTreeBuilderConfigs, indexDir,
MultipleTreesBuilder.BuildMode.OFF_HEAP);
MultipleTreesBuilder.BuildMode.OFF_HEAP, _indexLoadingConfig);
// We don't create the builder using the try-with-resources pattern because builder.close() performs
// some clean-up steps to roll back the star-tree index to the previous state if it exists. If this goes wrong
// the star-tree index can be in an inconsistent state. To prevent that, when builder.close() throws an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class PinotSegmentColumnReader implements Closeable {
private final NullValueVectorReader _nullValueVectorReader;
private final int[] _dictIdBuffer;
private final DataType _valueType;
@Nullable
private final String _columnName;

public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
DataSource dataSource = indexSegment.getDataSource(column);
Expand All @@ -51,6 +53,7 @@ public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
_dictionary = dataSource.getDictionary();
_nullValueVectorReader = dataSource.getNullValueVector();
_valueType = _dictionary != null ? _dictionary.getValueType() : _forwardIndexReader.getStoredType();
_columnName = column;
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
} else {
Expand All @@ -67,6 +70,7 @@ public PinotSegmentColumnReader(ForwardIndexReader forwardIndexReader, @Nullable
_dictionary = dictionary;
_nullValueVectorReader = nullValueVectorReader;
_valueType = _dictionary != null ? _dictionary.getValueType() : _forwardIndexReader.getStoredType();
_columnName = null;
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
} else {
Expand All @@ -92,7 +96,34 @@ public Dictionary getDictionary() {
}

public int getDictId(int docId) {
return _forwardIndexReader.getDictId(docId, _forwardIndexReaderContext);
if (_forwardIndexReader.isDictionaryEncoded()) {
return _forwardIndexReader.getDictId(docId, _forwardIndexReaderContext);
}
if (_dictionary == null) {
throw new UnsupportedOperationException(
"Cannot resolve dictId: forward index is raw and no dictionary is materialized for column: " + (
_columnName != null ? _columnName : "<unknown>"));
}
// If we have separate dictionary on a RAW forward index column, use that dictionary.
switch (_valueType.getStoredType()) {
case INT:
return _dictionary.indexOf(_forwardIndexReader.getInt(docId, _forwardIndexReaderContext));
case LONG:
return _dictionary.indexOf(_forwardIndexReader.getLong(docId, _forwardIndexReaderContext));
case FLOAT:
return _dictionary.indexOf(_forwardIndexReader.getFloat(docId, _forwardIndexReaderContext));
case DOUBLE:
return _dictionary.indexOf(_forwardIndexReader.getDouble(docId, _forwardIndexReaderContext));
case BIG_DECIMAL:
return _dictionary.indexOf(_forwardIndexReader.getBigDecimal(docId, _forwardIndexReaderContext));
case STRING:
return _dictionary.indexOf(_forwardIndexReader.getString(docId, _forwardIndexReaderContext));
case BYTES:
return _dictionary.indexOf(new ByteArray(_forwardIndexReader.getBytes(docId, _forwardIndexReaderContext)));
default:
throw new UnsupportedOperationException(
"Cannot resolve dictId for raw forward index of stored type: " + _valueType.getStoredType());
}
}

public Object getValue(int docId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ private void createRemoteBuffers(List<IndexEntry> zeroSizeEntries) {
properties.putAll(_segmentDirectoryLoaderContext.getSegmentCustomConfigs());
}

// Propagate segment-level custom metadata so downstream readers can read it
if (_segmentMetadata != null && _segmentMetadata.getCustomMap() != null) {
for (Map.Entry<String, String> e : _segmentMetadata.getCustomMap().entrySet()) {
properties.setProperty(e.getKey(), e.getValue());
}
}

// Propagate the table's task config (serialized as JSON) to remote/empty index buffers so that
// downstream readers backed by external storage can resolve any configuration they require
// (e.g. credentials, regions, endpoints) from the ingestion task config rather than relying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils;
import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexKey;
Expand Down Expand Up @@ -86,6 +87,12 @@ public enum BuildMode {
*/
public MultipleTreesBuilder(List<StarTreeV2BuilderConfig> builderConfigs, File indexDir, BuildMode buildMode)
throws Exception {
this(builderConfigs, indexDir, buildMode, null);
}

public MultipleTreesBuilder(List<StarTreeV2BuilderConfig> builderConfigs, File indexDir, BuildMode buildMode,
@Nullable IndexLoadingConfig indexLoadingConfig)
throws Exception {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(builderConfigs), "Must provide star-tree builder configs");
_builderConfigs = builderConfigs;
_buildMode = buildMode;
Expand All @@ -103,10 +110,25 @@ public MultipleTreesBuilder(List<StarTreeV2BuilderConfig> builderConfigs, File i
}
LOGGER.debug(logUpdatedStarTrees.toString());
}
_segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
_segment = loadSegment(indexDir, indexLoadingConfig);
_starTreeCreationFailed = false;
}

private static ImmutableSegment loadSegment(File indexDir, @Nullable IndexLoadingConfig indexLoadingConfig)
throws Exception {
if (indexLoadingConfig != null && indexLoadingConfig.getTableConfig() != null) {
// Minimal IndexLoadingConfig carrying only TableConfig + Schema. Do NOT propagate
// segmentTier/segmentDirectoryLoader from the parent — a tier loader would treat this as a
// tier-resolution event and relocate the segment mid-build. The default loader is correct
// for this in-place read; TableConfig threads table-level config to downstream readers.
IndexLoadingConfig localConfig =
new IndexLoadingConfig(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getSchema());
localConfig.setReadMode(ReadMode.mmap);
return ImmutableSegmentLoader.load(indexDir, localConfig, false);
}
return ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
}

/**
* Constructor for the multiple star-trees builder.
*
Expand Down
Loading