diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs new file mode 100644 index 0000000000..ee17c12c96 --- /dev/null +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -0,0 +1,130 @@ +namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures; + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using AcceptanceTesting; +using AcceptanceTesting.EndpointTemplates; +using Infrastructure; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.Routing; +using NServiceBus.Transport; +using NUnit.Framework; +using ServiceControl.MessageFailures.Api; + +class When_ingesting_failed_message_with_missing_headers : AcceptanceTest +{ + [Test] + public async Task Should_be_ingested_when_minimal_required_headers_is_present() + { + var testStartTime = DateTime.UtcNow; + + var context = await Define(c => c.AddMinimalRequiredHeaders()) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.Null); + + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); + + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header + Assert.That(failure.ReceivingEndpoint, Is.Null); + } + + [Test] + public async Task Should_include_headers_required_by_ServicePulse() + { + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + + // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header + // Missing endpoint or host will cause a null ref in ServicePulse + c.Headers[Headers.ProcessingMachine] = "MyMachine"; + + c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; + c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + + // ServicePulse assumes that the receiving endpoint name is present + Assert.That(failure.ReceivingEndpoint, Is.Not.Null); + Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); + Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); + + // ServicePulse needs both an exception type and description to render the UI in a resonable way + Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); + Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); + } + + [Test] + public async Task TimeSent_should_not_be_casted() + { + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z").ToUniversalTime(); + + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + } + + async Task TryGetFailureFromApi(TestContext context) + { + var allFailures = await this.TryGetMany("/api/errors/"); + + context.Failure = allFailures.Items.SingleOrDefault(f => f.QueueAddress == context.EndpointNameOfReceivingEndpoint); + + return context.Failure != null; + } + + class TestContext : ScenarioContext + { + // Endpoint name is made unique since we are using it to find the failure once ingestion is complete + public string EndpointNameOfReceivingEndpoint => $"MyEndpoint-{NUnit.Framework.TestContext.CurrentContext.Test.ID}"; + + public Dictionary Headers { get; } = []; + + public FailedMessageView Failure { get; set; } + + public void AddMinimalRequiredHeaders() => Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + } + + class FailingEndpoint : EndpointConfigurationBuilder + { + public FailingEndpoint() => EndpointSetup(); + + class SendFailedMessage : DispatchRawMessages + { + protected override TransportOperations CreateMessage(TestContext context) + { + // we can't control the native message id so any guid will do here, we need to find the failed messsage using + // the endpoint name instead + var outgoingMessage = new OutgoingMessage(Guid.NewGuid().ToString(), context.Headers, Array.Empty()); + + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); + } + } + } +} \ No newline at end of file diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs deleted file mode 100644 index 6fad5fd287..0000000000 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ /dev/null @@ -1,126 +0,0 @@ -namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures -{ - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - using AcceptanceTesting; - using AcceptanceTesting.EndpointTemplates; - using Infrastructure; - using NServiceBus; - using NServiceBus.AcceptanceTesting; - using NServiceBus.Routing; - using NServiceBus.Transport; - using NUnit.Framework; - using ServiceControl.MessageFailures.Api; - using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; - - class When_processing_message_with_missing_metadata_failed : AcceptanceTest - { - [Test] - public async Task Null_TimeSent_should_not_be_cast_to_DateTimeMin() - { - FailedMessageView failure = null; - - await Define() - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGetSingle("/api/errors/", m => m.Id == c.UniqueMessageId); - failure = result; - return result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.Null); - } - - [Test] - public async Task TimeSent_should_not_be_casted() - { - FailedMessageView failure = null; - - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(ctx => { ctx.TimeSent = sentTime; }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return c.UniqueMessageId != null & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); - } - - [Test] - public async Task Should_be_able_to_get_the_message_by_id() - { - FailedMessageView failure = null; - - await Define() - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return c.UniqueMessageId != null & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - } - - public class Failing : EndpointConfigurationBuilder - { - public Failing() => EndpointSetup(c => { c.Recoverability().Delayed(x => x.NumberOfRetries(0)); }); - - class SendFailedMessage : DispatchRawMessages - { - protected override TransportOperations CreateMessage(MyContext context) - { - context.EndpointNameOfReceivingEndpoint = Conventions.EndpointNamingConvention(typeof(Failing)); - context.MessageId = Guid.NewGuid().ToString(); - context.UniqueMessageId = DeterministicGuid.MakeId(context.MessageId, context.EndpointNameOfReceivingEndpoint).ToString(); - - var headers = new Dictionary - { - [Headers.MessageId] = context.MessageId, - [Headers.ProcessingEndpoint] = context.EndpointNameOfReceivingEndpoint, - ["NServiceBus.ExceptionInfo.ExceptionType"] = "2014-11-11 02:26:57:767462 Z", - ["NServiceBus.ExceptionInfo.Message"] = "An error occurred while attempting to extract logical messages from transport message NServiceBus.TransportMessage", - ["NServiceBus.ExceptionInfo.InnerExceptionType"] = "System.Exception", - ["NServiceBus.ExceptionInfo.Source"] = "NServiceBus.Core", - ["NServiceBus.ExceptionInfo.StackTrace"] = string.Empty, - ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - ["NServiceBus.TimeOfFailure"] = "2014-11-11 02:26:58:000462 Z" - }; - if (context.TimeSent.HasValue) - { - headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value); - } - - var outgoingMessage = new OutgoingMessage(context.MessageId, headers, new byte[0]); - - return new TransportOperations( - new TransportOperation(outgoingMessage, new UnicastAddressTag("error")) - ); - } - } - } - - public class MyContext : ScenarioContext - { - public string MessageId { get; set; } - - public string EndpointNameOfReceivingEndpoint { get; set; } - - public string UniqueMessageId { get; set; } - - public DateTime? TimeSent { get; set; } - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs index cf6ee57a13..71c0008b5c 100644 --- a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs @@ -32,7 +32,7 @@ public Task RecordFailedProcessingAttempt( FailedMessage.ProcessingAttempt processingAttempt, List groups) { - var uniqueMessageId = context.Headers.UniqueId(); + var uniqueMessageId = GetUniqueMessageId(context); var contentType = GetContentType(context.Headers, "text/xml"); var bodySize = context.Body.Length; @@ -61,7 +61,7 @@ public Task RecordFailedProcessingAttempt( var storeMessageCmd = CreateFailedMessagesPatchCommand(uniqueMessageId, processingAttempt, groups); parentUnitOfWork.AddCommand(storeMessageCmd); - AddStoreBodyCommands(context, contentType); + AddStoreBodyCommands(uniqueMessageId, context, contentType); return Task.CompletedTask; } @@ -71,10 +71,7 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId) var failedMessageDocumentId = FailedMessageIdGenerator.MakeDocumentId(retriedMessageUniqueId); var failedMessageRetryDocumentId = FailedMessageRetry.MakeDocumentId(retriedMessageUniqueId); - var patchRequest = new PatchRequest - { - Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" - }; + var patchRequest = new PatchRequest { Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" }; expirationManager.EnableExpiration(patchRequest); @@ -84,6 +81,21 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId) return Task.CompletedTask; } + static string GetUniqueMessageId(MessageContext context) + { + if (context.Headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId)) + { + return existingUniqueMessageId; + } + + if (!context.Headers.TryGetValue(Headers.MessageId, out var messageId)) + { + messageId = context.NativeMessageId; + } + + return DeterministicGuid.MakeId(messageId, context.Headers.ProcessingEndpointName()).ToString(); + } + ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMessage.ProcessingAttempt processingAttempt, List groups) { @@ -119,9 +131,9 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess ", Values = new Dictionary { - {"status", (int)FailedMessageStatus.Unresolved}, - {"failureGroups", groups}, - {"attempt", processingAttempt} + { "status", (int)FailedMessageStatus.Unresolved }, + { "failureGroups", groups }, + { "attempt", processingAttempt } }, }, patchIfMissing: new PatchRequest @@ -137,18 +149,17 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess ", Values = new Dictionary { - {"status", (int)FailedMessageStatus.Unresolved}, - {"failureGroups", groups}, - {"attempt", processingAttempt}, - {"uniqueMessageId", uniqueMessageId} + { "status", (int)FailedMessageStatus.Unresolved }, + { "failureGroups", groups }, + { "attempt", processingAttempt }, + { "uniqueMessageId", uniqueMessageId } } }); } - void AddStoreBodyCommands(MessageContext context, string contentType) + void AddStoreBodyCommands(string uniqueMessageId, MessageContext context, string contentType) { - var uniqueId = context.Headers.UniqueId(); - var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueMessageId); var stream = new ReadOnlyStream(context.Body); var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); @@ -160,9 +171,9 @@ static string GetContentType(IReadOnlyDictionary headers, string => headers.GetValueOrDefault(Headers.ContentType, defaultContentType); static int MaxProcessingAttempts = 10; + // large object heap starts above 85000 bytes and not above 85 KB! internal const int LargeObjectHeapThreshold = 85_000; static readonly Encoding utf8 = new UTF8Encoding(true, true); - } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs b/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs index 9ed1567ab5..d8ba148245 100644 --- a/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs +++ b/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs @@ -36,27 +36,6 @@ public static string ProcessingEndpointName(this IReadOnlyDictionary headers) - { - return headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId) - ? existingUniqueMessageId - : DeterministicGuid.MakeId(headers.MessageId(), headers.ProcessingEndpointName()).ToString(); - } - - public static string ProcessingId(this IReadOnlyDictionary headers) - { - var messageId = headers.MessageId(); - var processingEndpointName = headers.ProcessingEndpointName(); - var processingStarted = headers.ProcessingStarted(); - - if (messageId == default || processingEndpointName == default || processingStarted == default) - { - return Guid.NewGuid().ToString(); - } - - return DeterministicGuid.MakeId(messageId, processingEndpointName, processingStarted).ToString(); - } - // NOTE: Duplicated from TransportMessage public static string MessageId(this IReadOnlyDictionary headers) { @@ -102,16 +81,12 @@ public static bool IsBinary(this IReadOnlyDictionary headers) return true; } + static string ReplyToAddress(this IReadOnlyDictionary headers) { return headers.TryGetValue(Headers.ReplyToAddress, out var destination) ? destination : null; } - static string ProcessingStarted(this IReadOnlyDictionary headers) - { - return headers.TryGetValue(Headers.ProcessingStarted, out var processingStarted) ? processingStarted : null; - } - static string ExtractQueue(string address) { var atIndex = address?.IndexOf("@", StringComparison.InvariantCulture); diff --git a/src/ServiceControl.Persistence/ProcessedMessage.cs b/src/ServiceControl.Persistence/ProcessedMessage.cs index 050e5ca6c7..525f964416 100644 --- a/src/ServiceControl.Persistence/ProcessedMessage.cs +++ b/src/ServiceControl.Persistence/ProcessedMessage.cs @@ -2,9 +2,6 @@ { using System; using System.Collections.Generic; - using NServiceBus; - using ServiceControl.Persistence; - using ServiceControl.Persistence.Infrastructure; public class ProcessedMessage { @@ -14,16 +11,6 @@ public ProcessedMessage() Headers = []; } - public ProcessedMessage(Dictionary headers, Dictionary metadata) - { - UniqueMessageId = headers.UniqueId(); - MessageMetadata = metadata; - Headers = headers; - - ProcessedAt = Headers.TryGetValue(NServiceBus.Headers.ProcessingEnded, out var processedAt) ? - DateTimeOffsetHelper.ToDateTimeOffset(processedAt).UtcDateTime : DateTime.UtcNow; // best guess - } - public string Id { get; set; } public string UniqueMessageId { get; set; } diff --git a/src/ServiceControl/Operations/ErrorProcessor.cs b/src/ServiceControl/Operations/ErrorProcessor.cs index 6ac04c8edd..9e8e8b11ea 100644 --- a/src/ServiceControl/Operations/ErrorProcessor.cs +++ b/src/ServiceControl/Operations/ErrorProcessor.cs @@ -116,6 +116,7 @@ async Task ProcessMessage(MessageContext context, IIngestionUnitOfWork unitOfWor var failureDetails = failedMessageFactory.ParseFailureDetails(context.Headers); var processingAttempt = failedMessageFactory.CreateProcessingAttempt( + messageId, context.Headers, new Dictionary(metadata), failureDetails); diff --git a/src/ServiceControl/Operations/FailedMessageFactory.cs b/src/ServiceControl/Operations/FailedMessageFactory.cs index 5c7abb0644..76d04a4e9a 100644 --- a/src/ServiceControl/Operations/FailedMessageFactory.cs +++ b/src/ServiceControl/Operations/FailedMessageFactory.cs @@ -59,14 +59,14 @@ static ExceptionDetails GetException(IReadOnlyDictionary headers return exceptionDetails; } - public FailedMessage.ProcessingAttempt CreateProcessingAttempt(Dictionary headers, Dictionary metadata, FailureDetails failureDetails) + public FailedMessage.ProcessingAttempt CreateProcessingAttempt(string messageId, Dictionary headers, Dictionary metadata, FailureDetails failureDetails) { return new FailedMessage.ProcessingAttempt { AttemptedAt = failureDetails.TimeOfFailure, FailureDetails = failureDetails, MessageMetadata = metadata, - MessageId = headers[Headers.MessageId], + MessageId = messageId, Headers = headers }; }