forked from Avanade/Beef
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventHubsEventConverter.cs
178 lines (148 loc) · 9.12 KB
/
EventHubsEventConverter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/Beef
using System;
using System.Threading.Tasks;
using AzureEventHubs = Azure.Messaging.EventHubs;
namespace Beef.Events.EventHubs
{
    /// <summary>
    /// Represents an <b>Azure Event Hubs</b> (see <see cref="AzureEventHubs.EventData"/>) <see cref="EventData"/> converter.
    /// </summary>
    /// <remarks>The message body is always produced/consumed via the configured <see cref="IEventDataContentSerializer"/>. Where
    /// <see cref="UseMessagingPropertiesForMetadata"/> is <c>true</c> the <see cref="EventMetadata"/> is additionally written to, and read from,
    /// the <see cref="AzureEventHubs.EventData.Properties"/>.</remarks>
    public sealed class EventHubsEventConverter : IEventDataConverter<AzureEventHubs.EventData>
    {
        private readonly IEventDataContentSerializer _contentSerializer;

        /// <summary>
        /// Creates a new instance of <see cref="EventData{T}"/> using the specified <paramref name="valueType"/>.
        /// </summary>
        /// <param name="valueType">The <see cref="EventData{T}.Value"/> <see cref="Type"/>.</param>
        /// <param name="metadata">The corresponding <see cref="EventMetadata"/>; passed as the single constructor argument.</param>
        /// <returns>A new instance of <see cref="EventData{T}"/></returns>
        internal static EventData CreateValueEventData(Type valueType, EventMetadata? metadata)
            => (EventData)Activator.CreateInstance(typeof(EventData<>).MakeGenericType(new Type[] { valueType }), new object[] { metadata! });

        /// <summary>
        /// Initializes a new instance of the <see cref="EventHubsEventConverter"/> class.
        /// </summary>
        /// <param name="contentSerializer">The <see cref="IEventDataContentSerializer"/>. Defaults to <see cref="NewtonsoftJsonCloudEventSerializer"/>.</param>
        public EventHubsEventConverter(IEventDataContentSerializer? contentSerializer = null)
            => _contentSerializer = contentSerializer ?? new NewtonsoftJsonCloudEventSerializer();

        /// <summary>
        /// Indicates whether to write to the <see cref="AzureEventHubs.EventData.Properties"/> for the <see cref="EventMetadata"/>.
        /// </summary>
        public bool UseMessagingPropertiesForMetadata { get; set; }

        /// <summary>
        /// Converts a <see cref="AzureEventHubs.EventData"/> to an <see cref="EventData"/>.
        /// </summary>
        /// <param name="event">The <see cref="AzureEventHubs.EventData"/>.</param>
        /// <returns>The converted <see cref="EventData"/>.</returns>
        /// <exception cref="ArgumentNullException">Thrown where <paramref name="event"/> is <c>null</c>.</exception>
        public async Task<EventData> ConvertFromAsync(AzureEventHubs.EventData @event)
        {
            if (@event == null)
                throw new ArgumentNullException(nameof(@event));

            // Fall back to an empty event where the serializer yields nothing.
            var ed = (await _contentSerializer.DeserializeAsync(@event.Body.ToArray()).ConfigureAwait(false)) ?? new EventData(null);
            await MergeMetadataAndFixKeyAsync(ed, @event).ConfigureAwait(false);
            return ed;
        }

        /// <summary>
        /// Converts a <see cref="AzureEventHubs.EventData"/> to an <see cref="EventData{TEventData}"/>.
        /// </summary>
        /// <param name="valueType">The <see cref="EventData{T}.Value"/> <see cref="Type"/>.</param>
        /// <param name="event">The <see cref="AzureEventHubs.EventData"/>.</param>
        /// <returns>The converted <see cref="EventData{TEventData}"/>.</returns>
        /// <exception cref="ArgumentNullException">Thrown where <paramref name="valueType"/> or <paramref name="event"/> is <c>null</c>.</exception>
        public async Task<EventData> ConvertFromAsync(Type valueType, AzureEventHubs.EventData @event)
        {
            if (valueType == null)
                throw new ArgumentNullException(nameof(valueType));

            if (@event == null)
                throw new ArgumentNullException(nameof(@event));

            // Fall back to a typed event built from whatever metadata is available where the serializer yields nothing.
            var ed = (await _contentSerializer.DeserializeAsync(valueType, @event.Body.ToArray()).ConfigureAwait(false))
                ?? CreateValueEventData(valueType, await GetMetadataAsync(@event).ConfigureAwait(false));

            await MergeMetadataAndFixKeyAsync(ed, @event).ConfigureAwait(false);
            return ed;
        }

        /// <summary>
        /// Merge in the metadata and fix the key.
        /// </summary>
        /// <param name="ed">The <see cref="EventData"/> to update.</param>
        /// <param name="eh">The originating <see cref="AzureEventHubs.EventData"/>.</param>
        /// <remarks>Does nothing unless <see cref="UseMessagingPropertiesForMetadata"/> is <c>true</c>.</remarks>
        private async Task MergeMetadataAndFixKeyAsync(EventData ed, AzureEventHubs.EventData eh)
        {
            if (!UseMessagingPropertiesForMetadata)
                return;

            var md = await GetMetadataAsync(eh).ConfigureAwait(false);
            if (md == null)
                return;

            ed.MergeMetadata(md);

            // Always override the key as it doesn't lose the type like the serialized value does.
            if (eh.Properties.TryGetValue(EventMetadata.KeyPropertyName, out var key))
                ed.Key = key;
        }

