From bdc7deb07d3e86e9f7281e553aa1adb5264353d0 Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Tue, 22 Aug 2023 22:41:14 -0500 Subject: [PATCH 1/2] Added `Kafka` reporter for zipkin (#6069) --- composer.json | 1 + publish/opentracing.php | 31 +++++++- src/Adapter/Reporter/Kafka.php | 92 ++++++++++++++++++++++++ src/Adapter/Reporter/ReporterFactory.php | 36 ++++++++++ src/Adapter/ZipkinTracerFactory.php | 18 +++-- tests/TracerFactoryTest.php | 5 +- 6 files changed, 172 insertions(+), 11 deletions(-) create mode 100644 src/Adapter/Reporter/Kafka.php create mode 100644 src/Adapter/Reporter/ReporterFactory.php diff --git a/composer.json b/composer.json index 33d6b2b..11084da 100644 --- a/composer.json +++ b/composer.json @@ -28,6 +28,7 @@ }, "suggest": { "hyperf/event": "Required to use DbQueryExecutedListener.", + "longlang/phpkafka": "Required (^1.2) to use Kafka Producer.", "jonahgeorge/jaeger-client-php": "Required (^0.6) to use jaeger tracing." }, "autoload": { diff --git a/publish/opentracing.php b/publish/opentracing.php index 359c26c..05a9027 100644 --- a/publish/opentracing.php +++ b/publish/opentracing.php @@ -32,9 +32,34 @@ 'ipv6' => null, 'port' => 9501, ], - 'options' => [ - 'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'), - 'timeout' => env('ZIPKIN_TIMEOUT', 1), + 'reporter' => env('ZIPKIN_REPORTER', 'http'), // kafka, http + 'reporters' => [ + // options for http reporter + 'http' => [ + 'class' => \Zipkin\Reporters\Http::class, + 'constructor' => [ + 'options' => [ + 'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'), + 'timeout' => env('ZIPKIN_TIMEOUT', 1), + ], + ], + ], + // options for kafka reporter + 'kafka' => [ + 'class' => \Hyperf\Tracer\Adapter\Reporter\Kafka::class, + 'constructor' => [ + 'options' => [ + 'topic' => env('ZIPKIN_KAFKA_TOPIC', 'zipkin'), + 'bootstrap_servers' => env('ZIPKIN_KAFKA_BOOTSTRAP_SERVERS', '127.0.0.1:9092'), + 'acks' => (int) env('ZIPKIN_KAFKA_ACKS', -1), + 'connect_timeout' => (int) env('ZIPKIN_KAFKA_CONNECT_TIMEOUT', 1), + 'send_timeout' => (int) env('ZIPKIN_KAFKA_SEND_TIMEOUT', 1), + ], + ], + ], + 'noop' => [ + 'class' => \Zipkin\Reporters\Noop::class, + ], ], 'sampler' => BinarySampler::createAsAlwaysSample(), ], diff --git a/src/Adapter/Reporter/Kafka.php b/src/Adapter/Reporter/Kafka.php new file mode 100644 index 0000000..fd9f12f --- /dev/null +++ b/src/Adapter/Reporter/Kafka.php @@ -0,0 +1,92 @@ +topic = $options['topic'] ?? 'zipkin'; + $this->serializer = $serializer ?? new JsonV2Serializer(); + $this->logger = $logger ?? new NullLogger(); + $this->producer = $producer ?? $this->createProducer($options); + } + + /** + * @param array|Span[] $spans + */ + public function report(array $spans): void + { + if (empty($spans)) { + return; + } + + try { + $this->producer->send( + $this->topic, + $this->serializer->serialize($spans), + uniqid('', true) + ); + } catch (Throwable $e) { + $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); + } + } + + private function createProducer(array $options): Producer + { + $options = array_replace([ + 'bootstrap_servers' => '127.0.0.1:9092', + 'acks' => -1, + 'connect_timeout' => 1, + 'send_timeout' => 1, + ], $options); + $config = new ProducerConfig(); + + $config->setBootstrapServer($options['bootstrap_servers']); + $config->setUpdateBrokers(true); + if (is_int($options['acks'])) { + $config->setAcks($options['acks']); + } + if (is_float($options['connect_timeout'])) { + $config->setConnectTimeout($options['connect_timeout']); + } + if (is_float($options['send_timeout'])) { + $config->setSendTimeout($options['send_timeout']); + } + + return new Producer($config); + } +} diff --git a/src/Adapter/Reporter/ReporterFactory.php b/src/Adapter/Reporter/ReporterFactory.php new file mode 100644 index 0000000..a40b985 --- /dev/null +++ b/src/Adapter/Reporter/ReporterFactory.php @@ -0,0 +1,36 @@ +name = $name; - [$app, $options, $sampler] = $this->parseConfig(); + [$app, $sampler, $reporterOption] = $this->parseConfig(); $endpoint = Endpoint::create($app['name'], $app['ipv4'], $app['ipv6'], $app['port']); - $reporter = new Http($options, $this->clientFactory); + $reporter = $this->reportFactory->make($reporterOption); $tracing = TracingBuilder::create() ->havingLocalEndpoint($endpoint) ->havingSampler($sampler) @@ -46,6 +46,7 @@ public function make(string $name): \OpenTracing\Tracer private function parseConfig(): array { // @TODO Detect the ipv4, ipv6, port from server object or system info automatically. + $reporter = (string) $this->getConfig('reporter', 'http'); return [ $this->getConfig('app', [ 'name' => 'skeleton', @@ -53,10 +54,13 @@ private function parseConfig(): array 'ipv6' => null, 'port' => 9501, ]), - $this->getConfig('options', [ - 'timeout' => 1, - ]), $this->getConfig('sampler', BinarySampler::createAsAlwaysSample()), + $this->getConfig('reporters.' . $reporter, [ + 'class' => \Zipkin\Reporters\Http::class, + 'constructor' => [ + 'options' => $this->getConfig('options', []), + ], + ]), ]; } diff --git a/tests/TracerFactoryTest.php b/tests/TracerFactoryTest.php index 7df87b1..876f91d 100644 --- a/tests/TracerFactoryTest.php +++ b/tests/TracerFactoryTest.php @@ -134,10 +134,13 @@ protected function getContainer($config) { $container = Mockery::mock(Container::class); $client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class); + $reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class); + $reporter->shouldReceive('make') + ->andReturn(new \Zipkin\Reporters\Http([], $client)); $container->shouldReceive('get') ->with(\Hyperf\Tracer\Adapter\ZipkinTracerFactory::class) - ->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $client)); + ->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $reporter)); $container->shouldReceive('get') ->with(\Hyperf\Tracer\Adapter\JaegerTracerFactory::class) ->andReturn(new \Hyperf\Tracer\Adapter\JaegerTracerFactory($config)); From 2fbef185bde419776ca05c0cf2a77160b95b0452 Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Wed, 23 Aug 2023 01:03:59 -0500 Subject: [PATCH 2/2] Optimize kafka reporter (#6075) --- src/Adapter/HttpClientFactory.php | 40 ++------------ src/Adapter/Reporter/HttpClientFactory.php | 51 ++++++++++++++++++ src/Adapter/Reporter/Kafka.php | 56 ++++++-------------- src/Adapter/Reporter/KafkaClientFactory.php | 58 +++++++++++++++++++++ src/Adapter/Reporter/ReporterFactory.php | 9 ++++ tests/TracerFactoryTest.php | 2 +- 6 files changed, 139 insertions(+), 77 deletions(-) create mode 100644 src/Adapter/Reporter/HttpClientFactory.php create mode 100644 src/Adapter/Reporter/KafkaClientFactory.php diff --git a/src/Adapter/HttpClientFactory.php b/src/Adapter/HttpClientFactory.php index 598aeef..0eb6d4e 100644 --- a/src/Adapter/HttpClientFactory.php +++ b/src/Adapter/HttpClientFactory.php @@ -11,41 +11,9 @@ */ namespace Hyperf\Tracer\Adapter; -use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory; -use RuntimeException; -use Zipkin\Reporters\Http\ClientFactory; - -class HttpClientFactory implements ClientFactory +/** + * @deprecated v3.0, will remove in v3.1, use \Hyperf\Tracer\Adapter\Reporter\HttpClientFactory instead. + */ +class HttpClientFactory extends Reporter\HttpClientFactory { - public function __construct(private GuzzleClientFactory $guzzleClientFactory) - { - } - - public function build(array $options): callable - { - return function (string $payload) use ($options): void { - $url = $options['endpoint_url']; - unset($options['endpoint_url']); - $client = $this->guzzleClientFactory->create($options); - $additionalHeaders = $options['headers'] ?? []; - $requiredHeaders = [ - 'Content-Type' => 'application/json', - 'Content-Length' => strlen($payload), - 'b3' => '0', - ]; - $headers = array_merge($additionalHeaders, $requiredHeaders); - $response = $client->post($url, [ - 'body' => $payload, - 'headers' => $headers, - // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. - 'no_aspect' => true, - ]); - $statusCode = $response->getStatusCode(); - if ($statusCode !== 202) { - throw new RuntimeException( - sprintf('Reporting of spans failed, status code %d', $statusCode) - ); - } - }; - } } diff --git a/src/Adapter/Reporter/HttpClientFactory.php b/src/Adapter/Reporter/HttpClientFactory.php new file mode 100644 index 0000000..0b03bad --- /dev/null +++ b/src/Adapter/Reporter/HttpClientFactory.php @@ -0,0 +1,51 @@ +guzzleClientFactory->create($options); + $additionalHeaders = $options['headers'] ?? []; + $requiredHeaders = [ + 'Content-Type' => 'application/json', + 'Content-Length' => strlen($payload), + 'b3' => '0', + ]; + $headers = array_merge($additionalHeaders, $requiredHeaders); + $response = $client->post($url, [ + 'body' => $payload, + 'headers' => $headers, + // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. + 'no_aspect' => true, + ]); + $statusCode = $response->getStatusCode(); + if ($statusCode !== 202) { + throw new RuntimeException( + sprintf('Reporting of spans failed, status code %d', $statusCode) + ); + } + }; + } +} diff --git a/src/Adapter/Reporter/Kafka.php b/src/Adapter/Reporter/Kafka.php index fd9f12f..c49e838 100644 --- a/src/Adapter/Reporter/Kafka.php +++ b/src/Adapter/Reporter/Kafka.php @@ -11,8 +11,6 @@ */ namespace Hyperf\Tracer\Adapter\Reporter; -use longlang\phpkafka\Producer\Producer; -use longlang\phpkafka\Producer\ProducerConfig; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Throwable; @@ -21,28 +19,24 @@ use Zipkin\Reporters\JsonV2Serializer; use Zipkin\Reporters\SpanSerializer; +use function count; +use function json_last_error; use function sprintf; class Kafka implements Reporter { - private Producer $producer; - - private string $topic; - private LoggerInterface $logger; private SpanSerializer $serializer; public function __construct( - array $options = [], - Producer $producer = null, + private array $options, + private KafkaClientFactory $clientFactory, LoggerInterface $logger = null, SpanSerializer $serializer = null ) { - $this->topic = $options['topic'] ?? 'zipkin'; $this->serializer = $serializer ?? new JsonV2Serializer(); $this->logger = $logger ?? new NullLogger(); - $this->producer = $producer ?? $this->createProducer($options); } /** @@ -50,43 +44,25 @@ public function __construct( */ public function report(array $spans): void { - if (empty($spans)) { + if (count($spans) === 0) { return; } - try { - $this->producer->send( - $this->topic, - $this->serializer->serialize($spans), - uniqid('', true) + $payload = $this->serializer->serialize($spans); + + if (! $payload) { + $this->logger->error( + sprintf('failed to encode spans with code %d', json_last_error()) ); - } catch (Throwable $e) { - $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); + return; } - } - private function createProducer(array $options): Producer - { - $options = array_replace([ - 'bootstrap_servers' => '127.0.0.1:9092', - 'acks' => -1, - 'connect_timeout' => 1, - 'send_timeout' => 1, - ], $options); - $config = new ProducerConfig(); + $client = $this->clientFactory->build($this->options); - $config->setBootstrapServer($options['bootstrap_servers']); - $config->setUpdateBrokers(true); - if (is_int($options['acks'])) { - $config->setAcks($options['acks']); - } - if (is_float($options['connect_timeout'])) { - $config->setConnectTimeout($options['connect_timeout']); - } - if (is_float($options['send_timeout'])) { - $config->setSendTimeout($options['send_timeout']); + try { + $client($payload); + } catch (Throwable $e) { + $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); } - - return new Producer($config); } } diff --git a/src/Adapter/Reporter/KafkaClientFactory.php b/src/Adapter/Reporter/KafkaClientFactory.php new file mode 100644 index 0000000..4aedd04 --- /dev/null +++ b/src/Adapter/Reporter/KafkaClientFactory.php @@ -0,0 +1,58 @@ +producer ??= $this->createProducer($options); + + return function (string $payload) use ($options): void { + $this->producer->send( + $options['topic'] ?? 'zipkin', + $payload, + uniqid('', true) + ); + }; + } + + private function createProducer(array $options): Producer + { + $options = array_replace([ + 'bootstrap_servers' => '127.0.0.1:9092', + 'acks' => -1, + 'connect_timeout' => 1, + 'send_timeout' => 1, + ], $options); + $config = new ProducerConfig(); + + $config->setBootstrapServer($options['bootstrap_servers']); + $config->setUpdateBrokers(true); + if (is_int($options['acks'])) { + $config->setAcks($options['acks']); + } + if (is_float($options['connect_timeout'])) { + $config->setConnectTimeout($options['connect_timeout']); + } + if (is_float($options['send_timeout'])) { + $config->setSendTimeout($options['send_timeout']); + } + + return new Producer($config); + } +} diff --git a/src/Adapter/Reporter/ReporterFactory.php b/src/Adapter/Reporter/ReporterFactory.php index a40b985..1a830fa 100644 --- a/src/Adapter/Reporter/ReporterFactory.php +++ b/src/Adapter/Reporter/ReporterFactory.php @@ -18,11 +18,20 @@ class ReporterFactory { + public function __construct( + private HttpClientFactory $httpClientFactory, + ) { + } + public function make(array $option = []): Reporter { $class = $option['class'] ?? ''; $constructor = $option['constructor'] ?? []; + if ($class === \Zipkin\Reporters\Http::class) { + $option['constructor']['requesterFactory'] = $this->httpClientFactory; + } + if (! class_exists($class)) { throw new RuntimeException(sprintf('Class %s is not exists.', $class)); } diff --git a/tests/TracerFactoryTest.php b/tests/TracerFactoryTest.php index 876f91d..25e378a 100644 --- a/tests/TracerFactoryTest.php +++ b/tests/TracerFactoryTest.php @@ -133,7 +133,7 @@ public function testJaegerFactory() protected function getContainer($config) { $container = Mockery::mock(Container::class); - $client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class); + $client = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\HttpClientFactory::class); $reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class); $reporter->shouldReceive('make') ->andReturn(new \Zipkin\Reporters\Http([], $client));