Skip to content

Commit

Permalink
v3.1 init
Browse files Browse the repository at this point in the history
Co-Authored-By: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia committed Jun 12, 2023
1 parent 131d391 commit cc3af88
Show file tree
Hide file tree
Showing 25 changed files with 320 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
max-parallel: 15
matrix:
os: [ ubuntu-latest ]
php: [ "8.0", "8.1", "8.2" ]
php: [ "8.1", "8.2" ]

name: PHP ${{ matrix.php }} Test on ${{ matrix.os }}

Expand Down
3 changes: 2 additions & 1 deletion .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"friendsofhyperf",
"friendsofphp",
"krowinski",
"moln",
"nums"
]
}
}
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use FriendsOfHyperf\Trigger\Annotation\Trigger;
use FriendsOfHyperf\Trigger\Trigger\AbstractTrigger;
use MySQLReplication\Event\DTO\EventDTO;

#[Trigger(table:"table", on:"*", pool:"default")]
#[Trigger(table:"table", on:"*", connection:"default")]
class FooTrigger extends AbstractTrigger
{
public function onWrite(array $new)
Expand Down Expand Up @@ -58,7 +58,7 @@ use FriendsOfHyperf\Trigger\Annotation\Subscriber;
use FriendsOfHyperf\Trigger\Subscriber\AbstractEventSubscriber;
use MySQLReplication\Event\DTO\EventDTO;

#[Subscriber(pool:"default")]
#[Subscriber(connection:"default")]
class BarSubscriber extends AbstractEventSubscriber
{
protected function allEvents(EventDTO $event): void
Expand Down
28 changes: 19 additions & 9 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@
"description": "The MySQL Trigger component for Hyperf.",
"type": "library",
"require": {
"hyperf/command": "~3.0.0",
"hyperf/coordinator": "~3.0.0",
"hyperf/di": "~3.0.0",
"hyperf/event": "~3.0.0",
"hyperf/process": "~3.0.0",
"hyperf/utils": "~3.0.0",
"php": ">=8.1.0",
"friendsofhyperf/command-signals": "~3.1.0",
"hyperf/collection": "~3.1.0",
"hyperf/command": "~3.1.0",
"hyperf/conditionable": "~3.1.0",
"hyperf/context": "~3.1.0",
"hyperf/coordinator": "~3.1.0",
"hyperf/coroutine": "~3.1.0",
"hyperf/di": "~3.1.0",
"hyperf/event": "~3.1.0",
"hyperf/process": "~3.1.0",
"hyperf/stringable": "~3.1.0",
"hyperf/tappable": "~3.1.0",
"moln/php-mysql-replication": "^1.2"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.0",
"hyperf/framework": "~3.0.0",
"hyperf/redis": "~3.0.0",
"hyperf/signal": "~3.0.0",
"hyperf/framework": "~3.1.0",
"hyperf/redis": "~3.1.0",
"hyperf/signal": "~3.1.0",
"phpstan/phpstan": "^1.0",
"swoole/ide-helper": "dev-master"
},
Expand All @@ -27,6 +34,9 @@
"extra": {
"hyperf": {
"config": "FriendsOfHyperf\\Trigger\\ConfigProvider"
},
"branch-alias": {
"dev-main": "3.1-dev"
}
},
"suggest": {
Expand Down
8 changes: 7 additions & 1 deletion publish/trigger.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
* @document https://github.com/friendsofhyperf/trigger/blob/main/README.md
* @contact [email protected]
*/
use function Hyperf\Support\env;

return [
'pools' => [
'connections' => [
'default' => [
'enable' => env('TRIGGER_ENABLE', true),

'host' => env('TRIGGER_HOST', ''),
'port' => (int) env('TRIGGER_PORT', 3306),
'user' => env('TRIGGER_USER', ''),
Expand Down Expand Up @@ -40,6 +44,8 @@
'concurrent' => [
'limit' => 1000,
],

'consume_timeout' => 600,
],
],
];
2 changes: 1 addition & 1 deletion src/Annotation/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#[Attribute(Attribute::TARGET_CLASS)]
class Subscriber extends AbstractAnnotation
{
public function __construct(public string $pool = 'default', public int $priority = 0)
public function __construct(public string $connection = 'default', public int $priority = 0)
{
}
}
2 changes: 1 addition & 1 deletion src/Annotation/Trigger.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function __construct(
public ?string $database = null,
public ?string $table = null,
public array $events = ['*'],
public string $pool = 'default',
public string $connection = 'default',
public int $priority = 0
) {
}
Expand Down
34 changes: 34 additions & 0 deletions src/Aspect/BinaryDataReaderAspect.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);
/**
* This file is part of hyperf-trigger.
*
* @link https://github.com/friendsofhyperf/trigger
* @document https://github.com/friendsofhyperf/trigger/blob/main/README.md
* @contact [email protected]
*/
namespace FriendsOfHyperf\Trigger\Aspect;

use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use MySQLReplication\BinaryDataReader\BinaryDataReader;

/**
* @mixin BinaryDataReader
*/
class BinaryDataReaderAspect extends AbstractAspect
{
public array $classes = [
BinaryDataReader::class . '::readUIntBySize',
];

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
if ($proceedingJoinPoint->arguments['keys']['size'] == BinaryDataReader::UNSIGNED_INT64_LENGTH) {
return (fn () => (int) $this->readUInt64())->call($proceedingJoinPoint->getInstance());
}

return $proceedingJoinPoint->process();
}
}
85 changes: 85 additions & 0 deletions src/Command/ConsumeCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);
/**
* This file is part of hyperf-trigger.
*
* @link https://github.com/friendsofhyperf/trigger
* @document https://github.com/friendsofhyperf/trigger/blob/main/README.md
* @contact [email protected]
*/
namespace FriendsOfHyperf\Trigger\Command;

use FriendsOfHyperf\CommandSignals\Traits\InteractsWithSignals;
use FriendsOfHyperf\Trigger\Consumer;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Coroutine\Coroutine;
use Psr\Container\ContainerInterface;

use function Hyperf\Support\make;

#[Command()]
class ConsumeCommand extends HyperfCommand
{
use InteractsWithSignals;

protected ?string $signature = 'trigger:consume';

protected string $description = 'Run consumers.';

public function __construct(
protected ContainerInterface $container,
protected ConfigInterface $config
) {
}

public function handle()
{
$consumers = $this->getConsumers();

$this->trap([SIGINT, SIGTERM], function ($signo) use ($consumers) {
$this->warn(sprintf('Received signal %d, exiting...', $signo));

foreach ($consumers as $consumer) {
$consumer->stop();
}

CoordinatorManager::until(self::class)->resume();
});

foreach ($consumers as $consumer) {
Coroutine::create(function () use ($consumer) {
$consumer->start();
});
}

CoordinatorManager::until(self::class)->yield();

$this->info('Bye!');
}

/**
* @return Consumer[]
*/
public function getConsumers(): array
{
$consumers = [];
$connections = $this->config->get('trigger.connections', []);

foreach ($connections as $connection => $options) {
if (isset($options['enable']) && ! $options['enable']) {
continue;
}

$consumers[] = make(Consumer::class, [
'connection' => $connection,
'options' => (array) $options,
]);
}

return $consumers;
}
}
12 changes: 7 additions & 5 deletions src/Command/SubscribersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
use Hyperf\Di\Annotation\AnnotationCollector;
use Psr\Container\ContainerInterface;

use function Hyperf\Collection\collect;

class SubscribersCommand extends HyperfCommand
{
protected ?string $signature = 'describe:subscribers {--P|poll= : Pool}';
protected ?string $signature = 'describe:subscribers {--C|connection= : connection}';

protected string $description = 'List all subscribers.';

Expand All @@ -33,18 +35,18 @@ public function handle()
$subscribers = AnnotationCollector::getClassesByAnnotation(Subscriber::class);
$rows = collect($subscribers)
->filter(function ($property, $class) {
if ($this->input->getOption('pool')) {
return $this->input->getOption('pool') == $property->pool;
if ($connection = $this->input->getOption('connection')) {
return $connection == $property->connection;
}
return true;
})
->transform(fn ($property, $class) => [$property->pool, $class, $property->priority])
->transform(fn ($property, $class) => [$property->connection, $class, $property->priority])
->merge([
['[default]', SnapshotSubscriber::class, 1],
['[default]', TriggerSubscriber::class, 1],
]);

$this->info('Subscribers:');
$this->table(['Pool', 'Subscriber', 'Priority'], $rows);
$this->table(['Connection', 'Subscriber', 'Priority'], $rows);
}
}
13 changes: 8 additions & 5 deletions src/Command/TriggersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
use Hyperf\Di\Annotation\AnnotationCollector;
use Psr\Container\ContainerInterface;

use function Hyperf\Collection\collect;
use function Hyperf\Support\class_basename;

class TriggersCommand extends HyperfCommand
{
protected ?string $signature = 'describe:triggers {--P|pool= : Pool} {--T|table= : Table}';
protected ?string $signature = 'describe:triggers {--C|connection= : connection} {--T|table= : Table}';

protected string $description = 'List all triggers.';

Expand All @@ -37,8 +40,8 @@ public function handle()
})
->filter(function ($property, $class) {
/* @var Trigger $property */
if ($this->input->getOption('pool')) {
return $this->input->getOption('pool') == $property->pool;
if ($this->input->getOption('connection')) {
return $this->input->getOption('connection') == $property->connection;
}
return true;
})
Expand All @@ -49,9 +52,9 @@ public function handle()
}
return true;
})
->transform(fn ($property, $class) => [$property->pool, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]);
->transform(fn ($property, $class) => [$property->connection, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]);

