From 9abfee2925483a1c008b4addd6b4d72e95ed547e Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Mon, 12 Jun 2023 13:27:54 +0800 Subject: [PATCH] Adds `trigger` component (#239) Co-authored-by: Deeka Wong <8337659+huangdijia@users.noreply.github.com> --- .gitattributes | 4 + .github/workflows/close-pull-request.yml | 13 ++ .github/workflows/release.yaml | 25 ++ LICENSE | 21 ++ README.md | 73 ++++++ composer.json | 55 +++++ publish/trigger.php | 51 +++++ src/Annotation/Subscriber.php | 22 ++ src/Annotation/Trigger.php | 27 +++ src/Aspect/BinaryDataReaderAspect.php | 34 +++ src/Command/ConsumeCommand.php | 85 +++++++ src/Command/SubscribersCommand.php | 52 +++++ src/Command/TriggersCommand.php | 60 +++++ src/ConfigProvider.php | 45 ++++ src/Consumer.php | 213 ++++++++++++++++++ src/ConsumerManager.php | 64 ++++++ src/Contact/TriggerInterface.php | 20 ++ src/Event/OnReplicationStop.php | 20 ++ src/EventDispatcher.php | 15 ++ src/Listener/OnBootApplicationListener.php | 44 ++++ src/Monitor/HealthMonitor.php | 98 ++++++++ src/Mutex/RedisServerMutex.php | 116 ++++++++++ src/Mutex/ServerMutexInterface.php | 18 ++ .../BinLogCurrentSnapshotInterface.php | 20 ++ src/Snapshot/RedisBinLogCurrentSnapshot.php | 56 +++++ src/Subscriber/AbstractSubscriber.php | 17 ++ src/Subscriber/SnapshotSubscriber.php | 30 +++ src/Subscriber/TriggerSubscriber.php | 103 +++++++++ src/SubscriberManager.php | 56 +++++ src/Traits/Logger.php | 69 ++++++ src/Trigger/AbstractTrigger.php | 28 +++ src/TriggerManager.php | 87 +++++++ src/Util.php | 36 +++ 33 files changed, 1677 insertions(+) create mode 100644 .gitattributes create mode 100644 .github/workflows/close-pull-request.yml create mode 100644 .github/workflows/release.yaml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 publish/trigger.php create mode 100644 src/Annotation/Subscriber.php create mode 100644 src/Annotation/Trigger.php create mode 100644 src/Aspect/BinaryDataReaderAspect.php create mode 100644 src/Command/ConsumeCommand.php create mode 100644 src/Command/SubscribersCommand.php create mode 100644 src/Command/TriggersCommand.php create mode 100644 src/ConfigProvider.php create mode 100644 src/Consumer.php create mode 100644 src/ConsumerManager.php create mode 100644 src/Contact/TriggerInterface.php create mode 100644 src/Event/OnReplicationStop.php create mode 100644 src/EventDispatcher.php create mode 100644 src/Listener/OnBootApplicationListener.php create mode 100644 src/Monitor/HealthMonitor.php create mode 100644 src/Mutex/RedisServerMutex.php create mode 100644 src/Mutex/ServerMutexInterface.php create mode 100644 src/Snapshot/BinLogCurrentSnapshotInterface.php create mode 100644 src/Snapshot/RedisBinLogCurrentSnapshot.php create mode 100644 src/Subscriber/AbstractSubscriber.php create mode 100644 src/Subscriber/SnapshotSubscriber.php create mode 100644 src/Subscriber/TriggerSubscriber.php create mode 100644 src/SubscriberManager.php create mode 100644 src/Traits/Logger.php create mode 100644 src/Trigger/AbstractTrigger.php create mode 100644 src/TriggerManager.php create mode 100644 src/Util.php diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..c90148b --- /dev/null +++ b/.gitattributes @@ -0,0 +1,4 @@ +/.github export-ignore +/.vscode export-ignore +/tests export-ignore +.gitattributes export-ignore \ No newline at end of file diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml new file mode 100644 index 0000000..fa4b5ca --- /dev/null +++ b/.github/workflows/close-pull-request.yml @@ -0,0 +1,13 @@ +name: Close Pull Request + +on: + pull_request_target: + types: [opened] + +jobs: + run: + runs-on: ubuntu-latest + steps: + - uses: superbrothers/close-pull-request@v3 + with: + comment: "Thank you for your pull request. However, you have submitted this PR on the friendsofhyperf organization which is a read-only sub split of `friendsofhyperf/components`. Please submit your PR on the https://github.com/friendsofhyperf/components repository.

Thanks!" diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..dde70ae --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,25 @@ +on: + push: + # Sequence of patterns matched against refs/tags + tags: + - 'v*' # Push events to matching v*, i.e. v1.0, v1.0.0 + +name: Release + +jobs: + release: + name: Release + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: false + prerelease: false \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..75bb9fc --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 D.J.Hwang + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a595f41 --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# Trigger + +[![Latest Stable Version](https://poser.pugx.org/friendsofhyperf/trigger/version.png)](https://packagist.org/packages/friendsofhyperf/trigger) +[![Total Downloads](https://poser.pugx.org/friendsofhyperf/trigger/d/total.png)](https://packagist.org/packages/friendsofhyperf/trigger) +[![GitHub license](https://img.shields.io/github/license/friendsofhyperf/trigger)](https://github.com/friendsofhyperf/trigger) + +MySQL trigger component for Hyperf, Based on a great work of creators:[moln/php-mysql-replication](https://github.com/moln/php-mysql-replication) + +## Installation + +- Request + +```bash +composer require "friendsofhyperf/trigger:~3.0.0" +``` + +- Publish + +```bash +php bin/hyperf.php vendor:publish friendsofhyperf/trigger +``` + +## Define a trigger + +```php +namespace App\Trigger; + +use FriendsOfHyperf\Trigger\Annotation\Trigger; +use FriendsOfHyperf\Trigger\Trigger\AbstractTrigger; +use MySQLReplication\Event\DTO\EventDTO; + +#[Trigger(table:"table", on:"*", connection:"default")] +class FooTrigger extends AbstractTrigger +{ + public function onWrite(array $new) + { + var_dump($new); + } + + public function onUpdate(array $old, array $new) + { + var_dump($old, $new); + } + + public function onDelete(array $old) + { + var_dump($old); + } +} +``` + +## Define a subscriber + +```php +namespace App\Subscriber; + +use FriendsOfHyperf\Trigger\Annotation\Subscriber; +use FriendsOfHyperf\Trigger\Subscriber\AbstractEventSubscriber; +use MySQLReplication\Event\DTO\EventDTO; + +#[Subscriber(connection:"default")] +class BarSubscriber extends AbstractEventSubscriber +{ + protected function allEvents(EventDTO $event): void + { + // some code + } +} +``` + +## Sponsor + +If you like them, Buy me a cup of coffee. [ [Alipay](https://hdj.me/images/alipay.jpg) | [WePay](https://hdj.me/images/wechat-pay.jpg) ] diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..e826218 --- /dev/null +++ b/composer.json @@ -0,0 +1,55 @@ +{ + "name": "friendsofhyperf/trigger", + "description": "The MySQL Trigger component for Hyperf.", + "type": "library", + "require": { + "php": ">=8.0.0", + "friendsofhyperf/command-signals": "~3.0.0", + "hyperf/collection": "~3.0.0", + "hyperf/command": "~3.0.0", + "hyperf/conditionable": "~3.0.0", + "hyperf/context": "~3.0.0", + "hyperf/coordinator": "~3.0.0", + "hyperf/coroutine": "~3.0.0", + "hyperf/di": "~3.0.0", + "hyperf/event": "~3.0.0", + "hyperf/process": "~3.0.0", + "hyperf/stringable": "~3.0.0", + "hyperf/support": "~3.0.0", + "hyperf/tappable": "~3.0.0", + "moln/php-mysql-replication": "^1.2" + }, + "autoload": { + "psr-4": { + "FriendsOfHyperf\\Trigger\\": "src" + } + }, + "extra": { + "hyperf": { + "config": "FriendsOfHyperf\\Trigger\\ConfigProvider" + } + }, + "suggest": { + "hyperf/redis": "Required to use Redis client.", + "hyperf/signal": "Required to use Signal manager." + }, + "config": { + "sort-packages": true, + "allow-plugins": { + "composer/package-versions-deprecated": true + } + }, + "minimum-stability": "dev", + "prefer-stable": true, + "license": "MIT", + "authors": [ + { + "name": "huangdijia", + "email": "huangdijia@gmail.com" + } + ], + "support": { + "issues": "https://github.com/friendsofhyperf/components/issues", + "source": "https://github.com/friendsofhyperf/components" + } +} diff --git a/publish/trigger.php b/publish/trigger.php new file mode 100644 index 0000000..3d3289b --- /dev/null +++ b/publish/trigger.php @@ -0,0 +1,51 @@ + [ + 'default' => [ + 'enable' => env('TRIGGER_ENABLE', true), + + 'host' => env('TRIGGER_HOST', ''), + 'port' => (int) env('TRIGGER_PORT', 3306), + 'user' => env('TRIGGER_USER', ''), + 'password' => env('TRIGGER_PASSWORD', ''), + 'databases_only' => env('TRIGGER_DATABASES_ONLY', '') ? explode(',', env('TRIGGER_DATABASES_ONLY')) : [], + 'tables_only' => env('TRIGGER_TABLES_ONLY', '') ? explode(',', env('TRIGGER_TABLES_ONLY')) : [], + 'heartbeat_period' => (int) env('TRIGGER_HEARTBEAT', 3), + + 'server_mutex' => [ + 'enable' => true, + 'expires' => 30, + 'keepalive_interval' => 10, + 'retry_interval' => 10, + ], + + 'health_monitor' => [ + 'enable' => true, + 'interval' => 30, + ], + + 'snapshot' => [ + 'version' => '1.0', + 'expires' => 24 * 3600, + 'interval' => 10, + ], + + 'concurrent' => [ + 'limit' => 1000, + ], + + 'consume_timeout' => 600, + ], + ], +]; diff --git a/src/Annotation/Subscriber.php b/src/Annotation/Subscriber.php new file mode 100644 index 0000000..24757c2 --- /dev/null +++ b/src/Annotation/Subscriber.php @@ -0,0 +1,22 @@ +arguments['keys']['size'] == BinaryDataReader::UNSIGNED_INT64_LENGTH) { + return (fn () => (int) $this->readUInt64())->call($proceedingJoinPoint->getInstance()); + } + + return $proceedingJoinPoint->process(); + } +} diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php new file mode 100644 index 0000000..281a6aa --- /dev/null +++ b/src/Command/ConsumeCommand.php @@ -0,0 +1,85 @@ +getConsumers(); + + $this->trap([SIGINT, SIGTERM], function ($signo) use ($consumers) { + $this->warn(sprintf('Received signal %d, exiting...', $signo)); + + foreach ($consumers as $consumer) { + $consumer->stop(); + } + + CoordinatorManager::until(self::class)->resume(); + }); + + foreach ($consumers as $consumer) { + Coroutine::create(function () use ($consumer) { + $consumer->start(); + }); + } + + CoordinatorManager::until(self::class)->yield(); + + $this->info('Bye!'); + } + + /** + * @return Consumer[] + */ + public function getConsumers(): array + { + $consumers = []; + $connections = $this->config->get('trigger.connections', []); + + foreach ($connections as $connection => $options) { + if (isset($options['enable']) && ! $options['enable']) { + continue; + } + + $consumers[] = make(Consumer::class, [ + 'connection' => $connection, + 'options' => (array) $options, + ]); + } + + return $consumers; + } +} diff --git a/src/Command/SubscribersCommand.php b/src/Command/SubscribersCommand.php new file mode 100644 index 0000000..c3c4150 --- /dev/null +++ b/src/Command/SubscribersCommand.php @@ -0,0 +1,52 @@ +filter(function ($property, $class) { + if ($connection = $this->input->getOption('connection')) { + return $connection == $property->connection; + } + return true; + }) + ->transform(fn ($property, $class) => [$property->connection, $class, $property->priority]) + ->merge([ + ['[default]', SnapshotSubscriber::class, 1], + ['[default]', TriggerSubscriber::class, 1], + ]); + + $this->info('Subscribers:'); + $this->table(['Connection', 'Subscriber', 'Priority'], $rows); + } +} diff --git a/src/Command/TriggersCommand.php b/src/Command/TriggersCommand.php new file mode 100644 index 0000000..845e416 --- /dev/null +++ b/src/Command/TriggersCommand.php @@ -0,0 +1,60 @@ +each(function ($property, $class) { + /* @var Trigger $property */ + $property->table ??= class_basename($class); + }) + ->filter(function ($property, $class) { + /* @var Trigger $property */ + if ($this->input->getOption('connection')) { + return $this->input->getOption('connection') == $property->connection; + } + return true; + }) + ->filter(function ($property, $class) { + /* @var Trigger $property */ + if ($this->input->getOption('table')) { + return $this->input->getOption('table') == $property->table; + } + return true; + }) + ->transform(fn ($property, $class) => [$property->connection, $property->database, $property->table, implode(',', $property->events), $class, $property->priority]); + + $this->info('Triggers:'); + $this->table(['connection', 'Database', 'Table', 'Events', 'Trigger', 'Priority'], $rows); + } +} diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php new file mode 100644 index 0000000..74a4fca --- /dev/null +++ b/src/ConfigProvider.php @@ -0,0 +1,45 @@ + [ + Aspect\BinaryDataReaderAspect::class, // Fix MySQLReplication bug + ], + 'dependencies' => [ + Mutex\ServerMutexInterface::class => Mutex\RedisServerMutex::class, + Snapshot\BinLogCurrentSnapshotInterface::class => Snapshot\RedisBinLogCurrentSnapshot::class, + ], + 'commands' => [ + Command\ConsumeCommand::class, + Command\SubscribersCommand::class, + Command\TriggersCommand::class, + ], + 'listeners' => [ + Listener\OnBootApplicationListener::class, + ], + 'publish' => [ + [ + 'id' => 'config', + 'description' => 'The config file of trigger.', + 'source' => __DIR__ . '/../publish/trigger.php', + 'destination' => BASE_PATH . '/config/autoload/trigger.php', + ], + ], + ]; + } +} diff --git a/src/Consumer.php b/src/Consumer.php new file mode 100644 index 0000000..e74d414 --- /dev/null +++ b/src/Consumer.php @@ -0,0 +1,213 @@ +name = $options['name']; + } + + $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, [ + 'consumer' => $this, + ]); + + if ($this->getOption('server_mutex.enable', true)) { + $this->serverMutex = make(ServerMutexInterface::class, [ + 'name' => 'trigger:mutex:' . $this->connection, + 'owner' => Util::getInternalIp(), + 'options' => $this->getOption('server_mutex', []) + ['connection' => $this->connection], + ]); + } + + if ($this->getOption('health_monitor.enable', true)) { + $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]); + } + } + + public function start(): void + { + $callback = function () { + // Health monitor + if ($this->healthMonitor) { + $this->healthMonitor->process(); + } + + $replication = $this->makeReplication(); + + // Replication start + CoordinatorManager::until($this->getIdentifier())->resume(); + + $this->debug('Consumer started.'); + + // Worker exit + Coroutine::create(function () { + CoordinatorManager::until(Constants::WORKER_EXIT)->yield(); + + $this->stop(); + + $this->warning('Consumer stopped.'); + }); + + while (1) { + if ($this->isStopped()) { + break; + } + + wait(fn () => $replication->consume(), (float) $this->getOption('consume_timeout', 600)); + } + }; + + if ($this->serverMutex) { + $this->serverMutex->attempt($callback); + } else { + $callback(); + } + } + + public function getBinLogCurrentSnapshot(): BinLogCurrentSnapshotInterface + { + return $this->binLogCurrentSnapshot; + } + + public function getHealthMonitor(): ?HealthMonitor + { + return $this->healthMonitor; + } + + public function getName(): string + { + return $this->name ?? 'trigger-' . $this->connection; + } + + public function getOption(?string $key = null, $default = null) + { + if (is_null($key)) { + return $this->options; + } + + return Arr::get($this->options, $key, $default); + } + + public function getConnection(): string + { + return $this->connection; + } + + public function getIdentifier(): string + { + return sprintf('%s_start', $this->connection); + } + + public function stop(): void + { + $this->stopped = true; + $this->serverMutex?->release(); + } + + public function isStopped(): bool + { + return $this->stopped; + } + + protected function makeReplication(): MySQLReplicationFactory + { + $connection = $this->connection; + // Get options + $config = (array) $this->options; + // Get databases of replication + $databasesOnly = array_replace( + $config['databases_only'] ?? [], + $this->triggerManager->getDatabases($connection) + ); + // Get tables of replication + $tablesOnly = array_replace( + $config['tables_only'] ?? [], + $this->triggerManager->getTables($connection) + ); + + /** @var ConfigBuilder */ + $configBuilder = tap( + new ConfigBuilder(), + fn (ConfigBuilder $builder) => $builder->withUser($config['user'] ?? 'root') + ->withHost($config['host'] ?? '127.0.0.1') + ->withPassword($config['password'] ?? 'root') + ->withPort((int) $config['port'] ?? 3306) + ->withSlaveId(random_int(100, 999)) + ->withHeartbeatPeriod((float) $config['heartbeat_period'] ?? 3) + ->withDatabasesOnly($databasesOnly) + ->withTablesOnly($tablesOnly) + ); + + if ($binLogCurrent = $this->getBinLogCurrentSnapshot()->get()) { + $configBuilder->withBinLogFileName($binLogCurrent->getBinFileName()); + $configBuilder->withBinLogPosition((int) $binLogCurrent->getBinLogPosition()); + + $this->debug('Continue with position', $binLogCurrent->jsonSerialize()); + } + + $eventDispatcher = make(EventDispatcher::class); + + return tap( + make(MySQLReplicationFactory::class, [ + 'config' => $configBuilder->build(), + 'eventDispatcher' => $eventDispatcher, + ]), + function ($factory) use ($connection) { + /** @var MySQLReplicationFactory $factory */ + $subscribers = $this->subscriberManager->get($connection); + $subscribers[] = TriggerSubscriber::class; + $subscribers[] = SnapshotSubscriber::class; + + foreach ($subscribers as $subscriber) { + $factory->registerSubscriber(make($subscriber, ['consumer' => $this])); + } + } + ); + } +} diff --git a/src/ConsumerManager.php b/src/ConsumerManager.php new file mode 100644 index 0000000..f426f26 --- /dev/null +++ b/src/ConsumerManager.php @@ -0,0 +1,64 @@ +config->get('trigger.connections', []); + + foreach ($connections as $connection => $options) { + if (isset($options['enable']) && ! $options['enable']) { + continue; + } + + $consumer = make(Consumer::class, [ + 'connection' => $connection, + 'options' => (array) $options, + ]); + + $process = $this->createProcess($consumer); + $process->name = $consumer->getName(); + $process->nums = 1; + + ProcessManager::register($process); + } + } + + protected function createProcess(Consumer $consumer): AbstractProcess + { + return new class($this->container, $consumer) extends AbstractProcess { + public function __construct(ContainerInterface $container, protected Consumer $consumer) + { + parent::__construct($container); + } + + public function handle(): void + { + $this->consumer->start(); + } + }; + } +} diff --git a/src/Contact/TriggerInterface.php b/src/Contact/TriggerInterface.php new file mode 100644 index 0000000..b5e871b --- /dev/null +++ b/src/Contact/TriggerInterface.php @@ -0,0 +1,20 @@ +subscriberManager->register(); + $this->triggerManager->register(); + $this->consumerManager->run(); + } +} diff --git a/src/Monitor/HealthMonitor.php b/src/Monitor/HealthMonitor.php new file mode 100644 index 0000000..128fdca --- /dev/null +++ b/src/Monitor/HealthMonitor.php @@ -0,0 +1,98 @@ +connection = $consumer->getConnection(); + $this->monitorInterval = (int) $consumer->getOption('health_monitor.interval', 10); + $this->snapShortInterval = (int) $consumer->getOption('snapshot.interval', 10); + $this->binLogCurrentSnapshot = $consumer->getBinLogCurrentSnapshot(); + if ($container->has(StdoutLoggerInterface::class)) { + $this->logger = $container->get(StdoutLoggerInterface::class); + } + $this->timer = new Timer($this->logger); + } + + public function process(): void + { + Coroutine::create(function () { + CoordinatorManager::until($this->consumer->getIdentifier())->yield(); + + // Monitor binLogCurrent + $this->timer->tick($this->monitorInterval, function () { + if ($this->binLogCurrent instanceof BinLogCurrent) { + $this->debug( + sprintf( + 'Health monitoring, binLogCurrent: %s', + json_encode($this->binLogCurrent->jsonSerialize(), JSON_THROW_ON_ERROR) + ) + ); + } + }); + + // Health check and set snapshot + $this->timer->tick($this->snapShortInterval, function () { + if (! $this->binLogCurrent instanceof BinLogCurrent) { + return; + } + + $binLogCurrentCache = $this->binLogCurrentSnapshot->get(); + + if ( + $binLogCurrentCache instanceof BinLogCurrent + && $binLogCurrentCache->getBinLogPosition() == $this->binLogCurrent->getBinLogPosition() + ) { + if ($this->container->has(EventDispatcherInterface::class)) { + $this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->connection, $this->binLogCurrent)); + } + } + + $this->binLogCurrentSnapshot->set($this->binLogCurrent); + }); + }); + } + + public function setBinLogCurrent(BinLogCurrent $binLogCurrent): void + { + $this->binLogCurrent = $binLogCurrent; + } +} diff --git a/src/Mutex/RedisServerMutex.php b/src/Mutex/RedisServerMutex.php new file mode 100644 index 0000000..02ca42d --- /dev/null +++ b/src/Mutex/RedisServerMutex.php @@ -0,0 +1,116 @@ +expires = (int) $options['expires'] ?? 60; + $this->keepaliveInterval = (int) $options['keepalive_interval'] ?? 10; + $this->name = $name ?? sprintf('trigger:server:%s', $this->connection); + $this->owner = $owner ?? Util::getInternalIp(); + $this->connection = $options['connection']; + $this->timer = new Timer($logger); + $this->retryInterval = (int) $options['retry_interval'] ?? 10; + } + + public function attempt(callable $callback = null): void + { + // Waiting for the server mutex. + $this->timer->tick($this->retryInterval, function () { + if ( + $this->redis->set($this->name, $this->owner, ['NX', 'EX' => $this->expires]) + || $this->redis->get($this->name) == $this->owner + ) { + $this->debug('Got server mutex.'); + CoordinatorManager::until($this->getIdentifier())->resume(); + + return Timer::STOP; + } + + $this->debug('Waiting server mutex.'); + }); + + // Waiting for the server mutex. + CoordinatorManager::until($this->getIdentifier())->yield(); + + $this->debug('Server mutex keepalive booted.'); + + $this->timer->tick($this->keepaliveInterval, function () { + if ($this->released) { + $this->debug('Server mutex keepalive stopped.'); + + return Timer::STOP; + } + + $this->redis->setNx($this->name, $this->owner); + $this->redis->expire($this->name, $this->expires); + $ttl = $this->redis->ttl($this->name); + + $this->debug('Server mutex keepalive executed', ['ttl' => $ttl]); + }); + + // Execute the callback. + if ($callback) { + try { + $callback(); + } catch (Throwable $e) { + $this->error((string) $e); + } + } + } + + /** + * Release the server mutex. + * @throws RedisException + */ + public function release(bool $force = false): void + { + if ($force || $this->redis->get($this->name) == $this->owner) { + $this->redis->del($this->name); + $this->released = true; + } + } + + protected function getIdentifier(): string + { + return sprintf('%s_%s', $this->connection, __CLASS__); + } +} diff --git a/src/Mutex/ServerMutexInterface.php b/src/Mutex/ServerMutexInterface.php new file mode 100644 index 0000000..eddfbb7 --- /dev/null +++ b/src/Mutex/ServerMutexInterface.php @@ -0,0 +1,18 @@ +redis->set($this->key(), serialize($binLogCurrent)); + $this->redis->expire($this->key(), (int) $this->consumer->getOption('snapshot.expires', 24 * 3600)); + } + + public function get(): ?BinLogCurrent + { + return with($this->redis->get($this->key()), function ($data) { + $data = unserialize((string) $data); + + if (! $data instanceof BinLogCurrent) { + return null; + } + + return $data; + }); + } + + private function key(): string + { + return join(':', [ + 'trigger', + 'snapshot', + 'binLogCurrent', + $this->consumer->getOption('snapshot.version', '1.0'), + $this->consumer->getConnection(), + ]); + } +} diff --git a/src/Subscriber/AbstractSubscriber.php b/src/Subscriber/AbstractSubscriber.php new file mode 100644 index 0000000..7427679 --- /dev/null +++ b/src/Subscriber/AbstractSubscriber.php @@ -0,0 +1,17 @@ +consumer->getHealthMonitor()) { + return; + } + + $this->consumer->getHealthMonitor()->setBinLogCurrent($event->getEventInfo()->getBinLogCurrent()); + } +} diff --git a/src/Subscriber/TriggerSubscriber.php b/src/Subscriber/TriggerSubscriber.php new file mode 100644 index 0000000..aadaae4 --- /dev/null +++ b/src/Subscriber/TriggerSubscriber.php @@ -0,0 +1,103 @@ +concurrent = new Concurrent( + (int) $consumer->getOption('concurrent.limit') ?? 1000 + ); + } + + public static function getSubscribedEvents(): array + { + return [ + ConstEventsNames::UPDATE => 'onUpdate', + ConstEventsNames::DELETE => 'onDelete', + ConstEventsNames::WRITE => 'onWrite', + ]; + } + + protected function allEvents(EventDTO $event): void + { + if (! $event instanceof RowsDTO) { + return; + } + + $key = join('.', [ + $this->consumer->getConnection(), + $event->getTableMap()->getDatabase(), + $event->getTableMap()->getTable(), + $event->getType(), + ]); + + $eventType = $event->getType(); + + foreach ($this->triggerManager->get($key) as $callable) { + foreach ($event->getValues() as $value) { + $this->concurrent->create(function () use ($callable, $value, $eventType) { + [$class, $method] = $callable; + + if (! $this->container->has($class)) { + $this->warning(sprintf('Entry "%s" cannot be resolved.', $class)); + return; + } + + $args = match ($eventType) { + ConstEventsNames::WRITE => [$value], + ConstEventsNames::UPDATE => [$value['before'], $value['after']], + ConstEventsNames::DELETE => [$value], + default => null, + }; + + if (! $args) { + return; + } + + try { + call([$this->container->get($class), $method], $args); + } catch (Throwable $e) { + $this->error(sprintf( + "%s in %s:%s\n%s", + $e->getMessage(), + $e->getFile(), + $e->getLine(), + $e->getTraceAsString() + )); + } + }); + } + } + } +} diff --git a/src/SubscriberManager.php b/src/SubscriberManager.php new file mode 100644 index 0000000..17ec49c --- /dev/null +++ b/src/SubscriberManager.php @@ -0,0 +1,56 @@ + $property) { + $queue->insert([$class, $property], $property->priority); + } + + foreach ($queue as $value) { + [$class, $property] = $value; + $this->subscribers[$property->connection] ??= []; + $this->subscribers[$property->connection][] = $class; + + $this->logger->debug(sprintf( + '[trigger.%s] %s registered by %s process by %s.', + $property->connection, + $this::class, + $class, + $this::class + )); + } + } + + public function get(string $connection = 'default'): array + { + return (array) Arr::get($this->subscribers, $connection, []); + } +} diff --git a/src/Traits/Logger.php b/src/Traits/Logger.php new file mode 100644 index 0000000..4c66259 --- /dev/null +++ b/src/Traits/Logger.php @@ -0,0 +1,69 @@ +getLogger()?->info($this->formatMessage($message, $context)); + } + + protected function debug(string $message, array $context = []): void + { + $this->getLogger()?->debug($this->formatMessage($message, $context)); + } + + protected function warning(string $message, array $context = []): void + { + $this->getLogger()?->warning($this->formatMessage($message, $context)); + } + + protected function error(string $message, array $context = []): void + { + $this->getLogger()?->error($this->formatMessage($message, $context)); + } + + protected function formatMessage(string $message, array $context = []): string + { + return sprintf( + '[trigger%s] %s %s', + /* @phpstan-ignore-next-line */ + isset($this->connection) ? ".{$this->connection}" : 'default', + $message, + $context ? json_encode($context, JSON_UNESCAPED_UNICODE) : '' + ); + } + + protected function getLogger(): ?LoggerInterface + { + /* @phpstan-ignore-next-line */ + if (isset($this->logger) && $this->logger instanceof LoggerInterface) { + return $this->logger; + } + + $container = ApplicationContext::getContainer(); + + if ($container->has(StdoutLoggerInterface::class)) { + return $container->get(StdoutLoggerInterface::class); + } + + return null; + } +} diff --git a/src/Trigger/AbstractTrigger.php b/src/Trigger/AbstractTrigger.php new file mode 100644 index 0000000..2d75712 --- /dev/null +++ b/src/Trigger/AbstractTrigger.php @@ -0,0 +1,28 @@ + $property) { + if ($property->events == ['*']) { + $property->events = ['write', 'update', 'delete']; + } + $queue->insert([$class, $property], $property->priority); + } + + foreach ($queue as $value) { + [$class, $property] = $value; + + /** @var Trigger $property */ + foreach ($property->events as $eventType) { + $config = $this->config->get('trigger.connections.' . $property->connection); + $property->table ??= class_basename($class); + $property->database ??= $config['databases_only'][0] ?? ''; + + $key = $this->buildKey($property->connection, $property->database, $property->table, $eventType); + $method = 'on' . ucfirst($eventType); + + $items = Arr::get($this->triggers, $key, []); + $items[] = [$class, $method]; + + Arr::set($this->triggers, $key, $items); + } + } + } + + public function get(string $key): array + { + return Arr::get($this->triggers, $key, []); + } + + public function getDatabases(string $connection): array + { + return array_keys($this->get($connection)); + } + + public function getTables(string $connection): array + { + $tables = []; + + foreach ($this->getDatabases($connection) as $database) { + $tables = [...$tables, ...array_keys($this->get($this->buildKey($connection, $database)))]; + } + + return $tables; + } + + private function buildKey(...$arguments): string + { + return join('.', $arguments); + } +} diff --git a/src/Util.php b/src/Util.php new file mode 100644 index 0000000..f26d8c4 --- /dev/null +++ b/src/Util.php @@ -0,0 +1,36 @@ +