Skip to content

Commit

Permalink
Optimized code and added testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Nov 2, 2019
1 parent 33a219b commit 69599ab
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 173 deletions.
1 change: 1 addition & 0 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<directory suffix="Test.php">./src/constants/tests</directory>
<directory suffix="Test.php">./src/consul/tests</directory>
<directory suffix="Test.php">./src/database/tests</directory>
<directory suffix="Test.php">./src/db/tests</directory>
<directory suffix="Test.php">./src/db-connection/tests</directory>
<directory suffix="Test.php">./src/di/tests</directory>
<directory suffix="Test.php">./src/dispatcher/tests</directory>
Expand Down
55 changes: 55 additions & 0 deletions src/db/src/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,64 @@

namespace Hyperf\DB;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Pool\Connection;
use Hyperf\Pool\Exception\ConnectionException;

abstract class AbstractConnection extends Connection implements ConnectionInterface
{
use DetectsLostConnections;
use ManagesTransactions;

/**
* @var array
*/
protected $config = [];

public function getConfig(): array
{
return $this->config;
}

public function release(): void
{
if ($this->transactionLevel() > 0) {
$this->rollBack(0);
if ($this->container->has(StdoutLoggerInterface::class)) {
$logger = $this->container->get(StdoutLoggerInterface::class);
$logger->error('Maybe you\'ve forgotten to commit or rollback the MySQL transaction.');
}
}
$this->pool->release($this);
}

public function getActiveConnection()
{
if ($this->check()) {
return $this;
}

if (! $this->reconnect()) {
throw new ConnectionException('Connection reconnect failed.');
}

return $this;
}

public function retry(\Throwable $throwable, $name, $arguments)
{
if ($this->causedByLostConnection($throwable)) {
try {
$this->reconnect();
return $this->{$name}(...$arguments);
} catch (\Throwable $throwable) {
if ($this->container->has(StdoutLoggerInterface::class)) {
$logger = $this->container->get(StdoutLoggerInterface::class);
$logger->error('Connection execute retry failed. message = ' . $throwable->getMessage());
}
}
}

throw $throwable;
}
}
4 changes: 2 additions & 2 deletions src/db/src/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function commit(): void;
/**
* Rollback the active database transaction.
*/
public function rollBack(): void;
public function rollBack(?int $toLevel = null): void;

/**
* Run an insert statement against the database.
Expand Down Expand Up @@ -58,5 +58,5 @@ public function query(string $query, array $bindings = []): array;
/**
* Run a select statement and return a single result.
*/
public function fetch(string $query, array $bindings = []): array;
public function fetch(string $query, array $bindings = []);
}
66 changes: 38 additions & 28 deletions src/db/src/DB.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use Hyperf\DB\Pool\PoolFactory;
use Hyperf\Utils\Context;
use Throwable;

/**
* Class DB.
* @method beginTransaction()
* @method commit()
* @method rollback()
Expand All @@ -36,50 +36,65 @@ class DB
/**
* @var string
*/
protected $poolName = 'default';
protected $poolName;

public function __construct(PoolFactory $factory)
public function __construct(PoolFactory $factory, string $poolName = 'default')
{
$this->factory = $factory;
$this->poolName = $poolName;
}

