Skip to content

Commit

Permalink
Merge branch 'master' into 3.1-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Aug 30, 2023
2 parents b214dd3 + 85b89a3 commit 2521fbc
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 17 deletions.
58 changes: 58 additions & 0 deletions class_map/GlobalTracer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace OpenTracing;

use Hyperf\Tracer\TracerContext;

final class GlobalTracer
{
/**
* @var Tracer
*/
private static $instance;

/**
* @var bool
*/
private static $isRegistered = false;

/**
* GlobalTracer::set sets the [singleton] Tracer returned by get().
* Those who use GlobalTracer (rather than directly manage a Tracer instance)
* should call GlobalTracer::set as early as possible in bootstrap, prior to
* start a new span. Prior to calling GlobalTracer::set, any Spans started
* via the `Tracer::startActiveSpan` (etc) globals are noops.
*/
public static function set(Tracer $tracer): void
{
TracerContext::setTracer($tracer);
self::$isRegistered = true;
}

/**
* GlobalTracer::get returns the global singleton `Tracer` implementation.
* Before `GlobalTracer::set` is called, the `GlobalTracer::get` is a noop
* implementation that drops all data handed to it.
*/
public static function get(): Tracer
{
return TracerContext::getTracer();
}

/**
* Returns true if a global tracer has been registered, otherwise returns false.
*/
public static function isRegistered(): bool
{
return self::$isRegistered;
}
}
97 changes: 88 additions & 9 deletions src/Adapter/Reporter/KafkaClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,113 @@
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Closure;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Engine\Channel;
use Hyperf\Engine\Coroutine;
use Hyperf\Tracer\Exception\ConnectionClosedException;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use Throwable;

class KafkaClientFactory
{
private ?Producer $producer = null;
protected ?Channel $chan = null;

protected ?Producer $producer = null;

protected array $options = [];

protected int $channelSize = 65535;

public function build(array $options): callable
{
$this->producer ??= $this->createProducer($options);
$this->options = $options;
if (isset($options['channel_size'])) {
$this->channelSize = (int) $options['channel_size'];
}

$this->loop();

return function (string $payload) use ($options): void {
$this->producer->send(
$options['topic'] ?? 'zipkin',
$payload,
uniqid('', true)
);
$topic = $options['topic'] ?? 'zipkin';
$key = $options['key'] ?? uniqid('', true);
$headers = $options['headers'] ?? [];
$partitionIndex = $options['partition_index'] ?? null;
$chan = $this->chan;

$chan->push(function () use ($topic, $key, $payload, $headers, $partitionIndex) {
try {
$this->producer->send($topic, $payload, $key, $headers, $partitionIndex);
} catch (Throwable $e) {
throw $e;
}
});

if ($chan->isClosing()) {
throw new ConnectionClosedException('Connection closed.');
}
};
}

private function createProducer(array $options): Producer
public function close(): void
{
$chan = $this->chan;
$producer = $this->producer;
$this->chan = null;
$this->producer = null;

$chan?->close();
$producer?->close();
}

protected function loop(): void
{
if ($this->chan != null) {
return;
}

$this->chan = new Channel($this->channelSize);

Coroutine::create(function () {
while (true) {
$this->producer = $this->makeProducer();
while (true) {
/** @var null|Closure $closure */
$closure = $this->chan?->pop();
if (! $closure) {
break 2;
}
try {
$closure->call($this);
} catch (Throwable) {
$this->producer->close();
break;
} finally {
$closure = null;
}
}
}

$this->close();
});

Coroutine::create(function () {
if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield()) {
$this->close();
}
});
}

protected function makeProducer(): Producer
{
$options = array_replace([
'bootstrap_servers' => '127.0.0.1:9092',
'acks' => -1,
'connect_timeout' => 1,
'send_timeout' => 1,
], $options);
], $this->options);
$config = new ProducerConfig();

$config->setBootstrapServer($options['bootstrap_servers']);
Expand Down
34 changes: 34 additions & 0 deletions src/Aspect/CreateTraceContextAspect.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Aspect;

