Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,35 @@
*/
package org.apache.pinot.core.data.manager.offline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;


/**
Expand All @@ -31,8 +55,18 @@
@ThreadSafe
public class OfflineTableDataManager extends BaseTableDataManager {

private TableUpsertMetadataManager _tableUpsertMetadataManager;

@Override
protected void doInit() {
Pair<TableConfig, Schema> tableConfigAndSchema = getCachedTableConfigAndSchema();
TableConfig tableConfig = tableConfigAndSchema.getLeft();
Schema schema = tableConfigAndSchema.getRight();
if (tableConfig.isUpsertEnabled()) {
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(), tableConfig, schema,
this, _segmentOperationsThrottler);
}
}

@Override
Expand All @@ -41,7 +75,17 @@ protected void doStart() {

@Override
protected void doShutdown() {
releaseAndRemoveAllSegments();
if (_tableUpsertMetadataManager != null) {
_tableUpsertMetadataManager.stop();
releaseAndRemoveAllSegments();
try {
_tableUpsertMetadataManager.close();
} catch (IOException e) {
_logger.warn("Caught exception while closing upsert metadata manager", e);
}
} else {
releaseAndRemoveAllSegments();
}
}

protected void doAddOnlineSegment(String segmentName)
Expand All @@ -57,8 +101,122 @@ protected void doAddOnlineSegment(String segmentName)
}
}

@Override
public void addSegment(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot add segment: %s to table: %s",
segmentName, _tableNameWithType);
if (isUpsertEnabled()) {
handleUpsert(immutableSegment, zkMetadata);
return;
}
super.addSegment(immutableSegment, zkMetadata);
}

@Override
public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegments,
Map<String, String> queryOptions) {
List<SegmentContext> segmentContexts = new ArrayList<>(selectedSegments.size());
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
_tableUpsertMetadataManager.setSegmentContexts(segmentContexts, queryOptions);
}
return segmentContexts;
}

@Override
public void addConsumingSegment(String segmentName) {
throw new UnsupportedOperationException("Cannot add CONSUMING segment to OFFLINE table");
}

public boolean isUpsertEnabled() {
return _tableUpsertMetadataManager != null;
}

@VisibleForTesting
public TableUpsertMetadataManager getTableUpsertMetadataManager() {
return _tableUpsertMetadataManager;
}

public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
if (isUpsertEnabled()) {
return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
}
return Collections.emptyMap();
}

private void handleUpsert(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
_logger.info("Adding immutable segment: {} with upsert enabled", segmentName);

// Set the ZK creation time so that same creation time can be used to break the comparison ties across replicas,
// to ensure data consistency of replica.
setZkCreationTimeIfAvailable(immutableSegment, zkMetadata);

Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null);
Preconditions.checkNotNull(partitionId, "Failed to get partition id for segment: " + segmentName
+ " (upsert-enabled table: " + _tableNameWithType + ")");
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);

_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT,
immutableSegment.getSegmentMetadata().getTotalDocs());
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
if (oldSegmentManager == null) {
// When adding a new segment, we should register it 'before' it is fully initialized by
// partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other
// segments may be invalidated, making the queries see less valid docs than expected. We should let query
// access the new segment asap even though its validDocId bitmap is still being filled by
// partitionUpsertMetadataManager.
registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
partitionUpsertMetadataManager.addSegment(immutableSegment);
Comment on lines +157 to +176
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offline upsert path doesn’t handle the partition upsert metadata manager’s preload mode. Unlike RealtimeTableDataManager, this code never calls preloadSegments(...) and doesn’t branch on partitionUpsertMetadataManager.isPreloading() to use preloadSegment(...). If preload is enabled in the upsert config, the preload flag may remain true and segments will be processed via the slower add/replace path (and potentially with different registration ordering than intended). Consider adding the preload trigger (based on ZK metadata partition id) and a preload branch mirroring the realtime implementation.

Copilot uses AI. Check for mistakes.
_logger.info("Added new immutable segment: {} with upsert enabled", segmentName);
} else {
replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager, partitionUpsertMetadataManager);
}
}

private void replaceUpsertSegment(String segmentName, SegmentDataManager oldSegmentManager,
ImmutableSegmentDataManager newSegmentManager, PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
IndexSegment oldSegment = oldSegmentManager.getSegment();
ImmutableSegment immutableSegment = newSegmentManager.getSegment();
UpsertConfig.ConsistencyMode consistencyMode = _tableUpsertMetadataManager.getContext().getConsistencyMode();
if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
} else {
SegmentDataManager duoSegmentDataManager = new DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
registerSegment(segmentName, duoSegmentDataManager, partitionUpsertMetadataManager);
partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
}
_logger.info("Replaced {} segment: {} with upsert enabled and consistency mode: {}",
oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, consistencyMode);
oldSegmentManager.offload();
releaseSegment(oldSegmentManager);
}

private void registerSegment(String segmentName, SegmentDataManager segmentDataManager,
@Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
if (partitionUpsertMetadataManager != null) {
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
}
registerSegment(segmentName, segmentDataManager);
}

private void setZkCreationTimeIfAvailable(ImmutableSegment segment, @Nullable SegmentZKMetadata zkMetadata) {
if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
if (segmentMetadata instanceof SegmentMetadataImpl) {
SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) segmentMetadata;
segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
_logger.info("Set ZK creation time {} for segment: {} in upsert table", zkMetadata.getCreationTime(),
zkMetadata.getSegmentName());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.query.pruner.SegmentPrunerService;
import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
Expand Down Expand Up @@ -81,10 +82,9 @@ public static SingleTableExecutionInfo create(InstanceDataManager instanceDataMa
indexSegments.add(segmentDataManager.getSegment());
}
} else {
RealtimeTableDataManager rtdm = (RealtimeTableDataManager) tableDataManager;
TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
TableUpsertMetadataManager tumm = getTableUpsertMetadataManager(tableDataManager);
boolean isUsingConsistencyMode =
rtdm.getTableUpsertMetadataManager().getContext().getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE;
tumm.getContext().getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE;
if (isUsingConsistencyMode) {
Comment on lines +85 to 88
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getTableUpsertMetadataManager(...) is annotated as nullable but its return value is dereferenced unconditionally when isUpsertTable(tableDataManager) is true. While current isUpsertTable checks make this safe, the nullable contract is misleading and can mask future regressions (or trigger nullness warnings). Consider enforcing non-null here (e.g., Preconditions.checkState(tumm != null, ...) / Objects.requireNonNull) and/or making the helper return non-null for the supported manager types.

Copilot uses AI. Check for mistakes.
tumm.lockForSegmentContexts();
}
Expand Down Expand Up @@ -134,12 +134,25 @@ private static boolean isUpsertTable(TableDataManager tableDataManager) {
// those segments in the list of segments for query to process on the server, otherwise, the query will see less
// than expected valid docs from the upsert table.
if (tableDataManager instanceof RealtimeTableDataManager) {
RealtimeTableDataManager rtdm = (RealtimeTableDataManager) tableDataManager;
return rtdm.isUpsertEnabled();
return ((RealtimeTableDataManager) tableDataManager).isUpsertEnabled();
}
if (tableDataManager instanceof OfflineTableDataManager) {
return ((OfflineTableDataManager) tableDataManager).isUpsertEnabled();
}
return false;
}

@Nullable
private static TableUpsertMetadataManager getTableUpsertMetadataManager(TableDataManager tableDataManager) {
if (tableDataManager instanceof RealtimeTableDataManager) {
return ((RealtimeTableDataManager) tableDataManager).getTableUpsertMetadataManager();
}
if (tableDataManager instanceof OfflineTableDataManager) {
return ((OfflineTableDataManager) tableDataManager).getTableUpsertMetadataManager();
}
return null;
}

private SingleTableExecutionInfo(TableDataManager tableDataManager, List<SegmentDataManager> segmentDataManagers,
List<IndexSegment> indexSegments, Map<IndexSegment, SegmentContext> providedSegmentContexts,
List<String> segmentsToQuery, List<String> optionalSegments, List<String> notAcquiredSegments) {
Expand Down
Loading
Loading