From 38408e0aeeca94d55e6cada52febc5b505e22c17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C9=91rry=20Shiv=C9=91m?= Date: Fri, 21 Jun 2024 22:12:01 +0530 Subject: [PATCH] feat: Add `runConcurrently` parameter to allow or disallow parallel execution (#17) --------- Signed-off-by: starry-shivam --- .../ktscheduler/executor/CoroutineExecutor.kt | 23 ++++-- .../kotlin/dev/starry/ktscheduler/job/Job.kt | 8 ++ .../ktscheduler/CoroutineExecutorTest.kt | 73 ++++++++++++++----- .../dev/starry/ktscheduler/KtSchedulerTest.kt | 61 ++++++++++++++++ 4 files changed, 142 insertions(+), 23 deletions(-) diff --git a/src/main/kotlin/dev/starry/ktscheduler/executor/CoroutineExecutor.kt b/src/main/kotlin/dev/starry/ktscheduler/executor/CoroutineExecutor.kt index 56379bd..ae9a88f 100644 --- a/src/main/kotlin/dev/starry/ktscheduler/executor/CoroutineExecutor.kt +++ b/src/main/kotlin/dev/starry/ktscheduler/executor/CoroutineExecutor.kt @@ -21,12 +21,16 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.withContext +import java.util.concurrent.ConcurrentHashMap /** * An executor that executes jobs using coroutines. */ class CoroutineExecutor : Executor { + // A map of currently running jobs. + private val runningJobs = ConcurrentHashMap() + /** * Executes the given job. * @@ -37,16 +41,23 @@ class CoroutineExecutor : Executor { override suspend fun execute( job: Job, onSuccess: () -> Unit, onError: (Exception) -> Unit ) { + // If the job is not allowed to run concurrently and a job with the + // same ID is already running, return. + if (!job.runConcurrently && runningJobs.containsKey(job.jobId)) { + return + } + CoroutineScope(job.dispatcher).launch { + // Add the job to the running jobs map. + runningJobs[job.jobId] = job try { job.callback() - withContext(Dispatchers.Default) { - onSuccess() - } + withContext(Dispatchers.Default) { onSuccess() } } catch (exc: Exception) { - withContext(Dispatchers.Default) { - onError(exc) - } + withContext(Dispatchers.Default) { onError(exc) } + } finally { + // Remove the job from the running jobs map. + runningJobs.remove(job.jobId) } } } diff --git a/src/main/kotlin/dev/starry/ktscheduler/job/Job.kt b/src/main/kotlin/dev/starry/ktscheduler/job/Job.kt index f03ca99..25adbd2 100644 --- a/src/main/kotlin/dev/starry/ktscheduler/job/Job.kt +++ b/src/main/kotlin/dev/starry/ktscheduler/job/Job.kt @@ -28,6 +28,7 @@ import java.time.ZonedDateTime * @property jobId A unique identifier for the job. * @property trigger The trigger that determines when the job should run. * @property nextRunTime The next time the job should run. + * @property runConcurrently Whether to run multiple instances of this job concurrently. * @property dispatcher The dispatcher to run the job on. * @property callback The callback function to run when the job is triggered. */ @@ -48,8 +49,15 @@ data class Job( */ val nextRunTime: ZonedDateTime, + /** + * Whether to run multiple instances of this job concurrently. + * Default is true. + */ + val runConcurrently: Boolean = true, + /** * The dispatcher to run the job on. + * Default is [Dispatchers.Default]. */ val dispatcher: CoroutineDispatcher = Dispatchers.Default, diff --git a/src/test/kotlin/dev/starry/ktscheduler/CoroutineExecutorTest.kt b/src/test/kotlin/dev/starry/ktscheduler/CoroutineExecutorTest.kt index aceb51a..37165e5 100644 --- a/src/test/kotlin/dev/starry/ktscheduler/CoroutineExecutorTest.kt +++ b/src/test/kotlin/dev/starry/ktscheduler/CoroutineExecutorTest.kt @@ -23,6 +23,8 @@ import dev.starry.ktscheduler.triggers.OneTimeTrigger import junit.framework.TestCase.assertTrue import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.TestCoroutineScheduler import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.resetMain import kotlinx.coroutines.test.runTest @@ -54,41 +56,78 @@ class CoroutineExecutorTest { @Test fun testExecuteSuccess(): Unit = runTest { - val job = Job( - jobId = "testJob1", - trigger = trigger, - nextRunTime = ZonedDateTime.now(), - dispatcher = UnconfinedTestDispatcher(testScheduler) - ) { /* Do nothing */ } - + val job = createTestJob(scheduler = testScheduler) { } var onSuccessCalled = false val onSuccess: () -> Unit = { onSuccessCalled = true } val onError: (Throwable) -> Unit = { fail("onError should not be called") } executor.execute(job, onSuccess, onError) - Thread.sleep(100) + delay(50) assertTrue(onSuccessCalled) } @Test fun testExecuteError(): Unit = runTest { - val job = Job( - jobId = "testJob2", - trigger = trigger, - nextRunTime = ZonedDateTime.now(), - dispatcher = UnconfinedTestDispatcher(testScheduler), - callback = { throw IllegalArgumentException("Error") }, - ) + val job = createTestJob(scheduler = testScheduler) { throw IllegalArgumentException("Error") } val onSuccess: () -> Unit = { fail("onSuccess should not be called") } var exception: Throwable? = null val onError: (Throwable) -> Unit = { exception = it } executor.execute(job, onSuccess, onError) - Thread.sleep(100) - + delay(50) assertNotNull(exception) assertTrue(exception is IllegalArgumentException) assertEquals("Error", exception.message) } + + @Test + fun testConcurrentExecution(): Unit = runTest { + // Create a job that takes 100ms to execute. + val job = createTestJob( + scheduler = testScheduler, runConcurrently = true + ) { delay(100) } + + var onSuccessCalled = 0 + val onSuccess: () -> Unit = { onSuccessCalled += 1 } + val onError: (Throwable) -> Unit = { fail("onError should not be called") } + // Execute the job 3 times concurrently. + executor.execute(job, onSuccess, onError) + executor.execute(job, onSuccess, onError) + executor.execute(job, onSuccess, onError) + // Wait for the jobs to complete. + delay(110) + assertEquals(3, onSuccessCalled) + } + + @Test + fun testNonConcurrentExecution(): Unit = runTest { + // Create a job that takes 100ms to execute. + val job = createTestJob(scheduler = testScheduler, runConcurrently = false) { delay(100) } + + var onSuccessCalled = 0 + val onSuccess: () -> Unit = { onSuccessCalled++ } + val onError: (Throwable) -> Unit = { fail("onError should not be called") } + // Execute the job 3 times concurrently. + executor.execute(job, onSuccess, onError) + executor.execute(job, onSuccess, onError) + executor.execute(job, onSuccess, onError) + // Wait for the jobs to complete. + delay(110) + assertEquals(1, onSuccessCalled) + } + + private fun createTestJob( + jobId: String = "job1", + runConcurrently: Boolean = true, + scheduler: TestCoroutineScheduler, + callback: suspend () -> Unit, + ): Job = Job( + jobId = jobId, + trigger = trigger, + nextRunTime = ZonedDateTime.now(), + dispatcher = UnconfinedTestDispatcher(scheduler), + runConcurrently = runConcurrently, + callback = callback + ) } diff --git a/src/test/kotlin/dev/starry/ktscheduler/KtSchedulerTest.kt b/src/test/kotlin/dev/starry/ktscheduler/KtSchedulerTest.kt index 9631584..a32f820 100644 --- a/src/test/kotlin/dev/starry/ktscheduler/KtSchedulerTest.kt +++ b/src/test/kotlin/dev/starry/ktscheduler/KtSchedulerTest.kt @@ -317,6 +317,67 @@ class KtSchedulerTest { assertEquals("longRunningJob", eventListener.completedJobs[0]) } + @Test + fun `scheduler should not execute job concurrently if runConcurrently is false`(): Unit = runTest { + val scheduler = KtScheduler() + + // Create a job that takes 2 seconds to execute + val job = Job( + jobId = "longRunningJob", + trigger = IntervalTrigger(intervalSeconds = 1), + nextRunTime = ZonedDateTime.now(), + callback = { delay(2000) }, + runConcurrently = false + ) + val eventListener = TestJobEventListener() + + scheduler.addJob(job) + scheduler.addEventListener(eventListener) + scheduler.start() + // Job should not be completed yet + assertEquals(0, eventListener.completedJobs.size) + // Wait for 3 seconds + Thread.sleep(3000) + // Assert that the job was only executed once in 3 seconds + // because the job is not run concurrently and it takes 2 seconds to execute + assertEquals(1, eventListener.completedJobs.size) + // Assert that the job was executed twice after 4 seconds + Thread.sleep(1100) + assertEquals(2, eventListener.completedJobs.size) + assertEquals("longRunningJob", eventListener.completedJobs[0]) + assertEquals("longRunningJob", eventListener.completedJobs[1]) + scheduler.shutdown() + } + + @Test + fun `scheduler should execute job concurrently if runConcurrently is true`(): Unit = runTest { + val scheduler = KtScheduler() + + // Create a job that takes 2 seconds to execute + val job = Job( + jobId = "longRunningJob", + trigger = IntervalTrigger(intervalSeconds = 1), + nextRunTime = ZonedDateTime.now(), + callback = { delay(2000) }, + runConcurrently = true + ) + val eventListener = TestJobEventListener() + + scheduler.addJob(job) + scheduler.addEventListener(eventListener) + scheduler.start() + // Job should not be completed yet + assertEquals(0, eventListener.completedJobs.size) + // Wait for 3 seconds and shutdown the scheduler + Thread.sleep(3100) + scheduler.shutdown() + // Assert that the job was executed twice in 3 seconds + // because the job is run concurrently and it takes 2 seconds to execute + assertEquals(2, eventListener.completedJobs.size) + assertEquals("longRunningJob", eventListener.completedJobs[0]) + assertEquals("longRunningJob", eventListener.completedJobs[1]) + } + private fun createTestJob( jobId: String, runAt: ZonedDateTime = ZonedDateTime.now().plusSeconds(1),