Skip to content

Commit

Permalink
Merge pull request #5 from civitaspo/fix-conn-flood
Browse files Browse the repository at this point in the history
Fix conn flood
  • Loading branch information
civitaspo committed Oct 13, 2019
2 parents 97fde2f + a524b1a commit 9be3f1a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 7 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
0.0.1.pre3 (2019-10-14)
=======================

* Fix the connection flood.

0.0.1.pre2 (2019-10-07)
=======================

* Fix `org.flywaydb.core.api.FlywayException: Found non-empty schema(s) "public" without schema history table! Use baseline() or set baselineOnMigrate to true to initialize the schema history table.` when using this operator in digdag server mode. [#2](https://github.com/civitaspo/digdag-operator-pg_lock/pull/2)
* Fix a namespace bug: `Unsupported namespace: session (config)` [#3](https://github.com/civitaspo/digdag-operator-pg_lock/pull/3)

0.0.1.pre (2019-10-05)
======================
0.0.1.pre1 (2019-10-05)
=======================

* First Release
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-pg_lock:0.0.1.pre2
- pro.civitaspo:digdag-operator-pg_lock:0.0.1.pre3

+lock-with:
# Wait during 5m until getting the named lock if another task locks.
Expand Down Expand Up @@ -95,6 +95,7 @@ s)?\s*`.
pg_unlock>: ${the lock ID}
force: true
```
* This plugin close connections in `PgLockPgConnectionPooler#finalize()`, so the number of connections does not necessarily exceed value of **pg_lock.max_pool_size**.

# Development

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.1.pre2'
version = '0.0.1.pre3'

def digdagVersion = '0.9.39'
def scalaSemanticVersion = "2.13.0"
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-pg_lock:0.0.1.pre2
- pro.civitaspo:digdag-operator-pg_lock:0.0.1.pre3

+lock-with:
# This case shows "Hello A" before "Hello B".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class PgLockPgConnectionPooler(config: PgLockPgConfig)
extends LazyLogging
{
private val hikari: HikariDataSource = createDataSourceWithConnectionPool().tap { hikari =>
logger.debug("Initialized: {}", hikari)
// TODO: ignore errors?
if (config.schemaMigration) PgLockPgDatabaseMigrator(config, hikari).migrate()
}
Expand All @@ -38,7 +39,10 @@ class PgLockPgConnectionPooler(config: PgLockPgConfig)

def shutdown(): Unit =
{
try hikari.close()
try {
hikari.close()
logger.debug("Shutdown: {}", hikari)
}
catch {
case ex: Exception =>
throw Throwables.propagate(ex)
Expand Down Expand Up @@ -75,4 +79,16 @@ class PgLockPgConnectionPooler(config: PgLockPgConfig)

new HikariDataSource(hc)
}

/*
TODO: This class depends on the `finalize()` method to close the pool, it is not a good way.
But, Guice Injector does not manage the lifecycle, so I have to take the way...
ref. https://github.com/google/guice/issues/1069
If someone else has a better way, please tell me or give me a pull-request.
*/

override def finalize(): Unit =
{
shutdown()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package pro.civitaspo.digdag.plugin.pg_lock.pg


import com.google.inject.{Inject, Provider}
import com.typesafe.scalalogging.LazyLogging
import io.digdag.client.config.Config


class PgLockPgConnectionPoolerProvider
extends Provider[PgLockPgConnectionPooler]
with LazyLogging
{
@Inject protected var systemConfig: Config = null

lazy private val pooler: PgLockPgConnectionPooler =
new PgLockPgConnectionPooler(config = PgLockPgConfig(systemConfig))

override def get(): PgLockPgConnectionPooler =
{
new PgLockPgConnectionPooler(config = PgLockPgConfig(systemConfig))
pooler
}
}

0 comments on commit 9be3f1a

Please sign in to comment.