Skip to content

Commit

Permalink
Merge pull request #78 from Vict0rynox/proceess-by-name-interaptor
Browse files Browse the repository at this point in the history
process by name interrupter
  • Loading branch information
victorynox authored Oct 1, 2020
2 parents e068021 + cefca15 commit 40792ad
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 8 deletions.
5 changes: 5 additions & 0 deletions config/autoload/test.global.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use rollun\callback\Callback\Interrupter\Factory\InterruptAbstractFactoryAbstract;
use rollun\callback\Callback\Interrupter\Factory\ProcessAbstractFactory;
use rollun\callback\Callback\Interrupter\Process;
use rollun\callback\Callback\Interrupter\ProcessByName;
use rollun\callback\Callback\Multiplexer;
use rollun\callback\Queues\Factory\FileAdapterAbstractFactory;
use rollun\callback\Queues\Factory\QueueClientAbstractFactory;
Expand Down Expand Up @@ -61,6 +62,10 @@
],
],
InterruptAbstractFactoryAbstract::KEY => [
'cronCallbackProcessByName' => [
ProcessAbstractFactory::KEY_CLASS => ProcessByName::class,
ProcessAbstractFactory::KEY_CALLBACK_SERVICE => 'cronCallback',
],
'testInterrupter' => [
ProcessAbstractFactory::KEY_CLASS => Process::class,
ProcessAbstractFactory::KEY_CALLBACK_SERVICE => 'testCallback',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

abstract class InterruptAbstractFactoryAbstract implements AbstractFactoryInterface
{
const KEY = 'interrupt';
public const KEY = 'interrupt';

const KEY_CLASS = 'class';
public const KEY_CLASS = 'class';

const KEY_CALLBACK_SERVICE = 'callbackService';
public const KEY_CALLBACK_SERVICE = 'callbackService';

const DEFAULT_CLASS = InterrupterInterface::class;
public const DEFAULT_CLASS = InterrupterInterface::class;

/**
* @param ContainerInterface $container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
namespace rollun\callback\Callback\Interrupter\Factory;

use Interop\Container\ContainerInterface;
use rollun\callback\Callback\CallbackException;
use rollun\callback\Callback\Interrupter\Process;
use rollun\callback\Callback\SerializedCallback;
use rollun\callback\Callback\CallbackException;
use rollun\callback\ConfigProvider;

class ProcessAbstractFactory extends InterruptAbstractFactoryAbstract
{
const DEFAULT_CLASS = Process::class;
public const DEFAULT_CLASS = Process::class;

const KEY_MAX_EXECUTE_TIME = 'maxExecuteTime';
public const KEY_MAX_EXECUTE_TIME = 'maxExecuteTime';

/**
* @param ContainerInterface $container
Expand All @@ -28,7 +28,6 @@ public function __invoke(ContainerInterface $container, $requestedName, array $o
{
$factoryConfig = $options ?? $container->get('config')[static::KEY][$requestedName];


$class = $factoryConfig[static::KEY_CLASS];
$callback = $factoryConfig[static::KEY_CALLBACK_SERVICE];

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php
/**
* @copyright Copyright © 2014 Rollun LC (http://rollun.com/)
* @license LICENSE.md New BSD License
*/

namespace rollun\callback\Callback\Interrupter\Factory;

use Interop\Container\ContainerInterface;
use rollun\callback\Callback\Interrupter\ProcessByName;
use rollun\callback\ConfigProvider;

class ProcessByNameAbstractFactory extends InterruptAbstractFactoryAbstract
{
public const DEFAULT_CLASS = ProcessByName::class;

public const KEY_MAX_EXECUTE_TIME = 'maxExecuteTime';

/**
* @param ContainerInterface $container
* @param string $requestedName
* @param array|null $options
* @return mixed|object
*/
public function __invoke(ContainerInterface $container, $requestedName, array $options = null)
{
$factoryConfig = $options ?? $container->get('config')[static::KEY][$requestedName];

$class = $factoryConfig[static::KEY_CLASS];
$callback = $factoryConfig[static::KEY_CALLBACK_SERVICE];

$maxExecuteTime = $factoryConfig[self::KEY_MAX_EXECUTE_TIME] ?? null;
$pidKiller = null;

if ($maxExecuteTime && $container->has(ConfigProvider::PID_KILLER_SERVICE)) {
$pidKiller = $container->get(ConfigProvider::PID_KILLER_SERVICE);
} else {
$maxExecuteTime = null;
}

return new $class($callback, $pidKiller, $maxExecuteTime);
}
}
169 changes: 169 additions & 0 deletions src/Callback/src/Callback/Interrupter/ProcessByName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
<?php
/**
* @copyright Copyright © 2014 Rollun LC (http://rollun.com/)
* @license LICENSE.md New BSD License
*/

namespace rollun\callback\Callback\Interrupter;

use Jaeger\Tracer\Tracer;
use Psr\Log\LoggerInterface;
use ReflectionException;
use rollun\callback\Callback\CallbackException;
use rollun\callback\PidKiller\PidKillerInterface;
use rollun\callback\Promise\Interfaces\PayloadInterface;
use rollun\callback\Promise\SimplePayload;
use rollun\dic\InsideConstruct;
use rollun\logger\LifeCycleToken;
use rollun\utils\Json\Serializer;

/**
* Class Process
* @package rollun\callback\Callback\Interrupter
*/
class ProcessByName implements InterrupterInterface
{
const STDOUT_KEY = 'stdout';
const STDERR_KEY = 'stderr';
const PID_KEY = 'pid';

const SCRIPT_PATH = '/Script/service.php';

/**
* @var LifecycleToken
*/
protected $lifecycleToken;

/** @var integer */
protected $maxExecuteTime;

/** @var PidKillerInterface */
protected $pidKiller;

/** @var Tracer */
protected $tracer;

/**
* @var LoggerInterface
*/
protected $logger;
/**
* @var string
*/
private $callableServiceName;

/**
* Process constructor.
* @param string $callableServiceName
* @param PidKillerInterface|null $pidKiller
* @param int|null $maxExecuteTime
* @param LoggerInterface|null $logger
* @param LifeCycleToken|null $lifecycleToken
* @param Tracer|null $tracer
* @throws ReflectionException
*/
public function __construct(
string $callableServiceName,
$pidKiller = null,
int $maxExecuteTime = null,
LoggerInterface $logger = null,
LifeCycleToken $lifecycleToken = null,
Tracer $tracer = null
) {
InsideConstruct::setConstructParams([
"lifecycleToken" => LifeCycleToken::class,
'tracer' => Tracer::class,
'logger' => LoggerInterface::class
]);
$this->pidKiller = $pidKiller;
$this->maxExecuteTime = $maxExecuteTime;
$this->callableServiceName = $callableServiceName;
}

public function __sleep()
{
return [
'callback',
'pidKiller',
'maxExecuteTime',
'callableServiceName',
];
}


public function __wakeup()
{
InsideConstruct::initWakeup([
"lifecycleToken" => LifeCycleToken::class,
'tracer' => Tracer::class,
'logger' => LoggerInterface::class
]);
}

/**
* @param $value
* @return PayloadInterface
* @throws ReflectionException
*/
public function __invoke($value = null): PayloadInterface
{
$span = $this->tracer->start('Process::__invoke');

$context = $span->getContext();
$traserContext = base64_encode(Serializer::jsonSerialize($span->getContext()));


$cmd = "php {$this->getScriptName()} {$this->callableServiceName}"
. " lifecycleToken:{$this->lifecycleToken->serialize()}"
. " tracerContext:$traserContext"
. ' APP_ENV=' . getenv('APP_ENV');

$outStream = getenv('OUTPUT_STREAM');
if ($outStream) {
$payload[self::STDOUT_KEY] = $outStream;
$payload[self::STDERR_KEY] = $outStream;
} else {
$payload[self::STDOUT_KEY] = '/dev/null';
$payload[self::STDERR_KEY] = '/dev/null';

}
$payload['interrupter_type'] = self::class;

$cmd .= " 1>{$payload[self::STDOUT_KEY]} 2>{$payload[self::STDERR_KEY]}";

if (substr(php_uname(), 0, 7) !== "Windows") {
$cmd .= " & echo $!";
}

//fix not found context problem
$this->tracer->flush();
$pid = trim(shell_exec($cmd));

if ($this->maxExecuteTime && $this->pidKiller) {
$record = [
'name' => $this->callableServiceName,
'delaySeconds' => $this->maxExecuteTime,
'pid' => $pid,
];
$this->pidKiller->create($record);
}

$payload = new SimplePayload($pid, $payload);
$this->tracer->finish($span);
return $payload;
}

/**
* @return string
*/
protected function getScriptName(): string
{
$scriptPath = __DIR__ . self::SCRIPT_PATH;

if (!is_file($scriptPath)) {
throw new CallbackException(sprintf("File '%s' not found", realpath($scriptPath)));
}

return $scriptPath;
}
}
71 changes: 71 additions & 0 deletions src/Callback/src/Callback/Interrupter/Script/service.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

$path = getcwd();
if (!is_file($path . '/vendor/autoload.php')) {
$path = dirname(getcwd());
}
chdir($path);

require 'vendor/autoload.php';

use Jaeger\Span\Context\SpanContext;
use Jaeger\Tag\StringTag;
use Jaeger\Tracer\Tracer;
use Psr\Log\LoggerInterface;
use rollun\dic\InsideConstruct;
use rollun\callback\Callback\CallbackException;
use rollun\callback\Callback\Interrupter\Job;
use rollun\logger\LifeCycleToken;
use rollun\logger\Processor\ExceptionBacktrace;

/** @var Zend\ServiceManager\ServiceManager $container */
$container = include 'config/container.php';
InsideConstruct::setContainer($container);
$lifeCycleToke = LifeCycleToken::generateToken();

$callableServiceName = null;
$parentLifecycleToken = null;
$spanContext = null;

//Get argc
foreach ($argv as $i => $value) {
if ($i === 1) {
$callableServiceName = $value;
} elseif (strstr($value, 'lifecycleToken') !== false) {
[1 => $parentLifecycleToken] = explode(':', $value, 2);
} elseif (strstr($value, 'tracerContext') !== false) {
[1 => $tracerJsonContext] = explode(':', $value, 2);
$spanContext = \rollun\utils\Json\Serializer::jsonUnserialize(base64_decode($tracerJsonContext));
}
}

if ($parentLifecycleToken) {
$lifeCycleToke->unserialize($parentLifecycleToken);
}
$container->setService(LifeCycleToken::class, $lifeCycleToke);

/** @var Tracer $tracer */
$tracer = $container->get(Tracer::class);

$logger = $container->get(LoggerInterface::class);

try {
$span = $tracer->start('process.php', [], $spanContext);
if ($callableServiceName === null) {
throw new CallbackException('There is not callable service name');
}
$callable = $container->get($callableServiceName);

$logger->info("Interrupter 'Process' start.");
//$logger->debug("Serialized job: $paramsString");
call_user_func($callable, null);
$logger->info("Interrupter 'Process' finish.");
$tracer->finish($span);
} catch (\Throwable $e) {
$span->addTag(new StringTag('exception', json_encode((new ExceptionBacktrace())->getExceptionBacktrace($e))));
$logger->error('When execute process, catch error', [
'exception' => $e
]);
} finally {
$tracer->flush();
}
2 changes: 2 additions & 0 deletions src/Callback/src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use rollun\callback\Callback\Interrupter\Factory\HttpAbstractFactory;
use rollun\callback\Callback\Factory\HttpClientAbstractFactory;
use rollun\callback\Callback\Interrupter\Factory\ProcessAbstractFactory;
use rollun\callback\Callback\Interrupter\Factory\ProcessByNameAbstractFactory;
use rollun\callback\Callback\Interrupter\Factory\QueueJobFillerAbstractFactory;
use rollun\callback\Callback\Interrupter\Factory\QueueMessageFillerAbstractFactory;
use rollun\callback\Callback\Ping;
Expand Down Expand Up @@ -66,6 +67,7 @@ public function __invoke()
HttpAbstractFactory::class,
HttpClientAbstractFactory::class,
ProcessAbstractFactory::class,
ProcessByNameAbstractFactory::class,
QueueJobFillerAbstractFactory::class,
QueueMessageFillerAbstractFactory::class,

Expand Down

0 comments on commit 40792ad

Please sign in to comment.