diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..c90148b
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,4 @@
+/.github export-ignore
+/.vscode export-ignore
+/tests export-ignore
+.gitattributes export-ignore
\ No newline at end of file
diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml
new file mode 100644
index 0000000..fa4b5ca
--- /dev/null
+++ b/.github/workflows/close-pull-request.yml
@@ -0,0 +1,13 @@
+name: Close Pull Request
+
+on:
+ pull_request_target:
+ types: [opened]
+
+jobs:
+ run:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: superbrothers/close-pull-request@v3
+ with:
+ comment: "Thank you for your pull request. However, you have submitted this PR on the friendsofhyperf organization which is a read-only sub split of `friendsofhyperf/components`. Please submit your PR on the https://github.com/friendsofhyperf/components repository.
Thanks!"
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
new file mode 100644
index 0000000..dde70ae
--- /dev/null
+++ b/.github/workflows/release.yaml
@@ -0,0 +1,25 @@
+on:
+ push:
+ # Sequence of patterns matched against refs/tags
+ tags:
+ - 'v*' # Push events to matching v*, i.e. v1.0, v1.0.0
+
+name: Release
+
+jobs:
+ release:
+ name: Release
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Create Release
+ id: create_release
+ uses: actions/create-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ tag_name: ${{ github.ref }}
+ release_name: Release ${{ github.ref }}
+ draft: false
+ prerelease: false
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..75bb9fc
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2020 D.J.Hwang
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a595f41
--- /dev/null
+++ b/README.md
@@ -0,0 +1,73 @@
+# Trigger
+
+[![Latest Stable Version](https://poser.pugx.org/friendsofhyperf/trigger/version.png)](https://packagist.org/packages/friendsofhyperf/trigger)
+[![Total Downloads](https://poser.pugx.org/friendsofhyperf/trigger/d/total.png)](https://packagist.org/packages/friendsofhyperf/trigger)
+[![GitHub license](https://img.shields.io/github/license/friendsofhyperf/trigger)](https://github.com/friendsofhyperf/trigger)
+
+MySQL trigger component for Hyperf, Based on a great work of creators:[moln/php-mysql-replication](https://github.com/moln/php-mysql-replication)
+
+## Installation
+
+- Request
+
+```bash
+composer require "friendsofhyperf/trigger:~3.0.0"
+```
+
+- Publish
+
+```bash
+php bin/hyperf.php vendor:publish friendsofhyperf/trigger
+```
+
+## Define a trigger
+
+```php
+namespace App\Trigger;
+
+use FriendsOfHyperf\Trigger\Annotation\Trigger;
+use FriendsOfHyperf\Trigger\Trigger\AbstractTrigger;
+use MySQLReplication\Event\DTO\EventDTO;
+
+#[Trigger(table:"table", on:"*", connection:"default")]
+class FooTrigger extends AbstractTrigger
+{
+ public function onWrite(array $new)
+ {
+ var_dump($new);
+ }
+
+ public function onUpdate(array $old, array $new)
+ {
+ var_dump($old, $new);
+ }
+
+ public function onDelete(array $old)
+ {
+ var_dump($old);
+ }
+}
+```
+
+## Define a subscriber
+
+```php
+namespace App\Subscriber;
+
+use FriendsOfHyperf\Trigger\Annotation\Subscriber;
+use FriendsOfHyperf\Trigger\Subscriber\AbstractEventSubscriber;
+use MySQLReplication\Event\DTO\EventDTO;
+
+#[Subscriber(connection:"default")]
+class BarSubscriber extends AbstractEventSubscriber
+{
+ protected function allEvents(EventDTO $event): void
+ {
+ // some code
+ }
+}
+```
+
+## Sponsor
+
+If you like them, Buy me a cup of coffee. [ [Alipay](https://hdj.me/images/alipay.jpg) | [WePay](https://hdj.me/images/wechat-pay.jpg) ]
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..e826218
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,55 @@
+{
+ "name": "friendsofhyperf/trigger",
+ "description": "The MySQL Trigger component for Hyperf.",
+ "type": "library",
+ "require": {
+ "php": ">=8.0.0",
+ "friendsofhyperf/command-signals": "~3.0.0",
+ "hyperf/collection": "~3.0.0",
+ "hyperf/command": "~3.0.0",
+ "hyperf/conditionable": "~3.0.0",
+ "hyperf/context": "~3.0.0",
+ "hyperf/coordinator": "~3.0.0",
+ "hyperf/coroutine": "~3.0.0",
+ "hyperf/di": "~3.0.0",
+ "hyperf/event": "~3.0.0",
+ "hyperf/process": "~3.0.0",
+ "hyperf/stringable": "~3.0.0",
+ "hyperf/support": "~3.0.0",
+ "hyperf/tappable": "~3.0.0",
+ "moln/php-mysql-replication": "^1.2"
+ },
+ "autoload": {
+ "psr-4": {
+ "FriendsOfHyperf\\Trigger\\": "src"
+ }
+ },
+ "extra": {
+ "hyperf": {
+ "config": "FriendsOfHyperf\\Trigger\\ConfigProvider"
+ }
+ },
+ "suggest": {
+ "hyperf/redis": "Required to use Redis client.",
+ "hyperf/signal": "Required to use Signal manager."
+ },
+ "config": {
+ "sort-packages": true,
+ "allow-plugins": {
+ "composer/package-versions-deprecated": true
+ }
+ },
+ "minimum-stability": "dev",
+ "prefer-stable": true,
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "huangdijia",
+ "email": "huangdijia@gmail.com"
+ }
+ ],
+ "support": {
+ "issues": "https://github.com/friendsofhyperf/components/issues",
+ "source": "https://github.com/friendsofhyperf/components"
+ }
+}
diff --git a/publish/trigger.php b/publish/trigger.php
new file mode 100644
index 0000000..3d3289b
--- /dev/null
+++ b/publish/trigger.php
@@ -0,0 +1,51 @@
+ [
+ 'default' => [
+ 'enable' => env('TRIGGER_ENABLE', true),
+
+ 'host' => env('TRIGGER_HOST', ''),
+ 'port' => (int) env('TRIGGER_PORT', 3306),
+ 'user' => env('TRIGGER_USER', ''),
+ 'password' => env('TRIGGER_PASSWORD', ''),
+ 'databases_only' => env('TRIGGER_DATABASES_ONLY', '') ? explode(',', env('TRIGGER_DATABASES_ONLY')) : [],
+ 'tables_only' => env('TRIGGER_TABLES_ONLY', '') ? explode(',', env('TRIGGER_TABLES_ONLY')) : [],
+ 'heartbeat_period' => (int) env('TRIGGER_HEARTBEAT', 3),
+
+ 'server_mutex' => [
+ 'enable' => true,
+ 'expires' => 30,
+ 'keepalive_interval' => 10,
+ 'retry_interval' => 10,
+ ],
+
+ 'health_monitor' => [
+ 'enable' => true,
+ 'interval' => 30,
+ ],
+
+ 'snapshot' => [
+ 'version' => '1.0',
+ 'expires' => 24 * 3600,
+ 'interval' => 10,
+ ],
+
+ 'concurrent' => [
+ 'limit' => 1000,
+ ],
+
+ 'consume_timeout' => 600,
+ ],
+ ],
+];
diff --git a/src/Annotation/Subscriber.php b/src/Annotation/Subscriber.php
new file mode 100644
index 0000000..24757c2
--- /dev/null
+++ b/src/Annotation/Subscriber.php
@@ -0,0 +1,22 @@
+arguments['keys']['size'] == BinaryDataReader::UNSIGNED_INT64_LENGTH) {
+ return (fn () => (int) $this->readUInt64())->call($proceedingJoinPoint->getInstance());
+ }
+
+ return $proceedingJoinPoint->process();
+ }
+}
diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php
new file mode 100644
index 0000000..281a6aa
--- /dev/null
+++ b/src/Command/ConsumeCommand.php
@@ -0,0 +1,85 @@
+getConsumers();
+
+ $this->trap([SIGINT, SIGTERM], function ($signo) use ($consumers) {
+ $this->warn(sprintf('Received signal %d, exiting...', $signo));
+
+ foreach ($consumers as $consumer) {
+ $consumer->stop();
+ }
+
+ CoordinatorManager::until(self::class)->resume();
+ });
+
+ foreach ($consumers as $consumer) {
+ Coroutine::create(function () use ($consumer) {
+ $consumer->start();
+ });
+ }
+
+ CoordinatorManager::until(self::class)->yield();
+
+ $this->info('Bye!');
+ }
+
+ /**
+ * @return Consumer[]
+ */
+ public function getConsumers(): array
+ {
+ $consumers = [];
+ $connections = $this->config->get('trigger.connections', []);
+
+ foreach ($connections as $connection => $options) {
+ if (isset($options['enable']) && ! $options['enable']) {
+ continue;
+ }
+
+ $consumers[] = make(Consumer::class, [
+ 'connection' => $connection,
+ 'options' => (array) $options,
+ ]);
+ }
+
+ return $consumers;
+ }
+}
diff --git a/src/Command/SubscribersCommand.php b/src/Command/SubscribersCommand.php
new file mode 100644
index 0000000..c3c4150
--- /dev/null
+++ b/src/Command/SubscribersCommand.php
@@ -0,0 +1,52 @@
+filter(function ($property, $class) {
+ if ($connection = $this->input->getOption('connection')) {
+ return $connection == $property->connection;
+ }
+ return true;
+ })
+ ->transform(fn ($property, $class) => [$property->connection, $class, $property->priority])
+ ->merge([
+ ['[default]', SnapshotSubscriber::class, 1],
+ ['[default]', TriggerSubscriber::class, 1],
+ ]);
+
+ $this->info('Subscribers:');
+ $this->table(['Connection', 'Subscriber', 'Priority'], $rows);
+ }
+}
diff --git a/src/Command/TriggersCommand.php b/src/Command/TriggersCommand.php
new file mode 100644
index 0000000..845e416
--- /dev/null
+++ b/src/Command/TriggersCommand.php
@@ -0,0 +1,60 @@
+each(function ($property, $class) {
+ /* @var Trigger $property */
+ $property->table ??= class_basename($class);
+ })
+ ->filter(function ($property, $class) {
+ /* @var Trigger $property */
+ if ($this->input->getOption('connection')) {
+ return $this->input->getOption('connection') == $property->connection;
+ }
+ return true;
+ })
+ ->filter(function ($property, $class) {
+ /* @var Trigger $property */
+ if ($this->input->getOption('table')) {
+ return $this->input->getOption('table') == $property->table;
+ }
+ return true;
+ })
+ ->transform(fn ($property, $class) => [$property->connection, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]);
+
+ $this->info('Triggers:');
+ $this->table(['connection', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows);
+ }
+}
diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php
new file mode 100644
index 0000000..74a4fca
--- /dev/null
+++ b/src/ConfigProvider.php
@@ -0,0 +1,45 @@
+ [
+ Aspect\BinaryDataReaderAspect::class, // Fix MySQLReplication bug
+ ],
+ 'dependencies' => [
+ Mutex\ServerMutexInterface::class => Mutex\RedisServerMutex::class,
+ Snapshot\BinLogCurrentSnapshotInterface::class => Snapshot\RedisBinLogCurrentSnapshot::class,
+ ],
+ 'commands' => [
+ Command\ConsumeCommand::class,
+ Command\SubscribersCommand::class,
+ Command\TriggersCommand::class,
+ ],
+ 'listeners' => [
+ Listener\OnBootApplicationListener::class,
+ ],
+ 'publish' => [
+ [
+ 'id' => 'config',
+ 'description' => 'The config file of trigger.',
+ 'source' => __DIR__ . '/../publish/trigger.php',
+ 'destination' => BASE_PATH . '/config/autoload/trigger.php',
+ ],
+ ],
+ ];
+ }
+}
diff --git a/src/Consumer.php b/src/Consumer.php
new file mode 100644
index 0000000..e74d414
--- /dev/null
+++ b/src/Consumer.php
@@ -0,0 +1,213 @@
+name = $options['name'];
+ }
+
+ $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, [
+ 'consumer' => $this,
+ ]);
+
+ if ($this->getOption('server_mutex.enable', true)) {
+ $this->serverMutex = make(ServerMutexInterface::class, [
+ 'name' => 'trigger:mutex:' . $this->connection,
+ 'owner' => Util::getInternalIp(),
+ 'options' => $this->getOption('server_mutex', []) + ['connection' => $this->connection],
+ ]);
+ }
+
+ if ($this->getOption('health_monitor.enable', true)) {
+ $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]);
+ }
+ }
+
+ public function start(): void
+ {
+ $callback = function () {
+ // Health monitor
+ if ($this->healthMonitor) {
+ $this->healthMonitor->process();
+ }
+
+ $replication = $this->makeReplication();
+
+ // Replication start
+ CoordinatorManager::until($this->getIdentifier())->resume();
+
+ $this->debug('Consumer started.');
+
+ // Worker exit
+ Coroutine::create(function () {
+ CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
+
+ $this->stop();
+
+ $this->warning('Consumer stopped.');
+ });
+
+ while (1) {
+ if ($this->isStopped()) {
+ break;
+ }
+
+ wait(fn () => $replication->consume(), (float) $this->getOption('consume_timeout', 600));
+ }
+ };
+
+ if ($this->serverMutex) {
+ $this->serverMutex->attempt($callback);
+ } else {
+ $callback();
+ }
+ }
+
+ public function getBinLogCurrentSnapshot(): BinLogCurrentSnapshotInterface
+ {
+ return $this->binLogCurrentSnapshot;
+ }
+
+ public function getHealthMonitor(): ?HealthMonitor
+ {
+ return $this->healthMonitor;
+ }
+
+ public function getName(): string
+ {
+ return $this->name ?? 'trigger-' . $this->connection;
+ }
+
+ public function getOption(?string $key = null, $default = null)
+ {
+ if (is_null($key)) {
+ return $this->options;
+ }
+
+ return Arr::get($this->options, $key, $default);
+ }
+
+ public function getConnection(): string
+ {
+ return $this->connection;
+ }
+
+ public function getIdentifier(): string
+ {
+ return sprintf('%s_start', $this->connection);
+ }
+
+ public function stop(): void
+ {
+ $this->stopped = true;
+ $this->serverMutex?->release();
+ }
+
+ public function isStopped(): bool
+ {
+ return $this->stopped;
+ }
+
+ protected function makeReplication(): MySQLReplicationFactory
+ {
+ $connection = $this->connection;
+ // Get options
+ $config = (array) $this->options;
+ // Get databases of replication
+ $databasesOnly = array_replace(
+ $config['databases_only'] ?? [],
+ $this->triggerManager->getDatabases($connection)
+ );
+ // Get tables of replication
+ $tablesOnly = array_replace(
+ $config['tables_only'] ?? [],
+ $this->triggerManager->getTables($connection)
+ );
+
+ /** @var ConfigBuilder */
+ $configBuilder = tap(
+ new ConfigBuilder(),
+ fn (ConfigBuilder $builder) => $builder->withUser($config['user'] ?? 'root')
+ ->withHost($config['host'] ?? '127.0.0.1')
+ ->withPassword($config['password'] ?? 'root')
+ ->withPort((int) $config['port'] ?? 3306)
+ ->withSlaveId(random_int(100, 999))
+ ->withHeartbeatPeriod((float) $config['heartbeat_period'] ?? 3)
+ ->withDatabasesOnly($databasesOnly)
+ ->withTablesOnly($tablesOnly)
+ );
+
+ if ($binLogCurrent = $this->getBinLogCurrentSnapshot()->get()) {
+ $configBuilder->withBinLogFileName($binLogCurrent->getBinFileName());
+ $configBuilder->withBinLogPosition((int) $binLogCurrent->getBinLogPosition());
+
+ $this->debug('Continue with position', $binLogCurrent->jsonSerialize());
+ }
+
+ $eventDispatcher = make(EventDispatcher::class);
+
+ return tap(
+ make(MySQLReplicationFactory::class, [
+ 'config' => $configBuilder->build(),
+ 'eventDispatcher' => $eventDispatcher,
+ ]),
+ function ($factory) use ($connection) {
+ /** @var MySQLReplicationFactory $factory */
+ $subscribers = $this->subscriberManager->get($connection);
+ $subscribers[] = TriggerSubscriber::class;
+ $subscribers[] = SnapshotSubscriber::class;
+
+ foreach ($subscribers as $subscriber) {
+ $factory->registerSubscriber(make($subscriber, ['consumer' => $this]));
+ }
+ }
+ );
+ }
+}
diff --git a/src/ConsumerManager.php b/src/ConsumerManager.php
new file mode 100644
index 0000000..f426f26
--- /dev/null
+++ b/src/ConsumerManager.php
@@ -0,0 +1,64 @@
+config->get('trigger.connections', []);
+
+ foreach ($connections as $connection => $options) {
+ if (isset($options['enable']) && ! $options['enable']) {
+ continue;
+ }
+
+ $consumer = make(Consumer::class, [
+ 'connection' => $connection,
+ 'options' => (array) $options,
+ ]);
+
+ $process = $this->createProcess($consumer);
+ $process->name = $consumer->getName();
+ $process->nums = 1;
+
+ ProcessManager::register($process);
+ }
+ }
+
+ protected function createProcess(Consumer $consumer): AbstractProcess
+ {
+ return new class($this->container, $consumer) extends AbstractProcess {
+ public function __construct(ContainerInterface $container, protected Consumer $consumer)
+ {
+ parent::__construct($container);
+ }
+
+ public function handle(): void
+ {
+ $this->consumer->start();
+ }
+ };
+ }
+}
diff --git a/src/Contact/TriggerInterface.php b/src/Contact/TriggerInterface.php
new file mode 100644
index 0000000..b5e871b
--- /dev/null
+++ b/src/Contact/TriggerInterface.php
@@ -0,0 +1,20 @@
+subscriberManager->register();
+ $this->triggerManager->register();
+ $this->consumerManager->run();
+ }
+}
diff --git a/src/Monitor/HealthMonitor.php b/src/Monitor/HealthMonitor.php
new file mode 100644
index 0000000..128fdca
--- /dev/null
+++ b/src/Monitor/HealthMonitor.php
@@ -0,0 +1,98 @@
+connection = $consumer->getConnection();
+ $this->monitorInterval = (int) $consumer->getOption('health_monitor.interval', 10);
+ $this->snapShortInterval = (int) $consumer->getOption('snapshot.interval', 10);
+ $this->binLogCurrentSnapshot = $consumer->getBinLogCurrentSnapshot();
+ if ($container->has(StdoutLoggerInterface::class)) {
+ $this->logger = $container->get(StdoutLoggerInterface::class);
+ }
+ $this->timer = new Timer($this->logger);
+ }
+
+ public function process(): void
+ {
+ Coroutine::create(function () {
+ CoordinatorManager::until($this->consumer->getIdentifier())->yield();
+
+ // Monitor binLogCurrent
+ $this->timer->tick($this->monitorInterval, function () {
+ if ($this->binLogCurrent instanceof BinLogCurrent) {
+ $this->debug(
+ sprintf(
+ 'Health monitoring, binLogCurrent: %s',
+ json_encode($this->binLogCurrent->jsonSerialize(), JSON_THROW_ON_ERROR)
+ )
+ );
+ }
+ });
+
+ // Health check and set snapshot
+ $this->timer->tick($this->snapShortInterval, function () {
+ if (! $this->binLogCurrent instanceof BinLogCurrent) {
+ return;
+ }
+
+ $binLogCurrentCache = $this->binLogCurrentSnapshot->get();
+
+ if (
+ $binLogCurrentCache instanceof BinLogCurrent
+ && $binLogCurrentCache->getBinLogPosition() == $this->binLogCurrent->getBinLogPosition()
+ ) {
+ if ($this->container->has(EventDispatcherInterface::class)) {
+ $this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->connection, $this->binLogCurrent));
+ }
+ }
+
+ $this->binLogCurrentSnapshot->set($this->binLogCurrent);
+ });
+ });
+ }
+
+ public function setBinLogCurrent(BinLogCurrent $binLogCurrent): void
+ {
+ $this->binLogCurrent = $binLogCurrent;
+ }
+}
diff --git a/src/Mutex/RedisServerMutex.php b/src/Mutex/RedisServerMutex.php
new file mode 100644
index 0000000..02ca42d
--- /dev/null
+++ b/src/Mutex/RedisServerMutex.php
@@ -0,0 +1,116 @@
+expires = (int) $options['expires'] ?? 60;
+ $this->keepaliveInterval = (int) $options['keepalive_interval'] ?? 10;
+ $this->name = $name ?? sprintf('trigger:server:%s', $this->connection);
+ $this->owner = $owner ?? Util::getInternalIp();
+ $this->connection = $options['connection'];
+ $this->timer = new Timer($logger);
+ $this->retryInterval = (int) $options['retry_interval'] ?? 10;
+ }
+
+ public function attempt(callable $callback = null): void
+ {
+ // Waiting for the server mutex.
+ $this->timer->tick($this->retryInterval, function () {
+ if (
+ $this->redis->set($this->name, $this->owner, ['NX', 'EX' => $this->expires])
+ || $this->redis->get($this->name) == $this->owner
+ ) {
+ $this->debug('Got server mutex.');
+ CoordinatorManager::until($this->getIdentifier())->resume();
+
+ return Timer::STOP;
+ }
+
+ $this->debug('Waiting server mutex.');
+ });
+
+ // Waiting for the server mutex.
+ CoordinatorManager::until($this->getIdentifier())->yield();
+
+ $this->debug('Server mutex keepalive booted.');
+
+ $this->timer->tick($this->keepaliveInterval, function () {
+ if ($this->released) {
+ $this->debug('Server mutex keepalive stopped.');
+
+ return Timer::STOP;
+ }
+
+ $this->redis->setNx($this->name, $this->owner);
+ $this->redis->expire($this->name, $this->expires);
+ $ttl = $this->redis->ttl($this->name);
+
+ $this->debug('Server mutex keepalive executed', ['ttl' => $ttl]);
+ });
+
+ // Execute the callback.
+ if ($callback) {
+ try {
+ $callback();
+ } catch (Throwable $e) {
+ $this->error((string) $e);
+ }
+ }
+ }
+
+ /**
+ * Release the server mutex.
+ * @throws RedisException
+ */
+ public function release(bool $force = false): void
+ {
+ if ($force || $this->redis->get($this->name) == $this->owner) {
+ $this->redis->del($this->name);
+ $this->released = true;
+ }
+ }
+
+ protected function getIdentifier(): string
+ {
+ return sprintf('%s_%s', $this->connection, __CLASS__);
+ }
+}
diff --git a/src/Mutex/ServerMutexInterface.php b/src/Mutex/ServerMutexInterface.php
new file mode 100644
index 0000000..eddfbb7
--- /dev/null
+++ b/src/Mutex/ServerMutexInterface.php
@@ -0,0 +1,18 @@
+redis->set($this->key(), serialize($binLogCurrent));
+ $this->redis->expire($this->key(), (int) $this->consumer->getOption('snapshot.expires', 24 * 3600));
+ }
+
+ public function get(): ?BinLogCurrent
+ {
+ return with($this->redis->get($this->key()), function ($data) {
+ $data = unserialize((string) $data);
+
+ if (! $data instanceof BinLogCurrent) {
+ return null;
+ }
+
+ return $data;
+ });
+ }
+
+ private function key(): string
+ {
+ return join(':', [
+ 'trigger',
+ 'snapshot',
+ 'binLogCurrent',
+ $this->consumer->getOption('snapshot.version', '1.0'),
+ $this->consumer->getConnection(),
+ ]);
+ }
+}
diff --git a/src/Subscriber/AbstractSubscriber.php b/src/Subscriber/AbstractSubscriber.php
new file mode 100644
index 0000000..7427679
--- /dev/null
+++ b/src/Subscriber/AbstractSubscriber.php
@@ -0,0 +1,17 @@
+consumer->getHealthMonitor()) {
+ return;
+ }
+
+ $this->consumer->getHealthMonitor()->setBinLogCurrent($event->getEventInfo()->getBinLogCurrent());
+ }
+}
diff --git a/src/Subscriber/TriggerSubscriber.php b/src/Subscriber/TriggerSubscriber.php
new file mode 100644
index 0000000..aadaae4
--- /dev/null
+++ b/src/Subscriber/TriggerSubscriber.php
@@ -0,0 +1,103 @@
+concurrent = new Concurrent(
+ (int) $consumer->getOption('concurrent.limit') ?? 1000
+ );
+ }
+
+ public static function getSubscribedEvents(): array
+ {
+ return [
+ ConstEventsNames::UPDATE => 'onUpdate',
+ ConstEventsNames::DELETE => 'onDelete',
+ ConstEventsNames::WRITE => 'onWrite',
+ ];
+ }
+
+ protected function allEvents(EventDTO $event): void
+ {
+ if (! $event instanceof RowsDTO) {
+ return;
+ }
+
+ $key = join('.', [
+ $this->consumer->getConnection(),
+ $event->getTableMap()->getDatabase(),
+ $event->getTableMap()->getTable(),
+ $event->getType(),
+ ]);
+
+ $eventType = $event->getType();
+
+ foreach ($this->triggerManager->get($key) as $callable) {
+ foreach ($event->getValues() as $value) {
+ $this->concurrent->create(function () use ($callable, $value, $eventType) {
+ [$class, $method] = $callable;
+
+ if (! $this->container->has($class)) {
+ $this->warning(sprintf('Entry "%s" cannot be resolved.', $class));
+ return;
+ }
+
+ $args = match ($eventType) {
+ ConstEventsNames::WRITE => [$value],
+ ConstEventsNames::UPDATE => [$value['before'], $value['after']],
+ ConstEventsNames::DELETE => [$value],
+ default => null,
+ };
+
+ if (! $args) {
+ return;
+ }
+
+ try {
+ call([$this->container->get($class), $method], $args);
+ } catch (Throwable $e) {
+ $this->error(sprintf(
+ "%s in %s:%s\n%s",
+ $e->getMessage(),
+ $e->getFile(),
+ $e->getLine(),
+ $e->getTraceAsString()
+ ));
+ }
+ });
+ }
+ }
+ }
+}
diff --git a/src/SubscriberManager.php b/src/SubscriberManager.php
new file mode 100644
index 0000000..17ec49c
--- /dev/null
+++ b/src/SubscriberManager.php
@@ -0,0 +1,56 @@
+ $property) {
+ $queue->insert([$class, $property], $property->priority);
+ }
+
+ foreach ($queue as $value) {
+ [$class, $property] = $value;
+ $this->subscribers[$property->connection] ??= [];
+ $this->subscribers[$property->connection][] = $class;
+
+ $this->logger->debug(sprintf(
+ '[trigger.%s] %s registered by %s process by %s.',
+ $property->connection,
+ $this::class,
+ $class,
+ $this::class
+ ));
+ }
+ }
+
+ public function get(string $connection = 'default'): array
+ {
+ return (array) Arr::get($this->subscribers, $connection, []);
+ }
+}
diff --git a/src/Traits/Logger.php b/src/Traits/Logger.php
new file mode 100644
index 0000000..4c66259
--- /dev/null
+++ b/src/Traits/Logger.php
@@ -0,0 +1,69 @@
+getLogger()?->info($this->formatMessage($message, $context));
+ }
+
+ protected function debug(string $message, array $context = []): void
+ {
+ $this->getLogger()?->debug($this->formatMessage($message, $context));
+ }
+
+ protected function warning(string $message, array $context = []): void
+ {
+ $this->getLogger()?->warning($this->formatMessage($message, $context));
+ }
+
+ protected function error(string $message, array $context = []): void
+ {
+ $this->getLogger()?->error($this->formatMessage($message, $context));
+ }
+
+ protected function formatMessage(string $message, array $context = []): string
+ {
+ return sprintf(
+ '[trigger%s] %s %s',
+ /* @phpstan-ignore-next-line */
+ isset($this->connection) ? ".{$this->connection}" : 'default',
+ $message,
+ $context ? json_encode($context, JSON_UNESCAPED_UNICODE) : ''
+ );
+ }
+
+ protected function getLogger(): ?LoggerInterface
+ {
+ /* @phpstan-ignore-next-line */
+ if (isset($this->logger) && $this->logger instanceof LoggerInterface) {
+ return $this->logger;
+ }
+
+ $container = ApplicationContext::getContainer();
+
+ if ($container->has(StdoutLoggerInterface::class)) {
+ return $container->get(StdoutLoggerInterface::class);
+ }
+
+ return null;
+ }
+}
diff --git a/src/Trigger/AbstractTrigger.php b/src/Trigger/AbstractTrigger.php
new file mode 100644
index 0000000..2d75712
--- /dev/null
+++ b/src/Trigger/AbstractTrigger.php
@@ -0,0 +1,28 @@
+ $property) {
+ if ($property->events == ['*']) {
+ $property->events = ['write', 'update', 'delete'];
+ }
+ $queue->insert([$class, $property], $property->priority);
+ }
+
+ foreach ($queue as $value) {
+ [$class, $property] = $value;
+
+ /** @var Trigger $property */
+ foreach ($property->events as $eventType) {
+ $config = $this->config->get('trigger.connections.' . $property->connection);
+ $property->table ??= class_basename($class);
+ $property->database ??= $config['databases_only'][0] ?? '';
+
+ $key = $this->buildKey($property->connection, $property->database, $property->table, $eventType);
+ $method = 'on' . ucfirst($eventType);
+
+ $items = Arr::get($this->triggers, $key, []);
+ $items[] = [$class, $method];
+
+ Arr::set($this->triggers, $key, $items);
+ }
+ }
+ }
+
+ public function get(string $key): array
+ {
+ return Arr::get($this->triggers, $key, []);
+ }
+
+ public function getDatabases(string $connection): array
+ {
+ return array_keys($this->get($connection));
+ }
+
+ public function getTables(string $connection): array
+ {
+ $tables = [];
+
+ foreach ($this->getDatabases($connection) as $database) {
+ $tables = [...$tables, ...array_keys($this->get($this->buildKey($connection, $database)))];
+ }
+
+ return $tables;
+ }
+
+ private function buildKey(...$arguments): string
+ {
+ return join('.', $arguments);
+ }
+}
diff --git a/src/Util.php b/src/Util.php
new file mode 100644
index 0000000..f26d8c4
--- /dev/null
+++ b/src/Util.php
@@ -0,0 +1,36 @@
+