diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index eba1ea311fb0..de43ff2302df 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -93,7 +93,7 @@ interface DestinationTaskLauncher : TaskLauncher { justification = "arguments are guaranteed to be non-null by Kotlin's type system" ) class DefaultDestinationTaskLauncher( - private val taskScopeProvider: TaskScopeProvider>, + private val taskScopeProvider: TaskScopeProvider, private val catalog: DestinationCatalog, private val config: DestinationConfiguration, private val syncManager: SyncManager, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt index 60a03a3d9418..453f9be8e103 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt @@ -4,8 +4,6 @@ package io.airbyte.cdk.load.task -import io.airbyte.cdk.load.util.CloseableCoroutine - interface Task { suspend fun execute() } @@ -21,24 +19,3 @@ interface TaskLauncher { */ suspend fun run() } - -/** - * Wraps tasks with exception handling. It should perform all necessary exception handling, then - * execute the provided callback. - */ -interface TaskExceptionHandler { - // Wrap a task with exception handling. - suspend fun withExceptionHandling(task: T): U - - // Set a callback that will be invoked when any exception handling is done. - suspend fun setCallback(callback: suspend () -> Unit) -} - -/** Provides the scope(s) in which tasks run. */ -interface TaskScopeProvider : CloseableCoroutine { - /** Launch a task in the correct scope. */ - suspend fun launch(task: T) - - /** Unliked [close], may attempt to fail gracefully, but should guarantee return. */ - suspend fun kill() -} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt similarity index 95% rename from airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt rename to airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt index 4478e39079ea..409a1fbd0d52 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt @@ -54,8 +54,7 @@ interface WrappedTask : Task { @Singleton @Secondary -class DestinationTaskScopeProvider(config: DestinationConfiguration) : - TaskScopeProvider> { +class TaskScopeProvider(config: DestinationConfiguration) { private val log = KotlinLogging.logger {} private val timeoutMs = config.gracefulCancellationTimeoutMs @@ -81,7 +80,7 @@ class DestinationTaskScopeProvider(config: DestinationConfiguration) : private val failFastScope = ControlScope("input", Job(), Dispatchers.IO) - override suspend fun launch(task: WrappedTask) { + suspend fun launch(task: WrappedTask) { val scope = when (task.innerTask) { is InternalScope -> internalScope @@ -97,7 +96,7 @@ class DestinationTaskScopeProvider(config: DestinationConfiguration) : } } - override suspend fun close() { + suspend fun close() { // Under normal operation, all tasks should be complete // (except things like force flush, which loop). So // - it's safe to force cancel the internal tasks @@ -126,7 +125,7 @@ class DestinationTaskScopeProvider(config: DestinationConfiguration) : internalScope.job.cancel() } - override suspend fun kill() { + suspend fun kill() { log.info { "Killing task scopes" } // Terminate tasks which should be immediately terminated failFastScope.job.cancel() diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 3c9671d5deaf..78ca3d796e67 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -76,8 +76,8 @@ import org.junit.jupiter.api.Test "MockScopeProvider", ] ) -class DestinationTaskLauncherTest { - @Inject lateinit var mockScopeProvider: MockScopeProvider +class DestinationTaskLauncherTest { + @Inject lateinit var taskScopeProvider: TaskScopeProvider @Inject lateinit var taskLauncher: DestinationTaskLauncher @Inject lateinit var syncManager: SyncManager @@ -447,42 +447,6 @@ class DestinationTaskLauncherTest { teardownTaskFactory.hasRun.receive() } - @Test - fun testHandleTeardownComplete() = runTest { - // This should close the scope provider. - launch { - taskLauncher.run() - Assertions.assertTrue(mockScopeProvider.didClose) - } - taskLauncher.handleTeardownComplete() - } - - @Test - fun testHandleCallbackWithFailure() = runTest { - launch { - taskLauncher.run() - Assertions.assertTrue(mockScopeProvider.didKill) - } - taskLauncher.handleTeardownComplete(success = false) - } - - @Test - fun `test exceptions in tasks throw`(catalog: DestinationCatalog) = runTest { - mockSpillToDiskTaskFactory.forceFailure.getAndSet(true) - - val job = launch { taskLauncher.run() } - taskLauncher.handleTeardownComplete() - job.join() - - mockFailStreamTaskFactory.didRunFor.close() - - Assertions.assertEquals( - catalog.streams.map { it.descriptor }.toSet(), - mockFailStreamTaskFactory.didRunFor.toList().toSet(), - "FailStreamTask was run for each stream" - ) - } - @Test fun `test sync failure after stream failure`() = runTest { val job = launch { taskLauncher.run() } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index f1ee31f867fd..3c4a978cf1bc 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -48,8 +48,7 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test class DestinationTaskLauncherUTest { - private val taskScopeProvider: TaskScopeProvider> = - mockk(relaxed = true) + private val taskScopeProvider: TaskScopeProvider = mockk(relaxed = true) private val catalog: DestinationCatalog = mockk(relaxed = true) private val syncManager: SyncManager = mockk(relaxed = true) @@ -179,4 +178,52 @@ class DestinationTaskLauncherUTest { ) coVerify(exactly = 1) { closeStreamTaskFactory.make(any(), any()) } } + + @Test + fun `task successful completion triggers scope close`() = runTest { + // This should close the scope provider. + val taskLauncher = getDefaultDestinationTaskLauncher(false) + launch { + taskLauncher.run() + coVerify { taskScopeProvider.close() } + } + taskLauncher.handleTeardownComplete() + } + + @Test + fun `test completion with failure triggers scope kill`() = runTest { + val taskLauncher = getDefaultDestinationTaskLauncher(false) + launch { + taskLauncher.run() + coVerify { taskScopeProvider.kill() } + } + taskLauncher.handleTeardownComplete(success = false) + } + + @Test + fun `test exceptions in tasks throw`() = runTest { + coEvery { spillToDiskTaskFactory.make(any(), any()) } answers + { + val task = mockk(relaxed = true) + coEvery { task.execute() } throws Exception("spill to disk task failed") + task + } + coEvery { taskScopeProvider.launch(any()) } coAnswers + { + val task = firstArg() + task.execute() + } + + val taskLauncher = getDefaultDestinationTaskLauncher(false) + val job = launch { taskLauncher.run() } + taskLauncher.handleTeardownComplete() + job.join() + coVerify { + failStreamTaskFactory.make( + any(), + any(), + match { it.namespace == "namespace" && it.name == "name" } + ) + } + } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt deleted file mode 100644 index 62ed56dfbd2c..000000000000 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.task - -import io.micronaut.context.annotation.Primary -import io.micronaut.context.annotation.Requires -import jakarta.inject.Singleton -import java.util.concurrent.atomic.AtomicBoolean - -@Singleton -@Primary -@Requires(env = ["MockScopeProvider"]) -class MockScopeProvider : TaskScopeProvider> { - private val didCloseAB = AtomicBoolean(false) - private val didKillAB = AtomicBoolean(false) - - val didClose - get() = didCloseAB.get() - val didKill - get() = didKillAB.get() - - override suspend fun launch(task: WrappedTask) { - task.execute() - } - - override suspend fun close() { - didCloseAB.set(true) - } - - override suspend fun kill() { - didKillAB.set(true) - } -}