Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc] Fix metrics reporting issue for DVRT and add more metrics #1548

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
Expand Down Expand Up @@ -465,6 +466,12 @@ public void handleStoreDeleted(Store store) {
"Enabled a thread pool for AA/WC ingestion lookup with {} threads.",
serverConfig.getAaWCIngestionStorageLookupThreadPoolSize());

AggVersionedDaVinciRecordTransformerStats recordTransformerStats = null;
if (recordTransformerConfig != null) {
recordTransformerStats =
new AggVersionedDaVinciRecordTransformerStats(metricsRepository, metadataRepo, serverConfig);
}

ingestionTaskFactory = StoreIngestionTaskFactory.builder()
.setVeniceWriterFactory(veniceWriterFactory)
.setStorageEngineRepository(storageService.getStorageEngineRepository())
Expand All @@ -475,6 +482,7 @@ public void handleStoreDeleted(Store store) {
.setTopicManagerRepository(topicManagerRepository)
.setHostLevelIngestionStats(hostLevelIngestionStats)
.setVersionedDIVStats(versionedDIVStats)
.setDaVinciRecordTransformerStats(recordTransformerStats)
.setVersionedIngestionStats(versionedIngestionStats)
.setStoreBufferService(storeBufferService)
.setServerConfig(serverConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
Expand Down Expand Up @@ -250,6 +251,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final HostLevelIngestionStats hostLevelIngestionStats;
protected final AggVersionedDIVStats versionedDIVStats;
protected final AggVersionedIngestionStats versionedIngestionStats;
protected AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats;
protected final BooleanSupplier isCurrentVersion;
protected final Optional<HybridStoreConfig> hybridStoreConfig;
protected final Consumer<DataValidationException> divErrorMetricCallback;
Expand Down Expand Up @@ -502,21 +504,17 @@ public StoreIngestionTask(
recordTransformerConfig);
this.recordTransformerDeserializersByPutSchemaId = new SparseConcurrentList<>();

versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleEndLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerErrorSensor(storeName, versionNumber);
daVinciRecordTransformerStats = builder.getDaVinciRecordTransformerStats();

// onStartVersionIngestion called here instead of run() because this needs to finish running
// before bootstrapping starts
long startTime = System.currentTimeMillis();
recordTransformer.onStartVersionIngestion(isCurrentVersion.getAsBoolean());
long endTime = System.currentTimeMillis();
versionedIngestionStats.recordTransformerLifecycleStartLatency(
daVinciRecordTransformerStats.recordTransformerOnStartVersionIngestionLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
System.currentTimeMillis());
} else {
this.recordTransformerKeyDeserializer = null;
this.recordTransformerInputValueSchema = null;
Expand Down Expand Up @@ -659,7 +657,13 @@ public synchronized void subscribePartition(PubSubTopicPartition topicPartition,
int partitionNumber = topicPartition.getPartitionNumber();

if (recordTransformer != null) {
long startTime = System.currentTimeMillis();
recordTransformer.internalOnRecovery(storageEngine, partitionNumber, partitionStateSerializer, compressor);
daVinciRecordTransformerStats.recordTransformerOnRecoveryLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
System.currentTimeMillis());
}

partitionToPendingConsumerActionCountMap.computeIfAbsent(partitionNumber, x -> new AtomicInteger(0))
Expand Down Expand Up @@ -3759,7 +3763,7 @@ private int processKafkaDataMessage(
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

if (recordTransformer != null) {
long recordTransformerStartTime = System.currentTimeMillis();
long recordTransformerStartTime = System.nanoTime();
ByteBuffer valueBytes = put.getPutValue();
ByteBuffer assembledObject = chunkAssembler.bufferAndAssembleRecord(
consumerRecord.getTopicPartition(),
Expand Down Expand Up @@ -3800,8 +3804,9 @@ private int processKafkaDataMessage(
try {
transformerResult = recordTransformer.transformAndProcessPut(lazyKey, lazyValue);
} catch (Exception e) {
versionedIngestionStats.recordTransformerError(storeName, versionNumber, 1, currentTimeMs);
String errorMessage = "Record transformer experienced an error when transforming value=" + assembledObject;
daVinciRecordTransformerStats.recordTransformerPutError(storeName, versionNumber, 1, currentTimeMs);
String errorMessage =
"DaVinciRecordTransformer experienced an error when processing value: " + assembledObject;

throw new VeniceMessageException(errorMessage, e);
}
Expand All @@ -3823,10 +3828,10 @@ private int processKafkaDataMessage(
}

put.putValue = transformedBytes;
versionedIngestionStats.recordTransformerLatency(
daVinciRecordTransformerStats.recordTransformerPutLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(recordTransformerStartTime),
LatencyUtils.getElapsedTimeFromNsToUs(recordTransformerStartTime),
currentTimeMs);
writeToStorageEngine(producedPartition, keyBytes, put);
} else {
Expand Down Expand Up @@ -3861,7 +3866,21 @@ private int processKafkaDataMessage(

if (recordTransformer != null) {
Lazy<Object> lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes));
recordTransformer.processDelete(lazyKey);

long startTime = System.nanoTime();
try {
recordTransformer.processDelete(lazyKey);
} catch (Exception e) {
daVinciRecordTransformerStats.recordTransformerDeleteError(storeName, versionNumber, 1, currentTimeMs);
String errorMessage = "DaVinciRecordTransformer experienced an error when deleting key: " + lazyKey.get();

throw new VeniceMessageException(errorMessage, e);
}
daVinciRecordTransformerStats.recordTransformerDeleteLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromNsToUs(startTime),
System.currentTimeMillis());

// This is called here after processDelete because if the user stores their data somewhere other than
// Da Vinci, this function needs to execute to allow them to delete the data from the appropriate store
Expand Down Expand Up @@ -4144,12 +4163,11 @@ public synchronized void close() {
long startTime = System.currentTimeMillis();
Store store = storeRepository.getStoreOrThrow(storeName);
recordTransformer.onEndVersionIngestion(store.getCurrentVersion());
long endTime = System.currentTimeMillis();
versionedIngestionStats.recordTransformerLifecycleEndLatency(
daVinciRecordTransformerStats.recordTransformerOnEndVersionIngestionLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
System.currentTimeMillis());
Utils.closeQuietlyWithErrorLogged(this.recordTransformer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
Expand Down Expand Up @@ -111,6 +112,7 @@ public static class Builder {
private ReadOnlySchemaRepository schemaRepo;
private ReadOnlyStoreRepository metadataRepo;
private TopicManagerRepository topicManagerRepository;
private AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats;
private AggHostLevelIngestionStats ingestionStats;
private AggVersionedDIVStats versionedDIVStats;
private AggVersionedIngestionStats versionedStorageIngestionStats;
Expand Down Expand Up @@ -233,6 +235,15 @@ public Builder setTopicManagerRepository(TopicManagerRepository topicManagerRepo
return set(() -> this.topicManagerRepository = topicManagerRepository);
}

/**
 * Accessor for the record-transformer stats aggregator held by this builder.
 *
 * @return the value last passed to {@code setDaVinciRecordTransformerStats}, or {@code null} if it was never set
 */
public AggVersionedDaVinciRecordTransformerStats getDaVinciRecordTransformerStats() {
  return this.daVinciRecordTransformerStats;
}

/**
 * Stores the record-transformer stats aggregator on this builder.
 *
 * The assignment is wrapped in a lambda and handed to {@code set(...)}, the same helper the other setters of this
 * builder use; the value returned by {@code set(...)} is returned to the caller.
 *
 * @param daVinciRecordTransformerStats the aggregator to store
 * @return whatever {@code set(...)} returns (declared as {@code Builder})
 */
public Builder setDaVinciRecordTransformerStats(
AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats) {
return set(() -> this.daVinciRecordTransformerStats = daVinciRecordTransformerStats);
}

/**
 * Accessor for the host-level ingestion stats aggregator held by this builder.
 *
 * @return the current value of the {@code ingestionStats} field
 */
public AggHostLevelIngestionStats getIngestionStats() {
  return this.ingestionStats;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.linkedin.davinci.stats;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import io.tehuti.metrics.MetricsRepository;


/**
 * The store level stats for {@link com.linkedin.davinci.client.DaVinciRecordTransformer}.
 *
 * <p>Every {@code record*} method below is a thin wrapper: it forwards its sample, together with the store name and
 * version, to the inherited {@code recordVersionedAndTotalStat}, which invokes the matching method on
 * {@link DaVinciRecordTransformerStats}.
 *
 * <p>NOTE(review): this class does not convert units; the latency unit is whatever the caller supplies. Confirm
 * against the call sites that each metric is reported in the unit its sensor expects.
 */
public class AggVersionedDaVinciRecordTransformerStats
    extends AbstractVeniceAggVersionedStats<DaVinciRecordTransformerStats, DaVinciRecordTransformerStatsReporter> {
  /**
   * @param metricsRepository repository handed to the parent aggregator
   * @param metadataRepository store repository handed to the parent aggregator
   * @param serverConfig only {@code isUnregisterMetricForDeletedStoreEnabled()} is read from it
   */
  public AggVersionedDaVinciRecordTransformerStats(
      MetricsRepository metricsRepository,
      ReadOnlyStoreRepository metadataRepository,
      VeniceServerConfig serverConfig) {
    super(
        metricsRepository,
        metadataRepository,
        DaVinciRecordTransformerStats::new,
        DaVinciRecordTransformerStatsReporter::new,
        serverConfig.isUnregisterMetricForDeletedStoreEnabled());
  }

  /** Reports one put-latency sample for the given store version. */
  public void recordTransformerPutLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerPutLatency(value, timestamp));
  }

  /** Reports one delete-latency sample for the given store version. */
  public void recordTransformerDeleteLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerDeleteLatency(value, timestamp));
  }

  /** Reports one on-recovery latency sample for the given store version. */
  public void recordTransformerOnRecoveryLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerOnRecoveryLatency(value, timestamp));
  }

  /** Reports one on-start-version-ingestion latency sample for the given store version. */
  public void recordTransformerOnStartVersionIngestionLatency(
      String storeName,
      int version,
      double value,
      long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerOnStartVersionIngestionLatency(value, timestamp));
  }

  /** Reports one on-end-version-ingestion latency sample for the given store version. */
  public void recordTransformerOnEndVersionIngestionLatency(
      String storeName,
      int version,
      double value,
      long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerOnEndVersionIngestionLatency(value, timestamp));
  }

  /** Reports a put-error sample for the given store version. */
  public void recordTransformerPutError(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerPutError(value, timestamp));
  }

  /** Reports a delete-error sample for the given store version. */
  public void recordTransformerDeleteError(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordTransformerDeleteError(value, timestamp));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,52 +214,10 @@ public void recordNearlineLocalBrokerToReadyToServeLatency(
stat -> stat.recordNearlineLocalBrokerToReadyToServeLatency(value, timestamp));
}