use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Hyperf\Tracer\TracerContext;
use Zipkin\Propagation\TraceContext;

class CreateTraceContextAspect extends AbstractAspect
{
public array $classes = [
TraceContext::class . '::create',
TraceContext::class . '::create*',
];

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
$traceContext = $proceedingJoinPoint->process();
if ($traceContext instanceof TraceContext) {
TracerContext::setTraceId($traceContext->getTraceId());
}
return $traceContext;
}
}
4 changes: 4 additions & 0 deletions src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
namespace Hyperf\Tracer;

use GuzzleHttp\Client;
use Hyperf\Tracer\Aspect\CreateTraceContextAspect;
use Hyperf\Tracer\Aspect\HttpClientAspect;
use Hyperf\Tracer\Aspect\RedisAspect;
use Hyperf\Tracer\Aspect\TraceAnnotationAspect;
use Hyperf\Tracer\Listener\DbQueryExecutedListener;
use Jaeger\ThriftUdpTransport;
use OpenTracing\GlobalTracer;
use OpenTracing\Tracer;
use Zipkin\Propagation\Map;

Expand All @@ -37,12 +39,14 @@ public function __invoke(): array
'annotations' => [
'scan' => [
'class_map' => [
GlobalTracer::class => __DIR__ . '/../class_map/GlobalTracer.php',
Map::class => __DIR__ . '/../class_map/Map.php',
ThriftUdpTransport::class => __DIR__ . '/../class_map/ThriftUdpTransport.php',
],
],
],
'aspects' => [
CreateTraceContextAspect::class,
HttpClientAspect::class,
RedisAspect::class,
TraceAnnotationAspect::class,
Expand Down
18 changes: 18 additions & 0 deletions src/Exception/ConnectionClosedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Exception;

use RuntimeException;

class ConnectionClosedException extends RuntimeException
{
}
18 changes: 18 additions & 0 deletions src/Exception/TimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Exception;

use RuntimeException;

class TimeoutException extends RuntimeException
{
}
8 changes: 3 additions & 5 deletions src/Middleware/TraceMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ public function process(ServerRequestInterface $request, RequestHandlerInterface
});
try {
$response = $handler->handle($request);
/** @var \ZipkinOpenTracing\SpanContext $spanContent */
$spanContent = $span->getContext();
/** @var \Zipkin\Propagation\TraceContext $traceContext */
$traceContext = $spanContent->getContext();
$response = $response->withHeader('Trace-Id', $traceContext->getTraceId());
if ($traceId = TracerContext::getTraceId()) {
$response = $response->withHeader('Trace-Id', $traceId);
}
$span->setTag($this->spanTagManager->get('response', 'status_code'), $response->getStatusCode());
} catch (Throwable $exception) {
$this->switchManager->isEnable('exception') && $this->appendExceptionToSpan($span, $exception);
Expand Down
4 changes: 1 addition & 3 deletions src/SpanStarter.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ protected function startSpan(
TracerContext::setRoot($root);
return $root;
}
$carrier = array_map(function ($header) {
return $header[0];
}, $request->getHeaders());
$carrier = array_map(fn ($header) => $header[0], $request->getHeaders());
if ($container->has(Rpc\Context::class) && $rpcContext = $container->get(Rpc\Context::class)) {
$rpcCarrier = $rpcContext->get('tracer.carrier');
if (! empty($rpcCarrier)) {
Expand Down
12 changes: 12 additions & 0 deletions src/TracerContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class TracerContext

public const ROOT = 'tracer.root';

public const TRACE_ID = 'tracer.trace_id';

public static function setTracer(Tracer $tracer): Tracer
{
return Context::set(self::TRACER, $tracer);
Expand All @@ -42,4 +44,14 @@ public static function getRoot(): ?Span
{
return Context::get(self::ROOT) ?: null;
}

public static function setTraceId(string $traceId): string
{
return Context::set(self::TRACE_ID, $traceId);
}

public static function getTraceId(): ?string
{
return Context::get(self::TRACE_ID) ?: null;
}
}

0 comments on commit 2521fbc

Please sign in to comment.