Skip to content

Commit

Permalink
Merge pull request #3 from civitaspo/fix-ns-session
Browse files Browse the repository at this point in the history
Fix ns session
  • Loading branch information
civitaspo authored Oct 7, 2019
2 parents 0fdc487 + ed3d2cf commit 2db5865
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ object PgLockNamespace
case "global" => Global
case "site" => Site
case "project" => Project
case "workflow" => Session
case "workflow" => Workflow
case "session" => Session
case "attempt" => Attempt
case unsupported => throw new ConfigException(s"Unsupported namespace: $unsupported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class PgLockOperator(context: OperatorContext,
val lockId: UUID = UUID.randomUUID()
doLock(dao, lockId)
pgClient.commit()
logger.info(s"Successfully get the lock (id: $lockId, namespace_type: ${namespace.getType}," +
s" namespace_value: ${namespace.getValue}, owner_attempt_id: ${namespace.getOwnerAttemptId}," +
s" expire_in: ${expireIn.toString}, limit: $limit)")

buildTaskResult(lockId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ case class PgLockPollingWaiter(lastStateParams: Config,
}

if (isSuccess) {
logger.info(s"Successfully finished polling '$pollingName'. (retry_count: $retryCount, total waiting: ${totalWaitingSeconds}s)")
logger.debug(s"Successfully finished polling '$pollingName'. (retry_count: $retryCount, total waiting: ${totalWaitingSeconds}s)")

// NOTE: Remove polling state for the next use of the PgLockPollingWaiter instance.
lastRootState.remove(pollingName)
Expand Down Expand Up @@ -98,7 +98,7 @@ case class PgLockPollingWaiter(lastStateParams: Config,
}
}

logger.info(s"Wait ${nextWaitingSeconds}s for '$pollingName'. (next state => $nextStateParam)")
logger.debug(s"Wait ${nextWaitingSeconds}s for '$pollingName'. (next state => $nextStateParam)")
throw TaskExecutionException.ofNextPolling(nextWaitingSeconds, ConfigElement.copyOf(nextStateParam))
}
}
7 changes: 7 additions & 0 deletions src/test/resources/namespace.dig
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
timezone: UTC

+namespace:
pg_lock>: simple
namespace: ${namespace}
_do:
echo>: simple
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object DigdagTestUtils

def digdagRun(projectPath: Path,
configString: String,
digString: String): CommandStatus =
digString: String,
params: Map[String, String] = Map()): CommandStatus =
{
val configPath: Path = projectPath.resolve("config")
writeFile(configPath.toFile, configString)
Expand All @@ -70,14 +71,18 @@ object DigdagTestUtils

val logPath: Path = projectPath.resolve("log")

val status: CommandStatus = digdag(
"run",
"--save", projectPath.toAbsolutePath.toString,
"--config", configPath.toString,
"--log", logPath.toString,
"--project", projectPath.toAbsolutePath.toString,
digPath.toString
)
val args: Seq[String] = Seq.newBuilder[String]
.addOne("run")
.addOne("--save").addOne(projectPath.toAbsolutePath.toString)
.addOne("--config").addOne(configPath.toString)
.addOne("--log").addOne(logPath.toString)
.addOne("--project").addOne(projectPath.toAbsolutePath.toString)
.addAll(params.toSeq.map(_.productIterator.mkString("=")).flatMap(Seq("--param", _)))
.addOne(digPath.toString)
.addOne("--no-save")
.result()

val status: CommandStatus = digdag(args: _*)

val log: String = Source.fromFile(logPath.toFile).mkString
status.copy(log = Option(log))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class PgLockPluginTest
}
}

behavior of "the pg_lock> operator"
behavior of "the pg_lock> operator"
it should "fail with wait_timeout: 0s if another task locks" in {
val digString = readResource("/wait-timeout.dig")

Expand Down Expand Up @@ -416,4 +416,37 @@ class PgLockPluginTest
assert(expectedPattern.findFirstIn(status.log.get).isDefined)
}

it should "fail when namespace is unknown" in {
val digString = readResource("/namespace.dig")

def assertNamespace(namespace: String,
expectError: Boolean = false): Unit =
{
val status: CommandStatus = digdagRun(
projectPath = tmpDir.toPath,
configString = defaultSystemConfig,
digString = digString,
params = Map("namespace" -> namespace)
)

if (expectError) {
assert(status.code == 1)
val expectedPattern: Regex = ("""\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} \+\d{4} \[ERROR\] \(.+?\+main\+namespace\): Configuration error at task \+main\+namespace: Unsupported namespace: """ + namespace + """ \(config\)""").r
assert(expectedPattern.findFirstIn(status.log.get).isDefined)
}
else {
assert(status.code == 0)
val expectedPattern: Regex = ("""\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} \+\d{4} \[INFO\] \(.+?\+main\+namespace\): Successfully get the lock \(id: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}, namespace_type: """ + namespace + """, namespace_value: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}, owner_attempt_id: .+?, expire_in: .+?, limit: .+?\)""").r
assert(expectedPattern.findFirstIn(status.log.get).isDefined)
}
}

assertNamespace(namespace = "global")
assertNamespace(namespace = "site")
assertNamespace(namespace = "project")
assertNamespace(namespace = "workflow")
assertNamespace(namespace = "session")
assertNamespace(namespace = "attempt")
assertNamespace(namespace = "test", expectError = true)
}
}

0 comments on commit 2db5865

Please sign in to comment.