Skip to content

Commit 6049e89

Browse files
Merge branch '8.1.x' into master by rayokota
2 parents f614598 + 3d2a845 commit 6049e89

File tree

4 files changed

+34
-4
lines changed

4 files changed

+34
-4
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ public Schema copy(Integer version, Integer id) {
258258
ruleSet, schema, schemaTags, timestamp, deleted);
259259
}
260260

261+
public Schema toHashKey() {
262+
return new Schema(subject, null, null, schemaType, references, metadata, ruleSet, schema);
263+
}
264+
261265
@io.swagger.v3.oas.annotations.media.Schema(description = SUBJECT_DESC, example = SUBJECT_EXAMPLE)
262266
@JsonProperty("subject")
263267
public String getSubject() {

core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,8 +1714,8 @@ public ParsedSchema parseSchema(
17141714
boolean normalize) throws InvalidSchemaException {
17151715
try {
17161716
ParsedSchema parsedSchema = isNew
1717-
? newSchemaCache.get(new RawSchema(schema.copy(), isNew, normalize))
1718-
: oldSchemaCache.get(new RawSchema(schema.copy(), isNew, normalize));
1717+
? newSchemaCache.get(new RawSchema(schema.toHashKey(), isNew, normalize))
1718+
: oldSchemaCache.get(new RawSchema(schema.toHashKey(), isNew, normalize));
17191719
if (schema.getVersion() != null) {
17201720
parsedSchema = parsedSchema.copy(schema.getVersion());
17211721
}
@@ -2323,6 +2323,16 @@ private CloseableIterator<SchemaRegistryValue> allVersionsFromAllContexts(
23232323
return new DelegatingIterator<>(versions.iterator());
23242324
}
23252325

2326+
public void invalidateFromNewSchemaCache(Schema schema) {
2327+
newSchemaCache.invalidate(new RawSchema(schema, true, false));
2328+
newSchemaCache.invalidate(new RawSchema(schema, true, true));
2329+
}
2330+
2331+
public void invalidateFromOldSchemaCache(Schema schema) {
2332+
oldSchemaCache.invalidate(new RawSchema(schema, false, false));
2333+
oldSchemaCache.invalidate(new RawSchema(schema, false, true));
2334+
}
2335+
23262336
public void clearNewSchemaCache() {
23272337
newSchemaCache.invalidateAll();
23282338
}

core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,12 @@ private void handleDeleteSubject(DeleteSubjectValue deleteSubjectValue) {
156156
schemaValue.setDeleted(true);
157157
SchemaValue oldSchemaValue = (SchemaValue) lookupCache.put(schemaKey, schemaValue);
158158
lookupCache.schemaDeleted(schemaKey, schemaValue, oldSchemaValue);
159+
schemaRegistry.invalidateFromNewSchemaCache(schemaValue.toHashKey());
159160
}
160161
} catch (StoreException e) {
161162
log.error("Failed to delete subject {} in the local cache", subject, e);
162163
}
163164
}
164-
schemaRegistry.clearNewSchemaCache();
165165
}
166166

167167
private void handleClearSubject(ClearSubjectValue clearSubjectValue) {
@@ -185,7 +185,7 @@ private void handleSchemaUpdate(SchemaKey schemaKey,
185185

186186
if (schemaValue.isDeleted()) {
187187
lookupCache.schemaDeleted(schemaKey, schemaValue, oldSchemaValue);
188-
schemaRegistry.clearNewSchemaCache();
188+
schemaRegistry.invalidateFromNewSchemaCache(schemaValue.toHashKey());
189189
updateMetrics(metricsContainer.getSchemasDeleted(),
190190
metricsContainer.getSchemasDeleted(getSchemaType(schemaValue)));
191191
} else {
@@ -195,6 +195,7 @@ private void handleSchemaUpdate(SchemaKey schemaKey,
195195
}
196196
} else {
197197
lookupCache.schemaTombstoned(schemaKey, oldSchemaValue);
198+
// Need to clear entire cache until we can prevent hard deleting referenced schemas
198199
schemaRegistry.clearOldSchemaCache();
199200
}
200201
}

core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaValue.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,19 @@ public Schema toSchemaEntity() {
333333
isDeleted()
334334
);
335335
}
336+
337+
public Schema toHashKey() {
338+
return new Schema(
339+
getSubject(),
340+
null,
341+
null,
342+
getSchemaType(),
343+
getReferences() == null ? null : getReferences().stream()
344+
.map(SchemaReference::toRefEntity)
345+
.collect(Collectors.toList()),
346+
getMetadata() == null ? null : getMetadata().toMetadataEntity(),
347+
getRuleSet() == null ? null : getRuleSet().toRuleSetEntity(),
348+
getSchema()
349+
);
350+
}
336351
}

0 commit comments

Comments
 (0)