Skip to content

Commit

Permalink
feat: introduce partition-level metrics and add more compaction metrics
Browse files Browse the repository at this point in the history
1. introduce partition-level metrics
2. add more compaction metrics

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un committed Nov 5, 2024
1 parent 506f106 commit 2288f5a
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,10 @@ public boolean isCompactionLogBlockMetricsOn() {
return metricsConfig.isCompactionLogBlockMetricsOn();
}

public boolean isPartitionLevelMetricsOn() {
return getBoolean(HoodieMetricsConfig.PARTITION_LEVEL_METRICS_ENABLE);
}

public boolean isExecutorMetricsEnabled() {
return metricsConfig.isExecutorMetricsEnabled();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.metrics;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformSnapshot;

import java.util.ArrayList;
import java.util.List;

public class HoodieHistogram extends Histogram {

public HoodieHistogram() {
super(new HoodieUnlimitedReservoir());
}

/**
* UNSAFE: This class will cache all measurements in memory and may cause OOM if the number of measurements is too large.
*/
private static class HoodieUnlimitedReservoir implements Reservoir {

private final List<Long> measurements = new ArrayList<>();

@Override
public int size() {
return measurements.size();
}

@Override
public void update(long value) {
measurements.add(value);
}

@Override
public Snapshot getSnapshot() {
return new UniformSnapshot(measurements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ public class HoodieMetrics {
public static final String DURATION_STR = "duration";
public static final String SOURCE_READ_AND_INDEX_ACTION = "source_read_and_index";

/* --------------- Partition Level Metrics --------------- */
public static final String LOG_RECORDS_SCAN_TIME = "logRecordsScanTime";
public static final String CREATE_OR_UPSERT_TIME = "createTimeOrUpsertTime";
// Operation time means the time taken to perform the operation on the partition, equals to the sum of createTime/updateTime and logRecordsScanTime
public static final String OPERATION_TIME = "operationTime";
public static final String WRITE_RECORDS_NUM = "writeRecordsNum";
public static final String UPDATE_RECORDS_NUM = "updateRecordsNum";
public static final String INSERT_RECORDS_NUM = "insertRecordsNum";
public static final String DELETE_RECORDS_NUM = "deleteRecordsNum";
public static final String BYTES_WRITTEN = "bytesWritten";
// The ratio of spilled log records to total log records
public static final String LOG_RECORDS_SPILL_PERCENT = "logRecordsSpillPercent";
public static final String MAX_MEMORY_FOR_COMPACTION = "maxMemoryForCompaction";


public static final String PARTITION_LEVEL_PREFIX = "partition=";
public static final String UN_PARTITIONED_TABLE_METRIC_NAME = "UN_PARTITIONED_METRICS";
/* --------------- Partition Level Metrics --------------- */

private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
Expand Down Expand Up @@ -261,6 +280,22 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo
metrics.registerGauge(getMetricsName(actionType, TOTAL_CORRUPTED_LOG_BLOCKS_STR), totalCorruptedLogBlocks);
metrics.registerGauge(getMetricsName(actionType, TOTAL_ROLLBACK_LOG_BLOCKS_STR), totalRollbackLogBlocks);
}
if (config.isPartitionLevelMetricsOn()) {
metadata.getPartitionToWriteStats().values().stream().flatMap(stats -> stats.stream()).forEach(stat -> {
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), LOG_RECORDS_SCAN_TIME), stat.getRuntimeStats().getTotalScanTime());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), CREATE_OR_UPSERT_TIME),
stat.getRuntimeStats().getTotalCreateTime() + stat.getRuntimeStats().getTotalUpsertTime());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), OPERATION_TIME),
stat.getRuntimeStats().getTotalScanTime() + stat.getRuntimeStats().getTotalCreateTime() + stat.getRuntimeStats().getTotalUpsertTime());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), WRITE_RECORDS_NUM), stat.getNumWrites());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), UPDATE_RECORDS_NUM), stat.getNumUpdateWrites());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), INSERT_RECORDS_NUM), stat.getNumInserts());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), DELETE_RECORDS_NUM), stat.getNumDeletes());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), BYTES_WRITTEN), stat.getTotalWriteBytes());
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), LOG_RECORDS_SPILL_PERCENT), (long) (stat.getRuntimeStats().getLogRecordsSpillRatio() * 100));
metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), MAX_MEMORY_FOR_COMPACTION), stat.getRuntimeStats().getMaxMemoryForCompaction());
});
}
}
}

Expand Down Expand Up @@ -334,6 +369,11 @@ public String getMetricsName(String action, String metric) {
}
}

