3.控制结构
  AnyLlCIhvKpr 2023年11月12日 37 0


分支

可以在工作流中定义多个分支,更根据表达式的值选择执行哪个分支。

使用CreateBranch()方法定义分支

//创建两个分支
var branch1 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 1")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 1");

var branch2 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 2")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 2");

//如果data.value1的值为one,则执行branch1
//如果data.value1的值为two,则执行branch2
builder
    .StartWith<HelloWorld>()
    .Decide(data => data.Value1)
        .Branch((data, outcome) => data.Value1 == "one", branch1)
        .Branch((data, outcome) => data.Value1 == "two", branch2);

foreach

public class ForEachWorkflow : IWorkflow
{
    public string Id => "Foreach";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith<SayHello>()
            .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                .Do(x => x
                    .StartWith<DisplayContext>()
                        .Input(step => step.Message, (data, context) => context.Item)
                    .Then<DoSomething>())
            .Then<SayGoodbye>();
    }        
}

while

public class WhileWorkflow : IWorkflow<MyData>
{
    public string Id => "While";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .While(data => data.Counter < 3)
                .Do(x => x
                    .StartWith<DoSomething>()
                    .Then<IncrementStep>()
                        .Input(step => step.Value1, data => data.Counter)
                        .Output(data => data.Counter, step => step.Value2))
            .Then<SayGoodbye>();
    }        
}

if

public class IfWorkflow : IWorkflow<MyData>
{ 
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .If(data => data.Counter < 3).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 3")
            )
            .If(data => data.Counter < 5).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 5")
            )
            .Then<SayGoodbye>();
    }        
}

并行

public class ParallelWorkflow : IWorkflow<MyData>
{
    public string Id => "parallel-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .Parallel()
                .Do(then => 
                    then.StartWith<Task1dot1>()
                        .Then<Task1dot2>()
                .Do(then =>
                    then.StartWith<Task2dot1>()
                        .Then<Task2dot2>()
                .Do(then =>
                    then.StartWith<Task3dot1>()
                        .Then<Task3dot2>()
            .Join()
            .Then<SayGoodbye>();
    }        
}

按计划执行

在未来按照计划执行,使得工作流在后台异步执行

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));

延迟执行

延迟工作流的当前分支一段时间

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Delay(p => TimeSpan.FromSeconds(5) );

重复

设置一组重复的后台步骤,知道满足特定条件

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
        .StartWith(context => Console.WriteLine("Doing recurring task"))
    )
    .Then(context => Console.WriteLine("Carry on"));

案例 Parallel ForEach

  • steps

public class SayHello : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello");
        return ExecutionResult.Next();
    }
}
public class DisplayContext : StepBody
{        

    public object Item { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine($"Working on item {Item}");
        return ExecutionResult.Next();
    }
}
public class DoSomething : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Doing something...");
        return ExecutionResult.Next();
    }
}
public class SayGoodbye : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Goodbye");
        return ExecutionResult.Next();
    }
}
  • workflow

public class ForEachWorkflow : IWorkflow
{
    public string Id => "Foreach";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith<SayHello>()
            .ForEach(data => new List<int> { 1, 2, 3, 4 })
                .Do(x => x
                    .StartWith<DisplayContext>()
                        .Input(step => step.Item, (data, context) => context.Item)
                    .Then<DoSomething>())
            .Then<SayGoodbye>();
    }        
}
  • program

IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ForEachWorkflow>();
host.Start();
Console.WriteLine("Starting workflow...");
string workflowId = host.StartWorkflow("Foreach").Result;
Console.ReadLine();
host.Stop();

3.控制结构

案例If

  • steps

public class PrintMessage : StepBody
{
    public string Message { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);
        return ExecutionResult.Next();
    }
}
public class SayGoodbye : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Goodbye");
        return ExecutionResult.Next();
    }
}
public class SayHello : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello");
        return ExecutionResult.Next();
    }
}
  • workflow

public class MyData
{
    public int Counter { get; set; }
}
public class IfWorkflow : IWorkflow<MyData>
{
    public string Id => "if-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .If(data => data.Counter < 3).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 3")
            )
            .If(data => data.Counter < 5).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 5")
            )
            .Then<SayGoodbye>();
    }        
}
  • program

IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<IfWorkflow, MyData>();
host.Start();

Console.WriteLine("Starting workflow...");
string workflowId = host.StartWorkflow("if-sample", new MyData { Counter = 4 }).Result;

Console.ReadLine();
host.Stop();

3.控制结构

案例 Parallel Tasks

  • steps

public class PrintMessage : StepBody
{
    public string Message { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("当前线程:"+ Thread.GetCurrentProcessorId().ToString()+ "message:"+Message);
        return ExecutionResult.Next();
    }
}
public class SayHello : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello");
        return ExecutionResult.Next();
    }
}
public class SayGoodbye : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Goodbye");
        return ExecutionResult.Next();
    }
}
  • workflow

public class MyData
{
    public int Counter { get; set; }
}
public class ParallelWorkflow : IWorkflow<MyData>
{
    public string Id => "parallel-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .Parallel()
                .Do(then => 
                    then.StartWith<PrintMessage>()
                            .Input(step => step.Message, data => "Item 1.1")
                        .Then<PrintMessage>()
                            .Input(step => step.Message, data => "Item 1.2"))
                .Do(then =>
                    then.StartWith<PrintMessage>()
                            .Input(step => step.Message, data => "Item 2.1")
                        .Then<PrintMessage>()
                            .Input(step => step.Message, data => "Item 2.2")
                        .Then<PrintMessage>()
                            .Input(step => step.Message, data => "Item 2.3"))
                .Do(then =>
                    then.StartWith<PrintMessage>()
                            .Input(step => step.Message, data => "Item 3.1")
                        .Then<PrintMessage>()
                            .Input(step => step.Message, data => "Item 3.2"))
            .Join()
            .Then<SayGoodbye>();
    }        
}
  • program

IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
var host = serviceProvider.GetService<IWorkflowHost>();
var controller = serviceProvider.GetService<IWorkflowController>();
controller.RegisterWorkflow<ParallelWorkflow, MyData>();//使用容器进行注册

Console.WriteLine("Starting workflow...");
controller.StartWorkflow<MyData>("parallel-sample");//使用容器开始

Console.ReadLine();
host.Stop();

3.控制结构

案例Schedule

  • workflow

class ScheduleWorkflow : IWorkflow
{
    public string Id => "schedule-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
            )
            .Then(context => { Console.WriteLine("Doing normal tasks"); Console.WriteLine("等待5s"); });
    }
}

3.控制结构

案例Recur

public class MyData
{
    public int Counter { get; set; }
}
class RecurSampleWorkflow : IWorkflow<MyData>
{
    public string Id => "recur-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            //5s一次,直到data.Counter > 5
            .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
                .StartWith(context => Console.WriteLine("Doing recurring task"))
            )
            .Then(context => Console.WriteLine("Carry on"));
    }
}

3.控制结构

案例Branch

  • steps

public class PrintMessage : StepBody
{
    public string Message { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);
        return ExecutionResult.Next();
    }
}
public class SayHello : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello");
        return ExecutionResult.Next();
    }
}
  • workflow

public class MyData
{
    public int Value { get; set; }
}

public class OutcomeWorkflow : IWorkflow<MyData>
{
    public string Id => "outcome-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        var branch1 = builder.CreateBranch()
            .StartWith<PrintMessage>()
                .Input(step => step.Message, data => "hi from 1")
            .Then<PrintMessage>()
                .Input(step => step.Message, data => "bye from 1");

        var branch2 = builder.CreateBranch()
            .StartWith<PrintMessage>()
                .Input(step => step.Message, data => "hi from 2")
            .Then<PrintMessage>()
                .Input(step => step.Message, data => "bye from 2");
//如果data.value1的值为one,则执行branch1
//如果data.value1的值为two,则执行branch2
        builder
            .StartWith<SayHello>()
            .Decide(data => data.Value)
                .Branch(1, branch1)
                .Branch(2, branch2);
    }        
}
  • program

IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<OutcomeWorkflow, MyData>();
host.Start();

Console.WriteLine("Starting workflow...");
host.StartWorkflow("outcome-sample", new MyData { Value = 2 });


Console.ReadLine();
host.Stop();

3.控制结构


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

AnyLlCIhvKpr