Skip to content

Commit 7bac4d7

Browse files
committed
database驱动支持设置connection
1 parent 2988cd5 commit 7bac4d7

File tree

3 files changed

+23
-16
lines changed

3 files changed

+23
-16
lines changed

composer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
"minimum-stability": "dev",
2525
"require": {
2626
"ext-json": "*",
27-
"topthink/think-helper": "^3.0.4",
2827
"topthink/framework": "^6.0",
2928
"symfony/process": "^4.2",
3029
"nesbot/carbon": "^2.16"

src/config.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
'type' => 'sync',
1717
],
1818
'database' => [
19-
'type' => 'database',
20-
'queue' => 'default',
21-
'table' => 'jobs',
19+
'type' => 'database',
20+
'queue' => 'default',
21+
'table' => 'jobs',
22+
'connection' => null,
2223
],
2324
'redis' => [
2425
'type' => 'redis',

src/queue/connector/Database.php

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Carbon\Carbon;
1515
use stdClass;
1616
use think\Db;
17+
use think\db\ConnectionInterface;
1718
use think\db\Query;
1819
use think\queue\Connector;
1920
use think\queue\InteractsWithTime;
@@ -47,7 +48,7 @@ class Database extends Connector
4748
*/
4849
protected $retryAfter = 60;
4950

50-
public function __construct(Db $db, $table, $default = 'default', $retryAfter = 60)
51+
public function __construct(ConnectionInterface $db, $table, $default = 'default', $retryAfter = 60)
5152
{
5253
$this->db = $db;
5354
$this->table = $table;
@@ -57,7 +58,9 @@ public function __construct(Db $db, $table, $default = 'default', $retryAfter =
5758

5859
public static function __make(Db $db, $config)
5960
{
60-
return new self($db, $config['table'], $config['queue'], $config['retry_after'] ?? 60);
61+
$connection = $db->connect($config['connection'] ?? null);
62+
63+
return new self($connection, $config['table'], $config['queue'], $config['retry_after'] ?? 60);
6164
}
6265

6366
public function size($queue = null)
@@ -105,9 +108,9 @@ function ($job) use ($queue, $data, $availableAt) {
105108
/**
106109
* 重新发布任务
107110
*
108-
* @param string $queue
111+
* @param string $queue
109112
* @param StdClass $job
110-
* @param int $delay
113+
* @param int $delay
111114
* @return mixed
112115
*/
113116
public function release($queue, $job, $delay)
@@ -119,9 +122,9 @@ public function release($queue, $job, $delay)
119122
* Push a raw payload to the database with a given delay.
120123
*
121124
* @param \DateTime|int $delay
122-
* @param string|null $queue
123-
* @param string $payload
124-
* @param int $attempts
125+
* @param string|null $queue
126+
* @param string $payload
127+
* @param int $attempts
125128
* @return mixed
126129
*/
127130
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
@@ -160,7 +163,8 @@ public function pop($queue = null)
160163
protected function getNextAvailableJob($queue)
161164
{
162165

163-
$job = $this->db->name($this->table)
166+
$job = $this->db
167+
->name($this->table)
164168
->lock(true)
165169
->where('queue', $this->getQueue($queue))
166170
->where(function (Query $query) {
@@ -190,10 +194,13 @@ protected function getNextAvailableJob($queue)
190194
*/
191195
protected function markJobAsReserved($job)
192196
{
193-
$this->db->name($this->table)->where('id', $job->id)->update([
194-
'reserve_time' => $job->reserve_time = $this->currentTime(),
195-
'attempts' => ++$job->attempts,
196-
]);
197+
$this->db
198+
->name($this->table)
199+
->where('id', $job->id)
200+
->update([
201+
'reserve_time' => $job->reserve_time = $this->currentTime(),
202+
'attempts' => ++$job->attempts,
203+
]);
197204

198205
return $job;
199206
}

0 commit comments

Comments
 (0)