Skip to content

Commit

Permalink
created a class for datachangeevent; updated serialization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ruokun-niu committed Dec 4, 2024
1 parent b4446c2 commit d3b7d3d
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 190 deletions.
270 changes: 137 additions & 133 deletions reactions/debezium/debezium-reaction/Services/ChangeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,47 @@ private async Task ProcessResults(EventMetadata metadata, Dictionary<string, obj
var noIndent = new JsonSerializerOptions { WriteIndented = false };
foreach (var res in results)
{
// Debezium data change event format has duplicate property names, so we need to serialize manually
var dataChangeEvent = new StringBuilder("{");
var dataChangeEventKey = new DataChangeEventKey();
var dataChangeEventValue = new DataChangeEventValue();
if (_includeKey)
{
if (_includeSchemas)
{
var keySchema = GetKeySchema(metadata);
var keySchemaString = JsonSerializer.Serialize(keySchema, noIndent);
dataChangeEvent.Append($"\"schema\":{keySchemaString},");
dataChangeEventKey.KeySchema = keySchema;
}

var keyPayload = GetKeyPayload(metadata);
var keyPayloadString = JsonSerializer.Serialize(keyPayload, noIndent);
dataChangeEvent.Append($"\"payload\":{keyPayloadString},");
dataChangeEventKey.KeyPayload = keyPayload;
var keyString = JsonSerializer.Serialize(dataChangeEventKey, noIndent);
}

var resultJsonElement = ConvertDictionaryToJsonElement(res);
var valuePayload = GetValuePayload(op, metadata, resultJsonElement);
var valuePayloadString = JsonSerializer.Serialize(valuePayload, noIndent);

if (_includeSchemas)
{
var valueSchema = GetValueSchema(metadata, resultJsonElement);
var valueSchemaString = JsonSerializer.Serialize(valueSchema, noIndent);
dataChangeEvent.Append($"\"schema\":{valueSchemaString},");
dataChangeEventValue.ValueSchema = valueSchema;
}
dataChangeEvent.Append($"\"payload\":{valuePayloadString}");
_ = dataChangeEvent.Append("}");
dataChangeEventValue.ValuePayload = valuePayload;
var eventString = JsonSerializer.Serialize(dataChangeEventValue, noIndent);
if (_includeKey)
{
// We are unable to serialize the key and the value together, as we have duplicate key names
// This will result in the error: `System.InvalidOperationException: The JSON property name for 'Drasi.Reactions.Debezium.Services.DataChangeEvent.schema' collides with another property`
// serialize the key, and then concatenate the value
var eventKeyString = JsonSerializer.Serialize(dataChangeEventKey, noIndent);
if (eventKeyString.Length > 0 && eventKeyString.EndsWith("}"))
{
eventKeyString = eventKeyString.Substring(0, eventKeyString.Length - 1); // Remove the last '}'
}
eventString = eventString.TrimStart('{');

var eventString = dataChangeEvent.ToString();
eventString = eventKeyString + "," + eventString;
}
_logger.LogInformation($"dataChangeEvent: {eventString}");

try
{
var deliveryReport = await _producer.ProduceAsync(_topic, new Message<Null, string> { Value = eventString }, CancellationToken.None);
Expand All @@ -133,33 +143,32 @@ private async Task ProcessResults(EventMetadata metadata, Dictionary<string, obj
}
}

static JsonObject GetKeyPayload(EventMetadata metadata)
static KeyPayload GetKeyPayload(EventMetadata metadata)
{
var keyPayload = new JsonObject
var keyPayload = new KeyPayload
{
{ "id", metadata.Seq.ToString() }
Id = metadata.Seq.ToString()
};
return keyPayload;
}

static JsonObject GetKeySchema(EventMetadata metadata)
static Schema GetKeySchema(EventMetadata metadata)
{
var keySchema = new JsonObject
var keySchema = new Schema
{
{ "type", "struct" },
{ "name", $"{metadata.Connector}.{metadata.QueryId}.Key" },
{ "optional", false }
};
var fields = new JsonArray
{
new JsonObject
Type = "struct",
Name = $"{metadata.Connector}.{metadata.QueryId}.Key",
Optional = false,
Fields = new List<Field>
{
{ "field", "id" },
{ "type", "string" },
{ "optional", false }
new Field
{
SchemaField = "id",
Type = "string",
Optional = false
}
}
};
keySchema.Add("fields", fields);
return keySchema;
}

Expand All @@ -172,153 +181,148 @@ static JsonElement ConvertDictionaryToJsonElement(Dictionary<string, object> dic
return jsonDocument.RootElement.Clone();
}

static JsonObject GetValuePayload(string op, EventMetadata metadata, JsonElement res)
static Payload GetValuePayload(string op, EventMetadata metadata, JsonElement res)
{
JsonObject valueSource = new()
var valuePayload = new Payload
{
{ "version", metadata.Version },
{ "connector", metadata.Connector },
{ "ts_ms", metadata.TsMs },
{ "seq", metadata.Seq }
Source = new Source
{
Version = metadata.Version,
Connector = metadata.Connector,
TimestampMs = metadata.TsMs,
Sequence = metadata.Seq
},
Operation = op,
TimestampMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};

JsonObject? beforeValue = null;
JsonObject? afterValue = null;
switch (op)
{
case "c":
afterValue = JsonObject.Create(res);
valuePayload.After = res;
break;
case "u":
beforeValue = JsonObject.Create(res.GetProperty("before"));
afterValue = JsonObject.Create(res.GetProperty("after"));
valuePayload.Before = res.GetProperty("before");
valuePayload.After = res.GetProperty("after");
break;
case "d":
beforeValue = JsonObject.Create(res);
valuePayload.Before = res;
break;
default:
throw new Exception("Unknown op: " + op);
}

JsonObject valuePayload = new()
{
{ "before", beforeValue },
{ "after", afterValue },
{ "source", valueSource },
{ "op", op },
{ "ts_ms", System.DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
};

return valuePayload;
}

static JsonObject GetValueSchema(EventMetadata metadata, JsonElement res)
static Schema GetValueSchema(EventMetadata metadata, JsonElement res)
{
var changeDataFields = GetChangeDataFields(res);
var changeDataFields2 = GetChangeDataFields(res);

var sourceFields = new JsonArray
{
new JsonObject
{
{ "field", "version" },
{ "type", "string" },
{ "optional", false }
},
new JsonObject
{
{ "field", "connector" },
{ "type", "string" },
{ "optional", false }
},
new JsonObject
{
{ "field", "container" },
{ "type", "string" },
{ "optional", false }
},
new JsonObject
{
{ "field", "hostname" },
{ "type", "string" },
{ "optional", false }
},
new JsonObject
{
{ "field", "ts_ms" },
{ "type", "int64" },
{ "optional", false }
},
new JsonObject
var sourceFields = new Field {
Type = "struct",
Optional = false,
Name = $"io.debezium.connector.{metadata.Connector}.Source",
SchemaField = "source",
Fields = new List<Field>
{
{ "field", "seq" },
{ "type", "int64" },
{ "optional", false }
new Field
{
SchemaField = "version",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "connector",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "ts_ms",
Type = "int64",
Optional = false
},
new Field
{
SchemaField = "seq",
Type = "int64",
Optional = false
}
}
};

var valueFields = new JsonArray
var metadataFields = new List<Field>
{
new JsonObject
{
{ "type", "struct" },
{ "fields", changeDataFields },
{ "optional", true},
{ "name", $"{metadata.Connector}.{metadata.QueryId}.Value" },
{ "field", "before" }
},
new JsonObject
{
{ "type", "struct" },
{ "fields", changeDataFields2 },
{ "optional", true},
{ "name", $"{metadata.Connector}.{metadata.QueryId}.Value" },
{ "field", "after" }
},
new JsonObject
new Field
{
{ "type", "struct" },
{ "fields", sourceFields },
{ "optional", false},
{ "name", $"io.debezium.connector.{metadata.Connector}.Source" },
{ "field", "source" }
SchemaField = "op",
Type = "string",
Optional = false
},
new JsonObject
new Field
{
{ "type", "string" },
{ "optional", false},
{ "field", "op" }
},
new JsonObject
{
{ "type", "int64" },
{ "optional", true},
{ "field", "ts_ms" }
SchemaField = "ts_ms",
Type = "int64",
Optional = true
}
};

var valueSchema = new JsonObject
var beforeField = new Field
{
Type = "struct",
Optional = true,
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
SchemaField = "before",
Fields = GetChangeDataFields(res)
};

var afterField = new Field
{
{ "type", "struct" },
{ "fields", valueFields },
{ "optional", false },
{ "name", $"{metadata.Connector}.{metadata.QueryId}.Envelope" }
Type = "struct",
Optional = true,
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
SchemaField = "after",
Fields = GetChangeDataFields(res)
};

var valueSchema = new Schema
{
Type = "struct",
Name = $"{metadata.Connector}.{metadata.QueryId}.Value",
Optional = false,
Fields = new List<Field>
{
sourceFields,
beforeField,
afterField,
new Field
{
SchemaField = "op",
Type = "string",
Optional = false
},
new Field
{
SchemaField = "ts_ms",
Type = "int64",
Optional = true
}
}
};
return valueSchema;
}


static JsonArray GetChangeDataFields(JsonElement changeData)
static List<Field> GetChangeDataFields(JsonElement changeData)
{
var changeDataFields = new JsonArray();
var changeDataFields = new List<Field>();
foreach (var prop in changeData.EnumerateObject())
{
changeDataFields.Add(new JsonObject
changeDataFields.Add(new Field
{
{ "field", prop.Name },
{ "type", prop.Value.ValueKind.ToString().ToLower() },
{ "optional", false }
SchemaField = prop.Name,
Type = prop.Value.ValueKind.ToString().ToLower(),
Optional = false
});
}
return changeDataFields;
Expand Down
Loading

0 comments on commit d3b7d3d

Please sign in to comment.