Skip to content

Commit 1c790c4

Browse files
committed
Use Scheduler as alternative to cron
refactoring command into service usage of sync:// transport by default to preserve current behaviour
1 parent 1ddf4d8 commit 1c790c4

26 files changed

+709
-522
lines changed

composer.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@
1010
"psr/log": "^3.0",
1111
"ruflin/elastica": "^8.0.0",
1212
"symfony/console": "^6.2",
13-
"symfony/lock": "^6.2"
13+
"symfony/lock": "^6.2",
14+
"symfony/scheduler": "^7.1"
1415
},
1516
"require-dev": {
1617
"bamarni/composer-bin-plugin": "^1.8.2",
1718
"phpstan/extension-installer": "^1.4.3",
18-
"phpstan/phpstan": "^1.12.3",
19+
"phpstan/phpstan": "^1.12.7",
1920
"phpstan/phpstan-deprecation-rules": "^1.2.1",
20-
"phpstan/phpstan-strict-rules": "^1.6.0",
21-
"rector/rector": "^1.2.5",
21+
"phpstan/phpstan-strict-rules": "^1.6.1",
22+
"rector/rector": "^1.2.8",
2223
"sentry/sentry": "^4.9.0",
23-
"symfony/http-client": "^6.4.11"
24+
"symfony/http-client": "^6.4.12"
2425
},
2526
"license": "MIT",
2627
"authors": [

src/Command/Index.php

Lines changed: 14 additions & 408 deletions
Large diffs are not rendered by default.

src/Controller/.gitkeep

Whitespace-only changes.

src/DependencyInjection/Configuration.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public function getConfigTreeBuilder()
2626
->integerNode('lock_timeout')->defaultValue(5 * 60)->info('To prevent overlapping indexing jobs. Set to a value higher than the slowest index. Value is specified in seconds.')->end()
2727
->integerNode('cooldown')->info('Cooldown time in seconds between populating jobs. Only considered in async operation. Starts at end of last indexing job.')->defaultValue(0)->end()
2828
->booleanNode('should_skip_failing_documents')->defaultFalse()->info('If true, when a document fails to be indexed, it will be skipped and indexing continue with the next document. If false, indexing that index will be aborted.')->end()
29-
->booleanNode('populate_async')->defaultFalse()->info('If true, documents to populate are being sent to the queue.')->end()
29+
->integerNode('interval')->defaultValue(600)->info('Interval in seconds at which the scheduler should be executed')->end()
3030
->end()
3131
->end()
3232
->arrayNode('events')

src/Document/DocumentRelationAwareDataObjectTrait.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44

55
namespace Valantic\ElasticaBridgeBundle\Document;
66

7+
use Elastic\Elasticsearch\Exception\ClientResponseException;
8+
use Elastic\Elasticsearch\Exception\ServerResponseException;
79
use Elastica\Query\BoolQuery;
810
use Elastica\Query\MatchQuery;
911
use Pimcore\Model\Element\AbstractElement;
10-
use Valantic\ElasticaBridgeBundle\Command\Index;
12+
use Symfony\Contracts\Service\Attribute\Required;
1113
use Valantic\ElasticaBridgeBundle\Enum\DocumentType;
1214
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
15+
use Valantic\ElasticaBridgeBundle\Service\LockService;
1316

1417
/**
1518
* Can be used on conjunction with DocumentNormalizerTrait::$relatedObjects.
@@ -20,20 +23,31 @@
2023
trait DocumentRelationAwareDataObjectTrait
2124
{
2225
protected IndexInterface $index;
26+
private LockService $privateLockService;
2327

2428
public function shouldIndex(AbstractElement $element): bool
2529
{
26-
$result = (
27-
Index::$isPopulating && $this->index->usesBlueGreenIndices()
28-
? $this->index->getBlueGreenInactiveElasticaIndex()
29-
: $this->index->getElasticaIndex()
30-
)
31-
->search(
32-
(new BoolQuery())
33-
->addFilter(new MatchQuery(DocumentInterface::META_TYPE, DocumentType::DOCUMENT))
34-
->addFilter(new MatchQuery(DocumentInterface::ATTRIBUTE_RELATED_OBJECTS, $element->getId()))
35-
);
30+
try {
31+
$result = (
32+
$this->privateLockService->isPopulating($this->index) && $this->index->usesBlueGreenIndices()
33+
? $this->index->getBlueGreenInactiveElasticaIndex()
34+
: $this->index->getElasticaIndex()
35+
)
36+
->count(
37+
(new BoolQuery())
38+
->addFilter(new MatchQuery(DocumentInterface::META_TYPE, DocumentType::DOCUMENT))
39+
->addFilter(new MatchQuery(DocumentInterface::ATTRIBUTE_RELATED_OBJECTS, $element->getId())),
40+
);
41+
} catch (ClientResponseException|ServerResponseException) {
42+
$result = 0;
43+
}
3644

37-
return $result->count() > 0;
45+
return $result > 0;
46+
}
47+
48+
#[Required]
49+
public function setLockService(LockService $lockService): void
50+
{
51+
$this->privateLockService = $lockService;
3852
}
3953
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Valantic\ElasticaBridgeBundle\EventListener\Messenger;
6+
7+
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
8+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
9+
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
10+
use Valantic\ElasticaBridgeBundle\Messenger\Message\CreateDocument;
11+
use Valantic\ElasticaBridgeBundle\Service\LockService;
12+
13+
#[AsEventListener]
14+
class CreateDocumentFailedEvent implements EventSubscriberInterface
15+
{
16+
public function __construct(private readonly LockService $lockService) {}
17+
18+
public function onMessageFailed(WorkerMessageFailedEvent $event): void
19+
{
20+
if ($event->willRetry() || !$event->getEnvelope()->getMessage() instanceof CreateDocument) {
21+
return;
22+
}
23+
$this->lockService->lockExecution($event->getEnvelope()->getMessage()->esIndex);
24+
}
25+
26+
public static function getSubscribedEvents()
27+
{
28+
return [
29+
WorkerMessageFailedEvent::class => 'onMessageFailed',
30+
];
31+
}
32+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Valantic\ElasticaBridgeBundle\Exception\Index;
6+
7+
use Valantic\ElasticaBridgeBundle\Exception\BaseException;
8+
9+
class AlreadyInProgressException extends BaseException
10+
{
11+
public const TYPE_COOLDOWN = 'cooldown';
12+
public const TYPE_INDEXING = 'indexing';
13+
public const TYPE_PROCESSING_MESSAGES = 'processing messages';
14+
public const TYPE_NO_DOCUMENTS = '0 documents';
15+
public const TYPE_PROCESSING = 'processing lock active';
16+
17+
/**
18+
* @param self::TYPE_* $type
19+
*/
20+
public function __construct(string $type)
21+
{
22+
parent::__construct(sprintf('Process not started (%s)', $type));
23+
}
24+
}

