From 9a39679fcec74102e0bb00f2db753eaa6d3f7426 Mon Sep 17 00:00:00 2001 From: Andrei Konovalov Date: Tue, 3 Dec 2024 15:46:04 -0800 Subject: [PATCH] fix(IoT): fixing race condition in AWSIoTStreamThread .cxx_destruct #5452 Related issue: https://github.com/aws-amplify/aws-sdk-ios/issues/5452 Description of changes: 1. Addition of Synchronization Primitives New Properties: - dispatch_queue_t cleanupQueue - dispatch_semaphore_t cleanupSemaphore - BOOL isCleaningUp Purpose: Ensures thread-safe access and modification of critical properties like isRunning, shouldDisconnect, and defaultRunLoopTimer. Synchronization prevents race conditions during cleanup and cancellation processes. 2. Enhanced shouldContinueRunning Method Before: Used direct property access without synchronization. After: Introduced synchronization using dispatch_sync for thread-safe checks. Purpose: Prevents inconsistencies if multiple threads attempt to read/write properties simultaneously. 3. Cleanup Enhancements performCleanup and cleanupResources: Added explicit synchronization: dispatch_sync and dispatch_semaphore ensure cleanup operations are thread-safe and do not overlap if called multiple times. Handles complex cleanup sequences safely, such as invalidating timers, disconnecting streams, and deallocating the session. Purpose: Ensures that cleanup actions (e.g., closing streams and invalidating timers) are thread-safe and only executed once. 4. Timer Initialization Weak Reference to Prevent Retain Cycles: The timer in setupRunLoop now uses a __weak reference to avoid retain cycles. Before: Used a strong reference (target:self), which could result in a retain cycle. Purpose: Avoids potential memory leaks by ensuring the thread does not retain itself via the timer. (Note: NSTimer keeps a strong reference to its target until the timer is invalidated, regardless of the ownership qualifier on the variable passed as the target, so the timer must still be invalidated during cleanup for the thread to be released.) 5. 
Improved cancel Method Before: Simple isRunning flag and direct super cancel call After: Introduced thread-safe handling and ensured timer invalidation Purpose: Prevents race conditions when canceling the thread, ensuring timers are invalidated and properties are safely updated. --- AWSIoT/Internal/AWSIoTStreamThread.h | 2 +- AWSIoT/Internal/AWSIoTStreamThread.m | 214 +++++++++++++++------- AWSIoTUnitTests/AWSIoTStreamThreadTests.m | 125 ++++++++++--- 3 files changed, 254 insertions(+), 87 deletions(-) diff --git a/AWSIoT/Internal/AWSIoTStreamThread.h b/AWSIoT/Internal/AWSIoTStreamThread.h index fb2dc572721..97fa9ee5498 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.h +++ b/AWSIoT/Internal/AWSIoTStreamThread.h @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN @interface AWSIoTStreamThread : NSThread -@property(strong, nullable) void (^onStop)(void); +@property(nonatomic, copy, nullable) void (^onStop)(void); -(instancetype)initWithSession:(nonnull AWSMQTTSession *)session decoderInputStream:(nonnull NSInputStream *)decoderInputStream diff --git a/AWSIoT/Internal/AWSIoTStreamThread.m b/AWSIoT/Internal/AWSIoTStreamThread.m index 2f8ca2f37e3..16ab900a89e 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.m +++ b/AWSIoT/Internal/AWSIoTStreamThread.m @@ -27,11 +27,16 @@ @interface AWSIoTStreamThread() @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; @property(nonatomic, assign) BOOL isRunning; @property(nonatomic, assign) BOOL shouldDisconnect; + +// Add synchronization primitives +@property(nonatomic, strong) dispatch_queue_t cleanupQueue; +@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore; +@property(nonatomic, assign) BOOL isCleaningUp; @end @implementation AWSIoTStreamThread -- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session +- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session decoderInputStream:(nonnull NSInputStream *)decoderInputStream encoderOutputStream:(nonnull NSOutputStream 
*)encoderOutputStream { return [self initWithSession:session @@ -40,10 +45,10 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session outputStream:nil]; } --(instancetype)initWithSession:(nonnull AWSMQTTSession *)session - decoderInputStream:(nonnull NSInputStream *)decoderInputStream - encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream - outputStream:(nullable NSOutputStream *)outputStream; { +- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session + decoderInputStream:(nonnull NSInputStream *)decoderInputStream + encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream + outputStream:(nullable NSOutputStream *)outputStream { if (self = [super init]) { _session = session; _decoderInputStream = decoderInputStream; @@ -51,103 +56,184 @@ -(instancetype)initWithSession:(nonnull AWSMQTTSession *)session _outputStream = outputStream; _defaultRunLoopTimeInterval = 10; _shouldDisconnect = NO; + _isCleaningUp = NO; + + // Initialize synchronization primitives + _cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL); + _cleanupSemaphore = dispatch_semaphore_create(1); } return self; } - (void)main { - AWSDDLogVerbose(@"Started execution of Thread: [%@]", self); - //This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread. 
- self.runLoopForStreamsThread = [NSRunLoop currentRunLoop]; + @autoreleasepool { + AWSDDLogVerbose(@"Started execution of Thread: [%@]", self); + + if (![self setupRunLoop]) { + AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self); + return; + } + + [self startIOOperations]; + + while ([self shouldContinueRunning]) { + @autoreleasepool { + [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode + beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]]; + } + } + + [self performCleanup]; + + AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self); + } +} - //Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop - //below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence. +- (BOOL)setupRunLoop { + if (self.isRunning) { + AWSDDLogError(@"Thread already running"); + return NO; + } + + self.runLoopForStreamsThread = [NSRunLoop currentRunLoop]; + + // Setup timer with weak reference to prevent retain cycles + __weak typeof(self) weakSelf = self; self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0] - interval:60.0 - target:self - selector:@selector(timerHandler:) - userInfo:nil - repeats:YES]; + interval:60.0 + target:weakSelf + selector:@selector(timerHandler:) + userInfo:nil + repeats:YES]; + + if (!self.defaultRunLoopTimer) { + AWSDDLogError(@"Failed to create run loop timer"); + return NO; + } [self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer forMode:NSDefaultRunLoopMode]; - self.isRunning = YES; + return YES; +} + +- (void)startIOOperations { if (self.outputStream) { [self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; + forMode:NSDefaultRunLoopMode]; [self.outputStream open]; } - - //Update the runLoop and runLoopMode in session. 
[self.session connectToInputStream:self.decoderInputStream outputStream:self.encoderOutputStream]; +} - while (self.isRunning && !self.isCancelled) { - //This will continue run until the thread is cancelled - //Run one cycle of the runloop. This will return after a input source event or timer event is processed - [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode - beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]]; - } - - [self cleanUp]; +- (BOOL)shouldContinueRunning { + __block BOOL shouldRun; + dispatch_sync(self.cleanupQueue, ^{ + shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil; + }); + return shouldRun; +} - AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self); +- (void)invalidateTimer { + dispatch_sync(self.cleanupQueue, ^{ + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; + } + }); } - (void)cancel { AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self); - self.isRunning = NO; - [super cancel]; + [self cancelWithDisconnect:NO]; } - (void)cancelAndDisconnect:(BOOL)shouldDisconnect { - AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self); - self.shouldDisconnect = shouldDisconnect; - self.isRunning = NO; - [super cancel]; + AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", + shouldDisconnect ? 
@"YES" : @"NO", (NSThread *)self); + [self cancelWithDisconnect:shouldDisconnect]; } -- (void)cleanUp { - if (self.defaultRunLoopTimer) { - [self.defaultRunLoopTimer invalidate]; - self.defaultRunLoopTimer = nil; - } - - if (self.shouldDisconnect) { - if (self.session) { - [self.session close]; - self.session = nil; - } - - if (self.outputStream) { - self.outputStream.delegate = nil; - [self.outputStream close]; - [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; - self.outputStream = nil; +- (void)cancelWithDisconnect:(BOOL)shouldDisconnect { + // Ensure thread-safe property updates + dispatch_sync(self.cleanupQueue, ^{ + if (!self.isCleaningUp) { + self.shouldDisconnect = shouldDisconnect; + self.isRunning = NO; + [super cancel]; + + // Invalidate timer to trigger run loop exit + [self invalidateTimer]; } + }); +} - if (self.decoderInputStream) { - [self.decoderInputStream close]; - self.decoderInputStream = nil; - } +- (void)performCleanup { + dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER); + + if (self.isCleaningUp) { + dispatch_semaphore_signal(self.cleanupSemaphore); + return; + } + + self.isCleaningUp = YES; + dispatch_semaphore_signal(self.cleanupSemaphore); + + dispatch_sync(self.cleanupQueue, ^{ + [self cleanupResources]; + }); +} - if (self.encoderOutputStream) { - [self.encoderOutputStream close]; - self.encoderOutputStream = nil; - } +- (void)cleanupResources { + if (self.shouldDisconnect) { + [self closeSession]; + [self closeStreams]; } else { AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); } - - if (self.onStop) { - self.onStop(); + + // Handle onStop callback + dispatch_block_t stopBlock = self.onStop; + if (stopBlock) { self.onStop = nil; + stopBlock(); + } +} + +- (void)closeSession { + if (self.session) { + [self.session close]; + self.session = nil; + } +} + +- (void)closeStreams { + if (self.outputStream) { + self.outputStream.delegate = nil; + 
[self.outputStream close]; + [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread + forMode:NSDefaultRunLoopMode]; + self.outputStream = nil; + } + + if (self.decoderInputStream) { + [self.decoderInputStream close]; + self.decoderInputStream = nil; + } + + if (self.encoderOutputStream) { + [self.encoderOutputStream close]; + self.encoderOutputStream = nil; } } - (void)timerHandler:(NSTimer*)theTimer { - AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO"); + AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", + self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO"); +} + +- (void)dealloc { + AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self); } @end diff --git a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m index 89a84ae7aed..11b61fa055c 100644 --- a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m +++ b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m @@ -18,7 +18,17 @@ #import "AWSIoTStreamThread.h" @interface AWSIoTStreamThread() + @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; +@property (nonatomic, assign) BOOL isRunning; +@property (nonatomic, strong) dispatch_queue_t cleanupQueue; +@property (nonatomic, assign) BOOL isCleaningUp; +@property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer; +@property (nonatomic, strong, nullable) NSRunLoop *runLoopForStreamsThread; + +- (void)invalidateTimer; + + @end @@ -51,11 +61,13 @@ - (void)setUp { encoderOutputStream:self.encoderOutputStream outputStream:self.outputStream]; self.thread.defaultRunLoopTimeInterval = 0.1; + [self.thread start]; [self waitForExpectations:@[startExpectation] timeout:1]; } - (void)tearDown { + [self.thread cancelAndDisconnect:YES]; self.thread = nil; self.session = nil; self.decoderInputStream = nil; @@ -67,6 +79,7 @@ 
- (void)tearDown { /// When: The thread is started /// Then: The output stream is opened and the session is connected to the decoder and encoder streams - (void)testStart_shouldOpenStream_andInvokeConnectOnSession { + OCMVerify([self.outputStream scheduleInRunLoop:[OCMArg any] forMode:NSDefaultRunLoopMode]); OCMVerify([self.outputStream open]); OCMVerify([self.session connectToInputStream:[OCMArg any] outputStream:[OCMArg any]]); } @@ -87,44 +100,112 @@ - (void)testCancelAndDisconnect_shouldCloseStreams_andInvokeOnStop { OCMVerify([self.encoderOutputStream close]); OCMVerify([self.outputStream close]); OCMVerify([self.session close]); + XCTAssertFalse(self.thread.isRunning); } /// Given: A running AWSIoTStreamThread /// When: The thread is cancelled with disconnect set to NO /// Then: Neither the session nor the streams are closed - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop { + XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; + self.thread.onStop = ^{ + [stopExpectation fulfill]; + }; + + __block BOOL didInvokeSessionClose = NO; + OCMStub([self.session close]).andDo(^(NSInvocation *invocation) { + didInvokeSessionClose = YES; + }); + + __block BOOL didInvokeDecoderInputStreamClose = NO; + OCMStub([self.decoderInputStream close]).andDo(^(NSInvocation *invocation) { + didInvokeDecoderInputStreamClose = YES; + }); + + __block BOOL didInvokeEncoderOutputStreamClose = NO; + OCMStub([self.encoderOutputStream close]).andDo(^(NSInvocation *invocation) { + didInvokeEncoderOutputStreamClose = YES; + }); + + __block BOOL didInvokeOutputStreamClose = NO; + OCMStub([self.outputStream close]).andDo(^(NSInvocation *invocation) { + didInvokeOutputStreamClose = YES; + }); + + [self.thread cancelAndDisconnect:NO]; + [self waitForExpectations:@[stopExpectation] timeout:1]; + + XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked"); + 
XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked"); + XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked"); + XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked"); +} + +- (void)testCancelAndDisconnect_shouldSetIsCleaningUp { XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; self.thread.onStop = ^{ [stopExpectation fulfill]; }; - __block BOOL didInvokeSessionClose = NO; - [OCMStub([self.session close]) andDo:^(NSInvocation *invocation) { - didInvokeSessionClose = YES; - }]; + [self.thread cancelAndDisconnect:YES]; - __block BOOL didInvokeDecoderInputStreamClose = NO; - [OCMStub([self.decoderInputStream close]) andDo:^(NSInvocation *invocation) { - didInvokeDecoderInputStreamClose = YES; - }]; + __block BOOL isCleaningUp; + dispatch_sync(self.thread.cleanupQueue, ^{ + isCleaningUp = self.thread.isCleaningUp; + }); - __block BOOL didInvokeEncoderDecoderInputStreamClose = NO; - [OCMStub([self.encoderOutputStream close]) andDo:^(NSInvocation *invocation) { - didInvokeEncoderDecoderInputStreamClose = YES; - }]; + XCTAssertTrue(isCleaningUp, @"isCleaningUp should be YES during cleanup"); + [self waitForExpectations:@[stopExpectation] timeout:1]; +} - __block BOOL didInvokeOutputStreamClose = NO; - [OCMStub([self.outputStream close]) andDo:^(NSInvocation *invocation) { - didInvokeOutputStreamClose = YES; - }]; +- (void)testInvalidateTimer_shouldInvalidateAndSetTimerToNil { + [self.thread invalidateTimer]; - [self.thread cancelAndDisconnect:NO]; - [self waitForExpectations:@[stopExpectation] timeout:1]; + __block BOOL isTimerInvalidated = NO; + OCMStub([self.thread.defaultRunLoopTimer invalidate]).andDo(^(NSInvocation *invocation) { + isTimerInvalidated = YES; + }); + + XCTAssertNil(self.thread.defaultRunLoopTimer, 
@"defaultRunLoopTimer should be nil after invalidation"); + XCTAssertTrue(isTimerInvalidated, @"Timer invalidate method should have been called"); +} + +- (void)testRunLoop_shouldInvokeRunModeBeforeDate { + id mockRunLoop = OCMClassMock([NSRunLoop class]); + OCMStub([mockRunLoop runMode:NSDefaultRunLoopMode beforeDate:[OCMArg any]]); - XCTAssertFalse(didInvokeSessionClose); - XCTAssertFalse(didInvokeDecoderInputStreamClose); - XCTAssertFalse(didInvokeEncoderDecoderInputStreamClose); - XCTAssertFalse(didInvokeOutputStreamClose); + // Replace the real run loop with the mock + self.thread.runLoopForStreamsThread = mockRunLoop; + + // Let the thread run for a brief moment + XCTestExpectation *runLoopExpectation = [self expectationWithDescription:@"RunLoop expectation"]; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ + [self.thread cancelAndDisconnect:YES]; + [runLoopExpectation fulfill]; + }); + + [self waitForExpectations:@[runLoopExpectation] timeout:1]; + + OCMVerify([mockRunLoop runMode:NSDefaultRunLoopMode beforeDate:[OCMArg any]]); + [mockRunLoop stopMocking]; +} + +- (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue { + XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; + self.thread.onStop = ^{ + [stopExpectation fulfill]; + }; + + [self.thread cancelAndDisconnect:YES]; + + // Validate synchronization + __block BOOL didSynchronize = NO; + dispatch_sync(self.thread.cleanupQueue, ^{ + didSynchronize = YES; + }); + + XCTAssertTrue(didSynchronize, @"The cleanupQueue should synchronize the operations"); + [self waitForExpectations:@[stopExpectation] timeout:1]; } @end