Skip to content

Commit

Permalink
Mailbox+Pipeline: bug fixing+new feature
Browse files Browse the repository at this point in the history
Bugs fixing, Mailbox: noderation introduced; pipeline-testing
  • Loading branch information
pewsou committed Jun 18, 2022
1 parent f291a3d commit 9ad8dab
Show file tree
Hide file tree
Showing 32 changed files with 1,317 additions and 384 deletions.
1 change: 1 addition & 0 deletions src/ASFKBase+Internal.mm
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.
//

Expand Down
2 changes: 1 addition & 1 deletion src/ASFKBase+Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.
//

Expand Down
2 changes: 1 addition & 1 deletion src/ASFKBase+Statistics.mm
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.
//

Expand Down
37 changes: 28 additions & 9 deletions src/ASFKBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#import <Foundation/Foundation.h>
#import "ASFKPrjConfig.h"

#define ASFK_VERSION @"0.2.2"
#define ASFK_VERSION @"0.3.1"
#define ASFK_IDENTITY_TYPE id

#ifdef __ASFK_VERBOSE_PRINTING__
Expand Down Expand Up @@ -80,11 +80,25 @@

#import <atomic>
#import <vector>

/*!
@brief modes of dropping
*/
enum eASFKQDroppingPolicy{
/*!
@brief drop the newest item
*/
E_ASFK_Q_DP_TAIL=0,
/*!
@brief drop the oldest item
*/
E_ASFK_Q_DP_HEAD,
/*!
@brief don't drop, reject new candidate
*/
E_ASFK_Q_DP_REJECT,
/*!
@brief select item for dropping using some algorithm
*/
E_ASFK_Q_DP_ALGO
};

Expand Down Expand Up @@ -128,6 +142,8 @@ typedef id ( ^ASFKThreadpoolSummary)(void);
@return YES if flush attempt was issued; NO otherwise.
*/
-(BOOL) flushRequested;
-(void) setPaused:(BOOL) yesno;
-(BOOL) isPaused;
-(void) reset;
@end

