Skip to content

Commit 111960f

Browse files
committed
Improved process orders sample
1 parent 7567fda commit 111960f

File tree

4 files changed

+46
-71
lines changed

4 files changed

+46
-71
lines changed

Samples/Sample.ConsoleApp/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ internal static class Program
77
{
88
private static async Task Main()
99
{
10-
await TravelAgency.Example.Perform();
10+
await WorkDistribution.Example.Perform();
1111

1212
Console.ReadLine();
1313
}

Samples/Sample.ConsoleApp/WorkDistribution/Example.cs

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Threading.Tasks;
55
using Cleipnir.ResilientFunctions;
66
using Cleipnir.ResilientFunctions.Domain;
7-
using Cleipnir.ResilientFunctions.Helpers;
87
using Cleipnir.ResilientFunctions.MariaDb;
98
using Cleipnir.ResilientFunctions.PostgreSQL;
109
using Cleipnir.ResilientFunctions.SqlServer;
@@ -16,30 +15,35 @@ public static class Example
1615
{
1716
public static async Task Perform()
1817
{
19-
var postgresStore = new PostgreSqlFunctionStore("Server=localhost;Database=rfunctions;User Id=postgres;Password=Pa55word!; Include Error Detail=true;");
18+
const string postgresConnStr = "Server=localhost;Database=cleipnir_samples;User Id=postgres;Password=Pa55word!;Include Error Detail=true;";
19+
await Cleipnir.ResilientFunctions.PostgreSQL.DatabaseHelper.CreateDatabaseIfNotExists(postgresConnStr);
20+
var postgresStore = new PostgreSqlFunctionStore(postgresConnStr);
2021
await postgresStore.Initialize();
2122
await postgresStore.TruncateTables();
22-
var sqlServerStore = new SqlServerFunctionStore("Server=localhost;Database=rfunctions;User Id=sa;Password=Pa55word!;Encrypt=True;TrustServerCertificate=True;Max Pool Size=200;");
23+
24+
const string sqlServerConnStr = "Server=localhost;Database=CleipnirSamples;User Id=sa;Password=Pa55word!;Encrypt=True;TrustServerCertificate=True;Max Pool Size=200;";
25+
await Cleipnir.ResilientFunctions.SqlServer.DatabaseHelper.CreateDatabaseIfNotExists(sqlServerConnStr);
26+
var sqlServerStore = new SqlServerFunctionStore(sqlServerConnStr);
2327
await sqlServerStore.Initialize();
2428
await sqlServerStore.TruncateTables();
25-
var mariaDbStore = new MariaDbFunctionStore("server=localhost;userid=root;password=Pa55word!;database=rfunctions;AllowPublicKeyRetrieval=True;");
29+
30+
const string mariaDbConnStr = "server=localhost;userid=root;password=Pa55word!;database=cleipnir_samples;AllowPublicKeyRetrieval=True;";
31+
await Cleipnir.ResilientFunctions.MariaDb.DatabaseHelper.CreateDatabaseIfNotExists(mariaDbConnStr);
32+
var mariaDbStore = new MariaDbFunctionStore(mariaDbConnStr);
2633
await mariaDbStore.Initialize();
2734
await mariaDbStore.TruncateTables();
2835

2936
Console.WriteLine();
3037
Console.WriteLine("Postgres: ");
31-
var postgresTask = Task.Run(() => Perform(postgresStore));
38+
await Perform(postgresStore);
3239
Console.WriteLine();
3340
Console.WriteLine("SqlServer:");
34-
var sqlServerTask = Task.Run(() => Perform(sqlServerStore));
41+
await Perform(sqlServerStore);
3542
Console.WriteLine();
36-
Console.WriteLine("MySql:");
37-
var mariaDbTask = Task.Run(() => Perform(mariaDbStore));
38-
39-
await postgresTask;
40-
await sqlServerTask;
41-
await mariaDbTask;
43+
Console.WriteLine("MariaDB:");
44+
await Perform(mariaDbStore);
4245

46+
Console.WriteLine();
4347
Console.WriteLine("All completed!");
4448
}
4549

@@ -48,36 +52,23 @@ private static async Task Perform(IFunctionStore store)
4852
Console.WriteLine("Started: " + store.GetType().Name);
4953

5054
await store.Initialize();
51-
var functions = new FunctionsRegistry(
52-
store,
53-
new Settings(unhandledExceptionHandler: Console.WriteLine, watchdogCheckFrequency: TimeSpan.FromSeconds(60), leaseLength: TimeSpan.FromSeconds(5))
54-
);
55+
var registry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler: Console.WriteLine));
5556

