Questions about how to use Kafka Streams with Confluent.Kafka Headers #283
Hello, I'm trying to produce an event to an output topic with the Confluent.Kafka Headers object using Message, and this error occurs: Streamiz.Kafka.Net.Processors.Internal.TaskManager[0]
My code looks like this. I declared the config:
And the implementation:
Is there any way to do this?
Replies: 1 comment 4 replies
Hi @ncupertino,
You can add or remove headers with StreamizMetadata.GetCurrentHeadersMetadata() inside a lambda function passed to Map(...), MapValues(...), and so on:

    var builder = new StreamBuilder();
    builder.Stream<string, string>("topic-in", new StringSerDes(), new StringSerDes())
        .Peek((k, v) => Console.WriteLine($" K: {k} V: {v}"))
        .Map((key, value) => {
            // Headers of the record currently being processed
            StreamizMetadata
                .GetCurrentHeadersMetadata()
                .Add("headerKey", new byte[] { 0x01, 0x02, 0x03 });
            // Forward the value with a null key
            return KeyValuePair.Create<string, string>(null, value);
        })
        .To("topic-out");
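
The reply above only shows adding a header. As a rough sketch of the "remove" side, and assuming the object returned by StreamizMetadata.GetCurrentHeadersMetadata() is a Confluent.Kafka Headers collection (as the question implies), the helpers below replace and read a header value. The class name HeaderSketch and both method names are hypothetical, introduced only for illustration.

```csharp
using System.Text;
using Confluent.Kafka;

// Hypothetical helper class, not part of Streamiz or Confluent.Kafka.
public static class HeaderSketch
{
    // Replaces every existing value stored under `key` with one UTF-8 value.
    public static void SetHeader(Headers headers, string key, string value)
    {
        headers.Remove(key);                              // removes all headers with this key
        headers.Add(key, Encoding.UTF8.GetBytes(value));  // appends the new value
    }

    // Returns the last value stored under `key`, or null when the key is absent.
    public static string ReadHeader(Headers headers, string key)
    {
        return headers.TryGetLastBytes(key, out byte[] bytes)
            ? Encoding.UTF8.GetString(bytes)
            : null;
    }
}
```

Inside a Map(...) or MapValues(...) lambda, the headers argument would be the result of StreamizMetadata.GetCurrentHeadersMetadata(). Removing before adding matters because Kafka headers allow duplicate keys, so Add on its own appends a second entry instead of overwriting the first.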