diff --git a/src/ASFKAuthorizationMgr.h b/src/ASFKAuthorizationMgr.h index 4987685..d399c1d 100644 --- a/src/ASFKAuthorizationMgr.h +++ b/src/ASFKAuthorizationMgr.h @@ -18,6 +18,7 @@ #import #import "ASFKMBSecret.h" @interface ASFKAuthorizationMgr : NSObject{ + @public ASFKSecretComparisonProc secretProcConfig; @public ASFKSecretComparisonProc secretProcCreate; @public ASFKSecretComparisonProc secretProcDiscard; @public ASFKSecretComparisonProc secretProcRead; @@ -26,7 +27,8 @@ @public ASFKSecretComparisonProc secretProcUnicast; @public ASFKSecretComparisonProc secretProcMulticast; @public ASFKSecretComparisonProc secretProcHost; - @public ASFKSecretComparisonProc secretProcConfig; + @public ASFKSecretComparisonProc secretProcIssuer; + @public ASFKSecretComparisonProc secretProcModerate; } @property (readonly) ASFKMasterSecret* masterSecret; @@ -48,5 +50,8 @@ -(BOOL) matchHostSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther; -(BOOL) matchSecuritySecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther; -(BOOL) matchConfigSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther; +-(BOOL) matchIssuerSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther; +-(BOOL) matchModeratorSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther; + @end diff --git a/src/ASFKAuthorizationMgr.mm b/src/ASFKAuthorizationMgr.mm index c747b44..bf06a2a 100644 --- a/src/ASFKAuthorizationMgr.mm +++ b/src/ASFKAuthorizationMgr.mm @@ -58,7 +58,7 @@ -(id)init{ return YES; } }; - + secretProcRead=^BOOL(ASFKSecret* sec0,ASFKSecret* sec1){ if(sec0 && sec1){ return [sec1 matchesReaderSecret:sec0]; @@ -103,7 +103,6 @@ -(id)init{ return YES; } }; - secretProcConfig=^BOOL(ASFKSecret* sec0,ASFKSecret* sec1){ if(sec0 && sec1){ return [sec1 matchesConfigSecret:sec0]; @@ -127,6 +126,30 @@ -(id)init{ } }; + secretProcIssuer=^BOOL(ASFKSecret* sec0,ASFKSecret* sec1){ + if(sec0 && sec1){ + return [sec1 matchesIssuerSecret:sec0]; + } + else if(sec0 || sec1){ + return NO; + } + else{ + return YES; + } + + }; + secretProcModerate=^BOOL(ASFKSecret* sec0,ASFKSecret* sec1){ + if(sec0 && sec1){ + return [sec1 matchesModeratorSecret:sec0]; + } + else if(sec0 || sec1){ + return NO; + } + else{ + return YES; + } + }; + } return self; @@ -137,16 +160,20 @@ -(BOOL) setMasterSecret:(ASFKMasterSecret*)oldsec newSecret:(ASFKMasterSecret*)n if(newsec!=nil){ //test validity of new secret if([newsec validSecretSecurity]){ + _masterSecret=newsec; masterSecretBack=nil; + ASFKLog(@"DONE"); return YES; } return NO; } else{ + _masterSecret=newsec; masterSecretBack=nil; + return YES; } } @@ -157,17 +184,21 @@ -(BOOL) setMasterSecret:(ASFKMasterSecret*)oldsec newSecret:(ASFKMasterSecret*)n [self isMasterSecretValid:oldsec matcher:secretProcSecurity]){ if(newsec!=nil){ if([newsec validSecretSecurity]){ + [_masterSecret invalidateAll]; _masterSecret=newsec; masterSecretBack=nil; + ASFKLog(@"DONE"); return YES; } return NO; } else{ + [_masterSecret invalidateAll]; _masterSecret=newsec; + ASFKLog(@"DONE"); return YES; } @@ -250,7 +281,6 @@ -(BOOL) matchMulticasterSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateS } return NO; } - -(BOOL) matchConfigSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther{ if(secCurrent && secOther){ BOOL r1=secretProcConfig(secCurrent,secOther); @@ -260,6 +290,25 @@ -(BOOL) matchConfigSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret } return NO; } +-(BOOL) matchModeratorSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther{ + if(secCurrent && secOther){ + BOOL r1=secretProcModerate(secCurrent,secOther); + return r1; + }else if(secCurrent==nil && secOther==nil){ + return YES; + } + return NO; +} +-(BOOL) matchIssuerSecret:(ASFKPrivateSecret*)secCurrent with:(ASFKPrivateSecret*)secOther{ + if(secCurrent && secOther){ + BOOL r1=secretProcIssuer(secCurrent,secOther); + return r1; + }else if(secCurrent==nil && secOther==nil){ + return YES; + } + return NO; +} + #pragma mark - Private methods -(BOOL) isMasterSecretValid:(ASFKMasterSecret*)msecret matcher:(ASFKSecretComparisonProc)match{ if(_masterSecret && msecret){ diff --git a/src/ASFKBase+Internal.h b/src/ASFKBase+Internal.h index ff8690b..f78f9e0 100644 --- a/src/ASFKBase+Internal.h +++ b/src/ASFKBase+Internal.h @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -25,7 +25,6 @@ +(ASFK_IDENTITY_TYPE) generateIdentity; +(NSString*) generateRandomString; +(NSNumber*) generateRandomNumber; - -(BOOL) isCancellationRequested; -(void) registerSession:(ASFKControlBlock*)cblk; -(ASFKControlBlock*) newSession; diff --git a/src/ASFKBase+Internal.mm b/src/ASFKBase+Internal.mm index 3252f56..114b5bc 100644 --- a/src/ASFKBase+Internal.mm +++ b/src/ASFKBase+Internal.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -54,7 +53,7 @@ +(NSNumber*) generateRandomNumber{ } -(BOOL) isCancellationRequested{ - + return NO; } -(ASFKControlBlock*) refreshCancellationData{ diff --git a/src/ASFKBase.h b/src/ASFKBase.h index 97a6602..4eb084c 100644 --- a/src/ASFKBase.h +++ b/src/ASFKBase.h @@ -12,14 +12,13 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import #import "ASFKPrjConfig.h" -#define ASFK_VERSION @"0.2.1" +#define ASFK_VERSION @"0.2.2" #define ASFK_IDENTITY_TYPE id #ifdef __ASFK_VERBOSE_PRINTING__ @@ -64,7 +63,6 @@ #define kASFKReturnResult @"asfk_ret_result" #define ASFK_RET_SUMRESULT @"asfk_ret_sumresult" - #define ASFK_RET_NEXT_TARGET @"asfk_ret_nextTarget" #define kASFKReturnSessionId @"asfk_ret_sessionId" #define kASFKReturnDescription @"asfk_ret_description" @@ -72,6 +70,7 @@ #define kASFKReturnStatsTimeProcsElapsedSec @"asfk_ret_stats_procs_tesec" #define kASFKReturnStatsTimeSessionElapsedSec @"asfk_ret_stats_session_tesec" + #define kASFKProgressRoutine @"progress_proc" #define kASFKCancelRoutine @"cancel_proc" #define kASFKSummaryRoutine @"summary_proc" @@ -151,11 +150,13 @@ typedef id ( ^ASFKProgressRoutine)(NSUInteger stage,NSUInteger accomplished ,NSU @end @interface ASFKControlBlock : NSObject{ + @protected std::atomic itsResPosition; @protected NSUInteger totalProcessors; @protected NSLock* itsLock;; @protected ASFKProgressRoutine itsProgressProc; @protected std::atomic indexSecondary; + @public std::atomic< BOOL> flushed; @public std::atomic< BOOL> paused; } @@ -175,7 +176,6 @@ typedef id ( ^ASFKExecutableRoutine)(id controlBlock, id da typedef id ( ^ASFKExecutableRoutineSummary)(id controlBlock,NSDictionary* stats,id data); typedef id ( ^ASFKCancellationRoutine)(id identity); - /** @param controlBlock object controlling the execution @param index positive number of current iteration @@ -228,7 +228,6 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id c @protected double totalSessionsTime; @protected ASFKProgressRoutine progressProc; @protected NSMutableDictionary* priv_statistics; - @protected dispatch_semaphore_t semHighLevelCall; } @property (readonly) NSString* itsName; @property (readonly) double totalTimeSeconds; @@ -276,6 +275,7 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id c } @end +#import "ASFKNonlinearFlow.h" #import "ASFKLinearFlow.h" @interface ASFKQueue : ASFKLinearFlow{ @protected NSLock* lock; @@ -298,6 +298,51 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id c -(void) unoccupy; @end +typedef enum enumASFKPipelineExecutionStatus{ + eASFK_ES_HAS_MORE=0, + eASFK_ES_HAS_NONE, + eASFK_ES_WAS_CANCELLED, + eASFK_ES_SKIPPED_MAINT +} eASFKThreadpoolExecutionStatus; + +@interface ASFKThreadpoolSession : ASFKBase{ + @public ASFKControlBlock* cblk; + @protected ASFKExecutableRoutineSummary passSummary; + @protected ASFKExecutableRoutineSummary expirationSummary; + @protected ASFKCancellationRoutine cancellationHandler; + @protected NSMutableArray* procs; + @protected ASFKExpirationCondition* excond; + @public std::atomic isStopped; + @public std::atomic paused; +} +@property ASFK_IDENTITY_TYPE sessionId; + +-(ASFKControlBlock*) getControlBlock; +-(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENTITY_TYPE)subId; +-(void) flush; +-(void) cancel; +-(void) postDataItemsAsArray:(NSArray*)array; +-(void) postDataItemsAsOrderedSet:(NSOrderedSet*)set; +-(void) postDataItemsAsUnorderedSet:(NSSet*)set; +-(void) postDataItemsAsDictionary:(NSDictionary*)dict; +-(void) postDataItem:(id)dataItem; +-(void) addRoutinesFromArray:(NSArray*)procs; +-(void) replaceRoutinesWithArray:(NSArray*)procs; +-(void) setProgressRoutine:(ASFKProgressRoutine)progress; +-(void) setSummary:(ASFKExecutableRoutineSummary)sum; +-(void) setExpirationSummary:(ASFKExecutableRoutineSummary)sum; +-(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCancellationRoutine)cancel; +-(void) setCancellationHandler:(ASFKCancellationRoutine)cru; +-(void) setExpirationCondition:(ASFKExpirationCondition*) trop; +-(BOOL) hasSessionSummary; + +-(BOOL) isBusy; + +-(long) procsCount; +-(long) itemsCount; + +@end + #import "ASFKFilter.h" #import "ASFKFilteringQueue.h" #import "ASFKMailbox.h" @@ -305,4 +350,3 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id c - diff --git a/src/ASFKBase.mm b/src/ASFKBase.mm index f25cf86..0e763b6 100644 --- a/src/ASFKBase.mm +++ b/src/ASFKBase.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -22,9 +21,7 @@ #include #import "ASFKQueue+Internal.h" -@implementation ASFKExecutionParams{ - -} +@implementation ASFKExecutionParams{} -(id) init{ self = [super init]; if(self) { @@ -33,11 +30,53 @@ -(id) init{ procs=nil; cancellationProc=nil; expCondition=nil; + } return self; } @end +@implementation ASFKThreadpoolSession +-(id)init{ + self=[super init]; + if(self){ + [self _TPSinitWithSession:nil andSubsession:nil]; + } + return self; +} +-(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENTITY_TYPE)subId{ + self=[super init]; + if(self){ + [self _TPSinitWithSession:sessionId andSubsession:subId]; + } + return self; +} +-(void)_TPSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDENTITY_TYPE)subId{ + procs=[NSMutableArray array]; + excond=[[ASFKExpirationCondition alloc]init]; + isStopped=NO; + paused=NO; + if(sessionId){ + cblk= [self newSession:sessionId andSubsession:subId]; + }else{ + cblk= [self newSession]; + } + + self.sessionId=cblk.sessionId; + passSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ + ASFKLog(@"ASFKPipelineSession: Stub summary"); + return data; + }; + expirationSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ + ASFKLog(@"ASFKPipelineSession: Stub expiration summary"); + return data; + }; + cancellationHandler=^id(id identity){ + return nil; + }; +} + +@end @implementation ASFKGlobalQueue{ NSMutableArray* mQ; @@ -59,8 +98,6 @@ + (ASFKGlobalQueue *)sharedManager { - (id)init { self = [super init]; if (self) { - NSUInteger pr=[[NSProcessInfo processInfo] activeProcessorCount]*ASFK_TP_LOAD_FACTOR; - ASFKLog(@"ASFK: Active processors: %lu detected",(unsigned long)pr); lock=[NSLock new]; dConQ_UserInteractive=dispatch_get_global_queue( QOS_CLASS_USER_INTERACTIVE, 0); @@ -143,7 +180,7 @@ -(void) _Baseinit:(NSString*)name{ _sources=_priv_sources; _targets=_priv_targets; lkNonLocal=[NSLock new]; - semHighLevelCall=dispatch_semaphore_create(1); + ctrlblocks=[NSMutableDictionary new]; } -(NSDictionary*)getStatistics{ @@ -156,7 +193,7 @@ -(NSDictionary*)getStatistics{ -(void)cancelAll{ [lkNonLocal lock]; [ctrlblocks enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - ASFKControlBlock* cb = (ASFKControlBlock*)obj; + ASFKControlBlock* cb = (ASFKControlBlock*)obj;// [ctrlblocks objectForKey:key]; if(cb){ [cb cancel]; }else{ diff --git a/src/ASFKControlBlock+Internal.h b/src/ASFKControlBlock+Internal.h index 2d4ddc6..d0c9743 100644 --- a/src/ASFKControlBlock+Internal.h +++ b/src/ASFKControlBlock+Internal.h @@ -18,7 +18,6 @@ #import "ASFKBase.h" @interface ASFKControlBlock (Internal) - -(void) setResultPosition:(NSUInteger)proc; -(NSUInteger) getResultPosition; diff --git a/src/ASFKControlBlock.mm b/src/ASFKControlBlock.mm index 5bfe952..c05859d 100644 --- a/src/ASFKControlBlock.mm +++ b/src/ASFKControlBlock.mm @@ -23,7 +23,6 @@ @implementation ASFKControlBlock{ std::atomic< BOOL> abortByCallback; std::atomic< BOOL> abortByCaller; std::atomic< BOOL> abortByInternal; - std::vector> indexes; } -(id)initWithParent:(ASFK_IDENTITY_TYPE)parentId sessionId:(ASFK_IDENTITY_TYPE) sessionId andSubId:(ASFK_IDENTITY_TYPE)subid{ @@ -66,6 +65,7 @@ -(void) reset{ abortByCallback=NO; abortByCaller=NO; [itsLock lock]; + indexes.clear(); [itsLock unlock]; } diff --git a/src/ASFKExpirationCondition.h b/src/ASFKExpirationCondition.h index ff3c9d5..3b32236 100644 --- a/src/ASFKExpirationCondition.h +++ b/src/ASFKExpirationCondition.h @@ -71,6 +71,7 @@ */ -(ASFKConditionTemporal*) chooseEarliest:(ASFKConditionTemporal*)cond; + @end #pragma mark - Expiration conditions @interface ASFKExpirationCondition : ASFKCondition diff --git a/src/ASFKExpirationCondition.mm b/src/ASFKExpirationCondition.mm index 4581d7d..507956d 100644 --- a/src/ASFKExpirationCondition.mm +++ b/src/ASFKExpirationCondition.mm @@ -60,9 +60,8 @@ @implementation ASFKConditionTemporal{ -(void) _setDeadline:(NSDate*)aDate{ if(aDate){ - _itsDelay=-1; + _itsDelay=-1;//dd0/double(1e6); _itsDeadline=aDate; - }else{ _itsDelay=-1; _itsDeadline=nil; @@ -71,7 +70,6 @@ -(void) _setDeadline:(NSDate*)aDate{ -(void) _setDelay:(NSTimeInterval)seconds{ if(seconds>0){ using namespace std::chrono; - _itsDelay=seconds; diff --git a/src/ASFKFilter.h b/src/ASFKFilter.h index 5a74f78..5273fb0 100644 --- a/src/ASFKFilter.h +++ b/src/ASFKFilter.h @@ -12,6 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKFilter.mm b/src/ASFKFilter.mm index a82a254..d3cd0d9 100644 --- a/src/ASFKFilter.mm +++ b/src/ASFKFilter.mm @@ -12,6 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKFilteringQueue.h b/src/ASFKFilteringQueue.h index 17f3be1..9287e2e 100644 --- a/src/ASFKFilteringQueue.h +++ b/src/ASFKFilteringQueue.h @@ -24,7 +24,5 @@ typedef NSIndexSet* (^clbkASFKFQFilter)(NSArray* collection, NSRange range); -(void) setDroppingPolicy:(eASFKQDroppingPolicy)policy; -(void) setDroppingAlgorithmL1:(ASFKFilter*)dropAlg; -(void) filterWith:(ASFKFilter*)filter; -//-(BOOL) prependWithFilter:(ASFKFilter*)filter; -//-(BOOL) appendWithFilter:(ASFKFilter*)filter; -(BOOL) removeObjWithId:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop)) blk; @end diff --git a/src/ASFKFilteringQueue.mm b/src/ASFKFilteringQueue.mm index e8a0305..bc76b65 100644 --- a/src/ASFKFilteringQueue.mm +++ b/src/ASFKFilteringQueue.mm @@ -23,7 +23,6 @@ @implementation ASFKFilteringQueue{ std::atomic maxQSize; std::atomic minQSize; ASFKFilter* itsFilter; - } -(id)init{ self=[super init]; @@ -81,11 +80,11 @@ -(BOOL) removeObjWithId:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop [lkNonLocal unlock]; [mis removeAllIndexes]; - } return r; } + -(BOOL)push:(id)item{ BOOL res=NO; if(item){ diff --git a/src/ASFKGlobalThreadpool.h b/src/ASFKGlobalThreadpool.h index 4cee45c..658e15b 100644 --- a/src/ASFKGlobalThreadpool.h +++ b/src/ASFKGlobalThreadpool.h @@ -18,7 +18,7 @@ #import #import "ASFKBase.h" -#import "ASFKPipelineSession.h" + @interface ASFKGlobalThreadpool : NSObject +(ASFKGlobalThreadpool *)sharedManager ; @@ -28,16 +28,21 @@ -(void) postDataAsOrderedSet:(NSOrderedSet*)set forSession:(ASFK_IDENTITY_TYPE)sessionId; -(void) postDataAsUnorderedSet:(NSSet*)data forSession:(ASFK_IDENTITY_TYPE)sessionId; -(void) postDataAsDictionary:(NSDictionary*)data forSession:(ASFK_IDENTITY_TYPE)sessionId; --(BOOL) addSession:(ASFKPipelineSession*)aseq withId:(ASFK_IDENTITY_TYPE)identity; +-(BOOL) addSession:(ASFKThreadpoolSession*)aseq withId:(ASFK_IDENTITY_TYPE)identity; --(ASFKPipelineSession*) getThreadpoolSessionWithId:(ASFK_IDENTITY_TYPE)identity; +-(ASFKThreadpoolSession*) getThreadpoolSessionWithId:(ASFK_IDENTITY_TYPE)identity; -(NSArray*) getThreadpoolSessionsList; -(void) cancelSession:(ASFK_IDENTITY_TYPE)sessionId; -(void) cancelAll; -(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId; +-(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId; + -(void) flushSession:(ASFK_IDENTITY_TYPE)sessionId; -(void) flushAll; - +-(void) pauseSession:(ASFK_IDENTITY_TYPE)sessionId; +-(void) pauseAll; +-(void) resumeSession:(ASFK_IDENTITY_TYPE)sessionId; +-(void) resumeAll; -(long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId; @end diff --git a/src/ASFKGlobalThreadpool.mm b/src/ASFKGlobalThreadpool.mm index 0e37bc9..55d48fd 100644 --- a/src/ASFKGlobalThreadpool.mm +++ b/src/ASFKGlobalThreadpool.mm @@ -45,6 +45,7 @@ @implementation ASFKGlobalThreadpool{ NSLock* lkMutexL1; NSLock* lkMutexL2; + NSCondition* lkCond; std::atomic shouldSleep; std::atomic busyCount; @@ -76,17 +77,16 @@ - (id)init { shouldSleep=YES; busyCount=0; qos=ASFK_PRIVSYM_QOS_CLASS; - NSUInteger pr=[[NSProcessInfo processInfo] activeProcessorCount]*ASFK_TP_LOAD_FACTOR; + NSUInteger pr=[[NSProcessInfo processInfo] activeProcessorCount]*ASFK_PRIVSYM_TP_LOAD_FACTOR; if(self.threadsLimit<1||self.threadsLimit>pr ){ ASFKLog(@"ASFKGlobalThreadpool: Requested number of threads is unavailable"); self.threadsLimit=pr; } if(pr>1){ - self.threadsLimit=pr*ASFK_PRIVSYM_LOAD_FACTOR; + self.threadsLimit=pr; } if(self.threadsLimit<=1) self.threadsLimit=1; - ASFKLog(@"ASFKGlobalThreadpool: Threads used: %lu",(unsigned long)self.threadsLimit); tpcfg.actualThreadsCount=self.threadsLimit; tpcfg.requiredThreadsCount=self.threadsLimit; @@ -108,12 +108,15 @@ -(long long)pausedSessionsCount{ [lkMutexL1 unlock]; return (ac); } + -(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId{ BOOL result=NO; if(sessionId){ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; - result = ss->paused; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; + if(ss){ + result = ss->paused; + } [lkMutexL1 unlock]; } return result; @@ -122,8 +125,10 @@ -(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId{ BOOL result=NO; if(sessionId){ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; - result = [ss isBusy]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; + if(ss){ + result = [ss isBusy]; + } [lkMutexL1 unlock]; } return result; @@ -139,7 +144,7 @@ -(long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId{ long result=0; if(sessionId){ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; if(ss){ result= [ss itemsCount]; } @@ -152,7 +157,7 @@ -(void) flushSession:(ASFK_IDENTITY_TYPE)sessionId{ ASFKLog(@"ASFKGlobalThreadpool: Flushing session with ID %@",sessionId); if(sessionId){ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; if(ss){ [ss flush]; } @@ -165,24 +170,28 @@ -(void) flushAll{ ASFKLog(@"ASFKGlobalThreadpool: Flushing all sessions"); [lkMutexL1 lock]; [allSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - ASFKPipelineSession* ss=[allSessions objectForKey:obj]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:obj]; [ss flush]; ss=nil; }]; + + [lkMutexL1 unlock]; } -(void) cancelSession:(ASFK_IDENTITY_TYPE)sessionId{ DASFKLog(@"ASFKGlobalThreadpool: Cancelling session with ID %@",sessionId); if(sessionId){ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; if(ss){ [ss cancel]; [runningSessions removeObjectForKey:sessionId]; [lkMutexL2 lock]; + onlineSessions=[runningSessions allValues]; [lkMutexL2 unlock]; + [allSessions removeObjectForKey:sessionId]; [pausedSessions removeObjectForKey:sessionId]; ss=nil; @@ -194,8 +203,9 @@ -(void) cancelSession:(ASFK_IDENTITY_TYPE)sessionId{ -(void) cancelAll{ DASFKLog(@"ASFKGlobalThreadpool: Cancelling all sessions"); [lkMutexL1 lock]; + [allSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - ASFKPipelineSession* ss=[allSessions objectForKey:obj]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:obj]; [ss cancel]; ss=nil; @@ -203,10 +213,11 @@ -(void) cancelAll{ [lkMutexL2 lock]; tpcfg.actualThreadsCount=0; ThreadpoolConfig tpc=tpcfg; + [self _reassignProcs:tpc]; vectProc2Bounds.clear(); vectProc2Bounds.resize(tpcfg.actualThreadsCount); onlineSessions = [NSArray new]; - [self _reassignProcs:tpc]; + [lkMutexL2 unlock]; runningSessions = [NSMutableDictionary new]; @@ -215,10 +226,82 @@ -(void) cancelAll{ [lkMutexL1 unlock]; DASFKLog(@"ASFKGlobalThreadpool: All sessions should be cancelled"); } +-(void) pauseSession:(ASFK_IDENTITY_TYPE)sessionId{ + DASFKLog(@"ASFKGlobalThreadpool: Pausing session with ID %@",sessionId); + if(sessionId){ + [lkMutexL1 lock]; + ASFKThreadpoolSession* ss=[runningSessions objectForKey:sessionId]; + if(ss){ + ss->paused=YES; + [pausedSessions setObject:ss forKey:sessionId]; + [runningSessions removeObjectForKey:sessionId]; + [lkMutexL2 lock]; + onlineSessions=[runningSessions allValues]; + [lkMutexL2 unlock]; + ss=nil; + + } + [lkMutexL1 unlock]; + } + DASFKLog(@"ASFKGlobalThreadpool: Session %@ paused",sessionId); +} +-(void) pauseAll{ + DASFKLog(@"ASFKGlobalThreadpool: Pausing all sessions"); + [lkMutexL1 lock]; + [pausedSessions addEntriesFromDictionary:runningSessions]; + [runningSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ + ASFKThreadpoolSession* ss=(ASFKThreadpoolSession*)obj; + ss->paused=YES; + }]; + runningSessions=[NSMutableDictionary new]; + [lkMutexL2 lock]; + onlineSessions = [NSArray new]; + [lkMutexL2 unlock]; + [lkMutexL1 unlock]; + DASFKLog(@"ASFKGlobalThreadpool: All sessions paused"); +} +-(void) resumeSession:(ASFK_IDENTITY_TYPE)sessionId{ + DASFKLog(@"ASFKGlobalThreadpool: Resuming session with ID %@",sessionId); + if(sessionId){ + [lkMutexL1 lock]; + ASFKThreadpoolSession* ss=[pausedSessions objectForKey:sessionId]; + if(ss){ + ss->paused=NO; + [pausedSessions removeObjectForKey:sessionId]; + [runningSessions setObject:ss forKey:sessionId]; + [lkMutexL2 lock]; + onlineSessions = [runningSessions allValues]; + [lkMutexL2 unlock]; + } + [lkMutexL1 unlock]; + ss=nil; + } + DASFKLog(@"ASFKGlobalThreadpool: Session %@ resumed",sessionId); +} +-(void) resumeAll{ + DASFKLog(@"ASFKGlobalThreadpool: Resuming all sessions"); + [lkMutexL1 lock]; + + [runningSessions addEntriesFromDictionary:pausedSessions]; + [pausedSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ + ASFKThreadpoolSession* ss=(ASFKThreadpoolSession*)obj; + if(ss){ + ss->paused=NO;; + + } + }]; + + pausedSessions=[NSMutableDictionary new]; + [lkMutexL2 lock]; + onlineSessions=[runningSessions allValues]; + [lkMutexL2 unlock]; + [lkMutexL1 unlock]; + DASFKLog(@"ASFKGlobalThreadpool: All sessions resumed"); +} -(void) postDataAsDictionary:(NSDictionary*)data forSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; [lkMutexL1 unlock]; if(ss){ [ss postDataItemsAsDictionary:data]; @@ -229,7 +312,7 @@ -(void) postDataAsDictionary:(NSDictionary*)data forSession:(ASFK_IDENTITY_TYPE) } -(void) postDataAsArray:(NSArray*)data forSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; [lkMutexL1 unlock]; if(ss){ [ss postDataItemsAsArray:data]; @@ -240,7 +323,7 @@ -(void) postDataAsArray:(NSArray*)data forSession:(ASFK_IDENTITY_TYPE)sessionId{ } -(void) postDataAsOrderedSet:(NSOrderedSet*)data forSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; [lkMutexL1 unlock]; if(ss){ [ss postDataItemsAsOrderedSet:data]; @@ -251,7 +334,7 @@ -(void) postDataAsOrderedSet:(NSOrderedSet*)data forSession:(ASFK_IDENTITY_TYPE) } -(void) postDataAsUnorderedSet:(NSSet*)data forSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[allSessions objectForKey:sessionId]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; [lkMutexL1 unlock]; if(ss){ [ss postDataItemsAsUnorderedSet:data]; @@ -261,7 +344,7 @@ -(void) postDataAsUnorderedSet:(NSSet*)data forSession:(ASFK_IDENTITY_TYPE)sessi } } --(BOOL) addSession:(ASFKPipelineSession*)aseq withId:(ASFK_IDENTITY_TYPE)identity{ +-(BOOL) addSession:(ASFKThreadpoolSession*)aseq withId:(ASFK_IDENTITY_TYPE)identity{ BOOL res=NO; [lkMutexL1 lock]; if([allSessions objectForKey:identity]==nil){ @@ -276,9 +359,9 @@ -(BOOL) addSession:(ASFKPipelineSession*)aseq withId:(ASFK_IDENTITY_TYPE)identit [lkMutexL1 unlock]; return res; } --(ASFKPipelineSession*) getThreadpoolSessionWithId:(ASFK_IDENTITY_TYPE)identity{ +-(ASFKThreadpoolSession*) getThreadpoolSessionWithId:(ASFK_IDENTITY_TYPE)identity{ [lkMutexL1 lock]; - ASFKPipelineSession* ss=[runningSessions objectForKey:identity]; + ASFKThreadpoolSession* ss=[runningSessions objectForKey:identity]; [lkMutexL1 unlock]; return ss; } @@ -354,8 +437,11 @@ -(void) _engineDeploy{ ///-----Housekeeping----- /// - vectProc2Bounds.clear(); - vectProc2Bounds.resize(tpcfg.actualThreadsCount); + if(vectProc2Bounds.size() != tpcfg.actualThreadsCount){ + vectProc2Bounds.clear(); + vectProc2Bounds.resize(tpcfg.actualThreadsCount); + } + [self _reassignProcs:tpc]; tcr=vectProc2Bounds[ii]; if(tcr.length==0 || @@ -375,12 +461,14 @@ -(void) _engineDeploy{ continue; } - __block ASFKPipelineSession* ss=[onlineSessions objectAtIndex:selectedSlot]; + __block ASFKThreadpoolSession* ss=[onlineSessions objectAtIndex:selectedSlot]; if(ss && [ss->cblk cancellationRequested]){ ThreadpoolConfig tpc1=tpcfg; + //[lkMutexL2 lock]; onlineSessions=[runningSessions allValues]; [self _reassignProcs:tpc1]; [lkMutexL2 unlock]; + //[lkMutexL1 unlock]; continue; } @@ -393,7 +481,7 @@ -(void) _engineDeploy{ onlineSessions=[runningSessions allValues]; [self _reassignProcs:tpc1]; [lkMutexL2 unlock]; - + //[lkMutexL1 unlock]; return nil; }]; } diff --git a/src/ASFKLinearFlow+Internal.h b/src/ASFKLinearFlow+Internal.h index 07f2478..d72906c 100644 --- a/src/ASFKLinearFlow+Internal.h +++ b/src/ASFKLinearFlow+Internal.h @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -27,7 +27,7 @@ @property (nonatomic) ASFKExpirationCondition* excond; @property (nonatomic) id input; @property (nonatomic) ASFK_IDENTITY_TYPE sessionId; -@property (nonatomic) BOOL hasForeignProcs; + @end @interface ASFKLinearFlow (Internal) diff --git a/src/ASFKLinearFlow+Internal.mm b/src/ASFKLinearFlow+Internal.mm index ef9499d..8d68a3a 100644 --- a/src/ASFKLinearFlow+Internal.mm +++ b/src/ASFKLinearFlow+Internal.mm @@ -61,6 +61,7 @@ -(ASFKParamSet*) _convertInput:(id) input to:(ASFKParamSet*)ps{ } return ps; } + @end @implementation ASFKParamSet @@ -72,8 +73,6 @@ -(id) init{ self.cancProc=nil; self.input=nil; self.excond=nil; - self.hasForeignProcs=NO; - //self.customSessionId=nil; self.sessionId=nil; } return self; diff --git a/src/ASFKLinearFlow.h b/src/ASFKLinearFlow.h index 5adbd73..ed203df 100644 --- a/src/ASFKLinearFlow.h +++ b/src/ASFKLinearFlow.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #ifndef ASFKLinearFlow_h @@ -91,11 +90,11 @@ @end @interface ASFKLinearFlow : ASFKBase{ -@protected - NSMutableArray * _backprocs; - NSArray *lfProcs; - ASFKCancellationRoutine cancelproc; - ASFKExecutableRoutineSummary sumproc; + @protected NSMutableArray * _backprocs; + @protected NSArray *lfProcs; + @protected ASFKCancellationRoutine cancelproc; + @protected ASFKExecutableRoutineSummary sumproc; + @protected dispatch_semaphore_t semHighLevelCall; } -(NSArray *) getRoutines; -(NSUInteger) getRoutinesCount; diff --git a/src/ASFKLinearFlow.mm b/src/ASFKLinearFlow.mm index 063e197..2001e92 100644 --- a/src/ASFKLinearFlow.mm +++ b/src/ASFKLinearFlow.mm @@ -51,6 +51,7 @@ -(void)_initLF{ }; cancelproc=nil; progressProc=nil; + semHighLevelCall=dispatch_semaphore_create(1); } -(NSUInteger) getRoutinesCount{ [lkNonLocal lock]; @@ -191,9 +192,9 @@ -(BOOL) setCancellationHandler:(ASFKCancellationRoutine)ch{ #pragma mark - Non-blocking methods -(NSDictionary*) castOrderedSet:(NSOrderedSet*)set session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInputOrderedSet:set to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputOrderedSet:set to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _castOrderedSet:params]; @@ -201,9 +202,9 @@ -(NSDictionary*) castOrderedSet:(NSOrderedSet*)set session:(id)sessionId exParam } -(NSDictionary*) castUnorderedSet:(NSSet*)set session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil ]; - params=[self _convertInputUnorderedSet:set to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputUnorderedSet:set to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _castUnorderedSet:params]; @@ -211,10 +212,10 @@ -(NSDictionary*) castUnorderedSet:(NSSet*)set session:(id)sessionId exParam:(ASF } -(NSDictionary*) castArray:(NSArray*)array session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; + ASFKParamSet* params=[ASFKParamSet new]; + params.sessionId=sessionId; params=[self _convertInputArray:array to:params]; dispatch_semaphore_signal(semHighLevelCall); - params.sessionId=sessionId; NSDictionary* res= [self _castArray:params]; @@ -223,10 +224,10 @@ -(NSDictionary*) castArray:(NSArray*)array session:(id)sessionId exParam:(ASFKEx -(NSDictionary*) castDictionary:(NSDictionary*)dictionary session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; + ASFKParamSet* params=[ASFKParamSet new]; + params.sessionId=sessionId; params=[self _convertInputDictionary:dictionary to:params]; dispatch_semaphore_signal(semHighLevelCall); - params.sessionId=sessionId; NSDictionary* res= [self _castDictionary:params]; return res; @@ -234,9 +235,9 @@ -(NSDictionary*) castDictionary:(NSDictionary*)dictionary session:(id)sessionId -(NSDictionary*) castObject:(id)uns session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInput:uns to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInput:uns to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _castArray:params]; return res; @@ -244,9 +245,9 @@ -(NSDictionary*) castObject:(id)uns session:(id)sessionId exParam:(ASFKExecution #pragma mark - Blocking methods -(NSDictionary*) callOrderedSet:(NSOrderedSet*)set session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInputOrderedSet:set to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputOrderedSet:set to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _callOrderedSet:params]; @@ -254,9 +255,9 @@ -(NSDictionary*) callOrderedSet:(NSOrderedSet*)set session:(id)sessionId exParam } -(NSDictionary*) callUnorderedSet:(NSSet*)set session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInputUnorderedSet:set to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputUnorderedSet:set to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _callUnorderedSet:params]; @@ -266,9 +267,9 @@ -(NSDictionary*) callUnorderedSet:(NSSet*)set session:(id)sessionId exParam:(ASF -(NSDictionary*) callArray:(NSArray*)array session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInputArray:array to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputArray:array to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _callArray:params]; @@ -277,9 +278,9 @@ -(NSDictionary*) callArray:(NSArray*)array session:(id)sessionId exParam:(ASFKEx -(NSDictionary*) callDictionary:(NSDictionary*)dictionary session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInputDictionary:dictionary to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInputDictionary:dictionary to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _callDictionary:params]; @@ -288,9 +289,9 @@ -(NSDictionary*) callDictionary:(NSDictionary*)dictionary session:(id)sessionId -(NSDictionary*) callObject:(id)uns session:(id)sessionId exParam:(ASFKExecutionParams*)ex{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); - ASFKParamSet* params=[self _decodeExParams:ex forSession:nil]; - params=[self _convertInput:uns to:params]; + ASFKParamSet* params=[ASFKParamSet new]; params.sessionId=sessionId; + params=[self _convertInput:uns to:params]; dispatch_semaphore_signal(semHighLevelCall); NSDictionary* res= [self _callArray:params]; return res; @@ -311,11 +312,11 @@ - (NSDictionary *)stepBlockingWithData:(id)data { - (NSDictionary *)stepNonblockingWithData:(id)data { NSDictionary* result=nil; if([data isKindOfClass:[NSDictionary class]]){ - + //result=[self castDictionary:data exParam:nil]; }else if([data isKindOfClass:[NSArray class]]){ - + //result=[self castArray:data exParam:nil]; }else{ - + //result=[self castObject:data exParam:nil]; } return result; } @@ -336,13 +337,20 @@ -(ASFKParamSet*) _decodeExParams:(ASFKExecutionParams*)ex forSession:(id)session expar.cancProc = ex->cancellationProc?ex->cancellationProc:cancelproc; expar.excond=ex->expCondition; expar.progress = ex->progressProc?ex->progressProc:progressProc; - if(sessionId){ - expar.sessionId=sessionId; - } - else{ - expar.sessionId=nil; - } + expar.sessionId=sessionId; } +// else{ +// expar.summary = sumproc; +// expar.procs = [_backprocs copy]; +// expar.cancProc = cancelproc; +// +// } +// if(sessionId){ +// expar.sessionId=sessionId; +// } +// else{ +// expar.sessionId=[ASFKBase generateIdentity]; +// } return expar; } @end diff --git a/src/ASFKMBProperties.h b/src/ASFKMBProperties.h index 962d778..0c986e4 100644 --- a/src/ASFKMBProperties.h +++ b/src/ASFKMBProperties.h @@ -12,13 +12,19 @@ GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +// +// Created by Boris Vigman on 16/05/2021. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import #import "ASFKExpirationCondition.h" #include - +//typedef enum e_ASFKMBPoppingPolicy{ +// e_ASFKMB_POP_OWNER_ONLY, +// e_ASFKMB_POP_FIRST_POPPER +//} eASFKMBPoppingPolicy; +typedef void(^ASFKMbNRunOnContainerReadRoutine)(id cId,NSDate* tstamp, NSArray* data); /*! @brief custom filter of incoming messages. @discussion custom Routine that filters accepted messages. User can review all accepted messages and select some subset to be removed. After this call ended, the selected messages will be removed from collection. @@ -36,7 +42,7 @@ typedef void(^ASFKMbNotifyOnContainerDiscardRoutine)(id cId,NSDate* tstamp); @brief notification on incoming messages. @discussion custom Routine that notifies user about incoming message. @param cId group/user ID. - @param newMsg handle of the new message before it was accepted. + @param newMsgCount up-to-date message count. */ typedef void(^ASFKMbNotifyOnNewMsgRoutine)(id cId, NSUInteger newMsgCount); typedef void(^ASFKMbNotifyOnContainerPopRoutine)(id cId,NSArray* popped,NSUInteger left); @@ -66,7 +72,6 @@ typedef void(^ASFKMbMsgFeedbackProc)(id cId, NSDate* timepoint, id msg); /*! @brief Group membership duration, greater than zero. After this period member will automatically leave the group. Negative value ignored. */ -//@property (nonatomic,readonly) NSTimeInterval leaveAfterSeconds; @property (nonatomic,readonly) ASFKConditionTemporal* grpMemLeaveTimer; /*! @brief set Group membership leaving date. Nil or date lesser than or equal to current time lead to invalidation of underlying properties. @@ -87,6 +92,7 @@ typedef void(^ASFKMbMsgFeedbackProc)(id cId, NSDate* timepoint, id msg); #pragma mark - Group/Container Props @interface ASFKMBContainerProperties :NSObject -(void) initFromProps:(ASFKMBContainerProperties*)p; +@property ASFKMbNRunOnContainerReadRoutine runOnReadProc; /*! @brief Indication of whether addition of new members to a group is allowed. NO for permission. @discussion When applied to standalone mailbox, no effect expected. @@ -262,4 +268,3 @@ typedef void(^ASFKMbMsgFeedbackProc)(id cId, NSDate* timepoint, id msg); @end typedef void(^ASFKMbCallReleaseRoutine)(); - diff --git a/src/ASFKMBProperties.mm b/src/ASFKMBProperties.mm index d4231bb..a5ce7d3 100644 --- a/src/ASFKMBProperties.mm +++ b/src/ASFKMBProperties.mm @@ -161,6 +161,7 @@ -(id)init{ self.onLeaveProc=nil; self.onDiscardProc=nil; self.feedbackProc=nil; + self.runOnReadProc=nil; _containerDeleteTimer=[ASFKConditionTemporal new]; _containerKickoutTimer=[ASFKConditionTemporal new]; _containerDropMsgTimer=[ASFKConditionTemporal new]; @@ -176,6 +177,7 @@ -(id)init{ } -(void) initFromProps:(ASFKMBContainerProperties*)p{ if(p){ + self.runOnReadProc=p.runOnReadProc; self.onNewMsgProc=p.onNewMsgProc; self.onPopProc=p.onPopProc; self.onReadProc=p.onReadProc; @@ -205,16 +207,12 @@ -(void) setPropDeleteAfterSeconds:(NSTimeInterval)seconds{ //} } -(void) setPropKickoutAfterSeconds:(NSTimeInterval)seconds{ - //if( seconds>0){ [self.containerKickoutTimer setDelay:seconds]; [self.containerKickoutTimer delayToDeadline]; - //} } -(void) setPropDropMsgAfterSeconds:(NSTimeInterval)seconds{ - //if( seconds>0){ [self.containerDropMsgTimer setDelay:seconds]; [self.containerDropMsgTimer delayToDeadline]; - //} } -(void) setPropDropMsgOnDate:(NSDate *)date{ [self.containerDropMsgTimer setDueDate:date]; @@ -256,30 +254,4 @@ -(id)init{ } @end -//@implementation ASFKMBBlockableMsgProperties -//-(id) init{ -// self=[super init]; -// if(self){ -// _msgReleaseTimer=[ASFKConditionTemporal new]; -// _conditionCallRelease=nil; -// _callRelease=nil;; -// } -// return self; -//} -//-(void) setPropReleaseTimer:(ASFKConditionTemporal*)condition{ -// _msgReleaseTimer=condition; -//} -//-(void) setPropCallRelease:(ASFKConditionCallRelease*)condition{ -// _conditionCallRelease=condition; -//} -//-(void) setPropCallReleaseRoutine:(ASFKMbCallReleaseRoutine)routine{ -// _callRelease=routine; -//} -//-(void) wait{ -// -//} -//-(void) signal{ -// -//} -//@end diff --git a/src/ASFKMBSecret.h b/src/ASFKMBSecret.h index 72a50af..4763dc6 100644 --- a/src/ASFKMBSecret.h +++ b/src/ASFKMBSecret.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -23,18 +22,19 @@ Secrets are objects used to authorize operations; When API call invoked and secret is provided, it is tested against some other stored secret; if both secrets match, the operation is allowed. Secrets are organized by types and roles. There are 3 types: Master, Private and Group. Master secret is single, global and can affect all mailboxes. Private/Group may affect only specific mailbox; Private secret should be created and used by mailbox owners only, while Group secret may be used by owner and group members. Each secret may play different roles, while some roles are disabled for different secret types. - Available Roles: - Private Group Master + Available Roles: Private Group Master 1. creation of group mailbox x + by cloning or set operation 2. Reading x x 3. Popping x x - 4. Resource management x x x - 5. Discarding of mailboxes and groups x x - 6. unicast x x x - 7. multicast x x x + 4. Discarding of mailboxes and groups x x + 5. unicast x x x + 6. multicast x x x + 7. moderation - blinding/muting of members x x 8. security - changing secrets for Mailbox, Group, Global x x - 9. config - update of mailbox operational parameters x - 10. hosting - addition/removal of members to/from Group mailbox x x + 9. issuer - retraction/hiding of posted messages x x + 10. config - update of mailbox operational parameters x + 11. hosting - addition/removal of members to/from Group mailbox x x Secrets lifetime and configuration: All secrets have unlimited lifetime by default, which however can be configured to be temporary: for limited time period, limited number of use attempts or custom lifetime shortening criteria. When lifetime is ended the secret is invalidated forever. Manual invalidation is available too. @@ -48,6 +48,7 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); */ @interface ASFKSecret :NSObject{ @private + id _secretModerator; id _secretUnicaster; id _secretMulticaster; id _secretPopper; @@ -57,6 +58,7 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); id _secretSecurity; id _secretConfigurer; id _secretHost; + id _secretIssuer; } @property (readonly) ASFKConditionTemporal* timerExpiration; -(BOOL) matchesUnicasterSecret:(ASFKSecret*)secret; @@ -65,7 +67,9 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); -(BOOL) matchesPopperSecret:(ASFKSecret*)secret; -(BOOL) matchesDiscarderSecret:(ASFKSecret*)secret; -(BOOL) matchesCreatorSecret:(ASFKSecret*)secret; +-(BOOL) matchesModeratorSecret:(ASFKSecret*)secret; -(BOOL) matchesHostSecret:(ASFKSecret*)secret; +-(BOOL) matchesIssuerSecret:(ASFKSecret*)secret; -(BOOL) matchesConfigSecret:(ASFKSecret*)secret; -(BOOL) matchesSecuritySecret:(ASFKSecret*)secret; @@ -75,6 +79,8 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); -(BOOL) setPopperSecretOnce:(id)secret; -(BOOL) setDiscarderSecretOnce:(id)secret; -(BOOL) setCreatorSecretOnce:(id)secret; +-(BOOL) setModeratorSecretOnce:(id)secret; +-(BOOL) setIssuerSecretOnce:(id)secret; -(BOOL) setHostSecretOnce:(id)secret; -(BOOL) setConfigSecretOnce:(id)secret; -(BOOL) setSecuritySecretOnce:(id)secret; @@ -83,6 +89,8 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); -(BOOL) setMulticasterSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; -(BOOL) setCreatorSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; -(BOOL) setDiscarderSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; +-(BOOL) setModeratorSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; +-(BOOL) setIssuerSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; -(BOOL) setConfigSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; -(BOOL) setSecuritySecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; -(BOOL) setReaderSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc; @@ -95,10 +103,13 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); -(void) invalidatePopperSecret; -(void) invalidateDiscarderSecret; -(void) invalidateCreatorSecret; +-(void) invalidateModeratorSecret; -(void) invalidateSecuritySecret; +-(void) invalidateIssuerSecret; -(void) invalidateConfigSecret; -(void) invalidateHostSecret; -(void) invalidateAll; +-(BOOL) validSecretModerator; -(BOOL) validSecretCreator; -(BOOL) validSecretDiscarder; -(BOOL) validSecretUnicaster; @@ -108,6 +119,7 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); -(BOOL) validSecretHost; -(BOOL) validSecretSecurity; -(BOOL) validSecretConfig; +-(BOOL) validSecretIssuer; -(BOOL) setExpirationDateOnce:(NSDate*)aDate; -(BOOL) setExpirationDelayOnce:(NSTimeInterval) sec; -(BOOL) passedExpirationDeadline:(NSDate*)deadline; @@ -116,21 +128,21 @@ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); @end /*! @brief Declaration of master secret entity. - @discussion If applied to container having private secret - the private secret is overriden if master secret is valid and non-nil. + @discussion If applied to container having private secret - the private secret is overriden if master secret is valid and non-nil. Roles available for master key: purging of maibox, deletion of mailbox, messages and users; setting of master secret; unicast, broadcast and multicast. Master secret may not be used for moderation, reading. */ @interface ASFKMasterSecret :ASFKSecret @end /*! @brief Declaration of private secret entity. - @discussion only container's owner having private secret may use it. + @discussion only container's owner having private secret may use it. Roles available for private secret: purging of mailbox; creation of private mailbox; reading and popping; moderation - muting, blinding and so on; unicast, broadcast and multicast. */ @interface ASFKPrivateSecret :ASFKSecret @end /*! @brief Declaration of group secret entity. - @discussion only group owner and group members may use it. + @discussion only group owner and group members may use it. Roles available for group secret: purging of mailbox; reading and popping; moderation - muting, blinding and so on. */ @interface ASFKGroupSecret :ASFKPrivateSecret diff --git a/src/ASFKMBSecret.mm b/src/ASFKMBSecret.mm index 359a5ab..feb94d9 100644 --- a/src/ASFKMBSecret.mm +++ b/src/ASFKMBSecret.mm @@ -65,7 +65,6 @@ -(BOOL) matchSecretHost:(id)sH secretGuest:(id)sG usageCount:(std::atomic secretExpirationSet; @@ -84,7 +83,6 @@ @implementation ASFKSecret{ ASFK_PrivSecretItem* psiSecurity; ASFK_PrivSecretItem* psiConfig; ASFK_PrivSecretItem* psiIssuer; - } @@ -97,7 +95,8 @@ -(id) init{ secretMaxUsageSet=NO; itsUsageCount=INTMAX_MAX; - psiModerator=[ASFK_PrivSecretItem new];; + psiSecurity=[ASFK_PrivSecretItem new]; + psiConfig=[ASFK_PrivSecretItem new]; psiUnicaster=[ASFK_PrivSecretItem new]; psiMulticaster=[ASFK_PrivSecretItem new]; psiReader=[ASFK_PrivSecretItem new]; @@ -105,25 +104,13 @@ -(id) init{ psiCreator=[ASFK_PrivSecretItem new]; psiDiscarder=[ASFK_PrivSecretItem new]; psiHost=[ASFK_PrivSecretItem new]; + psiModerator=[ASFK_PrivSecretItem new];; psiIssuer=[ASFK_PrivSecretItem new]; - psiSecurity=[ASFK_PrivSecretItem new]; - //psiRcManager=[ASFK_PrivSecretItem new]; - psiConfig=[ASFK_PrivSecretItem new]; + } return self; } --(BOOL) setIssuerSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ - if(cmpproc==nil){ - return NO; - } - BOOL tval=NO; - if(psiIssuer->secretCmpSet.compare_exchange_strong(tval,YES)) - { - psiIssuer->secretCmpProc=cmpproc; - return YES; - } - return NO; -} + -(BOOL) setUnicasterSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ if(cmpproc==nil){ return NO; @@ -196,18 +183,6 @@ -(BOOL) setDiscarderSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ } return NO; } --(BOOL) setModeratorSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ - if(cmpproc==nil){ - return NO; - } - BOOL tval=NO; - if(psiModerator->secretCmpSet.compare_exchange_strong(tval,YES)) - { - psiModerator->secretCmpProc=cmpproc; - return YES; - } - return NO; -} -(BOOL) setHostSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ if(cmpproc==nil){ return NO; @@ -244,6 +219,30 @@ -(BOOL) setConfigSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ } return NO; } +-(BOOL) setModeratorSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ + if(cmpproc==nil){ + return NO; + } + BOOL tval=NO; + if(psiModerator->secretCmpSet.compare_exchange_strong(tval,YES)) + { + psiModerator->secretCmpProc=cmpproc; + return YES; + } + return NO; +} +-(BOOL) setIssuerSecretComparisonProcOnce:(ASFKSecretComparisonProc)cmpproc{ + if(cmpproc==nil){ + return NO; + } + BOOL tval=NO; + if(psiIssuer->secretCmpSet.compare_exchange_strong(tval,YES)) + { + psiIssuer->secretCmpProc=cmpproc; + return YES; + } + return NO; +} -(BOOL) setMaxUsageCountOnce:(NSInteger)maxCount{ BOOL tval=NO; if(maxCount>0 && secretMaxUsageSet.compare_exchange_strong(tval,YES)) @@ -348,7 +347,6 @@ -(BOOL) matchesReaderSecret:(ASFKSecret*)secret{ return [psiReader matchSecretHost:_secretReader secretGuest:secret->_secretReader usageCount:itsUsageCount]; } - -(BOOL) matchesPopperSecret:(ASFKSecret*)secret{ if(!psiPopper->secretValid || secret==nil){ return NO; @@ -360,7 +358,6 @@ -(BOOL) matchesPopperSecret:(ASFKSecret*)secret{ } return [psiPopper matchSecretHost:_secretPopper secretGuest:secret->_secretPopper usageCount:itsUsageCount]; } - -(BOOL) matchesDiscarderSecret:(ASFKSecret*)secret{ if(!psiDiscarder->secretValid || secret==nil){ return NO; @@ -372,7 +369,6 @@ -(BOOL) matchesDiscarderSecret:(ASFKSecret*)secret{ } return [psiDiscarder matchSecretHost:_secretDiscarder secretGuest:secret->_secretDiscarder usageCount:itsUsageCount]; } - -(BOOL) matchesCreatorSecret:(ASFKSecret*)secret{ if(!psiCreator->secretValid || secret==nil){ return NO; @@ -395,6 +391,29 @@ -(BOOL) matchesHostSecret:(ASFKSecret*)secret{ } return [psiHost matchSecretHost:_secretHost secretGuest:secret->_secretHost usageCount:itsUsageCount]; } +-(BOOL) matchesIssuerSecret:(ASFKSecret*)secret{ + if(!psiIssuer->secretValid || secret==nil){ + return NO; + } + + if([self passedExpirationDeadline:[NSDate date]] || itsUsageCount<1){ + [self invalidateIssuerSecret]; + return NO; + } + return [psiIssuer matchSecretHost:_secretIssuer secretGuest:secret->_secretIssuer usageCount:itsUsageCount]; +} +-(BOOL) matchesModeratorSecret:(ASFKSecret*)secret{ + if(!psiModerator->secretValid || secret==nil){ + return NO; + } + if([self passedExpirationDeadline:[NSDate date]] || itsUsageCount<1){ + [self invalidateModeratorSecret]; + return NO; + } + + return [psiModerator matchSecretHost:_secretModerator secretGuest:secret->_secretModerator usageCount:itsUsageCount]; + +} #pragma mark - secret setting -(BOOL) setUnicasterSecretOnce:(id)secret{ @@ -429,7 +448,6 @@ -(BOOL) setReaderSecretOnce:(id)secret{ } return NO; } - -(BOOL) setPopperSecretOnce:(id)secret{ BOOL tval=NO; if(psiPopper->secretSet.compare_exchange_strong(tval,YES)) @@ -440,7 +458,6 @@ -(BOOL) setPopperSecretOnce:(id)secret{ } return NO; } - -(BOOL) setDiscarderSecretOnce:(id)secret{ BOOL tval=NO; if(psiDiscarder->secretSet.compare_exchange_strong(tval,YES)) @@ -484,7 +501,6 @@ -(BOOL) setSecuritySecretOnce:(id)secret{ } return NO; } - -(BOOL) setConfigSecretOnce:(id)secret{ BOOL tval=NO; if(psiConfig->secretSet.compare_exchange_strong(tval,YES)) @@ -496,10 +512,30 @@ -(BOOL) setConfigSecretOnce:(id)secret{ } return NO; } -#pragma mark - Invalidation --(void) invalidateModeratorSecret{ - psiModerator->secretValid=NO; +-(BOOL) setModeratorSecretOnce:(id)secret{ + BOOL tval=NO; + if(psiModerator->secretSet.compare_exchange_strong(tval,YES)) + { + _secretModerator=secret; + DASFKLog(@"ASFKMBSecret: base class called"); + return YES; + + } + return NO; } +-(BOOL) setIssuerSecretOnce:(id)secret{ + BOOL tval=NO; + if(psiIssuer->secretSet.compare_exchange_strong(tval,YES)) + { + _secretIssuer=secret; + DASFKLog(@"ASFKMBSecret: class called"); + return YES; + + } + return NO; +} +#pragma mark - Invalidation + -(void) invalidateUnicasterSecret{ psiUnicaster->secretValid=NO; } @@ -521,20 +557,21 @@ -(void) invalidateCreatorSecret{ -(void) invalidateConfigSecret{ psiConfig->secretValid=NO; } - --(void) invalidateIssuerSecret{ - psiIssuer->secretValid=NO; -} -(void) invalidateSecuritySecret{ psiSecurity->secretValid=NO; } -(void) invalidateHostSecret{ psiHost->secretValid=NO; } -#pragma mark - Validity --(BOOL) validSecretModerator{ - return psiModerator->secretValid; +-(void) invalidateIssuerSecret{ + psiIssuer->secretValid=NO; +} +-(void) invalidateModeratorSecret{ + psiModerator->secretValid=NO; } + +#pragma mark - Validity + -(BOOL) validSecretCreator{ return psiCreator->secretValid; } @@ -559,32 +596,34 @@ -(BOOL) validSecretSecurity{ -(BOOL) validSecretHost{ return psiHost->secretValid; } +-(BOOL) validSecretConfig{ + return psiConfig->secretValid; +} -(BOOL) validSecretIssuer{ return psiIssuer->secretValid; } - --(BOOL) validSecretConfig{ - return psiConfig->secretValid; +-(BOOL) validSecretModerator{ + return psiModerator->secretValid; } + -(void) invalidateAll{ - [self invalidateModeratorSecret]; [self invalidateUnicasterSecret]; [self invalidateMulticasterSecret]; [self invalidateReaderSecret]; [self invalidatePopperSecret]; [self invalidateDiscarderSecret]; [self invalidateCreatorSecret]; - [self invalidateIssuerSecret]; [self invalidateHostSecret]; [self invalidateConfigSecret]; [self invalidateSecuritySecret]; + [self invalidateModeratorSecret]; + [self invalidateIssuerSecret]; } -(BOOL) isValidOnDate:(NSDate*)aDate{ return (![self passedExpirationDeadline:aDate]); } -(BOOL) areValidAll{ return (psiCreator->secretValid&& - psiModerator->secretValid&& psiUnicaster->secretValid&& psiMulticaster->secretValid&& psiReader->secretValid&& @@ -593,13 +632,13 @@ -(BOOL) areValidAll{ psiHost->secretValid&& psiSecurity->secretValid&& psiConfig->secretValid&& + psiModerator->secretValid&& psiIssuer->secretValid ); } -(BOOL) isValidAny{ return ( psiCreator->secretValid|| - psiModerator->secretValid|| psiUnicaster->secretValid|| psiMulticaster->secretValid|| psiReader->secretValid|| @@ -607,8 +646,9 @@ -(BOOL) isValidAny{ psiDiscarder->secretValid|| psiSecurity->secretValid || psiHost->secretValid || - psiIssuer->secretValid|| - psiConfig->secretValid + psiConfig->secretValid|| + psiModerator->secretValid|| + psiIssuer->secretValid ); } @end diff --git a/src/ASFKMailbox.h b/src/ASFKMailbox.h index bd73931..1120151 100644 --- a/src/ASFKMailbox.h +++ b/src/ASFKMailbox.h @@ -12,6 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ + // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #ifndef __A_S_F_K_Mailbox_h__ @@ -285,7 +286,7 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @param uid user ID; if nil or not found, returns 0. @return Number of messages for this user; 0 if user not found. */ --(NSUInteger) totalMessagesForUser:(id)uid; +-(NSUInteger) totalMessagesInMailbox:(id)uid; /*! @brief counts ALL users of some group. @param gid group ID; if nil or not found, returns 0. @@ -293,7 +294,17 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); */ -(NSUInteger) totalUsersInGroup:(id)gid; #pragma mark - Reading & Popping +/*! + @brief reading in blocking maner. + @discussion When called, the function reads all available messages; number of messages to read is defined by 'skipAndTake' range. When number of available messages is less than required, the call will not return until new messages arrive. When message from blocking call received, the calling thread is released. + @param skipAndTake range of retrieval; loc represents offset from beginning, number of messages to skip; length represents number of items to retrieve; if 0 then function returns when new messages arrive or waiting period expires. + @param mid user ID; if nil, read fails. + @param secret private (associated with this group) secret is required; if no secret set then nil must be provided. If provided secret does not match the stored one, operation fails. + @return array of available messages; size of the array is less or equal to msgcount; if there is no message, returns empty array. + */ +-(NSArray*) waitAndReadMsg:(NSRange)skipAndTake fromMailbox:(id)mid unblockIf:(ASFKMbLockConditionRoutine)condition withSecret:(ASFKPrivateSecret*)secret; +//-(NSArray*) waitAndReadMsgFromGroup:(id)gid unblockIf:(ASFKMbLockConditionRoutine) condition withSecret:(ASFKPrivateSecret*)secret; /*! @brief reads specified number of earliest messages delivered to given mailbox. @discussion After reading messages are NOT deleted from the queue; fetched messages ordered earliest to latest. If messages arrives from blocking call - the corresponding message is read, calling thread is NOT released. @@ -366,6 +377,16 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @param secret private (associated with this group) or group secret is required; if no secret set then nil must be provided. If prided secret does not match the stored one, operation fails. */ -(void) popEarliestMsg:(NSRange)skipAndTake fromGroup:(id)gid forUser:(id)uid withSecret:(ASFKPrivateSecret*)secret; +#pragma mark - Call interface +/*! + @brief delivers specified message synchronously to the specified mailbox. This method returns when the message when call was improper or message read by the receiver. Being invoked, this method blocks the calling thread, until some message read or a condition met. Examples of unblocking conditions: time period elapsed; or some custom condition may be introduced. + @discussion delivered message can not be retracted. + @param msg a message to be delivered; can be nil. + @param uid user ID; may be nil. + @param props properties of message; can be nil. + @return message's ID for successful delivery, nil otherwise. + */ +-(id) call:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties*)props unblockIf:(ASFKCondition*)condition secret:(ASFKSecret*)secret;; #pragma mark - Cast interface/Unicasting /*! @@ -376,7 +397,7 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @param props properties of message; can be nil. @return message's ID for successful delivery, nil otherwise. */ --(id) cast:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties*)props secret:(ASFKMasterSecret*)secret;; +-(id) cast:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties*)props secret:(ASFKSecret*)secret;; #pragma mark - Cast interface/Multicasting /*! @brief delivers specified message asynchronously to ALL users registered in specific group. @@ -403,11 +424,6 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @return YES for successful delivery, NO otherwise. */ -(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASFKSecret*)secret;; -/*! - @brief retracts delivered message from specific group. - @discussion poster can retract posted message while it is available for retraction;the message is available for retraction until it has been popped or its lifetime has ended. - @param msgId retractable message ID; if nil, action fails. - @param gid group ID; if nil, action fails. - */ + @end #endif /*#define __A_S_F_K_Mailbox_h__*/ diff --git a/src/ASFKMailbox.mm b/src/ASFKMailbox.mm index c4b9f51..dec0af7 100644 --- a/src/ASFKMailbox.mm +++ b/src/ASFKMailbox.mm @@ -12,7 +12,8 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// +// Created by Boris Vigman on 05/04/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -100,9 +101,8 @@ -(BOOL) isPrivate; -(BOOL) isInvitable; -(BOOL) canShareUserList; -(NSArray*) read:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid latest:(BOOL) yesno; - +-(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid latest:(BOOL) yesno; -(NSUInteger) pop:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid latest:(BOOL) yesno; - -(void) markBlacklisted; -(void) runPeriodicProc:(NSDate*)tmpoint; -(void) discardUser:(id) uid; @@ -264,6 +264,7 @@ -(BOOL) setPrivateSecret:(ASFKPrivateSecret*)oldsec newsec:(ASFKPrivateSecret*)n //test validity of new secret if([newsec validSecretSecurity]){ myPSecret=newsec; + ASFKLog(@"DONE"); return YES; } @@ -271,6 +272,7 @@ -(BOOL) setPrivateSecret:(ASFKPrivateSecret*)oldsec newsec:(ASFKPrivateSecret*)n } else{ myPSecret=newsec; + return YES; } } @@ -283,6 +285,7 @@ -(BOOL) setPrivateSecret:(ASFKPrivateSecret*)oldsec newsec:(ASFKPrivateSecret*)n if([newsec validSecretSecurity]){ [myPSecret invalidateAll]; myPSecret=newsec; + ASFKLog(@"DONE"); return YES; } @@ -291,6 +294,7 @@ -(BOOL) setPrivateSecret:(ASFKPrivateSecret*)oldsec newsec:(ASFKPrivateSecret*)n else{ [myPSecret invalidateAll]; myPSecret=nil; + ASFKLog(@"DONE"); return YES; } @@ -345,7 +349,7 @@ -(BOOL) setMsgQDropperPolicy:(eASFKQDroppingPolicy)policy{ return res; } --(BOOL) setMemberingLimitsLow:(NSUInteger)low high:(NSUInteger)high dropPolicy:(eASFKQDroppingPolicy)dpolicy{ +-(BOOL) setMemberingLimitsLow:(NSUInteger)low high:(NSUInteger)high{ if(blacklisted){ return NO; } @@ -482,7 +486,7 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO { grant &=!itscprops.noPostUnpopulatedGroup || (itscprops.noPostUnpopulatedGroup && (([backusers count]) >0)); } - + BOOL memLim= (memLimitLow==0 || ([backusers count] >= memLimitLow && memLimitLow>0))?YES:NO; [lock1 unlock]; if(grant==NO){ @@ -504,9 +508,11 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO privmsg->wlocker=[NSCondition new]; BOOL success = [entranceQ push:privmsg]; if(success){ + [rwBlocker->rlocker lock]; [rwBlocker->rlocker signal]; [rwBlocker->rlocker unlock]; + [privmsg->wlocker lock]; [privmsg->wlocker wait]; [privmsg->wlocker unlock]; @@ -534,7 +540,113 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO return uuid; } #pragma mark - reading +-(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid latest:(BOOL) yesno{ + if(blacklisted){ + return @[]; + } + NSDate* tmpoint=[NSDate date]; + [self _testAndRemove:tmpoint]; + [self _testAndAccept:tmpoint]; + NSMutableArray* ma=[NSMutableArray array]; + NSMutableArray* readMsgs=[NSMutableArray array]; + [lock1 lock]; + BOOL hasuser=[_users containsObject:uid] || [_itsOwnerId isEqualTo:uid]; + if(!hasuser){ + [lock1 unlock]; + return ma; + } + if(NO==[self _canUserRead:uid]){ + [lock1 unlock]; + return ma; + } + NSUInteger msc=[messages count]; + [lock1 unlock]; + if(msc < 1 || amount > msc){ + [rwBlocker->rlocker lock]; + + while(1){ + [self _testAndRemove:tmpoint]; + [self _testAndAccept:tmpoint]; + [lock1 lock]; + msc=[messages count]; + [lock1 unlock]; + if(msc < 1 ){ + [rwBlocker->rlocker wait]; + } + else{ + break; + } + } + + [rwBlocker->rlocker unlock]; + } + + + if(msc < 1 || offset >= msc){ + + return ma; + } + + if(amount > msc){ + amount=msc; + } + NSInteger lowbound=offset; + if(yesno){ + lowbound=msc-amount; + if(lowbound<0){ + lowbound=0; + } + lowbound=lowbound+offset; + } + + if(lowbound>msc){ + lowbound=msc; + } + NSInteger hibound=lowbound+amount; + if(hibound>msc){ + hibound=msc; + } + + for (NSInteger ui=lowbound; uiblocked == YES){ + [ma addObject:privmsg.msg]; + privmsg.props->maxAccessLimit.fetch_sub(1); + [readMsgs addObject:privmsg.msgId]; + [privmsg->wlocker lock]; + [privmsg->wlocker signal]; + [privmsg->wlocker unlock]; + } + else + { + if( + (privmsg.props.msgReadabilityTimer.itsDeadline==nil + || [privmsg.props passedReadingDate:tmpoint]) + //&& privmsg.props->maxAccessLimit>0 + ) + { + if(privmsg.props->maxAccessLimit>0) + { + [ma addObject:privmsg.msg]; + privmsg.props->maxAccessLimit.fetch_sub(1); + [readMsgs addObject:privmsg.msgId]; + } + } + } + + } + + [lock1 lock]; + NSUInteger msgCount=[messages count]; + ASFKMbNotifyOnContainerReadRoutine crr=itscprops.onReadProc; + [lock1 unlock]; + if(crr){ + crr(self.itsOwnerId,readMsgs,msgCount); + } + return ma; +} -(NSArray*) read:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid latest:(BOOL) yesno{ if(blacklisted){ return @[]; @@ -600,10 +712,15 @@ -(NSArray*) read:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid la NSUInteger msgCount=[messages count]; ASFKMbNotifyOnContainerReadRoutine crr=itscprops.onReadProc; + ASFKMbNRunOnContainerReadRoutine rcr=itscprops.runOnReadProc; [lock1 unlock]; if(crr){ crr(self.itsOwnerId,readMsgs,msgCount); } + + if(rcr){ + rcr(self.itsOwnerId,[NSDate date],ma); + } return ma; } @@ -670,13 +787,14 @@ -(NSUInteger) pop:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid l [messages removeObjectsInRange:rn]; NSUInteger msgCount=[messages count]; ASFKMbNotifyOnContainerPopRoutine cpr=itscprops.onPopProc; + [lock1 unlock]; if(cpr){ cpr(self.itsOwnerId,poppedMsgs,msgCount); } + return rn.length; } - #pragma mark - deletion -(BOOL) shouldBeDeletedAtDate:(NSDate*)aDate{ BOOL res=NO; @@ -684,6 +802,7 @@ -(BOOL) shouldBeDeletedAtDate:(NSDate*)aDate{ [lock1 lock]; if(itscprops.containerDeleteTimer){ if([itscprops.containerDeleteTimer isConditionMetAfterDateValue:aDate data:nil]){ + //[lock1 unlock]; res=YES; } } @@ -758,126 +877,7 @@ -(void) discardUser:(id)uid{ itscprops.onLeaveProc(self.itsOwnerId, uid); } } -#pragma mark - moderation --(BOOL) mute:(BOOL)yesno user:(id)uid secret:(ASFKPrivateSecret *)secret group:(BOOL)grp{ - if(blacklisted){ - return NO; - } - if(grp){ - BOOL res=NO; - [lock1 lock]; - BOOL hasuser=[_users containsObject:uid]; - if(!hasuser){ - [lock1 unlock]; - return NO; - } - ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; - if(up){ - up.isMuted=yesno; - res=YES; - } - [lock1 unlock]; - return res; - } - else{ - BOOL res=NO; - [lock1 lock]; - ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; - if(up){ - up.isMuted=yesno; - res=YES; - } - else{ - if(yesno){ - ASFKMBGroupMemberProperties* u=[ASFKMBGroupMemberProperties new]; - u.isMuted=YES; - [uprops setObject:u forKey:uid]; - res=YES; - } - } - [lock1 unlock]; - return res; - } -} --(BOOL) muteAll:(BOOL)yesno secret:(ASFKPrivateSecret *)secret group:(BOOL)grp{ - if(blacklisted){ - return NO; - } - if(grp){ - BOOL res=YES; - [lock1 lock]; - [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - BOOL hasuser=[_users containsObject:key] || [key isEqualTo:self.itsOwnerId]; - if(hasuser){ - ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; - up.isMuted=yesno; - } - - }]; - [lock1 unlock]; - return res; - } - else{ - BOOL res=YES; - [lock1 lock]; - - [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; - up.isMuted=yesno; - - }]; - [lock1 unlock]; - return res; - - } -} --(BOOL) blind:(BOOL)yesno user:(id)uid secret:(ASFKPrivateSecret *)secret{ - if(blacklisted){ - return NO; - } - BOOL res=NO; - [lock1 lock]; - BOOL hasuser=[_users containsObject:uid]; - if(!hasuser){ - [lock1 unlock]; - return NO; - } - ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; - if(up){ - up.isBlinded=yesno; - res=YES; - } - [lock1 unlock]; - return res; -} --(BOOL) blindAll:(BOOL)yesno secret:(ASFKPrivateSecret *)secret{ - if(blacklisted){ - return NO; - } - BOOL res=YES; - [lock1 lock]; - [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - BOOL hasuser=[_users containsObject:key] || [key isEqualTo:self.itsOwnerId]; - if(hasuser){ - ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; - up.isBlinded=yesno; - } - - }]; - - [lock1 unlock]; - return res; -} --(BOOL) retractMsg:(id)msgId{ - if(blacklisted){ - return NO; - } - [lock1 lock]; - [retractionList addObject:msgId]; - [lock1 unlock]; - return NO; -} #pragma mark - maintenance -(void) runPeriodicProc:(NSDate*)tmpoint{ if(blacklisted){ @@ -941,7 +941,9 @@ -(void) _testAndRemove:(NSDate*)tmpoint{ } -(void) _testAndAccept:(NSDate*)tmpoint{ [lock1 lock]; + NSUInteger eqsize=[entranceQ count]; + ASFKMbNotifyOnNewMsgRoutine onInb=itscprops.onNewMsgProc; [lock1 unlock]; NSUInteger msgCount=0; @@ -957,8 +959,10 @@ -(void) _testAndAccept:(NSDate*)tmpoint{ id msg=[entranceQ pull]; if(msg){ + [messages addObject:msg]; ++msgCount; + } } if(onInb && msgCount>0){ @@ -1210,14 +1214,17 @@ -(NSUInteger) runDiscarding:(size_t)sampleSize timepoint:(NSDate*)tm{ dispatch_apply([ua count], dConQ_Background, ^(size_t index) { ASFKSomeContainer* obj=[ua objectAtIndex:index]; [obj purge:tm]; + }); } tm=[NSDate date]; NSArray* ga=[self _repackItems:blacklistedGroups sampleSize:sampleSize dispQ:dConQ_Background]; if([ga count]>0){ dispatch_apply([ga count], dConQ_Background, ^(size_t index) { + ASFKSomeContainer* obj=[ga objectAtIndex:index]; [obj purge:tm]; + }); } @@ -1421,7 +1428,6 @@ -(NSUInteger) runDaemon:(size_t)sampleSize timepoint:(NSDate*)tm callbacks:(ASFK [self runDelivery:sampleSize]; [self runDiscarding:sampleSize timepoint:tm]; - [self runPeriodic:sampleSize timepoint:tm callbacks:clbs]; return 0; } @@ -1668,6 +1674,7 @@ -(BOOL) discardAllUsersWithSecret:(ASFKMasterSecret*)secret{ [lockDB lock]; [blacklistedUsers addObject:users]; [lockDB unlock]; + users=[NSMutableDictionary dictionary]; [lockUsersDB unlock]; return YES; @@ -1879,7 +1886,7 @@ -(NSUInteger) totalMessagesInGroup:(id)gid{ } return c; } --(NSUInteger) totalMessagesForUser:(id)uid{ +-(NSUInteger) totalMessagesInMailbox:(id)uid{ if(!uid){ return 0; } @@ -1909,6 +1916,21 @@ -(NSUInteger) totalUsersInGroup:(id)gid{ } return c; } +#pragma mark - Read with blocking +-(NSArray*) waitAndReadMsg:(NSRange)skipAndTake fromMailbox:(id)mid unblockIf:(ASFKMbLockConditionRoutine)condition withSecret:(ASFKPrivateSecret*)secret{ + if(!mid){ + return @[]; + } + [lockUsersDB lock]; + ASFKSomeContainer* sg=[users objectForKey:mid]; + [lockUsersDB unlock]; + + if(sg && ([sg isPrivateSecretValid:secret matcher:authmgr->secretProcRead])){ + NSArray* a=[sg readBlocking:skipAndTake.length offset:skipAndTake.location forUser:mid latest:YES]; + return a; + } + return @[]; +} #pragma mark - Read without blocking -(NSArray*) readEarliestMsg:(NSRange)skipAndTake fromMailbox:(id)uid withSecret:(ASFKPrivateSecret*)secret{ @@ -2059,9 +2081,23 @@ -(id) cast:(id)msg forGroup:(id)gid withProperties:(ASFKMBMsgProperties*)props s return res; } +-(id) call:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties *)props unblockIf:(ASFKCondition*)condition secret:(ASFKPrivateSecret*)secret{ + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + if(!uid || !msg){ + return nil; + } + [lockUsersDB lock]; + ASFKSomeContainer* sg=[users objectForKey:uid]; + [lockUsersDB unlock]; + if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcUnicast]){ + + return [sg addMsg:msg withProperties:props group:NO blockable:YES]; + } + return nil; +} #pragma mark - Multicasting --(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASFKMasterSecret*)secret{ +-(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASFKSecret*)secret{ [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; if(!msg){ return NO; @@ -2116,6 +2152,7 @@ -(id) multicast:(id)msg toMembersOfGroup:(id)g0 secret:(ASFKSecret*)secret{ } return nil; } + #pragma mark - Private methods -(void) _castToSetOfUsers:(NSSet*) uset msg:(id)msg properties:(ASFKMBMsgProperties*)props{ if(uset && msg){ diff --git a/src/ASFKNonlinearFlow.h b/src/ASFKNonlinearFlow.h new file mode 100644 index 0000000..f440e86 --- /dev/null +++ b/src/ASFKNonlinearFlow.h @@ -0,0 +1,27 @@ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + */ + +// Copyright © 2019-2022 Boris Vigman. All rights reserved. +// + +#ifndef ASFKNonlinearFlow_h +#define ASFKNonlinearFlow_h +@interface ASFKNonlinearFlow : ASFKBase +-(BOOL) attachTrueTarget:(id)target withId:(ASFK_IDENTITY_TYPE)identity; +-(BOOL) attachFalseTarget:(id)target withId:(ASFK_IDENTITY_TYPE)identity; +-(BOOL) detachTrueTargetWithId:(ASFK_IDENTITY_TYPE)identity; +-(BOOL) detachFalseTargetWithId:(ASFK_IDENTITY_TYPE)identity; +@end +#endif /* ASFKNonlinearFlow */ diff --git a/src/ASFKNonlinearFlow.mm b/src/ASFKNonlinearFlow.mm new file mode 100644 index 0000000..a48be4c --- /dev/null +++ b/src/ASFKNonlinearFlow.mm @@ -0,0 +1,41 @@ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + */ + +// Copyright © 2019-2022 Boris Vigman. All rights reserved. +// + +#import "ASFKBase.h" + +@implementation ASFKNonlinearFlow +-(id)initWithName:(NSString*)name{ + self=[super initWithName:name]; + if(self){ + + } + return self; +} +-(BOOL) attachTrueTarget:(id)target withId:(ASFK_IDENTITY_TYPE)identity{ + return YES; +} +-(BOOL) attachFalseTarget:(id)target withId:(ASFK_IDENTITY_TYPE)identity{ + return YES; +} +-(BOOL) detachTrueTargetWithId:(ASFK_IDENTITY_TYPE)identity{ + return YES; +} +-(BOOL) detachFalseTargetWithId:(ASFK_IDENTITY_TYPE)identity{ + return YES; +} +@end diff --git a/src/ASFKPipelinePar.h b/src/ASFKPipelinePar.h index 31af51b..026b792 100644 --- a/src/ASFKPipelinePar.h +++ b/src/ASFKPipelinePar.h @@ -12,6 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ + // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -23,6 +24,7 @@ */ @interface ASFKPipelinePar : ASFKLinearFlow +-(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId; /*! @brief Equals YES if session with given identity exists AND is still processing data batch ; NO otherwise. @@ -30,6 +32,7 @@ -(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId; +(long long) runningSessionsCount; ++(long long) pausedSessionsCount; /*! @brief Cancels ALL sessions created by ALL instances. */ @@ -53,7 +56,32 @@ @brief flushes all queued items for all sessions. */ +(void)flushAllGlobally; +/*! + @brief flushes all queued items for all sessions created by this instance. + */ +-(void)pauseAll; +/*! + @brief flushes all queued items for given session ID. + */ +-(void)pauseSession:(ASFK_IDENTITY_TYPE)sessionId; + +/*! + @brief flushes all queued items for all sessions. + */ ++(void)pauseAllGlobally; +/*! + @brief flushes all queued items for all sessions created by this instance. + */ +-(void)resumeAll; +/*! + @brief flushes all queued items for given session ID. + */ +-(void)resumeSession:(ASFK_IDENTITY_TYPE)sessionId; +/*! + @brief flushes all queued items for all sessions. + */ ++(void)resumeAllGlobally; /*! @brief sets new class of QoS (i.e. thread priority). @param newqos required class of Quality of Service . Allowed values are:QOS_CLASS_USER_INTERACTIVE, QOS_CLASS_UTILITY, QOS_CLASS_BACKGROUND. By default QOS_CLASS_BACKGROUND is set. The parameter will be in effect after restart. diff --git a/src/ASFKPipelinePar.mm b/src/ASFKPipelinePar.mm index b216b5f..512fd88 100644 --- a/src/ASFKPipelinePar.mm +++ b/src/ASFKPipelinePar.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// + // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -23,6 +23,7 @@ #include #include #import "ASFKGlobalThreadpool.h" +#import "ASFKPipelineSession.h" @interface ASFKPipelinePar() @end @implementation ASFKPipelinePar{ @@ -34,7 +35,7 @@ @implementation ASFKPipelinePar{ -(id)init{ self = [super init]; if(self){ - + //_defaultSessionId=[ASFKBase generateIdentity]; [self _initPipeline]; } return self; @@ -52,12 +53,13 @@ -(void) _initPipeline{ } --(ASFKPipelineSession*) _resolveSessionforParams:(ASFKParamSet*)ps sessionCreated:(BOOL&)created{ - ASFKPipelineSession* s=nil; +-(ASFKPipelineSession*) _resolveSessionforParams:(ASFKParamSet*)ps { + ASFKThreadpoolSession* s=nil; if(ps.sessionId != nil && NO==[ps.sessionId isKindOfClass:[NSNull class]]){ s=[globalTPool getThreadpoolSessionWithId:ps.sessionId]; - if(s){ - return s; + + if(s && [s isKindOfClass:[ASFKPipelineSession class]]){ + return (ASFKPipelineSession*)s; } else{ EASFKLog(@"Session %@ not found; probably wrong ID submitted",ps.sessionId); @@ -65,9 +67,9 @@ -(ASFKPipelineSession*) _resolveSessionforParams:(ASFKParamSet*)ps sessionCreate } } else - + { - + return nil; } } @@ -80,7 +82,7 @@ -(ASFKPipelineSession*) _createNewSessionWithId:(ASFK_IDENTITY_TYPE)sessionId{ return newseq; } -(ASFKPipelineSession*) _prepareSession:(ASFKPipelineSession*)seq withParams:(ASFKParamSet*) params { - [seq addRoutinesFromArray:params.procs]; + [seq replaceRoutinesWithArray:params.procs]; [seq setSummary:params.summary]; [self registerSession:[seq getControlBlock]]; return seq; @@ -104,7 +106,12 @@ -(void) setQualityOfService:(long)newqos{ +(long long) runningSessionsCount{ return [[ASFKGlobalThreadpool sharedManager] runningSessionsCount]; } - +/*! + @return number of paused sessions + */ ++(long long) pausedSessionsCount{ + return [[ASFKGlobalThreadpool sharedManager] pausedSessionsCount]; +} +(void)flushAllGlobally{ [[ASFKGlobalThreadpool sharedManager] flushAll]; } @@ -120,12 +127,51 @@ -(void) flushSession:(ASFK_IDENTITY_TYPE)sessionId{ [globalTPool flushSession:sessionId]; } +/*! + @brief flushes all queued items for all sessions created by this instance. + */ +-(void)pauseAll{ + [lkNonLocal lock]; + for (id s in ctrlblocks) { + [globalTPool pauseSession:s]; + } + [lkNonLocal unlock]; +} +/*! + @brief flushes all queued items for given session ID. + */ +-(void)pauseSession:(ASFK_IDENTITY_TYPE)sessionId{ + [globalTPool pauseSession:sessionId]; +} ++(void)pauseAllGloball{ + [[ASFKGlobalThreadpool sharedManager] pauseAll]; +} +/*! + @brief flushes all queued items for all sessions created by this instance. + */ +-(void)resumeAll{ + [lkNonLocal lock]; + for (id s in ctrlblocks) { + [globalTPool resumeSession:s]; + } + [lkNonLocal unlock]; +} +-(void)resumeSession:(ASFK_IDENTITY_TYPE)sessionId{ + [globalTPool resumeSession:sessionId]; +} ++(void)resumeAllGlobally{ + [[ASFKGlobalThreadpool sharedManager] resumeAll]; +} + +-(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId{ + return [globalTPool isPausedSession:sessionId]; +} + -(BOOL)isBusySession:(id)sessionId{ return [globalTPool isBusySession:sessionId]; } -(BOOL)isReady{ - return YES; } @@ -151,10 +197,22 @@ -(void)cancelSession:(NSString*)sessionId{ } } + -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams sessionId:(id _Nullable ) sid { uint64 main_t1=[ASFKBase getTimestamp]; dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); ASFKParamSet* params=[self _decodeExParams:exparams forSession:sid]; + if(!params.summary) + { + params.summary = sumproc; + } + if(!params.procs || [params.procs count]==0) + { + params.procs = [_backprocs copy]; + } + if(!params.cancProc){ + params.cancProc = cancelproc; + } //test params if(params.procs==nil @@ -170,10 +228,12 @@ -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnDescription:ASFK_STR_INVALID_PARAM}; } - if(params.sessionId){} - else{ + if(!sid){ params.sessionId=[ASFKBase generateIdentity]; } + else{ + params.sessionId=sid; + } //create new session ASFKPipelineSession* seq=[self _createNewSessionWithId:params.sessionId]; //configure session @@ -217,7 +277,7 @@ -(NSArray*) getSessions{ -(NSDictionary*) _castArray:(ASFKParamSet*)params{ __block uint64 main_t1=[ASFKBase getTimestamp]; DASFKLog(@"ASFKPipelinePar:Object %@: trying to push data items",self.itsName); - //dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); + if ( params.sessionId==nil ||[params.sessionId isKindOfClass:[NSNull class]] @@ -225,7 +285,6 @@ -(NSDictionary*) _castArray:(ASFKParamSet*)params{ ||[params.input isKindOfClass:[NSNull class]] ||[params.input count]<1 ){ - //dispatch_semaphore_signal(semHighLevelCall); uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; EASFKLog(@"ASFKPipelinePar:Some of input parameters are invalid for session %@",params.sessionId); @@ -236,17 +295,16 @@ -(NSDictionary*) _castArray:(ASFKParamSet*)params{ kASFKReturnDescription:ASFK_STR_INVALID_PARAM}; } ASFKLog(@"Performing non-blocking call"); - BOOL created=NO; - ASFKPipelineSession* s=[self _resolveSessionforParams:params sessionCreated:created]; + + ASFKPipelineSession* s=[self _resolveSessionforParams:params ]; if(s){ - if(params.hasForeignProcs){ + //if(params.hasForeignProcs){ if(params.excond && [params.excond isKindOfClass:[ASFKExpirationCondition class]]){ [s setExpirationCondition:params.excond]; } - [self _prepareSession:s withParams:params]; - [globalTPool addSession:s withId:s.sessionId]; + [globalTPool postDataAsArray:params.input forSession:s.sessionId]; - //[self registerSession:[s getControlBlock]]; + uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -255,7 +313,7 @@ -(NSDictionary*) _castArray:(ASFKParamSet*)params{ kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnSessionId:s.sessionId, kASFKReturnDescription:ASFK_RC_DESCR_DEFERRED}; - } + //} } uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -266,19 +324,19 @@ -(NSDictionary*) _castArray:(ASFKParamSet*)params{ kASFKReturnSessionId:[NSNull null], kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnDescription:@"Some of input parameters are invalid: missing data or Routines or summary"}; - //dispatch_semaphore_signal(semHighLevelCall); + } -(NSDictionary*) _castOrderedSet:(ASFKParamSet *)params{ __block uint64 main_t1=[ASFKBase getTimestamp]; DASFKLog(@"ASFKPipelinePar:Object %@: trying to push data items",self.itsName); - //dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); + if ( params.sessionId==nil ||[params.sessionId isKindOfClass:[NSNull class]] ||params.input==nil ||[params.input isKindOfClass:[NSNull class]] ||[params.input count]<1 - + ){ uint64 main_t2=[ASFKBase getTimestamp]; @@ -291,17 +349,15 @@ -(NSDictionary*) _castOrderedSet:(ASFKParamSet *)params{ kASFKReturnDescription:ASFK_STR_INVALID_PARAM}; } ASFKLog(@"Performing non-blocking call"); - BOOL created=NO; - ASFKPipelineSession* s=[self _resolveSessionforParams:params sessionCreated:created]; + + ASFKPipelineSession* s=[self _resolveSessionforParams:params ]; if(s){ - if(params.hasForeignProcs){ + //if(params.hasForeignProcs){ if(params.excond && [params.excond isKindOfClass:[ASFKExpirationCondition class]]){ [s setExpirationCondition:params.excond]; } - [globalTPool addSession:s withId:s.sessionId]; [globalTPool postDataAsOrderedSet:params.input forSession:s.sessionId]; - [self _prepareSession:s withParams:params]; uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -311,7 +367,7 @@ -(NSDictionary*) _castOrderedSet:(ASFKParamSet *)params{ kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnSessionId:s.sessionId, kASFKReturnDescription:ASFK_RC_DESCR_DEFERRED}; - } + //} } uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -326,14 +382,14 @@ -(NSDictionary*) _castOrderedSet:(ASFKParamSet *)params{ -(NSDictionary*) _castUnorderedSet:(ASFKParamSet *)params{ __block uint64 main_t1=[ASFKBase getTimestamp]; DASFKLog(@"ASFKPipelinePar:Object %@: trying to push data items",self.itsName); - //dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); + if ( params.sessionId==nil ||[params.sessionId isKindOfClass:[NSNull class]] ||params.input==nil ||[params.input isKindOfClass:[NSNull class]] ||[params.input count]<1 - + ){ uint64 main_t2=[ASFKBase getTimestamp]; @@ -346,18 +402,17 @@ -(NSDictionary*) _castUnorderedSet:(ASFKParamSet *)params{ kASFKReturnDescription:ASFK_STR_INVALID_PARAM}; } ASFKLog(@"Performing non-blocking call"); - BOOL created=NO; - ASFKPipelineSession* s=[self _resolveSessionforParams:params sessionCreated:created]; + + ASFKPipelineSession* s=[self _resolveSessionforParams:params]; if(s){ - if(params.hasForeignProcs){ + // if(params.hasForeignProcs){ if(params.excond && [params.excond isKindOfClass:[ASFKExpirationCondition class]]){ [s setExpirationCondition:params.excond]; } - [self _prepareSession:s withParams:params]; - [globalTPool addSession:s withId:s.sessionId]; - [globalTPool postDataAsUnorderedSet:params.input forSession:s.sessionId]; + [globalTPool postDataAsUnorderedSet:params.input forSession:s.sessionId]; + uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -366,7 +421,7 @@ -(NSDictionary*) _castUnorderedSet:(ASFKParamSet *)params{ kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnSessionId:s.sessionId, kASFKReturnDescription:ASFK_RC_DESCR_DEFERRED}; - } + //} } uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; @@ -389,7 +444,7 @@ -(NSDictionary*) _castDictionary:(ASFKParamSet*)params{ ||params.input==nil ||[params.input isKindOfClass:[NSNull class]] ||[params.input count]<1 - + ){ @@ -403,16 +458,14 @@ -(NSDictionary*) _castDictionary:(ASFKParamSet*)params{ kASFKReturnDescription:ASFK_STR_INVALID_PARAM}; } ASFKLog(@"Performing non-blocking call"); - BOOL created=NO; - ASFKPipelineSession* s=[self _resolveSessionforParams:params sessionCreated:created]; + + ASFKPipelineSession* s=[self _resolveSessionforParams:params ]; if(s){ - if(params.hasForeignProcs){ if(params.excond && [params.excond isKindOfClass:[ASFKExpirationCondition class]]){ [s setExpirationCondition:params.excond]; } - [self _prepareSession:s withParams:params]; - [globalTPool addSession:s withId:s.sessionId]; + [globalTPool postDataAsDictionary:params.input forSession:s.sessionId]; uint64 main_t2=[ASFKBase getTimestamp]; @@ -423,7 +476,7 @@ -(NSDictionary*) _castDictionary:(ASFKParamSet*)params{ kASFKReturnStatsTimeSessionElapsedSec:@(elapsed), kASFKReturnSessionId:s.sessionId, kASFKReturnDescription:ASFK_RC_DESCR_DEFERRED}; - } + //} } uint64 main_t2=[ASFKBase getTimestamp]; double elapsed=(main_t2-main_t1)/1e9; diff --git a/src/ASFKPipelineSession+Internal.h b/src/ASFKPipelineSession+Internal.h index 9f060a2..d8c8865 100644 --- a/src/ASFKPipelineSession+Internal.h +++ b/src/ASFKPipelineSession+Internal.h @@ -12,13 +12,11 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import "ASFKPipelineSession.h" #import "ASFKBase.h" @interface ASFKPipelineSession (Internal) - -(void) setResultPosition:(NSUInteger)proc; @end diff --git a/src/ASFKPipelineSession.h b/src/ASFKPipelineSession.h index 805c186..ec88400 100644 --- a/src/ASFKPipelineSession.h +++ b/src/ASFKPipelineSession.h @@ -12,49 +12,17 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// + // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #ifndef ASFKPipelineSession_h #define ASFKPipelineSession_h #import "ASFKBase.h" -typedef enum enumASFKPipelineExecutionStatus{ - eASFK_ES_HAS_MORE=0, - eASFK_ES_HAS_NONE, - eASFK_ES_WAS_CANCELLED, - //eASFK_ES_WAS_EXPIRED, - eASFK_ES_SKIPPED_MAINT -} eASFKPipelineExecutionStatus; -@interface ASFKPipelineSession : ASFKBase{ - @public ASFKControlBlock* cblk; - @public std::atomic paused; -} -@property ASFK_IDENTITY_TYPE sessionId; --(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENTITY_TYPE)subId; --(ASFKControlBlock*) getControlBlock; --(void) flush; --(void) cancel; --(void) postDataItemsAsArray:(NSArray*)array; --(void) postDataItemsAsOrderedSet:(NSOrderedSet*)set; --(void) postDataItemsAsUnorderedSet:(NSSet*)set; --(void) postDataItemsAsDictionary:(NSDictionary*)dict; --(void) postDataItem:(id)dataItem; --(void) addRoutinesFromArray:(NSArray*)procs; --(void) replaceRoutinesWithArray:(NSArray*)procs; --(void) setProgressRoutine:(ASFKProgressRoutine)progress; --(void) setSummary:(ASFKExecutableRoutineSummary)sum; --(void) setExpirationSummary:(ASFKExecutableRoutineSummary)sum; --(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancellationRoutine)cancel; --(void) setCancellationHandler:(ASFKCancellationRoutine)cru; --(void) setExpirationCondition:(ASFKExpirationCondition*) trop; --(BOOL) hasSessionSummary; --(BOOL) isBusy; --(long) procsCount; --(long) itemsCount; --(void) ping; + +@interface ASFKPipelineSession : ASFKThreadpoolSession @end #endif /* ASFKPipelineSession_h */ diff --git a/src/ASFKPipelineSession.mm b/src/ASFKPipelineSession.mm index 1bc6ab4..f1dbc41 100644 --- a/src/ASFKPipelineSession.mm +++ b/src/ASFKPipelineSession.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #define ASFK_LOCAL_REPLACE 0 @@ -41,20 +41,13 @@ @interface ASFKPipelineSession() @end @implementation ASFKPipelineSession{ - std::atomic isStopped; - ASFKExpirationCondition* excond; + std::atomic busyCount; NSMutableArray* dataQueues; - ASFKThreadpoolQueue* queueZero; - NSMutableArray* procs; - NSMutableArray* intermediateProcs; - ASFKExecutableRoutineSummary passSummary; - ASFKExecutableRoutineSummary expirationSummary; - ASFKProgressRoutine progressProc; - ASFKCancellationRoutine cancellationHandler; - NSLock* lock; NSRange execRange; std::priority_queue, ASFKComparePriorities> pq; + ASFKThreadpoolQueue* queueZero; + NSLock* lock; } -(id)init{ self=[super init]; @@ -64,7 +57,7 @@ -(id)init{ return self; } -(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENTITY_TYPE)subId{ - self=[super init]; + self=[super initWithSessionId:sessionId andSubsessionId:subId]; if(self){ [self _PSinitWithSession:sessionId andSubsession:subId]; } @@ -81,14 +74,15 @@ -(void)_PSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDEN dataQueues=[NSMutableArray array]; queueZero=[[ASFKThreadpoolQueue alloc]init]; if(sessionId){ - cblk= [self newSession:sessionId andSubsession:subId];// [ASFKControlBlock new]; + cblk= [self newSession:sessionId andSubsession:subId]; }else{ - cblk= [self newSession];// [ASFKControlBlock new]; + cblk= [self newSession]; } self.sessionId=cblk.sessionId; - intermediateProcs=[NSMutableArray array]; + + //intermediateProcs=[NSMutableArray array]; passSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ ASFKLog(@"ASFKPipelineSession: Stub summary"); return data; @@ -260,94 +254,24 @@ -(void) postDataItem:(id)dataItem{ } -(void) addRoutinesFromArray:(NSArray*)ps{ - if([ps count]>0){ - //ready=NO; - [lock lock]; - [intermediateProcs addObject:@[@(ASFK_LOCAL_ADD),@([ps count]),ps]]; - [lock unlock]; - DASFKLog(@"Scheduled for addition %ld procs",[ps count]); - } } -(void) replaceRoutinesWithArray:(NSArray*)ps{ - //ready=NO; + [lock lock]; - long psc=[ps count]; - long procsc=[procs count]; - [intermediateProcs addObject:@[@(ASFK_LOCAL_REPLACE),@(procsc-psc),ps]]; + [procs removeAllObjects]; + [procs addObjectsFromArray:ps]; + [dataQueues removeAllObjects]; + for (ASFKExecutableRoutine er in ps) { + [dataQueues addObject:[ASFKThreadpoolQueue new]]; + } + [lock unlock]; DASFKLog(@"Scheduled for replacement %ld procs",[ps count]); } --(void)_updateRoutines{ - for (NSArray* ar in intermediateProcs) - { - if([[ar objectAtIndex:0]integerValue ]==ASFK_LOCAL_REPLACE){ - if([[ar objectAtIndex:2]count]>0) - { - long dqs=0; - if(dataQueues){ - if([dataQueues count]>0){ - dqs=[dataQueues count]; - ASFKLog(@"Removing %ld Routines",dqs); - ASFKThreadpoolQueue* q=[dataQueues objectAtIndex:0]; - [dataQueues removeAllObjects]; - [dataQueues addObject:q]; - [q unoccupy]; - [procs removeAllObjects]; - [procs addObjectsFromArray:[ar objectAtIndex:2]]; - dqs = [[ar objectAtIndex:2]count]; - ASFKLog(@"Setting %ld Routines instead",dqs); - for (; dqs>1; --dqs) { - [dataQueues addObject:[ASFKThreadpoolQueue new]]; - } - }else - { - [procs addObjectsFromArray:[ar objectAtIndex:2]]; - dqs = [[ar objectAtIndex:2]count]; - ASFKLog(@"Setting %ld Routines",dqs); - for (; dqs>0; --dqs) { - [dataQueues addObject:[ASFKThreadpoolQueue new]]; - } - } - busyCount=0; - } - }else{ - ASFKLog(@"Removing all data queues and Routines"); - for (ASFKThreadpoolQueue* q in dataQueues) { - [q reset]; - } - [dataQueues removeAllObjects]; - [procs removeAllObjects]; - busyCount=0; - } - }else{ - [procs addObjectsFromArray:[ar objectAtIndex:2]]; - long inc=[[ar objectAtIndex:1]integerValue ]; - ASFKLog(@"Adding %ld Routines",inc); - for (long c=inc; c>0; --c) { - [dataQueues addObject:[ASFKThreadpoolQueue new]]; - } - } - execRange=NSMakeRange(0, [dataQueues count]); - } - - [self _resetPriorityQueue]; - [self _adoptDataFromZeroQueue]; - - long c=0; - for (ASFKThreadpoolQueue* q in dataQueues) { - sASFKPrioritizedQueueItem qin; - qin.queueId=c; - qin.priority=[q count]; - if(qin.priority>0){ - pq.push(qin); - } - ++c; - } -} -(BOOL) hasSessionSummary{ [lock lock]; @@ -361,17 +285,11 @@ -(void) cancel{ [self flush]; } --(void) _resetQueues{ - for (ASFKThreadpoolQueue* q in dataQueues) { - [q reset]; - [q unoccupy]; - } -} + -(void)flush{ [lock lock]; cblk->flushed=YES; [self _resetQueues]; - [queueZero reset]; [lock unlock]; busyCount=0; @@ -405,26 +323,13 @@ -(BOOL) isBusy{ return busyCount.load()>0?YES:NO; } --(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancellationRoutine)cancel{ +-(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCancellationRoutine)cancel{ [lock lock]; [self _adoptDataFromZeroQueue]; - - if([intermediateProcs count]>0){ - if( isStopped.load()){ - [self _updateRoutines]; - isStopped=NO; - [intermediateProcs removeAllObjects]; - [lock unlock]; - return eASFK_ES_SKIPPED_MAINT; - }else{ - isStopped=YES; - [lock unlock]; - return eASFK_ES_SKIPPED_MAINT; - } - } + if(isStopped.load()){ - + [lock unlock]; return eASFK_ES_WAS_CANCELLED; @@ -434,7 +339,7 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell || [cblk cancellationRequestedByStarter]) ){ - + [self _resetPriorityQueue]; ASFKCancellationRoutine cru=cancellationHandler; [lock unlock]; @@ -445,7 +350,7 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell [self forgetAllSessions]; return eASFK_ES_WAS_CANCELLED; } - + ASFKExecutableRoutineSummary summary; summary=passSummary; sASFKPrioritizedQueueItem qin; @@ -485,7 +390,7 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell ASFKExecutableRoutine eproc=[procs objectAtIndex:curpos]; ASFKThreadpoolQueue* q=[dataQueues objectAtIndex:curpos]; [lock unlock]; - BOOL empty; + BOOL empty=NO; id result=[q pullAndOccupyWithId:selector empty:empty]; if(result) @@ -507,7 +412,7 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell } [lock unlock]; if([cblk cancellationRequestedByCallback]|| [cblk cancellationRequestedByStarter]){ - + [lock lock]; [self _resetPriorityQueue]; ASFKCancellationRoutine cru=cancellationHandler; @@ -530,7 +435,6 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell std::vector bc={busyCount.load()}; if(trp && [trp isConditionMetForLonglongValues:bc data:result]){ ASFKLog(@"Expiring session %@",self.sessionId); - [self flush]; cancel(self.sessionId); expirproc(cblk,@{},res); @@ -549,7 +453,8 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell } if([cblk flushRequested]){ [cblk flushRequested:NO]; - }else{ + } + else{ [lock lock]; long long dco=[dataQueues count]; long nextQ; @@ -604,6 +509,13 @@ -(eASFKPipelineExecutionStatus) select:(long)selector routineCancel:(ASFKCancell return eASFK_ES_HAS_MORE; } +#pragma mark - Private methods +-(void) _resetQueues{ + for (ASFKThreadpoolQueue* q in dataQueues) { + [q reset]; + [q unoccupy]; + } +} -(void) _resetPriorityQueue{ while (!pq.empty()) { pq.pop(); @@ -621,4 +533,21 @@ -(void) _adoptDataFromZeroQueue{ [queueZero reset]; } } +-(void)_updateRoutines{ + //empty priority queue + [self _resetPriorityQueue]; + [self _adoptDataFromZeroQueue]; + + long c=0; + for (ASFKThreadpoolQueue* q in dataQueues) { + sASFKPrioritizedQueueItem qin; + qin.queueId=c; + qin.priority=[q count]; + if(qin.priority>0){ + pq.push(qin); + } + ++c; + } + +} @end diff --git a/src/ASFKPrjConfig.h b/src/ASFKPrjConfig.h index e00d3f3..0643c41 100644 --- a/src/ASFKPrjConfig.h +++ b/src/ASFKPrjConfig.h @@ -21,7 +21,7 @@ #define __ASFK_DEBUG__ 1 #define __ASFK_VERBOSE_PRINTING__ 1 -#define ASFK_PRIVSYM_LOAD_FACTOR 1 +#define ASFK_PRIVSYM_TP_LOAD_FACTOR 1 #define ASFK_PRIVSYM_QOS_CLASS QOS_CLASS_BACKGROUND #define ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD 100000 #define ASFK_CALC_ELAPSED_TIME(starttime, endtime) (endtime-starttime)/double(1e9) @@ -34,7 +34,6 @@ #define ASFK_STR_INVALID_PARAM @"some of input parameters are invalid" -#define ASFK_TP_LOAD_FACTOR 3 #endif /* ASFKPrjConfig_h */ diff --git a/src/ASFKThreadpoolQueue.mm b/src/ASFKThreadpoolQueue.mm index 861e592..c5b33ef 100644 --- a/src/ASFKThreadpoolQueue.mm +++ b/src/ASFKThreadpoolQueue.mm @@ -12,13 +12,11 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import "ASFKBase.h" #import "ASFKLinearFlow+Internal.h" -//#import "ASFKThreadpoolQueue+Internal.h" #import "ASFKQueue+Internal.h" @implementation ASFKThreadpoolQueue{ long occupant;