Added support for custom metrics #322

Open · wants to merge 2 commits into master
com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -38,6 +38,7 @@
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.metrics.jmx.BqSinkConnectorMetricsImpl;
import com.wepay.kafka.connect.bigquery.utils.*;
import com.wepay.kafka.connect.bigquery.write.batch.GCSBatchTableWriter;
import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;
@@ -93,6 +94,7 @@
*/
public class BigQuerySinkTask extends SinkTask {
private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTask.class);
private volatile BqSinkConnectorMetricsImpl bqSinkConnectorMetrics;

private AtomicReference<BigQuery> bigQuery;
private AtomicReference<SchemaManager> schemaManager;
@@ -534,7 +536,8 @@ private GCSToBQWriter getGcsWriter() {
schemaManager,
retry,
retryWait,
autoCreateTables);
autoCreateTables,
bqSinkConnectorMetrics.getMetricsEventPublisher());
}

private SinkRecordConverter getConverter(BigQuerySinkTaskConfig config) {
@@ -554,6 +557,15 @@ public void start(Map<String, String> properties) {
logger.trace("task.start()");
stopped = false;
config = new BigQuerySinkTaskConfig(properties);

// register metrics; fall back to a random id if the task id is not configured.
// Note: String.valueOf(null) returns the string "null", so the null/empty check
// must happen on the Integer before conversion.
Integer taskIdConfig = config.getInt(BigQuerySinkTaskConfig.TASK_ID_CONFIG);
String taskId = taskIdConfig != null ? taskIdConfig.toString() : UUID.randomUUID().toString();
bqSinkConnectorMetrics = new BqSinkConnectorMetricsImpl(taskId);
bqSinkConnectorMetrics.register();

autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG);
upsertDelete = config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)
|| config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG);
@@ -680,7 +692,11 @@ private void startGCSToBQLoadTask() {
));
}
}
GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket);
GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(
getBigQuery(),
bucket,
bqSinkConnectorMetrics.getMetricsEventPublisher()
);

int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG);
loadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS);
@@ -715,6 +731,9 @@ public void stop() {
stopped = true;
}

if (bqSinkConnectorMetrics != null) {
bqSinkConnectorMetrics.unregister();
}
logger.trace("task.stop()");
}

com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java
@@ -32,6 +32,9 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.StorageException;

import com.wepay.kafka.connect.bigquery.metrics.MetricsEventPublisher;
import com.wepay.kafka.connect.bigquery.metrics.events.*;
import com.wepay.kafka.connect.bigquery.metrics.jmx.BqSinkConnectorMetricsImpl;
import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;

import org.slf4j.Logger;
@@ -57,6 +60,7 @@
*/
public class GCSToBQLoadRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(GCSToBQLoadRunnable.class);
private final MetricsEventPublisher metricsEventPublisher;

private final BigQuery bigQuery;
private final Bucket bucket;
@@ -81,12 +85,13 @@ public class GCSToBQLoadRunnable implements Runnable {
* @param bigQuery the {@link BigQuery} instance.
* @param bucket the GCS bucket to read from.
* @param metricsEventPublisher the publisher used to emit metric events.
*/
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket) {
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket, MetricsEventPublisher metricsEventPublisher) {
this.bigQuery = bigQuery;
this.bucket = bucket;
this.activeJobs = new HashMap<>();
this.claimedBlobIds = new HashSet<>();
this.deletableBlobIds = new HashSet<>();
this.metricsEventPublisher = metricsEventPublisher;
}

/**
@@ -106,10 +111,13 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() {
Page<Blob> list = bucket.list();
logger.trace("Finished GCS bucket list");

long totalBlobsCount = 0;

for (Blob blob : list.iterateAll()) {
BlobId blobId = blob.getBlobId();
TableId table = getTableFromBlob(blob);
logger.debug("Checking blob bucket={}, name={}, table={} ", blob.getBucket(), blob.getName(), table );
totalBlobsCount++;

if (table == null || claimedBlobIds.contains(blobId) || deletableBlobIds.contains(blobId)) {
// don't do anything if:
@@ -135,6 +143,9 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() {
}

logger.debug("Got blobs to upload: {}", tableToURIs);
metricsEventPublisher.publishMetricEvent(
new StorageBlobsStatusEvent(totalBlobsCount, claimedBlobIds.size(), deletableBlobIds.size())
);
return tableToURIs;
}

@@ -256,6 +267,9 @@ private void checkJobs() {
} finally {
logger.info("GCS To BQ job tally: {} successful jobs, {} failed jobs.",
successCount, failureCount);
metricsEventPublisher.publishMetricEvent(
new LoadJobStatusEvent(activeJobs.size(), successCount, failureCount)
);
}
}
}
@@ -301,8 +315,14 @@ private void deleteBlobs() {
logger.info("Successfully deleted {} blobs; failed to delete {} blobs",
successfulDeletes,
failedDeletes);
metricsEventPublisher.publishMetricEvent(
new StorageBlobDeleteOperationStatusEvent(successfulDeletes, failedDeletes)
);
} catch (StorageException ex) {
logger.warn("Storage exception while attempting to delete blobs", ex);
metricsEventPublisher.publishMetricEvent(
new StorageExceptionCountEvent()
);
}
}

@@ -319,8 +339,14 @@ public void run() {
logger.trace("Loading {} new blobs into BQ", tablesToSourceURIs.size());
triggerBigQueryLoadJobs(tablesToSourceURIs);
logger.trace("Finished BQ load run");
metricsEventPublisher.publishMetricEvent(
new BQLoaderRunStatusEvent(true)
);
} catch (Exception e) {
logger.error("Uncaught error in BQ loader", e);
metricsEventPublisher.publishMetricEvent(
new BQLoaderRunStatusEvent(false)
);
}
}
}
com/wepay/kafka/connect/bigquery/metrics/Metrics.java
@@ -0,0 +1,74 @@
package com.wepay.kafka.connect.bigquery.metrics;

import io.debezium.annotation.ThreadSafe;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import java.lang.management.ManagementFactory;

@ThreadSafe
public abstract class Metrics {
private static final Logger LOGGER = LoggerFactory.getLogger(Metrics.class);
private final ObjectName name;
private volatile boolean registered = false;

protected Metrics(String taskId) {
this.name = this.metricName(taskId);
}

public synchronized void register() {
try {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
if (mBeanServer == null) {
LOGGER.info("JMX not supported, bean '{}' not registered", this.name);
} else {
if (!mBeanServer.isRegistered(this.name)) {
try {
mBeanServer.registerMBean(this, this.name);
// mark as registered only after registration actually succeeded,
// so unregister() does not try to remove a bean we never owned
this.registered = true;
} catch (InstanceAlreadyExistsException ex1) {
LOGGER.error("Failed to register metrics MBean, as an old set with the same name exists, metrics will not be available", ex1);
} catch (MBeanRegistrationException ex2) {
LOGGER.error("Failed to register metrics MBean, metrics will not be available", ex2);
}
}
}
} catch (NotCompliantMBeanException ex) {
LOGGER.error("Failed to create Standard MBean, metrics will not be available", ex);
}
}

public synchronized void unregister() {
if (this.name != null && this.registered) {
try {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
if (mBeanServer == null) {
LOGGER.debug("JMX not supported, bean '{}' not registered", this.name);
return;
}

try {
mBeanServer.unregisterMBean(this.name);
} catch (InstanceNotFoundException e) {
LOGGER.info("Unable to unregister metrics MBean '{}' as it was not found", this.name);
}

this.registered = false;
} catch (JMException e) {
throw new RuntimeException("Unable to unregister the MBean '" + this.name + "'", e);
}
}
}

protected ObjectName metricName(String taskId) {
String metricName = "kafka.connect:type=bqsink-connector-metrics,taskid=" + taskId;
try {
return new ObjectName(metricName);
} catch (MalformedObjectNameException e) {
throw new ConnectException("Invalid metric name '" + metricName + "'", e);
}
}
}
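
The ObjectName pattern above makes each task's metrics reachable through standard JMX tooling (JConsole, jmxterm, or a few lines of code). As a quick registration sanity check, a minimal sketch of reading a bean back from the platform MBean server; the taskid value and the attribute name are assumptions, since the MXBean interface is not shown in this diff:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MetricsSmokeTest {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Same pattern as Metrics.metricName(); taskid=0 assumes the connector's first task.
        ObjectName name = new ObjectName("kafka.connect:type=bqsink-connector-metrics,taskid=0");
        if (server.isRegistered(name)) {
            // "TotalNumberOfBlobsInStorage" is a hypothetical attribute name for illustration.
            System.out.println(server.getAttribute(name, "TotalNumberOfBlobsInStorage"));
        } else {
            System.out.println("MBean not registered: " + name);
        }
    }
}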
com/wepay/kafka/connect/bigquery/metrics/MetricsEventPublisher.java
@@ -0,0 +1,32 @@
package com.wepay.kafka.connect.bigquery.metrics;

import com.wepay.kafka.connect.bigquery.metrics.events.MetricEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class MetricsEventPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEventPublisher.class);
private final Map<Class<? extends MetricEvent>, Consumer<? extends MetricEvent>> subscribers = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
public <T extends MetricEvent> void publishMetricEvent(T metricEvent) {
Consumer<T> consumer = (Consumer<T>) subscribers.get(metricEvent.getClass());
if (consumer != null) {
try {
consumer.accept(metricEvent);
} catch (Exception ex) {
LOGGER.warn("Failed to process metric event: " + metricEvent, ex);
}
}
}

public <T extends MetricEvent> void subscribe(Class<T> clazz, Consumer<T> consumer) {
// putIfAbsent makes the check-and-register atomic on the concurrent map
if (subscribers.putIfAbsent(clazz, consumer) != null) {
throw new IllegalStateException("A consumer is already subscribed for " + clazz.getName());
}
}
}
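
The publisher is intentionally minimal: at most one consumer per event class, events without a consumer are dropped, and a throwing consumer only produces a warning. A short usage sketch (class and variable names here are illustrative, not from this PR):

import com.wepay.kafka.connect.bigquery.metrics.MetricsEventPublisher;
import com.wepay.kafka.connect.bigquery.metrics.events.LoadJobStatusEvent;

import java.util.concurrent.atomic.AtomicInteger;

public class PublisherExample {
    public static void main(String[] args) {
        MetricsEventPublisher publisher = new MetricsEventPublisher();
        AtomicInteger failedLoadJobs = new AtomicInteger();

        // At most one consumer per event class; a second subscribe() throws IllegalStateException.
        publisher.subscribe(LoadJobStatusEvent.class,
                event -> failedLoadJobs.set(event.getNumberOfFailedLoadJobs()));

        // Fire-and-forget: events with no registered consumer are silently dropped.
        publisher.publishMetricEvent(new LoadJobStatusEvent(3, 10, 1));
        System.out.println("failed load jobs = " + failedLoadJobs.get()); // prints 1
    }
}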
com/wepay/kafka/connect/bigquery/metrics/events/BQLoaderRunStatusEvent.java
@@ -0,0 +1,14 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

public class BQLoaderRunStatusEvent implements MetricEvent {

private final boolean isSuccessfulBQLoaderRun;

public BQLoaderRunStatusEvent(boolean isSuccessfulBQLoaderRun) {
this.isSuccessfulBQLoaderRun = isSuccessfulBQLoaderRun;
}

public boolean isSuccessfulBQLoaderRun() {
return isSuccessfulBQLoaderRun;
}
}
com/wepay/kafka/connect/bigquery/metrics/events/LoadJobStatusEvent.java
@@ -0,0 +1,26 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

public class LoadJobStatusEvent implements MetricEvent {

private final int numberOfActiveLoadJobs;
private final int numberOfSuccessfulLoadJobs;
private final int numberOfFailedLoadJobs;

public LoadJobStatusEvent(int numberOfActiveLoadJobs, int numberOfSuccessfulLoadJobs, int numberOfFailedLoadJobs) {
this.numberOfActiveLoadJobs = numberOfActiveLoadJobs;
this.numberOfSuccessfulLoadJobs = numberOfSuccessfulLoadJobs;
this.numberOfFailedLoadJobs = numberOfFailedLoadJobs;
}

public int getNumberOfActiveLoadJobs() {
return numberOfActiveLoadJobs;
}

public int getNumberOfSuccessfulLoadJobs() {
return numberOfSuccessfulLoadJobs;
}

public int getNumberOfFailedLoadJobs() {
return numberOfFailedLoadJobs;
}
}
com/wepay/kafka/connect/bigquery/metrics/events/MetricEvent.java
@@ -0,0 +1,4 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

/** Marker interface for all metric events dispatched through the MetricsEventPublisher. */
public interface MetricEvent {
}
com/wepay/kafka/connect/bigquery/metrics/events/StorageBlobDeleteOperationStatusEvent.java
@@ -0,0 +1,20 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

public class StorageBlobDeleteOperationStatusEvent implements MetricEvent {

private final int numberOfSuccessfullyDeletedBlobs;
private final int numberOfFailedDeletedBlobs;

public StorageBlobDeleteOperationStatusEvent(int numberOfSuccessfullyDeletedBlobs, int numberOfFailedDeletedBlobs) {
this.numberOfSuccessfullyDeletedBlobs = numberOfSuccessfullyDeletedBlobs;
this.numberOfFailedDeletedBlobs = numberOfFailedDeletedBlobs;
}

public int getNumberOfSuccessfullyDeletedBlobs() {
return numberOfSuccessfullyDeletedBlobs;
}

public int getNumberOfFailedDeletedBlobs() {
return numberOfFailedDeletedBlobs;
}
}
com/wepay/kafka/connect/bigquery/metrics/events/StorageBlobsStatusEvent.java
@@ -0,0 +1,26 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

public class StorageBlobsStatusEvent implements MetricEvent {

private final long totalNumberOfBlobsInStorage;
private final int claimedNumberOfBlobsInStorage;
private final int deletableNumberOfBlobsInStorage;

public StorageBlobsStatusEvent(long totalNumberOfBlobsInStorage, int claimedNumberOfBlobsInStorage, int deletableNumberOfBlobsInStorage) {
this.totalNumberOfBlobsInStorage = totalNumberOfBlobsInStorage;
this.claimedNumberOfBlobsInStorage = claimedNumberOfBlobsInStorage;
this.deletableNumberOfBlobsInStorage = deletableNumberOfBlobsInStorage;
}

public long getTotalNumberOfBlobsInStorage() {
return totalNumberOfBlobsInStorage;
}

public int getClaimedNumberOfBlobsInStorage() {
return claimedNumberOfBlobsInStorage;
}

public int getDeletableNumberOfBlobsInStorage() {
return deletableNumberOfBlobsInStorage;
}
}
com/wepay/kafka/connect/bigquery/metrics/events/StorageExceptionCountEvent.java
@@ -0,0 +1,4 @@
package com.wepay.kafka.connect.bigquery.metrics.events;

/** Signals a single StorageException raised while deleting blobs from GCS. */
public class StorageExceptionCountEvent implements MetricEvent {
}
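
BqSinkConnectorMetricsImpl is referenced throughout the diff but not included in this view. A minimal sketch, assuming a hypothetical BqSinkConnectorMetricsMXBean interface and illustrative attribute names, of how such a class could tie Metrics and MetricsEventPublisher together:

package com.wepay.kafka.connect.bigquery.metrics.jmx;

import com.wepay.kafka.connect.bigquery.metrics.Metrics;
import com.wepay.kafka.connect.bigquery.metrics.MetricsEventPublisher;
import com.wepay.kafka.connect.bigquery.metrics.events.LoadJobStatusEvent;
import com.wepay.kafka.connect.bigquery.metrics.events.StorageBlobsStatusEvent;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shape; the real class in this PR may differ.
public class BqSinkConnectorMetricsImpl extends Metrics implements BqSinkConnectorMetricsMXBean {
    private final MetricsEventPublisher publisher = new MetricsEventPublisher();
    private final AtomicLong totalBlobs = new AtomicLong();
    private final AtomicLong failedLoadJobs = new AtomicLong();

    public BqSinkConnectorMetricsImpl(String taskId) {
        super(taskId);
        // Mirror incoming events into fields exposed as MBean attributes.
        publisher.subscribe(StorageBlobsStatusEvent.class,
                e -> totalBlobs.set(e.getTotalNumberOfBlobsInStorage()));
        // Accumulate failures across load runs (an illustrative choice).
        publisher.subscribe(LoadJobStatusEvent.class,
                e -> failedLoadJobs.addAndGet(e.getNumberOfFailedLoadJobs()));
    }

    public MetricsEventPublisher getMetricsEventPublisher() {
        return publisher;
    }

    @Override
    public long getTotalNumberOfBlobsInStorage() {
        return totalBlobs.get();
    }

    @Override
    public long getFailedLoadJobs() {
        return failedLoadJobs.get();
    }
}

// In real code this interface would be public and in its own file, so JMX can introspect it.
interface BqSinkConnectorMetricsMXBean {
    long getTotalNumberOfBlobsInStorage();
    long getFailedLoadJobs();
}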