From 9ad8dab6d5229211d6a8fbb0aff7e0bf2ef45173 Mon Sep 17 00:00:00 2001 From: pewsou Date: Sat, 18 Jun 2022 14:33:18 +0300 Subject: [PATCH] Mailbox+Pipeline: bug fixing+new feature Bugs fixing, Mailbox: noderation introduced; pipeline-testing --- src/ASFKBase+Internal.mm | 1 + src/ASFKBase+Statistics.h | 2 +- src/ASFKBase+Statistics.mm | 2 +- src/ASFKBase.h | 37 +- src/ASFKBase.mm | 46 ++- src/ASFKControlBlock+Internal.h | 6 +- src/ASFKControlBlock+Internal.mm | 2 +- src/ASFKControlBlock.mm | 22 +- src/ASFKExpirationCondition.h | 56 ++- src/ASFKExpirationCondition.mm | 369 ++++++++++++++++++-- src/ASFKFilter.h | 70 +++- src/ASFKFilter.mm | 15 +- src/ASFKFilteringQueue.h | 45 ++- src/ASFKFilteringQueue.mm | 69 +++- src/ASFKGlobalThreadpool.h | 4 +- src/ASFKGlobalThreadpool.mm | 57 ++- src/ASFKLinearFlow+Internal.h | 8 +- src/ASFKLinearFlow+Internal.mm | 1 - src/ASFKLinearFlow.h | 11 +- src/ASFKLinearFlow.mm | 52 +-- src/ASFKMBProperties.h | 7 +- src/ASFKMBSecret.h | 4 +- src/ASFKMailbox.h | 60 +++- src/ASFKMailbox.mm | 521 ++++++++++++++++++++++------ src/ASFKNonlinearFlow.h | 1 - src/ASFKNonlinearFlow.mm | 1 - src/ASFKPipelinePar.h | 45 +-- src/ASFKPipelinePar.mm | 85 +++-- src/ASFKPipelineSession+Internal.mm | 1 - src/ASFKPipelineSession.h | 3 - src/ASFKPipelineSession.mm | 90 +++-- src/ASFKPrjConfig.h | 8 +- 32 files changed, 1317 insertions(+), 384 deletions(-) diff --git a/src/ASFKBase+Internal.mm b/src/ASFKBase+Internal.mm index 114b5bc..207c455 100644 --- a/src/ASFKBase+Internal.mm +++ b/src/ASFKBase+Internal.mm @@ -12,6 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKBase+Statistics.h b/src/ASFKBase+Statistics.h index 1d5ee8b..0516e68 100644 --- a/src/ASFKBase+Statistics.h +++ b/src/ASFKBase+Statistics.h @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKBase+Statistics.mm b/src/ASFKBase+Statistics.mm index 0347627..2b0b83f 100644 --- a/src/ASFKBase+Statistics.mm +++ b/src/ASFKBase+Statistics.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKBase.h b/src/ASFKBase.h index 4eb084c..02de9f3 100644 --- a/src/ASFKBase.h +++ b/src/ASFKBase.h @@ -18,7 +18,7 @@ #import #import "ASFKPrjConfig.h" -#define ASFK_VERSION @"0.2.2" +#define ASFK_VERSION @"0.3.1" #define ASFK_IDENTITY_TYPE id #ifdef __ASFK_VERBOSE_PRINTING__ @@ -80,11 +80,25 @@ #import #import - +/*! + @brief modes of dropping + */ enum eASFKQDroppingPolicy{ + /*! + @brief drop the newest item + */ E_ASFK_Q_DP_TAIL=0, + /*! + @brief drop the oldest item + */ E_ASFK_Q_DP_HEAD, + /*! + @brief don't drop, reject new candidate + */ E_ASFK_Q_DP_REJECT, + /*! + @brief select item for dropping using some algorithm + */ E_ASFK_Q_DP_ALGO }; @@ -128,6 +142,8 @@ typedef id ( ^ASFKThreadpoolSummary)(void); @return YES if flush attempt was issued; NO otherwise. */ -(BOOL) flushRequested; +-(void) setPaused:(BOOL) yesno; +-(BOOL) isPaused; -(void) reset; @end @@ -158,7 +174,7 @@ typedef id ( ^ASFKProgressRoutine)(NSUInteger stage,NSUInteger accomplished ,NSU @protected std::atomic indexSecondary; @public std::atomic< BOOL> flushed; - @public std::atomic< BOOL> paused; + @protected std::atomic< BOOL> paused; } @property (readonly) ASFK_IDENTITY_TYPE sessionId; @property (readonly) ASFK_IDENTITY_TYPE parentId; @@ -175,6 +191,7 @@ typedef id ( ^ASFKExecutableRoutine)(id controlBlock, id da typedef id ( ^ASFKExecutableRoutineSummary)(id controlBlock,NSDictionary* stats,id data); typedef id ( ^ASFKCancellationRoutine)(id identity); +typedef id ( ^ASFKOnPauseNotification)(id identity, BOOL paused); /** @param controlBlock object controlling the execution @@ -268,10 +285,11 @@ typedef BOOL ( ^ASFKExecutableRoutineLoopConditional)(id c @interface ASFKExecutionParams:NSObject{ @public ASFKProgressRoutine progressProc; -@public ASFKExecutableRoutineSummary SummaryRoutine; +@public ASFKExecutableRoutineSummary summaryRoutine; @public NSArray* procs; @public ASFKCancellationRoutine cancellationProc; @public ASFKExpirationCondition* expCondition; +@public ASFKOnPauseNotification onPauseProc; } @end @@ -305,15 +323,19 @@ typedef enum enumASFKPipelineExecutionStatus{ eASFK_ES_SKIPPED_MAINT } eASFKThreadpoolExecutionStatus; +typedef id ( ^ASFKThreadpoolSessionCancelProc)(id sessionId); @interface ASFKThreadpoolSession : ASFKBase{ + @public ASFKControlBlock* cblk; @protected ASFKExecutableRoutineSummary passSummary; @protected ASFKExecutableRoutineSummary expirationSummary; - @protected ASFKCancellationRoutine cancellationHandler; + @protected ASFKCancellationRoutine cancellationHandler; @protected NSMutableArray* procs; @protected ASFKExpirationCondition* excond; + @public ASFKOnPauseNotification onPauseNotification; @public std::atomic isStopped; @public std::atomic paused; + @public ASFKThreadpoolSessionCancelProc cancellationProc; } @property ASFK_IDENTITY_TYPE sessionId; @@ -326,7 +348,6 @@ typedef enum enumASFKPipelineExecutionStatus{ -(void) postDataItemsAsUnorderedSet:(NSSet*)set; -(void) postDataItemsAsDictionary:(NSDictionary*)dict; -(void) postDataItem:(id)dataItem; --(void) addRoutinesFromArray:(NSArray*)procs; -(void) replaceRoutinesWithArray:(NSArray*)procs; -(void) setProgressRoutine:(ASFKProgressRoutine)progress; -(void) setSummary:(ASFKExecutableRoutineSummary)sum; @@ -335,12 +356,10 @@ typedef enum enumASFKPipelineExecutionStatus{ -(void) setCancellationHandler:(ASFKCancellationRoutine)cru; -(void) setExpirationCondition:(ASFKExpirationCondition*) trop; -(BOOL) hasSessionSummary; - -(BOOL) isBusy; - -(long) procsCount; -(long) itemsCount; - +-(void) _invokeCancellationHandler:(ASFKCancellationRoutine) cru identity:(id)identity; @end #import "ASFKFilter.h" diff --git a/src/ASFKBase.mm b/src/ASFKBase.mm index 0e763b6..ecb172c 100644 --- a/src/ASFKBase.mm +++ b/src/ASFKBase.mm @@ -21,22 +21,26 @@ #include #import "ASFKQueue+Internal.h" -@implementation ASFKExecutionParams{} +@implementation ASFKExecutionParams{ + +} -(id) init{ self = [super init]; if(self) { progressProc=nil; - SummaryRoutine=nil; + summaryRoutine=nil; procs=nil; cancellationProc=nil; expCondition=nil; - + onPauseProc=nil; } return self; } @end -@implementation ASFKThreadpoolSession +@implementation ASFKThreadpoolSession{ + std::atomic cancelled; +} -(id)init{ self=[super init]; if(self){ @@ -51,11 +55,13 @@ -(id)initWithSessionId:(ASFK_IDENTITY_TYPE)sessionId andSubsessionId:(ASFK_IDENT } return self; } + -(void)_TPSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDENTITY_TYPE)subId{ procs=[NSMutableArray array]; excond=[[ASFKExpirationCondition alloc]init]; isStopped=NO; paused=NO; + onPauseNotification=nil; if(sessionId){ cblk= [self newSession:sessionId andSubsession:subId]; }else{ @@ -63,19 +69,29 @@ -(void)_TPSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDE } self.sessionId=cblk.sessionId; - passSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ - ASFKLog(@"ASFKPipelineSession: Stub summary"); - return data; - }; - expirationSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ - ASFKLog(@"ASFKPipelineSession: Stub expiration summary"); - return data; - }; + + passSummary=nil; + expirationSummary=nil; + onPauseNotification=nil; + cancelled=NO; cancellationHandler=^id(id identity){ + ASFKLog(@"Default cancellation handler"); return nil; }; + +} +-(void) _invokeCancellationHandler:(ASFKCancellationRoutine) cru identity:(id)identity{ + BOOL tval=NO; + if(cru==nil){ + return; + } + + if(cancelled.compare_exchange_strong(tval,YES)) + { + DASFKLog(@"Cancellation on the way, session %@",identity); + cru(identity); + } } - @end @implementation ASFKGlobalQueue{ @@ -112,6 +128,7 @@ -(id) submitBlocks:(NSArray*)blarray summary:(id(^)(void))summ __block dispatch_queue_t q=[self _resolveQueue:qos]; if(blocking){ if(blarray && [blarray count]>0){ + //ASFKLog(@"deploying %lu tasks",(unsigned long)[blarray count]); dispatch_apply([blarray count], q, ^(size_t index) { dispatch_block_t b= [blarray objectAtIndex:index]; b(); @@ -193,7 +210,7 @@ -(NSDictionary*)getStatistics{ -(void)cancelAll{ [lkNonLocal lock]; [ctrlblocks enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { - ASFKControlBlock* cb = (ASFKControlBlock*)obj;// [ctrlblocks objectForKey:key]; + ASFKControlBlock* cb = (ASFKControlBlock*)obj; if(cb){ [cb cancel]; }else{ @@ -246,4 +263,5 @@ -(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId{ return NO; } + @end diff --git a/src/ASFKControlBlock+Internal.h b/src/ASFKControlBlock+Internal.h index d0c9743..080890e 100644 --- a/src/ASFKControlBlock+Internal.h +++ b/src/ASFKControlBlock+Internal.h @@ -12,14 +12,16 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. #import "ASFKBase.h" @interface ASFKControlBlock (Internal) +//-(void) setStopped:(BOOL)stop; +//-(void) setTotalProcessorsNum:(NSUInteger)procs; -(void) setResultPosition:(NSUInteger)proc; - +//-(NSUInteger) getTotalProcessorsNum; -(NSUInteger) getResultPosition; -(void) setProgressRoutine:(ASFKProgressRoutine)progress; -(void) setSecondaryIndex:(NSUInteger)secind; diff --git a/src/ASFKControlBlock+Internal.mm b/src/ASFKControlBlock+Internal.mm index ff88e70..0105969 100644 --- a/src/ASFKControlBlock+Internal.mm +++ b/src/ASFKControlBlock+Internal.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. #import "ASFKControlBlock+Internal.h" diff --git a/src/ASFKControlBlock.mm b/src/ASFKControlBlock.mm index c05859d..0e93827 100644 --- a/src/ASFKControlBlock.mm +++ b/src/ASFKControlBlock.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -23,6 +23,7 @@ @implementation ASFKControlBlock{ std::atomic< BOOL> abortByCallback; std::atomic< BOOL> abortByCaller; std::atomic< BOOL> abortByInternal; + //NSMutableArray* keys; std::vector> indexes; } -(id)initWithParent:(ASFK_IDENTITY_TYPE)parentId sessionId:(ASFK_IDENTITY_TYPE) sessionId andSubId:(ASFK_IDENTITY_TYPE)subid{ @@ -30,16 +31,18 @@ -(id)initWithParent:(ASFK_IDENTITY_TYPE)parentId sessionId:(ASFK_IDENTITY_TYPE) if(self){ _parentId=[parentId copy]; _sessionId=[ASFKBase concatIdentity:sessionId withIdentity:subid]; - + //keys=[NSMutableArray array]; + //_blkContainer=[[ASFKBlocksContainer alloc]init]; itsLock=[[NSLock alloc]init]; - + //[lock lock]; abortByCallback=NO; abortByCaller=NO; abortByInternal=NO; - + //stopped=YES; flushed=NO; paused=NO; - + //terminated=NO; + //[lock unlock]; } return self; } @@ -50,6 +53,7 @@ -(void)cancel{ -(void) flushRequested:(BOOL)flush{ flushed=flush; } + -(BOOL) flushRequested{ return flushed; } @@ -61,11 +65,17 @@ -(BOOL) cancellationRequestedByCallback{ BOOL b=abortByCallback; return b; } +-(void) setPaused:(BOOL) yesno{ + paused=yesno; +} +-(BOOL) isPaused{ + return paused; +} -(void) reset{ abortByCallback=NO; abortByCaller=NO; [itsLock lock]; - + //[keys removeAllObjects]; indexes.clear(); [itsLock unlock]; } diff --git a/src/ASFKExpirationCondition.h b/src/ASFKExpirationCondition.h index 3b32236..5bfad29 100644 --- a/src/ASFKExpirationCondition.h +++ b/src/ASFKExpirationCondition.h @@ -12,29 +12,42 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. #ifndef ASFKExpirationCondition_h #define ASFKExpirationCondition_h #import #include -@interface ASFKCondition :NSObject +@interface ASFKCondition :NSObject{ + @protected NSLock* lock; +} +-(BOOL) isConditionMet:(id) data; -(BOOL) isConditionMetForDoubleValues:(std::vector&)values data:(id)data; -(BOOL) isConditionMetForBoolValues:(std::vector&)values data:(id)data; --(BOOL) isConditionMetForULonglongValues:(std::vector&)values data:(id)data; --(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; +-(BOOL) isConditionMetForULonglongValues:(std::vector&)values data:(id)data; +-(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; -(BOOL) isConditionMetAfterDateValue:(NSDate*)aDate data:(id)data; -(BOOL) isConditionMetForObject:(id)data; +-(BOOL) isConditionMetForDoubleValue:(double)value data:(id)data; +-(BOOL) isConditionMetForBoolValue:(BOOL)value data:(id)data; +-(BOOL) isConditionMetForULonglongValue:(NSUInteger)value data:(id)data; +-(BOOL) isConditionMetForLonglongValue:(NSInteger)value data:(id)data; + +-(std::vector&) getULLVector; +-(std::vector&) getLLVector; +-(std::vector&) getDoubleVector; +-(std::vector&) getBoolVector; +-(NSArray*) getDateVector; +-(NSArray*) getDataVector; @end @interface ASFKConditionNone :ASFKCondition --(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; +-(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; @end -@interface ASFKConditionOnBatchEnd:ASFKCondition --(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; -@end +//@interface ASFKConditionOnBatchEnd:ASFKCondition +//-(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data; +//@end @interface ASFKConditionTemporal : ASFKCondition @property (readonly,nonatomic) NSDate* itsDeadline; @@ -75,18 +88,37 @@ @end #pragma mark - Expiration conditions @interface ASFKExpirationCondition : ASFKCondition - +-(BOOL) setULonglongArg:(NSUInteger)arg; +-(BOOL) setLonglongArg:(NSInteger)arg; +-(BOOL) setBoolArg:(BOOL)arg; +-(BOOL) setDoubleArg:(double)arg; +-(BOOL) setObjArg:(id)arg; +-(BOOL) setDateArg:(NSDate*)arg; +-(BOOL) setULonglongArgs:(std::vector&)args; +-(BOOL) setLonglongArgs:(std::vector&)arg; +-(BOOL) setBoolArgs:(std::vector&)arg; +-(BOOL) setDoubleArgs:(std::vector&)arg; +-(BOOL) setObjArgs:(NSArray*)arg; +-(BOOL) setDateArgs:(NSArray*)arg; +-(BOOL) setSampleLongLong:(NSInteger) val; @end @interface ASFKExpirationConditionNone :ASFKExpirationCondition - +-(id) initWithBatchSize:(NSInteger)size; @end +@interface ASFKExpirationConditionOnTimer : ASFKExpirationCondition +@property (nonatomic,readonly) ASFKConditionTemporal* expirationTimer; +-(id) initWithSeconds:(NSTimeInterval)sec; +-(id) initWithDate:(NSDate*)aDate; +-(id) initWithTemporalCondition:(ASFKConditionTemporal*)cond; +@end + @interface ASFKExpirationOnBatchEnd :ASFKExpirationCondition --(id) initWithBatchSize:(unsigned long long) bsize; +-(id) initWithBatchSize:(NSInteger)size skip:(NSInteger)skip; @end @interface ASFKConditionCallRelease : ASFKCondition{ -@private std::vector releaseArgBool; + @private std::vector releaseArgBool; @private std::vector releaseArgDouble; @private std::vector releaseArgLongLong; @private std::vector releaseArgULongLong; diff --git a/src/ASFKExpirationCondition.mm b/src/ASFKExpirationCondition.mm index 507956d..7b52b01 100644 --- a/src/ASFKExpirationCondition.mm +++ b/src/ASFKExpirationCondition.mm @@ -12,24 +12,265 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import "ASFKExpirationCondition.h" #import "ASFKBase.h" #include -@implementation ASFKCondition +@implementation ASFKCondition{ + std::atomic setDates; + NSMutableArray* datesArray; + std::atomic setData; + NSMutableArray* dataArray; + std::atomic setULL; + std::vector vectULL; + std::atomic setLL; + std::vector vectLL; + std::atomic setDouble; + std::vector vectDouble; + std::atomic setBool; + std::vector vectBool; +} +-(void) _initCond{ + lock = [NSLock new]; + setDates=NO; + datesArray=[NSMutableArray new]; + setData=NO; + dataArray =[NSMutableArray new]; + setULL=NO; + vectULL.resize(1); + setLL=NO; + vectLL.resize(1); + setBool=NO; + vectBool.resize(1); + setDouble=NO; + vectDouble.resize(1); +} +-(id) init{ + self=[super init]; + if(self){ + [self _initCond]; + + } + return self; +} +-(BOOL) setULonglongArg:(NSUInteger)arg{ + BOOL tval=NO; + if(setULL.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectULL.size() != 1){ + vectULL.resize(1); + } + vectULL[0]=arg; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setLonglongArg:(NSInteger)arg{ + BOOL tval=NO; + if(setLL.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectLL.size() != 1){ + vectLL.resize(1); + } + vectLL[0]=arg; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setBoolArg:(BOOL)arg{ + BOOL tval=NO; + if(setBool.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectBool.size() != 1){ + vectBool.resize(1); + } + vectBool[0]=arg; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setDoubleArg:(double)arg{ + BOOL tval=NO; + if(setDouble.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectDouble.size() != 1){ + vectDouble.resize(1); + } + vectDouble[0]=arg; + [lock unlock]; + return YES; + } + return NO; + +} +-(BOOL) setObjArg:(id)arg{ + BOOL tval=NO; + if(setData.compare_exchange_strong(tval,YES)){ + if(arg){ + [lock lock]; + if([dataArray count]>0){ + [dataArray removeAllObjects]; + } + [dataArray addObject:arg]; + [lock unlock]; + } + else{ + [lock lock]; + [dataArray removeAllObjects]; + [lock unlock]; + } + return YES; + } + return NO; +} +-(BOOL) setDateArg:(NSDate*)arg{ + BOOL tval=NO; + if(setDates.compare_exchange_strong(tval,YES)){ + if(arg){ + [lock lock]; + if([datesArray count]>0){ + [datesArray removeAllObjects]; + } + [datesArray addObject:arg]; + [lock unlock]; + } + else{ + [lock lock]; + [datesArray removeAllObjects]; + [lock unlock]; + } + return YES; + } + return NO; +} +-(BOOL) setULonglongArgs:(std::vector&)args{ + BOOL tval=NO; + if(setULL.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectULL.size() != args.size()){ + vectULL.resize(args.size()); + } + vectULL=args; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setLonglongArgs:(std::vector&)args{ + BOOL tval=NO; + if(setLL.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectLL.size() != args.size()){ + vectLL.resize(args.size()); + } + vectLL = args; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setBoolArgs:(std::vector&)args{ + BOOL tval=NO; + if(setBool.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectBool.size() != args.size()){ + vectBool.resize(args.size()); + } + vectBool = args; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setDoubleArgs:(std::vector&)args{ + BOOL tval=NO; + if(setDouble.compare_exchange_strong(tval,YES)){ + [lock lock]; + if(vectDouble.size() != args.size()){ + vectDouble.resize(args.size()); + } + vectDouble = args; + [lock unlock]; + return YES; + } + return NO; +} +-(BOOL) setDateArgs:(NSArray*)args{ + BOOL tval=NO; + if(setDates.compare_exchange_strong(tval,YES)){ + if(args){ + [lock lock]; + if([datesArray count] != [args count]){ + [datesArray removeAllObjects]; + } + [datesArray arrayByAddingObjectsFromArray:args]; + [lock unlock]; + } + else{ + [lock lock]; + [datesArray removeAllObjects]; + [lock unlock]; + } + return YES; + } + return NO; +} +-(BOOL) setObjArgs:(NSArray*)args{ + BOOL tval=NO; + if(setData.compare_exchange_strong(tval,YES)){ + if(args){ + [lock lock]; + if([dataArray count] != [args count]){ + [dataArray removeAllObjects]; + } + [dataArray arrayByAddingObjectsFromArray:args]; + [lock unlock]; + } + else{ + [lock lock]; + [dataArray removeAllObjects]; + [lock unlock]; + } + return YES; + } + return NO; +} +-(std::vector&) getULLVector{ + return vectULL; +} +-(std::vector&) getLLVector{ + return vectLL; +} +-(std::vector&) getDoubleVector{ + return vectDouble; +} +-(std::vector&) getBoolVector{ + return vectBool; +} +-(NSArray*) getDateVector{ + return datesArray; +} +-(NSArray*) getDataVector{ + return dataArray; +} +-(BOOL) isConditionMet:(id) data{ + return NO; +} -(BOOL) isConditionMetForDoubleValues:(std::vector&)values data:(id)data{ return NO; } -(BOOL) isConditionMetForBoolValues:(std::vector&)value data:(id)data{ return NO; } --(BOOL) isConditionMetForULonglongValues:(std::vector&)value data:(id)data{ +-(BOOL) isConditionMetForULonglongValues:(std::vector&)value data:(id)data{ return NO; } --(BOOL) isConditionMetForLonglongValues:(std::vector&)value data:(id)data{ +-(BOOL) isConditionMetForLonglongValues:(std::vector&)value data:(id)data{ return NO; } -(BOOL) isConditionMetAfterDateValues:(NSDate*)aDate data:(id)data{ @@ -38,21 +279,24 @@ -(BOOL) isConditionMetAfterDateValues:(NSDate*)aDate data:(id)data{ -(BOOL) isConditionMetForObject:(id)data{ return NO; } +-(BOOL) isConditionMetForDoubleValue:(double)value data:(id)data{ + return NO; +} +-(BOOL) isConditionMetForBoolValue:(BOOL)value data:(id)data{ + return NO; +} +-(BOOL) isConditionMetForULonglongValue:(NSUInteger)value data:(id)data{ + return NO; +} +-(BOOL) isConditionMetForLonglongValue:(NSInteger)value data:(id)data{ + return NO; +} @end @implementation ASFKConditionNone @end -@implementation ASFKConditionOnBatchEnd - --(BOOL) isConditionMetForLonglongValues:(std::vector&)values data:(id)data{ - if(values.size()>0 && values[0]>0){ - return NO; - } - return YES; -} -@end @implementation ASFKConditionTemporal{ std::chrono::time_point timePoint; @@ -236,36 +480,121 @@ -(BOOL) isConditionMetAfterDateValue:(NSDate*)aDate data:(id)data{ @end @implementation ASFKExpirationCondition +-(BOOL) setSampleLongLong:(NSInteger) val{ + return NO; +} @end @implementation ASFKExpirationConditionNone @end + +@implementation ASFKExpirationConditionOnTimer +-(id) init{ + self = [super init]; + if(self){ + _expirationTimer=[ASFKConditionTemporal new]; + } + return self; +} +-(id) initWithSeconds:(NSTimeInterval)sec{ + self = [super init]; + if(self){ + _expirationTimer=[ASFKConditionTemporal new]; + [_expirationTimer setDelay:sec]; + [_expirationTimer delayToDeadline]; + } + return self; +} +-(id) initWithDate:(NSDate*)aDate{ + if(self){ + _expirationTimer=[ASFKConditionTemporal new]; + [_expirationTimer setDueDate:aDate]; + [_expirationTimer deadlineToDelay]; + } + return self; +} +-(id) initWithTemporalCondition:(ASFKConditionTemporal*)cond{ + if(self){ + _expirationTimer=[ASFKConditionTemporal new]; + [_expirationTimer setFromTemporalCondition:cond]; + } + return self; +} + +-(BOOL) isConditionMet:(id) data{ + BOOL r=NO; + if([self.expirationTimer isConditionMetAfterDateValue:[NSDate date] data:nil]){ + r=YES; + NSLog(@"expiration by timer"); + } + return r; +} + +@end + @implementation ASFKExpirationOnBatchEnd{ - std::atomic batchSize; + std::atomic batchSize; + std::atomic skipItems; + std::atomic sample; } -(id) init{ self = [super init]; if(self){ - batchSize=ULONG_MAX; + sample=0; + skipItems=0; + batchSize=0; } return self; } --(id) initWithBatchSize:(unsigned long long) bsize{ +-(id) initWithBatchSize:(NSInteger)size skip:(NSInteger)skip{ self = [super init]; if(self){ - batchSize=bsize; + if(skip<0){ + skip=0; + } + if(size<0){ + size=0; + } + sample=0; + skipItems=skip; + batchSize=size; + } return self; } --(BOOL) isConditionMetForLonglongValue:(std::vector&)values data:(id)data{ - batchSize.fetch_sub(1); - if(batchSize.load()>0){ +-(BOOL) isConditionMet:(id)data{ + + BOOL x=[self isConditionMetForLonglongValue:sample data:nil]; + + return x; +} +-(BOOL) isConditionMetForLonglongValue:(NSInteger)value data:(id)data{ + BOOL res=NO; + if(value > 0){ + skipItems.fetch_sub(1); + } + + if(skipItems.load() > 0){ return NO; } + if(batchSize > 0){ + batchSize.fetch_sub(1); + } + if(batchSize > 0){ + res=NO; + } + else{ + res=YES; + } + return res; +} +-(BOOL) setSampleLongLong:(NSInteger)val{ + sample=val; return YES; } @end + @implementation ASFKConditionCallRelease @end diff --git a/src/ASFKFilter.h b/src/ASFKFilter.h index 5273fb0..8c56956 100644 --- a/src/ASFKFilter.h +++ b/src/ASFKFilter.h @@ -12,19 +12,73 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import "ASFKBase.h" @interface ASFKFilter : ASFKLinearFlow +/*! + @brief Tests given object with some custom criteria. + @param object object to test. + @return YES if test passes; NO otherwise. + */ -(BOOL) testCriteriaMatch:(id)object; --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToArray:(NSMutableArray*)array; --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToIndexSet:(NSMutableIndexSet*)iset; --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToRange:(NSRange&)range; --(BOOL) filterCandidatesInSet:(NSSet*)objects saveToArray:(NSMutableArray*)array; --(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects saveToIndexSet:(NSMutableIndexSet*)iset; --(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects saveToRange:(NSRange&)range; --(BOOL) filterCandidatesInDictionary:(NSDictionary*)objects saveToKeys:(NSMutableArray*)keys values:(NSMutableArray*)values; +/*! + @brief Tests array of objects. + @param objects objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param array array of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToArray:(NSMutableArray*)array; +/*! + @brief Tests array of objects. + @param objects objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param iset index set of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToIndexSet:(NSMutableIndexSet*)iset; +/*! + @brief Tests array of objects. + @param objects objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param range range of indexes of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToRange:(NSRange&)range; +/*! + @brief Tests unordered set of objects. + @param objects objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param array array of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInSet:(NSSet*)objects passing:(BOOL)writeOut saveToArray:(NSMutableArray*)array; +/*! + @brief Tests unordered set of objects. + @param objects ordered set of objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param iset index set of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects passing:(BOOL)writeOut saveToIndexSet:(NSMutableIndexSet*)iset; +/*! + @brief Tests unordered set of objects. + @param objects ordered set of objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param range range of indexes of filtered objects. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects passing:(BOOL)writeOut saveToRange:(NSRange&)range; +/*! + @brief Tests dictionary of objects. + @param objects ordered set of objects to test. + @param writeOut indication of whether to save passing objects or non-passing. YES for passing, NO otherwise. + @param keys array of keys of filtered objects. If nil, then not used. + @param values array of values of filtered objects. If nil, then not used. + @return YES if at least one test passes; NO otherwise. + */ +-(BOOL) filterCandidatesInDictionary:(NSDictionary*)objects passing:(BOOL)writeOut saveToKeys:(NSMutableArray*)keys values:(NSMutableArray*)values; @end diff --git a/src/ASFKFilter.mm b/src/ASFKFilter.mm index d3cd0d9..7e0df16 100644 --- a/src/ASFKFilter.mm +++ b/src/ASFKFilter.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -42,25 +41,25 @@ -(NSDictionary*) _callObject:(ASFKParamSet *)params{ -(BOOL) testCriteriaMatch:(id)object{ return YES; } --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToArray:(NSMutableArray*)array{ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToArray:(NSMutableArray*)array{ return YES; } --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToIndexSet:(NSMutableIndexSet*)iset{ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToIndexSet:(NSMutableIndexSet*)iset{ return YES; } --(BOOL) filterCandidatesInArray:(NSArray*)objects saveToRange:(NSRange&)range{ +-(BOOL) filterCandidatesInArray:(NSArray*)objects passing:(BOOL)writeOut saveToRange:(NSRange&)range{ return YES; } --(BOOL) filterCandidatesInSet:(NSSet*)objects saveToArray:(NSMutableArray*)array{ +-(BOOL) filterCandidatesInSet:(NSSet*)objects passing:(BOOL)writeOut saveToArray:(NSMutableArray*)array{ return YES; } --(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects saveToIndexSet:(NSMutableIndexSet*)iset{ +-(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects passing:(BOOL)writeOut saveToIndexSet:(NSMutableIndexSet*)iset{ return YES; } --(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects saveToRange:(NSRange&)range{ +-(BOOL) filterCandidatesInOrderedSet:(NSOrderedSet*)objects passing:(BOOL)writeOut saveToRange:(NSRange&)range{ return YES; } --(BOOL) filterCandidatesInDictionary:(NSDictionary*)objects saveToKeys:(NSMutableArray*)keys values:(NSMutableArray*)values{ +-(BOOL) filterCandidatesInDictionary:(NSDictionary*)objects passing:(BOOL)writeOut saveToKeys:(NSMutableArray*)keys values:(NSMutableArray*)values{ return YES; } @end diff --git a/src/ASFKFilteringQueue.h b/src/ASFKFilteringQueue.h index 9287e2e..f3b189b 100644 --- a/src/ASFKFilteringQueue.h +++ b/src/ASFKFilteringQueue.h @@ -19,10 +19,49 @@ @interface ASFKFilteringQueue : ASFKQueue typedef NSIndexSet* (^clbkASFKFQFilter)(NSArray* collection, NSRange range); --(void) setMaxQSize:(NSUInteger)size; --(void) setMinQSize:(NSUInteger)size; +/*! + @brief Sets maximum queue size. + @discussion when the queue size reached this value any further enqueing operation will not increase it. + @param size required maximum size. + @return YES if the update left the limits in ascending order, NO otherwise. + */ +-(BOOL) setMaxQSize:(NSUInteger)size; +/*! + @brief Sets minimum queue size. + @discussion when the queue size reached this value any further enqueing operation will not decrease it. + @param size required minimum size. + @return YES if the update left the limits in ascending order, NO otherwise. + */ +-(BOOL) setMinQSize:(NSUInteger)size; +/*! + @brief Sets dropping methods for this queue. + @discussion when the queue's maximum size reached then on 'push' operation decision needs to be taken regarding fresh candidate. In order to keep the queue size unchanged some item(s) need to be discarded; alternatively new candidate may be rejected. This method sets specific dropping mode. + */ -(void) setDroppingPolicy:(eASFKQDroppingPolicy)policy; +/*! + @brief Sets dropping algorithm for this queue. + @discussion when the queue's maximum size reached then on 'push' operation decision needs to be taken regarding fresh candidate. In order to keep the queue size unchanged some item(s) need to be discarded; alternatively new candidate may be rejected. this method sets specific dropping algorittm. + @param dropAlg the custom dropping algorithm; may bi nil. + */ -(void) setDroppingAlgorithmL1:(ASFKFilter*)dropAlg; +/*! + @brief Pulls item from queue, while simulating the queue size. + @discussion Sometimes it is necessary to pull item from queue while pretending that its size is differend from actual. + @param count number to be temporarily added to the queue size while deciding if item can be pulled. + */ +-(id) pullWithCount:(NSInteger) count; +/*! + @brief Filters queue with provided filtering object. + @discussion Leaves in queue only items that do not match filtering criteria. + @param filter the filtering object; may be nil. + */ -(void) filterWith:(ASFKFilter*)filter; --(BOOL) removeObjWithId:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop)) blk; +/*! + @brief Removes from queue given object. + @discussion Removes from queue all objects equal to given object with respect to provided property; equality is defined by the block. + @param obj object to remove; may not be nil. + @param blk block that tests equality; must return YES to remove; may not be nil. + @return YES for succesful removal; NO otherwise. + */ +-(BOOL) removeObjWithProperty:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop)) blk; @end diff --git a/src/ASFKFilteringQueue.mm b/src/ASFKFilteringQueue.mm index bc76b65..272b4ca 100644 --- a/src/ASFKFilteringQueue.mm +++ b/src/ASFKFilteringQueue.mm @@ -45,11 +45,23 @@ -(void) _initFQ{ minQSize=0; maxQSize=ULONG_MAX; } --(void) setMaxQSize:(NSUInteger)size{ +-(BOOL) setMaxQSize:(NSUInteger)size{ + BOOL r=YES; + if(size <= minQSize){ + r=NO; + WASFKLog(@"new upper limit is not greater than lower limit"); + } maxQSize=size; + return r; } --(void) setMinQSize:(NSUInteger)size{ +-(BOOL) setMinQSize:(NSUInteger)size{ + BOOL r=YES; + if(size >= maxQSize){ + r=NO; + WASFKLog(@"new lower limit is not less than upper limit"); + } minQSize=size; + return r; } -(void) setDroppingPolicy:(eASFKQDroppingPolicy)policy{ dpolicy=policy; @@ -59,14 +71,16 @@ -(void) setDroppingAlgorithmL1:(ASFKFilter*)dropAlg{ itsFilter = dropAlg; [lkNonLocal unlock]; } --(BOOL) removeObjWithId:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop)) blk{ +-(BOOL) removeObjWithProperty:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop)) blk{ BOOL r=NO; - NSMutableIndexSet* mis=[NSMutableIndexSet new]; - if(obj){ + + if(obj && blk){ + NSMutableIndexSet* mis=[NSMutableIndexSet new]; NSUInteger c=0; [lkNonLocal lock]; BOOL stop = NO; for (id o in q) { + r=YES; if(blk(o,obj,&stop)){ [mis addIndex:c]; } @@ -77,10 +91,8 @@ -(BOOL) removeObjWithId:(id)obj andBlock:(BOOL (^)(id item,id sample, BOOL* stop } } [q removeObjectsAtIndexes:mis]; - [lkNonLocal unlock]; - - [mis removeAllIndexes]; + [lkNonLocal unlock]; } return r; } @@ -116,22 +128,43 @@ -(BOOL)push:(id)item{ res=NO; } else if(dpolicy == E_ASFK_Q_DP_ALGO){ - NSMutableIndexSet* iset=[NSMutableIndexSet new]; - res=[itsFilter filterCandidatesInArray:q saveToIndexSet:iset]; - if(res){ - [q removeObjectsAtIndexes:iset]; - [q addObject:item]; - res=YES; + ASFKFilter* ft=nil; + ft=itsFilter; + if(nil==ft){ + res=NO; } else{ - res=NO; + NSMutableIndexSet* iset=[NSMutableIndexSet new]; + res=[ft filterCandidatesInArray:q passing:YES saveToIndexSet:iset]; + if(res){ + [q removeObjectsAtIndexes:iset]; + [q addObject:item]; + res=YES; + } + else{ + res=NO; + } } + } } [lock unlock]; } return res; } +-(id)pullWithCount:(NSInteger) count{ + [lock lock]; + NSUInteger qc = [q count]; + id item=[q firstObject]; + if (item && qc + count >= minQSize) { + [q removeObjectAtIndex:0]; + } + else{ + item=nil; + } + [lock unlock];; + return item; +} -(id)pull{ [lock lock]; NSUInteger qc = [q count]; @@ -148,19 +181,19 @@ -(id)pull{ } -(void) filterWith:(ASFKFilter*)filter{ ASFKFilter* ft=filter; + [lock lock]; if(!ft) { ft=itsFilter; } if(ft){ NSMutableIndexSet* iset=[NSMutableIndexSet new]; - BOOL res=[itsFilter filterCandidatesInArray:q saveToIndexSet:iset]; + BOOL res=[ft filterCandidatesInArray:q passing:YES saveToIndexSet:iset]; if(res){ - [lock lock]; [q removeObjectsAtIndexes:iset]; - [lock unlock]; } } + [lock unlock]; } @end diff --git a/src/ASFKGlobalThreadpool.h b/src/ASFKGlobalThreadpool.h index 658e15b..eaf41ac 100644 --- a/src/ASFKGlobalThreadpool.h +++ b/src/ASFKGlobalThreadpool.h @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -44,5 +44,5 @@ -(void) pauseAll; -(void) resumeSession:(ASFK_IDENTITY_TYPE)sessionId; -(void) resumeAll; --(long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId; +-(long long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId; @end diff --git a/src/ASFKGlobalThreadpool.mm b/src/ASFKGlobalThreadpool.mm index 55d48fd..a66621a 100644 --- a/src/ASFKGlobalThreadpool.mm +++ b/src/ASFKGlobalThreadpool.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -44,7 +44,7 @@ @implementation ASFKGlobalThreadpool{ ThreadpoolConfig tpcfg; NSLock* lkMutexL1; NSLock* lkMutexL2; - + //NSLock* lkMutexL1; NSCondition* lkCond; std::atomic shouldSleep; @@ -140,7 +140,7 @@ -(NSArray*) getThreadpoolSessionsList{ return a; } --(long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId{ +-(long long) itemsCountForSession:(ASFK_IDENTITY_TYPE)sessionId{ long result=0; if(sessionId){ [lkMutexL1 lock]; @@ -178,6 +178,26 @@ -(void) flushAll{ [lkMutexL1 unlock]; } +-(void) _cancelSessionInternally:(ASFK_IDENTITY_TYPE)sessionId{ + DASFKLog(@"ASFKGlobalThreadpool: Cancelling session with ID %@, internal trigger",sessionId); + if(sessionId){ + [lkMutexL1 lock]; + ASFKThreadpoolSession* ss=[allSessions objectForKey:sessionId]; + if(ss){ + [runningSessions removeObjectForKey:sessionId]; + [lkMutexL2 lock]; + + onlineSessions=[runningSessions allValues]; + [lkMutexL2 unlock]; + + [allSessions removeObjectForKey:sessionId]; + [pausedSessions removeObjectForKey:sessionId]; + ss=nil; + } + [lkMutexL1 unlock]; + } + DASFKLog(@"ASFKGlobalThreadpool: Session %@ should be cancelled",sessionId); +} -(void) cancelSession:(ASFK_IDENTITY_TYPE)sessionId{ DASFKLog(@"ASFKGlobalThreadpool: Cancelling session with ID %@",sessionId); if(sessionId){ @@ -238,7 +258,9 @@ -(void) pauseSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL2 lock]; onlineSessions=[runningSessions allValues]; [lkMutexL2 unlock]; - ss=nil; + if(ss->onPauseNotification){ + ss->onPauseNotification(sessionId,YES); + } } [lkMutexL1 unlock]; @@ -249,12 +271,11 @@ -(void) pauseAll{ DASFKLog(@"ASFKGlobalThreadpool: Pausing all sessions"); [lkMutexL1 lock]; [pausedSessions addEntriesFromDictionary:runningSessions]; - [runningSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ + runningSessions=[NSMutableDictionary new]; + [pausedSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ ASFKThreadpoolSession* ss=(ASFKThreadpoolSession*)obj; ss->paused=YES; }]; - - runningSessions=[NSMutableDictionary new]; [lkMutexL2 lock]; onlineSessions = [NSArray new]; [lkMutexL2 unlock]; @@ -273,6 +294,9 @@ -(void) resumeSession:(ASFK_IDENTITY_TYPE)sessionId{ [lkMutexL2 lock]; onlineSessions = [runningSessions allValues]; [lkMutexL2 unlock]; + if(ss->onPauseNotification){ + ss->onPauseNotification(sessionId,NO); + } } [lkMutexL1 unlock]; ss=nil; @@ -284,18 +308,15 @@ -(void) resumeAll{ [lkMutexL1 lock]; [runningSessions addEntriesFromDictionary:pausedSessions]; - [pausedSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ - ASFKThreadpoolSession* ss=(ASFKThreadpoolSession*)obj; - if(ss){ - ss->paused=NO;; - - } - }]; pausedSessions=[NSMutableDictionary new]; [lkMutexL2 lock]; onlineSessions=[runningSessions allValues]; [lkMutexL2 unlock]; + [runningSessions enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop){ + ASFKThreadpoolSession* ss=(ASFKThreadpoolSession*)obj; + ss->paused=NO; + }]; [lkMutexL1 unlock]; DASFKLog(@"ASFKGlobalThreadpool: All sessions resumed"); } @@ -402,7 +423,8 @@ -(void) _reassignProcs:(ThreadpoolConfig&)tpc{ tcr.lowBound=r; vectProc2Bounds[tpc.actualThreadsCount-r]=tcr; } - }else{ + } + else{ tpc.share = tpc.requiredThreadsCount / tpc.actualThreadsCount; tpc.residue = tpc.requiredThreadsCount%tpc.actualThreadsCount; long residue=tpc.residue; @@ -423,6 +445,7 @@ -(void) _reassignProcs:(ThreadpoolConfig&)tpc{ -(void) _engineDeploy{ __block NSMutableArray* blocks=[NSMutableArray array]; + long i=0; for (i=0; icblk cancellationRequested]){ ThreadpoolConfig tpc1=tpcfg; //[lkMutexL2 lock]; + onlineSessions=nil; onlineSessions=[runningSessions allValues]; [self _reassignProcs:tpc1]; [lkMutexL2 unlock]; @@ -476,9 +500,10 @@ -(void) _engineDeploy{ if(ss){ [ss select:ii routineCancel:^id(id identity) { DASFKLog(@"Stopping session %@, selector %ld",identity,selectedSlot); + + [self _cancelSessionInternally:identity]; ThreadpoolConfig tpc1=tpcfg; [lkMutexL2 lock]; - onlineSessions=[runningSessions allValues]; [self _reassignProcs:tpc1]; [lkMutexL2 unlock]; //[lkMutexL1 unlock]; diff --git a/src/ASFKLinearFlow+Internal.h b/src/ASFKLinearFlow+Internal.h index d72906c..d4fb0c1 100644 --- a/src/ASFKLinearFlow+Internal.h +++ b/src/ASFKLinearFlow+Internal.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// Created by Boris Vigman on 15/02/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -25,9 +24,10 @@ @property (nonatomic) ASFKProgressRoutine progress; @property (nonatomic) ASFKCancellationRoutine cancProc; @property (nonatomic) ASFKExpirationCondition* excond; +@property (nonatomic) ASFKOnPauseNotification onPause; @property (nonatomic) id input; @property (nonatomic) ASFK_IDENTITY_TYPE sessionId; - +//@property (nonatomic) BOOL hasForeignProcs; @end @interface ASFKLinearFlow (Internal) @@ -45,5 +45,7 @@ -(ASFKParamSet*) _convertInputUnorderedSet:(NSSet*) input to:(ASFKParamSet*)ps; -(ASFKParamSet*) _convertInput:(id) input to:(ASFKParamSet*)ps; -(ASFKParamSet*) _decodeExParams:(ASFKExecutionParams*)ex forSession:(id)sessionId; - +//-(void) _registerSession:(id)sessionId; +//-(void) _unregisterSession:(id)sessionId; +//-(void) _unregisterAllSessions; @end diff --git a/src/ASFKLinearFlow+Internal.mm b/src/ASFKLinearFlow+Internal.mm index 8d68a3a..18f0e84 100644 --- a/src/ASFKLinearFlow+Internal.mm +++ b/src/ASFKLinearFlow+Internal.mm @@ -13,7 +13,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKLinearFlow.h b/src/ASFKLinearFlow.h index ed203df..fd64246 100644 --- a/src/ASFKLinearFlow.h +++ b/src/ASFKLinearFlow.h @@ -92,8 +92,9 @@ @interface ASFKLinearFlow : ASFKBase{ @protected NSMutableArray * _backprocs; @protected NSArray *lfProcs; - @protected ASFKCancellationRoutine cancelproc; - @protected ASFKExecutableRoutineSummary sumproc; + @protected ASFKExecutableRoutineSummary sumProc; + @protected ASFKOnPauseNotification onPauseProc; + @protected ASFKCancellationRoutine cancellationHandler; @protected dispatch_semaphore_t semHighLevelCall; } -(NSArray *) getRoutines; @@ -105,11 +106,13 @@ @brief Equals NO if sender is updating stored Routines; YES otherwise. */ -(BOOL) isReady; + /** @brief Appends block which invokes Objective-C code; the block is added to internal collection. This operation may succeed only if no Routine is active at time of addition. @param proc block that processes a data. */ -(BOOL) addRoutine:(ASFKExecutableRoutine)proc; + /** @brief Stores array of Routines for later use; content of array is copied and added to internal collection. This operation may succeed only if no Routine is active at time of addition. @@ -117,18 +120,20 @@ @return YES if operation succeeded; NO otherwise; */ -(BOOL) addRoutines:(NSArray*)procs; + /** @brief Replaces existing collection of Routines with new one. This operation may succeed only if no Routine is active at time of addition. @param procs new array of Routines. If aray is empty or nil, nothing happens. @return YES if operation succeeded; NO otherwise. */ --(BOOL) setRoutinesFromArray:(NSArray*)procs; +-(BOOL) replaceRoutinesFromArray:(NSArray*)procs; /** @brief Stores summary block which invokes Objective-C code @param summary block that is called after all Routines. */ -(BOOL) setSummary:(ASFKExecutableRoutineSummary)summary; +-(BOOL) setOnPauseNotification:(ASFKOnPauseNotification)notification; /** @brief Stores block which invokes Objective-C code as a summary for cancelled session. @param ch block that is called in case of cancellation. diff --git a/src/ASFKLinearFlow.mm b/src/ASFKLinearFlow.mm index 2001e92..9aa40ee 100644 --- a/src/ASFKLinearFlow.mm +++ b/src/ASFKLinearFlow.mm @@ -45,11 +45,11 @@ -(void)_initLF{ itsIsReady=YES; _backprocs=[[NSMutableArray alloc]init]; lfProcs=_backprocs; - sumproc=(id)^(id controlBlock,NSDictionary* stats,id data){ + sumProc=(id)^(id controlBlock,NSDictionary* stats,id data){ ASFKLog(@"ASFKLinearFlow: Stub summary"); return data; }; - cancelproc=nil; + cancellationHandler=nil; progressProc=nil; semHighLevelCall=dispatch_semaphore_create(1); } @@ -70,13 +70,13 @@ -(NSUInteger) getRoutinesCount{ } -(ASFKExecutableRoutineSummary) getSummaryRoutine{ [lkNonLocal lock]; - ASFKExecutableRoutineSummary c=sumproc; + ASFKExecutableRoutineSummary c=sumProc; [lkNonLocal unlock]; return c; } -(ASFKCancellationRoutine) getCancellationHandler{ [lkNonLocal lock]; - ASFKCancellationRoutine c=cancelproc; + ASFKCancellationRoutine c=cancellationHandler; [lkNonLocal unlock]; return c; } @@ -133,7 +133,7 @@ -(BOOL) addRoutines:(NSArray*)procs{ } return YES; } --(BOOL) setRoutinesFromArray:(NSArray*)someprocs{ +-(BOOL) replaceRoutinesFromArray:(NSArray*)someprocs{ BOOL replaced=NO; dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); itsIsReady=NO; @@ -154,13 +154,29 @@ -(BOOL) setRoutinesFromArray:(NSArray*)someprocs{ -(BOOL) isReady{ return itsIsReady; } +-(BOOL) setOnPauseNotification:(ASFKOnPauseNotification)notification{ + if(notification){ + dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); + itsIsReady=NO; + [lkNonLocal lock]; + onPauseProc=notification; + [lkNonLocal unlock]; + itsIsReady=YES; + dispatch_semaphore_signal(semHighLevelCall); + } + else{ + EASFKLog(@"ASFKLinearFlow: Invalid Routine provided"); + return NO; + } + return YES; +} -(BOOL) setSummary:(ASFKExecutableRoutineSummary)summary{ if(summary){ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); itsIsReady=NO; [lkNonLocal lock]; - sumproc=nil; - sumproc=summary; + sumProc=nil; + sumProc=summary; [lkNonLocal unlock]; itsIsReady=YES; dispatch_semaphore_signal(semHighLevelCall); @@ -176,8 +192,8 @@ -(BOOL) setCancellationHandler:(ASFKCancellationRoutine)ch{ dispatch_semaphore_wait(semHighLevelCall, DISPATCH_TIME_FOREVER); itsIsReady=NO; [lkNonLocal lock]; - cancelproc=nil; - cancelproc=ch; + cancellationHandler=nil; + cancellationHandler=ch; [lkNonLocal unlock]; itsIsReady=YES; dispatch_semaphore_signal(semHighLevelCall); @@ -325,7 +341,7 @@ - (NSDictionary *)stepNonblockingWithData:(id)data { -(ASFKParamSet*) _decodeExParams:(ASFKExecutionParams*)ex forSession:(id)sessionId{ ASFKParamSet* expar=[ASFKParamSet new]; if(ex){ - expar.summary = ex->SummaryRoutine?ex->SummaryRoutine:sumproc; + expar.summary = ex->summaryRoutine?ex->summaryRoutine:sumProc; expar.procs = [NSMutableArray array]; NSArray* prarr=ex->procs; if(prarr==nil || [prarr count]==0 || [prarr isKindOfClass:[NSNull class]]){ @@ -334,23 +350,13 @@ -(ASFKParamSet*) _decodeExParams:(ASFKExecutionParams*)ex forSession:(id)session for (ASFKExecutableRoutine p in prarr){ [expar.procs addObject:[p copy]]; }; - expar.cancProc = ex->cancellationProc?ex->cancellationProc:cancelproc; + expar.onPause = ex->onPauseProc?ex->onPauseProc:onPauseProc; + expar.cancProc = ex->cancellationProc?ex->cancellationProc:cancellationHandler; expar.excond=ex->expCondition; expar.progress = ex->progressProc?ex->progressProc:progressProc; expar.sessionId=sessionId; } -// else{ -// expar.summary = sumproc; -// expar.procs = [_backprocs copy]; -// expar.cancProc = cancelproc; -// -// } -// if(sessionId){ -// expar.sessionId=sessionId; -// } -// else{ -// expar.sessionId=[ASFKBase generateIdentity]; -// } + return expar; } @end diff --git a/src/ASFKMBProperties.h b/src/ASFKMBProperties.h index 0c986e4..2e5263d 100644 --- a/src/ASFKMBProperties.h +++ b/src/ASFKMBProperties.h @@ -12,18 +12,13 @@ GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// -// Created by Boris Vigman on 16/05/2021. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #import #import "ASFKExpirationCondition.h" #include -//typedef enum e_ASFKMBPoppingPolicy{ -// e_ASFKMB_POP_OWNER_ONLY, -// e_ASFKMB_POP_FIRST_POPPER -//} eASFKMBPoppingPolicy; + typedef void(^ASFKMbNRunOnContainerReadRoutine)(id cId,NSDate* tstamp, NSArray* data); /*! @brief custom filter of incoming messages. diff --git a/src/ASFKMBSecret.h b/src/ASFKMBSecret.h index 4763dc6..89a39b0 100644 --- a/src/ASFKMBSecret.h +++ b/src/ASFKMBSecret.h @@ -32,13 +32,13 @@ 6. multicast x x x 7. moderation - blinding/muting of members x x 8. security - changing secrets for Mailbox, Group, Global x x - 9. issuer - retraction/hiding of posted messages x x + 9. issuer - retraction/hiding of posted messages x x 10. config - update of mailbox operational parameters x 11. hosting - addition/removal of members to/from Group mailbox x x Secrets lifetime and configuration: All secrets have unlimited lifetime by default, which however can be configured to be temporary: for limited time period, limited number of use attempts or custom lifetime shortening criteria. When lifetime is ended the secret is invalidated forever. Manual invalidation is available too. - Any secret may be configured to have different porperties. Any property may be configured only once. + Any secret may be configured to have different properties. Any property may be configured only once. */ typedef BOOL(^ASFKSecretComparisonProc)(id secret1,id secret2); diff --git a/src/ASFKMailbox.h b/src/ASFKMailbox.h index 1120151..e6a46b8 100644 --- a/src/ASFKMailbox.h +++ b/src/ASFKMailbox.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #ifndef __A_S_F_K_Mailbox_h__ @@ -233,7 +232,7 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @param secret secret; master secret is required; if no secret set then nil must be provided. @return YES for success, NO otherwise. */ --(BOOL) discardAllUsersWithSecret:(ASFKMasterSecret*)secret; +-(BOOL) discardAllMailboxesWithSecret:(ASFKMasterSecret*)secret; /*! @brief removes ALL messages from ALL groups @discussion all messages in all groups will be discarded. @@ -272,7 +271,7 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @discussion counts only unique users: user registered in several groups counts as 1; @return Number of users. */ --(NSUInteger) totalUsers; +-(NSUInteger) totalMailboxes; /*! @brief counts ALL messages delivered to some group. @discussion counts only messages that are not discarded; counts only unique messages: message delivered to several users counts as 1; @@ -296,15 +295,15 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); #pragma mark - Reading & Popping /*! @brief reading in blocking maner. - @discussion When called, the function reads all available messages; number of messages to read is defined by 'skipAndTake' range. When number of available messages is less than required, the call will not return until new messages arrive. When message from blocking call received, the calling thread is released. + @discussion When called, the method reads all available messages; number of messages to read is defined by 'skipAndTake' range. When number of available messages is less than required, the call will not return until new messages arrive. When message from blocking call received, the calling thread is released. @param skipAndTake range of retrieval; loc represents offset from beginning, number of messages to skip; length represents number of items to retrieve; if 0 then function returns when new messages arrive or waiting period expires. @param mid user ID; if nil, read fails. + @param condition custom unblocking condition; may bu nil; @param secret private (associated with this group) secret is required; if no secret set then nil must be provided. If provided secret does not match the stored one, operation fails. @return array of available messages; size of the array is less or equal to msgcount; if there is no message, returns empty array. */ -(NSArray*) waitAndReadMsg:(NSRange)skipAndTake fromMailbox:(id)mid unblockIf:(ASFKMbLockConditionRoutine)condition withSecret:(ASFKPrivateSecret*)secret; -//-(NSArray*) waitAndReadMsgFromGroup:(id)gid unblockIf:(ASFKMbLockConditionRoutine) condition withSecret:(ASFKPrivateSecret*)secret; /*! @brief reads specified number of earliest messages delivered to given mailbox. @discussion After reading messages are NOT deleted from the queue; fetched messages ordered earliest to latest. If messages arrives from blocking call - the corresponding message is read, calling thread is NOT released. @@ -379,7 +378,7 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); -(void) popEarliestMsg:(NSRange)skipAndTake fromGroup:(id)gid forUser:(id)uid withSecret:(ASFKPrivateSecret*)secret; #pragma mark - Call interface /*! - @brief delivers specified message synchronously to the specified mailbox. This method returns when the message when call was improper or message read by the receiver. Being invoked, this method blocks the calling thread, until some message read or a condition met. Examples of unblocking conditions: time period elapsed; or some custom condition may be introduced. + @brief delivers specified message synchronously to the specified mailbox. This method returns when it was called improperly or message read by the receiver. Being invoked, this method blocks the calling thread, until the message is read or a condition met. Examples of unblocking conditions: time period elapsed; or some custom condition may be introduced. @discussion delivered message can not be retracted. @param msg a message to be delivered; can be nil. @param uid user ID; may be nil. @@ -424,6 +423,55 @@ typedef void(^ASFKMbLockConditionRoutine)(id cid, BOOL group, id msgId, id msg); @return YES for successful delivery, NO otherwise. */ -(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASFKSecret*)secret;; +/*! + @brief retracts delivered message from specific group. + @discussion poster can retract posted message while it is available for retraction;the message is available for retraction until it has been popped or its lifetime has ended. + @param msgId retractable message ID; if nil, action fails. + @param gid group ID; if nil, action fails. + */ +#pragma mark - Message hiding & retraction +-(BOOL) retractMsg:(id)msgId fromGroup:(id)gid secret:(ASFKPrivateSecret*)secret;; +/*! + @brief retracts delivered message from specific user. + @discussion poster can retract posted message while it is available for retraction;the message is available for retraction until it has been popped or its lifetime has ended. + @param msgId retractable message ID; if nil, action fails. + @param uid user ID; if nil, action fails. + @return YES for succesful retraction, NO otherwise. + */ +-(BOOL) retractMsg:(id)msgId fromMailbox:(id)uid secret:(ASFKPrivateSecret*)secret;; +/*! + @brief prevents given user from posting in given group. + @discussion muted user can access messages delivered to group but cannot deliver messages to the group; to be muted user may or may not be member of this group. + @param yesno YES for muting, NO for unmuting. + @param uid user ID; if nil, action fails. + @param gid group ID; if nil, action fails. + @param secret private secret of this group; nil means no secret set. + @return YES for succesful muting, NO otherwise. + */ +-(BOOL) mute:(BOOL) yesno user:(id)uid inGroup:(id)gid secret:(ASFKPrivateSecret*)secret; +-(BOOL) muteAll:(BOOL) yesno inGroup:(id)gid secret:(ASFKPrivateSecret*)secret; +/*! + @brief prevents given user from posting in given mailbox. + @discussion muted user cannot deliver messages to the specified target. + @param yesno YES for muting, NO for unmuting. + @param guestId guest ID; if nil, action fails. + @param hostId mailbox owner's ID; if nil, action fails. + @param secret private secret of this group; nil means no secret set. + @return YES for succesful muting, NO otherwise. + */ +-(BOOL) mute:(BOOL) yesno user:(id)guestId inMailbox:(id)hostId secret:(ASFKPrivateSecret*)secret; +-(BOOL) muteAll:(BOOL) yesno inMailbox:(id)hostId secret:(ASFKPrivateSecret*)secret; +/*! + @brief prevents given user from accessing messages in given group. + @discussion blinded user can post messages to group but cannot read or pop; to be blinded user may or may not be member of this group. + @param yesno YES for blinding, NO for unblinding. + @param uid user ID; if nil, action fails. + @param gid group ID; if nil, action fails. + @param secret private secret of this group; nil means no secret set. + @return YES for succesful blinding, NO otherwise. + */ +-(BOOL) blind:(BOOL) yesno user:(id)uid inGroup:(id)gid secret:(ASFKPrivateSecret*)secret; +-(BOOL) blindAll:(BOOL) yesno inGroup:(id)gid secret:(ASFKPrivateSecret*)secret; @end #endif /*#define __A_S_F_K_Mailbox_h__*/ diff --git a/src/ASFKMailbox.mm b/src/ASFKMailbox.mm index dec0af7..e92198d 100644 --- a/src/ASFKMailbox.mm +++ b/src/ASFKMailbox.mm @@ -12,8 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// -// Created by Boris Vigman on 05/04/2019. // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -21,7 +19,7 @@ #import "ASFKMailbox.h" #import "ASFKAuthorizationMgr.h" -@interface Private_ASFKBlockingRW:NSObject{ +@interface Private_ASFKBlocker:NSObject{ @public NSCondition* rlocker; @public BOOL blocked; } @@ -31,7 +29,7 @@ @interface Private_ASFKBlockingRW:NSObject{ @property (nonatomic) ASFKMBMsgProperties* props; @end -@implementation Private_ASFKBlockingRW +@implementation Private_ASFKBlocker @end @interface Private_ASFKMBMsg:NSObject{ @@ -95,6 +93,7 @@ -(id)initWithUser:(id)uid privateSecret:(ASFKPrivateSecret*) pSecret properties: -(BOOL) setProperties:(ASFKMBContainerProperties*)props; -(BOOL) setMemberingLimitsLow:(NSUInteger)low high:(NSUInteger)high; -(BOOL) isValid; +-(BOOL) isBlacklisted; -(BOOL) isPrivateSecretValid:(ASFKPrivateSecret*)secret matcher:(ASFKSecretComparisonProc)match; -(BOOL) setPrivateSecret:(ASFKPrivateSecret*)oldsec newsec:(ASFKPrivateSecret*)newsec user:(id)uid authMgr:(ASFKAuthorizationMgr*)auth; -(BOOL) isPrivate; @@ -130,7 +129,7 @@ -(void) _testAndRemove:(NSDate*)tmpoint; -(void) _testAndAccept:(NSDate*)tmpoint; @end @implementation ASFKSomeContainer{ - Private_ASFKBlockingRW* rwBlocker; + Private_ASFKBlocker* readBlocker; } +(NSDate*) maxDate1:(NSDate*)d1 date2:(NSDate*)d2{ if(d1 && d2){ @@ -199,11 +198,14 @@ -(id)initWithUser:(id)uid privateSecret:(ASFKPrivateSecret*) pSecret properties: memLimitLow=0; memLimitHigh=0; - rwBlocker=[Private_ASFKBlockingRW new]; - rwBlocker->rlocker=[NSCondition new]; + readBlocker=[Private_ASFKBlocker new]; + readBlocker->rlocker=[NSCondition new]; } return self; } +-(BOOL) isBlacklisted{ + return blacklisted; +} -(BOOL) isValid{ return blacklisted?NO:YES; } @@ -415,6 +417,7 @@ -(NSUInteger) userCount{ [lock1 unlock]; return c; } + -(NSUInteger) msgCount{ if(blacklisted){ return 0; @@ -465,9 +468,13 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO if(blacklisted){ return nil; } + if([self msgCount]>ASFK_PRIVSYM_PER_MLBX_MAX_MSG_LIMIT){ + WASFKLog(@"Too many messages in container %@; delivery failed!",self.itsOwnerId); + return nil; + } BOOL grant=NO; - + [lock1 lock]; if(itscprops.blockingReadwriteAllowed){ blk &= itscprops.blockingReadwriteAllowed; } @@ -475,7 +482,6 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO blk=NO; } - [lock1 lock]; if(properties==nil || [properties isKindOfClass:[NSNull null]]){ grant=[self _canUserPost:nil]; } @@ -508,11 +514,9 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO privmsg->wlocker=[NSCondition new]; BOOL success = [entranceQ push:privmsg]; if(success){ - - [rwBlocker->rlocker lock]; - [rwBlocker->rlocker signal]; - [rwBlocker->rlocker unlock]; - + [readBlocker->rlocker lock]; + [readBlocker->rlocker signal]; + [readBlocker->rlocker unlock]; [privmsg->wlocker lock]; [privmsg->wlocker wait]; [privmsg->wlocker unlock]; @@ -528,9 +532,9 @@ -(id) addMsg:(id) msg withProperties:(ASFKMBMsgProperties *)properties group:(BO uuid=nil; } else{ - [rwBlocker->rlocker lock]; - [rwBlocker->rlocker signal]; - [rwBlocker->rlocker unlock]; + [readBlocker->rlocker lock]; + [readBlocker->rlocker signal]; + [readBlocker->rlocker unlock]; } } @@ -544,12 +548,18 @@ -(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(i if(blacklisted){ return @[]; } + NSDate* tmpoint=[NSDate date]; [self _testAndRemove:tmpoint]; [self _testAndAccept:tmpoint]; NSMutableArray* ma=[NSMutableArray array]; NSMutableArray* readMsgs=[NSMutableArray array]; + NSMutableIndexSet* iset=[NSMutableIndexSet new]; [lock1 lock]; + if(NO==itscprops.blockingReadwriteAllowed){ + [lock1 unlock]; + return @[]; + } BOOL hasuser=[_users containsObject:uid] || [_itsOwnerId isEqualTo:uid]; if(!hasuser){ [lock1 unlock]; @@ -563,7 +573,7 @@ -(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(i [lock1 unlock]; if(msc < 1 || amount > msc){ - [rwBlocker->rlocker lock]; + [readBlocker->rlocker lock]; while(1){ [self _testAndRemove:tmpoint]; @@ -572,14 +582,14 @@ -(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(i msc=[messages count]; [lock1 unlock]; if(msc < 1 ){ - [rwBlocker->rlocker wait]; + [readBlocker->rlocker wait]; } else{ break; } } - [rwBlocker->rlocker unlock]; + [readBlocker->rlocker unlock]; } @@ -612,19 +622,26 @@ -(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(i { Private_ASFKMBMsg* privmsg=[messages objectAtIndex:ui]; if(privmsg->blocked == YES){ - [ma addObject:privmsg.msg]; - privmsg.props->maxAccessLimit.fetch_sub(1); - [readMsgs addObject:privmsg.msgId]; - [privmsg->wlocker lock]; - [privmsg->wlocker signal]; - [privmsg->wlocker unlock]; + if((privmsg.props.msgDeletionTimer && [privmsg.props passedDeletionDate:tmpoint]) || privmsg.props->maxAccessLimit==0){ + [iset addIndex:ui]; + } + else{ + [ma addObject:privmsg.msg]; + privmsg.props->maxAccessLimit.fetch_sub(1); + [readMsgs addObject:privmsg.msgId]; + [privmsg->wlocker lock]; + [privmsg->wlocker broadcast]; + [privmsg->wlocker unlock]; + } } else { - if( + if((privmsg.props.msgDeletionTimer && [privmsg.props passedDeletionDate:tmpoint]) || privmsg.props->maxAccessLimit==0){ + [iset addIndex:ui]; + } + else if( (privmsg.props.msgReadabilityTimer.itsDeadline==nil || [privmsg.props passedReadingDate:tmpoint]) - //&& privmsg.props->maxAccessLimit>0 ) { if(privmsg.props->maxAccessLimit>0) @@ -637,7 +654,7 @@ -(NSArray*) readBlocking:(NSUInteger)amount offset:(NSUInteger)offset forUser:(i } } - + [messages removeObjectsAtIndexes:iset]; [lock1 lock]; NSUInteger msgCount=[messages count]; ASFKMbNotifyOnContainerReadRoutine crr=itscprops.onReadProc; @@ -656,6 +673,7 @@ -(NSArray*) read:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid la [self _testAndAccept:tmpoint]; NSMutableArray* ma=[NSMutableArray array]; NSMutableArray* readMsgs=[NSMutableArray array]; + NSMutableIndexSet* iset=[NSMutableIndexSet new]; [lock1 lock]; BOOL hasuser=[_users containsObject:uid] || [_itsOwnerId isEqualTo:uid]; if(!hasuser){ @@ -702,14 +720,28 @@ -(NSArray*) read:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid la { Private_ASFKMBMsg* privmsg=[messages objectAtIndex:ui]; - if(privmsg.props->maxAccessLimit>0){ - [ma addObject:privmsg.msg]; - privmsg.props->maxAccessLimit.fetch_sub(1); - [readMsgs addObject:privmsg.msgId]; + if([privmsg.props passedDeletionDate:tmpoint]){ + [iset addIndex:ui]; + if(privmsg->blocked==YES && privmsg->wlocker != nil){ + [privmsg->wlocker lock]; + [privmsg->wlocker broadcast]; + [privmsg->wlocker unlock]; + } + } + else{ + if((privmsg.props.msgReadabilityTimer.itsDeadline==nil + || [privmsg.props passedReadingDate:tmpoint]) + ){ + if(privmsg.props->maxAccessLimit>0){ + [ma addObject:privmsg.msg]; + privmsg.props->maxAccessLimit.fetch_sub(1); + [readMsgs addObject:privmsg.msgId]; + } + } } - } + [messages removeObjectsAtIndexes:iset]; NSUInteger msgCount=[messages count]; ASFKMbNotifyOnContainerReadRoutine crr=itscprops.onReadProc; ASFKMbNRunOnContainerReadRoutine rcr=itscprops.runOnReadProc; @@ -770,17 +802,12 @@ -(NSUInteger) pop:(NSUInteger)amount offset:(NSUInteger)offset forUser:(id)uid l } for (NSInteger ui=lowbound; uiblocked==YES && privmsg->wlocker != nil){ [privmsg->wlocker lock]; [privmsg->wlocker broadcast]; [privmsg->wlocker unlock]; } - } } NSRange rn=NSMakeRange(lowbound, hibound-lowbound); @@ -836,15 +863,16 @@ -(void) purge:(NSDate*)tm{ } }]; [messages removeAllObjects]; - if(rwBlocker->rlocker){ - [rwBlocker->rlocker lock]; - [rwBlocker->rlocker broadcast]; - [rwBlocker->rlocker unlock]; + if(readBlocker->rlocker){ + [readBlocker->rlocker lock]; + [readBlocker->rlocker broadcast]; + [readBlocker->rlocker unlock]; } + ASFKMbNotifyOnContainerDiscardRoutine dp=itscprops.onDiscardProc; [lock1 unlock]; - if(itscprops.onDiscardProc){ - itscprops.onDiscardProc(self.itsOwnerId, tm); + if(dp){ + dp(self.itsOwnerId, tm); } } -(void) discardAllMessages{ @@ -856,8 +884,9 @@ -(void) discardAllUsers{ } [lock1 lock]; [backusers removeAllObjects]; - [messages removeAllObjects]; - [entranceQ reset]; + [uprops removeAllObjects]; +// [messages removeAllObjects]; +// [entranceQ reset]; [lock1 unlock]; } -(void) discardUser:(id)uid{ @@ -877,7 +906,128 @@ -(void) discardUser:(id)uid{ itscprops.onLeaveProc(self.itsOwnerId, uid); } } +#pragma mark - moderation +-(BOOL) mute:(BOOL)yesno user:(id)uid secret:(ASFKPrivateSecret *)secret group:(BOOL)grp{ + if(blacklisted){ + return NO; + } + if(grp){ + BOOL res=NO; + [lock1 lock]; + BOOL hasuser=[_users containsObject:uid]; + if(!hasuser){ + [lock1 unlock]; + return NO; + } + ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; + if(up){ + up.isMuted=yesno; + res=YES; + } + [lock1 unlock]; + return res; + } + else{ + BOOL res=NO; + [lock1 lock]; + ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; + if(up){ + up.isMuted=yesno; + res=YES; + } + else{ + if(yesno){ + ASFKMBGroupMemberProperties* u=[ASFKMBGroupMemberProperties new]; + u.isMuted=YES; + [uprops setObject:u forKey:uid]; + res=YES; + } + } + [lock1 unlock]; + return res; + } +} +-(BOOL) muteAll:(BOOL)yesno secret:(ASFKPrivateSecret *)secret group:(BOOL)grp{ + if(blacklisted){ + return NO; + } + if(grp){ + BOOL res=YES; + [lock1 lock]; + + [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { + BOOL hasuser=[_users containsObject:key] || [key isEqualTo:self.itsOwnerId]; + if(hasuser){ + ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; + up.isMuted=yesno; + } + + }]; + [lock1 unlock]; + return res; + } + else{ + BOOL res=YES; + [lock1 lock]; + + [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { + ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; + up.isMuted=yesno; + + }]; + [lock1 unlock]; + return res; + + } +} +-(BOOL) blind:(BOOL)yesno user:(id)uid secret:(ASFKPrivateSecret *)secret{ + if(blacklisted){ + return NO; + } + BOOL res=NO; + [lock1 lock]; + BOOL hasuser=[_users containsObject:uid]; + if(!hasuser){ + [lock1 unlock]; + return NO; + } + ASFKMBGroupMemberProperties* up=[uprops objectForKey:uid]; + if(up){ + up.isBlinded=yesno; + res=YES; + } + [lock1 unlock]; + return res; +} +-(BOOL) blindAll:(BOOL)yesno secret:(ASFKPrivateSecret *)secret{ + if(blacklisted){ + return NO; + } + BOOL res=YES; + [lock1 lock]; + + [uprops enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { + BOOL hasuser=[_users containsObject:key] || [key isEqualTo:self.itsOwnerId]; + if(hasuser){ + ASFKMBGroupMemberProperties* up=(ASFKMBGroupMemberProperties* )obj; + up.isBlinded=yesno; + } + + }]; + + [lock1 unlock]; + return res; +} +-(BOOL) retractMsg:(id)msgId{ + if(blacklisted){ + return NO; + } + [lock1 lock]; + [retractionList addObject:msgId]; + [lock1 unlock]; + return NO; +} #pragma mark - maintenance -(void) runPeriodicProc:(NSDate*)tmpoint{ if(blacklisted){ @@ -890,26 +1040,31 @@ -(void) runPeriodicProc:(NSDate*)tmpoint{ } #pragma mark - Private methods -(void) _testAndRemove:(NSDate*)tmpoint{ - NSMutableSet* ms=nil; [lock1 lock]; - ms=retractionList; - retractionList=[NSMutableSet new]; - [lock1 unlock]; NSIndexSet* inset1=[messages indexesOfObjectsPassingTest:^BOOL(id _Nonnull obj, NSUInteger idx, BOOL * _Nonnull stop) { Private_ASFKMBMsg* o=obj; BOOL rmCandidate=NO; - rmCandidate |= [o.props passedDeletionDate:tmpoint]; - if([ms containsObject: o.props.msgId]){ + rmCandidate |= [o.props passedDeletionDate:tmpoint]; + if([retractionList containsObject: o.props.msgId]){ rmCandidate = YES; - [ms removeObject:o.props.msgId]; - + [retractionList removeObject:o.props.msgId]; + if(o->blocked){ + [o->wlocker lock]; + [o->wlocker broadcast]; + [o->wlocker unlock]; + } } else{ - [entranceQ removeObjWithId:o.props.msgId andBlock:^BOOL(id item, id sample, BOOL* stop) { + [entranceQ removeObjWithProperty:o.props.msgId andBlock:^BOOL(id item, id sample, BOOL* stop) { Private_ASFKMBMsg* o=item; if (item && sample && [o.msgId isEqualTo:sample]) { *stop = YES; + if(o->blocked){ + [o->wlocker lock]; + [o->wlocker broadcast]; + [o->wlocker unlock]; + } return YES; } return NO; @@ -917,10 +1072,10 @@ -(void) _testAndRemove:(NSDate*)tmpoint{ } return rmCandidate; }]; - [lock1 lock]; + //[lock1 lock]; [messages removeObjectsAtIndexes:inset1]; - [entranceQ filterWith:nil]; + //[entranceQ filterWith:nil]; NSMutableArray* userstoRemove=[NSMutableArray array]; for (id userid in backusers) { @@ -956,7 +1111,7 @@ -(void) _testAndAccept:(NSDate*)tmpoint{ [lock1 unlock]; - id msg=[entranceQ pull]; + id msg=[entranceQ pullWithCount:msgCount]; if(msg){ @@ -1101,6 +1256,10 @@ -(id) createMailbox:(id)uid withProperties:(ASFKMBContainerProperties*)props sec return nil; } + if(YES == [self _test_mailboxes_limit:ASFK_PRIVSYM_MAX_MLBX_LIMIT]){ + [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; + return nil; + } ASFKMBContainerProperties* p0=[ASFKMBContainerProperties new]; if(props==nil) { @@ -1128,6 +1287,10 @@ -(id) createGroup:(id)gid withProperties:(ASFKMBContainerProperties*)props secre return nil; } + if(YES == [self _test_mailboxes_limit:ASFK_PRIVSYM_MAX_MLBX_LIMIT]){ + [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; + return nil; + } ASFKMBContainerProperties* p0=[ASFKMBContainerProperties new]; if(props==nil) { @@ -1151,6 +1314,10 @@ -(id) createGroup:(id)gid withProperties:(ASFKMBContainerProperties*)props secre } -(id) cloneGroup:(id)gid newId:(id)newid withProperties:(ASFKMBContainerProperties*)props secret:(ASFKPrivateSecret*)psecret{ + if(YES == [self _test_mailboxes_limit:ASFK_PRIVSYM_MAX_MLBX_LIMIT]){ + [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; + return nil; + } if(gid!=nil && newid!=nil){ [lockGroupsDB lock]; ASFKSomeContainer* sg0=[groups objectForKey:gid]; @@ -1176,7 +1343,7 @@ -(id) cloneGroup:(id)gid newId:(id)newid withProperties:(ASFKMBContainerProperti return nil; } -(BOOL) addUser:(id)uid toGroup:(id)gid withProperties:(ASFKMBGroupMemberProperties*)props secret:(ASFKPrivateSecret*)psecret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MLBX_THRESHOLD ]; if(!gid || !uid){ return NO; } @@ -1374,22 +1541,24 @@ -(NSUInteger) runPeriodic:(size_t)sampleSize timepoint:(NSDate*)tm callbacks:(AS if(tm==nil){ tm=[NSDate date]; } - if(objCount>ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD){ + if(objCount>ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD){ if(clbs && clbs->prMemPressure){ clbs->prMemPressure(tm,objCount); } } - dispatch_queue_t dConQ_Background=dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0); + //dispatch_queue_t dConQ_Background=dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0); - dispatch_apply(2, dConQ_Background, ^(size_t index) { - if(index==0){ + //dispatch_apply(2, dConQ_Background, ^(size_t index) + //{ + //if(index==0) + { [lockUsersDB lock]; [users enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { ASFKSomeContainer* sg=(ASFKSomeContainer*)obj; if(sg){ if([sg shouldBeDeletedAtDate:tm]){ [sg markBlacklisted]; - [blacklistedUsers addObject:sg]; + //[blacklistedUsers addObject:sg]; } else{ [sg runPeriodicProc:tm]; @@ -1397,16 +1566,18 @@ -(NSUInteger) runPeriodic:(size_t)sampleSize timepoint:(NSDate*)tm callbacks:(AS } }]; - [lockUsersDB unlock]; - }else{ + } +// else + { +// [lockGroupsDB lock]; [groups enumerateKeysAndObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { ASFKSomeContainer* sg=(ASFKSomeContainer*)obj; if(sg){ if([sg shouldBeDeletedAtDate:tm]){ [sg markBlacklisted]; - [blacklistedGroups addObject:sg]; + //[blacklistedGroups addObject:sg]; } else{ [sg runPeriodicProc:tm]; @@ -1414,20 +1585,47 @@ -(NSUInteger) runPeriodic:(size_t)sampleSize timepoint:(NSDate*)tm callbacks:(AS } }]; + [lockGroupsDB unlock]; + } + //}; + dispatch_queue_t dConQ_Background=dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0); + dispatch_apply(2, dConQ_Background, ^(size_t index){ + if(index==0){ + NSMutableArray* deadkeys=[NSMutableArray new]; + [lockUsersDB lock]; + [users enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { + ASFKSomeContainer* sg=(ASFKSomeContainer*)obj; + if([sg isBlacklisted]){ + [deadkeys addObject:key]; + } + }]; + //blacklistedUsers = [NSMutableArray new]; + [users removeObjectsForKeys:deadkeys]; + [lockUsersDB unlock]; + } + else{ + NSMutableArray* deadkeys=[NSMutableArray new]; + [lockGroupsDB lock]; + [groups enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { + ASFKSomeContainer* sg=(ASFKSomeContainer*)obj; + if([sg isBlacklisted]){ + [deadkeys addObject:key]; + } + }]; + //blacklistedGroups = [NSMutableArray new]; + [groups removeObjectsForKeys:deadkeys]; [lockGroupsDB unlock]; } }); - return 0; } -(NSUInteger) runDaemon:(size_t)sampleSize timepoint:(NSDate*)tm callbacks:(ASFKMBCallbacksMaintenance*)clbs{ if(tm==nil){ tm=[NSDate date]; } - [self runDelivery:sampleSize]; [self runDiscarding:sampleSize timepoint:tm]; - + [self runDelivery:sampleSize]; [self runPeriodic:sampleSize timepoint:tm callbacks:clbs]; return 0; } @@ -1505,7 +1703,7 @@ -(BOOL) setMsgQDropPolicy:(eASFKQDroppingPolicy)policy forGroup:(id)gid secret:( [lockGroupsDB lock]; ASFKSomeContainer* sg=[groups objectForKey:gid]; if(sg && ([sg isPrivateSecretValid:secret matcher:authmgr->secretProcConfig])){ - res=[sg setMsgQDropPolicy:policy]; + res=[sg setMsgQDropperPolicy:policy]; } [lockGroupsDB unlock]; return res; @@ -1560,7 +1758,7 @@ -(BOOL) setMsgQDropPolicy:(eASFKQDroppingPolicy)policy forMailbox:(id)uid secret [lockUsersDB lock]; ASFKSomeContainer* sg=[users objectForKey:uid]; if(sg && ([sg isPrivateSecretValid:secret matcher:authmgr->secretProcConfig])){ - res=[sg setMsgQDropPolicy:policy]; + res=[sg setMsgQDropperPolicy:policy]; } [lockUsersDB unlock]; return res; @@ -1594,7 +1792,7 @@ -(BOOL) setMsgQDroppingAlgorithmL2:(clbkASFKMBFilter)dropAlg forMailbox:(id)uid #pragma mark - Discarding -(BOOL) discardMailbox:(id)uid secret:(ASFKSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!uid){ return NO; @@ -1628,7 +1826,7 @@ -(BOOL) discardMailbox:(id)uid secret:(ASFKSecret*)secret{ return res; } -(BOOL) discardGroup:(id)gid secret:(ASFKSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!gid){ return NO; @@ -1663,8 +1861,8 @@ -(BOOL) discardGroup:(id)gid secret:(ASFKSecret*)secret{ return res; } --(BOOL) discardAllUsersWithSecret:(ASFKMasterSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; +-(BOOL) discardAllMailboxesWithSecret:(ASFKMasterSecret*)secret{ + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(![authmgr isMasterSecretValid:secret matcher:authmgr->secretProcDiscard]){ return NO; @@ -1681,7 +1879,7 @@ -(BOOL) discardAllUsersWithSecret:(ASFKMasterSecret*)secret{ } -(BOOL) discardAllGroupsWithSecret:(ASFKMasterSecret*)secret{ ASFKLog(@"Discarding all groups"); - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(NO==[authmgr isMasterSecretValid:secret matcher:authmgr->secretProcDiscard]){ return NO; @@ -1696,7 +1894,7 @@ -(BOOL) discardAllGroupsWithSecret:(ASFKMasterSecret*)secret{ } -(BOOL) discardAllMessagesWithSecret:(ASFKMasterSecret*)secret{ ASFKLog(@"Discarding messages from groups"); - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_MSG_RELEASE_SAMPLE_SIZE]; BOOL ms=[authmgr isMasterSecretValid:secret matcher:authmgr->secretProcDiscard]; if(ms==NO){ @@ -1725,7 +1923,7 @@ -(BOOL) discardAllMessagesWithSecret:(ASFKMasterSecret*)secret{ } -(BOOL) discardAllMessagesFromMailbox:(id)uid secret:(ASFKPrivateSecret*)secret{ ASFKLog(@"Discarding ALL messages from user %@",uid); - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_MSG_RELEASE_SAMPLE_SIZE]; if( !uid){ return NO; } @@ -1742,7 +1940,7 @@ -(BOOL) discardAllMessagesFromMailbox:(id)uid secret:(ASFKPrivateSecret*)secret{ } -(BOOL) discardAllMessagesFromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ ASFKLog(@"Discarding ALL messages from group %@",gid); - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_MSG_RELEASE_SAMPLE_SIZE]; if( !gid){ return NO; @@ -1759,7 +1957,7 @@ -(BOOL) discardAllMessagesFromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ } -(BOOL) discardUsers:(NSArray*)uids fromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ ASFKLog(@"Discarding ALL users from group %@",gid); - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!uids || !gid){ return NO; } @@ -1783,7 +1981,7 @@ -(BOOL) discardUsers:(NSArray*)uids fromGroup:(id)gid secret:(ASFKPrivateSecret return res; } -(BOOL) discardUser:(id)uid fromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!uid || !gid){ return NO; } @@ -1802,7 +2000,7 @@ -(BOOL) discardUser:(id)uid fromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ return res; } -(BOOL) discardAllUsersFromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!gid ){ return NO; } @@ -1815,7 +2013,7 @@ -(BOOL) discardAllUsersFromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ return YES; } -(BOOL) discardUserFromAllGroups:(id)uid secret:(ASFKMasterSecret*)secret{ - [self _discard_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _discard_relaxMemoryPressure: ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE]; if(!uid ){ return NO; } @@ -1865,7 +2063,7 @@ -(NSUInteger) totalGroups{ [lockGroupsDB unlock]; return c; } --(NSUInteger) totalUsers{ +-(NSUInteger) totalMailboxes{ [lockUsersDB lock]; NSUInteger c=[users count]; [lockUsersDB unlock]; @@ -2051,7 +2249,7 @@ -(void) popEarliestMsg:(NSRange)skipAndTake fromGroup:(id)gid forUser:(id)uid wi #pragma mark - Unicasting -(id) cast:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties*)props secret:(ASFKPrivateSecret*)secret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD ]; if(!uid || !msg){ return nil; } @@ -2059,13 +2257,12 @@ -(id) cast:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties*)props ASFKSomeContainer* sg=[users objectForKey:uid]; [lockUsersDB unlock]; if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcUnicast]){ - return [sg addMsg:msg withProperties:props group:NO blockable:NO]; } return nil; } -(id) cast:(id)msg forGroup:(id)gid withProperties:(ASFKMBMsgProperties*)props secret:(ASFKPrivateSecret*)secret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD ]; if(!gid || !msg){ return nil; } @@ -2082,7 +2279,7 @@ -(id) cast:(id)msg forGroup:(id)gid withProperties:(ASFKMBMsgProperties*)props s return res; } -(id) call:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties *)props unblockIf:(ASFKCondition*)condition secret:(ASFKPrivateSecret*)secret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD ]; if(!uid || !msg){ return nil; } @@ -2098,7 +2295,7 @@ -(id) call:(id)msg forMailbox:(id)uid withProperties:(ASFKMBMsgProperties *)prop #pragma mark - Multicasting -(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASFKSecret*)secret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD ]; if(!msg){ return NO; } @@ -2116,7 +2313,7 @@ -(BOOL) broadcast:(id)msg withProperties:(ASFKMBMsgProperties*)props secret:(ASF return YES; } -(id) multicast:(id)msg toMembersOfGroup:(id)g0 secret:(ASFKSecret*)secret{ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; + [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD ]; if(msg==nil){ return nil; } @@ -2153,6 +2350,97 @@ -(id) multicast:(id)msg toMembersOfGroup:(id)g0 secret:(ASFKSecret*)secret{ return nil; } +#pragma mark - Message hiding & retraction +-(BOOL) retractMsg:(id)msgId fromGroup:(id)gid secret:(ASFKPrivateSecret*)secret{ + if(!msgId || !gid) + return NO; + [lockGroupsDB lock]; + ASFKSomeContainer* sg=[groups objectForKey:gid]; + [lockGroupsDB unlock]; + BOOL res=NO; + if(sg && [sg canRetract] &&[sg isPrivateSecretValid:secret matcher:authmgr->secretProcIssuer]){ + res=[sg retractMsg:msgId]; + } + return res; +} +-(BOOL) retractMsg:(id)msgId fromMailbox:(id)uid secret:(ASFKPrivateSecret*)secret{ + if(!msgId || !uid) + return NO; + [lockUsersDB lock]; + ASFKSomeContainer* sg=[users objectForKey:uid]; + [lockUsersDB unlock]; + BOOL res=NO; + if(sg && [sg canRetract] && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcIssuer]){ + res=[sg retractMsg:msgId]; + } + return res; +} +-(BOOL) mute:(BOOL)yesno user:(id)uid inGroup:(id)gid secret:(ASFKPrivateSecret *)secret{ + if(!gid || !uid) + return NO; + [lockGroupsDB lock]; + ASFKSomeContainer* sg=[groups objectForKey:gid]; + [lockGroupsDB unlock]; + if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg mute:yesno user:uid secret:secret group:YES]; + } + return NO; +} +-(BOOL) muteAll:(BOOL)yesno inGroup:(id)gid secret:(ASFKPrivateSecret *)secret{ + if(!gid ) + return NO; + [lockGroupsDB lock]; + ASFKSomeContainer* sg=[groups objectForKey:gid]; + [lockGroupsDB unlock]; + if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg muteAll:yesno secret:secret group:YES]; + } + return NO; +} +-(BOOL) mute:(BOOL)yesno user:(id)uidguest inMailbox:(id)uidhost secret:(ASFKPrivateSecret *)secret{ + if(!uidguest || !uidhost) + return NO; + [lockUsersDB lock]; + ASFKSomeContainer* sg=[users objectForKey:uidhost]; + [lockUsersDB unlock]; + if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg mute:yesno user:uidguest secret:secret group:NO]; + } + return NO; +} +-(BOOL) muteAll:(BOOL)yesno inMailbox:(id)uidhost secret:(ASFKPrivateSecret *)secret{ + if(!uidhost) + return NO; + [lockUsersDB lock]; + ASFKSomeContainer* sg=[users objectForKey:uidhost]; + [lockUsersDB unlock]; + if(sg && [sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg muteAll:yesno secret:secret group:NO]; + } + return NO; +} +-(BOOL) blind:(BOOL)yesno user:(id)uidguest inGroup:(id)gid secret:(ASFKPrivateSecret *)secret{ + if(!gid || !uidguest) + return NO; + [lockGroupsDB lock]; + ASFKSomeContainer* sg=[groups objectForKey:gid]; + [lockGroupsDB unlock]; + if(sg &&[sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg blind:yesno user:uidguest secret:secret]; + } + return NO; +} +-(BOOL) blindAll:(BOOL)yesno inGroup:(id)gid secret:(ASFKPrivateSecret *)secret{ + if(!gid ) + return NO; + [lockGroupsDB lock]; + ASFKSomeContainer* sg=[groups objectForKey:gid]; + [lockGroupsDB unlock]; + if(sg &&[sg isPrivateSecretValid:secret matcher:authmgr->secretProcModerate]){ + return [sg blindAll:yesno secret:secret]; + } + return NO; +} #pragma mark - Private methods -(void) _castToSetOfUsers:(NSSet*) uset msg:(id)msg properties:(ASFKMBMsgProperties*)props{ if(uset && msg){ @@ -2181,12 +2469,12 @@ -(void) _castToSetOfUsers:(NSSet*) uset msg:(id)msg properties:(ASFKMBMsgPropert } -(void) _castToArrayOfUsers:(NSArray*)uarr msg:(id)msg properties:(ASFKMBMsgProperties*)props{ - [lockDB lock]; - NSUInteger defrefcount= [deferredMulticastUsers count]; - [lockDB unlock]; - if(defrefcount>ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD){ - [self _cast_relaxMemoryPressure:ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD ]; - } +// [lockDB lock]; +// NSUInteger defrefcount= [deferredMulticastUsers count]; +// [lockDB unlock]; +// if(defrefcount>ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD){ + [self _cast_relaxMemoryPressure: ASFK_PRIVSYM_MSG_RELEASE_SAMPLE_SIZE]; +// } if(uarr && msg){ [props setPropMsgId:[NSUUID UUID]]; [lockDB lock]; @@ -2216,26 +2504,40 @@ -(void) _cast_relaxMemoryPressure:(size_t)sampleSize{ NSUInteger defrefcount0= [deferredBroadcasts count]; NSUInteger defrefcount1= [deferredMulticastUsers count]; [lockDB unlock]; - if(defrefcount0 > ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD || defrefcount1 > ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD){ + + if(defrefcount0 > ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD || ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD > ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD){ [self runDelivery:sampleSize ]; } + } -(void) _discard_relaxMemoryPressure:(size_t)sampleSize{ + //if(NO==[self _test_mailboxes_limit:ASFK_PRIVSYM_MEM_PRESSURE_MLBX_THRESHOLD]) + //{ + WASFKLog(@"Too many mailboxes or groups created!"); + [self runDiscarding:sampleSize timepoint:[NSDate date]] ; + //} + +} +-(BOOL) _test_mailboxes_limit:(NSUInteger)limit{ [lockDB lock]; NSUInteger defrefcount0= [blacklistedUsers count]; NSUInteger defrefcount1= [blacklistedGroups count]; [lockDB unlock]; - if(defrefcount0 > ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD || defrefcount1 > ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD){ - [self runDiscarding:sampleSize timepoint:[NSDate date]]; - } + [lockUsersDB lock]; + NSUInteger ucount= [users count]; + [lockUsersDB unlock]; + [lockGroupsDB lock]; + NSUInteger gcount= [groups count]; + [lockGroupsDB unlock]; + return limit<=defrefcount0+defrefcount1+ucount+gcount; } --(NSArray*) _repackItems:(NSMutableArray*)container sampleSize:(size_t)iter dispQ:(dispatch_queue_t)dq{ +-(NSArray*) _repackItems:(NSMutableArray*)containers sampleSize:(size_t)iter dispQ:(dispatch_queue_t)dq{ NSMutableArray* ma=[NSMutableArray array]; for (NSUInteger i=0; i0){ - [container removeObjectAtIndex:0]; + if([containers count]>0){ + [containers removeObjectAtIndex:0]; } [lockDB unlock]; } @@ -2267,14 +2569,13 @@ -(void) _discard_relaxMemoryPressure:(size_t)sampleSize{ else{ [ma addObject:(ASFKSomeContainer*)obj]; [lockDB lock]; - if([container count]>0){ - [container removeObjectAtIndex:0]; + if([containers count]>0){ + [containers removeObjectAtIndex:0]; } [lockDB unlock]; } - - } + return ma; } @end diff --git a/src/ASFKNonlinearFlow.h b/src/ASFKNonlinearFlow.h index f440e86..dfd245e 100644 --- a/src/ASFKNonlinearFlow.h +++ b/src/ASFKNonlinearFlow.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKNonlinearFlow.mm b/src/ASFKNonlinearFlow.mm index a48be4c..75b03f8 100644 --- a/src/ASFKNonlinearFlow.mm +++ b/src/ASFKNonlinearFlow.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKPipelinePar.h b/src/ASFKPipelinePar.h index 026b792..7f43617 100644 --- a/src/ASFKPipelinePar.h +++ b/src/ASFKPipelinePar.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -24,25 +23,25 @@ */ @interface ASFKPipelinePar : ASFKLinearFlow --(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId; +-(BOOL) isPausedSession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; +-(long long) itemsCountForSession:(_Null_unspecified id)sessionId; /*! @brief Equals YES if session with given identity exists AND is still processing data batch ; NO otherwise. */ --(BOOL) isBusySession:(ASFK_IDENTITY_TYPE)sessionId; +-(BOOL) isBusySession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; + +-(long long) getRunningSessionsCount; +-(long long) getPausedSessionsCount; -+(long long) runningSessionsCount; -+(long long) pausedSessionsCount; -/*! - @brief Cancels ALL sessions created by ALL instances. - */ -+(void)cancelAllGlobally; /*! @brief Cancels ALL sessions created by this instance. */ -(void)cancelAll; --(void)cancelSession:(ASFK_IDENTITY_TYPE)sessionId; - +/*! + @brief Cancels session with given id. + */ +-(void)cancelSession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; /*! @brief flushes all queued items for all sessions created by this instance. */ @@ -50,12 +49,8 @@ /*! @brief flushes all queued items for given session ID. */ --(void)flushSession:(ASFK_IDENTITY_TYPE)sessionId; +-(void)flushSession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; -/*! - @brief flushes all queued items for all sessions. - */ -+(void)flushAllGlobally; /*! @brief flushes all queued items for all sessions created by this instance. */ @@ -63,12 +58,8 @@ /*! @brief flushes all queued items for given session ID. */ --(void)pauseSession:(ASFK_IDENTITY_TYPE)sessionId; +-(void)pauseSession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; -/*! - @brief flushes all queued items for all sessions. - */ -+(void)pauseAllGlobally; /*! @brief flushes all queued items for all sessions created by this instance. */ @@ -76,12 +67,8 @@ /*! @brief flushes all queued items for given session ID. */ --(void)resumeSession:(ASFK_IDENTITY_TYPE)sessionId; +-(void)resumeSession:(_Null_unspecified ASFK_IDENTITY_TYPE)sessionId; -/*! - @brief flushes all queued items for all sessions. - */ -+(void)resumeAllGlobally; /*! @brief sets new class of QoS (i.e. thread priority). @param newqos required class of Quality of Service . Allowed values are:QOS_CLASS_USER_INTERACTIVE, QOS_CLASS_UTILITY, QOS_CLASS_BACKGROUND. By default QOS_CLASS_BACKGROUND is set. The parameter will be in effect after restart. @@ -92,11 +79,11 @@ @brief returns list of session ID's for all sessions created by this instance. @return Array of session ID's. */ --(NSArray*) getSessions; +-(NSArray* _Null_unspecified) getSessions; /*! @brief creates new non-expiring session associated with this instance. - @param exparams collection of session properties. - @param sid optional name of session. + @param exparams collection of session properties. May be nil; in that case default parameters will be adopted. + @param sid optional name of session. If nil, then random value will be assigned. @return Dictionary of return values. */ -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams sessionId:(id _Nullable ) sid; diff --git a/src/ASFKPipelinePar.mm b/src/ASFKPipelinePar.mm index 512fd88..b8c5a7e 100644 --- a/src/ASFKPipelinePar.mm +++ b/src/ASFKPipelinePar.mm @@ -12,7 +12,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - +// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -78,12 +78,20 @@ -(ASFKPipelineSession*) _createNewSessionWithId:(ASFK_IDENTITY_TYPE)sessionId{ ASFKLog(@"creating new session for id %@",sessionId); ASFKPipelineSession* newseq=[[ASFKPipelineSession alloc]initWithSessionId:sessionId andSubsessionId:nil]; newseq.sessionId=[[newseq getControlBlock]sessionId]; - + newseq->cancellationProc = (id)^(id sessionId){ + if(sessionId){ + [lkNonLocal lock]; + [ctrlblocks removeObjectForKey:sessionId]; + [lkNonLocal unlock]; + } + }; return newseq; } -(ASFKPipelineSession*) _prepareSession:(ASFKPipelineSession*)seq withParams:(ASFKParamSet*) params { [seq replaceRoutinesWithArray:params.procs]; [seq setSummary:params.summary]; + [seq setCancellationHandler:params.cancProc]; + seq->onPauseNotification=params.onPause; [self registerSession:[seq getControlBlock]]; return seq; } @@ -103,18 +111,16 @@ -(void) setQualityOfService:(long)newqos{ /*! @return number of running sessions */ -+(long long) runningSessionsCount{ - return [[ASFKGlobalThreadpool sharedManager] runningSessionsCount]; +-(long long) getRunningSessionsCount{ + return [globalTPool runningSessionsCount]; } /*! @return number of paused sessions */ -+(long long) pausedSessionsCount{ - return [[ASFKGlobalThreadpool sharedManager] pausedSessionsCount]; -} -+(void)flushAllGlobally{ - [[ASFKGlobalThreadpool sharedManager] flushAll]; +-(long long) getPausedSessionsCount{ + return [globalTPool pausedSessionsCount]; } +#pragma mark - Flush/Resume/Cancel -(void)flushAll{ [lkNonLocal lock]; for (id s in ctrlblocks) { @@ -128,7 +134,7 @@ -(void) flushSession:(ASFK_IDENTITY_TYPE)sessionId{ } /*! - @brief flushes all queued items for all sessions created by this instance. + @brief pauses all sessions created by this instance. */ -(void)pauseAll{ [lkNonLocal lock]; @@ -138,16 +144,14 @@ -(void)pauseAll{ [lkNonLocal unlock]; } /*! - @brief flushes all queued items for given session ID. + @brief pauses session for given session ID. */ -(void)pauseSession:(ASFK_IDENTITY_TYPE)sessionId{ [globalTPool pauseSession:sessionId]; } -+(void)pauseAllGloball{ - [[ASFKGlobalThreadpool sharedManager] pauseAll]; -} + /*! - @brief flushes all queued items for all sessions created by this instance. + @brief resumes all sessions created by this instance. */ -(void)resumeAll{ [lkNonLocal lock]; @@ -159,26 +163,6 @@ -(void)resumeAll{ -(void)resumeSession:(ASFK_IDENTITY_TYPE)sessionId{ [globalTPool resumeSession:sessionId]; } -+(void)resumeAllGlobally{ - [[ASFKGlobalThreadpool sharedManager] resumeAll]; -} - --(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId{ - return [globalTPool isPausedSession:sessionId]; -} - --(BOOL)isBusySession:(id)sessionId{ - return [globalTPool isBusySession:sessionId]; -} - --(BOOL)isReady{ - return YES; -} - --(long) itemsCountForSession:(id)sessionId{ - return [globalTPool itemsCountForSession:sessionId]; -} - -(void)cancelAll{ [lkNonLocal lock]; for (id s in ctrlblocks) { @@ -187,15 +171,31 @@ -(void)cancelAll{ [lkNonLocal unlock]; [self forgetAllSessions]; } -+(void)cancelAllGlobally{ - [[ASFKGlobalThreadpool sharedManager] cancelAll]; -} + -(void)cancelSession:(NSString*)sessionId{ [globalTPool cancelSession:sessionId]; if(sessionId){ [self forgetSession:sessionId]; } } +#pragma mark - Queries +-(BOOL) isPausedSession:(ASFK_IDENTITY_TYPE)sessionId{ + return [globalTPool isPausedSession:sessionId]; +} + +-(BOOL)isBusySession:(id)sessionId{ + return [globalTPool isBusySession:sessionId]; +} +//-(BOOL) isBusy{ +// return NO; +//} +-(BOOL)isReady{ + return YES; +} + +-(long long) itemsCountForSession:(id)sessionId{ + return [globalTPool itemsCountForSession:sessionId]; +} -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams sessionId:(id _Nullable ) sid { @@ -204,14 +204,14 @@ -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams ASFKParamSet* params=[self _decodeExParams:exparams forSession:sid]; if(!params.summary) { - params.summary = sumproc; + params.summary = sumProc; } if(!params.procs || [params.procs count]==0) { params.procs = [_backprocs copy]; } if(!params.cancProc){ - params.cancProc = cancelproc; + params.cancProc = cancellationHandler; } //test params @@ -246,7 +246,7 @@ -(NSDictionary* _Nonnull) createSession:(ASFKExecutionParams*_Nullable) exparams } //pass session to execution - BOOL res=[globalTPool addSession:s withId:s.sessionId]; + BOOL res=[globalTPool addSession:s withId:s.sessionId]; dispatch_semaphore_signal(semHighLevelCall); uint64 main_t2=[ASFKBase getTimestamp]; @@ -271,9 +271,8 @@ -(NSArray*) getSessions{ [lkNonLocal unlock]; return sessions; } -#pragma mark - Non-blocking methods - +#pragma mark - Non-blocking methods -(NSDictionary*) _castArray:(ASFKParamSet*)params{ __block uint64 main_t1=[ASFKBase getTimestamp]; DASFKLog(@"ASFKPipelinePar:Object %@: trying to push data items",self.itsName); diff --git a/src/ASFKPipelineSession+Internal.mm b/src/ASFKPipelineSession+Internal.mm index 5f7e94d..a34bf13 100644 --- a/src/ASFKPipelineSession+Internal.mm +++ b/src/ASFKPipelineSession+Internal.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // diff --git a/src/ASFKPipelineSession.h b/src/ASFKPipelineSession.h index ec88400..22cae59 100644 --- a/src/ASFKPipelineSession.h +++ b/src/ASFKPipelineSession.h @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. // @@ -20,8 +19,6 @@ #define ASFKPipelineSession_h #import "ASFKBase.h" - - @interface ASFKPipelineSession : ASFKThreadpoolSession @end diff --git a/src/ASFKPipelineSession.mm b/src/ASFKPipelineSession.mm index f1dbc41..709584d 100644 --- a/src/ASFKPipelineSession.mm +++ b/src/ASFKPipelineSession.mm @@ -12,7 +12,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// // Copyright © 2019-2022 Boris Vigman. All rights reserved. // #define ASFK_LOCAL_REPLACE 0 @@ -41,10 +40,8 @@ @interface ASFKPipelineSession() @end @implementation ASFKPipelineSession{ - - std::atomic busyCount; + std::atomic busyCount; NSMutableArray* dataQueues; - NSRange execRange; std::priority_queue, ASFKComparePriorities> pq; ASFKThreadpoolQueue* queueZero; NSLock* lock; @@ -70,7 +67,6 @@ -(void)_PSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDEN excond=[[ASFKExpirationCondition alloc]init]; isStopped=NO; paused=NO; - execRange=NSMakeRange(0, 1); dataQueues=[NSMutableArray array]; queueZero=[[ASFKThreadpoolQueue alloc]init]; if(sessionId){ @@ -87,11 +83,10 @@ -(void)_PSinitWithSession:(ASFK_IDENTITY_TYPE)sessionId andSubsession:(ASFK_IDEN ASFKLog(@"ASFKPipelineSession: Stub summary"); return data; }; - expirationSummary=(id)^(id controlBlock,NSDictionary* stats,id data){ - ASFKLog(@"ASFKPipelineSession: Stub expiration summary"); - return data; - }; + expirationSummary=nil; + onPauseNotification=nil; cancellationHandler=^id(id identity){ + ASFKLog(@"Default cancellation handler"); return nil; }; } @@ -253,9 +248,9 @@ -(void) postDataItem:(id)dataItem{ [lock unlock]; } --(void) addRoutinesFromArray:(NSArray*)ps{ - -} +//-(void) addRoutinesFromArray:(NSArray*)ps{ +// +//} -(void) replaceRoutinesWithArray:(NSArray*)ps{ [lock lock]; @@ -296,13 +291,13 @@ -(void)flush{ } -(void)pause{ [lock lock]; - cblk->paused=YES; + [cblk setPaused: YES]; [lock unlock]; } -(void)resume{ [lock lock]; - cblk->paused=NO; + [cblk setPaused: NO]; [lock unlock]; } @@ -324,8 +319,13 @@ -(BOOL) isBusy{ } -(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCancellationRoutine)cancel{ - [lock lock]; + BOOL tval=YES; + if(paused.compare_exchange_strong(tval,NO)){ + [dataQueues enumerateObjectsWithOptions:NSEnumerationConcurrent usingBlock:^(ASFKThreadpoolQueue * _Nonnull obj, NSUInteger idx, BOOL * _Nonnull stop) { + [obj unoccupy]; + }]; + } [self _adoptDataFromZeroQueue]; if(isStopped.load()){ @@ -345,11 +345,33 @@ -(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCance [lock unlock]; [self flush]; cancel(self.sessionId); - cru(self.sessionId); + [self _invokeCancellationHandler:cru identity:self.sessionId]; +// cru(self.sessionId); DASFKLog(@"Cancelling... Pt 0, session %@",self.sessionId); [self forgetAllSessions]; return eASFK_ES_WAS_CANCELLED; } + if(busyCount==0){ + ASFKExecutableRoutineSummary expirproc=expirationSummary; + ASFKExpirationCondition* trp=excond; + //std::vector bc={busyCount.load()}; + if(trp){ + [trp setSampleLongLong:busyCount]; + if([trp isConditionMet:nil]){ + [lock unlock]; + DASFKLog(@"<1> Expiring session %@" ,self.sessionId); + [self flush]; + cancel(self.sessionId); + if(expirproc){ + expirproc(cblk,@{},nil); + } + [self forgetAllSessions]; + self->cancellationProc(self.sessionId); + return eASFK_ES_WAS_CANCELLED; + } + } + + } ASFKExecutableRoutineSummary summary; summary=passSummary; @@ -376,7 +398,8 @@ -(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCance [lock unlock]; [self flush]; cancel(self.sessionId); - cru(self.sessionId); + [self _invokeCancellationHandler:cru identity:self.sessionId]; + //cru(self.sessionId); [self forgetAllSessions]; DASFKLog(@"Cancelling... Pt 1, session %@",self.sessionId); break; @@ -419,7 +442,8 @@ -(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCance [lock unlock]; [self flush]; cancel(self.sessionId); - cru(self.sessionId); + [self _invokeCancellationHandler:cru identity:self.sessionId]; + //cru(self.sessionId); [self forgetAllSessions]; DASFKLog(@"Cancelling... Pt 2, session %@",self.sessionId); break; @@ -432,14 +456,21 @@ -(eASFKThreadpoolExecutionStatus) select:(long)selector routineCancel:(ASFKCance if(summary){ res=summary(cblk,@{},result); } - std::vector bc={busyCount.load()}; - if(trp && [trp isConditionMetForLonglongValues:bc data:result]){ - ASFKLog(@"Expiring session %@",self.sessionId); - [self flush]; - cancel(self.sessionId); - expirproc(cblk,@{},res); - break; + if(trp){ + [trp setSampleLongLong:busyCount]; + if([trp isConditionMet:result]){ + DASFKLog(@"<2> Expiring session %@",self.sessionId); + [self flush]; + cancel(self.sessionId); + if(expirproc){ + expirproc(cblk,@{},res); + } + [self forgetAllSessions]; + self->cancellationProc(self.sessionId); + break; + } } + } else if(curpos. */ - // Copyright © 2019-2022 Boris Vigman. All rights reserved. #ifndef ASFKPrjConfig_h @@ -23,7 +22,12 @@ #define ASFK_PRIVSYM_TP_LOAD_FACTOR 1 #define ASFK_PRIVSYM_QOS_CLASS QOS_CLASS_BACKGROUND -#define ASFK_PRIVSYM_MEM_PRESSURE_THRESHOLD 100000 +#define ASFK_PRIVSYM_MEM_PRESSURE_MSG_THRESHOLD 1000000 +#define ASFK_PRIVSYM_MEM_PRESSURE_MLBX_THRESHOLD 1000000 +#define ASFK_PRIVSYM_PER_MLBX_MAX_MSG_LIMIT 1000000 +#define ASFK_PRIVSYM_MAX_MLBX_LIMIT 100000 +#define ASFK_PRIVSYM_MSG_RELEASE_SAMPLE_SIZE 10000 +#define ASFK_PRIVSYM_OBJ_RELEASE_SAMPLE_SIZE 10000 #define ASFK_CALC_ELAPSED_TIME(starttime, endtime) (endtime-starttime)/double(1e9) #define ASFK_RC_DESCR_DONE @"OK"