Skip to content

Commit

Permalink
Cloudevents
Browse files Browse the repository at this point in the history
  • Loading branch information
ruokun-niu committed Jan 15, 2025
1 parent c7d7036 commit 7fd6593
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
26 changes: 21 additions & 5 deletions reactions/aws/eventbridge-reaction/Services/ChangeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Drasi.Reactions.EventBridge.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

using System;
using System.Text.Json;
using Drasi.Reaction.SDK;
using Drasi.Reaction.SDK.Models.QueryOutput;
Expand Down Expand Up @@ -47,12 +48,19 @@ public async Task HandleChange(ChangeEvent evt, object? queryConfig)
switch (_format)
{
case OutputFormat.Packed:
var formattedEvent = _formatter.Format(evt);
var cloudEvent = new CloudEvent
{
Id = Guid.NewGuid().ToString(),
Type = "Drasi.ChangeEvent.Packed",
Source = evt.QueryId,
Data = _formatter.Format(evt),
Version = "1.0"
};
var packedRequestEntry = new PutEventsRequestEntry()
{
Source = evt.QueryId,
Detail = JsonSerializer.Serialize(formattedEvent),
DetailType = "Drasi.ChangeEvent",
Detail = JsonSerializer.Serialize(cloudEvent),
DetailType = "Drasi.ChangeEvent.Packed",
EventBusName = _eventBusName
};
var packedResponse = await _eventBridgeClient.PutEventsAsync(new PutEventsRequest()
Expand All @@ -71,11 +79,19 @@ public async Task HandleChange(ChangeEvent evt, object? queryConfig)
List<PutEventsRequestEntry> unpackedRequestEntries = new List<PutEventsRequestEntry>();
foreach (var result in formattedResults)
{
var currCloudEvent = new CloudEvent
{
Id = Guid.NewGuid().ToString(),
Type = "Drasi.ChangeEvent.Unpacked",
Source = evt.QueryId,
Data = result,
Version = "1.0"
};
var unpackedRequestEntry = new PutEventsRequestEntry()
{
Source = evt.QueryId,
Detail = JsonSerializer.Serialize(result),
DetailType = "Drasi.ChangeEvent",
Detail = JsonSerializer.Serialize(currCloudEvent),
DetailType = "Drasi.ChangeEvent.Unpacked",
EventBusName = _eventBusName
};
unpackedRequestEntries.Add(unpackedRequestEntry);
Expand Down
39 changes: 39 additions & 0 deletions reactions/aws/eventbridge-reaction/Services/CloudEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Drasi.Reactions.EventBridge.Services;
using System;
using System.Text.Json;
using System.Text.Json.Serialization;


public class CloudEvent
{
[JsonPropertyName("id")]
public string Id { get; set; }

[JsonPropertyName("type")]
public string Type { get; set; }

[JsonPropertyName("source")]
public string Source { get; set; }

[JsonPropertyName("data")]
public object Data { get; set; }

[JsonPropertyName("specversion")]
public string Version { get; set; } = "1.0";


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@ public async Task HandleControlSignal(ControlEvent evt, object? queryConfig)
switch (_format)
{
case OutputFormat.Packed:
var cloudEvent = new CloudEvent
{
Id = Guid.NewGuid().ToString(),
Type = "Drasi.ControlEvent.Packed",
Source = evt.QueryId,
Data = JsonSerializer.Serialize(evt),
Version = "1.0"
};
var requestEntry = new PutEventsRequestEntry()
{
Source = evt.QueryId,
Detail = JsonSerializer.Serialize(evt),
DetailType = "Drasi.ControlEvent",
Detail = JsonSerializer.Serialize(cloudEvent),
DetailType = "Drasi.ControlEvent.Packed",
EventBusName = _eventBusName
};
var response = await _client.PutEventsAsync(new PutEventsRequest()
Expand Down Expand Up @@ -83,11 +91,19 @@ public async Task HandleControlSignal(ControlEvent evt, object? queryConfig)
}
};

var controlCloudEvent = new CloudEvent
{
Id = Guid.NewGuid().ToString(),
Type = "Drasi.ControlEvent.Unpacked",
Source = evt.QueryId,
Data = JsonSerializer.Serialize(notification),
Version = "1.0"
};
var unpackedRequestEntry = new PutEventsRequestEntry()
{
Source = evt.QueryId,
Detail = JsonSerializer.Serialize(notification, Reaction.SDK.Models.QueryOutput.ModelOptions.JsonOptions),
DetailType = "Drasi.ControlEvent",
Detail = JsonSerializer.Serialize(controlCloudEvent),
DetailType = "Drasi.ControlEvent.Unpacked",
EventBusName = _eventBusName
};

Expand Down

0 comments on commit 7fd6593

Please sign in to comment.