diff --git a/publish/trigger.php b/publish/trigger.php index 9a139ed..4de32d0 100644 --- a/publish/trigger.php +++ b/publish/trigger.php @@ -22,6 +22,7 @@ 'databases_only' => env('TRIGGER_DATABASES_ONLY', '') ? explode(',', env('TRIGGER_DATABASES_ONLY')) : [], 'tables_only' => env('TRIGGER_TABLES_ONLY', '') ? explode(',', env('TRIGGER_TABLES_ONLY')) : [], 'heartbeat_period' => (int) env('TRIGGER_HEARTBEAT', 3), + 'connect_retries' => 10, 'server_mutex' => [ 'enable' => true, diff --git a/src/Consumer.php b/src/Consumer.php index 34fdd64..9290538 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -27,6 +27,7 @@ use MySQLReplication\Socket\SocketException; use function Hyperf\Support\make; +use function Hyperf\Support\retry; use function Hyperf\Tappable\tap; class Consumer @@ -101,7 +102,12 @@ public function start(): void try { $replication->consume(); } catch (SocketException $e) { - $replication->connect(); + retry( + (int) $this->getOption('connect_retries', 10), + fn () => $replication->connect(), + 200 + ); + $this->debug('Connection lost, reconnected.'); } } }; @@ -200,9 +206,9 @@ protected function makeReplication(): MySQLReplicationFactory make(MySQLReplicationFactory::class, [ 'config' => $configBuilder->build(), 'eventDispatcher' => $eventDispatcher, + 'logger' => $this->logger, ]), - function ($factory) use ($connection) { - /** @var MySQLReplicationFactory $factory */ + function (MySQLReplicationFactory $factory) use ($connection) { $subscribers = $this->subscriberManager->get($connection); $subscribers[] = TriggerSubscriber::class; $subscribers[] = SnapshotSubscriber::class;