From 2fbef185bde419776ca05c0cf2a77160b95b0452 Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Wed, 23 Aug 2023 01:03:59 -0500 Subject: [PATCH] Optimize kafka reporter (#6075) --- src/Adapter/HttpClientFactory.php | 40 ++------------ src/Adapter/Reporter/HttpClientFactory.php | 51 ++++++++++++++++++ src/Adapter/Reporter/Kafka.php | 56 ++++++-------------- src/Adapter/Reporter/KafkaClientFactory.php | 58 +++++++++++++++++++++ src/Adapter/Reporter/ReporterFactory.php | 9 ++++ tests/TracerFactoryTest.php | 2 +- 6 files changed, 139 insertions(+), 77 deletions(-) create mode 100644 src/Adapter/Reporter/HttpClientFactory.php create mode 100644 src/Adapter/Reporter/KafkaClientFactory.php diff --git a/src/Adapter/HttpClientFactory.php b/src/Adapter/HttpClientFactory.php index 598aeef..0eb6d4e 100644 --- a/src/Adapter/HttpClientFactory.php +++ b/src/Adapter/HttpClientFactory.php @@ -11,41 +11,9 @@ */ namespace Hyperf\Tracer\Adapter; -use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory; -use RuntimeException; -use Zipkin\Reporters\Http\ClientFactory; - -class HttpClientFactory implements ClientFactory +/** + * @deprecated v3.0, will remove in v3.1, use \Hyperf\Tracer\Adapter\Reporter\HttpClientFactory instead. + */ +class HttpClientFactory extends Reporter\HttpClientFactory { - public function __construct(private GuzzleClientFactory $guzzleClientFactory) - { - } - - public function build(array $options): callable - { - return function (string $payload) use ($options): void { - $url = $options['endpoint_url']; - unset($options['endpoint_url']); - $client = $this->guzzleClientFactory->create($options); - $additionalHeaders = $options['headers'] ?? []; - $requiredHeaders = [ - 'Content-Type' => 'application/json', - 'Content-Length' => strlen($payload), - 'b3' => '0', - ]; - $headers = array_merge($additionalHeaders, $requiredHeaders); - $response = $client->post($url, [ - 'body' => $payload, - 'headers' => $headers, - // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. - 'no_aspect' => true, - ]); - $statusCode = $response->getStatusCode(); - if ($statusCode !== 202) { - throw new RuntimeException( - sprintf('Reporting of spans failed, status code %d', $statusCode) - ); - } - }; - } } diff --git a/src/Adapter/Reporter/HttpClientFactory.php b/src/Adapter/Reporter/HttpClientFactory.php new file mode 100644 index 0000000..0b03bad --- /dev/null +++ b/src/Adapter/Reporter/HttpClientFactory.php @@ -0,0 +1,51 @@ +guzzleClientFactory->create($options); + $additionalHeaders = $options['headers'] ?? []; + $requiredHeaders = [ + 'Content-Type' => 'application/json', + 'Content-Length' => strlen($payload), + 'b3' => '0', + ]; + $headers = array_merge($additionalHeaders, $requiredHeaders); + $response = $client->post($url, [ + 'body' => $payload, + 'headers' => $headers, + // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. + 'no_aspect' => true, + ]); + $statusCode = $response->getStatusCode(); + if ($statusCode !== 202) { + throw new RuntimeException( + sprintf('Reporting of spans failed, status code %d', $statusCode) + ); + } + }; + } +} diff --git a/src/Adapter/Reporter/Kafka.php b/src/Adapter/Reporter/Kafka.php index fd9f12f..c49e838 100644 --- a/src/Adapter/Reporter/Kafka.php +++ b/src/Adapter/Reporter/Kafka.php @@ -11,8 +11,6 @@ */ namespace Hyperf\Tracer\Adapter\Reporter; -use longlang\phpkafka\Producer\Producer; -use longlang\phpkafka\Producer\ProducerConfig; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Throwable; @@ -21,28 +19,24 @@ use Zipkin\Reporters\JsonV2Serializer; use Zipkin\Reporters\SpanSerializer; +use function count; +use function json_last_error; use function sprintf; class Kafka implements Reporter { - private Producer $producer; - - private string $topic; - private LoggerInterface $logger; private SpanSerializer $serializer; public function __construct( - array $options = [], - Producer $producer = null, + private array $options, + private KafkaClientFactory $clientFactory, LoggerInterface $logger = null, SpanSerializer $serializer = null ) { - $this->topic = $options['topic'] ?? 'zipkin'; $this->serializer = $serializer ?? new JsonV2Serializer(); $this->logger = $logger ?? new NullLogger(); - $this->producer = $producer ?? $this->createProducer($options); } /** @@ -50,43 +44,25 @@ public function __construct( */ public function report(array $spans): void { - if (empty($spans)) { + if (count($spans) === 0) { return; } - try { - $this->producer->send( - $this->topic, - $this->serializer->serialize($spans), - uniqid('', true) + $payload = $this->serializer->serialize($spans); + + if (! $payload) { + $this->logger->error( + sprintf('failed to encode spans with code %d', json_last_error()) ); - } catch (Throwable $e) { - $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); + return; } - } - private function createProducer(array $options): Producer - { - $options = array_replace([ - 'bootstrap_servers' => '127.0.0.1:9092', - 'acks' => -1, - 'connect_timeout' => 1, - 'send_timeout' => 1, - ], $options); - $config = new ProducerConfig(); + $client = $this->clientFactory->build($this->options); - $config->setBootstrapServer($options['bootstrap_servers']); - $config->setUpdateBrokers(true); - if (is_int($options['acks'])) { - $config->setAcks($options['acks']); - } - if (is_float($options['connect_timeout'])) { - $config->setConnectTimeout($options['connect_timeout']); - } - if (is_float($options['send_timeout'])) { - $config->setSendTimeout($options['send_timeout']); + try { + $client($payload); + } catch (Throwable $e) { + $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); } - - return new Producer($config); } } diff --git a/src/Adapter/Reporter/KafkaClientFactory.php b/src/Adapter/Reporter/KafkaClientFactory.php new file mode 100644 index 0000000..4aedd04 --- /dev/null +++ b/src/Adapter/Reporter/KafkaClientFactory.php @@ -0,0 +1,58 @@ +producer ??= $this->createProducer($options); + + return function (string $payload) use ($options): void { + $this->producer->send( + $options['topic'] ?? 'zipkin', + $payload, + uniqid('', true) + ); + }; + } + + private function createProducer(array $options): Producer + { + $options = array_replace([ + 'bootstrap_servers' => '127.0.0.1:9092', + 'acks' => -1, + 'connect_timeout' => 1, + 'send_timeout' => 1, + ], $options); + $config = new ProducerConfig(); + + $config->setBootstrapServer($options['bootstrap_servers']); + $config->setUpdateBrokers(true); + if (is_int($options['acks'])) { + $config->setAcks($options['acks']); + } + if (is_float($options['connect_timeout'])) { + $config->setConnectTimeout($options['connect_timeout']); + } + if (is_float($options['send_timeout'])) { + $config->setSendTimeout($options['send_timeout']); + } + + return new Producer($config); + } +} diff --git a/src/Adapter/Reporter/ReporterFactory.php b/src/Adapter/Reporter/ReporterFactory.php index a40b985..1a830fa 100644 --- a/src/Adapter/Reporter/ReporterFactory.php +++ b/src/Adapter/Reporter/ReporterFactory.php @@ -18,11 +18,20 @@ class ReporterFactory { + public function __construct( + private HttpClientFactory $httpClientFactory, + ) { + } + public function make(array $option = []): Reporter { $class = $option['class'] ?? ''; $constructor = $option['constructor'] ?? []; + if ($class === \Zipkin\Reporters\Http::class) { + $option['constructor']['requesterFactory'] = $this->httpClientFactory; + } + if (! class_exists($class)) { throw new RuntimeException(sprintf('Class %s is not exists.', $class)); } diff --git a/tests/TracerFactoryTest.php b/tests/TracerFactoryTest.php index 876f91d..25e378a 100644 --- a/tests/TracerFactoryTest.php +++ b/tests/TracerFactoryTest.php @@ -133,7 +133,7 @@ public function testJaegerFactory() protected function getContainer($config) { $container = Mockery::mock(Container::class); - $client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class); + $client = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\HttpClientFactory::class); $reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class); $reporter->shouldReceive('make') ->andReturn(new \Zipkin\Reporters\Http([], $client));