
Pattern 1: Pipeline

pewsou edited this page Jun 18, 2022 · 5 revisions

A pipeline is a sequence of code blocks (subroutines) arranged in sequential order and terminated by a 'summary' block. Data are organized in a queue and fed item by item to the first block. The result produced by the first block is fed to the second, and so on, up to the summary. The available CPUs share the work between the subroutines, which introduces some degree of parallel execution.
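The data flow described above can be illustrated with a minimal, self-contained sketch. It is not ASFK's implementation: the names SketchStage, SketchSummary, SketchRunPipeline and SketchExample are hypothetical, ARC is assumed, and the items are processed strictly one after another on the calling thread, so the sharing of work between CPUs is deliberately left out.

```objc
#import <Foundation/Foundation.h>

// Hypothetical block types for this sketch; they are not ASFK's own types.
// Assumes ARC.
typedef id (^SketchStage)(id item);
typedef id (^SketchSummary)(NSArray *results);

// Feeds every queued item through the stages in order, then hands the
// collected per-item results to the summary block and returns its value.
// In this sketch every stage must return a non-nil object.
static id SketchRunPipeline(NSArray *queue, NSArray *stages, SketchSummary summary) {
    NSMutableArray *results = [NSMutableArray arrayWithCapacity:[queue count]];
    for (id item in queue) {
        id current = item;
        for (SketchStage stage in stages) {
            current = stage(current);  // output of one block is input of the next
        }
        [results addObject:current];
    }
    return summary(results);
}

// Example: two stages (double, then add one) and a summary that adds up
// the per-item results.
static NSNumber *SketchExample(void) {
    SketchStage twice = ^id(id item) { return @([item integerValue] * 2); };
    SketchStage plusOne = ^id(id item) { return @([item integerValue] + 1); };
    SketchSummary sum = ^id(NSArray *results) {
        NSInteger total = 0;
        for (NSNumber *n in results) { total += [n integerValue]; }
        return @(total);
    };
    return SketchRunPipeline(@[@1, @2, @3], @[twice, plusOne], sum);
}
```

Calling SketchExample() turns the items 1, 2, 3 into 3, 5, 7 and the summary returns their sum, 15.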

Example of usage:

Create a new pipeline object:

ASFKPipelinePar* pipeline=[[ASFKPipelinePar alloc]initWithName:@"Pipeline"];

Add a summary subroutine:

BOOL res = [pipeline setSummary:^id(id<ASFKControlCallback> controlBlock,NSDictionary* stats, id data) { return nil; }];

Add a progress tracking routine (optional):

res=[pipeline setProgressRoutine:^id(NSUInteger stage, NSUInteger accomplished, NSUInteger outOf, id exData) { return nil; }];

Add a cancellation routine (optional but recommended; it is needed when pipeline processing may have to be cancelled):

res=[pipeline setCancellationHandler:^id(id identity) { return nil; }];

Add the first code block to the processing sequence:

res=[pipeline addRoutine: ^id(id<ASFKControlCallback> controlBlock, id data) { return nil; }];

Now the pipeline is set up. Many pipelines may be created. Data processing, however, is done by sessions: a pipeline object may contain many independent sessions. By default the sessions use the same processing routines, summary and other code blocks, but each session works with its own copies of them.

If a session needs a non-default set of routines, supply them as follows:

ASFKExecutionParams* params=[ASFKExecutionParams new];

params->progressProc=^id(NSUInteger stage,NSUInteger accomplished ,NSUInteger outOf,id exData){ return @"progress"; };

params->procs= @[^id(id<ASFKControlCallback> controlBlock, id data){ return @"Routine"; }];

params->SummaryRoutine = ^id(id<ASFKControlCallback> controlBlock,NSDictionary* stats,id data){ return @"sum"; };

params->cancellationProc = ^id(id identity){ return @"cancel"; };

Now we can create a session for this pipeline:

[pipeline createSession:params sessionId:@"Session 1"];
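The rule that a session uses its own copies of the default routines unless custom ones are supplied can be modelled as below. This is only a sketch under assumptions: SketchRoutine and SketchRoutinesForSession are hypothetical names, ARC is assumed, and nothing here reflects how ASFK actually stores or copies its blocks.

```objc
#import <Foundation/Foundation.h>

// Hypothetical routine type for this sketch; not an ASFK type. Assumes ARC.
typedef id (^SketchRoutine)(id data);

// Picks the routines a new session should run: the caller's overrides when
// given, otherwise the pipeline-wide defaults. Either way the session gets
// a fresh array holding copies of the blocks, not the original objects.
static NSArray *SketchRoutinesForSession(NSArray *defaults, NSArray *overrides) {
    NSArray *chosen = (overrides != nil) ? overrides : defaults;
    return [[NSArray alloc] initWithArray:chosen copyItems:YES];
}
```

Passing nil as overrides yields copies of the defaults, which mirrors a session created without custom parameters.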

After the session has been created, we can feed data to it:

NSDictionary* d= [pipeline castArray:@[@"1",@"2"] session:@"Session 1" exParam:nil];

The results will be accumulated in 'summary'.

Pausing

Execution of a session can be paused (here @"sessionId" stands for the identifier given when the session was created):

[pipeline pauseSession:@"sessionId"];

The session will suspend execution until the 'resume' method is called:

[pipeline resumeSession:@"sessionId"];

A session may be flushed: in this case all data currently queued for execution will be discarded:

[pipeline flushSession:@"sessionId"];
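How pausing, resuming and flushing affect a session's queued data can be pictured with a small model. It is a sketch only, assuming ARC: the SketchSession class and its members are hypothetical, it is single-threaded, and "processing" an item simply means moving it from one array to another.

```objc
#import <Foundation/Foundation.h>

// Hypothetical model of a session's data queue; not ASFK's implementation.
// Assumes ARC.
@interface SketchSession : NSObject
@property (nonatomic, readonly) NSMutableArray *pending;    // items waiting to be processed
@property (nonatomic, readonly) NSMutableArray *processed;  // items already handled
@property (nonatomic) BOOL paused;
- (void)enqueue:(NSArray *)items;
- (void)drain;  // processes pending items unless the session is paused
- (void)flush;  // discards all pending items
@end

@implementation SketchSession
- (instancetype)init {
    if ((self = [super init])) {
        _pending = [NSMutableArray new];
        _processed = [NSMutableArray new];
    }
    return self;
}
- (void)enqueue:(NSArray *)items {
    [_pending addObjectsFromArray:items];
}
- (void)drain {
    // While paused, queued items stay where they are.
    while (!_paused && [_pending count] > 0) {
        id item = [_pending objectAtIndex:0];
        [_processed addObject:item];
        [_pending removeObjectAtIndex:0];
    }
}
- (void)flush {
    // Items that were already processed are not affected.
    [_pending removeAllObjects];
}
@end
```

In this model a paused session keeps its queued items until paused is set back to NO, whereas a flush drops them for good.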

Cancellation

A session may be cancelled, i.e. its execution will be stopped and the session will be destroyed:

[pipeline cancelSession:@"sessionId"];

If a 'cancellationHandler' routine was defined, it is called upon cancellation.
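The interplay between cancellation and an optional handler might look like the following sketch. The names SketchCancelHandler and SketchCancelSession are hypothetical, ARC is assumed, and the order chosen here (handler first, then removal of the session) is an assumption for illustration rather than documented ASFK behaviour.

```objc
#import <Foundation/Foundation.h>

// Hypothetical handler type, shaped like the cancellation block shown on
// this page. Assumes ARC.
typedef id (^SketchCancelHandler)(id identity);

// `sessions` maps a session id to its cancellation handler, or to NSNull
// when the session was created without one.
// Returns NO if no session with that id exists.
static BOOL SketchCancelSession(NSMutableDictionary *sessions, NSString *sessionId) {
    id entry = [sessions objectForKey:sessionId];
    if (entry == nil) {
        return NO;  // unknown session, nothing to cancel
    }
    if (entry != [NSNull null]) {
        ((SketchCancelHandler)entry)(sessionId);  // notify the optional handler
    }
    [sessions removeObjectForKey:sessionId];      // the session is destroyed
    return YES;
}
```

A session registered with NSNull is removed silently, which corresponds to the handler being optional.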
