Skip to content

Commit

Permalink
moved message formatted to its own class
Browse files Browse the repository at this point in the history
  • Loading branch information
ruokun-niu committed Dec 4, 2024
1 parent d3b7d3d commit b2bc6fc
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 224 deletions.
228 changes: 5 additions & 223 deletions reactions/debezium/debezium-reaction/Services/ChangeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class DebeziumChangeHandler : IChangeEventHandler
private readonly bool _includeKey;

private readonly string _topic;

private DataChangeEventFormatter _formatter;

public DebeziumChangeHandler(IConfiguration config, ILogger<DebeziumChangeHandler> logger)
{
Expand All @@ -50,6 +52,8 @@ public DebeziumChangeHandler(IConfiguration config, ILogger<DebeziumChangeHandle
};

_producer = new ProducerBuilder<Null, string>(producerConfig).Build();

_formatter = new DataChangeEventFormatter(_includeKey, _includeSchemas);
}

public async Task HandleChange(ChangeEvent evt, object? queryConfig)
Expand Down Expand Up @@ -81,47 +85,9 @@ private async Task ProcessResults(EventMetadata metadata, Dictionary<string, obj
var noIndent = new JsonSerializerOptions { WriteIndented = false };
foreach (var res in results)
{
var dataChangeEventKey = new DataChangeEventKey();
var dataChangeEventValue = new DataChangeEventValue();
if (_includeKey)
{
if (_includeSchemas)
{
var keySchema = GetKeySchema(metadata);
dataChangeEventKey.KeySchema = keySchema;
}

var keyPayload = GetKeyPayload(metadata);
dataChangeEventKey.KeyPayload = keyPayload;
var keyString = JsonSerializer.Serialize(dataChangeEventKey, noIndent);
}

var resultJsonElement = ConvertDictionaryToJsonElement(res);
var valuePayload = GetValuePayload(op, metadata, resultJsonElement);

if (_includeSchemas)
{
var valueSchema = GetValueSchema(metadata, resultJsonElement);
dataChangeEventValue.ValueSchema = valueSchema;
}
dataChangeEventValue.ValuePayload = valuePayload;
var eventString = JsonSerializer.Serialize(dataChangeEventValue, noIndent);
if (_includeKey)
{
// We are unable to serialize the key and the value together, as we have duplicate key names
// This will result in the error: `System.InvalidOperationException: The JSON property name for 'Drasi.Reactions.Debezium.Services.DataChangeEvent.schema' collides with another property`
// serialize the key, and then concatenate the value
var eventKeyString = JsonSerializer.Serialize(dataChangeEventKey, noIndent);
if (eventKeyString.Length > 0 && eventKeyString.EndsWith("}"))
{
eventKeyString = eventKeyString.Substring(0, eventKeyString.Length - 1); // Remove the last '}'
}
eventString = eventString.TrimStart('{');
string eventString = _formatter.FormatEvent(metadata, res, op);

eventString = eventKeyString + "," + eventString;
}
_logger.LogInformation($"dataChangeEvent: {eventString}");

try
{
var deliveryReport = await _producer.ProduceAsync(_topic, new Message<Null, string> { Value = eventString }, CancellationToken.None);
Expand All @@ -143,188 +109,4 @@ private async Task ProcessResults(EventMetadata metadata, Dictionary<string, obj
}
}

static KeyPayload GetKeyPayload(EventMetadata metadata)
{
var keyPayload = new KeyPayload
{
Id = metadata.Seq.ToString()
};
return keyPayload;
}

static Schema GetKeySchema(EventMetadata metadata)
{
var keySchema = new Schema
{
Type = "struct",
Name = $"{metadata.Connector}.{metadata.QueryId}.Key",
Optional = false,
Fields = new List<Field>
{
new Field
{
SchemaField = "id",
Type = "string",
Optional = false
}
}
};
return keySchema;
}

static JsonElement ConvertDictionaryToJsonElement(Dictionary<string, object> dictionary)
{
string jsonString = JsonSerializer.Serialize(dictionary);

using JsonDocument jsonDocument = JsonDocument.Parse(jsonString);

return jsonDocument.RootElement.Clone();
}

static Payload GetValuePayload(string op, EventMetadata metadata, JsonElement res)
{
var valuePayload = new Payload
{
Source = new Source
{
Version = metadata.Version,
Connector = metadata.Connector,
TimestampMs = metadata.TsMs,
Sequence = metadata.Seq
},
Operation = op,
TimestampMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};

switch (op)
{
case "c":
valuePayload.After = res;
break;
case "u":
valuePayload.Before = res.GetProperty("before");
valuePayload.After = res.GetProperty("after");
break;
case "d":
valuePayload.Before = res;
break;
default:
throw new Exception("Unknown op: " + op);
}

return valuePayload;
}

static Schema GetValueSchema(EventMetadata metadata, JsonElement res)
{
var sourceFields = new Field {
Type = "struct",
Optional = false,
Name = $"io.debezium.connector.{metadata.Connector}.Source",
SchemaField = "source",
Fields = new List<Field>
{
new Field
{
SchemaField = "version",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "connector",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "ts_ms",
Type = "int64",
Optional = false
},
new Field
{
SchemaField = "seq",
Type = "int64",
Optional = false
}
}
};

var metadataFields = new List<Field>
{
new Field
{
SchemaField = "op",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "ts_ms",
Type = "int64",
Optional = true
}
};

var beforeField = new Field
{
Type = "struct",
Optional = true,
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
SchemaField = "before",
Fields = GetChangeDataFields(res)
};

var afterField = new Field
{
Type = "struct",
Optional = true,
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
SchemaField = "after",
Fields = GetChangeDataFields(res)
};

var valueSchema = new Schema
{
Type = "struct",
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
Optional = false,
Fields = new List<Field>
{
sourceFields,
beforeField,
afterField,
new Field
{
SchemaField = "op",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "ts_ms",
Type = "int64",
Optional = true
}
}
};
return valueSchema;
}


static List<Field> GetChangeDataFields(JsonElement changeData)
{
var changeDataFields = new List<Field>();
foreach (var prop in changeData.EnumerateObject())
{
changeDataFields.Add(new Field
{
SchemaField = prop.Name,
Type = prop.Value.ValueKind.ToString().ToLower(),
Optional = false
});
}
return changeDataFields;
}
}
Loading

0 comments on commit b2bc6fc

Please sign in to comment.