Skip to content

Commit

Permalink
Added async connection pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
andot committed Jun 16, 2015
1 parent 2511e52 commit cbd06be
Showing 1 changed file with 56 additions and 27 deletions.
83 changes: 56 additions & 27 deletions src/Hprose/Swoole/Socket/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* *
* hprose swoole socket client library for php 5.3+ *
* *
* LastModified: Apr 19, 2015 *
* LastModified: Jun 16, 2015 *
* Author: Ma Bingyao <[email protected]> *
* *
\**********************************************************/
Expand All @@ -30,12 +30,13 @@ class Client extends \Hprose\Client {
'open_eof_check' => false,
);
public $setting = array();
private $conn_stats = array();
private $sync_client;
private $pool = array();
private $type = SWOOLE_TCP;
private $host = "";
private $port = 0;
private $timeout = 30000;
private $pool_timeout = 100;
private function send($client, $data) {
$len = strlen($data);
if ($len < self::MAX_PACK_LEN - 4) {
Expand Down Expand Up @@ -96,6 +97,7 @@ private function initUrl($url) {
throw new \Exception("Only support tcp, tcp4, tcp6 or unix scheme");
}
$this->sync_client = new \swoole_client($this->type | SWOOLE_KEEP);
$this->pool = array();
}
else {
throw new \Exception("Can't parse this url: " . $url);
Expand Down Expand Up @@ -144,43 +146,70 @@ protected function sendAndReceive($request) {
}
protected function asyncSendAndReceive($request, $use) {
$self = $this;
$client = new \swoole_client($this->type, SWOOLE_SOCK_ASYNC);
$setting = array_replace($this->setting, self::$default_setting);
if (!isset($setting['package_max_length'])) {
$setting['package_max_length'] = $this->return_bytes(ini_get('memory_limit'));
}
if ($setting['package_max_length'] < 0) {
$setting['package_max_length'] = 0x7fffffff;
}
$client->set($setting);
$client->on("connect", function($cli) use ($self, $request, $use) {
if (!$self->send($cli, $request)) {
$self->sendAndReceiveCallback('', new \Exception(socket_strerror($cli->errCode)), $use);
$noop = function($client) {};
$on_connect = function($client) use ($self, $request, $use) {
if (!$self->send($client, $request)) {
$self->sendAndReceiveCallback('', new \Exception(socket_strerror($client->errCode)), $use);
}
});
$client->on("error", function($cli) use ($self, $use) {
$self->sendAndReceiveCallback('', new \Exception(socket_strerror($cli->errCode)), $use);
});
$client->on("receive", function($cli, $data) use ($self, $use) {
swoole_timer_clear($cli->timer);
};
$on_error = function($client) use ($self, $use) {
$self->sendAndReceiveCallback('', new \Exception(socket_strerror($client->errCode)), $use);
};
$on_receive = function($client, $data) use ($self, $use, $noop) {
swoole_timer_clear($client->timer);
$client->on("connect", $noop);
$client->on("error", $noop);
$client->on("receive", $noop);
$client->timer = swoole_timer_after($self->pool_timeout, function () use ($client) { $client->close(); });
array_push($this->pool, $client);
try {
$self->sendAndReceiveCallback(substr($data, 4), null, $use);
}
catch(\Exception $e) {
}
swoole_timer_after(0, function () use ($cli) { $cli->close(); });
});
$client->on("close", function($cli) {});
$client->connect($this->host, $this->port);
$client->timer = swoole_timer_after($this->timeout, function () use ($client) {
$client->close();
});
};
$client = null;
while (count($this->pool) > 0) {
$client = array_pop($this->pool);
if ($client->isConnected()) break;
}
if ($client == null || !$client->isConnected()) {
$client = new \swoole_client($this->type, SWOOLE_SOCK_ASYNC);
$setting = array_replace($this->setting, self::$default_setting);
if (!isset($setting['package_max_length'])) {
$setting['package_max_length'] = $this->return_bytes(ini_get('memory_limit'));
}
if ($setting['package_max_length'] < 0) {
$setting['package_max_length'] = 0x7fffffff;
}
$client->set($setting);
$client->on("connect", $on_connect);
$client->on("error", $on_error);
$client->on("receive", $on_receive);
$client->on("close", $noop);
$client->connect($this->host, $this->port);
}
else {
swoole_timer_clear($client->timer);
$client->on("error", $on_error);
$client->on("receive", $on_receive);
if (!$this->send($client, $request)) {
$this->sendAndReceiveCallback('', new \Exception(socket_strerror($client->errCode)), $use);
}
}
$client->timer = swoole_timer_after($this->timeout, function () use ($client) { $client->close(); });
}
public function setTimeout($timeout) {
$this->timeout = $timeout;
}
public function getTimeout() {
return $this->timeout;
}
public function setPoolTimeout($value) {
$this->pool_timeout = $value;
}
public function getPoolTimeout() {
return $this->pool_timeout;
}
}
}

0 comments on commit cbd06be

Please sign in to comment.