From 6f3be28009b53f28d0329348bbea2e50d684ae01 Mon Sep 17 00:00:00 2001 From: crayxn Date: Mon, 28 Aug 2023 13:16:02 +0800 Subject: [PATCH 1/2] Fix error that using non-zipkin driver of tracer (#6097) Co-authored-by: Deeka Wong <8337659+huangdijia@users.noreply.github.com> --- class_map/GlobalTracer.php | 58 +++++++++++++++++++++++++ src/Aspect/CreateTraceContextAspect.php | 34 +++++++++++++++ src/ConfigProvider.php | 4 ++ src/Middleware/TraceMiddleware.php | 8 ++-- src/SpanStarter.php | 4 +- src/TracerContext.php | 12 +++++ 6 files changed, 112 insertions(+), 8 deletions(-) create mode 100644 class_map/GlobalTracer.php create mode 100644 src/Aspect/CreateTraceContextAspect.php diff --git a/class_map/GlobalTracer.php b/class_map/GlobalTracer.php new file mode 100644 index 0000000..a52c4b5 --- /dev/null +++ b/class_map/GlobalTracer.php @@ -0,0 +1,58 @@ +process(); + if ($traceContext instanceof TraceContext) { + TracerContext::setTraceId($traceContext->getTraceId()); + } + return $traceContext; + } +} diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index 8396377..9ebb00b 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -12,11 +12,13 @@ namespace Hyperf\Tracer; use GuzzleHttp\Client; +use Hyperf\Tracer\Aspect\CreateTraceContextAspect; use Hyperf\Tracer\Aspect\HttpClientAspect; use Hyperf\Tracer\Aspect\RedisAspect; use Hyperf\Tracer\Aspect\TraceAnnotationAspect; use Hyperf\Tracer\Listener\DbQueryExecutedListener; use Jaeger\ThriftUdpTransport; +use OpenTracing\GlobalTracer; use OpenTracing\Tracer; use Zipkin\Propagation\Map; @@ -37,12 +39,14 @@ public function __invoke(): array 'annotations' => [ 'scan' => [ 'class_map' => [ + GlobalTracer::class => __DIR__ . '/../class_map/GlobalTracer.php', Map::class => __DIR__ . '/../class_map/Map.php', ThriftUdpTransport::class => __DIR__ . '/../class_map/ThriftUdpTransport.php', ], ], ], 'aspects' => [ + CreateTraceContextAspect::class, HttpClientAspect::class, RedisAspect::class, TraceAnnotationAspect::class, diff --git a/src/Middleware/TraceMiddleware.php b/src/Middleware/TraceMiddleware.php index 5f13312..cec7258 100644 --- a/src/Middleware/TraceMiddleware.php +++ b/src/Middleware/TraceMiddleware.php @@ -53,11 +53,9 @@ public function process(ServerRequestInterface $request, RequestHandlerInterface }); try { $response = $handler->handle($request); - /** @var \ZipkinOpenTracing\SpanContext $spanContent */ - $spanContent = $span->getContext(); - /** @var \Zipkin\Propagation\TraceContext $traceContext */ - $traceContext = $spanContent->getContext(); - $response = $response->withHeader('Trace-Id', $traceContext->getTraceId()); + if ($traceId = TracerContext::getTraceId()) { + $response = $response->withHeader('Trace-Id', $traceId); + } $span->setTag($this->spanTagManager->get('response', 'status_code'), $response->getStatusCode()); } catch (Throwable $exception) { $this->switchManager->isEnable('exception') && $this->appendExceptionToSpan($span, $exception); diff --git a/src/SpanStarter.php b/src/SpanStarter.php index 8b3eb62..1d2e57b 100644 --- a/src/SpanStarter.php +++ b/src/SpanStarter.php @@ -45,9 +45,7 @@ protected function startSpan( TracerContext::setRoot($root); return $root; } - $carrier = array_map(function ($header) { - return $header[0]; - }, $request->getHeaders()); + $carrier = array_map(fn ($header) => $header[0], $request->getHeaders()); if ($container->has(Rpc\Context::class) && $rpcContext = $container->get(Rpc\Context::class)) { $rpcCarrier = $rpcContext->get('tracer.carrier'); if (! empty($rpcCarrier)) { diff --git a/src/TracerContext.php b/src/TracerContext.php index 1a9d230..8117e4c 100644 --- a/src/TracerContext.php +++ b/src/TracerContext.php @@ -23,6 +23,8 @@ class TracerContext public const ROOT = 'tracer.root'; + public const TRACE_ID = 'tracer.trace_id'; + public static function setTracer(Tracer $tracer): Tracer { return Context::set(self::TRACER, $tracer); @@ -42,4 +44,14 @@ public static function getRoot(): ?Span { return Context::get(self::ROOT) ?: null; } + + public static function setTraceId(string $traceId): string + { + return Context::set(self::TRACE_ID, $traceId); + } + + public static function getTraceId(): ?string + { + return Context::get(self::TRACE_ID) ?: null; + } } From 85b89a372f7325fda1b7e38db9dd9d79e97e409e Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Tue, 29 Aug 2023 00:04:11 -0500 Subject: [PATCH 2/2] Optimize kafka reporter of tracer (#6098) --- src/Adapter/Reporter/KafkaClientFactory.php | 97 +++++++++++++++++++-- src/Exception/ConnectionClosedException.php | 18 ++++ src/Exception/TimeoutException.php | 18 ++++ 3 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 src/Exception/ConnectionClosedException.php create mode 100644 src/Exception/TimeoutException.php diff --git a/src/Adapter/Reporter/KafkaClientFactory.php b/src/Adapter/Reporter/KafkaClientFactory.php index 4aedd04..bc61579 100644 --- a/src/Adapter/Reporter/KafkaClientFactory.php +++ b/src/Adapter/Reporter/KafkaClientFactory.php @@ -11,34 +11,113 @@ */ namespace Hyperf\Tracer\Adapter\Reporter; +use Closure; +use Hyperf\Coordinator\Constants; +use Hyperf\Coordinator\CoordinatorManager; +use Hyperf\Engine\Channel; +use Hyperf\Engine\Coroutine; +use Hyperf\Tracer\Exception\ConnectionClosedException; use longlang\phpkafka\Producer\Producer; use longlang\phpkafka\Producer\ProducerConfig; +use Throwable; class KafkaClientFactory { - private ?Producer $producer = null; + protected ?Channel $chan = null; + + protected ?Producer $producer = null; + + protected array $options = []; + + protected int $channelSize = 65535; public function build(array $options): callable { - $this->producer ??= $this->createProducer($options); + $this->options = $options; + if (isset($options['channel_size'])) { + $this->channelSize = (int) $options['channel_size']; + } + + $this->loop(); return function (string $payload) use ($options): void { - $this->producer->send( - $options['topic'] ?? 'zipkin', - $payload, - uniqid('', true) - ); + $topic = $options['topic'] ?? 'zipkin'; + $key = $options['key'] ?? uniqid('', true); + $headers = $options['headers'] ?? []; + $partitionIndex = $options['partition_index'] ?? null; + $chan = $this->chan; + + $chan->push(function () use ($topic, $key, $payload, $headers, $partitionIndex) { + try { + $this->producer->send($topic, $payload, $key, $headers, $partitionIndex); + } catch (Throwable $e) { + throw $e; + } + }); + + if ($chan->isClosing()) { + throw new ConnectionClosedException('Connection closed.'); + } }; } - private function createProducer(array $options): Producer + public function close(): void + { + $chan = $this->chan; + $producer = $this->producer; + $this->chan = null; + $this->producer = null; + + $chan?->close(); + $producer?->close(); + } + + protected function loop(): void + { + if ($this->chan != null) { + return; + } + + $this->chan = new Channel($this->channelSize); + + Coroutine::create(function () { + while (true) { + $this->producer = $this->makeProducer(); + while (true) { + /** @var null|Closure $closure */ + $closure = $this->chan?->pop(); + if (! $closure) { + break 2; + } + try { + $closure->call($this); + } catch (Throwable) { + $this->producer->close(); + break; + } finally { + $closure = null; + } + } + } + + $this->close(); + }); + + Coroutine::create(function () { + if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) { + $this->close(); + } + }); + } + + protected function makeProducer(): Producer { $options = array_replace([ 'bootstrap_servers' => '127.0.0.1:9092', 'acks' => -1, 'connect_timeout' => 1, 'send_timeout' => 1, - ], $options); + ], $this->options); $config = new ProducerConfig(); $config->setBootstrapServer($options['bootstrap_servers']); diff --git a/src/Exception/ConnectionClosedException.php b/src/Exception/ConnectionClosedException.php new file mode 100644 index 0000000..427191a --- /dev/null +++ b/src/Exception/ConnectionClosedException.php @@ -0,0 +1,18 @@ +