
Kafka Producer - Exception occured inside SelectAsync - Cancellation cause must not be null #426

Open
mhbuck opened this issue Jan 28, 2025 · 5 comments · Fixed by #427

@mhbuck commented Jan 28, 2025

Version Information
Akka.Streams Version=1.5.37
Akka.Streams.Kafka Version=1.5.35

Describe the bug
When running the versions above, the system intermittently gets the error below when producing messages.

An exception occured inside SelectAsync while processing message ["System.Threading.Tasks.ContinuationResultTaskFromResultTask`2[Akka.Streams.Kafka.Messages.MultiResultPart`2[System.Int64,System.Byte[]][],Akka.Streams.Kafka.Messages.IResults`3[System.Int64,System.Byte[],Models.External.Market]]"]. Supervision strategy: Stop

[Thread 0042][[akka://passthrough/user/StreamSupervisor-0/Flow-0-0-unknown-operation#826107269]] Error in stage [Akka.Streams.Kafka.Stages.DefaultProducerStage`5[System.Int64,System.Byte[],Models.External.Market,Akka.Streams.Kafka.Messages.IEnvelope`3[System.Int64,System.Byte[],Models.External.Market],Akka.Streams.Kafka.Messages.IResults`3[System.Int64,System.Byte[],Models.External.Market]]]: Cancellation cause must not be null (Parameter 'cause')
Cause: System.ArgumentException: Cancellation cause must not be null (Parameter 'cause')
   at Akka.Streams.Stage.GraphStageLogic.InternalOnDownstreamFinish(Exception cause)
   at Akka.Streams.Stage.OutHandler.OnDownstreamFinish(Exception cause)
   at Akka.Streams.Stage.GraphStageLogic.LambdaOutHandler.OnDownstreamFinish(Exception cause)
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Execute(Int32 eventLimit)

To Reproduce
The code below is an attempt to simplify our production code and replicate what is happening. Unfortunately, I have not yet been able to replicate the issue while pointing at a local Docker instance of Kafka.

This specifically looks like an issue with the Flow and not the Sink. I have not been able to prove this completely, but there are numerous systems running sinks for publishing that are not presenting the same issue.
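For reference, the Sink-based shape those other systems use looks roughly like this (a sketch, not their actual code; the record values and variable names are illustrative, and it assumes the same producerSettings and actorSystem as the repro below):

```csharp
// Sketch of the Sink-based publishing shape, which has not shown this error.
// KafkaProducer.PlainSink infers its key/value types from producerSettings.
var payload = System.Text.Encoding.UTF8.GetBytes("test");

var sinkCompletion = Source
    .Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250), "publish")
    .Select(_ => new ProducerRecord<long, byte[]>("test-topic", 1L, payload))
    .RunWith(KafkaProducer.PlainSink(producerSettings), actorSystem);
```

The key structural difference from the failing setup is that the producer stage here is the terminal Sink, so there is no downstream stage to cancel it.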

The simplified reproduction code:

using System.Text.Json;
using Akka.Actor;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Logger.Serilog;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Extensions;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;

var hostBuilder = new HostBuilder();

//Add serilog logging
var logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();

// Add Serilog to the host
hostBuilder.ConfigureLogging(logging =>
{
    logging.AddSerilog(logger);
});

hostBuilder.ConfigureServices((context, services) =>
{
    var kafkaConfig =
        ConfigurationFactory.FromResource<ConsumerSettings<object, object>>(
            "Akka.Streams.Kafka.reference.conf");
    services.AddAkka("MyActorSystem", (builder, sp) =>
    {
        builder.ConfigureLoggers(setup =>
        {
            setup.LogLevel = Akka.Event.LogLevel.InfoLevel;
            setup.AddLogger<SerilogLogger>();
        });

        builder.AddHocon(kafkaConfig, HoconAddMode.Prepend);
    });
});

var host = hostBuilder.Build();

var completionTask = host.RunAsync();

// grab the ActorSystem from the DI container
var actorSystem = host.Services.GetRequiredService<ActorSystem>();


var producerConfig = new ProducerConfig()
{
    BootstrapServers = "127.0.0.1:9092",
    ClientId = $"{Environment.MachineName}-{Environment.ProcessId}-producer",
    CompressionType = CompressionType.Lz4
};

var producerSettings = ProducerSettings<long, byte[]>
    .Create(actorSystem, Serializers.Int64, Serializers.ByteArray)
    .WithProducerConfig(producerConfig);

var kafkaProducerFlow = KafkaProducer.FlexiFlow<long, byte[], ThingDto>(producerSettings); 


// Sink.Ignore materializes a Task that completes when the stream stops
var streamCompletion =
    Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250), "publish")
    .Select(_ => CreateMarketProducerRecord()) 
    .Via(kafkaProducerFlow)
    .Select(ProcessResults)
    .RunWith(Sink.Ignore<IResults<long, byte[], ThingDto>>(), actorSystem);

await completionTask; // wait for the host to shut down

IEnvelope<long, byte[], ThingDto> CreateMarketProducerRecord()
{
    // random key between 1 and 999
    var random = new Random().Next(1, 1000);
    var messageForKafka = new ThingDto("name", random, DateTime.UtcNow, new[] { new Child("child1"), new Child("child2") });

    var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(messageForKafka);
    var normalMarketRecord = new ProducerRecord<long, byte[]>("test-topic", messageForKafka.Number, jsonPayload);

    return ProducerMessage.Multi(new[] { normalMarketRecord, normalMarketRecord }.ToImmutableSet(), messageForKafka);
}

IResults<long, byte[], ThingDto> ProcessResults(
    IResults<long, byte[], ThingDto> result)
{
    
    if (result is Result<long, byte[], ThingDto> response)
    {
        logger.Information($"{response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}" );
    }
    else if (result is MultiResult<long, byte[], ThingDto> multiResponse)
    {
        foreach (var part in multiResponse.Parts)
        {
            logger.Information($"{part.Metadata.Topic}/{part.Metadata.Partition} {part.Metadata.Offset}" );
        }
    }

    return result;
}

// Creating an object that is a bit bigger than just a string value.
public class ThingDto
{
    public ThingDto(string name, long number, DateTime dateTime, Child[] children)
    {
        Name = name;
        Number = number;
        DateTime = dateTime;
        Children = children;
    }

    public string Name { get; set; }
    public long Number { get; set; }
    public DateTime DateTime { get; set; }
    public Child[] Children { get; set; }
}

public class Child
{
    public Child(string name)
    {
        Name = name;
    }

    public string Name { get; set; }
}

The csproj file

<Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>net9.0</TargetFramework>
        <ImplicitUsings>enable</ImplicitUsings>
        <Nullable>enable</Nullable>
    </PropertyGroup>
    <ItemGroup>
        <PackageReference Include="Akka.Hosting" Version="1.5.37" />
        <PackageReference Include="Akka.Logger.Serilog" Version="1.5.25" />
        <PackageReference Include="Akka.Streams" Version="1.5.37" />
        <PackageReference Include="Akka.Streams.Kafka" Version="1.5.35" />
        <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1"/>
        <PackageReference Include="Serilog" Version="4.2.0" />
        <PackageReference Include="Serilog.Extensions.Hosting" Version="9.0.0" />
        <PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
    </ItemGroup>
</Project>

Expected behavior
I expect either more information about the failure or the failure to be handled correctly. The cancellation cause is null, so there is no information about what caused the stream to fail.

Environment
This is running in docker containers in Azure AKS connecting to Confluent Cloud.

Additional context
This code has been running for a couple of hours and has published roughly 60k messages with no failures against a local Docker Kafka instance. The next test will run against the Confluent Cloud instance; the local results possibly point to this being an intermittent connection issue.

@mhbuck (Author) commented Jan 29, 2025

I have been running the above code pushing to Confluent Cloud from my local machine. So far there have been no failures. I am going to start pushing the data sizes higher and publishing more frequently to test that out.

I have enabled debug logs on the system that is giving issues; currently there is nothing additional I can report from those logs.

The error occurs:

[ERROR][01/29/2025 10:13:03.930Z][Thread 0045][[akka://passthrough/user/StreamSupervisor-0/Flow-0-0-unknown-operation#2007877700]] 
Error in stage [Akka.Streams.Kafka.Stages.DefaultProducerStage`5[System.Int64,System.Byte[],Models.External.Market,Akka.Streams.Kafka.Messages.IEnvelope`3[System.Int64,System.Byte[],Models.External.Market],Akka.Streams.Kafka.Messages.IResults`3[System.Int64,System.Byte[],Models.External.Market]]]: 
Cancellation cause must not be null (Parameter 'cause')

Then after that we get a "Stage completed" message and the producer is closed:

[DEBUG][01/29/2025 10:13:03.930Z][Thread 0045][FlowShape`2([kafka.producer.in] [kafka.producer.out])(akka.tcp://[email protected]:2552/user/StreamSupervisor-0)] Stage completed

[DEBUG][01/29/2025 10:13:04.074Z][Thread 0045][FlowShape`2([kafka.producer.in] [kafka.producer.out])(akka.tcp://[email protected]:2552/user/StreamSupervisor-0)] Producer closed: passthrough-9d448664f-zjzsj-1-producer#producer-4

After that, the logs show the stream going into the RestartFlow logic, which I added to keep the service processing. The majority of the debug logs concern the consumer as opposed to the producer.
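The restart wrapper mentioned above is essentially the following (a hedged sketch; the backoff values are illustrative, not our exact settings):

```csharp
// Wrap the Kafka producer flow so the stream restarts with backoff
// instead of terminating when the Stop supervision decision kills the stage.
var restartingProducerFlow = RestartFlow.WithBackoff(
    () => KafkaProducer.FlexiFlow<long, byte[], ThingDto>(producerSettings),
    RestartSettings.Create(
        minBackoff: TimeSpan.FromSeconds(3),
        maxBackoff: TimeSpan.FromSeconds(30),
        randomFactor: 0.2));
```

Substituting this for the bare `kafkaProducerFlow` in the repro keeps the service publishing across these intermittent failures.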

@Arkatufus (Contributor) commented

The exception was thrown from inside the DefaultProducerStage.DefaultProducerStageLogic OutHandler, but it is not caused by code inside Akka.Streams.Kafka. The DefaultProducerStageLogic OutHandler is a LambdaOutHandler with a null onDownstreamFinish delegate.

This means the problem is inside the Akka.Streams SelectAsync code, which is still not handling Result&lt;T&gt;.FromTask() properly, which is odd.

@mhbuck (Author) commented Feb 17, 2025

To report back on this: we still get intermittent errors about the task continuation.

[ERROR][02/17/2025 11:15:52.760Z][Thread 0020][FlowShape`2([SelectAsync.in] [SelectAsync.out])(akka.tcp://[email protected]:2552/user/StreamSupervisor-0)] An exception occured inside SelectAsync while processing message [System.Threading.Tasks.ContinuationResultTaskFromResultTask`2[Akka.Streams.Kafka.Messages.MultiResultPart`2[System.Int64,System.Byte[]][],Akka.Streams.Kafka.Messages.IResults`3[System.Int64,System.Byte[],Models.External.Market]]]. Supervision strategy: Stop

From all of my testing, as long as the flow is wrapped in a restart, the stop is handled and publishing resumes; I have not found any instances where data has been lost. So far in production (which I released this morning) the error occurs roughly once every 2 hours.

@Aaronontheweb reopened this Feb 17, 2025
@Aaronontheweb (Member) commented

We'll leave this issue open then, as it isn't fully resolved - thanks for letting us know @mhbuck

@mhbuck (Author) commented Feb 19, 2025

I have been discussing with another team, and they are having what I believe is an issue similar to the one above.

This team is running Akka 1.5.38 and Akka.Streams.Kafka 1.5.37

The SelectAsync in a flow used as part of a Kafka consumer stream is throwing an exception; following the various code paths and looking at the logs, I believe the exception is null here as well.

The code that builds the flow is essentially:

private Flow<CommittableMessage<Ignore, byte[]>, ICommittable, Task<ICommittableOffset>> GetFlow(Func<CommittableMessage<Ignore, byte[]>, Task<ICommittable>> processingFunction)
{
    return Flow
        .Create<CommittableMessage<Ignore, byte[]>, Task<ICommittableOffset>>()
        .SelectAsync(_kafkaFeedOptions.CurrentValue.Parallelism, x => processingFunction(x))
        .WithAttributes(ActorAttributes.CreateSupervisionStrategy(_flowDecider.Decide))
        .Named($"{typeof(T).Name}-flow");
}

The function passed in as the processingFunction is wrapped in a try/catch to handle any processing-logic exceptions. The logs give a LogSource of

FlowShape`2([SelectAsync.in] [SelectAsync.out])(akka.tcp://[email protected]:2552/user/ConsumerActor/StreamSupervisor-1)

and the exception message is

An exception occured inside SelectAsync while processing message ["Akka.Streams.Kafka.Messages.CommittableMessage`2[Confluent.Kafka.Ignore,System.Byte[]]"]. Supervision strategy: Stop

The custom flow decider code implemented in this project does not do any checks on the exception that would trigger a null exception, and the error log does not have an exception property.

Looking at how that message is logged in the streams implementation, the exception is definitely being passed to the log step:
https://github.com/akkadotnet/akka.net/blob/2784f34c87185ecee527d9b87efd2e56b82bf0d2/src/core/Akka.Streams/Implementation/Fusing/Ops.cs#L2597

Unfortunately, similar to the original issue, we have not been able to put together a reliable reproduction, as it only seems to occur when running in a Kubernetes cluster.
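For context, the try/catch wrapper around the processingFunction looks roughly like this (a hedged sketch, not the team's actual code; `HandleMessage`, `_logger`, and the commit-on-failure fallback are illustrative, and property names are from memory and may not match exactly):

```csharp
// Illustrative wrapper: processing errors are caught so the stream's
// supervision decider should never see an exception from business logic.
async Task<ICommittable> SafeProcess(CommittableMessage<Ignore, byte[]> message)
{
    try
    {
        // HandleMessage is a stand-in for the real processing logic.
        return await HandleMessage(message);
    }
    catch (Exception ex)
    {
        _logger.Error(ex, "Processing failed; committing offset and skipping message");
        // Assumption: on failure, commit the offset and move on.
        return message.CommitableOffset;
    }
}
```

Since every processing exception is swallowed here, whatever reaches SelectAsync's supervision path must originate elsewhere, which is consistent with the null-cause behaviour described above.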
