Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.server.conf.ServerConf;
Expand All @@ -46,6 +47,8 @@
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -163,6 +166,24 @@ void initializeZK() {
void initializeMetricsReporter() {
LOGGER.info("Initializing metrics reporter");

try {
ServerConf serverConf = new ServerConf(_pinotConfig);
PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(serverConf.getMetricsConfig());
ServerMetrics serverMetrics =
new ServerMetrics(serverConf.getMetricsPrefix(), metricsRegistry, serverConf.emitTableLevelMetrics(),
serverConf.getAllowedTablesForEmittingMetrics());
serverMetrics.initializeGlobalMeters();
boolean registered = ServerMetrics.register(serverMetrics);
if (registered) {
LOGGER.info("ServerMetrics successfully registered for predownload container");
} else {
LOGGER.error("Failed to register ServerMetrics; an instance was already registered");
}
} catch (Exception e) {
LOGGER.error("Failed to initialize ServerMetrics in predownload container; "
+ "continuing with the currently registered ServerMetrics instance", e);
}

_predownloadMetrics = new PredownloadMetrics();
PredownloadStatusRecorder.registerMetrics(_predownloadMetrics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import static org.apache.pinot.server.predownload.PredownloadTestUtil.*;
Expand All @@ -51,6 +55,8 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class PredownloadSchedulerTest {
Expand Down Expand Up @@ -87,6 +93,11 @@ public void setUp(PropertiesConfiguration properties)
_predownloadScheduler._executor = Runnable::run;
}

@AfterMethod
public void cleanUpMetrics() {
ServerMetrics.deregister();
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down Expand Up @@ -355,4 +366,68 @@ public void testPredownloadParallelismConfiguration() throws Exception {
"Zero parallelism should fall back to default");
zeroScheduler.stop();
}

@Test
public void testInitializeMetricsReporterRegistersServerMetrics()
throws Exception {
String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath();
PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath);
PredownloadScheduler scheduler = new PredownloadScheduler(properties);

try (MockedStatic<PinotMetricUtils> pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class)) {
PinotMetricsRegistry mockRegistry = mock(PinotMetricsRegistry.class);
pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any()))
.thenReturn(mockRegistry);

scheduler.initializeMetricsReporter();

pinotMetricUtilsMockedStatic.verify(() -> PinotMetricUtils.getPinotMetricsRegistry(any()), times(1));
ServerMetrics registeredMetrics = ServerMetrics.get();
assertNotNull(registeredMetrics, "ServerMetrics should be registered after initializeMetricsReporter");
} finally {
scheduler.stop();
}
}

@Test
public void testInitializeMetricsReporterFallsBackOnFailure()
throws Exception {
String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath();
PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath);
PredownloadScheduler scheduler = new PredownloadScheduler(properties);

try (MockedStatic<PinotMetricUtils> pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class)) {
pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any()))
.thenThrow(new RuntimeException("Metrics factory not available"));

scheduler.initializeMetricsReporter();

ServerMetrics registeredMetrics = ServerMetrics.get();
assertNotNull(registeredMetrics, "ServerMetrics.get() should return NOOP instance, not null");
} finally {
scheduler.stop();
}
}

@Test
public void testInitializeMetricsReporterAlwaysCreatesPredownloadMetrics()
throws Exception {
String propertiesFilePath = this.getClass().getClassLoader().getResource(SAMPLE_PROPERTIES_FILE_NAME).getPath();
PropertiesConfiguration properties = CommonsConfigurationUtils.fromPath(propertiesFilePath);
PredownloadScheduler scheduler = spy(new PredownloadScheduler(properties));

try (MockedStatic<PinotMetricUtils> pinotMetricUtilsMockedStatic = mockStatic(PinotMetricUtils.class);
MockedStatic<PredownloadStatusRecorder> statusRecorderMockedStatic =
mockStatic(PredownloadStatusRecorder.class)) {
pinotMetricUtilsMockedStatic.when(() -> PinotMetricUtils.getPinotMetricsRegistry(any()))
.thenThrow(new RuntimeException("Metrics factory not available"));

scheduler.initializeMetricsReporter();

statusRecorderMockedStatic.verify(
() -> PredownloadStatusRecorder.registerMetrics(any(PredownloadMetrics.class)), times(1));
} finally {
scheduler.stop();
}
}
}