src/Index/AbstractIndex.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ final public function getCreateArguments(): array
5858

5959
public function getBatchSize(): int
6060
{
61-
return 5000;
61+
return 500;
6262
}
6363

6464
public function shouldPopulateInSubprocesses(): bool

src/Index/IndexInterface.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ public function findDocumentInstanceByPimcore(AbstractElement $element): ?Docume
117117
*
118118
* @see DocumentNormalizerTrait::$relatedObjects
119119
* @see IndexCommand
120-
* @see IndexCommand::$isPopulating
121120
* @see IndexInterface::getBlueGreenInactiveElasticaIndex()
122121
*/
123122
public function refreshIndexAfterEveryDocumentWhenPopulating(): bool;

src/Messenger/Handler/CreateDocumentHandler.php

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use Symfony\Component\Console\Output\ConsoleOutputInterface;
1010
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1111
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
12-
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
1312
use Valantic\ElasticaBridgeBundle\Messenger\Message\CreateDocument;
1413
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
1514
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
@@ -30,7 +29,7 @@ public function __construct(
3029
private readonly ConsoleOutputInterface $consoleOutput,
3130
) {}
3231

33-
public function __invoke(CreateDocument $message): void
32+
public function __invoke(CreateDocument $message, int $retryCount = 0): void
3433
{
3534
$this->handleMessage($message);
3635
}
@@ -40,8 +39,9 @@ public function __invoke(CreateDocument $message): void
4039
* @throws ServerResponseException
4140
* @throws MissingParameterException
4241
*/
43-
private function handleMessage(CreateDocument $message): void
44-
{
42+
private function handleMessage(
43+
CreateDocument $message,
44+
): void {
4545
$messageDecreased = false;
4646

4747
try {
@@ -64,7 +64,7 @@ private function handleMessage(CreateDocument $message): void
6464
);
6565
}
6666

67-
if ($message->callback->shouldCallEvent()) {
67+
if ($message->callback?->shouldCallEvent() === true) {
6868
$this->eventDispatcher->dispatch($message->callback->getEvent(), $message->callback->getEventName());
6969
}
7070

@@ -91,27 +91,18 @@ private function handleMessage(CreateDocument $message): void
9191
$this->consoleOutput->writeln(sprintf('Error processing message %s: %s', $message->esIndex, $throwable->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL);
9292

9393
if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
94-
$this->lockService->lockExecution($message->esIndex);
95-
96-
if ($this->configurationRepository->shouldPopulateAsync()) {
97-
throw new UnrecoverableMessageHandlingException($throwable->getMessage(), previous: $throwable);
98-
}
94+
throw $throwable;
9995
}
10096

10197
}
98+
10299
$messageDecreased = true;
103100
$this->lockService->messageProcessed($message->esIndex);
104101
} finally {
105102
if (!$messageDecreased) {
106103
$this->consoleOutput->writeln(sprintf('Message %s not processed. (ID: %s)', $message->esIndex, $message->objectId), ConsoleOutputInterface::VERBOSITY_VERBOSE);
107104
}
108105

109-
if ($message->lastItem && $message->cooldown > 0) {
110-
$key = $this->lockService->getKey($message->esIndex, 'cooldown');
111-
$this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire();
112-
}
113-
114-
115106
\Pimcore::collectGarbage();
116107
}
117108
}

0 commit comments

Comments
 (0)