56-
var processOrder = functions.RegisterAction<ProcessOrderRequest>(
57+
var processOrder = registry.RegisterAction<string>(
5758
"ProcessOrder",
5859
ProcessOrder.Execute
5960
);
60-
ProcessOrders.ProcessOrder!.Value = processOrder;
61-
var processOrders = functions.RegisterAction<List<string>>(
61+
ProcessOrders.ProcessOrder = processOrder;
62+
var processOrders = registry.RegisterAction<List<string>>(
6263
"ProcessOrders",
6364
ProcessOrders.Execute
6465
);
65-
ProcessOrder.MessageWriters = processOrders.MessageWriters;
66-
66+
6767
var orderIds = Enumerable
68-
.Range(0, 150)
69-
.Select(_ => Guid.NewGuid().ToString()) //Random.Shared.Next(1000, 9999)
68+
.Range(100, 150)
69+
.Select(id => $"MK-{id}")
7070
.ToList();
71-
await processOrders.Schedule("2024-01-27", orderIds);
72-
73-
var controlPanel = await processOrders.ControlPanel("2024-01-27");
74-
75-
await BusyWait.Until(async () =>
76-
{
77-
await controlPanel!.Refresh();
78-
return controlPanel.Status == Status.Succeeded;
79-
}, maxWait: TimeSpan.FromSeconds(60));
8071

81-
Console.WriteLine("Completed: " + store.GetType().Name);
72+
await processOrders.Schedule("2024-01-27", orderIds).Completion();
8273
}
8374
}

Samples/Sample.ConsoleApp/WorkDistribution/ProcessOrder.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
using System;
22
using System.Threading.Tasks;
33
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
4-
using Cleipnir.ResilientFunctions.Domain;
5-
using Cleipnir.ResilientFunctions.Messaging;
64

75
namespace ConsoleApp.WorkDistribution;
86

9-
public record ProcessOrderRequest(string OrderId, FlowId SendResultTo);
10-
117
public static class ProcessOrder
128
{
13-
public static MessageWriters? MessageWriters { get; set; }
14-
15-
public static async Task Execute(ProcessOrderRequest request, Workflow workflow)
9+
public static async Task Execute(string orderId, Workflow workflow)
1610
{
17-
var (orderId, sendResultTo) = request;
11+
Console.WriteLine($"{orderId}: Started processing order");
12+
1813
await ReserveFunds(orderId);
19-
var trackingNumber = await ShipOrder(orderId);
14+
var trackAndTraceNumber = await ShipOrder(orderId);
2015
await CaptureFunds(orderId);
21-
22-
await MessageWriters!.For(sendResultTo.Instance).AppendMessage(
23-
new FunctionCompletion<string>(trackingNumber, workflow.FlowId),
24-
idempotencyKey: workflow.FlowId.ToString()
25-
);
16+
await EmailOrderConfirmation(orderId, trackAndTraceNumber);
17+
18+
Console.WriteLine($"{orderId}: Finished processing order");
2619
}
2720

2821
private static async Task ReserveFunds(string orderId)
@@ -45,4 +38,10 @@ private static async Task CaptureFunds(string orderId)
4538
await Task.Delay(Random.Shared.Next(250, 1000));
4639
Console.WriteLine($"{orderId}: Funds captured");
4740
}
41+
42+
private static async Task EmailOrderConfirmation(string orderId, string trackAndTraceNumber)
43+
{
44+
await Task.Delay(Random.Shared.Next(250, 1000));
45+
Console.WriteLine($"{orderId}: Order confirmation sent with track and trace number '{trackAndTraceNumber}'");
46+
}
4847
}
Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,32 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4-
using System.Threading;
54
using System.Threading.Tasks;
65
using Cleipnir.ResilientFunctions;
76
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
8-
using Cleipnir.ResilientFunctions.Messaging;
9-
using Cleipnir.ResilientFunctions.Reactive.Extensions;
7+
using Cleipnir.ResilientFunctions.Domain;
108

119
namespace ConsoleApp.WorkDistribution;
1210

1311
public static class ProcessOrders
1412
{
15-
public static AsyncLocal<ActionRegistration<ProcessOrderRequest>>? ProcessOrder { get; } = new();
13+
public static ActionRegistration<string>? ProcessOrder { get; set; }
1614

1715
public static async Task Execute(List<string> orderIds, Workflow workflow)
1816
{
19-
var (effect, messages, _) = workflow;
17+
var (effect, _, _) = workflow;
2018
await effect.Capture(
2119
"Log_ProcessingStarted",
22-
() => Console.WriteLine("Processing of orders started")
20+
() => Console.WriteLine($"Processing of orders started ({orderIds.Count})")
2321
);
2422

25-
await effect.Capture(
26-
"ScheduleOrders",
27-
async () =>
28-
{
29-
foreach (var orderId in orderIds)
30-
await ProcessOrder!.Value!.Schedule(
31-
flowInstance: orderId,
32-
new ProcessOrderRequest(orderId, workflow.FlowId)
33-
);
34-
}
35-
);
36-
37-
await messages
38-
.OfType<FunctionCompletion<string>>()
39-
.Take(orderIds.Count)
23+
await ProcessOrder!
24+
.BulkSchedule(orderIds.Select(id => new BulkWork<string>(id, id)))
4025
.Completion();
41-
26+
4227
await effect.Capture(
4328
"Log_ProcessingFinished",
44-
() => Console.WriteLine($"Processing of orders completed - total: '{orderIds.Count}'")
29+
() => Console.WriteLine($"Processing of orders completed ({orderIds.Count})")
4530
);
4631
}
4732
}

0 commit comments

Comments
 (0)