Skip to content

Commit

Permalink
Adds carrier packer (#537)
Browse files Browse the repository at this point in the history
Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia committed Feb 2, 2024
1 parent 715098a commit e2c6861
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 56 deletions.
15 changes: 6 additions & 9 deletions src/Tracing/Aspect/AmqpProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
use PhpAmqpLib\Wire\AMQPTable;

use function Hyperf\Tappable\tap;
Expand All @@ -32,6 +32,10 @@ class AmqpProducerAspect extends AbstractAspect
'Hyperf\Amqp\Producer::produceMessage',
];

public function __construct(protected CarrierPacker $packer)
{
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
return match ($proceedingJoinPoint->methodName) {
Expand All @@ -53,14 +57,7 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint)
'amqp.produce',
sprintf('%s::%s', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);
$carrier = json_encode([
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
]);
$headers[] = (new RecordHeader())
->setHeaderKey(Constants::TRACE_CARRIER)
->setValue($carrier);
$carrier = $this->packer->pack($span);
(function () use ($carrier) {
$this->properties['application_headers'] ??= new AMQPTable();
$this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier);
Expand Down
21 changes: 11 additions & 10 deletions src/Tracing/Aspect/AsyncQueueJobMessageAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\AsyncQueue\JobMessage;
use Hyperf\Context\Context;
use Hyperf\Di\Aop\AbstractAspect;
Expand All @@ -31,8 +32,10 @@ class AsyncQueueJobMessageAspect extends AbstractAspect
JobMessage::class . '::__unserialize',
];

public function __construct(protected Switcher $switcher)
{
public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
) {
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
Expand All @@ -54,11 +57,7 @@ protected function handleSerialize(ProceedingJoinPoint $proceedingJoinPoint)
if (is_array($result)) {
$job = array_is_list($result) ? head($result) : $result['job'] ?? null;
$span = $this->startSpan('async_queue.job.dispatch', $job ? $job::class : null);
$carrier = [
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
];
$carrier = $this->packer->pack($span);
if (array_is_list($result)) {
$result[] = $carrier;
} elseif (isset($result['job'])) {
Expand All @@ -74,17 +73,19 @@ protected function handleSerialize(ProceedingJoinPoint $proceedingJoinPoint)
protected function handleUnserialize(ProceedingJoinPoint $proceedingJoinPoint)
{
$data = $proceedingJoinPoint->arguments['keys']['data'] ?? [];
$carrier = [];
$carrier = '';

if (is_array($data)) {
if (array_is_list($data)) {
$carrier = end($data);
} elseif (isset($data['job'])) {
$carrier = $data[Constants::TRACE_CARRIER] ?? [];
$carrier = $data[Constants::TRACE_CARRIER] ?? '';
}
}

Context::set(Constants::TRACE_CARRIER, $carrier);
if ($carrier) {
Context::set(Constants::TRACE_CARRIER, $carrier);
}

return $proceedingJoinPoint->process();
}
Expand Down
18 changes: 7 additions & 11 deletions src/Tracing/Aspect/KafkaProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use longlang\phpkafka\Producer\ProduceMessage;
Expand All @@ -32,6 +33,10 @@ class KafkaProducerAspect extends AbstractAspect
'Hyperf\Kafka\Producer::sendBatchAsync',
];

public function __construct(protected CarrierPacker $packer)
{
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
return match ($proceedingJoinPoint->methodName) {
Expand All @@ -47,12 +52,7 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint)
'kafka.send',
sprintf('%s::%s', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);
$carrier = json_encode([
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
]);

$carrier = $this->packer->pack($span);
$headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? [];
$headers[] = (new RecordHeader())
->setHeaderKey(Constants::TRACE_CARRIER)
Expand All @@ -70,11 +70,7 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint)
'kafka.send_batch',
sprintf('%s::%s', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);
$carrier = json_encode([
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
]);
$carrier = $this->packer->pack($span);

foreach ($messages as $message) {
(
Expand Down
10 changes: 4 additions & 6 deletions src/Tracing/Aspect/RpcAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Tracing\TagManager;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Context\Context;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Di\Aop\AbstractAspect;
Expand Down Expand Up @@ -42,7 +43,8 @@ class RpcAspect extends AbstractAspect
public function __construct(
protected ContainerInterface $container,
protected Switcher $switcher,
protected TagManager $tagManager
protected TagManager $tagManager,
protected CarrierPacker $packer
) {
}

Expand Down Expand Up @@ -80,11 +82,7 @@ private function handleGenerateRpcPath(ProceedingJoinPoint $proceedingJoinPoint)

if ($this->container->has(Rpc\Context::class)) {
$rpcContext = $this->container->get(Rpc\Context::class);
$rpcContext->set(Constants::TRACE_CARRIER, [
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
]);
$rpcContext->set(Constants::TRACE_CARRIER, $this->packer->pack($span));
}

return $path;
Expand Down
10 changes: 4 additions & 6 deletions src/Tracing/Listener/TracingAmqpListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Tracing\TagManager;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Amqp\Event\AfterConsume;
use Hyperf\Amqp\Event\BeforeConsume;
use Hyperf\Amqp\Event\FailToConsume;
Expand All @@ -31,7 +32,8 @@ class TracingAmqpListener implements ListenerInterface

public function __construct(
protected Switcher $switcher,
protected TagManager $tagManager
protected TagManager $tagManager,
protected CarrierPacker $packer
) {
}

Expand Down Expand Up @@ -71,11 +73,7 @@ protected function startTransaction(BeforeConsume $event): void
/** @var AMQPTable|null $applicationHeaders */
$applicationHeaders = $amqpMessage->get('application_headers');
if ($applicationHeaders && isset($applicationHeaders[Constants::TRACE_CARRIER])) {
$carrier = json_decode($applicationHeaders[Constants::TRACE_CARRIER], true);
[$sentryTrace, $baggage] = [
$carrier['sentry-trace'] ?? '',
$carrier['baggage'] ?? '',
];
[$sentryTrace, $baggage] = $this->packer->unpack($applicationHeaders[Constants::TRACE_CARRIER]);
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/Tracing/Listener/TracingAsyncQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Tracing\TagManager;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\AsyncQueue\Event\AfterHandle;
use Hyperf\AsyncQueue\Event\BeforeHandle;
use Hyperf\AsyncQueue\Event\FailedHandle;
Expand All @@ -32,7 +33,8 @@ class TracingAsyncQueueListener implements ListenerInterface

public function __construct(
protected Switcher $switcher,
protected TagManager $tagManager
protected TagManager $tagManager,
protected CarrierPacker $packer
) {
}

Expand Down Expand Up @@ -64,12 +66,13 @@ public function process(object $event): void

protected function startTransaction(BeforeHandle $event): void
{
$carrier = Context::get(Constants::TRACE_CARRIER, [], Coroutine::parentId());
$sentryTrace = $baggage = '';

if (! empty($carrier['sentry-trace']) && ! empty($carrier['baggage'])) {
$sentryTrace = $carrier['sentry-trace'] ?? $carrier['traceparent'];
$baggage = $carrier['baggage'];
/** @var string|null $carrier */
$carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());

if ($carrier) {
[$sentryTrace, $baggage] = $this->packer->unpack($carrier);
}

$job = $event->getMessage()->job();
Expand Down
12 changes: 7 additions & 5 deletions src/Tracing/Listener/TracingKafkaListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Tracing\TagManager;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Kafka\Event\AfterConsume;
use Hyperf\Kafka\Event\BeforeConsume;
Expand All @@ -30,7 +31,8 @@ class TracingKafkaListener implements ListenerInterface

public function __construct(
protected Switcher $switcher,
protected TagManager $tagManager
protected TagManager $tagManager,
protected CarrierPacker $packer
) {
}

Expand Down Expand Up @@ -63,20 +65,20 @@ protected function startTransaction(BeforeConsume $event): void
{
$consumer = $event->getConsumer();
$message = $event->getData();
$carrier = [];
$sentryTrace = $baggage = '';

if ($message instanceof ConsumeMessage) {
foreach ($message->getHeaders() as $header) {
if ($header->getHeaderKey() === Constants::TRACE_CARRIER) {
$carrier = json_decode($header->getValue(), true);
[$sentryTrace, $baggage] = $this->packer->unpack($header->getValue());
break;
}
}
}

$this->continueTrace(
sentryTrace: $carrier['sentry-trace'] ?? '',
baggage: $carrier['baggage'] ?? '',
sentryTrace: $sentryTrace,
baggage: $baggage,
name: $consumer::class,
op: 'kafka.consume',
description: 'consumer: ' . $consumer::class,
Expand Down
7 changes: 3 additions & 4 deletions src/Tracing/SpanStarter.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace FriendsOfHyperf\Sentry\Tracing;

use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Context\ApplicationContext;
use Hyperf\Rpc\Context as RpcContext;
use Psr\Http\Message\ServerRequestInterface;
Expand Down Expand Up @@ -61,10 +62,8 @@ protected function startRequestTransaction(ServerRequestInterface $request, ...$
if ($container->has(RpcContext::class)) {
$rpcContext = $container->get(RpcContext::class);
$carrier = $rpcContext->get(Constants::TRACE_CARRIER);
if (! empty($carrier['sentry-trace']) && ! empty($carrier['baggage'])) {
$sentryTrace = $carrier['sentry-trace'] ?? $carrier['traceparent'];
$baggage = $carrier['baggage'];
}
$packer = $container->get(CarrierPacker::class);
[$sentryTrace, $baggage] = $packer->unpack($carrier);
}

return $this->continueTrace($sentryTrace, $baggage, ...$options);
Expand Down
45 changes: 45 additions & 0 deletions src/Util/CarrierPacker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact [email protected]
*/

namespace FriendsOfHyperf\Sentry\Util;

use Sentry\Tracing\Span;
use Throwable;

class CarrierPacker
{
/**
* @return string[]
*/
public function unpack(string $data): array
{
try {
$carrier = json_decode($data, true, flags: JSON_THROW_ON_ERROR);

return [
$carrier['sentry-trace'] ?? $carrier['traceparent'] ?? '',
$carrier['baggage'] ?? '',
$carrier['traceparent'] ?? '',
];
} catch (Throwable) {
return ['', '', ''];
}
}

public function pack(Span $span): string
{
return json_encode([
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
]);
}
}

0 comments on commit e2c6861

Please sign in to comment.