Hi @bmmptlgc,

    // Needs: using System.Linq; using System.Text; using Confluent.Kafka; using Streamiz.Kafka.Net;
    // Returns true when the headers contain headerKey with the UTF-8 value headerValue.
    var predicateHeader = (Headers headers, string headerKey, string headerValue) =>
    {
        // headers?. guards against a record that carries no headers at all
        var messageTypeHeader = headers?.FirstOrDefault(h => h.Key == headerKey)?.GetValueBytes();
        if (messageTypeHeader == null)
            return false;
        var messageType = Encoding.UTF8.GetString(messageTypeHeader);
        return messageType.Equals(headerValue);
    };

    var sourceStream = builder.Stream<string, GenericRecord>("input");

    // Predicates are evaluated in order; each record goes to the first branch that matches.
    var branches = sourceStream
        .Branch(
            (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "orders"),
            (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "sales"),
            (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "test"),
            (k, v) => true); // catch-all for everything else

    // only messages with the header message-type = orders
    branches[0].To(...);
    // only messages with the header message-type = sales
    branches[1].To(...);
    // only messages with the header message-type = test
    branches[2].To(...);
    // dlq
    branches[3].To("dlq-topic");

Btw, I plan to add the context to the Filter and Branch DSL processors in the short term.
Hi @LGouellec,
I am new to Streamiz.Kafka.Net and I am trying to write a streaming application that will consume messages from a Kafka topic carrying multiple Avro schemas, filter them on a message-type header, and then publish each remaining message to a topic dedicated to its message type.
For example, let's imagine source-topic has message types A, B and C (the message-type header contains the value). If we want to filter out messages of type B, the end result would be all messages with message-type A being published to destination-topic-A and all messages with message-type C being published to destination-topic-C.
Something like that wouldn't work, because Filter's Func parameter doesn't have a context parameter. Is there another way to achieve my goal?
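For the A/B/C case specifically, here is a rough sketch that follows the Branch approach from the reply above. It assumes that Branch drops records matching none of its predicates (as Kafka Streams does), so leaving out a catch-all branch is what filters out type B. The class and method names (MessageTypeRouting, HasHeader, AddRouting) are made up for illustration, and the serdes are assumed to come from the stream configuration.

```csharp
using System.Linq;
using System.Text;
using Avro.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net;

// Hypothetical helper class; only the Streamiz/Confluent calls come from the thread above.
public static class MessageTypeRouting
{
    // True when the record currently being processed carries header `key`
    // whose UTF-8 decoded value equals `expected`.
    private static bool HasHeader(string key, string expected)
    {
        Headers headers = StreamizMetadata.GetCurrentHeadersMetadata();
        byte[] raw = headers?.FirstOrDefault(h => h.Key == key)?.GetValueBytes();
        return raw != null && Encoding.UTF8.GetString(raw) == expected;
    }

    // Registers the routing on an existing builder.
    public static void AddRouting(StreamBuilder builder)
    {
        var branches = builder
            .Stream<string, GenericRecord>("source-topic")
            .Branch(
                (k, v) => HasHeader("message-type", "A"),
                (k, v) => HasHeader("message-type", "C"));
        // No catch-all predicate: type B (and anything else) matches no branch.
        // Assumption: unmatched records are dropped rather than forwarded.

        branches[0].To("destination-topic-A");
        branches[1].To("destination-topic-C");
    }
}
```

If unmatched messages should be kept for inspection instead of dropped, adding a final `(k, v) => true` predicate and sending that branch to a dead-letter topic, as in the reply above, would cover it.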