/**
 * Reports one record-transformer latency sample for the given store version by forwarding {@code value} and
 * {@code timestamp} through {@code recordVersionedAndTotalStat}.
 */
public void recordTransformerLatency(String storeName, int version, double value, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordTransformerLatency(value, timestamp));
}

/**
 * Reports one lifecycle-start latency sample for the given store version by forwarding {@code value} and
 * {@code timestamp} through {@code recordVersionedAndTotalStat}.
 */
public void recordTransformerLifecycleStartLatency(String storeName, int version, double value, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordTransformerLifecycleStartLatency(value, timestamp));
}

/**
 * Reports one lifecycle-end latency sample for the given store version by forwarding {@code value} and
 * {@code timestamp} through {@code recordVersionedAndTotalStat}.
 */
public void recordTransformerLifecycleEndLatency(String storeName, int version, double value, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordTransformerLifecycleEndLatency(value, timestamp));
}

/**
 * Reports a record-transformer error sample for the given store version by forwarding {@code value} and
 * {@code timestamp} through {@code recordVersionedAndTotalStat}.
 */
public void recordTransformerError(String storeName, int version, double value, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordTransformerError(value, timestamp));
}

/**
 * Records an idle time on the stats object returned by {@code getStats(storeName, version)}.
 *
 * Unlike the neighbouring {@code record*} methods, this one does not go through
 * {@code recordVersionedAndTotalStat}; only the per-version stats object is touched here.
 *
 * @param idleTimeMs idle time to record (milliseconds, per the parameter name)
 */
