diff --git a/class_map/GlobalTracer.php b/class_map/GlobalTracer.php new file mode 100644 index 0000000..a52c4b5 --- /dev/null +++ b/class_map/GlobalTracer.php @@ -0,0 +1,58 @@ +producer ??= $this->createProducer($options); + $this->options = $options; + if (isset($options['channel_size'])) { + $this->channelSize = (int) $options['channel_size']; + } + + $this->loop(); return function (string $payload) use ($options): void { - $this->producer->send( - $options['topic'] ?? 'zipkin', - $payload, - uniqid('', true) - ); + $topic = $options['topic'] ?? 'zipkin'; + $key = $options['key'] ?? uniqid('', true); + $headers = $options['headers'] ?? []; + $partitionIndex = $options['partition_index'] ?? null; + $chan = $this->chan; + + $chan->push(function () use ($topic, $key, $payload, $headers, $partitionIndex) { + try { + $this->producer->send($topic, $payload, $key, $headers, $partitionIndex); + } catch (Throwable $e) { + throw $e; + } + }); + + if ($chan->isClosing()) { + throw new ConnectionClosedException('Connection closed.'); + } }; } - private function createProducer(array $options): Producer + public function close(): void + { + $chan = $this->chan; + $producer = $this->producer; + $this->chan = null; + $this->producer = null; + + $chan?->close(); + $producer?->close(); + } + + protected function loop(): void + { + if ($this->chan != null) { + return; + } + + $this->chan = new Channel($this->channelSize); + + Coroutine::create(function () { + while (true) { + $this->producer = $this->makeProducer(); + while (true) { + /** @var null|Closure $closure */ + $closure = $this->chan?->pop(); + if (! $closure) { + break 2; + } + try { + $closure->call($this); + } catch (Throwable) { + $this->producer->close(); + break; + } finally { + $closure = null; + } + } + } + + $this->close(); + }); + + Coroutine::create(function () { + if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) { + $this->close(); + } + }); + } + + protected function makeProducer(): Producer { $options = array_replace([ 'bootstrap_servers' => '127.0.0.1:9092', 'acks' => -1, 'connect_timeout' => 1, 'send_timeout' => 1, - ], $options); + ], $this->options); $config = new ProducerConfig(); $config->setBootstrapServer($options['bootstrap_servers']); diff --git a/src/Aspect/CreateTraceContextAspect.php b/src/Aspect/CreateTraceContextAspect.php new file mode 100644 index 0000000..24bf7ee --- /dev/null +++ b/src/Aspect/CreateTraceContextAspect.php @@ -0,0 +1,34 @@ +process(); + if ($traceContext instanceof TraceContext) { + TracerContext::setTraceId($traceContext->getTraceId()); + } + return $traceContext; + } +} diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index 8396377..9ebb00b 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -12,11 +12,13 @@ namespace Hyperf\Tracer; use GuzzleHttp\Client; +use Hyperf\Tracer\Aspect\CreateTraceContextAspect; use Hyperf\Tracer\Aspect\HttpClientAspect; use Hyperf\Tracer\Aspect\RedisAspect; use Hyperf\Tracer\Aspect\TraceAnnotationAspect; use Hyperf\Tracer\Listener\DbQueryExecutedListener; use Jaeger\ThriftUdpTransport; +use OpenTracing\GlobalTracer; use OpenTracing\Tracer; use Zipkin\Propagation\Map; @@ -37,12 +39,14 @@ public function __invoke(): array 'annotations' => [ 'scan' => [ 'class_map' => [ + GlobalTracer::class => __DIR__ . '/../class_map/GlobalTracer.php', Map::class => __DIR__ . '/../class_map/Map.php', ThriftUdpTransport::class => __DIR__ . '/../class_map/ThriftUdpTransport.php', ], ], ], 'aspects' => [ + CreateTraceContextAspect::class, HttpClientAspect::class, RedisAspect::class, TraceAnnotationAspect::class, diff --git a/src/Exception/ConnectionClosedException.php b/src/Exception/ConnectionClosedException.php new file mode 100644 index 0000000..427191a --- /dev/null +++ b/src/Exception/ConnectionClosedException.php @@ -0,0 +1,18 @@ +handle($request); - /** @var \ZipkinOpenTracing\SpanContext $spanContent */ - $spanContent = $span->getContext(); - /** @var \Zipkin\Propagation\TraceContext $traceContext */ - $traceContext = $spanContent->getContext(); - $response = $response->withHeader('Trace-Id', $traceContext->getTraceId()); + if ($traceId = TracerContext::getTraceId()) { + $response = $response->withHeader('Trace-Id', $traceId); + } $span->setTag($this->spanTagManager->get('response', 'status_code'), $response->getStatusCode()); } catch (Throwable $exception) { $this->switchManager->isEnable('exception') && $this->appendExceptionToSpan($span, $exception); diff --git a/src/SpanStarter.php b/src/SpanStarter.php index 97837ba..d2339c6 100644 --- a/src/SpanStarter.php +++ b/src/SpanStarter.php @@ -45,9 +45,7 @@ protected function startSpan( TracerContext::setRoot($root); return $root; } - $carrier = array_map(function ($header) { - return $header[0]; - }, $request->getHeaders()); + $carrier = array_map(fn ($header) => $header[0], $request->getHeaders()); if ($container->has(Rpc\Context::class) && $rpcContext = $container->get(Rpc\Context::class)) { $rpcCarrier = $rpcContext->get('tracer.carrier'); if (! empty($rpcCarrier)) { diff --git a/src/TracerContext.php b/src/TracerContext.php index 1a9d230..8117e4c 100644 --- a/src/TracerContext.php +++ b/src/TracerContext.php @@ -23,6 +23,8 @@ class TracerContext public const ROOT = 'tracer.root'; + public const TRACE_ID = 'tracer.trace_id'; + public static function setTracer(Tracer $tracer): Tracer { return Context::set(self::TRACER, $tracer); @@ -42,4 +44,14 @@ public static function getRoot(): ?Span { return Context::get(self::ROOT) ?: null; } + + public static function setTraceId(string $traceId): string + { + return Context::set(self::TRACE_ID, $traceId); + } + + public static function getTraceId(): ?string + { + return Context::get(self::TRACE_ID) ?: null; + } }