public function __call($name, $arguments)
{
// Get a connection from coroutine context or connection pool.
$hasContextConnection = Context::has($this->getContextKey());
$hasContextConnection = $this->hasContextConnection($name);
$connection = $this->getConnection($hasContextConnection);
switch ($name) {
case 'beginTransaction':
Context::set($this->getContextKey(), $connection);
$transctionManager = new TransactionManager();
$result = $transctionManager->beginTransaction();
break;
case 'commit':
case 'rollback':
$transctionManager = new TransactionManager();
$result = $transctionManager->{$name}();
break;
default:
$result = $connection->{$name}(...$arguments);

try {
$connection = $connection->getConnection();
$result = $connection->{$name}(...$arguments);
} catch (Throwable $exception) {
$result = $connection->retry($exception, $name, $arguments);
} finally {
if (! $hasContextConnection) {
$connection->release();
}
}

return $result;
}

protected function hasContextConnection($name): bool
{
$hasContextConnection = Context::has($this->getContextKey());
if (! $hasContextConnection) {
if (in_array($name, ['beginTransaction', 'commit', 'rollBack'])) {
return true;
}

return false;
}

return true;
}

/**
* Get a connection from coroutine context, or from mysql connectio pool.
* @param mixed $hasContextConnection
*/
private function getConnection($hasContextConnection): AbstractConnection
protected function getConnection(bool $hasContextConnection): AbstractConnection
{
$connection = null;
if ($hasContextConnection) {
$connection = Context::get($this->getContextKey());
}
if (! $connection instanceof AbstractConnection) {
$pool = $this->factory->getPool($this->poolName);
Context::set('poolId', $pool->getCurrentConnections());
$connection = $pool->get()->getConnection();
$connection = $pool->get();
if ($hasContextConnection) {
Context::set($this->getContextKey(), $connection);
defer(function () use ($connection) {
$connection->release();
});
}
}
return $connection;
}
Expand All @@ -89,11 +104,6 @@ private function getConnection($hasContextConnection): AbstractConnection
*/
private function getContextKey(): string
{
return sprintf('database.%s', $this->poolName);
}

private function getPoolId()
{
return Context::get('poolId');
return sprintf('db.connection.%s', $this->poolName);
}
}
2 changes: 1 addition & 1 deletion src/db/src/ManagesTransactions.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function commit(): void
*
* @throws Throwable
*/
public function rollBack($toLevel = null): void
public function rollBack(?int $toLevel = null): void
{
// We allow developers to rollback to a certain transaction level. We will verify
// that this given transaction level is valid before attempting to rollback to
Expand Down
21 changes: 3 additions & 18 deletions src/db/src/PDOConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use Hyperf\Pool\Exception\ConnectionException;
use Hyperf\Pool\Pool;
use PDO;
use PDOStatement;
use Psr\Container\ContainerInterface;

class PDOConnection extends AbstractConnection
{
use ManagesTransactions;

/**
* @var PDO
*/
Expand Down Expand Up @@ -73,30 +72,16 @@ public function __call($name, $arguments)
return $this->connection->{$name}(...$arguments);
}

public function getActiveConnection()
{
if ($this->check()) {
return $this;
}

if (! $this->reconnect()) {
throw new ConnectionException('Connection reconnect failed.');
}

return $this;
}

/**
* Reconnect the connection.
*/
public function reconnect(): bool
{
$dbms = $this->config['driver'];
$host = $this->config['host'];
$dbName = $this->config['database'];
$username = $this->config['username'];
$password = $this->config['password'];
$dsn = "{$dbms}:host={$host};dbname={$dbName}";
$dsn = "mysql:host={$host};dbname={$dbName}";
try {
$pdo = new \PDO($dsn, $username, $password, $this->config['options']);
} catch (\Throwable $e) {
Expand Down Expand Up @@ -133,7 +118,7 @@ public function query(string $query, array $bindings = []): array
return $statement->fetchAll();
}

public function fetch(string $query, array $bindings = []): array
public function fetch(string $query, array $bindings = [])
{
$records = $this->query($query, $bindings);

Expand Down
5 changes: 5 additions & 0 deletions src/db/src/Pool/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public function getName(): string
{
return $this->name;
}

public function getConfig(): array
{
return $this->config;
}
}
53 changes: 53 additions & 0 deletions src/db/tests/Cases/AbstractTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,64 @@

namespace HyperfTest\DB\Cases;

use Hyperf\Config\Config;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\DB\DB;
use Hyperf\DB\Frequency;
use Hyperf\DB\Pool\PDOPool;
use Hyperf\DB\Pool\PoolFactory;
use Hyperf\Di\Container;
use Hyperf\Pool\Channel;
use Hyperf\Pool\PoolOption;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Context;
use Mockery;
use PHPUnit\Framework\TestCase;

/**
* Class AbstractTestCase.
*/
abstract class AbstractTestCase extends TestCase
{
protected $driver = 'pdo';

protected function tearDown()
{
Mockery::close();
Context::set('db.connection.default', null);
}

protected function getContainer($options = [])
{
$container = Mockery::mock(Container::class);
$container->shouldReceive('get')->with(ConfigInterface::class)->andReturn(new Config([
'db' => [
'default' => [
'driver' => $this->driver,
'password' => '910123',
'database' => 'hyperf',
'pool' => [
'max_connections' => 20,
],
'options' => $options,
],
],
]));
$container->shouldReceive('make')->with(PDOPool::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new PDOPool(...array_values($args));
});
$container->shouldReceive('make')->with(Frequency::class, Mockery::any())->andReturn(new Frequency());
$container->shouldReceive('make')->with(PoolOption::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new PoolOption(...array_values($args));
});
$container->shouldReceive('make')->with(Channel::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new Channel(...array_values($args));
});
$container->shouldReceive('get')->with(PoolFactory::class)->andReturn($factory = new PoolFactory($container));
$container->shouldReceive('get')->with(DB::class)->andReturn(new DB($factory, 'default'));
$container->shouldReceive('has')->with(StdoutLoggerInterface::class)->andReturn(false);
ApplicationContext::setContainer($container);
return $container;
}
}
Loading

0 comments on commit 69599ab

Please sign in to comment.