diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 3299e81..21b0be6 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -57,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPr EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.MessagePassing", "src\MessagePassing\TemporalioSamples.MessagePassing.csproj", "{641E4B08-E599-4D12-B31C-C3BA4B35C716}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -155,6 +157,10 @@ Global {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU + {641E4B08-E599-4D12-B31C-C3BA4B35C716}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {641E4B08-E599-4D12-B31C-C3BA4B35C716}.Debug|Any CPU.Build.0 = Debug|Any CPU + {641E4B08-E599-4D12-B31C-C3BA4B35C716}.Release|Any CPU.ActiveCfg = Release|Any CPU + {641E4B08-E599-4D12-B31C-C3BA4B35C716}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -185,5 +191,6 @@ Global {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {641E4B08-E599-4D12-B31C-C3BA4B35C716} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/MessagePassing/GreetingWorkflow.workflow.cs b/src/MessagePassing/GreetingWorkflow.workflow.cs new file mode 100644 index 0000000..31cb428 --- /dev/null +++ b/src/MessagePassing/GreetingWorkflow.workflow.cs @@ -0,0 +1,70 @@ +namespace TemporalioSamples.MessagePassing; + +using Temporalio.Exceptions; +using Temporalio.Workflows; + +[Workflow] +public class GreetingWorkflow +{ + public enum Language + { + Chinese, + English, + French, + Spanish, + Portuguese, + } + + public record GetLanguagesInput(bool IncludeUnsupported); + + public record ApproveInput(string Name); + + private static readonly Dictionary Greetings = new() + { + [Language.English] = "Hello, world", + [Language.Chinese] = "你好,世界", + }; + + private bool approvedForRelease; + private string? approverName; + + [WorkflowRun] + public async Task RunAsync() + { + await Workflow.WaitConditionAsync(() => approvedForRelease); + return Greetings[CurrentLanguage]; + } + + [WorkflowQuery] + public IList GetLanguages(GetLanguagesInput input) => + Enum.GetValues(). + Where(language => input.IncludeUnsupported || Greetings.ContainsKey(language)). + ToList(); + + [WorkflowQuery] + public Language CurrentLanguage { get; private set; } = GreetingWorkflow.Language.English; + + [WorkflowSignal] + public async Task ApproveAsync(ApproveInput input) + { + approvedForRelease = true; + approverName = input.Name; + } + + [WorkflowUpdateValidator(nameof(SetCurrentLanguageAsync))] + public void ValidateLanguage(Language language) + { + if (!Greetings.ContainsKey(language)) + { + throw new ApplicationFailureException($"{language} is not supported"); + } + } + + [WorkflowUpdate] + public async Task SetCurrentLanguageAsync(Language language) + { + var previousLanguage = CurrentLanguage; + CurrentLanguage = language; + return previousLanguage; + } +} \ No newline at end of file diff --git a/src/MessagePassing/Program.cs b/src/MessagePassing/Program.cs new file mode 100644 index 0000000..3726a71 --- /dev/null +++ b/src/MessagePassing/Program.cs @@ -0,0 +1,90 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.MessagePassing; + +// Create a client to localhost on default namespace +using var loggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)); +var client = await TemporalClient.ConnectAsync(new("localhost:7233") +{ + LoggerFactory = loggerFactory, +}); +var logger = loggerFactory.CreateLogger(); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Run worker until cancelled + logger.LogInformation("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "message-passing-sample"). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + logger.LogInformation("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync() +{ + // Start workflow + var workflowHandle = await client.StartWorkflowAsync( + (GreetingWorkflow wf) => wf.RunAsync(), + new(id: "message-passing-workflow-id", taskQueue: "message-passing-sample")); + + logger.LogInformation( + "Supported languages: {Languages}", + await workflowHandle.QueryAsync(wf => wf.GetLanguages(new(false)))); + + var previousLanguage = await workflowHandle.ExecuteUpdateAsync( + wf => wf.SetCurrentLanguageAsync(GreetingWorkflow.Language.Chinese)); + var currentLanguage = await workflowHandle.QueryAsync(wf => wf.CurrentLanguage); + logger.LogInformation( + "Languages changed: {PreviousLanguage} -> {CurrentLanguage}", + previousLanguage, + currentLanguage); + + var updateHandle = await workflowHandle.StartUpdateAsync( + wf => wf.SetCurrentLanguageAsync(GreetingWorkflow.Language.English), + new(WorkflowUpdateStage.Accepted)); + previousLanguage = await updateHandle.GetResultAsync(); + currentLanguage = await workflowHandle.QueryAsync(wf => wf.CurrentLanguage); + logger.LogInformation( + "Languages changed: {PreviousLanguage} -> {CurrentLanguage}", + previousLanguage, + currentLanguage); + + await workflowHandle.SignalAsync(wf => wf.ApproveAsync(new("MyUser"))); + logger.LogInformation("Result: {Result}", await workflowHandle.GetResultAsync()); +} + +if (args.Length > 1) +{ + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "workflow": + await ExecuteWorkflowAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/MessagePassing/TemporalioSamples.MessagePassing.csproj b/src/MessagePassing/TemporalioSamples.MessagePassing.csproj new file mode 100644 index 0000000..e3b6154 --- /dev/null +++ b/src/MessagePassing/TemporalioSamples.MessagePassing.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + \ No newline at end of file