Skip to content

Commit

Permalink
Async processor must be independent classes
Browse files Browse the repository at this point in the history
  • Loading branch information
loicsapone committed Jan 14, 2024
1 parent dee8fbc commit 5385156
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 108 deletions.
6 changes: 1 addition & 5 deletions src/Bundle/Command/AbstractImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use IQ2i\DataImporter\Archiver\ArchiverInterface;
use IQ2i\DataImporter\Bundle\Exception\ItemHandlingException;
use IQ2i\DataImporter\Bundle\Processor\AsyncCliProcessor;
use IQ2i\DataImporter\Bundle\Processor\CliProcessor;
use IQ2i\DataImporter\DataImporter;
use IQ2i\DataImporter\Exchange\Message;
Expand Down Expand Up @@ -43,7 +42,6 @@ protected function configure(): void
{
$this
->addArgument('filename', InputArgument::OPTIONAL, 'File to import')
->addOption('async', null, InputOption::VALUE_NONE, 'Parallelize row process using symfony/messenger')
->addOption('step', null, InputOption::VALUE_NONE, 'Step through each record one-by-one')
->addOption('pause-on-error', null, InputOption::VALUE_NONE, 'Pause if an exception is thrown')
->addOption('batch-size', null, InputOption::VALUE_REQUIRED, 'Batch size', 100)
Expand Down Expand Up @@ -95,9 +93,7 @@ protected function handleEnd(): callable

protected function getProcessor(InputInterface $input, OutputInterface $output): ProcessorInterface
{
return $input->getOption('async')
? new AsyncCliProcessor($input, $output, $this->handleBegin(), $this->handleItem(), $this->handleBatch(), $this->handleEnd(), $this->getSerializer())
: new CliProcessor($input, $output, $this->handleBegin(), $this->handleItem(), $this->handleBatch(), $this->handleEnd(), $this->getSerializer());
return new CliProcessor($input, $output, $this->handleBegin(), $this->handleItem(), $this->handleBatch(), $this->handleEnd(), $this->getSerializer());
}

protected function getArchiver(): ?ArchiverInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@

use IQ2i\DataImporter\Exchange\Message;

class ProcessItemMessage
class AsyncMessage
{
public function __construct(
private readonly \Closure $handleItem,
private readonly Message $data,
) {
}

public function getHandleItem(): \Closure
{
return $this->handleItem;
}

public function getData(): Message
{
return $this->data;
Expand Down
22 changes: 0 additions & 22 deletions src/Bundle/Messenger/MessageHandler.php

This file was deleted.

46 changes: 46 additions & 0 deletions src/Bundle/Processor/AbstractAsyncProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

/*
* This file is part of the DataImporter package.
*
* (c) Loïc Sapone <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace IQ2i\DataImporter\Bundle\Processor;

use IQ2i\DataImporter\Bundle\Messenger\AsyncMessage;
use IQ2i\DataImporter\Exchange\Message;
use IQ2i\DataImporter\Processor\ProcessorInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Contracts\Service\Attribute\Required;

abstract class AbstractAsyncProcessor implements ProcessorInterface
{
private ?MessageBusInterface $bus = null;

public function begin(Message $message)
{
}

public function item(Message $message)
{
$this->bus->dispatch(new AsyncMessage($message));
}

abstract public function processItem(AsyncMessage $message);

public function end(Message $message)
{
}

#[Required]
public function setBus(MessageBusInterface $bus): void
{
$this->bus = $bus;
}
}
20 changes: 0 additions & 20 deletions src/Bundle/Processor/AsyncCliProcessor.php

This file was deleted.

4 changes: 0 additions & 4 deletions src/Bundle/Resources/config/services.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use IQ2i\DataImporter\Bundle\Messenger\MessageHandler;
use IQ2i\DataImporter\Command\GenerateDtoCommand;

return static function (ContainerConfigurator $container) {
Expand All @@ -22,8 +21,5 @@
->arg('$defaultPath', '%kernel.project_dir%')
->arg('$defaultNamespace', 'App\\Dto')
->tag('console.command', ['command' => 'iq2i:data-importer:generate-dto'])

->set('iq2i_data_importer.messenger.message_handler', MessageHandler::class)
->tag('messenger.message_handler')
;
};
10 changes: 0 additions & 10 deletions src/DataImporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
namespace IQ2i\DataImporter;

use IQ2i\DataImporter\Archiver\ArchiverInterface;
use IQ2i\DataImporter\Bundle\Messenger\ProcessItemMessage;
use IQ2i\DataImporter\Exchange\MessageFactory;
use IQ2i\DataImporter\Processor\AsyncProcessorInterface;
use IQ2i\DataImporter\Processor\BatchProcessorInterface;
use IQ2i\DataImporter\Processor\ProcessorInterface;
use IQ2i\DataImporter\Reader\ReaderInterface;
use Symfony\Component\Filesystem\Exception\IOException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Serializer\NameConverter\CamelCaseToSnakeCaseNameConverter;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
use Symfony\Component\Serializer\Serializer;
Expand All @@ -36,7 +33,6 @@ public function __construct(
private readonly ProcessorInterface $processor,
private readonly ?ArchiverInterface $archiver = null,
SerializerInterface $serializer = null,
private readonly ?MessageBusInterface $bus = null,
) {
$this->serializer = $serializer ?? new Serializer([new ObjectNormalizer(null, new CamelCaseToSnakeCaseNameConverter())]);
}
Expand All @@ -51,12 +47,6 @@ public function execute(): void
$this->reader->isDenormalizable() ? $this->serializeData($data) : $data
);

if ($this->processor instanceof AsyncProcessorInterface && null !== $this->bus) {
$this->bus->dispatch(new ProcessItemMessage(fn () => $this->processor->item($message), $message));

continue;
}

$this->processor->item($message);

if ($this->processor instanceof BatchProcessorInterface && (
Expand Down
18 changes: 0 additions & 18 deletions src/Processor/AsyncProcessorInterface.php

This file was deleted.

17 changes: 8 additions & 9 deletions tests/DataImporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace IQ2i\DataImporter\Tests;

use IQ2i\DataImporter\Archiver\DateTimeArchiver;
use IQ2i\DataImporter\Bundle\Messenger\ProcessItemMessage;
use IQ2i\DataImporter\Bundle\Messenger\AsyncMessage;
use IQ2i\DataImporter\DataImporter;
use IQ2i\DataImporter\Processor\CallbackProcessor;
use IQ2i\DataImporter\Reader\CsvReader;
Expand Down Expand Up @@ -168,9 +168,9 @@ public function testWithUnreadableArchivePath()
public function testAsyncProcessor()
{
$bus = new class() implements MessageBusInterface {
public $messages = [];
public array $messages = [];

public $stamps = [];
public array $stamps = [];

public function dispatch($message, array $stamps = []): Envelope
{
Expand All @@ -181,26 +181,25 @@ public function dispatch($message, array $stamps = []): Envelope
}
};

$processor = new AsyncProcessor();
$processor->setBus($bus);

$dataImporter = new DataImporter(
new CsvReader(
$this->fs->getChild('books.csv')->url(),
null,
[CsvReader::CONTEXT_DELIMITER => ';']
),
new AsyncProcessor(),
null,
null,
$bus
$processor,
);

$dataImporter->execute();

$this->assertCount(2, $bus->messages);

/** @var ProcessItemMessage $message */
/** @var AsyncMessage $message */
$message = \array_pop($bus->messages);

$this->assertEquals(static function () {}, $message->getHandleItem());
$this->assertEquals([
'author' => 'Ralls, Kim',
'title' => 'Midnight Rain',
Expand Down
17 changes: 4 additions & 13 deletions tests/fixtures/Processor/AsyncProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,12 @@

namespace IQ2i\DataImporter\Tests\fixtures\Processor;

use IQ2i\DataImporter\Exchange\Message;
use IQ2i\DataImporter\Processor\AsyncProcessorInterface;
use IQ2i\DataImporter\Processor\ProcessorInterface;
use IQ2i\DataImporter\Bundle\Messenger\AsyncMessage;
use IQ2i\DataImporter\Bundle\Processor\AbstractAsyncProcessor;

class AsyncProcessor implements ProcessorInterface, AsyncProcessorInterface
class AsyncProcessor extends AbstractAsyncProcessor
{
public function begin(Message $message)
{
}

public function item(Message $message)
{
}

public function end(Message $message)
public function processItem(AsyncMessage $message)
{
}
}

0 comments on commit 5385156

Please sign in to comment.