public String getPartitionMetricsName(String action, String partition, String metric) {
partition = StringUtils.isNullOrEmpty(partition) ? UN_PARTITIONED_TABLE_METRIC_NAME : partition;
return config == null ? null : String.format("%s.%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, PARTITION_LEVEL_PREFIX + partition, metric);
}

public void updateClusteringFileCreationMetrics(long durationInMs) {
reportMetrics("replacecommit", "fileCreationTime", durationInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,10 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
if (stat.getRuntimeStats() != null) {
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
runtimeStats.setLogRecordsSpillRatio(scanner.getSpillRatio());
runtimeStats.setMaxMemoryForCompaction(maxMemoryPerCompaction);
}
stat.setRuntimeStats(runtimeStats);
s.getStat().setRuntimeStats(runtimeStats);
}).collect(toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void start() {
when(writeConfig.isMetricsOn()).thenReturn(true);
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE);
when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("hoodie");
hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
metrics = hoodieMetrics.getMetrics();
}
Expand All @@ -64,4 +65,12 @@ public void testRegisterGauge() {
metrics.registerGauge("metric1", 123L);
assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
}

@Test
public void testRegisterHistogram() {
for (int i = 0; i < 10; i++) {
metrics.registerHistogram("histogram1", i);
}
assertEquals(10, metrics.getRegistry().getHistograms().get("histogram1").getCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ public static class RuntimeStats implements Serializable {
*/
private long totalCreateTime;

/**
* Ratio of log records spilled to total log records.
*/
private double logRecordsSpillRatio;

/**
* Max memory used for compaction.
*/
private long maxMemoryForCompaction;

public long getTotalScanTime() {
return totalScanTime;
}
Expand All @@ -446,5 +456,21 @@ public long getTotalCreateTime() {
public void setTotalCreateTime(long totalCreateTime) {
this.totalCreateTime = totalCreateTime;
}

public double getLogRecordsSpillRatio() {
return logRecordsSpillRatio;
}

public void setLogRecordsSpillRatio(double logRecordsSpillRatio) {
this.logRecordsSpillRatio = logRecordsSpillRatio;
}

public long getMaxMemoryForCompaction() {
return maxMemoryForCompaction;
}

public void setMaxMemoryForCompaction(long maxMemoryForCompaction) {
this.maxMemoryForCompaction = maxMemoryForCompaction;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
private final long maxMemorySizeInBytes;
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
// ratio of records to be spilled to disk
private double spillRatio;

@SuppressWarnings("unchecked")
private HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
Expand Down Expand Up @@ -205,6 +207,7 @@ private void performScan() {

this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
this.numMergedRecordsInLog = records.size();
this.spillRatio = records.getInDiskRecordsNumRatio();

if (LOG.isInfoEnabled()) {
LOG.info("Number of log files scanned => {}", logFilePaths.size());
Expand All @@ -213,6 +216,7 @@ private void performScan() {
LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize());
LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries());
LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes());
LOG.info("Spill ratio => " + this.spillRatio);
}
}

Expand Down Expand Up @@ -306,6 +310,10 @@ public long getTotalTimeTakenToReadAndMergeBlocks() {
return totalTimeTakenToReadAndMergeBlocks;
}

public double getSpillRatio() {
return spillRatio;
}

@Override
public void close() {
if (records != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ public int getInMemoryMapNumEntries() {
return inMemoryMap.size();
}

public double getInDiskRecordsNumRatio() {
return (double) getDiskBasedMap().size() / (inMemoryMap.size() + getDiskBasedMap().size());
}

/**
* Approximate memory footprint of the in-memory map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Turn on/off metrics reporting for log blocks with compaction commit. off by default.");

public static final ConfigProperty<Boolean> PARTITION_LEVEL_METRICS_ENABLE = ConfigProperty
.key(METRIC_PREFIX + ".partition.level.metrics.enable")
.defaultValue(false)
.sinceVersion("0.15.0")
.withDocumentation("Enable partition level metrics. off by default.");

/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
Expand Down Expand Up @@ -403,6 +409,11 @@ public Builder withLockingMetrics(boolean enable) {
return this;
}

public Builder withPartitionLevelMetrics(boolean enable) {
hoodieMetricsConfig.setValue(PARTITION_LEVEL_METRICS_ENABLE, String.valueOf(enable));
return this;
}

public HoodieMetricsConfig build() {

hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ public Option<HoodieGauge<Long>> registerGauge(String metricName) {
return registerGauge(metricName, 0);
}

public void registerHistograms(Map<String/*metric name*/, Long/*updated value*/> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerHistogram(metricPrefix + k, v));
}

public void registerHistogram(String metricName, long value) {
registry.histogram(metricName, () -> new HoodieHistogram()).update(value);
}

public MetricRegistry getRegistry() {
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
// disable automatic inline compaction
spark.sql("set hoodie.compact.inline=false")
spark.sql("set hoodie.compact.schedule.inline=false")
spark.sql("set hoodie.metrics.on=true")
spark.sql("set hoodie.metrics.reporter.type=CONSOLE")
spark.sql("set hoodie.metrics.partition.level.metrics.enable=true")

spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
Expand Down

0 comments on commit 2288f5a

Please sign in to comment.