Skip to content

Commit

Permalink
prevent connection leak, store connection data in co::context
Browse files Browse the repository at this point in the history
  • Loading branch information
fon-MaXX authored and mrVrAlex committed Sep 1, 2022
1 parent e196f02 commit c822fce
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 262 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
}
},
"require": {
"php": "^8.0",
"php": "^8.1",
"beberlei/assert": "^3.3",
"doctrine/dbal": "^3.2"
},
Expand Down
512 changes: 327 additions & 185 deletions composer.lock

Large diffs are not rendered by default.

71 changes: 42 additions & 29 deletions src/Swoole/PgSQL/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;

use Closure;
use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\ParameterType;
use Closure;
use OpsWay\Doctrine\DBAL\SQLParserUtils;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\ConnectionException;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException;
use Swoole\Coroutine as Co;
use Swoole\Coroutine\Context;
use Swoole\Coroutine\PostgreSQL;
use Throwable;
use WeakMap;

use function defer;
use function is_resource;
Expand All @@ -25,26 +25,13 @@

final class Connection implements ConnectionInterface
{
/** @psalm-var array<int, PostgreSQL> */
private array $internalStorage = [];
/** @psalm-var WeakMap<PostgreSQL, ConnectionStats> $statsStorage */
private WeakMap $statsStorage;

public function __construct(
private ConnectionPoolInterface $pool,
private int $retryDelay,
private int $maxAttempts,
private int $connectionDelay,
private ?Closure $connectConstructor = null,
) {
/** @psalm-suppress PropertyTypeCoercion */
$this->statsStorage = new WeakMap();
/** Outside of Coroutine Co::getCid() = -1 */
if (Co::getCid() < 1) {
return;
}
/** @psalm-suppress UnusedFunctionCall */
defer(fn () => $this->onDefer());
}

/**
Expand All @@ -68,7 +55,7 @@ public function prepare(string $sql) : Statement
}
$connection = $this->getNativeConnection();

return new Statement($connection, $sql, $this->connectionStats($connection));
return new Statement($connection, $sql, $this->connectionStats());
}

/**
Expand All @@ -78,7 +65,7 @@ public function query(string $sql) : Result
{
$connection = $this->getNativeConnection();
$resource = $connection->query($sql);
$stats = $this->connectionStats($connection);
$stats = $this->connectionStats();
if ($stats instanceof ConnectionStats) {
$stats->counter++;
}
Expand Down Expand Up @@ -108,7 +95,7 @@ public function exec(string $sql) : int
{
$connection = $this->getNativeConnection();
$query = $connection->query($sql);
$stats = $this->connectionStats($connection);
$stats = $this->connectionStats();
if ($stats instanceof ConnectionStats) {
$stats->counter++;
}
Expand Down Expand Up @@ -176,7 +163,10 @@ public function errorInfo() : string

public function getNativeConnection() : PostgreSQL
{
$connection = $this->internalStorage[Co::getCid()] ?? null;
$context = $this->getContext();
/** @psalm-suppress MixedArrayAccess, MixedAssignment */
[$connection] = $context[self::class] ?? [null, null];
/** @psalm-suppress RedundantCondition */
if (! $connection instanceof PostgreSQL) {
$lastException = null;
for ($i = 0; $i < $this->maxAttempts; $i++) {
Expand Down Expand Up @@ -205,8 +195,10 @@ public function getNativeConnection() : PostgreSQL
"Connection ping failed. Trying reconnect (attempt $i). Reason: $errCode"
);
}
$this->internalStorage[Co::getCid()] = $connection;
$this->statsStorage[$connection] = $stats;
$context[self::class] = [$connection, $stats];

/** @psalm-suppress UnusedFunctionCall */
defer($this->onDefer(...));

break;
} catch (Throwable $e) {
Expand All @@ -227,31 +219,52 @@ public function getNativeConnection() : PostgreSQL
: throw new ConnectionException('Connection could not be initiated');
}
}
/** @psalm-suppress MixedArrayAccess, MixedAssignment */
[$connection] = $context[self::class] ?? [null];

/** @psalm-suppress RedundantCondition */
if (! $connection instanceof PostgreSQL) {
throw new ConnectionException('Connection in context storage is corrupted');
}

return $this->internalStorage[Co::getCid()];
return $connection;
}

public function connectionStats(PostgreSQL $connection) : ?ConnectionStats
/** @psalm-suppress UnusedVariable, MixedArrayAccess, MixedAssignment */
public function connectionStats() : ?ConnectionStats
{
return $this->statsStorage[$connection] ?? null;
[$connection, $stats] = $this->getContext()[self::class] ?? [null, null];

return $stats;
}

/** @psalm-suppress MixedReturnTypeCoercion */
private function getContext() : Context
{
$context = Co::getContext((int) Co::getCid());
if (! $context instanceof Context) {
throw new ConnectionException('Connection Co::Context unavailable');
}
return $context;
}

private function onDefer() : void
{
if ($this->connectConstructor) {
return;
}
$connection = $this->internalStorage[Co::getCid()] ?? null;
$context = $this->getContext();
/** @psalm-suppress MixedArrayAccess, MixedAssignment */
[$connection, $stats] = $context[self::class] ?? [null, null];
/** @psalm-suppress RedundantCondition */
if (! $connection instanceof PostgreSQL) {
return;
}
$stats = $this->connectionStats($connection);
/** @psalm-suppress TypeDoesNotContainType */
if ($stats instanceof ConnectionStats) {
$stats->lastInteraction = time();
}
$this->pool->put($connection);
/** @psalm-suppress MixedArrayOffset */
unset($this->internalStorage[Co::getCid()]);
$this->statsStorage->offsetUnset($connection);
unset($context[self::class]);
}
}
78 changes: 32 additions & 46 deletions src/Swoole/PgSQL/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
use Throwable;
use WeakMap;

