diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d46249e..8fe5eef 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: max-parallel: 15 matrix: os: [ ubuntu-latest ] - php: [ "8.0", "8.1", "8.2" ] + php: [ "8.1", "8.2" ] name: PHP ${{ matrix.php }} Test on ${{ matrix.os }} diff --git a/.vscode/cspell.json b/.vscode/cspell.json index 5283ea7..abdaaf6 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -4,6 +4,7 @@ "friendsofhyperf", "friendsofphp", "krowinski", + "moln", "nums" ] -} \ No newline at end of file +} diff --git a/README.md b/README.md index a914f52..28c369a 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ use FriendsOfHyperf\Trigger\Annotation\Trigger; use FriendsOfHyperf\Trigger\Trigger\AbstractTrigger; use MySQLReplication\Event\DTO\EventDTO; -#[Trigger(table:"table", on:"*", pool:"default")] +#[Trigger(table:"table", on:"*", connection:"default")] class FooTrigger extends AbstractTrigger { public function onWrite(array $new) @@ -58,7 +58,7 @@ use FriendsOfHyperf\Trigger\Annotation\Subscriber; use FriendsOfHyperf\Trigger\Subscriber\AbstractEventSubscriber; use MySQLReplication\Event\DTO\EventDTO; -#[Subscriber(pool:"default")] +#[Subscriber(connection:"default")] class BarSubscriber extends AbstractEventSubscriber { protected function allEvents(EventDTO $event): void diff --git a/composer.json b/composer.json index 8fa21cf..b2ae49b 100644 --- a/composer.json +++ b/composer.json @@ -3,19 +3,26 @@ "description": "The MySQL Trigger component for Hyperf.", "type": "library", "require": { - "hyperf/command": "~3.0.0", - "hyperf/coordinator": "~3.0.0", - "hyperf/di": "~3.0.0", - "hyperf/event": "~3.0.0", - "hyperf/process": "~3.0.0", - "hyperf/utils": "~3.0.0", + "php": ">=8.1.0", + "friendsofhyperf/command-signals": "~3.1.0", + "hyperf/collection": "~3.1.0", + "hyperf/command": "~3.1.0", + "hyperf/conditionable": "~3.1.0", + "hyperf/context": "~3.1.0", + "hyperf/coordinator": "~3.1.0", + "hyperf/coroutine": "~3.1.0", + "hyperf/di": "~3.1.0", + "hyperf/event": "~3.1.0", + "hyperf/process": "~3.1.0", + "hyperf/stringable": "~3.1.0", + "hyperf/tappable": "~3.1.0", "moln/php-mysql-replication": "^1.2" }, "require-dev": { "friendsofphp/php-cs-fixer": "^3.0", - "hyperf/framework": "~3.0.0", - "hyperf/redis": "~3.0.0", - "hyperf/signal": "~3.0.0", + "hyperf/framework": "~3.1.0", + "hyperf/redis": "~3.1.0", + "hyperf/signal": "~3.1.0", "phpstan/phpstan": "^1.0", "swoole/ide-helper": "dev-master" }, @@ -27,6 +34,9 @@ "extra": { "hyperf": { "config": "FriendsOfHyperf\\Trigger\\ConfigProvider" + }, + "branch-alias": { + "dev-main": "3.1-dev" } }, "suggest": { diff --git a/publish/trigger.php b/publish/trigger.php index cd3e941..8834d10 100644 --- a/publish/trigger.php +++ b/publish/trigger.php @@ -8,9 +8,13 @@ * @document https://github.com/friendsofhyperf/trigger/blob/main/README.md * @contact huangdijia@gmail.com */ +use function Hyperf\Support\env; + return [ - 'pools' => [ + 'connections' => [ 'default' => [ + 'enable' => env('TRIGGER_ENABLE', true), + 'host' => env('TRIGGER_HOST', ''), 'port' => (int) env('TRIGGER_PORT', 3306), 'user' => env('TRIGGER_USER', ''), @@ -40,6 +44,8 @@ 'concurrent' => [ 'limit' => 1000, ], + + 'consume_timeout' => 600, ], ], ]; diff --git a/src/Annotation/Subscriber.php b/src/Annotation/Subscriber.php index d2c51ba..662c942 100644 --- a/src/Annotation/Subscriber.php +++ b/src/Annotation/Subscriber.php @@ -16,7 +16,7 @@ #[Attribute(Attribute::TARGET_CLASS)] class Subscriber extends AbstractAnnotation { - public function __construct(public string $pool = 'default', public int $priority = 0) + public function __construct(public string $connection = 'default', public int $priority = 0) { } } diff --git a/src/Annotation/Trigger.php b/src/Annotation/Trigger.php index cb02b71..6bd40c6 100644 --- a/src/Annotation/Trigger.php +++ b/src/Annotation/Trigger.php @@ -20,7 +20,7 @@ public function __construct( public ?string $database = null, public ?string $table = null, public array $events = ['*'], - public string $pool = 'default', + public string $connection = 'default', public int $priority = 0 ) { } diff --git a/src/Aspect/BinaryDataReaderAspect.php b/src/Aspect/BinaryDataReaderAspect.php new file mode 100644 index 0000000..a2cccc3 --- /dev/null +++ b/src/Aspect/BinaryDataReaderAspect.php @@ -0,0 +1,34 @@ +arguments['keys']['size'] == BinaryDataReader::UNSIGNED_INT64_LENGTH) { + return (fn () => (int) $this->readUInt64())->call($proceedingJoinPoint->getInstance()); + } + + return $proceedingJoinPoint->process(); + } +} diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php new file mode 100644 index 0000000..61d3fc2 --- /dev/null +++ b/src/Command/ConsumeCommand.php @@ -0,0 +1,85 @@ +getConsumers(); + + $this->trap([SIGINT, SIGTERM], function ($signo) use ($consumers) { + $this->warn(sprintf('Received signal %d, exiting...', $signo)); + + foreach ($consumers as $consumer) { + $consumer->stop(); + } + + CoordinatorManager::until(self::class)->resume(); + }); + + foreach ($consumers as $consumer) { + Coroutine::create(function () use ($consumer) { + $consumer->start(); + }); + } + + CoordinatorManager::until(self::class)->yield(); + + $this->info('Bye!'); + } + + /** + * @return Consumer[] + */ + public function getConsumers(): array + { + $consumers = []; + $connections = $this->config->get('trigger.connections', []); + + foreach ($connections as $connection => $options) { + if (isset($options['enable']) && ! $options['enable']) { + continue; + } + + $consumers[] = make(Consumer::class, [ + 'connection' => $connection, + 'options' => (array) $options, + ]); + } + + return $consumers; + } +} diff --git a/src/Command/SubscribersCommand.php b/src/Command/SubscribersCommand.php index 8a2bf19..bbbb79a 100644 --- a/src/Command/SubscribersCommand.php +++ b/src/Command/SubscribersCommand.php @@ -17,9 +17,11 @@ use Hyperf\Di\Annotation\AnnotationCollector; use Psr\Container\ContainerInterface; +use function Hyperf\Collection\collect; + class SubscribersCommand extends HyperfCommand { - protected ?string $signature = 'describe:subscribers {--P|poll= : Pool}'; + protected ?string $signature = 'describe:subscribers {--C|connection= : connection}'; protected string $description = 'List all subscribers.'; @@ -33,18 +35,18 @@ public function handle() $subscribers = AnnotationCollector::getClassesByAnnotation(Subscriber::class); $rows = collect($subscribers) ->filter(function ($property, $class) { - if ($this->input->getOption('pool')) { - return $this->input->getOption('pool') == $property->pool; + if ($connection = $this->input->getOption('connection')) { + return $connection == $property->connection; } return true; }) - ->transform(fn ($property, $class) => [$property->pool, $class, $property->priority]) + ->transform(fn ($property, $class) => [$property->connection, $class, $property->priority]) ->merge([ ['[default]', SnapshotSubscriber::class, 1], ['[default]', TriggerSubscriber::class, 1], ]); $this->info('Subscribers:'); - $this->table(['Pool', 'Subscriber', 'Priority'], $rows); + $this->table(['Connection', 'Subscriber', 'Priority'], $rows); } } diff --git a/src/Command/TriggersCommand.php b/src/Command/TriggersCommand.php index a661133..5cfa5df 100644 --- a/src/Command/TriggersCommand.php +++ b/src/Command/TriggersCommand.php @@ -15,9 +15,12 @@ use Hyperf\Di\Annotation\AnnotationCollector; use Psr\Container\ContainerInterface; +use function Hyperf\Collection\collect; +use function Hyperf\Support\class_basename; + class TriggersCommand extends HyperfCommand { - protected ?string $signature = 'describe:triggers {--P|pool= : Pool} {--T|table= : Table}'; + protected ?string $signature = 'describe:triggers {--C|connection= : connection} {--T|table= : Table}'; protected string $description = 'List all triggers.'; @@ -37,8 +40,8 @@ public function handle() }) ->filter(function ($property, $class) { /* @var Trigger $property */ - if ($this->input->getOption('pool')) { - return $this->input->getOption('pool') == $property->pool; + if ($this->input->getOption('connection')) { + return $this->input->getOption('connection') == $property->connection; } return true; }) @@ -49,9 +52,9 @@ public function handle() } return true; }) - ->transform(fn ($property, $class) => [$property->pool, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]); + ->transform(fn ($property, $class) => [$property->connection, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]); $this->info('Triggers:'); - $this->table(['Pool', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows); + $this->table(['connection', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows); } } diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index 12ee6f1..7ee7698 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -10,14 +10,6 @@ */ namespace FriendsOfHyperf\Trigger; -use FriendsOfHyperf\Trigger\Command\SubscribersCommand; -use FriendsOfHyperf\Trigger\Command\TriggersCommand; -use FriendsOfHyperf\Trigger\Listener\OnBootApplicationListener; -use FriendsOfHyperf\Trigger\Mutex\RedisServerMutex; -use FriendsOfHyperf\Trigger\Mutex\ServerMutexInterface; -use FriendsOfHyperf\Trigger\Snapshot\BinLogCurrentSnapshotInterface; -use FriendsOfHyperf\Trigger\Snapshot\RedisBinLogCurrentSnapshot; - class ConfigProvider { public function __invoke(): array @@ -25,16 +17,20 @@ public function __invoke(): array defined('BASE_PATH') or define('BASE_PATH', ''); return [ + 'aspects' => [ + Aspect\BinaryDataReaderAspect::class, // Fix MySQLReplication bug + ], 'dependencies' => [ - ServerMutexInterface::class => RedisServerMutex::class, - BinLogCurrentSnapshotInterface::class => RedisBinLogCurrentSnapshot::class, + Mutex\ServerMutexInterface::class => Mutex\RedisServerMutex::class, + Snapshot\BinLogCurrentSnapshotInterface::class => Snapshot\RedisBinLogCurrentSnapshot::class, ], 'commands' => [ - SubscribersCommand::class, - TriggersCommand::class, + Command\ConsumeCommand::class, + Command\SubscribersCommand::class, + Command\TriggersCommand::class, ], 'listeners' => [ - OnBootApplicationListener::class, + Listener\OnBootApplicationListener::class, ], 'publish' => [ [ diff --git a/src/Replication.php b/src/Consumer.php similarity index 73% rename from src/Replication.php rename to src/Consumer.php index cb92a01..8fea7f4 100644 --- a/src/Replication.php +++ b/src/Consumer.php @@ -16,15 +16,19 @@ use FriendsOfHyperf\Trigger\Subscriber\SnapshotSubscriber; use FriendsOfHyperf\Trigger\Subscriber\TriggerSubscriber; use FriendsOfHyperf\Trigger\Traits\Logger; +use Hyperf\Collection\Arr; use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Coordinator\Constants; use Hyperf\Coordinator\CoordinatorManager; -use Hyperf\Utils\Arr; -use Hyperf\Utils\Coroutine; +use Hyperf\Coroutine\Coroutine; use MySQLReplication\Config\ConfigBuilder; use MySQLReplication\MySQLReplicationFactory; -class Replication +use function Hyperf\Coroutine\wait; +use function Hyperf\Support\make; +use function Hyperf\Tappable\tap; + +class Consumer { use Logger; @@ -41,28 +45,28 @@ class Replication public function __construct( protected subscriberManager $subscriberManager, protected TriggerManager $triggerManager, - protected StdoutLoggerInterface $logger, - protected string $pool = 'default', - protected array $options = [] + protected string $connection = 'default', + protected array $options = [], + protected ?StdoutLoggerInterface $logger = null ) { if (isset($options['name'])) { $this->name = $options['name']; } $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, [ - 'replication' => $this, + 'consumer' => $this, ]); if ($this->getOption('server_mutex.enable', true)) { $this->serverMutex = make(ServerMutexInterface::class, [ - 'name' => 'trigger:mutex:' . $this->pool, + 'name' => 'trigger:mutex:' . $this->connection, 'owner' => Util::getInternalIp(), - 'options' => $this->getOption('server_mutex', []) + ['pool' => $this->pool], + 'options' => $this->getOption('server_mutex', []) + ['connection' => $this->connection], ]); } if ($this->getOption('health_monitor.enable', true)) { - $this->healthMonitor = make(HealthMonitor::class, ['replication' => $this]); + $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]); } } @@ -79,7 +83,7 @@ public function start(): void // Replication start CoordinatorManager::until($this->getIdentifier())->resume(); - $this->debug('Process started.'); + $this->debug('Consumer started.'); // Worker exit Coroutine::create(function () { @@ -87,7 +91,7 @@ public function start(): void $this->stop(); - $this->warning('Process stopped.'); + $this->warning('Consumer stopped.'); }); while (1) { @@ -95,7 +99,7 @@ public function start(): void break; } - $replication->consume(); + wait(fn () => $replication->consume(), (float) $this->getOption('consume_timeout', 600)); } }; @@ -118,7 +122,7 @@ public function getHealthMonitor(): ?HealthMonitor public function getName(): string { - return $this->name ?? 'trigger-' . $this->pool; + return $this->name ?? 'trigger-' . $this->connection; } public function getOption(?string $key = null, $default = null) @@ -130,14 +134,14 @@ public function getOption(?string $key = null, $default = null) return Arr::get($this->options, $key, $default); } - public function getPool(): string + public function getConnection(): string { - return $this->pool; + return $this->connection; } public function getIdentifier(): string { - return sprintf('%s_start', $this->pool); + return sprintf('%s_start', $this->connection); } public function stop(): void @@ -153,18 +157,18 @@ public function isStopped(): bool protected function makeReplication(): MySQLReplicationFactory { - $pool = $this->pool; + $connection = $this->connection; // Get options $config = (array) $this->options; // Get databases of replication $databasesOnly = array_replace( $config['databases_only'] ?? [], - $this->triggerManager->getDatabases($pool) + $this->triggerManager->getDatabases($connection) ); // Get tables of replication $tablesOnly = array_replace( $config['tables_only'] ?? [], - $this->triggerManager->getTables($pool) + $this->triggerManager->getTables($connection) ); /** @var ConfigBuilder */ @@ -189,18 +193,21 @@ protected function makeReplication(): MySQLReplicationFactory $eventDispatcher = make(EventDispatcher::class); - return tap(make(MySQLReplicationFactory::class, [ - 'config' => $configBuilder->build(), - 'eventDispatcher' => $eventDispatcher, - ]), function ($factory) use ($pool) { - /** @var MySQLReplicationFactory $factory */ - $subscribers = $this->subscriberManager->get($pool); - $subscribers[] = TriggerSubscriber::class; - $subscribers[] = SnapshotSubscriber::class; - - foreach ($subscribers as $subscriber) { - $factory->registerSubscriber(make($subscriber, ['replication' => $this])); + return tap( + make(MySQLReplicationFactory::class, [ + 'config' => $configBuilder->build(), + 'eventDispatcher' => $eventDispatcher, + ]), + function ($factory) use ($connection) { + /** @var MySQLReplicationFactory $factory */ + $subscribers = $this->subscriberManager->get($connection); + $subscribers[] = TriggerSubscriber::class; + $subscribers[] = SnapshotSubscriber::class; + + foreach ($subscribers as $subscriber) { + $factory->registerSubscriber(make($subscriber, ['consumer' => $this])); + } } - }); + ); } } diff --git a/src/ReplicationManager.php b/src/ConsumerManager.php similarity index 51% rename from src/ReplicationManager.php rename to src/ConsumerManager.php index bd9c7a3..6e865c5 100644 --- a/src/ReplicationManager.php +++ b/src/ConsumerManager.php @@ -15,7 +15,9 @@ use Hyperf\Process\ProcessManager; use Psr\Container\ContainerInterface; -class ReplicationManager +use function Hyperf\Support\make; + +class ConsumerManager { public function __construct( protected ContainerInterface $container, @@ -25,30 +27,37 @@ public function __construct( public function run() { - $pools = $this->config->get('trigger.pools', []); + $connections = $this->config->get('trigger.connections', []); + + foreach ($connections as $connection => $options) { + if (isset($options['enable']) && ! $options['enable']) { + continue; + } - foreach ($pools as $pool => $options) { - $replication = make(Replication::class, [ - 'pool' => $pool, + $consumer = make(Consumer::class, [ + 'connection' => $connection, 'options' => (array) $options, ]); - $process = $this->createProcess($replication); - $process->name = $replication->getName(); + + $process = $this->createProcess($consumer); + $process->name = $consumer->getName(); $process->nums = 1; + ProcessManager::register($process); } } - protected function createProcess(Replication $replication): AbstractProcess + protected function createProcess(Consumer $consumer): AbstractProcess { - return new class($replication) extends AbstractProcess { - public function __construct(protected Replication $replication) + return new class($this->container, $consumer) extends AbstractProcess { + public function __construct(ContainerInterface $container, protected Consumer $consumer) { + parent::__construct($container); } public function handle(): void { - $this->replication->start(); + $this->consumer->start(); } }; } diff --git a/src/Event/OnReplicationStop.php b/src/Event/OnReplicationStop.php index 4cd94c2..c7d2b19 100644 --- a/src/Event/OnReplicationStop.php +++ b/src/Event/OnReplicationStop.php @@ -14,7 +14,7 @@ class OnReplicationStop { - public function __construct(public string $pool, public ?BinLogCurrent $binLogCurrent = null) + public function __construct(public string $connection, public ?BinLogCurrent $binLogCurrent = null) { } } diff --git a/src/Listener/OnBootApplicationListener.php b/src/Listener/OnBootApplicationListener.php index 3988445..aaac4a0 100644 --- a/src/Listener/OnBootApplicationListener.php +++ b/src/Listener/OnBootApplicationListener.php @@ -10,7 +10,7 @@ */ namespace FriendsOfHyperf\Trigger\Listener; -use FriendsOfHyperf\Trigger\ReplicationManager; +use FriendsOfHyperf\Trigger\ConsumerManager; use FriendsOfHyperf\Trigger\SubscriberManager; use FriendsOfHyperf\Trigger\TriggerManager; use Hyperf\Event\Contract\ListenerInterface; @@ -21,7 +21,7 @@ class OnBootApplicationListener implements ListenerInterface public function __construct( protected SubscriberManager $subscriberManager, protected TriggerManager $triggerManager, - protected ReplicationManager $replicationManager + protected ConsumerManager $consumerManager ) { } @@ -39,6 +39,6 @@ public function process(object $event): void { $this->subscriberManager->register(); $this->triggerManager->register(); - $this->replicationManager->run(); + $this->consumerManager->run(); } } diff --git a/src/Monitor/HealthMonitor.php b/src/Monitor/HealthMonitor.php index a7451fb..b32b5f5 100644 --- a/src/Monitor/HealthMonitor.php +++ b/src/Monitor/HealthMonitor.php @@ -10,14 +10,14 @@ */ namespace FriendsOfHyperf\Trigger\Monitor; +use FriendsOfHyperf\Trigger\Consumer; use FriendsOfHyperf\Trigger\Event\OnReplicationStop; -use FriendsOfHyperf\Trigger\Replication; use FriendsOfHyperf\Trigger\Snapshot\BinLogCurrentSnapshotInterface; use FriendsOfHyperf\Trigger\Traits\Logger; use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Coordinator\CoordinatorManager; use Hyperf\Coordinator\Timer; -use Hyperf\Utils\Coroutine; +use Hyperf\Coroutine\Coroutine; use MySQLReplication\BinLog\BinLogCurrent; use Psr\Container\ContainerInterface; use Psr\EventDispatcher\EventDispatcherInterface; @@ -32,28 +32,30 @@ class HealthMonitor protected int $snapShortInterval = 10; - protected string $pool; + protected string $connection; protected BinLogCurrentSnapshotInterface $binLogCurrentSnapshot; protected Timer $timer; - protected StdoutLoggerInterface $logger; + protected ?StdoutLoggerInterface $logger = null; - public function __construct(protected ContainerInterface $container, protected Replication $replication) + public function __construct(protected ContainerInterface $container, protected Consumer $consumer) { - $this->pool = $replication->getPool(); - $this->monitorInterval = (int) $replication->getOption('health_monitor.interval', 10); - $this->snapShortInterval = (int) $replication->getOption('snapshot.interval', 10); - $this->binLogCurrentSnapshot = $replication->getBinLogCurrentSnapshot(); - $this->logger = $this->container->get(StdoutLoggerInterface::class); + $this->connection = $consumer->getConnection(); + $this->monitorInterval = (int) $consumer->getOption('health_monitor.interval', 10); + $this->snapShortInterval = (int) $consumer->getOption('snapshot.interval', 10); + $this->binLogCurrentSnapshot = $consumer->getBinLogCurrentSnapshot(); + if ($container->has(StdoutLoggerInterface::class)) { + $this->logger = $container->get(StdoutLoggerInterface::class); + } $this->timer = new Timer($this->logger); } public function process(): void { Coroutine::create(function () { - CoordinatorManager::until($this->replication->getIdentifier())->yield(); + CoordinatorManager::until($this->consumer->getIdentifier())->yield(); // Monitor binLogCurrent $this->timer->tick($this->monitorInterval, function () { @@ -73,11 +75,15 @@ public function process(): void return; } + $binLogCurrentCache = $this->binLogCurrentSnapshot->get(); + if ( - $this->binLogCurrentSnapshot->get() instanceof BinLogCurrent - && $this->binLogCurrentSnapshot->get()->getBinLogPosition() == $this->binLogCurrent->getBinLogPosition() + $binLogCurrentCache instanceof BinLogCurrent + && $binLogCurrentCache->getBinLogPosition() == $this->binLogCurrent->getBinLogPosition() ) { - $this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->pool, $this->binLogCurrent)); + if ($this->container->has(EventDispatcherInterface::class)) { + $this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->connection, $this->binLogCurrent)); + } } $this->binLogCurrentSnapshot->set($this->binLogCurrent); diff --git a/src/Mutex/RedisServerMutex.php b/src/Mutex/RedisServerMutex.php index a2dfe63..6ea62ac 100644 --- a/src/Mutex/RedisServerMutex.php +++ b/src/Mutex/RedisServerMutex.php @@ -33,20 +33,20 @@ class RedisServerMutex implements ServerMutexInterface protected int $retryInterval = 10; - protected string $pool = 'default'; + protected string $connection = 'default'; public function __construct( - protected StdoutLoggerInterface $logger, protected Redis $redis, protected ?string $name = null, protected ?string $owner = null, - array $options = [] + array $options = [], + protected ?StdoutLoggerInterface $logger = null ) { $this->expires = (int) $options['expires'] ?? 60; $this->keepaliveInterval = (int) $options['keepalive_interval'] ?? 10; - $this->name = $name ?? sprintf('trigger:server:%s', $this->pool); + $this->name = $name ?? sprintf('trigger:server:%s', $this->connection); $this->owner = $owner ?? Util::getInternalIp(); - $this->pool = $options['pool']; + $this->connection = $options['connection']; $this->timer = new Timer($logger); $this->retryInterval = (int) $options['retry_interval'] ?? 10; } @@ -111,6 +111,6 @@ public function release(bool $force = false): void protected function getIdentifier(): string { - return sprintf('%s_%s', $this->pool, __CLASS__); + return sprintf('%s_%s', $this->connection, __CLASS__); } } diff --git a/src/Snapshot/RedisBinLogCurrentSnapshot.php b/src/Snapshot/RedisBinLogCurrentSnapshot.php index c6c6638..3fc20e5 100644 --- a/src/Snapshot/RedisBinLogCurrentSnapshot.php +++ b/src/Snapshot/RedisBinLogCurrentSnapshot.php @@ -10,14 +10,16 @@ */ namespace FriendsOfHyperf\Trigger\Snapshot; -use FriendsOfHyperf\Trigger\Replication; +use FriendsOfHyperf\Trigger\Consumer; use Hyperf\Redis\Redis; use MySQLReplication\BinLog\BinLogCurrent; +use function Hyperf\Support\with; + class RedisBinLogCurrentSnapshot implements BinLogCurrentSnapshotInterface { public function __construct( - private Replication $replication, + private Consumer $consumer, private Redis $redis ) { } @@ -25,7 +27,7 @@ public function __construct( public function set(BinLogCurrent $binLogCurrent): void { $this->redis->set($this->key(), serialize($binLogCurrent)); - $this->redis->expire($this->key(), (int) $this->replication->getOption('snapshot.expires', 24 * 3600)); + $this->redis->expire($this->key(), (int) $this->consumer->getOption('snapshot.expires', 24 * 3600)); } public function get(): ?BinLogCurrent @@ -47,8 +49,8 @@ private function key(): string 'trigger', 'snapshot', 'binLogCurrent', - $this->replication->getOption('snapshot.version', '1.0'), - $this->replication->getPool(), + $this->consumer->getOption('snapshot.version', '1.0'), + $this->consumer->getConnection(), ]); } } diff --git a/src/Subscriber/SnapshotSubscriber.php b/src/Subscriber/SnapshotSubscriber.php index 9837a24..48539a7 100644 --- a/src/Subscriber/SnapshotSubscriber.php +++ b/src/Subscriber/SnapshotSubscriber.php @@ -10,21 +10,21 @@ */ namespace FriendsOfHyperf\Trigger\Subscriber; -use FriendsOfHyperf\Trigger\Replication; +use FriendsOfHyperf\Trigger\Consumer; use MySQLReplication\Event\DTO\EventDTO; class SnapshotSubscriber extends AbstractSubscriber { - public function __construct(protected Replication $replication) + public function __construct(protected Consumer $consumer) { } protected function allEvents(EventDTO $event): void { - if (! $this->replication->getHealthMonitor()) { + if (! $this->consumer->getHealthMonitor()) { return; } - $this->replication->getHealthMonitor()->setBinLogCurrent($event->getEventInfo()->getBinLogCurrent()); + $this->consumer->getHealthMonitor()->setBinLogCurrent($event->getEventInfo()->getBinLogCurrent()); } } diff --git a/src/Subscriber/TriggerSubscriber.php b/src/Subscriber/TriggerSubscriber.php index b2dfcb6..ed21ad4 100644 --- a/src/Subscriber/TriggerSubscriber.php +++ b/src/Subscriber/TriggerSubscriber.php @@ -10,17 +10,19 @@ */ namespace FriendsOfHyperf\Trigger\Subscriber; -use FriendsOfHyperf\Trigger\Replication; +use FriendsOfHyperf\Trigger\Consumer; use FriendsOfHyperf\Trigger\Traits\Logger; use FriendsOfHyperf\Trigger\TriggerManager; use Hyperf\Contract\StdoutLoggerInterface; -use Hyperf\Utils\Coroutine\Concurrent; +use Hyperf\Coroutine\Concurrent; use MySQLReplication\Definitions\ConstEventsNames; use MySQLReplication\Event\DTO\EventDTO; use MySQLReplication\Event\DTO\RowsDTO; use Psr\Container\ContainerInterface; use Throwable; +use function Hyperf\Support\call; + class TriggerSubscriber extends AbstractSubscriber { use Logger; @@ -30,11 +32,11 @@ class TriggerSubscriber extends AbstractSubscriber public function __construct( protected ContainerInterface $container, protected TriggerManager $triggerManager, - protected StdoutLoggerInterface $logger, - protected Replication $replication + protected Consumer $consumer, + protected ?StdoutLoggerInterface $logger = null ) { $this->concurrent = new Concurrent( - (int) $replication->getOption('concurrent.limit') ?? 1000 + (int) $consumer->getOption('concurrent.limit') ?? 1000 ); } @@ -54,7 +56,7 @@ protected function allEvents(EventDTO $event): void } $key = join('.', [ - $this->replication->getPool(), + $this->consumer->getConnection(), $event->getTableMap()->getDatabase(), $event->getTableMap()->getTable(), $event->getType(), @@ -86,7 +88,7 @@ protected function allEvents(EventDTO $event): void try { call([$this->container->get($class), $method], $args); } catch (Throwable $e) { - $this->logger->error(sprintf( + $this->error(sprintf( "%s in %s:%s\n%s", $e->getMessage(), $e->getFile(), diff --git a/src/SubscriberManager.php b/src/SubscriberManager.php index 10907b3..3c1dc98 100644 --- a/src/SubscriberManager.php +++ b/src/SubscriberManager.php @@ -11,16 +11,16 @@ namespace FriendsOfHyperf\Trigger; use FriendsOfHyperf\Trigger\Annotation\Subscriber; +use Hyperf\Collection\Arr; use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Di\Annotation\AnnotationCollector; -use Hyperf\Utils\Arr; use SplPriorityQueue; class SubscriberManager { protected array $subscribers = []; - public function __construct(protected StdoutLoggerInterface $logger) + public function __construct(protected ?StdoutLoggerInterface $logger = null) { } @@ -36,12 +36,12 @@ public function register() foreach ($queue as $value) { [$class, $property] = $value; - $this->subscribers[$property->pool] ??= []; - $this->subscribers[$property->pool][] = $class; + $this->subscribers[$property->connection] ??= []; + $this->subscribers[$property->connection][] = $class; $this->logger->debug(sprintf( '[trigger.%s] %s registered by %s process by %s.', - $property->pool, + $property->connection, $this::class, $class, $this::class @@ -49,8 +49,8 @@ public function register() } } - public function get(string $pool = 'default'): array + public function get(string $connection = 'default'): array { - return (array) Arr::get($this->subscribers, $pool, []); + return (array) Arr::get($this->subscribers, $connection, []); } } diff --git a/src/Traits/Logger.php b/src/Traits/Logger.php index a2570db..751192a 100644 --- a/src/Traits/Logger.php +++ b/src/Traits/Logger.php @@ -10,34 +10,34 @@ */ namespace FriendsOfHyperf\Trigger\Traits; +use Hyperf\Context\ApplicationContext; use Hyperf\Contract\StdoutLoggerInterface; -use Hyperf\Utils\ApplicationContext; use Psr\Log\LoggerInterface; /** - * @property ?string $pool + * @property ?string $connection * @property ?LoggerInterface $logger */ trait Logger { protected function info(string $message, array $context = []): void { - $this->getLogger()->info($this->formatMessage($message, $context)); + $this->getLogger()?->info($this->formatMessage($message, $context)); } protected function debug(string $message, array $context = []): void { - $this->getLogger()->debug($this->formatMessage($message, $context)); + $this->getLogger()?->debug($this->formatMessage($message, $context)); } protected function warning(string $message, array $context = []): void { - $this->getLogger()->warning($this->formatMessage($message, $context)); + $this->getLogger()?->warning($this->formatMessage($message, $context)); } protected function error(string $message, array $context = []): void { - $this->getLogger()->error($this->formatMessage($message, $context)); + $this->getLogger()?->error($this->formatMessage($message, $context)); } protected function formatMessage(string $message, array $context = []): string @@ -45,19 +45,25 @@ protected function formatMessage(string $message, array $context = []): string return sprintf( '[trigger%s] %s %s', /* @phpstan-ignore-next-line */ - isset($this->pool) ? ".{$this->pool}" : 'default', + isset($this->connection) ? ".{$this->connection}" : 'default', $message, $context ? json_encode($context, JSON_UNESCAPED_UNICODE) : '' ); } - protected function getLogger(): LoggerInterface + protected function getLogger(): ?LoggerInterface { /* @phpstan-ignore-next-line */ if (isset($this->logger) && $this->logger instanceof LoggerInterface) { return $this->logger; } - return ApplicationContext::getContainer()->get(StdoutLoggerInterface::class); + $container = ApplicationContext::getContainer(); + + if ($container->has(StdoutLoggerInterface::class)) { + return $container->get(StdoutLoggerInterface::class); + } + + return null; } } diff --git a/src/TriggerManager.php b/src/TriggerManager.php index 5dd044f..8355c41 100644 --- a/src/TriggerManager.php +++ b/src/TriggerManager.php @@ -11,11 +11,13 @@ namespace FriendsOfHyperf\Trigger; use FriendsOfHyperf\Trigger\Annotation\Trigger; +use Hyperf\Collection\Arr; use Hyperf\Contract\ConfigInterface; use Hyperf\Di\Annotation\AnnotationCollector; -use Hyperf\Utils\Arr; use SplPriorityQueue; +use function Hyperf\Support\class_basename; + class TriggerManager { protected array $triggers = []; @@ -42,11 +44,11 @@ public function register(): void /** @var Trigger $property */ foreach ($property->events as $eventType) { - $config = $this->config->get('trigger.pools.' . $property->pool); + $config = $this->config->get('trigger.connections.' . $property->connection); $property->table ??= class_basename($class); $property->database ??= $config['databases_only'][0] ?? ''; - $key = $this->buildKey($property->pool, $property->database, $property->table, $eventType); + $key = $this->buildKey($property->connection, $property->database, $property->table, $eventType); $method = 'on' . ucfirst($eventType); $items = Arr::get($this->triggers, $key, []); @@ -62,17 +64,17 @@ public function get(string $key): array return Arr::get($this->triggers, $key, []); } - public function getDatabases(string $pool): array + public function getDatabases(string $connection): array { - return array_keys($this->get($pool)); + return array_keys($this->get($connection)); } - public function getTables(string $pool): array + public function getTables(string $connection): array { $tables = []; - foreach ($this->getDatabases($pool) as $database) { - $tables = [...$tables, ...array_keys($this->get($this->buildKey($pool, $database)))]; + foreach ($this->getDatabases($connection) as $database) { + $tables = [...$tables, ...array_keys($this->get($this->buildKey($connection, $database)))]; } return $tables; diff --git a/src/Util.php b/src/Util.php index e9e544e..46e458f 100644 --- a/src/Util.php +++ b/src/Util.php @@ -10,6 +10,8 @@ */ namespace FriendsOfHyperf\Trigger; +use RuntimeException; + class Util { /** @@ -29,6 +31,6 @@ public static function getInternalIp(): string return $ip; } - throw new \RuntimeException('Can not get the internal IP.'); + throw new RuntimeException('Can not get the internal IP.'); } }