Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Create a JDBC connection only while decoding array types #335

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact
private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBChangeEventSourceFactory.class);

private final YugabyteDBConnectorConfig configuration;
private final YugabyteDBConnection jdbcConnection;
private final ErrorHandler errorHandler;
private final YugabyteDBEventDispatcher<TableId> dispatcher;
private final Clock clock;
Expand All @@ -46,7 +45,6 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact

public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuration,
Snapshotter snapshotter,
YugabyteDBConnection jdbcConnection,
ErrorHandler errorHandler,
YugabyteDBEventDispatcher<TableId> dispatcher,
Clock clock, YugabyteDBSchema schema,
Expand All @@ -56,7 +54,6 @@ public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuratio
SlotState startingSlotInfo,
ChangeEventQueue<DataChangeEvent> queue) {
this.configuration = configuration;
this.jdbcConnection = jdbcConnection;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
Expand All @@ -76,7 +73,7 @@ public SnapshotChangeEventSource<YBPartition, YugabyteDBOffsetContext> getSnapsh
configuration,
taskContext,
snapshotter,
jdbcConnection,
null, /* yugabyteDBConnection */
schema,
dispatcher,
clock,
Expand All @@ -91,7 +88,7 @@ public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStrea
return new YugabyteDBStreamingChangeEventSource(
configuration,
snapshotter,
jdbcConnection,
null, /* yugabyteDBConnection */
dispatcher,
errorHandler,
clock,
Expand All @@ -104,7 +101,7 @@ public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStrea
return new YugabyteDBConsistentStreamingSource(
configuration,
snapshotter,
jdbcConnection,
null, /* yugabyteDBConnection */
dispatcher,
errorHandler,
clock,
Expand All @@ -119,14 +116,14 @@ public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStrea
public Optional<IncrementalSnapshotChangeEventSource<YBPartition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(YugabyteDBOffsetContext offsetContext,
SnapshotProgressListener snapshotProgressListener,
DataChangeEventListener dataChangeEventListener) {
final SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId>(
configuration,
jdbcConnection,
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener);
return Optional.of(incrementalSnapshotChangeEventSource);
final SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId>(
configuration,
null, /* jdbcConnection */
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener);
return Optional.of(incrementalSnapshotChangeEventSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
undeliveredToastableColumns.remove(columnName);
int position = getPosition(columnName, table, values);
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change needed? connection.connection() can return either null or a connection, depending on the context.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The YugabyteDBConnection object has no context about what type of data it will be used to decode. If an array type is encountered, a JDBC connection is created and used; otherwise only the YugabyteDBConnection object itself is used.

Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes());
// values[position] = value;
values[position] = new Object[]{ value, Boolean.TRUE };
}
Expand Down Expand Up @@ -240,8 +239,7 @@ private Object[] updatedColumnValues(List<ReplicationMessage.Column> columns, Ta

int position = getPosition(columnName, table, values);
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());
Object value = column.getValue(connection, connectorConfig.includeUnknownDatatypes());

values[position] = new Object[]{ value, Boolean.TRUE };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
}
String serializedNameToType = "";
String serializedOidToType = "";
try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL)) {
try (YugabyteDBConnection connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_VALIDATE_CONNECTION)) {
if (yugabyteDBConnectorConfig.isYSQLDbType()) {
YugabyteDBTypeRegistry typeRegistry = new YugabyteDBTypeRegistry(connection);
Map<String, YugabyteDBType> nameToType = typeRegistry.getNameToType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
/**
* Kafka connect source task which uses YugabyteDB CDC API to process DB changes.
*
* @author Suranjan Kumar ([email protected])
* @author Suranjan Kumar ([email protected]), Vaibhav Kushwaha ([email protected])
*/
public class YugabyteDBConnectorTask
extends BaseSourceTask<YBPartition, YugabyteDBOffsetContext> {
Expand All @@ -60,8 +60,6 @@ public class YugabyteDBConnectorTask

private volatile YugabyteDBTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile YugabyteDBConnection jdbcConnection;
private volatile YugabyteDBConnection heartbeatConnection;
private volatile YugabyteDBSchema schema;

@Override
Expand All @@ -81,19 +79,12 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
final String databaseCharsetName = config.getString(YugabyteDBConnectorConfig.CHAR_SET);
final Charset databaseCharset = Charset.forName(databaseCharsetName);

Encoding encoding = Encoding.defaultEncoding(); // UTF-8
YugabyteDBTaskConnection taskConnection = new YugabyteDBTaskConnection(encoding);


final YugabyteDBValueConverterBuilder valueConverterBuilder = (typeRegistry) -> YugabyteDBValueConverter.of(
connectorConfig,
databaseCharset,
typeRegistry);



if (connectorConfig.isYSQLDbType()) {

String nameToTypeStr = config.getString(YugabyteDBConnectorConfig.NAME_TO_TYPE.toString());
String oidToTypeStr = config.getString(YugabyteDBConnectorConfig.OID_TO_TYPE.toString());

Expand All @@ -114,15 +105,10 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
LOGGER.error("Error while deserializing object to type string", e);
}

// Global JDBC connection used both for snapshotting and streaming.
// Must be able to resolve datatypes.
jdbcConnection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder,
YugabyteDBConnection.CONNECTION_GENERAL);

// CDCSDK We can just build the type registry on the co-ordinator and then send
// the map of Postgres Type and Oid to the Task using Config
final YugabyteDBTypeRegistry yugabyteDBTypeRegistry = new YugabyteDBTypeRegistry(taskConnection, nameToType,
oidToType, jdbcConnection);
// This type registry is being built with the nameToType and oidToType maps populated.
final YugabyteDBTypeRegistry yugabyteDBTypeRegistry =
new YugabyteDBTypeRegistry(connectorConfig.getJdbcConfig(), nameToType, oidToType);

schema = new YugabyteDBSchema(connectorConfig, yugabyteDBTypeRegistry, topicSelector,
valueConverterBuilder.build(yugabyteDBTypeRegistry));
Expand All @@ -146,15 +132,9 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
new YugabyteDBOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();

YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets,
connectorConfig);

LoggingContext.PreviousContext previousContext = taskContext
.configureLoggingContext(CONTEXT_NAME + "|" + taskId);
try {
// Print out the server information
// CDCSDK Get the table,

queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
Expand All @@ -167,7 +147,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(

final YugabyteDBEventMetadataProvider metadataProvider = new YugabyteDBEventMetadataProvider();

Configuration configuration = connectorConfig.getConfig();
// todo Vaibhav: see if we can get rid of heartbeat factory
HeartbeatFactory heartbeatFactory = new HeartbeatFactory<>(
connectorConfig,
topicSelector,
Expand Down Expand Up @@ -198,7 +178,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
metadataProvider,
heartbeatFactory,
schemaNameAdjuster,
jdbcConnection);
null /* jdbcConnection */);

YugabyteDBChangeEventSourceCoordinator coordinator = new YugabyteDBChangeEventSourceCoordinator(
previousOffsets,
Expand All @@ -208,7 +188,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
new YugabyteDBChangeEventSourceFactory(
connectorConfig,
snapshotter,
jdbcConnection,
errorHandler,
dispatcher,
clock,
Expand All @@ -233,6 +212,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
}
}

// todo Vaibhav: not being used.
Map<YBPartition, YugabyteDBOffsetContext> getPreviousOffsetss(
Partition.Provider<YBPartition> provider,
OffsetContext.Loader<YugabyteDBOffsetContext> loader) {
Expand Down Expand Up @@ -340,14 +320,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {

@Override
protected void doStop() {
if (jdbcConnection != null) {
jdbcConnection.close();
}

if (heartbeatConnection != null) {
heartbeatConnection.close();
}

if (schema != null) {
schema.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,17 @@ protected Optional<ColumnEditor> readTableColumn(CdcService.CDCSDKColumnInfoPB c
}

private int getLength(int oid) {
return getTypeRegistry().get(oid).getDefaultLength();
// YB Note: The service doesn't send this value, so the default length we get is always -1
// anyway; returning it directly without looking it up makes no difference.
// return getTypeRegistry().get(oid).getDefaultLength();
return -1;
}

private int getScale(int oid) {
return getTypeRegistry().get(oid).getDefaultScale();
// YB Note: The service doesn't send this value, so the default scale we get is always -1
// anyway; returning it directly without looking it up makes no difference.
// return getTypeRegistry().get(oid).getDefaultScale();
return -1;
}

private int resolveNativeType(int oid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public YugabyteDBSnapshotChangeEventSource(YugabyteDBConnectorConfig connectorCo
this.dispatcher = dispatcher;
this.clock = clock;
this.snapshotter = snapshotter;
this.connection = connection;
this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
this.snapshotProgressListener = snapshotProgressListener;

AsyncYBClient asyncClient = new AsyncYBClient.AsyncYBClientBuilder(connectorConfig.masterAddresses())
Expand Down Expand Up @@ -155,7 +155,7 @@ public SnapshotResult<YugabyteDBOffsetContext> execute(ChangeEventSourceContext
partitions.forEach(YBPartition::markTableAsColocated);

LOGGER.info("Setting offsetContext/previousOffset for snapshot...");
previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, connection, clock, partitions);
previousOffset = YugabyteDBOffsetContext.initialContextForSnapshot(this.connectorConfig, null /* connection */, clock, partitions);

this.partitionRanges = YugabyteDBConnectorUtils.populatePartitionRanges(
connectorConfig.getConfig().getString(YugabyteDBConnectorConfig.HASH_RANGES_LIST));
Expand Down Expand Up @@ -188,6 +188,16 @@ public SnapshotResult<YugabyteDBOffsetContext> execute(ChangeEventSourceContext
else {
snapshotProgressListener.snapshotAborted(partition);
}

// Close YugabyteDBConnection if opened.
try {
if (connection.isConnected()) {
LOGGER.info("Closing YugabyteDBConnection after snapshot");
connection.close();
}
} catch (SQLException sqle) {
LOGGER.error("Error while closing JDBC connection during snapshot", sqle);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorC
YugabyteDBSchema schema, YugabyteDBTaskContext taskContext, ReplicationConnection replicationConnection,
ChangeEventQueue<DataChangeEvent> queue) {
this.connectorConfig = connectorConfig;
this.connection = connection;
this.connection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug

if (!hasStartLsnStoredInContext) {
LOGGER.info("No start opid found in the context.");
offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock, partitions);
offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, null /* connection */, clock, partitions);
}
try {
// Populate partition ranges.
Expand Down
Loading
Loading