Skip to content

Commit

Permalink
Merge branch 'master' into 3.1-merge
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/http-server/tests/Router/DispatcherFactoryTest.php
  • Loading branch information
limingxinleo committed Aug 25, 2023
2 parents 1cf1dc0 + 2fbef18 commit b214dd3
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 48 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
"suggest": {
"hyperf/event": "Required to use DbQueryExecutedListener.",
"longlang/phpkafka": "Required (^1.2) to use Kafka Producer.",
"jonahgeorge/jaeger-client-php": "Required (^0.6) to use jaeger tracing."
},
"autoload": {
Expand Down
31 changes: 28 additions & 3 deletions publish/opentracing.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,34 @@
'ipv6' => null,
'port' => 9501,
],
'options' => [
'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'),
'timeout' => env('ZIPKIN_TIMEOUT', 1),
'reporter' => env('ZIPKIN_REPORTER', 'http'), // kafka, http
'reporters' => [
// options for http reporter
'http' => [
'class' => \Zipkin\Reporters\Http::class,
'constructor' => [
'options' => [
'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'),
'timeout' => env('ZIPKIN_TIMEOUT', 1),
],
],
],
// options for kafka reporter
'kafka' => [
'class' => \Hyperf\Tracer\Adapter\Reporter\Kafka::class,
'constructor' => [
'options' => [
'topic' => env('ZIPKIN_KAFKA_TOPIC', 'zipkin'),
'bootstrap_servers' => env('ZIPKIN_KAFKA_BOOTSTRAP_SERVERS', '127.0.0.1:9092'),
'acks' => (int) env('ZIPKIN_KAFKA_ACKS', -1),
'connect_timeout' => (int) env('ZIPKIN_KAFKA_CONNECT_TIMEOUT', 1),
'send_timeout' => (int) env('ZIPKIN_KAFKA_SEND_TIMEOUT', 1),
],
],
],
'noop' => [
'class' => \Zipkin\Reporters\Noop::class,
],
],
'sampler' => BinarySampler::createAsAlwaysSample(),
],
Expand Down
40 changes: 4 additions & 36 deletions src/Adapter/HttpClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,9 @@
*/
namespace Hyperf\Tracer\Adapter;

use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory;
use RuntimeException;
use Zipkin\Reporters\Http\ClientFactory;

class HttpClientFactory implements ClientFactory
/**
* @deprecated v3.0, will remove in v3.1, use \Hyperf\Tracer\Adapter\Reporter\HttpClientFactory instead.
*/
class HttpClientFactory extends Reporter\HttpClientFactory
{
public function __construct(private GuzzleClientFactory $guzzleClientFactory)
{
}

public function build(array $options): callable
{
return function (string $payload) use ($options): void {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $this->guzzleClientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if ($statusCode !== 202) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
}
};
}
}
51 changes: 51 additions & 0 deletions src/Adapter/Reporter/HttpClientFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory;
use RuntimeException;
use Zipkin\Reporters\Http\ClientFactory;

class HttpClientFactory implements ClientFactory
{
public function __construct(private GuzzleClientFactory $guzzleClientFactory)
{
}

public function build(array $options): callable
{
return function (string $payload) use ($options): void {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $this->guzzleClientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if ($statusCode !== 202) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
}
};
}
}
68 changes: 68 additions & 0 deletions src/Adapter/Reporter/Kafka.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;
use Zipkin\Recording\Span;
use Zipkin\Reporter;
use Zipkin\Reporters\JsonV2Serializer;
use Zipkin\Reporters\SpanSerializer;

use function count;
use function json_last_error;
use function sprintf;

class Kafka implements Reporter
{
private LoggerInterface $logger;

private SpanSerializer $serializer;

public function __construct(
private array $options,
private KafkaClientFactory $clientFactory,
LoggerInterface $logger = null,
SpanSerializer $serializer = null
) {
$this->serializer = $serializer ?? new JsonV2Serializer();
$this->logger = $logger ?? new NullLogger();
}

/**
* @param array|Span[] $spans
*/
public function report(array $spans): void
{
if (count($spans) === 0) {
return;
}

$payload = $this->serializer->serialize($spans);

if (! $payload) {
$this->logger->error(
sprintf('failed to encode spans with code %d', json_last_error())
);
return;
}

$client = $this->clientFactory->build($this->options);

try {
$client($payload);
} catch (Throwable $e) {
$this->logger->error(sprintf('failed to report spans: %s', $e->getMessage()));
}
}
}
58 changes: 58 additions & 0 deletions src/Adapter/Reporter/KafkaClientFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