Expand Down Expand Up @@ -158,7 +174,7 @@ typedef id ( ^ASFKProgressRoutine)(NSUInteger stage,NSUInteger accomplished ,NSU
@protected std::atomic<NSUInteger> indexSecondary;

@public std::atomic< BOOL> flushed;
@public std::atomic< BOOL> paused;
@protected std::atomic< BOOL> paused;
}
@property (readonly) ASFK_IDENTITY_TYPE sessionId;
@property (readonly) ASFK_IDENTITY_TYPE parentId;
Expand All @@ -175,6 +191,7 @@ typedef id ( ^ASFKExecutableRoutine)(id<ASFKControlCallback> controlBlock, id da

typedef id ( ^ASFKExecutableRoutineSummary)(id<ASFKControlCallback> controlBlock,NSDictionary* stats,id data);
typedef id ( ^ASFKCancellationRoutine)(id identity);
typedef id ( ^ASFKOnPauseNotification)(id identity, BOOL paused);

/**
@param controlBlock object controlling the execution
Expand Down Expand Up @@ -268,10 +285,11 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id<ASFKControlCallback> c

@interface ASFKExecutionParams:NSObject{
@public ASFKProgressRoutine progressProc;
@public ASFKExecutableRoutineSummary SummaryRoutine;
@public ASFKExecutableRoutineSummary summaryRoutine;
@public NSArray<ASFKExecutableRoutine>* procs;
@public ASFKCancellationRoutine cancellationProc;
@public ASFKExpirationCondition* expCondition;
@public ASFKOnPauseNotification onPauseProc;
}
@end

Expand Down Expand Up @@ -305,15 +323,19 @@ typedef enum enumASFKPipelineExecutionStatus{
eASFK_ES_SKIPPED_MAINT
} eASFKThreadpoolExecutionStatus;

typedef id ( ^ASFKThreadpoolSessionCancelProc)(id sessionId);
@interface ASFKThreadpoolSession : ASFKBase{

@public ASFKControlBlock* cblk;
@protected ASFKExecutableRoutineSummary passSummary;
@protected ASFKExecutableRoutineSummary expirationSummary;
@protected ASFKCancellationRoutine cancellationHandler;
@protected ASFKCancellationRoutine cancellationHandler;
@protected NSMutableArray<ASFKExecutableRoutine>* procs;
@protected ASFKExpirationCondition* excond;
@public ASFKOnPauseNotification onPauseNotification;
@public std::atomic<BOOL> isStopped;
@public std::atomic<BOOL> paused;
@public ASFKThreadpoolSessionCancelProc cancellationProc;
}
@property ASFK_IDENTITY_TYPE sessionId;

Expand All @@ -326,7 +348,6 @@ typedef enum enumASFKPipelineExecutionStatus{
-(void) postDataItemsAsUnorderedSet:(NSSet*)set;
-(void) postDataItemsAsDictionary:(NSDictionary*)dict;
-(void) postDataItem:(id)dataItem;
-(void) addRoutinesFromArray:(NSArray<ASFKExecutableRoutine>*)procs;
-(void) replaceRoutinesWithArray:(NSArray<ASFKExecutableRoutine>*)procs;
-(void) setProgressRoutine:(ASFKProgressRoutine)progress;
-(void) setSummary:(ASFKExecutableRoutineSummary)sum;
Expand All @@ -335,12 +356,10 @@ typedef enum enumASFKPipelineExecutionStatus{
-(void) setCancellationHandler:(ASFKCancellationRoutine)cru;
-(void) setExpirationCondition:(ASFKExpirationCondition*) trop;
-(BOOL) hasSessionSummary;

-(BOOL) isBusy;

-(long) procsCount;
-(long) itemsCount;

-(void) _invokeCancellationHandler:(ASFKCancellationRoutine) cru identity:(id)identity;
@end

#import "ASFKFilter.h"
Expand Down
46 changes: 32 additions & 14 deletions src/ASFKBase.mm
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@
#include <mach/mach_time.h>
#import "ASFKQueue+Internal.h"

@implementation ASFKExecutionParams{}
@implementation ASFKExecutionParams{

}
-(id) init{
self = [super init];
if(self) {
progressProc=nil;
SummaryRoutine=nil;
summaryRoutine=nil;
procs=nil;
cancellationProc=nil;
expCondition=nil;

onPauseProc=nil;
}
return self;
}
@end

@implementation ASFKThreadpoolSession
@implementation ASFKThreadpoolSession{
std::atomic<BOOL> cancelled;
}
-(id)init{
self=[super init];
if(self){
Expand All @@ -51,31 +55,43 @@ -(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENT
}
return self;
}

-(void)_TPSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDENTITY_TYPE)subId{
procs=[NSMutableArray array];
excond=[[ASFKExpirationCondition alloc]init];
isStopped=NO;
paused=NO;
onPauseNotification=nil;
if(sessionId){
cblk= [self newSession:sessionId andSubsession:subId];
}else{
cblk= [self newSession];
}

self.sessionId=cblk.sessionId;
passSummary=(id)^(id<ASFKControlCallback> controlBlock,NSDictionary* stats,id data){
ASFKLog(@"ASFKPipelineSession: Stub summary");
return data;
};
expirationSummary=(id)^(id<ASFKControlCallback> controlBlock,NSDictionary* stats,id data){
ASFKLog(@"ASFKPipelineSession: Stub expiration summary");
return data;
};

passSummary=nil;
expirationSummary=nil;
onPauseNotification=nil;
cancelled=NO;
cancellationHandler=^id(id identity){
ASFKLog(@"Default cancellation handler");
return nil;
};

}
-(void) _invokeCancellationHandler:(ASFKCancellationRoutine) cru identity:(id)identity{
BOOL tval=NO;
if(cru==nil){
return;
}

if(cancelled.compare_exchange_strong(tval,YES))
{
DASFKLog(@"Cancellation on the way, session %@",identity);
cru(identity);
}
}

@end

@implementation ASFKGlobalQueue{
Expand Down Expand Up @@ -112,6 +128,7 @@ -(id) submitBlocks:(NSArray<dispatch_block_t>*)blarray summary:(id(^)(void))summ
__block dispatch_queue_t q=[self _resolveQueue:qos];
if(blocking){
if(blarray && [blarray count]>0){
//ASFKLog(@"deploying %lu tasks",(unsigned long)[blarray count]);
dispatch_apply([blarray count], q, ^(size_t index) {
dispatch_block_t b= [blarray objectAtIndex:index];
b();
Expand Down Expand Up @@ -193,7 +210,7 @@ -(NSDictionary*)getStatistics{
-(void)cancelAll{
[lkNonLocal lock];
[ctrlblocks enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) {
ASFKControlBlock* cb = (ASFKControlBlock*)obj;// [ctrlblocks objectForKey:key];
ASFKControlBlock* cb = (ASFKControlBlock*)obj;
if(cb){
[cb cancel];
}else{
Expand Down Expand Up @@ -246,4 +263,5 @@ -(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId{
return NO;
}


@end
6 changes: 4 additions & 2 deletions src/ASFKControlBlock+Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.

#import "ASFKBase.h"

@interface ASFKControlBlock (Internal)
//-(void) setStopped:(BOOL)stop;
//-(void) setTotalProcessorsNum:(NSUInteger)procs;
-(void) setResultPosition:(NSUInteger)proc;

//-(NSUInteger) getTotalProcessorsNum;
-(NSUInteger) getResultPosition;
-(void) setProgressRoutine:(ASFKProgressRoutine)progress;
-(void) setSecondaryIndex:(NSUInteger)secind;
Expand Down
2 changes: 1 addition & 1 deletion src/ASFKControlBlock+Internal.mm
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.

#import "ASFKControlBlock+Internal.h"
Expand Down
22 changes: 16 additions & 6 deletions src/ASFKControlBlock.mm
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//
// Copyright © 2019-2022 Boris Vigman. All rights reserved.
//

Expand All @@ -23,23 +23,26 @@ @implementation ASFKControlBlock{
std::atomic< BOOL> abortByCallback;
std::atomic< BOOL> abortByCaller;
std::atomic< BOOL> abortByInternal;
//NSMutableArray* keys;
std::vector<std::vector<long long>> indexes;
}
-(id)initWithParent:(ASFK_IDENTITY_TYPE)parentId sessionId:(ASFK_IDENTITY_TYPE) sessionId andSubId:(ASFK_IDENTITY_TYPE)subid{
self=[super init];
if(self){
_parentId=[parentId copy];
_sessionId=[ASFKBase concatIdentity:sessionId withIdentity:subid];

//keys=[NSMutableArray array];
//_blkContainer=[[ASFKBlocksContainer alloc]init];
itsLock=[[NSLock alloc]init];

//[lock lock];
abortByCallback=NO;
abortByCaller=NO;
abortByInternal=NO;

//stopped=YES;
flushed=NO;
paused=NO;

//terminated=NO;
//[lock unlock];
}
return self;
}
Expand All @@ -50,6 +53,7 @@ -(void)cancel{
-(void) flushRequested:(BOOL)flush{
flushed=flush;
}

-(BOOL) flushRequested{
return flushed;
}
Expand All @@ -61,11 +65,17 @@ -(BOOL) cancellationRequestedByCallback{
BOOL b=abortByCallback;
return b;
}
-(void) setPaused:(BOOL) yesno{
paused=yesno;
}
-(BOOL) isPaused{
return paused;
}
-(void) reset{
abortByCallback=NO;
abortByCaller=NO;
[itsLock lock];

//[keys removeAllObjects];
indexes.clear();
[itsLock unlock];
}
Expand Down
Loading

0 comments on commit 9ad8dab

Please sign in to comment.