diff --git a/composer.json b/composer.json index a4384ed..3dc2366 100644 --- a/composer.json +++ b/composer.json @@ -28,6 +28,7 @@ }, "suggest": { "hyperf/event": "Required to use DbQueryExecutedListener.", + "longlang/phpkafka": "Required (^1.2) to use Kafka Producer.", "jonahgeorge/jaeger-client-php": "Required (^0.6) to use jaeger tracing." }, "autoload": { diff --git a/publish/opentracing.php b/publish/opentracing.php index 359c26c..05a9027 100644 --- a/publish/opentracing.php +++ b/publish/opentracing.php @@ -32,9 +32,34 @@ 'ipv6' => null, 'port' => 9501, ], - 'options' => [ - 'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'), - 'timeout' => env('ZIPKIN_TIMEOUT', 1), + 'reporter' => env('ZIPKIN_REPORTER', 'http'), // kafka, http + 'reporters' => [ + // options for http reporter + 'http' => [ + 'class' => \Zipkin\Reporters\Http::class, + 'constructor' => [ + 'options' => [ + 'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'), + 'timeout' => env('ZIPKIN_TIMEOUT', 1), + ], + ], + ], + // options for kafka reporter + 'kafka' => [ + 'class' => \Hyperf\Tracer\Adapter\Reporter\Kafka::class, + 'constructor' => [ + 'options' => [ + 'topic' => env('ZIPKIN_KAFKA_TOPIC', 'zipkin'), + 'bootstrap_servers' => env('ZIPKIN_KAFKA_BOOTSTRAP_SERVERS', '127.0.0.1:9092'), + 'acks' => (int) env('ZIPKIN_KAFKA_ACKS', -1), + 'connect_timeout' => (int) env('ZIPKIN_KAFKA_CONNECT_TIMEOUT', 1), + 'send_timeout' => (int) env('ZIPKIN_KAFKA_SEND_TIMEOUT', 1), + ], + ], + ], + 'noop' => [ + 'class' => \Zipkin\Reporters\Noop::class, + ], ], 'sampler' => BinarySampler::createAsAlwaysSample(), ], diff --git a/src/Adapter/HttpClientFactory.php b/src/Adapter/HttpClientFactory.php index 598aeef..0eb6d4e 100644 --- a/src/Adapter/HttpClientFactory.php +++ b/src/Adapter/HttpClientFactory.php @@ -11,41 +11,9 @@ */ namespace Hyperf\Tracer\Adapter; -use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory; -use RuntimeException; -use Zipkin\Reporters\Http\ClientFactory; - -class HttpClientFactory implements ClientFactory +/** + * @deprecated v3.0, will remove in v3.1, use \Hyperf\Tracer\Adapter\Reporter\HttpClientFactory instead. + */ +class HttpClientFactory extends Reporter\HttpClientFactory { - public function __construct(private GuzzleClientFactory $guzzleClientFactory) - { - } - - public function build(array $options): callable - { - return function (string $payload) use ($options): void { - $url = $options['endpoint_url']; - unset($options['endpoint_url']); - $client = $this->guzzleClientFactory->create($options); - $additionalHeaders = $options['headers'] ?? []; - $requiredHeaders = [ - 'Content-Type' => 'application/json', - 'Content-Length' => strlen($payload), - 'b3' => '0', - ]; - $headers = array_merge($additionalHeaders, $requiredHeaders); - $response = $client->post($url, [ - 'body' => $payload, - 'headers' => $headers, - // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. - 'no_aspect' => true, - ]); - $statusCode = $response->getStatusCode(); - if ($statusCode !== 202) { - throw new RuntimeException( - sprintf('Reporting of spans failed, status code %d', $statusCode) - ); - } - }; - } } diff --git a/src/Adapter/Reporter/HttpClientFactory.php b/src/Adapter/Reporter/HttpClientFactory.php new file mode 100644 index 0000000..0b03bad --- /dev/null +++ b/src/Adapter/Reporter/HttpClientFactory.php @@ -0,0 +1,51 @@ +guzzleClientFactory->create($options); + $additionalHeaders = $options['headers'] ?? []; + $requiredHeaders = [ + 'Content-Type' => 'application/json', + 'Content-Length' => strlen($payload), + 'b3' => '0', + ]; + $headers = array_merge($additionalHeaders, $requiredHeaders); + $response = $client->post($url, [ + 'body' => $payload, + 'headers' => $headers, + // If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options. + 'no_aspect' => true, + ]); + $statusCode = $response->getStatusCode(); + if ($statusCode !== 202) { + throw new RuntimeException( + sprintf('Reporting of spans failed, status code %d', $statusCode) + ); + } + }; + } +} diff --git a/src/Adapter/Reporter/Kafka.php b/src/Adapter/Reporter/Kafka.php new file mode 100644 index 0000000..c49e838 --- /dev/null +++ b/src/Adapter/Reporter/Kafka.php @@ -0,0 +1,68 @@ +serializer = $serializer ?? new JsonV2Serializer(); + $this->logger = $logger ?? new NullLogger(); + } + + /** + * @param array|Span[] $spans + */ + public function report(array $spans): void + { + if (count($spans) === 0) { + return; + } + + $payload = $this->serializer->serialize($spans); + + if (! $payload) { + $this->logger->error( + sprintf('failed to encode spans with code %d', json_last_error()) + ); + return; + } + + $client = $this->clientFactory->build($this->options); + + try { + $client($payload); + } catch (Throwable $e) { + $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); + } + } +} diff --git a/src/Adapter/Reporter/KafkaClientFactory.php b/src/Adapter/Reporter/KafkaClientFactory.php new file mode 100644 index 0000000..4aedd04 --- /dev/null +++ b/src/Adapter/Reporter/KafkaClientFactory.php @@ -0,0 +1,58 @@ +producer ??= $this->createProducer($options); + + return function (string $payload) use ($options): void { + $this->producer->send( + $options['topic'] ?? 'zipkin', + $payload, + uniqid('', true) + ); + }; + } + + private function createProducer(array $options): Producer + { + $options = array_replace([ + 'bootstrap_servers' => '127.0.0.1:9092', + 'acks' => -1, + 'connect_timeout' => 1, + 'send_timeout' => 1, + ], $options); + $config = new ProducerConfig(); + + $config->setBootstrapServer($options['bootstrap_servers']); + $config->setUpdateBrokers(true); + if (is_int($options['acks'])) { + $config->setAcks($options['acks']); + } + if (is_float($options['connect_timeout'])) { + $config->setConnectTimeout($options['connect_timeout']); + } + if (is_float($options['send_timeout'])) { + $config->setSendTimeout($options['send_timeout']); + } + + return new Producer($config); + } +} diff --git a/src/Adapter/Reporter/ReporterFactory.php b/src/Adapter/Reporter/ReporterFactory.php new file mode 100644 index 0000000..1a830fa --- /dev/null +++ b/src/Adapter/Reporter/ReporterFactory.php @@ -0,0 +1,45 @@ +httpClientFactory; + } + + if (! class_exists($class)) { + throw new RuntimeException(sprintf('Class %s is not exists.', $class)); + } + + if (! is_a($class, Reporter::class, true)) { + throw new RuntimeException('Unsupported reporter.'); + } + + return make($class, $constructor); + } +} diff --git a/src/Adapter/ZipkinTracerFactory.php b/src/Adapter/ZipkinTracerFactory.php index 9381706..9f66e58 100644 --- a/src/Adapter/ZipkinTracerFactory.php +++ b/src/Adapter/ZipkinTracerFactory.php @@ -12,9 +12,9 @@ namespace Hyperf\Tracer\Adapter; use Hyperf\Contract\ConfigInterface; +use Hyperf\Tracer\Adapter\Reporter\ReporterFactory; use Hyperf\Tracer\Contract\NamedFactoryInterface; use Zipkin\Endpoint; -use Zipkin\Reporters\Http; use Zipkin\Samplers\BinarySampler; use Zipkin\TracingBuilder; use ZipkinOpenTracing\Tracer; @@ -25,16 +25,16 @@ class ZipkinTracerFactory implements NamedFactoryInterface private string $name = ''; - public function __construct(private ConfigInterface $config, private HttpClientFactory $clientFactory) + public function __construct(private ConfigInterface $config, private ReporterFactory $reportFactory) { } public function make(string $name): \OpenTracing\Tracer { $this->name = $name; - [$app, $options, $sampler] = $this->parseConfig(); + [$app, $sampler, $reporterOption] = $this->parseConfig(); $endpoint = Endpoint::create($app['name'], $app['ipv4'], $app['ipv6'], $app['port']); - $reporter = new Http($options, $this->clientFactory); + $reporter = $this->reportFactory->make($reporterOption); $tracing = TracingBuilder::create() ->havingLocalEndpoint($endpoint) ->havingSampler($sampler) @@ -46,6 +46,7 @@ public function make(string $name): \OpenTracing\Tracer private function parseConfig(): array { // @TODO Detect the ipv4, ipv6, port from server object or system info automatically. + $reporter = (string) $this->getConfig('reporter', 'http'); return [ $this->getConfig('app', [ 'name' => 'skeleton', @@ -53,10 +54,13 @@ private function parseConfig(): array 'ipv6' => null, 'port' => 9501, ]), - $this->getConfig('options', [ - 'timeout' => 1, - ]), $this->getConfig('sampler', BinarySampler::createAsAlwaysSample()), + $this->getConfig('reporters.' . $reporter, [ + 'class' => \Zipkin\Reporters\Http::class, + 'constructor' => [ + 'options' => $this->getConfig('options', []), + ], + ]), ]; } diff --git a/tests/TracerFactoryTest.php b/tests/TracerFactoryTest.php index 1b1e2e4..f9e226c 100644 --- a/tests/TracerFactoryTest.php +++ b/tests/TracerFactoryTest.php @@ -135,11 +135,14 @@ public function testJaegerFactory() protected function getContainer($config) { $container = Mockery::mock(Container::class); - $client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class); + $client = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\HttpClientFactory::class); + $reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class); + $reporter->shouldReceive('make') + ->andReturn(new \Zipkin\Reporters\Http([], $client)); $container->shouldReceive('get') ->with(\Hyperf\Tracer\Adapter\ZipkinTracerFactory::class) - ->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $client)); + ->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $reporter)); $container->shouldReceive('get') ->with(\Hyperf\Tracer\Adapter\JaegerTracerFactory::class) ->andReturn(new \Hyperf\Tracer\Adapter\JaegerTracerFactory($config));