class KafkaClientFactory
{
private ?Producer $producer = null;

public function build(array $options): callable
{
$this->producer ??= $this->createProducer($options);

return function (string $payload) use ($options): void {
$this->producer->send(
$options['topic'] ?? 'zipkin',
$payload,
uniqid('', true)
);
};
}

private function createProducer(array $options): Producer
{
$options = array_replace([
'bootstrap_servers' => '127.0.0.1:9092',
'acks' => -1,
'connect_timeout' => 1,
'send_timeout' => 1,
], $options);
$config = new ProducerConfig();

$config->setBootstrapServer($options['bootstrap_servers']);
$config->setUpdateBrokers(true);
if (is_int($options['acks'])) {
$config->setAcks($options['acks']);
}
if (is_float($options['connect_timeout'])) {
$config->setConnectTimeout($options['connect_timeout']);
}
if (is_float($options['send_timeout'])) {
$config->setSendTimeout($options['send_timeout']);
}

return new Producer($config);
}
}
45 changes: 45 additions & 0 deletions src/Adapter/Reporter/ReporterFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;

use RuntimeException;
use Zipkin\Reporter;

use function Hyperf\Support\make;

class ReporterFactory
{
public function __construct(
private HttpClientFactory $httpClientFactory,
) {
}

public function make(array $option = []): Reporter
{
$class = $option['class'] ?? '';
$constructor = $option['constructor'] ?? [];

if ($class === \Zipkin\Reporters\Http::class) {
$option['constructor']['requesterFactory'] = $this->httpClientFactory;
}

if (! class_exists($class)) {
throw new RuntimeException(sprintf('Class %s is not exists.', $class));
}

if (! is_a($class, Reporter::class, true)) {
throw new RuntimeException('Unsupported reporter.');
}

return make($class, $constructor);
}
}
18 changes: 11 additions & 7 deletions src/Adapter/ZipkinTracerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
namespace Hyperf\Tracer\Adapter;

use Hyperf\Contract\ConfigInterface;
use Hyperf\Tracer\Adapter\Reporter\ReporterFactory;
use Hyperf\Tracer\Contract\NamedFactoryInterface;
use Zipkin\Endpoint;
use Zipkin\Reporters\Http;
use Zipkin\Samplers\BinarySampler;
use Zipkin\TracingBuilder;
use ZipkinOpenTracing\Tracer;
Expand All @@ -25,16 +25,16 @@ class ZipkinTracerFactory implements NamedFactoryInterface

private string $name = '';

public function __construct(private ConfigInterface $config, private HttpClientFactory $clientFactory)
public function __construct(private ConfigInterface $config, private ReporterFactory $reportFactory)
{
}

public function make(string $name): \OpenTracing\Tracer
{
$this->name = $name;
[$app, $options, $sampler] = $this->parseConfig();
[$app, $sampler, $reporterOption] = $this->parseConfig();
$endpoint = Endpoint::create($app['name'], $app['ipv4'], $app['ipv6'], $app['port']);
$reporter = new Http($options, $this->clientFactory);
$reporter = $this->reportFactory->make($reporterOption);
$tracing = TracingBuilder::create()
->havingLocalEndpoint($endpoint)
->havingSampler($sampler)
Expand All @@ -46,17 +46,21 @@ public function make(string $name): \OpenTracing\Tracer
private function parseConfig(): array
{
// @TODO Detect the ipv4, ipv6, port from server object or system info automatically.
$reporter = (string) $this->getConfig('reporter', 'http');
return [
$this->getConfig('app', [
'name' => 'skeleton',
'ipv4' => '127.0.0.1',
'ipv6' => null,
'port' => 9501,
]),
$this->getConfig('options', [
'timeout' => 1,
]),
$this->getConfig('sampler', BinarySampler::createAsAlwaysSample()),
$this->getConfig('reporters.' . $reporter, [
'class' => \Zipkin\Reporters\Http::class,
'constructor' => [
'options' => $this->getConfig('options', []),
],
]),
];
}

Expand Down
7 changes: 5 additions & 2 deletions tests/TracerFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,14 @@ public function testJaegerFactory()
protected function getContainer($config)
{
$container = Mockery::mock(Container::class);
$client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class);
$client = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\HttpClientFactory::class);
$reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class);
$reporter->shouldReceive('make')
->andReturn(new \Zipkin\Reporters\Http([], $client));

$container->shouldReceive('get')
->with(\Hyperf\Tracer\Adapter\ZipkinTracerFactory::class)
->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $client));
->andReturn(new \Hyperf\Tracer\Adapter\ZipkinTracerFactory($config, $reporter));
$container->shouldReceive('get')
->with(\Hyperf\Tracer\Adapter\JaegerTracerFactory::class)
->andReturn(new \Hyperf\Tracer\Adapter\JaegerTracerFactory($config));
Expand Down

0 comments on commit b214dd3

Please sign in to comment.