Skip to content

Commit

Permalink
add grpc request and response to entry (#507)
Browse files Browse the repository at this point in the history
* add grpc request and response to entry

* Formatting code

* add request and response   when using middleware

* Formatting code

* rename response method to getResponsePayload

* add grpc request and response to entry

* Formatting code

* add request and response   when using middleware

* Formatting code

* rename response method to getResponsePayload

* Formatting code

* Optimize code

* Optimize code

* Optimize code

* Optimize code

* Optimize

* Fix return statement in TelescopeMiddleware

* Refactor TelescopeMiddleware to return payload instead of an empty string

* Refactor TelescopeMiddleware to simplify getRequestPayload method

* Refactor getRequestPayload method in RequestHandledListener

* Refactor GrpcCoreMiddlewareAspect to handle request and response payloads

---------

Co-authored-by: guandeng <[email protected]>
Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
3 people committed Jan 3, 2024
1 parent 491f8b4 commit fca1658
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 38 deletions.
85 changes: 85 additions & 0 deletions src/Aspect/GrpcCoreMiddlewareAspect.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact [email protected]
*/

namespace FriendsOfHyperf\Telescope\Aspect;

use FriendsOfHyperf\Telescope\TelescopeConfig;
use FriendsOfHyperf\Telescope\TelescopeContext;
use Google\Protobuf\Internal\Message;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Hyperf\GrpcServer\CoreMiddleware;
use Throwable;

use function Hyperf\Tappable\tap;

class GrpcCoreMiddlewareAspect extends AbstractAspect
{
public array $classes = [
CoreMiddleware::class . '::parseMethodParameters',
CoreMiddleware::class . '::handleResponse',
];

public function __construct(protected TelescopeConfig $telescopeConfig)
{
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
return tap($proceedingJoinPoint->process(), function ($result) use ($proceedingJoinPoint) {
if (! $this->telescopeConfig->isEnable('grpc')) {
return;
}

match ($proceedingJoinPoint->methodName) {
'parseMethodParameters' => $this->setRequestPayload($result[0] ?? null),
'handleResponse' => $this->setResponsePayload($proceedingJoinPoint->arguments['keys']['message'] ?? null),
default => null,
};
});
}

/**
* @param Message|null $message
*/
protected function setRequestPayload($message)
{
if (! $message instanceof Message) {
return;
}

try {
$payload = json_decode($message->serializeToJsonString(), true);
} catch (Throwable $e) {
return;
}

TelescopeContext::setGrpcRequestPayload($payload);
}

/**
* @param Message|null $message
*/
protected function setResponsePayload($message)
{
if (! $message instanceof Message) {
return;
}

try {
$payload = json_decode($message->serializeToJsonString(), true);
} catch (Throwable $e) {
return;
}

TelescopeContext::setGrpcResponsePayload($payload);
}
}
1 change: 1 addition & 0 deletions src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public function __invoke(): array
Aspect\RedisAspect::class,
Aspect\RpcAspect::class,
Aspect\RequestDispatcherAspect::class,
Aspect\GrpcCoreMiddlewareAspect::class,
],
'annotations' => [
'scan' => [
Expand Down
59 changes: 40 additions & 19 deletions src/Listener/RequestHandledListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,22 @@ public function requestHandled(RequestTerminated|RpcRequestTerminated $event)
if ($this->incomingRequest($psr7Request)) {
/** @var Dispatched $dispatched */
$dispatched = $psr7Request->getAttribute(Dispatched::class);
$serverName = $dispatched->serverName ?? 'http';

$entry = IncomingEntry::make([
'ip_address' => $psr7Request->getServerParams()['remote_addr'] ?? 'unknown',
'uri' => $psr7Request->getRequestTarget(),
'method' => $psr7Request->getMethod(),
'controller_action' => $dispatched->handler ? $dispatched->handler->callback : '',
'middleware' => TelescopeContext::getMiddlewares(),
'headers' => $psr7Request->getHeaders(),
'payload' => $psr7Request->getParsedBody(),
'payload' => $this->getRequestPayload($psr7Request),
'session' => '',
'response_status' => $psr7Response->getStatusCode(),
'response' => $this->response($psr7Response),
'response' => $this->getResponsePayload($psr7Response),
'duration' => $startTime ? floor((microtime(true) - $startTime) * 1000) : null,
'memory' => round(memory_get_peak_usage(true) / 1024 / 1025, 1),
]);

$serverConfig = collect(config('server.servers'))->firstWhere('name', $serverName);
$handlerClass = $serverConfig['callbacks'][Event::ON_RECEIVE][0] ?? $serverConfig['callbacks'][Event::ON_REQUEST][0] ?? null;
$handler = is_string($handlerClass) && $this->container->has($handlerClass) ? $this->container->get($handlerClass) : null;

if (
$handler
&& (
is_a($handler, \Hyperf\RpcServer\Server::class, true)
|| is_a($handler, \Hyperf\JsonRpc\HttpServer::class, true)
|| is_a($handler, \Hyperf\GrpcServer\Server::class, true)
)
) {
if ($this->isRpcRequest($psr7Request)) {
Telescope::recordService($entry);
} else {
Telescope::recordRequest($entry);
Expand All @@ -139,7 +126,7 @@ protected function incomingRequest(ServerRequestInterface $psr7Request): bool
return ! $this->telescopeConfig->isPathIgnored($psr7Request);
}

protected function response(ResponseInterface $response): string|array
protected function getResponsePayload(ResponseInterface $response): array|string
{
$stream = $response->getBody();

Expand All @@ -165,8 +152,7 @@ protected function response(ResponseInterface $response): string|array
return $this->contentWithinLimits($content) ? $content : 'Purged By Hyperf Telescope'; /* @phpstan-ignore-line */
}
if (Str::contains($response->getHeaderLine('content-type'), 'application/grpc') !== false) {
// to do for grpc
return 'Purged By Hyperf Telescope';
return TelescopeContext::getGrpcResponsePayload() ?: 'Purged By Hyperf Telescope';
}
}

Expand Down Expand Up @@ -210,4 +196,39 @@ protected function getRpcContext(): array

return $this->container->get(RpcContext::class)->get('telescope.carrier', []);
}

protected function isRpcRequest(ServerRequestInterface $psr7Request): bool
{
$handler = $this->parseHandler($psr7Request);
if (
$handler
&& (
is_a($handler, \Hyperf\RpcServer\Server::class, true)
|| is_a($handler, \Hyperf\JsonRpc\HttpServer::class, true)
|| is_a($handler, \Hyperf\GrpcServer\Server::class, true)
)
) {
return true;
}

return false;
}

protected function parseHandler(ServerRequestInterface $psr7Request): mixed
{
$dispatched = $psr7Request->getAttribute(Dispatched::class);
$serverName = $dispatched->serverName ?? 'http';
$serverConfig = collect(config('server.servers'))->firstWhere('name', $serverName);
$handlerClass = $serverConfig['callbacks'][Event::ON_RECEIVE][0] ?? $serverConfig['callbacks'][Event::ON_REQUEST][0] ?? null;
return is_string($handlerClass) && $this->container->has($handlerClass) ? $this->container->get($handlerClass) : null;
}

protected function getRequestPayload(ServerRequestInterface $psr7Request): array|string
{
$handler = $this->parseHandler($psr7Request);
if ($handler && is_a($handler, \Hyperf\GrpcServer\Server::class, true)) {
return TelescopeContext::getGrpcRequestPayload() ?: '';
}
return $psr7Request->getParsedBody();
}
}
61 changes: 42 additions & 19 deletions src/Middleware/TelescopeMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,35 +78,22 @@ public function requestHandled($request, $response)
if ($this->incomingRequest($psr7Request)) {
/** @var Dispatched $dispatched */
$dispatched = $psr7Request->getAttribute(Dispatched::class);
$serverName = $dispatched->serverName ?? 'http';

$entry = IncomingEntry::make([
'ip_address' => $psr7Request->getServerParams()['remote_addr'],
'uri' => $psr7Request->getRequestTarget(),
'method' => $psr7Request->getMethod(),
'controller_action' => $dispatched->handler ? $dispatched->handler->callback : '',
'middleware' => TelescopeContext::getMiddlewares(),
'headers' => $psr7Request->getHeaders(),
'payload' => $psr7Request->getParsedBody(),
'payload' => $this->getRequestPayload($psr7Request),
'session' => '',
'response_status' => $psr7Response->getStatusCode(),
'response' => $this->response($psr7Response),
'response' => $this->getResponsePayload($psr7Response),
'duration' => $startTime ? floor((microtime(true) - $startTime) * 1000) : null,
'memory' => round(memory_get_peak_usage(true) / 1024 / 1025, 1),
]);

$serverConfig = collect(config('server.servers'))->firstWhere('name', $serverName);
$handlerClass = $serverConfig['callbacks'][Event::ON_RECEIVE][0] ?? $serverConfig['callbacks'][Event::ON_REQUEST][0] ?? null;
$handler = is_string($handlerClass) && $this->container->has($handlerClass) ? $this->container->get($handlerClass) : null;

if (
$handler
&& (
is_a($handler, \Hyperf\RpcServer\Server::class, true)
|| is_a($handler, \Hyperf\JsonRpc\HttpServer::class, true)
|| is_a($handler, \Hyperf\GrpcServer\Server::class, true)
)
) {
if ($this->isRpcRequest($psr7Request)) {
Telescope::recordService($entry);
} else {
Telescope::recordRequest($entry);
Expand All @@ -123,7 +110,7 @@ protected function incomingRequest(ServerRequestInterface $psr7Request): bool
return ! $this->telescopeConfig->isPathIgnored($psr7Request);
}

protected function response(ResponseInterface $response): string|array
protected function getResponsePayload(ResponseInterface $response): string|array
{
$stream = $response->getBody();
if ($stream->isSeekable()) {
Expand All @@ -146,8 +133,7 @@ protected function response(ResponseInterface $response): string|array
return $this->contentWithinLimits($content) ? $content : 'Purged By Hyperf Telescope'; /* @phpstan-ignore-line */
}
if (Str::contains($response->getHeaderLine('content-type'), 'application/grpc') !== false) {
// to do for grpc
return 'Purged By Hyperf Telescope';
return TelescopeContext::getGrpcResponsePayload() ?: 'Purged By Hyperf Telescope';
}
}

Expand Down Expand Up @@ -191,4 +177,41 @@ protected function getRpcContext(): array

return $this->container->get(RpcContext::class)->get('telescope.carrier', []);
}

protected function isRpcRequest(ServerRequestInterface $psr7Request): bool
{
$handler = $this->parseHandler($psr7Request);
if (
$handler
&& (
is_a($handler, \Hyperf\RpcServer\Server::class, true)
|| is_a($handler, \Hyperf\JsonRpc\HttpServer::class, true)
|| is_a($handler, \Hyperf\GrpcServer\Server::class, true)
)
) {
return true;
}

return false;
}

protected function parseHandler(ServerRequestInterface $psr7Request): mixed
{
$dispatched = $psr7Request->getAttribute(Dispatched::class);
$serverName = $dispatched->serverName ?? 'http';
$serverConfig = collect(config('server.servers'))->firstWhere('name', $serverName);
$handlerClass = $serverConfig['callbacks'][Event::ON_RECEIVE][0] ?? $serverConfig['callbacks'][Event::ON_REQUEST][0] ?? null;
return is_string($handlerClass) && $this->container->has($handlerClass) ? $this->container->get($handlerClass) : null;
}

protected function getRequestPayload(ServerRequestInterface $psr7Request): array|string
{
$handler = $this->parseHandler($psr7Request);

if ($handler && is_a($handler, \Hyperf\GrpcServer\Server::class, true)) {
return TelescopeContext::getGrpcRequestPayload() ?: '';
}

return $psr7Request->getParsedBody();
}
}
24 changes: 24 additions & 0 deletions src/TelescopeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class TelescopeContext

public const MIDDLEWARES = 'telescope.context.middlewares';

public const GRPC_REQUEST_PAYLOAD = 'telescope.context.grpc.request.payload';

public const GRPC_RESPONSE_PAYLOAD = 'telescope.context.grpc.response.payload';

public static function setBatchId(string $batchId): void
{
Context::set(self::BATCH_ID, $batchId);
Expand Down Expand Up @@ -109,4 +113,24 @@ public static function addEntry(IncomingEntry $entry): void

Context::set(self::ENTRIES, $entries);
}

public static function setGrpcRequestPayload(array $payload): void
{
Context::set(self::GRPC_REQUEST_PAYLOAD, $payload);
}

public static function getGrpcRequestPayload(): ?array
{
return Context::get(self::GRPC_REQUEST_PAYLOAD) ?: null;
}

public static function setGrpcResponsePayload(array $payload): void
{
Context::set(self::GRPC_RESPONSE_PAYLOAD, $payload);
}

public static function getGrpcResponsePayload(): ?array
{
return Context::get(self::GRPC_RESPONSE_PAYLOAD) ?: null;
}
}

0 comments on commit fca1658

Please sign in to comment.