$this->info('Triggers:');
$this->table(['Pool', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows);
$this->table(['connection', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows);
}
}
22 changes: 9 additions & 13 deletions src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,27 @@
*/
namespace FriendsOfHyperf\Trigger;

use FriendsOfHyperf\Trigger\Command\SubscribersCommand;
use FriendsOfHyperf\Trigger\Command\TriggersCommand;
use FriendsOfHyperf\Trigger\Listener\OnBootApplicationListener;
use FriendsOfHyperf\Trigger\Mutex\RedisServerMutex;
use FriendsOfHyperf\Trigger\Mutex\ServerMutexInterface;
use FriendsOfHyperf\Trigger\Snapshot\BinLogCurrentSnapshotInterface;
use FriendsOfHyperf\Trigger\Snapshot\RedisBinLogCurrentSnapshot;

class ConfigProvider
{
public function __invoke(): array
{
defined('BASE_PATH') or define('BASE_PATH', '');

return [
'aspects' => [
Aspect\BinaryDataReaderAspect::class, // Fix MySQLReplication bug
],
'dependencies' => [
ServerMutexInterface::class => RedisServerMutex::class,
BinLogCurrentSnapshotInterface::class => RedisBinLogCurrentSnapshot::class,
Mutex\ServerMutexInterface::class => Mutex\RedisServerMutex::class,
Snapshot\BinLogCurrentSnapshotInterface::class => Snapshot\RedisBinLogCurrentSnapshot::class,
],
'commands' => [
SubscribersCommand::class,
TriggersCommand::class,
Command\ConsumeCommand::class,
Command\SubscribersCommand::class,
Command\TriggersCommand::class,
],
'listeners' => [
OnBootApplicationListener::class,
Listener\OnBootApplicationListener::class,
],
'publish' => [
[
Expand Down
Loading

0 comments on commit cc3af88

Please sign in to comment.