Skip to content

Commit

Permalink
[BOODMO-33976]prepare() on null problem resolve
Browse files Browse the repository at this point in the history
- scaler fixes
- possibility of been outside coroutine for connect
  • Loading branch information
fon-MaXX authored and mrVrAlex committed Jul 27, 2022
1 parent 01f36aa commit 0fd641a
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 37 deletions.
5 changes: 3 additions & 2 deletions example/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
'driverClass' => \OpsWay\Doctrine\DBAL\Swoole\PgSQL\Driver::class,
'poolSize' => 5, // MAX count connections in one pool
'tickFrequency' => 60000, // when need check possibilities downscale (close) opened connection to DB in pools
'connectionTtl' => 60000, // when connection not used this time - it will be close (free)
'connectionTtl' => 60, // when connection not used this time(seconds) - it will be close (free)
'usedTimes' => 100, // 1 connection (in pool) will be re-used maximum N queries
'connectionDelay' => 2, // time(seconds) for waiting response from pool
'retry' => [
'max_attempts' => 2, // if connection in pool was timeout (before use) then try re-connect
'maxAttempts' => 2, // if connection in pool was timeout (before use) then try re-connect
'delay' => 1, // after this time
]
];
Expand Down
2 changes: 2 additions & 0 deletions example/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ services:
POSTGRES_PASSWORD: secret
POSTGRES_USER: user
POSTGRES_DB: mydb
ports:
- 5432:5432
container_name: test_dbal_swoole_pgsql_driver_db
18 changes: 16 additions & 2 deletions example/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

declare(strict_types=1);

use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Scaler;
use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
Expand All @@ -16,10 +17,11 @@
'driverClass' => \OpsWay\Doctrine\DBAL\Swoole\PgSQL\Driver::class,
'poolSize' => 5, // MAX count connections in one pool
'tickFrequency' => 60000, // when need check possibilities downscale (close) opened connection to DB in pools
'connectionTtl' => 60000, // when connection not used this time - it will be close (free)
'connectionTtl' => 60, // when connection not used this time(seconds) - it will be close (free)
'usedTimes' => 100, // 1 connection (in pool) will be re-used maximum N queries
'connectionDelay' => 2, // time(seconds) for waiting response from pool
'retry' => [
'max_attempts' => 2, // if connection in pool was timeout (before use) then try re-connect
'maxAttempts' => 2, // if connection in pool was timeout (before use) then try re-connect
'delay' => 1, // after this time
]
];
Expand All @@ -29,6 +31,7 @@
[new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\DriverMiddleware($pool)]
);
$connFactory = static fn() => \Doctrine\DBAL\DriverManager::getConnection($connectionParams, $configuration);
$scaler = new Scaler($pool, $connectionParams['tickFrequency']); // will try to free idle connect on connectionTtl overdue

$server = new Swoole\HTTP\Server("0.0.0.0", 9501);

Expand All @@ -49,4 +52,15 @@
});
});

$server->on('workerstart', function() use ($scaler)
{
$scaler->run();
});

$server->on('workerstop', function() use ($pool, $scaler)
{
$pool->close();
$scaler->close();
});