use function gc_collect_cycles;
use function time;

final class ConnectionPool implements ConnectionPoolInterface
{
private ?Channel $pool = null;
private Channel $chan;
/** @psalm-var WeakMap<PostgreSQL, ConnectionStats> $map */
private ?WeakMap $map = null;
private WeakMap $map;

public function __construct(
private Closure $constructor,
Expand All @@ -29,24 +30,20 @@ public function __construct(
if ($this->size < 0) {
throw new DriverException('Expected, connection pull size > 0');
}
$this->pool = new Channel($this->size);
$this->chan = new Channel($this->size);
/** @psalm-suppress PropertyTypeCoercion */
$this->map = new WeakMap();
}

/** @psalm-return array{PostgreSQL|null, ConnectionStats|null } */
public function get(float $timeout = -1) : array
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
throw new DriverException('ConnectionPool was closed');
}
if ($this->pool->isEmpty()) {
if ($this->chan->isEmpty()) {
/** try to fill pull with new connect */
$this->make();
}
/** @var PostgreSQL|null $connection */
$connection = $this->pool->pop($timeout);
$connection = $this->chan->pop($timeout);
if (! $connection instanceof PostgreSQL) {
return [null, null];
}
Expand All @@ -59,46 +56,43 @@ public function get(float $timeout = -1) : array

public function put(PostgreSQL $connection) : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
if (! $this->map->offsetExists($connection)) {
return;
}
if ($this->pool->isFull()) {
$stats = $this->map[$connection] ?? null;
if (! $stats || $stats->isOverdue()) {
$this->remove($connection);

return;
}
$stats = $this->map[$connection] ?? null;
if (! $stats || $stats->isOverdue()) {
if ($this->size <= $this->chan->length()) {
$this->remove($connection);

return;
}
$this->pool->push($connection);

/** to prevent hypothetical freeze if channel is full, will never happen but for sure */
if (! $this->chan->push($connection, 1)) {
$this->remove($connection);
}
}

public function close() : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
$this->pool->close();
$this->pool = null;
$this->map = null;
$this->chan->close();
gc_collect_cycles();
}

public function capacity() : int
{
return (int) $this->map?->count();
return $this->map->count();
}

public function length() : int
{
return (int) $this->pool?->length();
return $this->chan->length();
}

public function stats() : array
{
return $this->chan->stats();
}

/**
Expand All @@ -121,31 +115,23 @@ public function __unserialize($data) : void

private function remove(PostgreSQL $connection) : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
$this->map->offsetUnset($connection);
unset($connection);
}

private function make() : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
if ($this->pool->capacity <= $this->capacity()) {
if ($this->size <= $this->capacity()) {
return;
}
try {
/** @var PostgreSQL $connection */
$connection = ($this->constructor)();
} catch (Throwable) {
throw new Exception('Could not initialize connection with constructor');
/** @var PostgreSQL $connection */
$connection = ($this->constructor)();
/** Allocate to map only after successful push(exclude chanel overflow cause of concurrency)
*
* @psalm-suppress PossiblyNullReference
*/
if ($this->chan->push($connection, 1)) {
$this->map[$connection] = new ConnectionStats(time(), 1, $this->connectionTtl, $this->connectionUseLimit);
}
$this->map[$connection] = new ConnectionStats(time(), 1, $this->connectionTtl, $this->connectionUseLimit);
/** @psalm-suppress PossiblyNullReference */
$this->pool->push($connection);
}
}
2 changes: 2 additions & 0 deletions src/Swoole/PgSQL/ConnectionPoolInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ public function capacity() : int;
public function length() : int;

public function close() : void;

public function stats() : array;
}
3 changes: 2 additions & 1 deletion src/Swoole/PgSQL/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Swoole\Coroutine\PostgreSQL;

use function array_key_exists;
use function filter_var;
use function implode;
use function sprintf;

Expand All @@ -36,7 +37,7 @@ public function connect(array $params, $username = null, $password = null, array
$retryMaxAttempts = (int) ($params['retry']['maxAttempts'] ?? 1);
$retryDelay = (int) ($params['retry']['delay'] ?? 0);
$connectionDelay = (int) ($params['connectionDelay'] ?? 0);
$usePool = filter_var($params['useConnectionPool'], FILTER_VALIDATE_BOOLEAN);
$usePool = filter_var($params['useConnectionPool'], FILTER_VALIDATE_BOOLEAN);
$connectConstructor = $usePool ? null : static fn() : PostgreSQL => self::createConnection(self::generateDSN($params));

return new Connection($this->pool, $retryDelay, $retryMaxAttempts, $connectionDelay, $connectConstructor);
Expand Down
2 changes: 2 additions & 0 deletions src/Swoole/PgSQL/Scaler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Swoole\Timer;

use function array_map;
use function gc_collect_cycles;

class Scaler
{
Expand Down Expand Up @@ -48,6 +49,7 @@ private function downscale() : void
$poolLength--;
}
array_map(fn(PostgreSQL $connection) => $this->pool->put($connection), $connections);
gc_collect_cycles();
}

public function close() : void
Expand Down

0 comments on commit c822fce

Please sign in to comment.