Skip to content
This repository has been archived by the owner on Jan 29, 2024. It is now read-only.

Commit

Permalink
add try lock
Browse files Browse the repository at this point in the history
  • Loading branch information
rez1dent3 committed Jul 12, 2020
1 parent 26db3f2 commit c465e60
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions src/Commands/BulkWrite.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Bavix\Entry\Services\BulkService;
use Bavix\Entry\Jobs\BulkWriter;
use Illuminate\Console\Command;
use Illuminate\Contracts\Cache\LockTimeoutException;
use Illuminate\Support\Facades\Cache;

class BulkWrite extends Command
{
Expand All @@ -29,23 +31,32 @@ class BulkWrite extends Command
*/
public function handle(): void
{
$batchSize = \config('entry.batchSize', 10000);
$keys = app(BulkService::class)->keys();
foreach ($keys as $key) {
[$bulkName, $class] = \explode(':', $key, 2);
$chunkIterator = app(BulkService::class)
->chunkIterator($batchSize, $key);

foreach ($chunkIterator as $bulkData) {
foreach ($bulkData as $itemKey => $itemValue) {
$bulkData[$itemKey] = \json_decode($itemValue, true);
$lock = Cache::lock(__CLASS__, 120);
try {
$lock->block(1);
// Lock acquired after waiting maximum of second...
$batchSize = \config('entry.batchSize', 10000);
$keys = app(BulkService::class)->keys();
foreach ($keys as $key) {
[$bulkName, $class] = \explode(':', $key, 2);
$chunkIterator = app(BulkService::class)
->chunkIterator($batchSize, $key);

foreach ($chunkIterator as $bulkData) {
foreach ($bulkData as $itemKey => $itemValue) {
$bulkData[$itemKey] = \json_decode($itemValue, true);
}

$queueName = \config('entry.queueName', 'default');
$job = new BulkWriter(new $class, $bulkData);
$job->onQueue($queueName);
\dispatch($job);
}

$queueName = \config('entry.queueName', 'default');
$job = new BulkWriter(new $class, $bulkData);
$job->onQueue($queueName);
\dispatch($job);
}
} catch (LockTimeoutException $timeoutException) {
// Unable to acquire lock...
} finally {
optional($lock)->release();
}
}

Expand Down

0 comments on commit c465e60

Please sign in to comment.