$server->start();
42 changes: 34 additions & 8 deletions src/Swoole/PgSQL/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use function defer;
use function is_resource;
use function sleep;
use function strlen;
use function substr;
use function time;
Expand All @@ -26,11 +27,22 @@ final class Connection implements ConnectionInterface
{
/** @psalm-var array<int, PostgreSQL> */
private array $internalStorage = [];
/** @psalm-var WeakMap<PostgreSQL, ConnectionStats> $statsStorage */
private WeakMap $statsStorage;

public function __construct(private ConnectionPool $pool, private int $retryDelay, private int $maxAttempts)
{
public function __construct(
private ConnectionPoolInterface $pool,
private int $retryDelay,
private int $maxAttempts,
private int $connectionDelay,
) {
/** @psalm-suppress PropertyTypeCoercion */
$this->statsStorage = new WeakMap();
/** Outside of Coroutine Co::getCid() = -1 */
if (Co::getCid() < 1) {
return;
}
/** @psalm-suppress UnusedFunctionCall */
defer(fn () => $this->onDefer());
}

Expand Down Expand Up @@ -168,16 +180,18 @@ public function getNativeConnection() : PostgreSQL
$lastException = null;
for ($i = 0; $i < $this->maxAttempts; $i++) {
try {
/** @psalm-suppress MissingDependency */
[$connection, $stats] = $this->pool->get(2);
[$connection, $stats] = $this->pool->get($this->connectionDelay);
if (! $connection instanceof PostgreSQL) {
throw new DriverException('No connect available in pull');
}
if (! $stats instanceof ConnectionStats) {
throw new DriverException('Provided connect is corrupted');
}
/** @var resource|bool $query */
$query = $connection->query('SELECT 1');
$affectedRows = is_resource($query) ? (int) $connection->affectedRows($query) : 0;
if ($affectedRows !== 1) {
$errCode = trim($connection->errCode);
$errCode = trim((string) $connection->errCode);
throw new ConnectionException(
"Connection ping failed. Trying reconnect (attempt $i). Reason: $errCode"
);
Expand All @@ -189,13 +203,13 @@ public function getNativeConnection() : PostgreSQL
} catch (Throwable $e) {
$errCode = '';
if ($connection instanceof PostgreSQL) {
$errCode = $connection->errCode;
$errCode = (int) $connection->errCode;
$connection = null;
}
$lastException = $e instanceof DBALException
? $e
: new ConnectionException($e->getMessage(), (string) $errCode, '', (int) $e->getCode(), $e);
Co::sleep($this->retryDelay); // Sleep s after failure
$this->sleep($this->retryDelay); // Sleep s after failure
}
}
if (! $connection instanceof PostgreSQL) {
Expand All @@ -215,7 +229,7 @@ public function connectionStats(PostgreSQL $connection) : ?ConnectionStats

private function onDefer() : void
{
$connection = $this->internalStorage[Co::getCid()] ?: null;
$connection = $this->internalStorage[Co::getCid()] ?? null;
if (! $connection instanceof PostgreSQL) {
return;
}
Expand All @@ -224,7 +238,19 @@ private function onDefer() : void
$stats->lastInteraction = time();
}
$this->pool->put($connection);
/** @psalm-suppress MixedArrayOffset */
unset($this->internalStorage[Co::getCid()]);
$this->statsStorage->offsetUnset($connection);
}

private function sleep(int $seconds) : void
{
if (Co::getCid() > 0) {
Co::sleep($seconds);

return;
}
/** @psalm-suppress ArgumentTypeCoercion */
sleep($seconds);
}
}
42 changes: 36 additions & 6 deletions src/Swoole/PgSQL/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
final class ConnectionPool implements ConnectionPoolInterface
{
private ?Channel $pool = null;
/** @psalm-var WeakMap<PostgreSQL, ConnectionStats> $map */
private ?WeakMap $map = null;

public function __construct(
Expand All @@ -29,16 +30,23 @@ public function __construct(
throw new DriverException('Expected, connection pull size > 0');
}
$this->pool = new Channel($this->size);
$this->map = new WeakMap();
/** @psalm-suppress PropertyTypeCoercion */
$this->map = new WeakMap();
}

/** @psalm-return array{0 : PostgreSQL|null, 1 : ConnectionStats|null } */
/** @psalm-return array{PostgreSQL|null, ConnectionStats|null } */
public function get(float $timeout = -1) : array
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
throw new DriverException('ConnectionPool was closed');
}
/** @var PostgreSQL|null $connection */
$connection = $this->pool->pop($timeout);
if (! $connection instanceof PostgreSQL) {
/** try to fill pull with new connect */
$this->make();
/** @var PostgreSQL|null $connection */
$connection = $this->pool->pop($timeout);
}
if (! $connection instanceof PostgreSQL) {
Expand All @@ -53,6 +61,10 @@ public function get(float $timeout = -1) : array

public function put(PostgreSQL $connection) : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
if (! $this->map->offsetExists($connection)) {
return;
}
Expand All @@ -61,7 +73,6 @@ public function put(PostgreSQL $connection) : void

return;
}
/** @psalm-var ConnectionStats|null $stats */
$stats = $this->map[$connection] ?? null;
if (! $stats || $stats->isOverdue()) {
$this->remove($connection);
Expand All @@ -73,14 +84,23 @@ public function put(PostgreSQL $connection) : void

public function close() : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
$this->pool->close();
$this->pool = null;
$this->map = null;
}

public function capacity() : int
{
return $this->pool->capacity;
return (int) $this->map?->count();
}

public function length() : int
{
return (int) $this->pool?->length();
}

/**
Expand All @@ -103,21 +123,31 @@ public function __unserialize($data) : void

private function remove(PostgreSQL $connection) : void
{
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
$this->map->offsetUnset($connection);
unset($connection);
}

private function make() : void
{
if ($this->pool->capacity === $this->map->count()) {
/** Pool was closed */
if (! $this->map || ! $this->pool) {
return;
}
if ($this->pool->capacity === $this->capacity()) {
return;
}
try {
/** @var PostgreSQL $connection */
$connection = ($this->constructor)();
} catch (Throwable) {
throw new Exception('Could not initialize connection with constructor');
}
$this->map[$connection] = new ConnectionStats(time(), 1, $this->connectionTtl, $this->connectionUseLimit);
$this->put($connection);
/** @psalm-suppress PossiblyNullReference */
$this->pool->push($connection);
}
}
5 changes: 3 additions & 2 deletions src/Swoole/PgSQL/ConnectionPoolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;

