Skip to content

Commit

Permalink
Merge branch 'master' into 3.1-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Sep 1, 2023
2 parents 2521fbc + f8e7986 commit d9f7bb8
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 31 deletions.
115 changes: 90 additions & 25 deletions src/Adapter/Reporter/HttpClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,106 @@
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory;
use Closure;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Engine\Channel;
use Hyperf\Engine\Coroutine;
use Hyperf\Guzzle\ClientFactory;
use RuntimeException;
use Zipkin\Reporters\Http\ClientFactory;
use Throwable;
use Zipkin\Reporters\Http\ClientFactory as ClientFactoryInterface;

class HttpClientFactory implements ClientFactory
class HttpClientFactory implements ClientFactoryInterface
{
public function __construct(private GuzzleClientFactory $guzzleClientFactory)
protected ?Channel $chan = null;

protected int $channelSize = 65535;

public function __construct(private ClientFactory $clientFactory)
{
}

public function build(array $options): callable
{
$this->loop();

return function (string $payload) use ($options): void {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $this->guzzleClientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if ($statusCode !== 202) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
$chan = $this->chan;
$clientFactory = $this->clientFactory;

$chan->push(static function () use ($payload, $options, $clientFactory) {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $clientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if (! in_array($statusCode, [200, 202])) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
}
});

if ($chan->isClosing()) {
throw new RuntimeException('Connection closed.');
}
};
}

public function close(): void
{
$chan = $this->chan;
$this->chan = null;

$chan?->close();
}

protected function loop(): void
{
if ($this->chan != null) {
return;
}

$this->chan = new Channel($this->channelSize);

Coroutine::create(function () {
while (true) {
while (true) {
/** @var null|Closure $closure */
$closure = $this->chan?->pop();
if (! $closure) {
break 2;
}
try {
$closure();
} catch (Throwable) {
break;
} finally {
$closure = null;
}
}
}

$this->close();
});

Coroutine::create(function () {
if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) {
$this->close();
}
});
}
}
3 changes: 2 additions & 1 deletion src/Adapter/Reporter/KafkaClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use Throwable;
use Zipkin\Reporters\Http\ClientFactory;

class KafkaClientFactory
class KafkaClientFactory implements ClientFactory
{
protected ?Channel $chan = null;

Expand Down
16 changes: 11 additions & 5 deletions src/Adapter/Reporter/ReporterFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Hyperf\Contract\StdoutLoggerInterface;
use Psr\Container\ContainerInterface;
use RuntimeException;
use Zipkin\Reporter;

Expand All @@ -19,7 +21,7 @@
class ReporterFactory
{
public function __construct(
private HttpClientFactory $httpClientFactory,
private ContainerInterface $container
) {
}

Expand All @@ -28,10 +30,6 @@ public function make(array $option = []): Reporter
$class = $option['class'] ?? '';
$constructor = $option['constructor'] ?? [];

if ($class === \Zipkin\Reporters\Http::class) {
$option['constructor']['requesterFactory'] = $this->httpClientFactory;
}

if (! class_exists($class)) {
throw new RuntimeException(sprintf('Class %s is not exists.', $class));
}
Expand All @@ -40,6 +38,14 @@ public function make(array $option = []): Reporter
throw new RuntimeException('Unsupported reporter.');
}

if ($class === \Zipkin\Reporters\Http::class) {
$constructor['requesterFactory'] = $this->container->get(HttpClientFactory::class);
}

if ($this->container->has(StdoutLoggerInterface::class)) {
$constructor['logger'] = $this->container->get(StdoutLoggerInterface::class);
}

return make($class, $constructor);
}
}

0 comments on commit d9f7bb8

Please sign in to comment.