Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
# Maven build targets
target/

# NPM dependencies and build output
node_modules/
dist/

# Maven release files
*.releaseBackup
release.properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ public RecordData deserialize(String topic, Headers headers, byte[] data) {
}
}

var idHandler = baseSerde.getIdHandler();
ByteBuffer buffer;
SchemaLookupResult<Object> schema;
int length;

if (data.length > 0 && data[0] == BaseSerde.MAGIC_BYTE) {
if (data.length > 0 && data[0] == BaseSerde.MAGIC_BYTE && idHandler != null) {
buffer = BaseSerde.getByteBuffer(data);
var idHandler = baseSerde.getIdHandler();
artifactReference = idHandler.readId(buffer);
schema = resolve(artifactReference);
length = buffer.limit() - idHandler.idSize() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class RecordsResourceIT {
RecordHelper recordUtils;
String clusterId1;
String clusterId3;
String clusterIdY = "test-kafkaY";

@BeforeEach
void setup() {
Expand Down Expand Up @@ -473,6 +474,24 @@ void testConsumeRecordWithOffsetBeforeBeginning() {
.body("data[0].attributes.value", is(equalTo("fourth")));
}

@Test
void testConsumeRecordMagicByteWithoutRegistry() {
final String topicName = UUID.randomUUID().toString();
var topicIds = topicUtils.createTopics(List.of(topicName), 1);
recordUtils.produceRecord(topicName, null, null, null, "\u0000rest of value");

await().atMost(5, TimeUnit.SECONDS)
.until(() -> topicUtils.getTopicSize(topicName) == 1);

whenRequesting(req -> req
.get("", clusterIdY, topicIds.get(topicName)))
.assertThat()
.statusCode(is(Status.OK.getStatusCode()))
.body("data", hasSize(1))
.body("data[0].attributes.offset", is(equalTo(0)))
.body("data[0].attributes.value", is(equalTo("\u0000rest of value")));
}

@Test
void testConsumeRecordFromTimestampAcrossPartitions() {
final String topicName = UUID.randomUUID().toString();
Expand Down