Skip to content

Do not require message id header for error ingestion #4946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Routing;
using NServiceBus.Transport;
using NUnit.Framework;
using ServiceControl.MessageFailures.Api;

class When_ingesting_failed_message_with_missing_headers : AcceptanceTest
{
[Test]
public async Task Should_be_ingested_when_minimal_required_headers_is_present()
{
var testStartTime = DateTime.UtcNow;

var context = await Define<TestContext>(c => c.AddMinimalRequiredHeaders())
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);
Assert.That(failure.TimeSent, Is.Null);

//No failure time will result in utc now being used
Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime));

// Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header
Assert.That(failure.ReceivingEndpoint, Is.Null);
}

[Test]
public async Task Should_include_headers_required_by_ServicePulse()
{
var context = await Define<TestContext>(c =>
{
c.AddMinimalRequiredHeaders();

// This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header
// Missing endpoint or host will cause a null ref in ServicePulse
c.Headers[Headers.ProcessingMachine] = "MyMachine";

c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType";
c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message";
})
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);

// ServicePulse assumes that the receiving endpoint name is present
Assert.That(failure.ReceivingEndpoint, Is.Not.Null);
Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint));
Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine"));

// ServicePulse needs both an exception type and description to render the UI in a resonable way
Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType"));
Assert.That(failure.Exception.Message, Is.EqualTo("Some message"));
}

[Test]
public async Task TimeSent_should_not_be_casted()
{
var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z").ToUniversalTime();

var context = await Define<TestContext>(c =>
{
c.AddMinimalRequiredHeaders();
c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime));
})
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);
Assert.That(failure.TimeSent, Is.EqualTo(sentTime));
}

async Task<bool> TryGetFailureFromApi(TestContext context)
{
var allFailures = await this.TryGetMany<FailedMessageView>("/api/errors/");

context.Failure = allFailures.Items.SingleOrDefault(f => f.QueueAddress == context.EndpointNameOfReceivingEndpoint);

return context.Failure != null;
}

class TestContext : ScenarioContext
{
// Endpoint name is made unique since we are using it to find the failure once ingestion is complete
public string EndpointNameOfReceivingEndpoint => $"MyEndpoint-{NUnit.Framework.TestContext.CurrentContext.Test.ID}";

public Dictionary<string, string> Headers { get; } = [];

public FailedMessageView Failure { get; set; }

public void AddMinimalRequiredHeaders() => Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint;
}

class FailingEndpoint : EndpointConfigurationBuilder
{
public FailingEndpoint() => EndpointSetup<DefaultServerWithoutAudit>();

class SendFailedMessage : DispatchRawMessages<TestContext>
{
protected override TransportOperations CreateMessage(TestContext context)
{
// we can't control the native message id so any guid will do here, we need to find the failed messsage using
// the endpoint name instead
var outgoingMessage = new OutgoingMessage(Guid.NewGuid().ToString(), context.Headers, Array.Empty<byte>());

return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error")));
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Task RecordFailedProcessingAttempt(
FailedMessage.ProcessingAttempt processingAttempt,
List<FailedMessage.FailureGroup> groups)
{
var uniqueMessageId = context.Headers.UniqueId();
var uniqueMessageId = GetUniqueMessageId(context);
var contentType = GetContentType(context.Headers, "text/xml");
var bodySize = context.Body.Length;

Expand Down Expand Up @@ -61,7 +61,7 @@ public Task RecordFailedProcessingAttempt(
var storeMessageCmd = CreateFailedMessagesPatchCommand(uniqueMessageId, processingAttempt, groups);
parentUnitOfWork.AddCommand(storeMessageCmd);

AddStoreBodyCommands(context, contentType);
AddStoreBodyCommands(uniqueMessageId, context, contentType);

return Task.CompletedTask;
}
Expand All @@ -71,10 +71,7 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId)
var failedMessageDocumentId = FailedMessageIdGenerator.MakeDocumentId(retriedMessageUniqueId);
var failedMessageRetryDocumentId = FailedMessageRetry.MakeDocumentId(retriedMessageUniqueId);

var patchRequest = new PatchRequest
{
Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};"
};
var patchRequest = new PatchRequest { Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" };

expirationManager.EnableExpiration(patchRequest);

Expand All @@ -84,6 +81,21 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId)
return Task.CompletedTask;
}

static string GetUniqueMessageId(MessageContext context)
{
if (context.Headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId))
{
return existingUniqueMessageId;
}

if (!context.Headers.TryGetValue(Headers.MessageId, out var messageId))
{
messageId = context.NativeMessageId;
}

return DeterministicGuid.MakeId(messageId, context.Headers.ProcessingEndpointName()).ToString();
}

ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMessage.ProcessingAttempt processingAttempt,
List<FailedMessage.FailureGroup> groups)
{
Expand Down Expand Up @@ -119,9 +131,9 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess
",
Values = new Dictionary<string, object>
{
{"status", (int)FailedMessageStatus.Unresolved},
{"failureGroups", groups},
{"attempt", processingAttempt}
{ "status", (int)FailedMessageStatus.Unresolved },
{ "failureGroups", groups },
{ "attempt", processingAttempt }
},
},
patchIfMissing: new PatchRequest
Expand All @@ -137,18 +149,17 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess
",
Values = new Dictionary<string, object>
{
{"status", (int)FailedMessageStatus.Unresolved},
{"failureGroups", groups},
{"attempt", processingAttempt},
{"uniqueMessageId", uniqueMessageId}
{ "status", (int)FailedMessageStatus.Unresolved },
{ "failureGroups", groups },
{ "attempt", processingAttempt },
{ "uniqueMessageId", uniqueMessageId }
}
});
}

void AddStoreBodyCommands(MessageContext context, string contentType)
void AddStoreBodyCommands(string uniqueMessageId, MessageContext context, string contentType)
{
var uniqueId = context.Headers.UniqueId();
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId);
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueMessageId);

var stream = new ReadOnlyStream(context.Body);
var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null);
Expand All @@ -160,9 +171,9 @@ static string GetContentType(IReadOnlyDictionary<string, string> headers, string
=> headers.GetValueOrDefault(Headers.ContentType, defaultContentType);

static int MaxProcessingAttempts = 10;

// large object heap starts above 85000 bytes and not above 85 KB!
internal const int LargeObjectHeapThreshold = 85_000;
static readonly Encoding utf8 = new UTF8Encoding(true, true);

}
}
Loading