From b599685830456dad777741f4844c350147e21c68 Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Wed, 30 Aug 2023 01:22:20 -0500 Subject: [PATCH 1/2] Optimize the HttpClientFactory of tracer (#6100) --- src/Adapter/Reporter/HttpClientFactory.php | 116 +++++++++++++++----- src/Adapter/Reporter/KafkaClientFactory.php | 3 +- 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/src/Adapter/Reporter/HttpClientFactory.php b/src/Adapter/Reporter/HttpClientFactory.php index 0b03bad..e5f70d7 100644 --- a/src/Adapter/Reporter/HttpClientFactory.php +++ b/src/Adapter/Reporter/HttpClientFactory.php @@ -11,41 +11,107 @@ */ namespace Hyperf\Tracer\Adapter\Reporter; -use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory; +use Closure; +use Hyperf\Contract\StdoutLoggerInterface; +use Hyperf\Coordinator\Constants; +use Hyperf\Coordinator\CoordinatorManager; +use Hyperf\Engine\Channel; +use Hyperf\Engine\Coroutine; +use Hyperf\Guzzle\ClientFactory; use RuntimeException; -use Zipkin\Reporters\Http\ClientFactory; +use Throwable; +use Zipkin\Reporters\Http\ClientFactory as ClientFactoryInterface; -class HttpClientFactory implements ClientFactory +class HttpClientFactory implements ClientFactoryInterface { - public function __construct(private GuzzleClientFactory $guzzleClientFactory) + protected ?Channel $chan = null; + + protected int $channelSize = 65535; + + public function __construct(private ClientFactory $clientFactory, protected StdoutLoggerInterface $logger) { } public function build(array $options): callable { + $this->loop(); + return function (string $payload) use ($options): void { - $url = $options['endpoint_url']; - unset($options['endpoint_url']); - $client = $this->guzzleClientFactory->create($options); - $additionalHeaders = $options['headers'] ?? []; - $requiredHeaders = [ - 'Content-Type' => 'application/json', - 'Content-Length' => strlen($payload), - 'b3' => '0', - ]; - $headers = array_merge($additionalHeaders, $requiredHeaders); - $response = $client->post($url, [ - 'body' => $payload, - 'headers' => $headers, - // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. - 'no_aspect' => true, - ]); - $statusCode = $response->getStatusCode(); - if ($statusCode !== 202) { - throw new RuntimeException( - sprintf('Reporting of spans failed, status code %d', $statusCode) - ); + $chan = $this->chan; + $clientFactory = $this->clientFactory; + + $chan->push(static function () use ($payload, $options, $clientFactory) { + $url = $options['endpoint_url']; + unset($options['endpoint_url']); + $client = $clientFactory->create($options); + $additionalHeaders = $options['headers'] ?? []; + $requiredHeaders = [ + 'Content-Type' => 'application/json', + 'Content-Length' => strlen($payload), + 'b3' => '0', + ]; + $headers = array_merge($additionalHeaders, $requiredHeaders); + $response = $client->post($url, [ + 'body' => $payload, + 'headers' => $headers, + // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. + 'no_aspect' => true, + ]); + $statusCode = $response->getStatusCode(); + if ($statusCode !== 202) { + throw new RuntimeException( + sprintf('Reporting of spans failed, status code %d', $statusCode) + ); + } + }); + + if ($chan->isClosing()) { + throw new RuntimeException('Connection closed.'); } }; } + + public function close(): void + { + $chan = $this->chan; + $this->chan = null; + + $chan?->close(); + } + + protected function loop(): void + { + if ($this->chan != null) { + return; + } + + $this->chan = new Channel($this->channelSize); + + Coroutine::create(function () { + while (true) { + while (true) { + /** @var null|Closure $closure */ + $closure = $this->chan?->pop(); + if (! $closure) { + break 2; + } + try { + $closure(); + } catch (Throwable) { + break; + } finally { + $closure = null; + } + } + } + + $this->close(); + }); + + Coroutine::create(function () { + if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) { + $this->close(); + } + }); + } } diff --git a/src/Adapter/Reporter/KafkaClientFactory.php b/src/Adapter/Reporter/KafkaClientFactory.php index bc61579..344080b 100644 --- a/src/Adapter/Reporter/KafkaClientFactory.php +++ b/src/Adapter/Reporter/KafkaClientFactory.php @@ -20,8 +20,9 @@ use longlang\phpkafka\Producer\Producer; use longlang\phpkafka\Producer\ProducerConfig; use Throwable; +use Zipkin\Reporters\Http\ClientFactory; -class KafkaClientFactory +class KafkaClientFactory implements ClientFactory { protected ?Channel $chan = null; From f8e7986b11369212b9d958e8b636ec83007d8ffa Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Thu, 31 Aug 2023 00:56:23 -0500 Subject: [PATCH 2/2] Allowed output log when an exception occurs (#6111) --- src/Adapter/Reporter/HttpClientFactory.php | 5 ++--- src/Adapter/Reporter/ReporterFactory.php | 16 +++++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Adapter/Reporter/HttpClientFactory.php b/src/Adapter/Reporter/HttpClientFactory.php index e5f70d7..605cabc 100644 --- a/src/Adapter/Reporter/HttpClientFactory.php +++ b/src/Adapter/Reporter/HttpClientFactory.php @@ -12,7 +12,6 @@ namespace Hyperf\Tracer\Adapter\Reporter; use Closure; -use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Coordinator\Constants; use Hyperf\Coordinator\CoordinatorManager; use Hyperf\Engine\Channel; @@ -28,7 +27,7 @@ class HttpClientFactory implements ClientFactoryInterface protected int $channelSize = 65535; - public function __construct(private ClientFactory $clientFactory, protected StdoutLoggerInterface $logger) + public function __construct(private ClientFactory $clientFactory) { } @@ -58,7 +57,7 @@ public function build(array $options): callable 'no_aspect' => true, ]); $statusCode = $response->getStatusCode(); - if ($statusCode !== 202) { + if (! in_array($statusCode, [200, 202])) { throw new RuntimeException( sprintf('Reporting of spans failed, status code %d', $statusCode) ); diff --git a/src/Adapter/Reporter/ReporterFactory.php b/src/Adapter/Reporter/ReporterFactory.php index 1a830fa..ec8b88c 100644 --- a/src/Adapter/Reporter/ReporterFactory.php +++ b/src/Adapter/Reporter/ReporterFactory.php @@ -11,6 +11,8 @@ */ namespace Hyperf\Tracer\Adapter\Reporter; +use Hyperf\Contract\StdoutLoggerInterface; +use Psr\Container\ContainerInterface; use RuntimeException; use Zipkin\Reporter; @@ -19,7 +21,7 @@ class ReporterFactory { public function __construct( - private HttpClientFactory $httpClientFactory, + private ContainerInterface $container ) { } @@ -28,10 +30,6 @@ public function make(array $option = []): Reporter $class = $option['class'] ?? ''; $constructor = $option['constructor'] ?? []; - if ($class === \Zipkin\Reporters\Http::class) { - $option['constructor']['requesterFactory'] = $this->httpClientFactory; - } - if (! class_exists($class)) { throw new RuntimeException(sprintf('Class %s is not exists.', $class)); } @@ -40,6 +38,14 @@ public function make(array $option = []): Reporter throw new RuntimeException('Unsupported reporter.'); } + if ($class === \Zipkin\Reporters\Http::class) { + $constructor['requesterFactory'] = $this->container->get(HttpClientFactory::class); + } + + if ($this->container->has(StdoutLoggerInterface::class)) { + $constructor['logger'] = $this->container->get(StdoutLoggerInterface::class); + } + return make($class, $constructor); } }