From e69f74c15e4473579b2956eea3a3083604678ec1 Mon Sep 17 00:00:00 2001 From: Deeka Wong Date: Fri, 22 Sep 2023 08:50:44 +0800 Subject: [PATCH] Catch the exception when consume timeout (#347) * Catch the exception when consume timeout * Remove the waiter when consume * Optimize phpdoc --------- Co-authored-by: Deeka Wong <8337659+huangdijia@users.noreply.github.com> --- publish/trigger.php | 2 -- src/Consumer.php | 12 +++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/publish/trigger.php b/publish/trigger.php index b12baba..13d03ff 100644 --- a/publish/trigger.php +++ b/publish/trigger.php @@ -44,8 +44,6 @@ 'concurrent' => [ 'limit' => 1000, ], - - 'consume_timeout' => 600, ], ], ]; diff --git a/src/Consumer.php b/src/Consumer.php index 535b38a..9a4bb54 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -24,8 +24,8 @@ use Hyperf\Coroutine\Coroutine; use MySQLReplication\Config\ConfigBuilder; use MySQLReplication\MySQLReplicationFactory; +use Throwable; -use function Hyperf\Coroutine\wait; use function Hyperf\Support\make; use function Hyperf\Tappable\tap; @@ -74,7 +74,7 @@ public function __construct( public function start(): void { $callback = function () { - // Health monitor + // Health monitor start if ($this->healthMonitor) { $this->healthMonitor->process(); } @@ -89,9 +89,7 @@ public function start(): void // Worker exit Coroutine::create(function () { CoordinatorManager::until(Constants::WORKER_EXIT)->yield(); - $this->stop(); - $this->warning('Consumer stopped.'); }); @@ -100,7 +98,11 @@ public function start(): void break; } - wait(fn () => $replication->consume(), (float) $this->getOption('consume_timeout', 600)); + try { + $replication->consume(); + } catch (Throwable $e) { + $this->warning((string) $e); + } } };