diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java index 01fb369..ea976ae 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java @@ -91,6 +91,7 @@ public TableMetadata.Table tableMetadata(String tableName) { result = new TableMetadataImpl.TableImpl(tableMetadata.get()); this.tableMetadataCache.put(tableName, result); } else { + log.warn("Could not find metadata for table {} in keyspace {}. Are you sure the table exists?", tableName, keyspaceMetadata.getName().asCql(false)); result = null; } } diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java index 3af8d66..b7e4d78 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java @@ -25,6 +25,8 @@ import io.confluent.kafka.connect.utils.config.ValidEnum; import io.confluent.kafka.connect.utils.config.ValidPort; import io.connect.scylladb.topictotable.TopicConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Configuration class for {@link ScyllaDbSinkConnector}. @@ -62,6 +64,10 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig { private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN = Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$"); + private static final String[] TOPIC_WISE_CONFIGS_VALID_SUFFIXES = {".mapping",".consistencyLevel",".ttlSeconds",".deletesEnabled"}; + + private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkConnectorConfig.class); + static final Set CLIENT_COMPRESSION = ImmutableSet.of("none", "lz4", "snappy"); static final Set TABLE_COMPRESSORS = ImmutableSet.of("SnappyCompressor", "LZ4Compressor", "DeflateCompressor", "none"); @@ -111,8 +117,9 @@ public ScyllaDbSinkConnectorConfig(Map originals) { Map> topicWiseConfigsMap = new HashMap<>(); for (final Map.Entry entry : ((Map) originals).entrySet()) { final String name2 = entry.getKey(); - if (name2.startsWith("topic.")) { + if (name2.startsWith("topic.") && hasTopicWiseConfigSuffix(name2)) { final String topicName = this.tryMatchTopicName(name2); + log.debug("Interpreting " + name2 + " as custom TopicWiseConfig for topic " + topicName); final Map topicMap = topicWiseConfigsMap.computeIfAbsent(topicName, t -> new HashMap()); topicMap.put(name2.split("\\.")[name2.split("\\.").length - 1], entry.getValue()); } @@ -567,6 +574,13 @@ private String tryMatchTopicName(final String name) { throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern"); } + private boolean hasTopicWiseConfigSuffix(final String name) { + for (String suffix : TOPIC_WISE_CONFIGS_VALID_SUFFIXES) { + if (name.endsWith(suffix)) return true; + } + return false; + } + private static String[] toStringArray(Object[] arr){ return Arrays.stream(arr).map(Object::toString).toArray(String[]::new); }