diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java index 558f055f152a..eaf4ed7cac7c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.server.conf.ServerConf; @@ -46,6 +47,8 @@ import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.slf4j.Logger; @@ -163,6 +166,24 @@ void initializeZK() { void initializeMetricsReporter() { LOGGER.info("Initializing metrics reporter"); + try { + ServerConf serverConf = new ServerConf(_pinotConfig); + PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(serverConf.getMetricsConfig()); + ServerMetrics serverMetrics = + new ServerMetrics(serverConf.getMetricsPrefix(), metricsRegistry, serverConf.emitTableLevelMetrics(), + serverConf.getAllowedTablesForEmittingMetrics()); + serverMetrics.initializeGlobalMeters(); + boolean registered = ServerMetrics.register(serverMetrics); + if (registered) { + LOGGER.info("ServerMetrics successfully registered for predownload container"); + } else { + LOGGER.error("Failed to register ServerMetrics; an instance was already registered"); + } + } catch (Exception e) { + LOGGER.error("Failed to initialize ServerMetrics in predownload container; " + + "continuing with the currently registered ServerMetrics instance", e); + } + _predownloadMetrics = new PredownloadMetrics(); PredownloadStatusRecorder.registerMetrics(_predownloadMetrics); } diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java index a5dca3bd8b5e..958e24bcf3cf 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java @@ -33,15 +33,19 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; import static org.apache.pinot.server.predownload.PredownloadTestUtil.*; @@ -51,6 +55,8 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class PredownloadSchedulerTest { @@ -87,6 +93,11 @@ public void setUp(PropertiesConfiguration properties) _predownloadScheduler._executor = Runnable::run; } + @AfterMethod + public void cleanUpMetrics() { + ServerMetrics.deregister(); + } + @AfterClass public void tearDown() throws Exception { @@ -355,4 +366,68 @@ public void testPredownloadParallelismConfiguration() throws Exception { "Zero parallelism should fall back to default"); zeroScheduler.stop(); } + + @Test + public void testInitializeMetricsReporterRegistersServerMetrics() + throws Exception { + String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + PredownloadScheduler scheduler = new PredownloadScheduler(properties); + + try (MockedStatic pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class)) { + PinotMetricsRegistry mockRegistry = mock(PinotMetricsRegistry.class); + pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any())) + .thenReturn(mockRegistry); + + scheduler.initializeMetricsReporter(); + + pinotMetricUtilsMockedStatic.verify(() -> PinotMetricUtils.getPinotMetricsRegistry(any()), times(1)); + ServerMetrics registeredMetrics = ServerMetrics.get(); + assertNotNull(registeredMetrics, "ServerMetrics should be registered after initializeMetricsReporter"); + } finally { + scheduler.stop(); + } + } + + @Test + public void testInitializeMetricsReporterFallsBackOnFailure() + throws Exception { + String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + PredownloadScheduler scheduler = new PredownloadScheduler(properties); + + try (MockedStatic pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class)) { + pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any())) + .thenThrow(new RuntimeException("Metrics factory not available")); + + scheduler.initializeMetricsReporter(); + + ServerMetrics registeredMetrics = ServerMetrics.get(); + assertNotNull(registeredMetrics, "ServerMetrics.get() should return NOOP instance, not null"); + } finally { + scheduler.stop(); + } + } + + @Test + public void testInitializeMetricsReporterAlwaysCreatesPredownloadMetrics() + throws Exception { + String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath(); + PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath); + PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties)); + + try (MockedStatic pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class); + MockedStatic statusRecorderMockedStatic = + mockStatic(PredownloadStatusRecorder.class)) { + pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any())) + .thenThrow(new RuntimeException("Metrics factory not available")); + + scheduler.initializeMetricsReporter(); + + statusRecorderMockedStatic.verify( + () -> PredownloadStatusRecorder.registerMetrics(any(PredownloadMetrics.class)), times(1)); + } finally { + scheduler.stop(); + } + } }