Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
Expand Down Expand Up @@ -96,7 +98,8 @@
}
String topic = config.getString(SchemaRegistryConfig.METADATA_ENCODER_TOPIC_CONFIG);
this.keyTemplate = KeyTemplates.get(KEY_TEMPLATE_NAME);
this.encoders = createCache(new StringSerde(), new KeysetWrapperSerde(config), topic, null);
this.encoders = createCache(new StringSerde(), new KeysetWrapperSerde(config), topic,
new TenantCacheUpdateHandler());
} catch (Exception e) {
throw new IllegalArgumentException("Could not instantiate MetadataEncoderService", e);
}
Expand Down Expand Up @@ -392,4 +395,26 @@
}
}
}

/**
* Cache update handler that logs tenant (key) updates to the encoder cache.
*/
private static class TenantCacheUpdateHandler
implements CacheUpdateHandler<String, KeysetWrapper> {

@Override
public void handleUpdate(String tenant, KeysetWrapper newValue, KeysetWrapper oldValue,
TopicPartition tp, long offset, long timestamp) {
if (oldValue == null) {
log.info("Encoder cache update: new tenant '{}' added (partition={}, offset={}, "
+ "timestamp={})", tenant, tp.partition(), offset, timestamp);

Check failure on line 410 in core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java#L410

Define a constant instead of duplicating this literal "timestamp={})" 3 times.
} else if (newValue == null) {
log.info("Encoder cache update: tenant '{}' removed (partition={}, offset={}, "
+ "timestamp={})", tenant, tp.partition(), offset, timestamp);
} else {
log.info("Encoder cache update: tenant '{}' updated (partition={}, offset={}, "
+ "timestamp={})", tenant, tp.partition(), offset, timestamp);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,29 @@
encoderService.init();

Map<String, String> properties = new HashMap<>();
properties.put("nonsensitive", "foo");

Check failure on line 52 in core/src/test/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

core/src/test/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceTest.java#L52

Define a constant instead of duplicating this literal "nonsensitive" 3 times.
properties.put("sensitive", "foo");

Check failure on line 53 in core/src/test/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

core/src/test/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceTest.java#L53

Define a constant instead of duplicating this literal "sensitive" 4 times.
Metadata metadata = new Metadata(null, properties, Collections.singleton("sensitive"));

SchemaValue schema = new SchemaValue(
"mysubject", null, null, null, null, null,
new io.confluent.kafka.schemaregistry.storage.Metadata(metadata), null, "true", false);
encoderService.encodeMetadata(schema);
assertEquals(schema.getMetadata().getProperties().get("nonsensitive"), "foo");
assertEquals("foo", schema.getMetadata().getProperties().get("nonsensitive"));
// the value of "sensitive" is encrypted
assertNotEquals(schema.getMetadata().getProperties().get("sensitive"), "foo");
assertNotEquals("foo", schema.getMetadata().getProperties().get("sensitive"));
assertNotNull(schema.getMetadata().getProperties().get(SchemaValue.ENCODED_PROPERTY));

SchemaValue schema2 = new SchemaValue(
"mysubject", null, null, null, null, null,
new io.confluent.kafka.schemaregistry.storage.Metadata(
schema.getMetadata().toMetadataEntity()), null, "true", false);
encoderService.decodeMetadata(schema2);
assertEquals(schema2.getMetadata().getProperties().get("nonsensitive"), "foo");
assertEquals("foo", schema2.getMetadata().getProperties().get("nonsensitive"));
// the value of "sensitive" is decrypted
assertEquals(schema2.getMetadata().getProperties().get("sensitive"), "foo");
assertEquals("foo", schema2.getMetadata().getProperties().get("sensitive"));
assertNull(schema2.getMetadata().getProperties().get(SchemaValue.ENCODED_PROPERTY));

encoderService.close();
}
}