        /// <summary>
        /// Converts an <see cref="EventData"/> to a <see cref="AzureEventHubs.EventData"/>.
        /// </summary>
        /// <param name="event">The <see cref="EventData"/>.</param>
        /// <returns>The <see cref="AzureEventHubs.EventData"/>.</returns>
        /// <exception cref="ArgumentNullException">Thrown where <paramref name="event"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentException">Thrown where the <paramref name="event"/> subject is <c>null</c> or empty.</exception>
        public async Task<AzureEventHubs.EventData> ConvertToAsync(EventData @event)
        {
            if (@event == null)
                throw new ArgumentNullException(nameof(@event));

            if (string.IsNullOrEmpty(@event.Subject))
                throw new ArgumentException("Subject property is required to be set.", nameof(@event));

            var bytes = await _contentSerializer.SerializeAsync(@event).ConfigureAwait(false);
            var ehed = new AzureEventHubs.EventData(new BinaryData(bytes));
            if (!UseMessagingPropertiesForMetadata)
                return ehed;

            // Only metadata that has a value is written as a property.
            void AddProperty(string name, object? value)
            {
                if (value != null)
                    ehed.Properties.Add(name, value);
            }

            AddProperty(EventMetadata.SubjectAttributeName, @event.Subject);
            AddProperty(EventMetadata.EventIdAttributeName, @event.EventId);
            AddProperty(EventMetadata.ActionAttributeName, @event.Action);
            AddProperty(EventMetadata.TenantIdAttributeName, @event.TenantId);
            AddProperty(EventMetadata.SourceAttributeName, @event.Source);
            AddProperty(EventMetadata.KeyPropertyName, @event.Key);
            AddProperty(EventMetadata.ETagAttributeName, @event.ETag);
            AddProperty(EventMetadata.UsernameAttributeName, @event.Username);
            AddProperty(EventMetadata.UserIdAttributeName, @event.UserId);
            AddProperty(EventMetadata.TimestampAttributeName, @event.Timestamp);
            AddProperty(EventMetadata.CorrelationIdAttributeName, @event.CorrelationId);
            AddProperty(EventMetadata.PartitionKeyAttributeName, @event.PartitionKey);

            return ehed;
        }

        /// <summary>
        /// Gets the <see cref="EventMetadata"/> from the <see cref="AzureEventHubs.EventData"/>.
        /// </summary>
        /// <param name="message">The <see cref="AzureEventHubs.EventData"/>.</param>
        /// <returns>The <see cref="EventMetadata"/>; <c>null</c> where the <paramref name="message"/> is <c>null</c>.</returns>
        /// <remarks>Where <see cref="UseMessagingPropertiesForMetadata"/> is <c>true</c> the metadata is read from the
        /// <see cref="AzureEventHubs.EventData.Properties"/>; a property that is missing, or whose value is not of the expected type, results in a
        /// <c>null</c> metadata value. Otherwise, the body is deserialized and the result returned as the metadata.</remarks>
        public async Task<EventMetadata?> GetMetadataAsync(AzureEventHubs.EventData message)
        {
            if (message == null)
                return null;

            if (UseMessagingPropertiesForMetadata)
            {
                var props = message.Properties;

                // Tolerate a missing or non-string value in the same manner as the other typed properties below.
                string? GetString(string name) => props.TryGetValue(name, out var value) ? value as string : null;

                return new EventMetadata
                {
                    EventId = props.TryGetValue(EventMetadata.EventIdAttributeName, out var eid) && eid is Guid eventId ? eventId : (Guid?)null,
                    TenantId = props.TryGetValue(EventMetadata.TenantIdAttributeName, out var tid) && tid is Guid tenantId ? tenantId : (Guid?)null,
                    Subject = GetString(EventMetadata.SubjectAttributeName),
                    Action = GetString(EventMetadata.ActionAttributeName),
                    Source = props.TryGetValue(EventMetadata.SourceAttributeName, out var src) && src is Uri source ? source : null,
                    Key = props.TryGetValue(EventMetadata.KeyPropertyName, out var key) ? key : null,
                    ETag = GetString(EventMetadata.ETagAttributeName),
                    Username = GetString(EventMetadata.UsernameAttributeName),
                    UserId = GetString(EventMetadata.UserIdAttributeName),
                    Timestamp = props.TryGetValue(EventMetadata.TimestampAttributeName, out var time) && time is DateTime timestamp ? timestamp : (DateTime?)null,
                    CorrelationId = GetString(EventMetadata.CorrelationIdAttributeName),
                    PartitionKey = GetString(EventMetadata.PartitionKeyAttributeName)
                };
            }

            // Try deserializing to get metadata where possible.
            return await _contentSerializer.DeserializeAsync(message.Body.ToArray()).ConfigureAwait(false);
        }
    }
}