-
Hi, Expected Value Actual Value var config = new StreamConfig<Int64SerDes,JsonSerDes<TeamInfo>>();
config.ApplicationId = "test-app-teaminfo";
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
config.StateDir = @"C:/Temp/kafka-stream/";
var keys = new List<long>();
var builder = new StreamBuilder();
builder.Stream<long, TeamInfo>("TeamInfo")
.Foreach((k, v) =>
{
keys.Add(k);
});
var topology = builder.Build();
var stream = new KafkaStream(topology, config);
stream.Start(); |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 5 replies
-
Hi @jdang67, What is the nature of the application which produce inside your source topic ? A Java producer or .net producer ? Can you share the snippet of code of your producer, mainly the serializer please ? It seems an endianness issue. |
Beta Was this translation helpful? Give feedback.
-
I have not tried to produce the data again with your new Serde. To make your code work, see the code , I need to make a minor change public class BigEndianInt64SerDes : AbstractSerDes<long>
{
/// <summary>
/// Deserialize a record value from a big endian byte array into <see cref="Int64"/> value
/// </summary>
/// <param name="data">serialized bytes in big endian.</param>
/// <param name="context">serialization context</param>
/// <returns>deserialized <see cref="Int64"/> using data; may be null</returns>
public override long Deserialize(byte[] data, SerializationContext context)
{
return System.Buffers.Binary.BinaryPrimitives.ReadInt64BigEndian(data);
}
/// <summary>
/// Convert long <code>data</code> into a big endian byte array
/// </summary>
/// <param name="data"><see cref="Int64"/> data</param>
/// <param name="context">serialization context</param>
/// <returns>serialized bytes in big endian</returns>
public override byte[] Serialize(long data, SerializationContext context)
{
Span<byte> span = new Span<byte>();
// System.Buffers.Binary.BinaryPrimitives.WriteInt64BigEndian(span, data);
return BitConverter.GetBytes(data);
//return span.ToArray();
}
} |
Beta Was this translation helpful? Give feedback.
@LGouellec,
I have not tried to produce the data again with your new Serde. To make your code work, see the code , I need to make a minor change
otherwise, it throws an exception.