Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: add vgtid reset metrics #1

Open
wants to merge 1 commit into
base: v.1.9.8.final
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<version.debezium>${project.version}</version.debezium>

<!-- Force different version -->
<version.debezium.patched>1.9.8.Final</version.debezium.patched>
<version.debezium.patched>1.9.9.Final</version.debezium.patched>

<!--
Specify the properties that will be used for setting up the integration tests' Docker container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
// Try to connect to the database ...
final VitessConnectorConfig connectionConfig = new VitessConnectorConfig(config);
try (VitessReplicationConnection connection = new VitessReplicationConnection(connectionConfig, null)) {
try (VitessReplicationConnection connection = new VitessReplicationConnection(connectionConfig, null, null)) {
try {
connection.execute("SHOW DATABASES");
LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(), connection.username());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.metrics.VgtidResetMetric;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -70,7 +71,9 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
LOGGER.info("Found previous offset {}", previousOffset);
}

replicationConnection = new VitessReplicationConnection(connectorConfig, schema);
VgtidResetMetric vgtidResetMetric = new VgtidResetMetric(taskContext.getConnectorType(), taskContext.getTaskId(), connectorConfig);
vgtidResetMetric.register();
replicationConnection = new VitessReplicationConnection(connectorConfig, schema, vgtidResetMetric);

queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.metrics.VgtidResetMetric;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
Expand Down Expand Up @@ -54,9 +55,12 @@ public class VitessReplicationConnection implements ReplicationConnection {
private final AtomicReference<ManagedChannel> managedChannel = new AtomicReference<>();
private final AtomicInteger internalRestarts = new AtomicInteger(5);

public VitessReplicationConnection(VitessConnectorConfig config, VitessDatabaseSchema schema) {
private final VgtidResetMetric vgtidResetMetric;

public VitessReplicationConnection(VitessConnectorConfig config, VitessDatabaseSchema schema, VgtidResetMetric vgtidResetMetric) {
this.messageDecoder = new VStreamOutputMessageDecoder(schema);
this.config = config;
this.vgtidResetMetric = vgtidResetMetric;
}

/**
Expand Down Expand Up @@ -195,6 +199,7 @@ else if (internalRestarts.get() == 0
config.getKeyspace(), config.tableIncludeList(), vgtid, internalRestarts.getAndDecrement(), latestExistingVgtid);
LOGGER.warn(message, t);
restartStreaming(latestExistingVgtid);
vgtidResetMetric.resetVgtid();
}
else {
LOGGER.error(String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.debezium.connector.vitess.metrics;

import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.stream.Collectors;

import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.kafka.common.utils.Sanitizer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.annotation.ThreadSafe;

/**
* Base for metrics implementations.
*
* @author Jiri Pechanec
*/
@ThreadSafe
public abstract class CustomMetrics {

private static final Logger LOGGER = LoggerFactory.getLogger(CustomMetrics.class);

private final ObjectName name;
private volatile boolean registered = false;

protected CustomMetrics(String connectorType, Map<String, String> tags) {
this.name = metricName(connectorType, tags);
}

/**
* Registers a metrics MBean into the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public synchronized void register() {
try {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
if (mBeanServer == null) {
LOGGER.info("JMX not supported, bean '{}' not registered", name);
return;
}
LOGGER.info("JMX Registering metric {}", name);
mBeanServer.registerMBean(this, name);
registered = true;
}
catch (JMException e) {
throw new RuntimeException("Unable to register the MBean '" + name + "'", e);
}
}

/**
* Unregisters a metrics MBean from the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public synchronized void unregister() {
if (this.name != null && registered) {
try {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
if (mBeanServer == null) {
LOGGER.debug("JMX not supported, bean '{}' not registered", name);
return;
}
mBeanServer.unregisterMBean(name);
registered = false;
}
catch (JMException e) {
throw new RuntimeException("Unable to unregister the MBean '" + name + "'", e);
}
}
}

/**
* Create a JMX metric name for the given metric.
* @return the JMX metric name
*/
protected ObjectName metricName(String connectorType, Map<String, String> tags) {
final String metricName = "debezium." + connectorType.toLowerCase() + ":type=connector-metrics,"
+ tags.entrySet().stream()
.map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue()))
.collect(Collectors.joining(","));
try {
return new ObjectName(metricName);
}
catch (MalformedObjectNameException e) {
throw new ConnectException("Invalid metric name '" + metricName + "'");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.debezium.connector.vitess.metrics;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.util.Collect;

public class VgtidResetMetric extends CustomMetrics implements VgtidResetMetricMBean {
private final AtomicLong numberOfVgtidResets = new AtomicLong();

public VgtidResetMetric(String connectorType, String taskId, VitessConnectorConfig config) {
super(connectorType,
Collect.linkMapOf("taskId", String.format("%s-%s", taskId, UUID.randomUUID().toString()), "shard", config.getKeyspace(), "tables",
config.tableIncludeList() == null ? "no_table" : config.tableIncludeList()));
}

@Override
public long getNumberOfVgtidResets() {
return numberOfVgtidResets.get();
}

@Override
public void resetVgtid() {
numberOfVgtidResets.incrementAndGet();
}

@Override
public void reset() {
numberOfVgtidResets.set(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.debezium.connector.vitess.metrics;

public interface VgtidResetMetricMBean {
long getNumberOfVgtidResets();

void resetVgtid();

void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.metrics.VgtidResetMetric;
import io.debezium.doc.FixFor;
import io.debezium.util.SchemaNameAdjuster;

Expand All @@ -51,8 +52,11 @@ public void shouldHaveVgtidInResponse() throws Exception {
final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(
conf, SchemaNameAdjuster.create(), VitessTopicSelector.defaultSelector(conf));

final VgtidResetMetric metric = new VgtidResetMetric("testConnector", "1", conf);
metric.register();

AtomicReference<Throwable> error = new AtomicReference<>();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) {
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema, metric)) {
Vgtid startingVgtid = Vgtid.of(
Binlogdata.VGtid.newBuilder()
.addShardGtids(
Expand Down Expand Up @@ -117,7 +121,9 @@ public void shouldReturnUpdatedSchemaWithOnlineDdl() throws Exception {
final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(
conf, SchemaNameAdjuster.create(), VitessTopicSelector.defaultSelector(conf));
AtomicReference<Throwable> error = new AtomicReference<>();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) {
final VgtidResetMetric metric = new VgtidResetMetric("testConnector", "1", conf);
metric.register();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema, metric)) {
Vgtid startingVgtid = Vgtid.of(
Binlogdata.VGtid.newBuilder()
.addShardGtids(
Expand Down