From d8c75f72b06cc0ed87a97fae3b46b9c2d95d710d Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 7 Oct 2019 14:59:01 +0900 Subject: [PATCH 1/5] Fix a namespace bug: Unsupported namespace: session (config) --- .../civitaspo/digdag/plugin/pg_lock/lock/PgLockNamespace.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockNamespace.scala b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockNamespace.scala index fb93806..019288e 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockNamespace.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockNamespace.scala @@ -74,7 +74,8 @@ object PgLockNamespace case "global" => Global case "site" => Site case "project" => Project - case "workflow" => Session + case "workflow" => Workflow + case "session" => Session case "attempt" => Attempt case unsupported => throw new ConfigException(s"Unsupported namespace: $unsupported") } From b15c8bba177f0b79d5d987e52560e0685f527f40 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 7 Oct 2019 14:59:28 +0900 Subject: [PATCH 2/5] Add params option to test digdag run --- .../plugin/pg_lock/DigdagTestUtils.scala | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala index b9934a3..da137cf 100644 --- a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala +++ b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala @@ -60,7 +60,8 @@ object DigdagTestUtils def digdagRun(projectPath: Path, configString: String, - digString: String): CommandStatus = + digString: String, + params: Map[String, String] = Map()): CommandStatus = { val configPath: Path = projectPath.resolve("config") writeFile(configPath.toFile, configString) @@ -70,14 +71,17 @@ object DigdagTestUtils val logPath: Path = projectPath.resolve("log") - val status: CommandStatus = digdag( - "run", - "--save", projectPath.toAbsolutePath.toString, - "--config", configPath.toString, - "--log", logPath.toString, - "--project", projectPath.toAbsolutePath.toString, - digPath.toString - ) + val args: Seq[String] = Seq.newBuilder[String] + .addOne("run") + .addOne("--save").addOne(projectPath.toAbsolutePath.toString) + .addOne("--config").addOne(configPath.toString) + .addOne("--log").addOne(logPath.toString) + .addOne("--project").addOne(projectPath.toAbsolutePath.toString) + .addAll(params.toSeq.map(_.productIterator.mkString("=")).flatMap(Seq("--param", _))) + .addOne(digPath.toString) + .result() + + val status: CommandStatus = digdag(args: _*) val log: String = Source.fromFile(logPath.toFile).mkString status.copy(log = Option(log)) From 50914a5c7a227e3b27122d31a92f5f48892ad327 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 7 Oct 2019 15:13:48 +0900 Subject: [PATCH 3/5] Enhance the logging for locks --- .../civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala | 3 +++ .../digdag/plugin/pg_lock/lock/PgLockPollingWaiter.scala | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala index 64b4251..6a89b4f 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockOperator.scala @@ -82,6 +82,9 @@ class PgLockOperator(context: OperatorContext, val lockId: UUID = UUID.randomUUID() doLock(dao, lockId) pgClient.commit() + logger.info(s"Successfully get the lock (id: $lockId, namespace_type: ${namespace.getType}," + + s" namespace_value: ${namespace.getValue}, owner_attempt_id: ${namespace.getOwnerAttemptId}," + + s" expire_in: ${expireIn.toString}, limit: $limit)") buildTaskResult(lockId) } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockPollingWaiter.scala b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockPollingWaiter.scala index db53db3..6b268ac 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockPollingWaiter.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/pg_lock/lock/PgLockPollingWaiter.scala @@ -70,7 +70,7 @@ case class PgLockPollingWaiter(lastStateParams: Config, } if (isSuccess) { - logger.info(s"Successfully finished polling '$pollingName'. (retry_count: $retryCount, total waiting: ${totalWaitingSeconds}s)") + logger.debug(s"Successfully finished polling '$pollingName'. (retry_count: $retryCount, total waiting: ${totalWaitingSeconds}s)") // NOTE: Remove polling state for the next use of the PgLockPollingWaiter instance. lastRootState.remove(pollingName) @@ -98,7 +98,7 @@ case class PgLockPollingWaiter(lastStateParams: Config, } } - logger.info(s"Wait ${nextWaitingSeconds}s for '$pollingName'. (next state => $nextStateParam)") + logger.debug(s"Wait ${nextWaitingSeconds}s for '$pollingName'. (next state => $nextStateParam)") throw TaskExecutionException.ofNextPolling(nextWaitingSeconds, ConfigElement.copyOf(nextStateParam)) } } From 1afcce213d7f4bdd6ae943e26365a7b86a961c82 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 7 Oct 2019 15:27:22 +0900 Subject: [PATCH 4/5] Write namespace tests --- src/test/resources/namespace.dig | 7 ++++ .../plugin/pg_lock/PgLockPluginTest.scala | 35 ++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 src/test/resources/namespace.dig diff --git a/src/test/resources/namespace.dig b/src/test/resources/namespace.dig new file mode 100644 index 0000000..9a5e14d --- /dev/null +++ b/src/test/resources/namespace.dig @@ -0,0 +1,7 @@ +timezone: UTC + ++namespace: + pg_lock>: simple + namespace: ${namespace} + _do: + echo>: simple \ No newline at end of file diff --git a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/PgLockPluginTest.scala b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/PgLockPluginTest.scala index 74c2a8d..aa95c02 100644 --- a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/PgLockPluginTest.scala +++ b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/PgLockPluginTest.scala @@ -373,7 +373,7 @@ class PgLockPluginTest } } - behavior of "the pg_lock> operator" + behavior of "the pg_lock> operator" it should "fail with wait_timeout: 0s if another task locks" in { val digString = readResource("/wait-timeout.dig") @@ -416,4 +416,37 @@ class PgLockPluginTest assert(expectedPattern.findFirstIn(status.log.get).isDefined) } + it should "fail when namespace is unknown" in { + val digString = readResource("/namespace.dig") + + def assertNamespace(namespace: String, + expectError: Boolean = false): Unit = + { + val status: CommandStatus = digdagRun( + projectPath = tmpDir.toPath, + configString = defaultSystemConfig, + digString = digString, + params = Map("namespace" -> namespace) + ) + + if (expectError) { + assert(status.code == 1) + val expectedPattern: Regex = ("""\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} \+\d{4} \[ERROR\] \(.+?\+main\+namespace\): Configuration error at task \+main\+namespace: Unsupported namespace: """ + namespace + """ \(config\)""").r + assert(expectedPattern.findFirstIn(status.log.get).isDefined) + } + else { + assert(status.code == 0) + val expectedPattern: Regex = ("""\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} \+\d{4} \[INFO\] \(.+?\+main\+namespace\): Successfully get the lock \(id: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}, namespace_type: """ + namespace + """, namespace_value: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}, owner_attempt_id: .+?, expire_in: .+?, limit: .+?\)""").r + assert(expectedPattern.findFirstIn(status.log.get).isDefined) + } + } + + assertNamespace(namespace = "global") + assertNamespace(namespace = "site") + assertNamespace(namespace = "project") + assertNamespace(namespace = "workflow") + assertNamespace(namespace = "session") + assertNamespace(namespace = "attempt") + assertNamespace(namespace = "test", expectError = true) + } } From ed3d2cfdb4a9bafe6aee333b1f23666212d6c7f4 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 7 Oct 2019 16:14:47 +0900 Subject: [PATCH 5/5] Use --no-save option in test digdag run --- .../pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala index da137cf..a3299ba 100644 --- a/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala +++ b/src/test/scala/pro/civitaspo/digdag/plugin/pg_lock/DigdagTestUtils.scala @@ -79,6 +79,7 @@ object DigdagTestUtils .addOne("--project").addOne(projectPath.toAbsolutePath.toString) .addAll(params.toSeq.map(_.productIterator.mkString("=")).flatMap(Seq("--param", _))) .addOne(digPath.toString) + .addOne("--no-save") .result() val status: CommandStatus = digdag(args: _*)