Skip to content

Commit

Permalink
Optimize kafka reporter of tracer (#6098)
Browse files Browse the repository at this point in the history
  • Loading branch information
huangdijia authored Aug 29, 2023
1 parent 6f3be28 commit 85b89a3
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 9 deletions.
97 changes: 88 additions & 9 deletions src/Adapter/Reporter/KafkaClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,113 @@
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Closure;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Engine\Channel;
use Hyperf\Engine\Coroutine;
use Hyperf\Tracer\Exception\ConnectionClosedException;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use Throwable;

class KafkaClientFactory
{
private ?Producer $producer = null;
protected ?Channel $chan = null;

protected ?Producer $producer = null;

protected array $options = [];

protected int $channelSize = 65535;

public function build(array $options): callable
{
$this->producer ??= $this->createProducer($options);
$this->options = $options;
if (isset($options['channel_size'])) {
$this->channelSize = (int) $options['channel_size'];
}

$this->loop();

return function (string $payload) use ($options): void {
$this->producer->send(
$options['topic'] ?? 'zipkin',
$payload,
uniqid('', true)
);
$topic = $options['topic'] ?? 'zipkin';
$key = $options['key'] ?? uniqid('', true);
$headers = $options['headers'] ?? [];
$partitionIndex = $options['partition_index'] ?? null;
$chan = $this->chan;

$chan->push(function () use ($topic, $key, $payload, $headers, $partitionIndex) {
try {
$this->producer->send($topic, $payload, $key, $headers, $partitionIndex);
} catch (Throwable $e) {
throw $e;
}
});

if ($chan->isClosing()) {
throw new ConnectionClosedException('Connection closed.');
}
};
}

private function createProducer(array $options): Producer
public function close(): void
{
$chan = $this->chan;
$producer = $this->producer;
$this->chan = null;
$this->producer = null;

$chan?->close();
$producer?->close();
}

protected function loop(): void
{
if ($this->chan != null) {
return;
}

$this->chan = new Channel($this->channelSize);

Coroutine::create(function () {
while (true) {
$this->producer = $this->makeProducer();
while (true) {
/** @var null|Closure $closure */
$closure = $this->chan?->pop();
if (! $closure) {
break 2;
}
try {
$closure->call($this);
} catch (Throwable) {
$this->producer->close();
break;
} finally {
$closure = null;
}
}
}

$this->close();
});

Coroutine::create(function () {
if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) {
$this->close();
}
});
}

protected function makeProducer(): Producer
{
$options = array_replace([
'bootstrap_servers' => '127.0.0.1:9092',
'acks' => -1,
'connect_timeout' => 1,
'send_timeout' => 1,
], $options);
], $this->options);
$config = new ProducerConfig();

$config->setBootstrapServer($options['bootstrap_servers']);
Expand Down
18 changes: 18 additions & 0 deletions src/Exception/ConnectionClosedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Exception;

use RuntimeException;

class ConnectionClosedException extends RuntimeException
{
}
18 changes: 18 additions & 0 deletions src/Exception/TimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Exception;

use RuntimeException;

class TimeoutException extends RuntimeException
{
}

0 comments on commit 85b89a3

Please sign in to comment.