Skip to content

Commit

Permalink
Implemented subprocesses to prevent memory exhaustion on huge indices
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Bader committed May 29, 2024
1 parent 0f94137 commit 35421a8
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 94 deletions.
13 changes: 12 additions & 1 deletion src/Command/BaseCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,16 @@

/**
 * Base class for this bundle's console commands.
 */
abstract class BaseCommand extends AbstractCommand
{
    /** Prefix that command names are built from, e.g. COMMAND_NAMESPACE . 'cleanup'. */
    protected const COMMAND_NAMESPACE = 'valantic:elastica-bridge:';

    /**
     * Writes a throwable to the command output.
     *
     * Prints, in this order, the file/line where it originated, its message
     * and its stack trace; a blank line precedes the first part and follows
     * every part.
     */
    protected function displayThrowable(\Throwable $throwable): void
    {
        $sections = [
            sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()),
            $throwable->getMessage(),
            $throwable->getTraceAsString(),
        ];

        $this->output->writeln('');

        foreach ($sections as $section) {
            $this->output->writeln($section);
            $this->output->writeln('');
        }
    }
}
3 changes: 2 additions & 1 deletion src/Command/Cleanup.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ConfirmationQuestion;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;

Expand All @@ -27,7 +28,7 @@ public function __construct(

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'cleanup')
$this->setName(CommandConstants::COMMAND_CLEANUP)
->setDescription('Deletes Elasticsearch indices and aliases known to (i.e. created by) the bundle')
->addOption(
self::OPTION_ALL_IN_CLUSTER,
Expand Down
166 changes: 166 additions & 0 deletions src/Command/DoPopulateIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Command;

use Elastica\Index as ElasticaIndex;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;

/**
 * Hidden worker command that writes the documents of one document type into
 * the blue/green-inactive Elasticsearch index of a single index configuration.
 *
 * Depending on IndexInterface::shouldPopulateInSubprocesses(), one invocation
 * either handles exactly the batch given by the batch-number option, or walks
 * through every batch of the listing in sequence.
 */
class DoPopulateIndex extends BaseCommand
{
    public function __construct(
        private readonly IndexRepository $indexRepository,
        private readonly DocumentRepository $documentRepository,
        private readonly DocumentHelper $documentHelper,
        private readonly ConfigurationRepository $configurationRepository,
    ) {
        parent::__construct();
    }

    /**
     * Registers the command as hidden and declares its options, all of which
     * require a value when given.
     */
    protected function configure(): void
    {
        $this->setName(CommandConstants::COMMAND_DO_POPULATE_INDEX)
            ->setHidden(true)
            ->setDescription('[INTERNAL]')
            ->addOption(CommandConstants::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
            // Declared so callers may pass it; this class never reads its value.
            ->addOption(CommandConstants::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
        ;
    }

    /**
     * Resolves the requested index configuration and populates its inactive
     * blue/green index.
     *
     * @return int self::SUCCESS, or self::FAILURE when the configuration or
     *             document type cannot be resolved
     *
     * @throws DocumentFailedException when an element fails and failing
     *                                 documents are not configured to be skipped
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $indexConfig = $this->getIndex();

        if (!$indexConfig instanceof IndexInterface) {
            // Report the reason instead of exiting silently with a non-zero code.
            $requestedName = $this->input->getOption(CommandConstants::OPTION_CONFIG);
            $this->output->writeln(sprintf(
                '<fg=red;options=bold>No index configuration named "%s" was found.</>',
                is_scalar($requestedName) ? (string) $requestedName : '',
            ));

            return self::FAILURE;
        }

        return $this->populateIndex($indexConfig, $indexConfig->getBlueGreenInactiveElasticaIndex());
    }

    /**
     * Finds the index configuration whose name equals the config option.
     *
     * @return IndexInterface|null the matching configuration, or null if none matches
     */
    private function getIndex(): ?IndexInterface
    {
        // The option value does not change while iterating; read it once.
        $requestedName = $this->input->getOption(CommandConstants::OPTION_CONFIG);

        foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
            if ($indexConfig->getName() === $requestedName) {
                return $indexConfig;
            }
        }

        return null;
    }

    /**
     * Validates the requested document type, sets up the progress bar and
     * dispatches to single-batch or all-batches processing.
     *
     * @return int self::SUCCESS, self::FAILURE for a document type that is not
     *             allowed for the index, or the first non-success batch exit code
     *
     * @throws DocumentFailedException see doPopulateIndex()
     */
    private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): int
    {
        ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

        $batchNumber = (int) $this->input->getOption(CommandConstants::OPTION_BATCH_NUMBER);
        $listingCount = (int) $this->input->getOption(CommandConstants::OPTION_LISTING_COUNT);
        $document = $this->input->getOption(CommandConstants::OPTION_DOCUMENT);

        // Option values are untyped; make sure a string reaches setMessage()
        // and doPopulateIndex(), both of which require one.
        if (!is_string($document) || !in_array($document, $indexConfig->getAllowedDocuments(), true)) {
            $this->output->writeln(sprintf(
                '<fg=red;options=bold>Document "%s" is not allowed for index %s.</>',
                is_scalar($document) ? (string) $document : '',
                $indexConfig::class,
            ));

            return self::FAILURE;
        }

        $batchSize = $indexConfig->getBatchSize();

        // ProgressBar is given a maximum of at least 1 even for an empty listing.
        $progressBar = new ProgressBar($this->output, max(1, $listingCount));
        $progressBar->setMessage($document);
        $progressBar->setFormat('custom');
        // Start where the preceding batches left off.
        $progressBar->setProgress($batchNumber * $batchSize);

        if ($indexConfig->shouldPopulateInSubprocesses()) {
            // This invocation is responsible for exactly one batch.
            return $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batchNumber);
        }

        $numberOfBatches = (int) ceil($listingCount / $batchSize);

        for ($batch = 0; $batch < $numberOfBatches; $batch++) {
            $exitCode = $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batch);

            if ($exitCode !== self::SUCCESS) {
                return $exitCode;
            }
        }

        return self::SUCCESS;
    }

    /**
     * Loads one batch of elements from the document's listing, converts the
     * indexable ones to Elasticsearch documents and adds them to the index.
     *
     * @param int $batchNumber zero-based batch index; the listing offset is
     *                         $batchNumber multiplied by the batch size
     *
     * @return int always self::SUCCESS; failures surface as exceptions
     *
     * @throws DocumentFailedException when an element fails and
     *                                 shouldSkipFailingDocuments() is false
     */
    private function doPopulateIndex(
        ElasticaIndex $esIndex,
        IndexInterface $indexConfig,
        ProgressBar $progressBar,
        string $document,
        int $batchNumber,
    ): int {
        $documentInstance = $this->documentRepository->get($document);

        $this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);

        $batchSize = $indexConfig->getBatchSize();

        $listing = $documentInstance->getListingInstance($indexConfig);
        $listing->setOffset($batchNumber * $batchSize);
        $listing->setLimit($batchSize);

        $esDocuments = [];

        foreach ($listing->getData() ?? [] as $dataObject) {
            // Count every listed element, including skipped and failing ones:
            // the bar's maximum is the full listing count and each batch starts
            // at $batchNumber * $batchSize, so only per-element counting keeps
            // the display consistent and lets it reach 100 %.
            $progressBar->advance();

            try {
                if (!$documentInstance->shouldIndex($dataObject)) {
                    continue;
                }

                $esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
            } catch (\Throwable $throwable) {
                $this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

                if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
                    throw new DocumentFailedException($throwable);
                }
            }
        }

        if ($esDocuments !== []) {
            $esIndex->addDocuments($esDocuments);
        }

        // Despite the option's name, the refresh happens once per batch here.
        if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
            $esIndex->refresh();
        }

        return self::SUCCESS;
    }

    /**
     * Prints a highlighted error line naming the index class, document type
     * and element ID, followed by the throwable's details.
     */
    private function displayDocumentError(
        IndexInterface $indexConfig,
        string $document,
        AbstractElement $dataObject,
        \Throwable $throwable,
    ): void {
        $this->output->writeln('');
        $this->output->writeln(sprintf(
            '<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
            $indexConfig::class,
            $document,
            $dataObject->getId()
        ));
        $this->displayThrowable($throwable);
    }
}
9 changes: 5 additions & 4 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
Expand Down Expand Up @@ -39,7 +40,7 @@ public function __construct(

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'index')
$this->setName(CommandConstants::COMMAND_INDEX)
->setDescription('Ensures all the indices are present and populated.')
->addArgument(
self::ARGUMENT_INDEX,
Expand Down Expand Up @@ -169,9 +170,9 @@ private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esInd
self::$isPopulating = true;
$process = new Process(
[
'bin/console', self::COMMAND_NAMESPACE . 'populate-index',
'--config', $indexConfig->getName(),
'--index', $esIndex->getName(),
'bin/console', CommandConstants::COMMAND_POPULATE_INDEX,
'--' . CommandConstants::OPTION_CONFIG, $indexConfig->getName(),
'--' . CommandConstants::OPTION_INDEX, $esIndex->getName(),
...array_filter([$this->output->isVerbose() ? '-v' : null,
$this->output->isVeryVerbose() ? '-vv' : null,
$this->output->isDebug() ? '-vvv' : null,
Expand Down
Loading

0 comments on commit 35421a8

Please sign in to comment.