Skip to content

Commit

Permalink
Merge v2 to v3
Browse files Browse the repository at this point in the history
Merge v2 to v3
  • Loading branch information
zarusz authored Nov 9, 2024
2 parents 834044d + 00012ee commit 1e78a38
Show file tree
Hide file tree
Showing 30 changed files with 466 additions and 294 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ on:
pull_request_target:
branches: ["master", "devops/*"]
workflow_dispatch:
branches: ["master", "release/*", "feature/*"]

permissions:
contents: read
Expand Down
80 changes: 52 additions & 28 deletions docs/provider_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration properties](#configuration-properties)
- [Minimizing message latency](#minimizing-message-latency)
- [SSL and password authentication](#ssl-and-password-authentication)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Consumer context](#consumer-context)
- [Producers](#producers)
- [High throughput publish](#high-throughput-publish)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Message Headers](#message-headers)
- [Consumers](#consumers)
- [Consumer context](#consumer-context)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
Expand Down Expand Up @@ -119,12 +121,34 @@ private static void AddSsl(string username, string password, ClientConfig c)

The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.

## Selecting message partition for topic producer
## Producers

### High throughput publish

By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.

However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.

> When `.EnableProduceAwait(false)` any errors in delivery will be reported only in the logs resulting in an fire&forget semantics and possible message loss.
```cs
mbb.Produce<PingMessage>(x =>
{
x.DefaultTopic(topic);
// Partition #0 - for even counters, and #1 - for odd counters
x.PartitionProvider((m, t) => m.Counter % 2);
x.EnableProduceAwait(enableProduceAwait);
});
```

### Selecting message partition for topic producer

Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
There are two possible options:

### Default partitioner with message key
#### Default partitioner with message key

Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
The default partitioner is supported, which works in this way:
Expand All @@ -148,7 +172,7 @@ mbb

The key must be a `byte[]`.

### Assigning partition explicitly
#### Assigning partition explicitly

SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:

Expand All @@ -167,33 +191,13 @@ mbb

With this approach your provider needs to know the number of partitions for a topic.

## Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

## Message Headers

SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).

The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):

```cs
// MessageBusBuilder mbb;
Expand All @@ -209,6 +213,26 @@ mbb
## Consumers

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
Expand Down
72 changes: 44 additions & 28 deletions docs/provider_kafka.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration properties](#configuration-properties)
- [Minimizing message latency](#minimizing-message-latency)
- [SSL and password authentication](#ssl-and-password-authentication)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Consumer context](#consumer-context)
- [Producers](#producers)
- [High throughput publish](#high-throughput-publish)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Message Headers](#message-headers)
- [Consumers](#consumers)
- [Consumer context](#consumer-context)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
Expand Down Expand Up @@ -119,12 +121,26 @@ private static void AddSsl(string username, string password, ClientConfig c)

The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.

## Selecting message partition for topic producer
## Producers

### High throughput publish

By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.

However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.

> When `.EnableProduceAwait(false)` any errors in delivery will be reported only in the logs resulting in an fire&forget semantics and possible message loss.
@[:cs](../src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs,ExampleEnableProduceAwait)

### Selecting message partition for topic producer

Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
There are two possible options:

### Default partitioner with message key
#### Default partitioner with message key

Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
The default partitioner is supported, which works in this way:
Expand All @@ -148,7 +164,7 @@ mbb

The key must be a `byte[]`.

### Assigning partition explicitly
#### Assigning partition explicitly

SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:

Expand All @@ -167,33 +183,13 @@ mbb

With this approach your provider needs to know the number of partitions for a topic.

## Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

## Message Headers

SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).

The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):

```cs
// MessageBusBuilder mbb;
Expand All @@ -209,6 +205,26 @@ mbb
## Consumers

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
namespace SlimMessageBus.Host;

using System.Reflection;

public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder
public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder, IConsumerBuilder
{
public MessageBusSettings Settings { get; }

public ConsumerSettings ConsumerSettings { get; }

AbstractConsumerSettings IAbstractConsumerBuilder.ConsumerSettings => ConsumerSettings;

HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;

protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, string path = null)
{
Settings = settings ?? throw new ArgumentNullException(nameof(settings));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace SlimMessageBus.Host;

public class ConsumerBuilder<TMessage> : AbstractConsumerBuilder
public class ConsumerBuilder<TMessage> : AbstractConsumerBuilder, IConsumerBuilder
{
HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;

public ConsumerBuilder(MessageBusSettings settings, Type messageType = null)
: base(settings, messageType ?? typeof(TMessage))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host;

public interface IBuilderWithSettings
{
HasProviderExtensions Settings { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host;

public interface IConsumerBuilder : IBuilderWithSettings
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host;

public interface IProducerBuilder : IBuilderWithSettings
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace SlimMessageBus.Host;

using Microsoft.Extensions.DependencyInjection.Extensions;

public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder
public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder, IProducerBuilder
{
/// <summary>
/// Parent bus builder.
Expand All @@ -17,15 +17,18 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui
/// <summary>
/// The current settings that are being built.
/// </summary>
public MessageBusSettings Settings { get; private set; } = new();
public MessageBusSettings Settings { get; private set; } = new();

HasProviderExtensions IBuilderWithSettings.Settings => Settings;

/// <summary>
/// The bus factory method.
/// </summary>
public Func<MessageBusSettings, IMessageBusProvider> BusFactory { get; private set; }

public IList<Action<IServiceCollection>> PostConfigurationActions { get; } = [];



protected MessageBusBuilder()
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace SlimMessageBus.Host;

public class ProducerBuilder<T>
public class ProducerBuilder<T> : IProducerBuilder
{
public ProducerSettings Settings { get; }

public Type MessageType => Settings.MessageType;


HasProviderExtensions IBuilderWithSettings.Settings => Settings;

public ProducerBuilder(ProducerSettings settings)
: this(settings, typeof(T))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace SlimMessageBus.Host;

public class RequestResponseBuilder : IAbstractConsumerBuilder
public class RequestResponseBuilder : IAbstractConsumerBuilder, IConsumerBuilder
{
public RequestResponseSettings Settings { get; }

public AbstractConsumerSettings ConsumerSettings => Settings;

HasProviderExtensions IBuilderWithSettings.Settings => Settings;

public RequestResponseBuilder(RequestResponseSettings settings)
{
Settings = settings;
Expand Down
Loading

0 comments on commit 1e78a38

Please sign in to comment.