public void recordMaxIdleTime(String storeName, int version, long idleTimeMs) {
getStats(storeName, version).recordIdleTime(idleTimeMs);
}

/**
 * Calls {@code registerTransformerLatencySensor()} first on the stats object for the given store version and then
 * on the store's total stats object.
 */
public void registerTransformerLatencySensor(String storeName, int version) {
getStats(storeName, version).registerTransformerLatencySensor();
getTotalStats(storeName).registerTransformerLatencySensor();
}

/**
 * Calls {@code registerTransformerLifecycleStartLatencySensor()} first on the stats object for the given store
 * version and then on the store's total stats object.
 */
public void registerTransformerLifecycleStartLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleStartLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleStartLatencySensor();
}

/**
 * Calls {@code registerTransformerLifecycleEndLatencySensor()} first on the stats object for the given store
 * version and then on the store's total stats object.
 */
public void registerTransformerLifecycleEndLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleEndLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleEndLatencySensor();
}

/**
 * Calls {@code registerTransformerErrorSensor()} first on the stats object for the given store version and then
 * on the store's total stats object.
 */
public void registerTransformerErrorSensor(String storeName, int version) {
getStats(storeName, version).registerTransformerErrorSensor();
getTotalStats(storeName).registerTransformerErrorSensor();
}

/**
 * Reports one batch-processing request of the given {@code size} for the given store version by forwarding
 * {@code size} and {@code timestamp} through {@code recordVersionedAndTotalStat}.
 */
public void recordBatchProcessingRequest(String storeName, int version, int size, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordBatchProcessingRequest(size, timestamp));
}
Expand Down
Loading
Loading