use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException;
use Swoole\Coroutine\PostgreSQL;

/**
Expand Down Expand Up @@ -37,9 +38,9 @@ class ConnectionPoolFactory
public function __invoke(array $params) : ConnectionPoolInterface
{
/**
* @var int|null $pullSize
* @var int $pullSize
*/
$pullSize = $params['poolSize'] ?? null;
$pullSize = $params['poolSize'] ?? throw new DriverException('poolSize required for connectionPool');
/** @var int|string $usageLimit */
$usageLimit = $params['usedTimes'] ?? self::DEFAULT_USAGE_LIMIT;

Expand Down
4 changes: 3 additions & 1 deletion src/Swoole/PgSQL/ConnectionPoolInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

interface ConnectionPoolInterface
{
/** @psalm-return array{0 : PostgreSQL|null, 1 : ConnectionStats|null } */
/** @psalm-return array{PostgreSQL|null, ConnectionStats|null } */
public function get(float $timeout = -1) : array;

public function put(PostgreSQL $connection) : void;

public function capacity() : int;

public function length() : int;

public function close() : void;
}
13 changes: 7 additions & 6 deletions src/Swoole/PgSQL/ConnectionStats.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ public function __construct(

public function isOverdue() : bool
{
return match (true) {
! $this->counterLimit && ! $this->ttl,
$this->counterLimit && $this->counterLimit > $this->counter,
$this->ttl && time() - $this->lastInteraction > $this->ttl => false,
default => true
};
if (! $this->counterLimit && ! $this->ttl) {
return false;
}
$counterOverflow = $this->counterLimit !== null && $this->counter > $this->counterLimit;
$ttlOverdue = $this->ttl !== null && time() - $this->lastInteraction > $this->ttl;

return $counterOverflow || $ttlOverdue;
}
}
5 changes: 3 additions & 2 deletions src/Swoole/PgSQL/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public function connect(array $params, $username = null, $password = null, array
if (! $this->pool instanceof ConnectionPoolInterface) {
throw new DriverException('Connection pull should be initialized');
}
$retryMaxAttempts = (int) ($params['retry']['max_attempts'] ?? 1);
$retryMaxAttempts = (int) ($params['retry']['maxAttempts'] ?? 1);
$retryDelay = (int) ($params['retry']['delay'] ?? 0);
$connectionDelay = (int) ($params['connectionDelay'] ?? 0);

return new Connection($this->pool, $retryDelay, $retryMaxAttempts);
return new Connection($this->pool, $retryDelay, $retryMaxAttempts, $connectionDelay);
}

/**
Expand Down
16 changes: 8 additions & 8 deletions src/Swoole/PgSQL/Scaler.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Scaler

private ?int $timerId = null;

public function __construct(private ConnectionPoolInterface $pool, private ?int $tickFrequency)
public function __construct(private ConnectionPoolInterface $pool, private int $tickFrequency = self::DOWNSCALE_TICK_FREQUENCY)
{
}

Expand All @@ -25,25 +25,25 @@ public function run() : void
return;
}
$this->timerId = Timer::tick(
$this->tickFrequency ?? self::DOWNSCALE_TICK_FREQUENCY,
$this->tickFrequency,
fn() => $this->downscale()
) ?: null;
}

/** @psalm-suppress UnusedVariable */
private function downscale() : void
{
$poolCapacity = $this->pool->capacity();
$poolLength = $this->pool->length();
/** @psalm-var PostgreSQL[] $connections */
$connections = [];
while ($poolCapacity > 0) {
/** @psalm-suppress UnusedVariable */
[$connection, $connectionStats] = $this->pool->get();
/** connection never null if poll capacity > 0 */
while ($poolLength > 0) {
[$connection, $connectionStats] = $this->pool->get($this->tickFrequency / 1000);
/** connection never null if pool capacity > 0 */
if (! $connection) {
return;
}
$connections[] = $connection;
$poolCapacity--;
$poolLength--;
}
array_map(fn(PostgreSQL $connection) => $this->pool->put($connection), $connections);
}
Expand Down

0 comments on commit 0fd641a

Please sign in to comment.