diff --git a/publish/trigger.php b/publish/trigger.php index b12baba..13d03ff 100644 --- a/publish/trigger.php +++ b/publish/trigger.php @@ -44,8 +44,6 @@ 'concurrent' => [ 'limit' => 1000, ], - - 'consume_timeout' => 600, ], ], ]; diff --git a/src/Consumer.php b/src/Consumer.php index 535b38a..9a4bb54 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -24,8 +24,8 @@ use Hyperf\Coroutine\Coroutine; use MySQLReplication\Config\ConfigBuilder; use MySQLReplication\MySQLReplicationFactory; +use Throwable; -use function Hyperf\Coroutine\wait; use function Hyperf\Support\make; use function Hyperf\Tappable\tap; @@ -74,7 +74,7 @@ public function __construct( public function start(): void { $callback = function () { - // Health monitor + // Health monitor start if ($this->healthMonitor) { $this->healthMonitor->process(); } @@ -89,9 +89,7 @@ public function start(): void // Worker exit Coroutine::create(function () { CoordinatorManager::until(Constants::WORKER_EXIT)->yield(); - $this->stop(); - $this->warning('Consumer stopped.'); }); @@ -100,7 +98,11 @@ public function start(): void break; } - wait(fn () => $replication->consume(), (float) $this->getOption('consume_timeout', 600)); + try { + $replication->consume(); + } catch (Throwable $e) { + $this->warning((string) $e); + } } };