diff --git a/composer.json b/composer.json index 3db4722a7..8ddfa7650 100644 --- a/composer.json +++ b/composer.json @@ -35,10 +35,8 @@ "packages/Ecotone/src/Enqueue/", "packages/Enqueue/src" ], - "Ecotone\\EventSourcing\\": [ - "packages/PdoEventSourcing/src", - "packages/PdoEventSourcing/src" - ], + "Ecotone\\EventSourcing\\": "packages/PdoEventSourcing/src", + "Ecotone\\EventSourcingV2\\": "packages/EventSourcingV2/src", "Ecotone\\JMSConverter\\": "packages/JmsConverter/src", "Ecotone\\Redis\\": "packages/Redis/src", "Ecotone\\Sqs\\": "packages/Sqs/src", @@ -82,9 +80,11 @@ "packages/LiteApplication/tests" ], "Test\\Ecotone\\EventSourcing\\": [ - "packages/EventSourcing/tests", "packages/PdoEventSourcing/tests" ], + "Test\\Ecotone\\EventSourcingV2\\": [ + "packages/EventSourcingV2/tests" + ], "Test\\Ecotone\\JMSConverter\\": [ "packages/JmsConverter/tests" ], @@ -200,6 +200,7 @@ "ecotone/jms-converter": "1.82.0", "ecotone/laravel": "1.82.0", "ecotone/pdo-event-sourcing": "1.82.0", + "ecotone/event-sourcing": "1.82.0", "ecotone/symfony-bundle": "1.82.0" }, "scripts": { diff --git a/docker-compose.yml b/docker-compose.yml index 59e926e74..99302c4b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,7 @@ services: APP_DB_HOST: database-mysql APP_DB_PORT: 3306 APP_DB_DRIVER: pdo_mysql + DATABASE_POSTGRES: pgsql://ecotone:secret@database:5432/ecotone DATABASE_MYSQL: mysql://ecotone:secret@database-mysql:3306/ecotone SQS_DSN: sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4566&version=latest REDIS_DSN: redis://redis:6379 @@ -50,6 +51,7 @@ services: APP_DB_HOST: database-mysql APP_DB_PORT: 3306 APP_DB_DRIVER: pdo_mysql + DATABASE_POSTGRES: pgsql://ecotone:secret@database:5432/ecotone DATABASE_MYSQL: mysql://ecotone:secret@database-mysql:3306/ecotone SQS_DSN: sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4566&version=latest REDIS_DSN: redis://redis:6379 diff --git a/local-packages.json b/local-packages.json index fd62c3f8f..fff64803c 100644 --- a/local-packages.json +++ b/local-packages.json @@ -39,6 +39,10 @@ { "type": "path", "url": "packages/Kafka" + }, + { + "type": "path", + "url": "packages/EventSourcingV2" } ] } \ No newline at end of file diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php b/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php index 144268560..12be69b07 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php @@ -4,6 +4,7 @@ namespace Ecotone\AnnotationFinder; +use Ecotone\Messaging\Handler\TypeDescriptor; use InvalidArgumentException; /** @@ -155,9 +156,9 @@ public function hasClassAnnotation(string $type): bool return false; } - public function hasAnnotation(string $type): bool + public function hasAnnotation(string|TypeDescriptor $type): bool { - return $this->hasMethodAnnotation($type) || $this->hasClassAnnotation($type); + return $this->hasMethodAnnotation((string) $type) || $this->hasClassAnnotation((string) $type); } diff --git a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php index 889f864c6..b141b308c 100644 --- a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php +++ b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php @@ -15,6 +15,7 @@ use Ecotone\Dbal\ObjectManager\ObjectManagerModule; use Ecotone\Dbal\Recoverability\DbalDeadLetterModule; use Ecotone\EventSourcing\Config\EventSourcingModule; +use Ecotone\EventSourcingV2\Ecotone\Config\EventSourcingV2Module; use Ecotone\JMSConverter\Configuration\JMSConverterConfigurationModule; use Ecotone\JMSConverter\Configuration\JMSDefaultSerialization; use Ecotone\Kafka\Configuration\KafkaModule; @@ -138,6 +139,10 @@ class ModuleClassList EventSourcingModule::class, ]; + public const EVENT_SOURCING_V2_MODULES = [ + EventSourcingV2Module::class, + ]; + public const JMS_CONVERTER_MODULES = [ JMSConverterConfigurationModule::class, JMSDefaultSerialization::class, diff --git a/packages/Ecotone/src/Messaging/Config/ModulePackageList.php b/packages/Ecotone/src/Messaging/Config/ModulePackageList.php index 7c2960452..95f4cb68b 100644 --- a/packages/Ecotone/src/Messaging/Config/ModulePackageList.php +++ b/packages/Ecotone/src/Messaging/Config/ModulePackageList.php @@ -17,6 +17,7 @@ final class ModulePackageList public const SQS_PACKAGE = 'sqs'; public const KAFKA_PACKAGE = 'kafka'; public const EVENT_SOURCING_PACKAGE = 'eventSourcing'; + public const EVENT_SOURCING_V2_PACKAGE = 'eventSourcingV2'; public const JMS_CONVERTER_PACKAGE = 'jmsConverter'; public const TRACING_PACKAGE = 'tracing'; public const LARAVEL_PACKAGE = 'laravel'; @@ -34,6 +35,7 @@ public static function getModuleClassesForPackage(string $packageName): array ModulePackageList::SQS_PACKAGE => ModuleClassList::SQS_MODULES, ModulePackageList::KAFKA_PACKAGE => ModuleClassList::KAFKA_MODULES, ModulePackageList::EVENT_SOURCING_PACKAGE => ModuleClassList::EVENT_SOURCING_MODULES, + ModulePackageList::EVENT_SOURCING_V2_PACKAGE => ModuleClassList::EVENT_SOURCING_V2_MODULES, ModulePackageList::JMS_CONVERTER_PACKAGE => ModuleClassList::JMS_CONVERTER_MODULES, ModulePackageList::TRACING_PACKAGE => ModuleClassList::TRACING_MODULES, ModulePackageList::TEST_PACKAGE => ModuleClassList::TEST_MODULES, @@ -57,6 +59,7 @@ public static function allPackages(): array self::KAFKA_PACKAGE, self::DBAL_PACKAGE, self::EVENT_SOURCING_PACKAGE, + self::EVENT_SOURCING_V2_PACKAGE, self::JMS_CONVERTER_PACKAGE, self::TRACING_PACKAGE, self::LARAVEL_PACKAGE, diff --git a/packages/Ecotone/src/Modelling/AggregateFlow/CallAggregate/CallAggregateServiceBuilder.php b/packages/Ecotone/src/Modelling/AggregateFlow/CallAggregate/CallAggregateServiceBuilder.php index ceeec1253..393849bcc 100644 --- a/packages/Ecotone/src/Modelling/AggregateFlow/CallAggregate/CallAggregateServiceBuilder.php +++ b/packages/Ecotone/src/Modelling/AggregateFlow/CallAggregate/CallAggregateServiceBuilder.php @@ -72,7 +72,7 @@ private function initialize(ClassDefinition $aggregateClassDefinition, string $m $this->interfaceToCall = $interfaceToCall; $isFactoryMethod = $this->interfaceToCall->isFactoryMethod(); if (! $this->isEventSourced && $isFactoryMethod) { - Assert::isTrue($this->interfaceToCall->getReturnType()->isClassNotInterface(), "Factory method {$this->interfaceToCall} for standard aggregate should return object. Did you wanted to register Event Sourced Aggregate?"); +// Assert::isTrue($this->interfaceToCall->getReturnType()->isClassNotInterface(), "Factory method {$this->interfaceToCall} for standard aggregate should return object. Did you wanted to register Event Sourced Aggregate?"); } } diff --git a/packages/EventSourcingV2/.gitattributes b/packages/EventSourcingV2/.gitattributes new file mode 100644 index 000000000..5699823c5 --- /dev/null +++ b/packages/EventSourcingV2/.gitattributes @@ -0,0 +1,7 @@ +tests/ export-ignore +.coveralls.yml export-ignore +.gitattributes export-ignore +.gitignore export-ignore +behat.yaml export-ignore +phpstan.neon export-ignore +phpunit.xml export-ignore \ No newline at end of file diff --git a/packages/EventSourcingV2/.github/FUNDING.yml b/packages/EventSourcingV2/.github/FUNDING.yml new file mode 100644 index 000000000..c7eaae65e --- /dev/null +++ b/packages/EventSourcingV2/.github/FUNDING.yml @@ -0,0 +1,12 @@ +# These are supported funding model platforms + +github: [dgafka] +patreon: # Replace with a single Open Collective username +open_collective: # Replace with a single Open Collective username +ko_fi: # Replace with a single Ko-fi username +tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel +community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry +liberapay: # Replace with a single Liberapay username +issuehunt: # Replace with a single IssueHunt username +otechie: # Replace with a single Otechie username +custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2'] diff --git a/packages/EventSourcingV2/.github/ISSUE_TEMPLATE/bug_report.md b/packages/EventSourcingV2/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 000000000..2fc86f2cf --- /dev/null +++ b/packages/EventSourcingV2/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,10 @@ +--- +name: This is Read-Only repository +about: Report at ecotoneframework/ecotone-dev +title: '' +labels: '' +assignees: '' + +--- + +Report issue at [ecotone-dev](ecotoneframework/ecotone-dev) \ No newline at end of file diff --git a/packages/EventSourcingV2/.gitignore b/packages/EventSourcingV2/.gitignore new file mode 100644 index 000000000..18c159d80 --- /dev/null +++ b/packages/EventSourcingV2/.gitignore @@ -0,0 +1,9 @@ +.idea/ +vendor/ +bin/ +tests/coverage +!tests/coverage/.gitkeep +file +.phpunit.result.cache +composer.lock +phpunit.xml diff --git a/packages/EventSourcingV2/LICENSE b/packages/EventSourcingV2/LICENSE new file mode 100644 index 000000000..e01b72afe --- /dev/null +++ b/packages/EventSourcingV2/LICENSE @@ -0,0 +1,21 @@ +Copyright (c) 2024 Dariusz Gafka + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +**Scope of the License** + +Apache-2.0 Licence applies to non Enterprise Functionalities of the Ecotone Framework. +Functionalities of the Ecotone Framework referred to as Enterprise functionalities, are not covered under the Apache-2.0 license. These functionalities are provided under a separate Enterprise License. +For details on the Enterprise License, please write to support@simplycodedsoftware.com. \ No newline at end of file diff --git a/packages/EventSourcingV2/README.md b/packages/EventSourcingV2/README.md new file mode 100644 index 000000000..11338e364 --- /dev/null +++ b/packages/EventSourcingV2/README.md @@ -0,0 +1,50 @@ +# This is Read Only Repository +To contribute make use of [Ecotone-Dev repository](https://github.com/ecotoneframework/ecotone-dev). + +

+ +

+ +![Github Actions](https://github.com/ecotoneFramework/ecotone-dev/actions/workflows/split-testing.yml/badge.svg) +[![Latest Stable Version](https://poser.pugx.org/ecotone/ecotone/v/stable)](https://packagist.org/packages/ecotone/ecotone) +[![License](https://poser.pugx.org/ecotone/ecotone/license)](https://packagist.org/packages/ecotone/ecotone) +[![Total Downloads](https://img.shields.io/packagist/dt/ecotone/ecotone)](https://packagist.org/packages/ecotone/ecotone) +[![PHP Version Require](https://img.shields.io/packagist/dependency-v/ecotone/ecotone/php.svg)](https://packagist.org/packages/ecotone/ecotone) + +`Ecotone Framework` is a `Service Bus` which enables `Message-Driven` architecture in `PHP`. +Based on resilient Message-Driven principles provides support for building applications that follows `Domain-Driven Design` (DDD), `Command Query Responsibility Segregation` (CQRS) and `Event Sourcing` (ES). + +> The term "Ecotone", in ecology means transition area between ecosystems, such as forest and grassland. +The Ecotone Framework functions as transition area between your components, modules and services. It glues things together, yet respects the boundaries of each area. + +> Ecotone can be used with [Symfony](https://docs.ecotone.tech/modules/symfony-ddd-cqrs-event-sourcing) and [Laravel](https://docs.ecotone.tech/modules/laravel-ddd-cqrs-event-sourcing) frameworks. + +## Getting started + +The quickstart [page](https://docs.ecotone.tech/quick-start) of the +[reference guide](https://docs.ecotone.tech) provides a starting point for using Ecotone. +Read more on the [Ecotone's Blog](https://blog.ecotone.tech). + +## Feature requests and issue reporting + +Use [issue tracking system](https://github.com/ecotoneframework/ecotone-dev/issues) for new feature request and bugs. +Please verify that it's not already reported by someone else. + +## Contact + +If you want to talk or ask question about Ecotone + +- [**Twitter**](https://twitter.com/EcotonePHP) +- **ecotoneframework@gmail.com** +- [**Community Channel**](https://discord.gg/CctGMcrYnV) + +## Support Ecotone + +If you want to help building and improving Ecotone consider becoming a sponsor: + +- [Sponsor Ecotone](https://github.com/sponsors/dgafka) +- [Contribute to Ecotone](https://github.com/ecotoneframework/ecotone-dev). + +## Tags + +PHP DDD CQRS Event Sourcing Symfony Laravel Service Bus diff --git a/packages/EventSourcingV2/composer.json b/packages/EventSourcingV2/composer.json new file mode 100644 index 000000000..b653d0a7f --- /dev/null +++ b/packages/EventSourcingV2/composer.json @@ -0,0 +1,83 @@ +{ + "name": "ecotone/event-sourcing", + "license": [ + "Apache-2.0", + "proprietary" + ], + "homepage": "https://docs.ecotone.tech/", + "forum": "https://discord.gg/CctGMcrYnV", + "type": "library", + "minimum-stability": "dev", + "prefer-stable": true, + "authors": [ + { + "name": "Jean de La Bédoyère" + } + ], + "keywords": ["ecotone", "EventSourcingV2"], + "description": "Extends Ecotone with EventSourcingV2 integration", + "autoload": { + "psr-4": { + "Ecotone\\EventSourcingV2\\": "src" + } + }, + "autoload-dev": { + "psr-4": { + "Test\\Ecotone\\EventSourcingV2\\": [ + "tests" + ] + } + }, + "require": { + "ecotone/ecotone": "^1.62" + }, + "require-dev": { + "ext-pdo": "*", + "ext-pdo_pgsql": "*", + "doctrine/dbal": "^3.0", + "phpunit/phpunit": "^9.5", + "phpstan/phpstan": "^1.8", + "psr/container": "^2.0", + "symfony/event-dispatcher": "^7.2", + "symfony/process": "^7.2", + "symfony/console": "^7.2", + "wikimedia/composer-merge-plugin": "^2.1" + }, + "scripts": { + "tests:phpstan": "vendor/bin/phpstan", + "tests:phpunit": "vendor/bin/phpunit", + "tests:ci": [ + "@tests:phpstan", + "@tests:phpunit" + ] + }, + "extra": { + "branch-alias": { + "dev-main": "1.62-dev" + }, + "ecotone": { + "repository": "EventSourcingV2" + }, + "merge-plugin": { + "include": [ + "../local_packages.json" + ] + }, + "license-info": { + "Apache-2.0": { + "name": "Apache License 2.0", + "url": "https://github.com/ecotoneframework/ecotone-dev/blob/main/LICENSE", + "description": "Allows to use non Enterprise features of Ecotone. For more information please write to support@simplycodedsoftware.com" + }, + "proprietary": { + "name": "Enterprise License", + "description": "Allows to use Enterprise features of Ecotone. For more information please write to support@simplycodedsoftware.com" + } + } + }, + "config": { + "allow-plugins": { + "wikimedia/composer-merge-plugin": true + } + } +} diff --git a/packages/EventSourcingV2/phpstan.neon b/packages/EventSourcingV2/phpstan.neon new file mode 100644 index 000000000..672e0fa1f --- /dev/null +++ b/packages/EventSourcingV2/phpstan.neon @@ -0,0 +1,4 @@ +parameters: + level: 1 + paths: + - src \ No newline at end of file diff --git a/packages/EventSourcingV2/phpunit.xml.dist b/packages/EventSourcingV2/phpunit.xml.dist new file mode 100644 index 000000000..fc3ebe4f7 --- /dev/null +++ b/packages/EventSourcingV2/phpunit.xml.dist @@ -0,0 +1,20 @@ + + + + + ./src + + + + + + + + tests + + + diff --git a/packages/EventSourcingV2/src/Ecotone/Attribute/EventSourced.php b/packages/EventSourcingV2/src/Ecotone/Attribute/EventSourced.php new file mode 100644 index 000000000..69f3bebb6 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/Attribute/EventSourced.php @@ -0,0 +1,16 @@ +findAnnotatedClasses(EventSourced::class); + $projectionsClasses = $annotationRegistrationService->findAnnotatedClasses(Projection::class); + $eventHandlers = $annotationRegistrationService->findCombined(Projection::class, EventHandler::class); + + /** @var array $asynchronousProjections */ + $asynchronousProjections = []; + foreach ($projectionsClasses as $projectionClassName) { + $attributes = $annotationRegistrationService->getAnnotationsForClass($projectionClassName); + $projectionAttribute = null; + $asyncAttribute = null; + foreach ($attributes as $attribute) { + if ($attribute instanceof Projection) { + if ($asyncAttribute) { + $asynchronousProjections[$attribute->name] = $asyncAttribute; + break; + } else { + $projectionAttribute = $attribute; + } + } + if ($attribute instanceof Asynchronous) { + if ($projectionAttribute) { + $asynchronousProjections[$projectionAttribute->name] = $attribute; + break; + } else { + $asyncAttribute = $attribute; + } + } + } + } + + return new self($eventSourcedAggregates, $eventHandlers, $asynchronousProjections); + } + + /** + * @param array $eventSourcedAggregates + * @param array $eventHandlers + * @param array $asynchronousProjections + */ + public function __construct(private array $eventSourcedAggregates, private array $eventHandlers, private array $asynchronousProjections) + { + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + /** @var array> $projectionConfigurations */ + $projectionConfigurations = []; + $eventToAsyncProjectionTrigger = []; + foreach ($this->eventHandlers as $eventHandler) { + /** @var Projection $projectionAttribute */ + $projectionAttribute = $eventHandler->getAnnotationForClass(); + + $eventName = ModellingHandlerModule::getNamedMessageChannelForEventHandler($eventHandler, $interfaceToCallRegistry); + $inputChannelName = self::getNamedMessageChannelFor($projectionAttribute->name, $eventName); + + $messagingConfiguration->registerDefaultChannelFor(SimpleMessageChannelBuilder::createPublishSubscribeChannel($inputChannelName)); + + $eventHandlerTriggeringRoutingKey = ModellingHandlerModule::getHandlerChannel($eventHandler); + $eventHandlerExecutionRoutingKey = isset($this->asynchronousProjections[$projectionAttribute->name]) + ? AsynchronousModule::getHandlerExecutionChannel($eventHandlerTriggeringRoutingKey) + : $eventHandlerTriggeringRoutingKey; + + $messagingConfiguration->registerMessageHandler( + BridgeBuilder::create() + ->withInputChannelName($inputChannelName) + ->withOutputMessageChannel($eventHandlerExecutionRoutingKey) + ->withEndpointAnnotations([PriorityBasedOnType::fromAnnotatedFinding($eventHandler)->toAttributeDefinition()]) + ); + + $projectionConfigurations[$projectionAttribute->name][$eventName] = $inputChannelName; + if (isset($this->asynchronousProjections[$projectionAttribute->name])) { + $eventToAsyncProjectionTrigger[$eventName] ??= []; + if (!in_array($projectionAttribute->name, $eventToAsyncProjectionTrigger[$eventName])) { + $eventToAsyncProjectionTrigger[$eventName][] = $projectionAttribute->name; + } + } + } + + /** @var array $projectors */ + $projectors = []; + foreach ($projectionConfigurations as $projectionName => $projectionConfiguration) { + $projectors[$projectionName] = new Definition( + EcotoneProjector::class, + [ + new Reference(MessagingEntrypoint::class), + $projectionConfiguration, + ], + ); + } + $projectorsReference = new Reference(self::class . ".projectors"); + $messagingConfiguration->registerServiceDefinition($projectorsReference, new Definition(InMemoryPSRContainer::class, [$projectors], "createFromAssociativeArray")); + + $permanentProjectors = []; + + // Asynchronous projections + if ($this->asynchronousProjections) { + $messagingConfiguration->registerServiceDefinition(EcotoneAsynchronousProjectionRunner::class, new Definition(EcotoneAsynchronousProjectionRunner::class, [ + new Reference(EventStore::class), + $projectorsReference, + ])); + $messagingConfiguration->registerServiceDefinition(EcotoneAsynchronousProjectionTrigger::class, new Definition(EcotoneAsynchronousProjectionTrigger::class, [ + new Reference(MessagingEntrypoint::class), + $eventToAsyncProjectionTrigger, + ])); + $permanentProjectors[] = new Reference(EcotoneAsynchronousProjectionTrigger::class); + $messagingConfiguration->registerMessageHandler( + ServiceActivatorBuilder::create( + EcotoneAsynchronousProjectionRunner::class, + "run" + ) + ->withEndpointId(EcotoneAsynchronousProjectionRunner::PROJECTION_RUNNER_CHANNEL . ".endpoint") + ->withInputChannelName(EcotoneAsynchronousProjectionRunner::PROJECTION_RUNNER_CHANNEL) + ); + + foreach ($this->asynchronousProjections as $projectionName => $asynchronousAttribute) { + $messagingConfiguration->registerAsynchronousEndpoint( + $asynchronousAttribute->getChannelName(), + EcotoneAsynchronousProjectionRunner::PROJECTION_RUNNER_CHANNEL . ".endpoint" + ); + } + } + + // todo: use a real event store from configuration + $messagingConfiguration->registerServiceDefinition(EventStore::class, new Definition(InMemoryEventStore::class, [ + $projectorsReference, + $permanentProjectors, + ])); + + } + + public static function getNamedMessageChannelFor(string $projectionName, string $eventName): string + { + return "projection." . $projectionName . "." . $eventName; + } + + public function canHandle($extensionObject): bool + { + return false; + } + + public function getModulePackageName(): string + { + return "eventSourcingV2"; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array + { + return [ + new EventSourcingAggregateRepositoryBuilder($this->eventSourcedAggregates), + new PureEventSourcingAggregateRepositoryBuilder($this->eventSourcedAggregates), + ]; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/Config/PureEventSourcingAggregateRepositoryBuilder.php b/packages/EventSourcingV2/src/Ecotone/Config/PureEventSourcingAggregateRepositoryBuilder.php new file mode 100644 index 000000000..d3ef7a79c --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/Config/PureEventSourcingAggregateRepositoryBuilder.php @@ -0,0 +1,40 @@ +eventSourcedAggregatesClasses, true); + } + + public function isEventSourced(): bool + { + return true; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunner.php b/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunner.php new file mode 100644 index 000000000..f0848222c --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunner.php @@ -0,0 +1,45 @@ +ecotoneProjectors->has($command->subscription)) { + return; + } + /** @var EcotoneProjector $projector */ + $projector = $this->ecotoneProjectors->get($command->subscription); + + while(true) { + $eventPage = $this->eventStore->readFromSubscription($command->subscription); + foreach ($eventPage->events as $event) { + $projector->project($event); + } + $this->eventStore->ack($eventPage); + + if ($command->until !== null && $eventPage->endPosition->isBefore($command->until)) { + \usleep(10000); + } else { + break; + } + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunnerCommand.php b/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunnerCommand.php new file mode 100644 index 000000000..bac80bd2f --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/EcotoneAsynchronousProjectionRunnerCommand.php @@ -0,0 +1,18 @@ + + */ + private array $projectionsToTrigger = []; + + /** + * @param array $eventToProjectionsMapping + */ + public function __construct( + private MessagingEntrypoint $messagingEntrypoint, + private array $eventToProjectionsMapping, + ) { + } + + public function project(PersistedEvent $event): void + { + $projections = $this->eventToProjectionsMapping[$event->event->type] ?? null; + if (! $projections) { + return; + } + foreach ($projections as $projection) { + $this->projectionsToTrigger[$projection] = $event->logEventId; + } + } + + public function flush(): void + { + if (empty($this->projectionsToTrigger)) { + return; + } + foreach ($this->projectionsToTrigger as $projection => $logEventId) { + $this->messagingEntrypoint->send( + new EcotoneAsynchronousProjectionRunnerCommand($projection, $logEventId), + EcotoneAsynchronousProjectionRunner::PROJECTION_RUNNER_CHANNEL, + ); + } + $this->projectionsToTrigger = []; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/EcotoneDbalConnection.php b/packages/EventSourcingV2/src/Ecotone/EcotoneDbalConnection.php new file mode 100644 index 000000000..856f99f61 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/EcotoneDbalConnection.php @@ -0,0 +1,46 @@ +adapter()->prepare($query); + } + + public function execute(string $query): void + { + $this->adapter()->execute($query); + } + + public function beginTransaction(): Transaction + { + return $this->adapter()->beginTransaction(); + } + + protected function adapter(): DoctrineConnection + { + $connectionFactory = new DbalReconnectableConnectionFactory($this->connectionFactory); + + $doctrineDbalConnection = $connectionFactory->getConnection(); + + return new DoctrineConnection($doctrineDbalConnection); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/EcotoneProjector.php b/packages/EventSourcingV2/src/Ecotone/EcotoneProjector.php new file mode 100644 index 000000000..caf3bd623 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/EcotoneProjector.php @@ -0,0 +1,59 @@ + $eventToChannelMapping + */ + public function __construct( + private MessagingEntrypoint $messagingEntrypoint, + private array $eventToChannelMapping, + private ?string $setupChannel = null, + private ?string $tearDownChannel = null, + ) { + } + + public function project(PersistedEvent $event): void + { + $route = $this->eventToChannelMapping[$event->event->type] ?? null; + if ($route === null) { + return; + } + $this->messagingEntrypoint->send( + $event->event->payload, + $route, + ); + } + + public function setUp(): void + { + if ($this->setupChannel === null) { + return; + } + $this->messagingEntrypoint->send( + null, + $this->setupChannel, + ); + } + + public function tearDown(): void + { + if ($this->tearDownChannel === null) { + return; + } + $this->messagingEntrypoint->send( + null, + $this->tearDownChannel, + ); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/EventSourcedAggregateRepositoryTrait.php b/packages/EventSourcingV2/src/Ecotone/EventSourcedAggregateRepositoryTrait.php new file mode 100644 index 000000000..8a5928e31 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/EventSourcedAggregateRepositoryTrait.php @@ -0,0 +1,34 @@ +getEventSourcedAttribute($aggregateClassName) !== null; + } + + protected function getEventSourcedAttribute(string|object $objectOrClass): ?EventSourced + { + $reflectionClass = new \ReflectionClass($objectOrClass); + $eventSourcedAttributes = $reflectionClass->getAttributes(EventSourced::class); + if (count($eventSourcedAttributes) === 0) { + return null; + } + return reset($eventSourcedAttributes)->newInstance(); + } + + protected function getStreamId(EventSourced $eventSourcedOptions, array $identifiers) + { + return $eventSourcedOptions->name . '-' . implode('-', $identifiers); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/PureEventSourcedAggregateRepository.php b/packages/EventSourcingV2/src/Ecotone/PureEventSourcedAggregateRepository.php new file mode 100644 index 000000000..e64d59fb1 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/PureEventSourcedAggregateRepository.php @@ -0,0 +1,67 @@ +getEventSourcedAttribute($aggregateClassName); + if ($eventSourcedAttribute === null) { + throw new RuntimeException("Aggregate class $aggregateClassName is not event sourced"); + } + + $streamId = $this->getStreamId($eventSourcedAttribute, $identifiers); + $persistedEvents = $this->eventStore->load(new StreamEventId($streamId)); + + $ecotoneEvents = []; + $version = 0; + foreach ($persistedEvents as $persistedEvent) { + $ecotoneEvents[] = \Ecotone\Modelling\Event::createWithType($persistedEvent->event->type, $persistedEvent->event->payload); + $version = $persistedEvent->streamEventId->version; + } + + return EventStream::createWith($version, $ecotoneEvents); + } + + public function save(array $identifiers, string $aggregateClassName, array $events, array $metadata, int $versionBeforeHandling): void + { + $eventSourcedAttribute = $this->getEventSourcedAttribute($aggregateClassName); + if ($eventSourcedAttribute === null) { + throw new RuntimeException("Aggregate class " . $aggregateClassName . " is not event sourced"); + } + + $streamId = $this->getStreamId($eventSourcedAttribute, $identifiers); + $eventStoreEvents = []; + foreach ($events as $ecotoneEvent) { + if ($ecotoneEvent instanceof \Ecotone\Modelling\Event) { + $eventStoreEvents[] = new Event($ecotoneEvent->getEventName(), $ecotoneEvent->getPayload()); + } else { + $eventStoreEvents[] = new Event(\get_class($ecotoneEvent), $ecotoneEvent); + } + } + $this->eventStore->append(new StreamEventId($streamId), $eventStoreEvents); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcedAggregateRepository.php b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcedAggregateRepository.php new file mode 100644 index 000000000..5af5ae97f --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcedAggregateRepository.php @@ -0,0 +1,78 @@ +getEventSourcedAttribute($aggregateClassName); + if ($eventSourcedAttribute === null) { + throw new RuntimeException("Aggregate class $aggregateClassName is not event sourced"); + } + + $streamId = $this->getStreamId($eventSourcedAttribute, $identifiers); + $persistedEvents = $this->eventStore->load(new StreamEventId($streamId)); + + $businessEvents = []; + foreach ($persistedEvents as $persistedEvent) { + $businessEvents[] = $persistedEvent->event->payload; + } + + return $aggregateClassName::fromEvents($businessEvents); + } + + public function save(array $identifiers, object $aggregate, array $metadata, ?int $versionBeforeHandling): void + { + $eventSourcedAttribute = $this->getEventSourcedAttribute($aggregate); + if ($eventSourcedAttribute === null) { + throw new RuntimeException("Aggregate class " . get_class($aggregate) . " is not event sourced"); + } + + $streamId = $this->getStreamId($eventSourcedAttribute, $identifiers); + $businessEvents = $this->getMutatingEvents($aggregate); + $events = []; + foreach ($businessEvents as $businessEvent) { + $events[] = new Event(\get_class($businessEvent), $businessEvent); + } + $this->eventStore->append(new StreamEventId($streamId), $events); + } + + private function getMutatingEvents(string|object $objectOrClass): ?array + { + $reflectionClass = new \ReflectionClass($objectOrClass); + foreach ($reflectionClass->getMethods() as $method) { + foreach ($method->getAttributes() as $attribute) { + if ($attribute->getName() === MutatingEvents::class) { + $methodName = $method->getName(); + return $objectOrClass->$methodName(); + } + } + } + return null; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcingAggregateRepositoryBuilder.php b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcingAggregateRepositoryBuilder.php new file mode 100644 index 000000000..a06b3e52e --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventSourcingAggregateRepositoryBuilder.php @@ -0,0 +1,40 @@ +eventSourcedAggregatesClasses, true); + } + + public function isEventSourced(): bool + { + return false; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventStream.php b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventStream.php new file mode 100644 index 000000000..87dbf1e96 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/EventStream.php @@ -0,0 +1,33 @@ +events; + } + + public function eventSourcedAttribute(): EventSourced + { + return $this->eventSourced; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/TransformPureAggregateStreamToAggregateInterceptor.php b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/TransformPureAggregateStreamToAggregateInterceptor.php new file mode 100644 index 000000000..254850a8e --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/TransformPureAggregateStreamToAggregateInterceptor.php @@ -0,0 +1,19 @@ +proceed(); + return new EventStream($eventSourced, $events); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/WithEventSourcingAttributes.php b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/WithEventSourcingAttributes.php new file mode 100644 index 000000000..57c8f08b9 --- /dev/null +++ b/packages/EventSourcingV2/src/Ecotone/StandaloneAggregate/WithEventSourcingAttributes.php @@ -0,0 +1,71 @@ +applyHandler($event); + } + + return $instance; + } + + protected static function setEventHandlersFromAttributes(): void + { + // this should be done ecotone framework bootstrap + // Done here with reflection as a PoC + $reflectionClass = new \ReflectionClass(static::class); + foreach ($reflectionClass->getMethods() as $method) { + foreach ($method->getAttributes() as $attribute) { + if ($attribute->getName() === EventSourcingHandler::class) { + $methodName = $method->getName(); + $reflectionMethod = $reflectionClass->getMethod($methodName); + $eventClass = $reflectionMethod->getParameters()[0]->getType()->getName(); + self::$eventHandlers[$eventClass] = $methodName; + } + } + } + } + + #[MutatingEvents] + public function mutatingEvents(): array + { + return $this->mutatingEvents; + } + + protected function apply(object $event): void + { + $this->mutatingEvents[] = $event; + $this->applyHandler($event); + } + + protected function applyHandler(object $event): void + { + $eventClass = get_class($event); + if (!self::$eventHandlers) { + self::setEventHandlersFromAttributes(); + } + if (!isset(self::$eventHandlers[$eventClass])) { + throw new \RuntimeException("No handler for event {$eventClass}"); + } + $handler = self::$eventHandlers[$eventClass]; + $this->$handler($event); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Connection.php b/packages/EventSourcingV2/src/EventStore/Dbal/Connection.php new file mode 100644 index 000000000..77355613b --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Connection.php @@ -0,0 +1,15 @@ +dbalConnection->prepare($query); + return new DoctrineStatement($doctrineStatement); + } + + public function execute(string $query): void + { + $this->dbalConnection->executeStatement($query); + } + + public function beginTransaction(): Transaction + { + if ($this->dbalConnection->isTransactionActive()) { + return new NoOpTransaction(); + } else { + $this->dbalConnection->beginTransaction(); + return new DoctrineTransaction($this->dbalConnection); + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineStatement.php b/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineStatement.php new file mode 100644 index 000000000..7e5392f00 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineStatement.php @@ -0,0 +1,65 @@ + $value) { + $doctrineType = $this->typeToDoctrineType($types[$key] ?? self::PARAM_STR); + $this->doctrineStatement->bindValue(\is_string($key) ? $key : $key + 1, $value, $doctrineType); + } + $this->result = $this->doctrineStatement->executeQuery(); + } catch (\Doctrine\DBAL\Exception\DriverException $e) { + throw new \Ecotone\EventSourcingV2\EventStore\Dbal\DriverException($e->getCode(), $e); + } + } + + public function fetch(): array|false + { + try { + return $this->result?->fetchAssociative() ?? false; + } catch (\Doctrine\DBAL\Exception\DriverException $e) { + throw new \Ecotone\EventSourcingV2\EventStore\Dbal\DriverException($e->getCode(), $e); + } + } + + public function fetchColumn(int $columnNumber = 0): mixed + { + try { + return $this->result?->fetchOne(); + } catch (\Doctrine\DBAL\Exception\DriverException $e) { + throw new \Ecotone\EventSourcingV2\EventStore\Dbal\DriverException($e->getCode(), $e); + } + } + + public function rowCount(): int + { + try { + return $this->result?->rowCount() ?? 0; + } catch (\Doctrine\DBAL\Exception\DriverException $e) { + throw new \Ecotone\EventSourcingV2\EventStore\Dbal\DriverException($e->getCode(), $e); + } + } + + private function typeToDoctrineType(int $type): int + { + return match ($type) { + self::PARAM_STR => ParameterType::STRING, + self::PARAM_INT => ParameterType::INTEGER, + default => throw new \InvalidArgumentException("Unsupported type") + }; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineTransaction.php b/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineTransaction.php new file mode 100644 index 000000000..5dab30277 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Doctrine/DoctrineTransaction.php @@ -0,0 +1,25 @@ +dbalConnection->commit(); + } + + public function rollBack(): void + { + $this->dbalConnection->rollBack(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/DriverException.php b/packages/EventSourcingV2/src/EventStore/Dbal/DriverException.php new file mode 100644 index 000000000..ff0b817d0 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/DriverException.php @@ -0,0 +1,21 @@ +getMessage(), $code, $previous); + } + + public function getErrorCode(): string + { + return $this->getCode(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/NoOpTransaction.php b/packages/EventSourcingV2/src/EventStore/Dbal/NoOpTransaction.php new file mode 100644 index 000000000..49dbf029b --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/NoOpTransaction.php @@ -0,0 +1,19 @@ +pdo->prepare($query)); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } + + public function beginTransaction(): Transaction + { + if ($this->pdo->inTransaction()) { + return new NoOpTransaction(); + } else { + $this->pdo->beginTransaction(); + return new PdoTransaction($this->pdo); + } + } + + public function execute(string $query): void + { + try { + $this->pdo->exec($query); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoStatement.php b/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoStatement.php new file mode 100644 index 000000000..526908005 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoStatement.php @@ -0,0 +1,66 @@ + $value) { + $pdoType = $this->typeToPdoType($types[$key] ?? self::PARAM_STR); + $this->pdoStatement->bindValue(\is_string($key) ? $key : $key + 1, $value, $pdoType); + } + $this->pdoStatement->execute(); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } + + public function fetch(): array|false + { + try { + return $this->pdoStatement->fetch(\PDO::FETCH_ASSOC); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } + + public function fetchColumn(int $columnNumber = 0): mixed + { + try { + return $this->pdoStatement->fetchColumn($columnNumber); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } + + public function rowCount(): int + { + try { + return $this->pdoStatement->rowCount(); + } catch (\PDOException $e) { + throw new DriverException($e->errorInfo[1] ?? 0, $e); + } + } + + private function typeToPdoType(int $type): int + { + return match ($type) { + self::PARAM_STR => \PDO::PARAM_STR, + self::PARAM_INT => \PDO::PARAM_INT, + default => throw new \InvalidArgumentException("Unsupported type") + }; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoTransaction.php b/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoTransaction.php new file mode 100644 index 000000000..468bcd53a --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Pdo/PdoTransaction.php @@ -0,0 +1,25 @@ +pdo->commit(); + } + + public function rollBack(): void + { + $this->pdo->rollBack(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Dbal/Statement.php b/packages/EventSourcingV2/src/EventStore/Dbal/Statement.php new file mode 100644 index 000000000..e0d7215df --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Dbal/Statement.php @@ -0,0 +1,17 @@ + + */ + public function load(StreamEventId $eventStreamId): iterable; +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/LogEventId.php b/packages/EventSourcingV2/src/EventStore/LogEventId.php new file mode 100644 index 000000000..def6d5f95 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/LogEventId.php @@ -0,0 +1,44 @@ +transactionId > $logEventId->transactionId || ($this->transactionId === $logEventId->transactionId && $this->sequenceNumber > $logEventId->sequenceNumber); + } + + public function isBefore(self $logEventId): bool + { + return $this->transactionId < $logEventId->transactionId || ($this->transactionId === $logEventId->transactionId && $this->sequenceNumber < $logEventId->sequenceNumber); + } + + public function isAfterOrEqual(self $logEventId): bool + { + return $this->isAfter($logEventId) || $this->equals($logEventId); + } + + public function isBeforeOrEqual(self $logEventId): bool + { + return $this->isBefore($logEventId) || $this->equals($logEventId); + } + + public function equals(self $logEventId): bool + { + return $this->transactionId === $logEventId->transactionId && $this->sequenceNumber === $logEventId->sequenceNumber; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/PersistedEvent.php b/packages/EventSourcingV2/src/EventStore/PersistedEvent.php new file mode 100644 index 000000000..bbfa996a2 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/PersistedEvent.php @@ -0,0 +1,19 @@ + $events + */ + public function runProjectionsWith(array $events): void; + + public function addProjection(string $projectorName, string $state = "catchup"): void; + + public function removeProjection(string $projectorName): void; + + public function catchupProjection(string $projectorName, int $missingEventsMaxLoops = 100): void; +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Projection/ProjectionRunnerTrait.php b/packages/EventSourcingV2/src/EventStore/Projection/ProjectionRunnerTrait.php new file mode 100644 index 000000000..5fd7353a5 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Projection/ProjectionRunnerTrait.php @@ -0,0 +1,25 @@ + $events + */ + public function projectEvents(Projector $projector, iterable $events): void + { + foreach ($events as $event) { + $projector->project($event); + } + if ($projector instanceof FlushableProjector) { + $projector->flush(); + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Projection/Projector.php b/packages/EventSourcingV2/src/EventStore/Projection/Projector.php new file mode 100644 index 000000000..4056e506c --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Projection/Projector.php @@ -0,0 +1,12 @@ +schemaIsKnownToExists && $this->createSchema) { + $this->schemaUp(); + $this->schemaIsKnownToExists = true; + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/SQL/MysqlEventStore.php b/packages/EventSourcingV2/src/EventStore/SQL/MysqlEventStore.php new file mode 100644 index 000000000..c1b22bb73 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/SQL/MysqlEventStore.php @@ -0,0 +1,548 @@ + $projectors + */ + public function __construct( + private Connection $connection, + protected array $projectors = [], + protected bool $ignoreUnknownProjectors = true, + protected string $eventTableName = 'es_event', + protected string $streamTableName = 'es_stream', + protected string $subscriptionTableName = 'es_subscription', + protected string $projectionTableName = 'es_projection', + protected bool $createSchema = true, + ) + { + } + + public function append(StreamEventId $eventStreamId, array $events): array + { + $this->ensureSchemaExists(); + + $transaction = $this->connection->beginTransaction(); + try { + $lastInsertIdStatement = $this->connection->prepare("SELECT LAST_INSERT_ID()"); + $streamVersionStatement = $this->connection->prepare("SELECT version FROM {$this->streamTableName} WHERE stream_id = ?"); + $streamVersionStatement->execute([(string) $eventStreamId->streamId]); + $actualStreamVersion = $streamVersionStatement->fetchColumn() ?: null; + + if ($eventStreamId->version && $actualStreamVersion !== $eventStreamId->version) { + throw new \RuntimeException('Concurrency error. Expected version ' . $eventStreamId->version . ' but got ' . $actualStreamVersion); + } + $version = $actualStreamVersion ?? 0; + $statement = $this->connection->prepare(<<eventTableName} (stream_id, version, event_type, payload, metadata) + VALUES (?, ?, ?, ?, ?) + SQL); + $persistedEvents = []; + foreach ($events as $event) { + $statement->execute([ + $eventStreamId->streamId, + $version++, + $event->type, + json_encode($event->payload, JSON_FORCE_OBJECT), + json_encode($event->metadata, JSON_FORCE_OBJECT), + ]); + $lastInsertIdStatement->execute(); + $lastInsertId = $lastInsertIdStatement->fetchColumn(); + $persistedEvents[] = new PersistedEvent( + new StreamEventId($eventStreamId->streamId, $version), + new LogEventId(0, (int) $lastInsertId), + $event, + ); + } + if ($actualStreamVersion === null) { + $this->connection->prepare("INSERT INTO {$this->streamTableName} (stream_id, version) VALUES (?, ?)")->execute([$eventStreamId->streamId, $version]); + } else { + $this->connection->prepare("UPDATE {$this->streamTableName} SET version = ? WHERE stream_id = ?")->execute([$version, $eventStreamId->streamId]); + } + + $this->runProjectionsWith($persistedEvents); + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + return $persistedEvents; + } + + public function load(StreamEventId $eventStreamId): iterable + { + $statement = $this->connection->prepare("SELECT id, version, event_type, payload FROM {$this->eventTableName} WHERE stream_id = ? ORDER BY id"); + $statement->execute([$eventStreamId->streamId]); + + $events = []; + while ($row = $statement->fetch()) { + $events[] = new PersistedEvent( + new StreamEventId($eventStreamId->streamId, (int) $row['version']), + new LogEventId(0, (int) $row['id']), + new Event($row['event_type'], $row['payload']), + ); + } + + return $events; + } + + public function query(SubscriptionQuery $query): iterable + { + $whereParts = []; + $params = []; + if ($query->streamIds) { + $whereParts[] = 'e.stream_id IN (' . implode(', ', array_fill(0, count($query->streamIds), '?')) . ')'; + $params = array_merge($params, $query->streamIds); + } + if ($query->from) { + $whereParts[] = 'e.id > ?'; + $params[] = $query->from->sequenceNumber; + } + if ($query->to) { + $whereParts[] = 'e.id <= ?'; + $params[] = $query->to->sequenceNumber; + } + + $transaction = $this->connection->beginTransaction(); + try { + $lock = ''; + if (! $query->allowGaps) { + $maxIdStatement = $this->connection->prepare("SELECT MAX(id) FROM {$this->eventTableName}"); + $maxIdStatement->execute(); + $maxId = $maxIdStatement->fetchColumn(); + if ($maxId === null) { + return; + } + $whereParts[] = 'e.id <= ?'; + $params[] = $maxId; + $lock = 'FOR SHARE NOWAIT'; + } + $where = $whereParts ? 'WHERE ' . implode(' AND ', $whereParts) : ''; + $limit = $query->limit ? "LIMIT {$query->limit}" : ''; + + $sqlQuery = <<eventTableName} e + {$where} + ORDER BY e.id + SQL; + + $statement = $this->connection->prepare("$sqlQuery $limit $lock"); + $statement->execute($params); + + while ($row = $statement->fetch()) { + yield new PersistedEvent( + new StreamEventId($row['stream_id'], (int) $row['version']), + new LogEventId(0, (int) $row['id']), + new Event($row['event_type'], $row['payload']), + ); + } + } catch (DriverException $e) { + if ($e->getCode() === self::MYSQL_ER_LOCK_NOWAIT && isset($sqlQuery)) { + // query row one by one in case of NOWAIT exception + $statement = $this->connection->prepare("$sqlQuery LIMIT 1 OFFSET ? $lock"); + $offset = 0; + $offsetParamPosition = count($params); + while (true) { + try { + $statement->execute([ + ...$params, + $offset, + ], [ + $offsetParamPosition => Statement::PARAM_INT, + ]); + $row = $statement->fetch(); + yield new PersistedEvent( + new StreamEventId($row['stream_id'], (int) $row['version']), + new LogEventId(0, (int) $row['id']), + new Event($row['event_type'], $row['payload']), + ); + } catch (DriverException $e) { + if ($e->getCode() === self::MYSQL_ER_LOCK_NOWAIT) { + return; + } else { + throw $e; + } + } + + $offset++; + } + } else { + throw $e; + } + } finally { + $transaction->commit(); + } + } + + public function runProjectionsWith(array $events): void + { + $statement = $this->connection->prepare(<<projectionTableName} +WHERE state = 'inline' +ORDER BY name +SQL); + $statement->execute(); + + while ($projection = $statement->fetch()) { + $projector = $this->projectors[$projection['name']] ?? null; + if (!$projector) { + if ($this->ignoreUnknownProjectors) { + continue; + } + throw new \RuntimeException(\sprintf('Unknown projector "%s"', $projection['projector'])); + } + foreach ($events as $event) { + $projector->project($event); + } + } + } + + public function addProjection(string $projectorName, string $state = "catchup"): void + { + $this->ensureSchemaExists(); + + $projector = $this->getProjector($projectorName); + + $this->connection->prepare(<<projectionTableName} (name, state) + VALUES (?, ?) + SQL) + ->execute([$projectorName, $state]); + + if ($projector instanceof ProjectorWithSetup) { + $projector->setUp(); + } + } + + public function removeProjection(string $projectorName): void + { + $this->ensureSchemaExists(); + + $this->connection->prepare(<<projectionTableName} + WHERE name = ? + SQL) + ->execute([$projectorName]); + + $this->deleteSubscription($projectorName); + + try { + $projector = $this->getProjector($projectorName); + if ($projector instanceof ProjectorWithSetup) { + $projector->tearDown(); + } + } catch (\RuntimeException) { + // ignore + } + } + + public function catchupProjection(string $projectorName, int $missingEventsMaxLoops = 100): void + { + $this->ensureSchemaExists(); + + $projector = $this->getProjector($projectorName); + $transaction = $this->connection->beginTransaction(); + if ($transaction instanceof NoOpTransaction) { + throw new \RuntimeException('catchupProjection should not be called inside a transaction'); + } + try { + $statement = $this->connection->prepare(<<projectionTableName} + WHERE name = ? + FOR UPDATE + SQL); + $statement->execute([$projectorName]); + $projection = $statement->fetch(); + if (!$projection) { + throw new \RuntimeException('Projection not found'); + } + if ($projection['state'] === 'catchup') { + $this->createSubscription($projectorName, new SubscriptionQuery(limit: 1000)); + + $statement = $this->connection->prepare(<<projectionTableName} + SET state = 'catching_up' + WHERE name = ? + SQL); + $statement->execute([$projectorName]); + } else if ($projection['state'] !== 'catching_up') { + throw new \RuntimeException('Projection is not in catchup or catching_up state'); + } + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + do { + $page = $this->readFromSubscription($projectorName); + if ($page->events === []) { + break; + } + $transaction = $this->connection->beginTransaction(); + try { + foreach ($page->events as $event) { + $projector->project($event); + } + $this->ack($page); + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + } while (true); + + $transaction = $this->connection->beginTransaction(); + try { + // With this statement, we lock the event table for inserts, it should be quickly commited + $maxIdStatement = $this->connection->prepare("SELECT MAX(id) FROM {$this->eventTableName} FOR UPDATE"); + $updateProjectionStatement = $this->connection->prepare(<<projectionTableName} + SET state = 'inline', after_event_id = ? + WHERE name = ? + SQL); + $maxIdStatement->execute(); + $maxId = $maxIdStatement->fetchColumn(); + $updateProjectionStatement->execute([$maxId, $projectorName], [Statement::PARAM_INT]); + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + + // Execute missing events + $missingEventsLoop = 0; + $stop = false; + $toPosition = new LogEventId(0, $maxId); + while (true) { + $transaction = $this->connection->beginTransaction(); + try { + $page = $this->readFromSubscription($projectorName, $toPosition); + $lastPosition = $page->startPosition->sequenceNumber; + foreach ($page->events as $event) { + $projector->project($event); + $lastPosition = $event->logEventId->sequenceNumber; + } + if ($lastPosition === $maxId) { + $this->deleteSubscription($projectorName); + $stop = true; + } elseif ($page->events !== []) { + $this->ack($page); + } + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + if ($stop) { + break; + } + if ($missingEventsLoop < $missingEventsMaxLoops) { + $missingEventsLoop++; + \usleep(10000); + } else { + throw new \RuntimeException('Max missing events loop reached'); + } + + } + } + + public function createSubscription(string $subscriptionName, SubscriptionQuery $subscriptionQuery): void + { + $this->ensureSchemaExists(); + + $position = $subscriptionQuery->from ?? LogEventId::start(); + $this->connection->prepare(<<subscriptionTableName} (event_id, name, query) + VALUES (?, ?, ?) + SQL) + ->execute([ + $position->sequenceNumber, $subscriptionName, \json_encode($subscriptionQuery), + ]); + } + + public function deleteSubscription(string $subscriptionName): void + { + $this->ensureSchemaExists(); + + $this->connection->prepare(<<subscriptionTableName} + WHERE name = ? + SQL) + ->execute([$subscriptionName]); + } + + public function readFromSubscription(string $subscriptionName, ?LogEventId $inlineTo = null): EventPage + { + $statement = $this->connection->prepare(<<subscriptionTableName} + WHERE name = ? + -- FOR UPDATE + SQL); + $statement->execute([$subscriptionName]); + $row = $statement->fetch(); + if (!$row) { + throw new \RuntimeException(\sprintf('Subscription "%s" not found', $subscriptionName)); + } + $startPosition = new LogEventId(0, (int) $row['event_id']); + $baseQueryData = \json_decode($row['query'], true); + $to = $inlineTo ?? ($baseQueryData['to'] ? new LogEventId(0, $baseQueryData['to']['sequenceNumber']) : null); + $baseQuery = new SubscriptionQuery( + streamIds: $baseQueryData['streamIds'] ?? null, + from: $startPosition, + to: $to, + allowGaps: (bool) $baseQueryData['allowGaps'] ?? false, + limit: (int) $baseQueryData['limit'] ?? self::DEFAULT_BATCH_SIZE, + ); + $events = []; + $position = null; + /** @var PersistedEvent $event */ + foreach ($this->query($baseQuery) as $event) { + $events[] = $event; + $position = $event->logEventId; + } + + return new EventPage( + $subscriptionName, + $events, + $startPosition, + $position ?? $startPosition, + $baseQuery->limit); + } + + public function ack(EventPage $page): void + { + // todo: ensure the transaction is not already acked + $statement = $this->connection->prepare(<<subscriptionTableName} + SET event_id = ? + WHERE name = ? + SQL); + $statement->execute([$page->endPosition->sequenceNumber, $page->subscriptionName]); + if ($statement->rowCount() === 0) { + throw new \RuntimeException(\sprintf('Subscription "%s" not found', $page->subscriptionName)); + } + } + + public function schemaUp(): void + { + $this->connection->execute(<<connection->execute(<<connection; + } + + protected function getProjector(string $projectorName): Projector + { + $projector = $this->projectors[$projectorName] ?? null; + if (!$projector) { + throw new \RuntimeException('Unknown projector ' . $projectorName); + } + return $projector; + } + + protected function ensureSchemaExists(): void + { + if (!$this->schemaIsKnownToExists && $this->createSchema) { + $transaction = $this->connection->beginTransaction(); + try { + $wasInTransaction = $transaction instanceof NoOpTransaction; + $statement = $this->connection->prepare("SELECT 1 FROM {$this->eventTableName} LIMIT 1"); + $statement->execute(); + $transaction->commit(); + } catch (DriverException $e) { + $transaction->rollBack(); + if ($e->getCode() === self::MYSQL_ER_NO_SUCH_TABLE) { + if ($wasInTransaction) { + throw new \RuntimeException('ensureSchemaExists would have created the event store tables but it should not be called inside a transaction: the transaction would have been commited by DDL changes'); + } + $this->schemaUp(); + } + } + + $this->schemaIsKnownToExists = true; + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/SQL/PostgresEventStore.php b/packages/EventSourcingV2/src/EventStore/SQL/PostgresEventStore.php new file mode 100644 index 000000000..5c41a060c --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/SQL/PostgresEventStore.php @@ -0,0 +1,540 @@ + $projectors + */ + public function __construct( + protected Connection $connection, + protected array $projectors = [], + protected bool $ignoreUnknownProjectors = true, + protected string $eventTableName = 'es_event', + protected string $streamTableName = 'es_stream', + protected string $subscriptionTableName = 'es_subscription', + protected string $projectionTableName = 'es_projection', + protected bool $createSchema = true, + ) { + } + + /** + * @inheritDoc + */ + public function append(StreamEventId $eventStreamId, array $events): array + { + $this->ensureSchemaExists(); + $transaction = $this->connection->beginTransaction(); + try { + $streamVersionStatement = $this->connection->prepare("SELECT version FROM {$this->streamTableName} WHERE stream_id = ?"); + $streamVersionStatement->execute([(string) $eventStreamId->streamId]); + $actualStreamVersion = $streamVersionStatement->fetchColumn() ?: null; + + if ($eventStreamId->version && $actualStreamVersion !== $eventStreamId->version) { + throw new \RuntimeException('Concurrency error. Expected version ' . $eventStreamId->version . ' but got ' . $actualStreamVersion); + } + $version = $actualStreamVersion ?? 0; + $statement = $this->connection->prepare(<<eventTableName} (stream_id, version, event_type, payload, metadata) + VALUES (?, ?, ?, ?, ?) + RETURNING id, transaction_id + SQL); + $persistedEvents = []; + foreach ($events as $event) { + $statement->execute([ + $eventStreamId->streamId, + $version++, + $event->type, + json_encode($event->payload, JSON_FORCE_OBJECT), + json_encode($event->metadata, JSON_FORCE_OBJECT), + ]); + $row = $statement->fetch(); + $persistedEvents[] = new PersistedEvent( + new StreamEventId($eventStreamId->streamId, $version), + new LogEventId((int) $row['transaction_id'], (int) $row['id']), + $event, + ); + } + if ($actualStreamVersion === null) { + $this->connection->prepare("INSERT INTO {$this->streamTableName} (stream_id, version) VALUES (?, ?)")->execute([$eventStreamId->streamId, $version]); + } else { + $this->connection->prepare("UPDATE {$this->streamTableName} SET version = ? WHERE stream_id = ?")->execute([$version, $eventStreamId->streamId]); + } + + $this->runProjectionsWith($persistedEvents); + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + return $persistedEvents; + } + + /** + * @inheritDoc + */ + public function load(StreamEventId $eventStreamId): iterable + { + $statement = $this->connection->prepare("SELECT id, transaction_id, version, event_type, payload FROM {$this->eventTableName} WHERE stream_id = ? ORDER BY id"); + $statement->execute([$eventStreamId->streamId]); + + $events = []; + while ($row = $statement->fetch()) { + $events[] = new PersistedEvent( + new StreamEventId($eventStreamId->streamId, (int) $row['version']), + new LogEventId((int) $row['transaction_id'], (int) $row['id']), + new Event($row['event_type'], $row['payload']), + ); + } + + return $events; + } + + /** + * @return iterable + */ + public function query(SubscriptionQuery $query): iterable + { + $whereParts = []; + $params = []; + if ($query->streamIds) { + $whereParts[] = 'e.stream_id IN (' . implode(', ', array_fill(0, count($query->streamIds), '?')) . ')'; + $params = array_merge($params, $query->streamIds); + } + if ($query->from) { + $whereParts[] = '(e.transaction_id, e.id) > (?, ?)'; + $params[] = $query->from->transactionId; + $params[] = $query->from->sequenceNumber; + } + if ($query->allowGaps === false) { + $whereParts[] = "e.transaction_id < pg_snapshot_xmin(pg_current_snapshot())"; + } + + $where = $whereParts ? 'WHERE ' . implode(' AND ', $whereParts) : ''; + $limit = $query->limit ? "LIMIT {$query->limit}" : ''; + + $query = <<eventTableName} e +{$where} +ORDER BY e.transaction_id, e.id +{$limit} +SQL; + + $statement = $this->connection->prepare($query); + $statement->execute($params); + + while ($row = $statement->fetch()) { + yield new PersistedEvent( + new StreamEventId($row['stream_id'], (int) $row['version']), + new LogEventId((int) $row['transaction_id'], (int) $row['id']), + new Event($row['event_type'], $row['payload']), + ); + } + } + + public function connection(): Connection + { + return $this->connection; + } + + /** + * PersistentSubscriptions + */ + public function createSubscription(string $subscriptionName, SubscriptionQuery $subscriptionQuery): void + { + $this->ensureSchemaExists(); + + $position = $subscriptionQuery->from ?? LogEventId::start(); + $this->connection->prepare(<<subscriptionTableName} (transaction_id, event_id, name, query) + VALUES (?, ?, ?, ?) + SQL) + ->execute([ + $position->transactionId, $position->sequenceNumber, $subscriptionName, \json_encode($subscriptionQuery), + ]); + } + + public function deleteSubscription(string $subscriptionName): void + { + $this->ensureSchemaExists(); + + $this->connection->prepare(<<subscriptionTableName} + WHERE name = ? + SQL) + ->execute([$subscriptionName]); + } + + public function readFromSubscription(string $subscriptionName): EventPage + { + $statement = $this->connection->prepare(<<subscriptionTableName} + WHERE name = ? + -- FOR UPDATE + SQL); + $statement->execute([$subscriptionName]); + $row = $statement->fetch(); + if (!$row) { + throw new \RuntimeException(\sprintf('Subscription "%s" not found', $subscriptionName)); + } + $startPosition = new LogEventId((int) $row['transaction_id'], (int) $row['event_id']); + $baseQueryData = \json_decode($row['query'], true); + $baseQuery = new SubscriptionQuery( + streamIds: $baseQueryData['streamIds'] ?? null, + from: $startPosition, + allowGaps: (bool) $baseQueryData['allowGaps'] ?? false, + limit: (int) $baseQueryData['limit'] ?? self::DEFAULT_BATCH_SIZE, + ); + $events = []; + $position = null; + /** @var PersistedEvent $event */ + foreach ($this->query($baseQuery) as $event) { + $events[] = $event; + $position = $event->logEventId; + } + + return new EventPage( + $subscriptionName, + $events, + $startPosition, + $position ?? $startPosition, + $baseQuery->limit); + } + + public function ack(EventPage $page): void + { + // todo: ensure the transaction is not already acked + $statement = $this->connection->prepare(<<subscriptionTableName} + SET transaction_id = ?, event_id = ? + WHERE name = ? + SQL); + $statement->execute([$page->endPosition->transactionId, $page->endPosition->sequenceNumber, $page->subscriptionName]); + if ($statement->rowCount() === 0) { + throw new \RuntimeException(\sprintf('Subscription "%s" not found', $page->subscriptionName)); + } + } + + /** + * InlineProjectionManager + */ + public function runProjectionsWith(array $events): void + { + $statement = $this->connection->prepare(<<projectionTableName} +WHERE state = 'inline' AND ( + after_transaction_id IS NULL + OR + after_transaction_id < pg_current_xact_id()) +AND ( + before_transaction_id IS NULL + OR + before_transaction_id < pg_current_xact_id()) +ORDER BY name +FOR SHARE +SQL); + $statement->execute(); + + while ($projection = $statement->fetch()) { + $projector = $this->projectors[$projection['name']] ?? null; + if (!$projector) { + if ($this->ignoreUnknownProjectors) { + continue; + } + throw new \RuntimeException(\sprintf('Unknown projector "%s"', $projection['projector'])); + } + foreach ($events as $event) { + $projector->project($event); + } + } + } + + public function addProjection(string $projectorName, string $state = "catchup"): void + { + $this->ensureSchemaExists(); + + $projector = $this->getProjector($projectorName); + + $this->connection->prepare(<<projectionTableName} (name, state) + VALUES (?, ?) + SQL) + ->execute([$projectorName, $state]); + + if ($projector instanceof ProjectorWithSetup) { + $projector->setUp(); + } + } + + public function removeProjection(string $projectorName): void + { + $this->ensureSchemaExists(); + + $this->connection->prepare(<<projectionTableName} + WHERE name = ? + SQL) + ->execute([$projectorName]); + + $this->deleteSubscription($projectorName); + + try { + $projector = $this->getProjector($projectorName); + if ($projector instanceof ProjectorWithSetup) { + $projector->tearDown(); + } + } catch (\RuntimeException) { + // ignore + } + } + + public function catchupProjection(string $projectorName, int $missingEventsMaxLoops = 100): void + { + $this->ensureSchemaExists(); + + $transaction = $this->connection->beginTransaction(); + try { + $statement = $this->connection->prepare(<<projectionTableName} + WHERE name = ? + FOR UPDATE + SQL); + $statement->execute([$projectorName]); + $projection = $statement->fetch(); + if (!$projection) { + throw new \RuntimeException('Projection not found'); + } + if ($projection['state'] === 'catchup') { + $statement = $this->connection->prepare(<<projectionTableName} + SET state = 'catching_up' + WHERE name = ? + SQL); + $statement->execute([$projectorName]); + $this->createSubscription($projectorName, new SubscriptionQuery(limit: 1000)); + } elseif ($projection['state'] !== 'catching_up') { + throw new \RuntimeException('Cannot catchup projection in state ' . $projection['state']); + } + $projector = $this->getProjector($projectorName); + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + do { + $page = $this->readFromSubscription($projectorName); + if ($page->events === []) { + break; + } + $transaction = $this->connection->beginTransaction(); + try { + foreach ($page->events as $event) { + $projector->project($event); + } + $this->ack($page); + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + } while (true); + + $transaction = $this->connection->beginTransaction(); + try { + $lastTransactionIdStatement = $this->connection->prepare(<<projectionTableName} + SET state = 'inline', after_transaction_id = pg_snapshot_xmax(pg_current_snapshot()), before_transaction_id = NULL + WHERE name = ? + RETURNING after_transaction_id + SQL); + $lastTransactionIdStatement->execute([$projectorName]); + $lastTransactionId = $lastTransactionIdStatement->fetchColumn(); + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + + // Execute missing events + $missingEventsLoop = 0; + $currentXminStatement = $this->connection->prepare("SELECT pg_snapshot_xmin(pg_current_snapshot())"); + while ($missingEventsLoop < $missingEventsMaxLoops) { + $transaction = $this->connection->beginTransaction(); + try { + $currentXminStatement->execute(); + $currentXmin = $currentXminStatement->fetchColumn(); + $page = $this->readFromSubscription($projectorName); + foreach ($page->events as $event) { + if ($event->logEventId->transactionId > $lastTransactionId) { + break; + } + $projector->project($event); + } + $this->ack($page); + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + + if ($currentXmin > $lastTransactionId) { + $this->persistentSubscriptions()->deleteSubscription($projectorName); + break; + } + + $missingEventsLoop++; + \usleep(1000); + } + } + + public function switchProjectionToSubscription(string $projectionName): void + { + $this->ensureSchemaExists(); + + $transaction = $this->connection->beginTransaction(); + try { + $statement = $this->connection->prepare(<<projectionTableName} + WHERE name = ? + FOR UPDATE + SQL); + $statement->execute([$projectionName]); + $projection = $statement->fetch(); + if (!$projection) { + throw new \RuntimeException('Projection not found'); + } + if ($projection['state'] !== 'inline') { + throw new \RuntimeException('Cannot switch projection in state ' . $projection['state']); + } + $statement = $this->connection->prepare(<<projectionTableName} + SET before_transaction_id = pg_snapshot_xmax(pg_current_snapshot()) + WHERE name = ? + RETURNING before_transaction_id + SQL); + $statement->execute([$projectionName]); + $beforeTransactionId = $statement->fetchColumn(); + $this->createSubscription($projectionName, new SubscriptionQuery(from: new LogEventId($beforeTransactionId, 0))); + + $transaction->commit(); + } catch (\Throwable $e) { + $transaction->rollBack(); + throw $e; + } + } + + protected function getProjector(string $projectorName): Projector + { + $projector = $this->projectors[$projectorName] ?? null; + if (!$projector) { + throw new \RuntimeException('Unknown projector ' . $projectorName); + } + return $projector; + } + + protected function persistentSubscriptions(): PersistentSubscriptions + { + return $this; + } + + public function schemaUp(): void + { + $this->connection->execute(<<eventTableName} +( + id BIGSERIAL PRIMARY KEY, + transaction_id XID8 NOT NULL DEFAULT pg_current_xact_id(), + stream_id UUID NOT NULL, + version INTEGER NOT NULL, + event_type TEXT NOT NULL, + payload JSON NOT NULL, + metadata JSON NOT NULL DEFAULT '{}', + UNIQUE (stream_id, version) +); + +CREATE INDEX IF NOT EXISTS idx_{$this->eventTableName}_transaction_id_id ON {$this->eventTableName} (transaction_id, id); +CREATE INDEX IF NOT EXISTS idx_{$this->eventTableName}_stream_id ON {$this->eventTableName} (stream_id); +CREATE INDEX IF NOT EXISTS idx_{$this->eventTableName}_version ON {$this->eventTableName} (version); + +CREATE TABLE IF NOT EXISTS {$this->streamTableName} +( + stream_id UUID NOT NULL PRIMARY KEY, + version INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS {$this->projectionTableName} +( + name TEXT NOT NULL PRIMARY KEY, + state TEXT NOT NULL, + after_transaction_id XID8, + before_transaction_id XID8, + metadata JSONB DEFAULT NULL +); +CREATE INDEX IF NOT EXISTS idx_{$this->projectionTableName}_state ON {$this->projectionTableName} (state); +CREATE INDEX IF NOT EXISTS idx_{$this->projectionTableName}_after_transaction_id ON {$this->projectionTableName} (after_transaction_id); +CREATE INDEX IF NOT EXISTS idx_{$this->projectionTableName}_before_transaction_id ON {$this->projectionTableName} (before_transaction_id); + +CREATE TABLE IF NOT EXISTS {$this->subscriptionTableName} +( + name TEXT NOT NULL PRIMARY KEY, + transaction_id XID8 NOT NULL, + event_id BIGINT NOT NULL, + query JSON NOT NULL +); +SQL); + } + + public function schemaDown(): void + { + $this->connection->execute(<<eventTableName}; + +DROP TABLE IF EXISTS {$this->streamTableName}; + +DROP TABLE IF EXISTS {$this->projectionTableName}; + +DROP TABLE IF EXISTS {$this->subscriptionTableName}; +SQL); + } + + protected function ensureSchemaExists(): void + { + if (!$this->schemaIsKnownToExists && $this->createSchema) { + $statement = $this->connection->prepare("SELECT to_regclass(?)"); + $statement->execute([$this->eventTableName]); + if ($statement->fetchColumn() === null) { + $this->schemaUp(); + } + $this->schemaIsKnownToExists = true; + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/StreamEventId.php b/packages/EventSourcingV2/src/EventStore/StreamEventId.php new file mode 100644 index 000000000..dff16e565 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/StreamEventId.php @@ -0,0 +1,31 @@ +streamId, $version); + } + + public function withoutVersion(): self + { + return new self($this->streamId); + } + + public function equals(self $eventStreamId): bool + { + return $this->streamId === $eventStreamId->streamId && $this->version === $eventStreamId->version; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Subscription/EventLoader.php b/packages/EventSourcingV2/src/EventStore/Subscription/EventLoader.php new file mode 100644 index 000000000..e8e93f686 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Subscription/EventLoader.php @@ -0,0 +1,16 @@ + + */ + public function query(SubscriptionQuery $query): iterable; +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Subscription/EventPage.php b/packages/EventSourcingV2/src/EventStore/Subscription/EventPage.php new file mode 100644 index 000000000..a3f1bc481 --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Subscription/EventPage.php @@ -0,0 +1,23 @@ + $events + */ + public function __construct( + public readonly string $subscriptionName, + public readonly array $events, + public readonly LogEventId $startPosition, + public readonly LogEventId $endPosition, + public readonly int $requestedBatchSize, + ) { + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Subscription/PersistentSubscriptions.php b/packages/EventSourcingV2/src/EventStore/Subscription/PersistentSubscriptions.php new file mode 100644 index 000000000..dda4fa05e --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Subscription/PersistentSubscriptions.php @@ -0,0 +1,17 @@ +streamIds, + $position, + $this->to, + $this->allowGaps, + $this->limit, + ); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/src/EventStore/Test/InMemoryEventStore.php b/packages/EventSourcingV2/src/EventStore/Test/InMemoryEventStore.php new file mode 100644 index 000000000..8ab8583ad --- /dev/null +++ b/packages/EventSourcingV2/src/EventStore/Test/InMemoryEventStore.php @@ -0,0 +1,201 @@ + */ + private array $events = []; + + /** @var array */ + private array $streams = []; + + /** + * @var array + */ + private array $projections = []; + + /** + * @var array + */ + private array $subscriptions = []; + private ContainerInterface $dynamicProjectors; + + /** + * @param array $permanentProjectors + */ + public function __construct( + ContainerInterface $dynamicProjectors = null, + private array $permanentProjectors = [], + ) { + $this->dynamicProjectors = $dynamicProjectors ?? InMemoryPSRContainer::createEmpty(); + } + + public function append(StreamEventId $eventStreamId, array $events): array + { + $stream = $this->streams[(string) $eventStreamId->streamId] ?? null; + if ($stream === null) { + $stream = $this->streams[(string) $eventStreamId->streamId] = new InMemoryStream((string) $eventStreamId->streamId, 0); + } + $persistedEvents = []; + foreach ($events as $event) { + $persistedEvents[] = $persistedEvent = new PersistedEvent( + new StreamEventId($eventStreamId->streamId, $stream->version++), + new LogEventId(1, $this->nextEventId), + $event, + ); + $stream->events[] = $this->events[$this->nextEventId] = $persistedEvent; + $this->nextEventId++; + } + + $this->runProjectionsWith($persistedEvents); + + return $persistedEvents; + } + + public function load(StreamEventId $eventStreamId): iterable + { + $stream = $this->streams[(string) $eventStreamId->streamId] ?? null; + if ($stream === null) { + return []; + } + reset($stream->events); + foreach ($stream->events as $event) { + if ($eventStreamId->version && $event->streamEventId->version < $eventStreamId->version) { + continue; + } + yield $event; + } + } + + public function query(SubscriptionQuery $query): iterable + { + foreach ($this->events as $event) { + if ($query->streamIds && !in_array($event->streamEventId->streamId, $query->streamIds, true)) { + continue; + } + if ($query->from && $event->logEventId->transactionId < $query->from->transactionId) { + continue; + } + if ($query->from && $event->logEventId->transactionId === $query->from->transactionId && $event->logEventId->sequenceNumber <= $query->from->sequenceNumber) { + continue; + } + yield $event; + } + } + + public function runProjectionsWith(array $events): void + { + foreach ($this->projections as $projectorName => $state) { + if ($state === "inline") { + $projector = $this->dynamicProjectors->get($projectorName); + $this->projectEvents($projector, $events); + } + } + foreach ($this->permanentProjectors as $projector) { + $this->projectEvents($projector, $events); + } + } + + public function addProjection(string $projectorName, string $state = "catchup"): void + { + $this->projections[$projectorName] = $state; + } + + public function removeProjection(string $projectorName): void + { + unset($this->projections[$projectorName]); + } + + public function catchupProjection(string $projectorName, int $missingEventsMaxLoops = 100): void + { + if (!isset($this->projections[$projectorName])) { + throw new \InvalidArgumentException("Projection not found"); + } + if ($this->projections[$projectorName] !== "catchup") { + throw new \InvalidArgumentException("Projection is not in catchup state"); + } + $projector = $this->dynamicProjectors->get($projectorName); + + $this->projectEvents($projector, $this->query(new SubscriptionQuery())); + + $this->projections[$projectorName] = "inline"; + } + + public function createSubscription(string $subscriptionName, SubscriptionQuery $subscriptionQuery): void + { + $this->subscriptions[$subscriptionName] = $subscriptionQuery; + } + + public function deleteSubscription(string $subscriptionName): void + { + unset($this->subscriptions[$subscriptionName]); + } + + public function readFromSubscription(string $subscriptionName): EventPage + { + $subscriptionQuery = $this->subscriptions[$subscriptionName] ?? null; + if ($subscriptionQuery === null) { + throw new \InvalidArgumentException("Subscription not found"); + } + /** @var array $persistedEvents */ + $persistedEvents = \iterator_to_array($this->query($subscriptionQuery)); + $lastEvent = end($persistedEvents); + $firstEvent = reset($persistedEvents); + return new EventPage( + $subscriptionName, + $persistedEvents, + $firstEvent ? $firstEvent->logEventId : $subscriptionQuery->from, + $lastEvent ? $lastEvent->logEventId : $subscriptionQuery->from, + $subscriptionQuery->limit ?? 0, + ); + } + + public function ack(EventPage $page): void + { + $subscriptionQuery = $this->subscriptions[$page->subscriptionName] ?? null; + if ($subscriptionQuery === null) { + throw new \InvalidArgumentException("Subscription not found"); + } + if (!$page->events) { + return; + } + $lastEvent = $page->events[\array_key_last($page->events)]; + $this->subscriptions[$page->subscriptionName] = $subscriptionQuery->withFromPosition($lastEvent->logEventId); + } +} + +/** + * @internal + */ +class InMemoryStream { + /** + * @param array $events + */ + public function __construct( + public string $streamId, + public int $version, + public array $events = [], + ) { + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTestCaseTrait.php b/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTestCaseTrait.php new file mode 100644 index 000000000..8aaa24a54 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTestCaseTrait.php @@ -0,0 +1,51 @@ +toString(); + $streamId = new StreamEventId(Uuid::uuid4()); + + $eventStore = $this->config()->createEventStore( + projectors: [ + $projectionName => $counterProjection = new InMemoryEventCounterProjector([$streamId->streamId]), + ], + ); + $eventStore->addProjection($projectionName); + + $eventStore->append($streamId, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + self::assertEquals(0, $counterProjection->getCounter()); + + $eventStore->catchupProjection($projectionName); + + self::assertEquals(7, $counterProjection->getCounter()); + + $eventStore->append($streamId, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + self::assertEquals(9, $counterProjection->getCounter()); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTransactionalTestTrait.php b/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTransactionalTestTrait.php new file mode 100644 index 000000000..ba1922921 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/CatchupProjectionTransactionalTestTrait.php @@ -0,0 +1,69 @@ +config()->bindConnectionString(); + $longRunningProcessInput = new InputStream(); + $longRunningProcess = new Process(['php', __DIR__ . '/console.php', 'long-running-append', '--dbConfig', $config->toString()]); + $longRunningProcess->setInput($longRunningProcessInput); + $initProcess = new Process(['php', __DIR__ . '/console.php', 'init', '--dbConfig', $config->toString()]); + $catchupProcess = new Process(['php', __DIR__ . '/console.php', 'catchup-projection', '--dbConfig', $config->toString()]); + + $connection = $this->config()->getConnection(); + $counterBaseProjection = new PostgresTableProjector($connection, 'test_event_base'); + $counterCatchupProjection = new PostgresTableProjector($connection, 'test_event_catchup'); + + $initProcess->run(); + self::assertTrue($initProcess->isSuccessful(), "Init process failed: " . $initProcess->getOutput() . $initProcess->getErrorOutput()); + self::assertEmpty($counterCatchupProjection->getState()); + + $longRunningProcess->start(); + $longRunningProcess->waitUntil(function ($type, $output) { + return $output === "Events appended, waiting some input to commit\n"; + }); + + $catchupProcess->start(); + + $maxSleep = 10000; + while ($counterBaseProjection->getState() != $counterCatchupProjection->getState()) { + usleep(2000); + $maxSleep--; + if ($maxSleep === 0) { + $catchupProcess->stop(); + $longRunningProcess->stop(); + self::assertEquals($counterBaseProjection->getState(), $counterCatchupProjection->getState(),'Projection did not catch up'); + } + } + + $longRunningProcessInput->write("trigger commit\n"); + $longRunningProcessInput->close(); + + $longRunningProcess->wait(); + $catchupProcess->wait(); + + $realEventIdsStatement = $connection->prepare('SELECT id FROM es_event ORDER BY id'); + $realEventIdsStatement->execute(); + $realEventIds = []; + while ($row = $realEventIdsStatement->fetch()) { + $realEventIds[] = (int) $row['id']; + } + + self::assertTrue($longRunningProcess->isSuccessful()); + self::assertTrue($catchupProcess->isSuccessful(), $catchupProcess->getOutput()); + self::assertEquals($realEventIds, $counterBaseProjection->getState(), "Base projection did not catch up"); + self::assertEquals($realEventIds, $counterCatchupProjection->getState(), "Catchup projection did not catch up"); + } + +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/EventLoaderTestCaseTrait.php b/packages/EventSourcingV2/tests/EventStore/EventLoaderTestCaseTrait.php new file mode 100644 index 000000000..23b5b2888 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/EventLoaderTestCaseTrait.php @@ -0,0 +1,37 @@ +createEventStore(); + \assert($eventStore instanceof EventLoader); + + $existingEvents = \iterator_to_array($eventStore->query(new SubscriptionQuery())); + $lastExistingEvent = \end($existingEvents); + $eventStore->append(new StreamEventId(Uuid::uuid4()), [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + $startPosition = $lastExistingEvent ? $lastExistingEvent->logEventId : null; + + $events = \iterator_to_array($eventStore->query(new SubscriptionQuery( + from: $startPosition, + ))); + + self::assertCount(2, $events); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/EventStoreTestCaseTrait.php b/packages/EventSourcingV2/tests/EventStore/EventStoreTestCaseTrait.php new file mode 100644 index 000000000..70a3f22f8 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/EventStoreTestCaseTrait.php @@ -0,0 +1,35 @@ +createEventStore(); + + $eventStreamId = new StreamEventId(Uuid::uuid4()->toString()); + $eventStore->append($eventStreamId, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + $eventStore->append(new StreamEventId(Uuid::uuid4()->toString()), [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + $events = \iterator_to_array($eventStore->load($eventStreamId)); + + self::assertCount(2, $events); + foreach ($events as $event) { + self::assertInstanceOf(PersistedEvent::class, $event); + } + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/Fixtures/InMemoryEventCounterProjector.php b/packages/EventSourcingV2/tests/EventStore/Fixtures/InMemoryEventCounterProjector.php new file mode 100644 index 000000000..564ab260e --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/Fixtures/InMemoryEventCounterProjector.php @@ -0,0 +1,34 @@ +streams = array_map('strval', $streams); + } + } + + public function project(PersistedEvent $event): void + { + if ($this->streams && !in_array((string) $event->streamEventId->streamId, $this->streams, true)) { + return; + } + $this->counter++; + } + + public function getCounter(): int + { + return $this->counter; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresEventCounterProjector.php b/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresEventCounterProjector.php new file mode 100644 index 000000000..71a88cd20 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresEventCounterProjector.php @@ -0,0 +1,74 @@ +streams = array_map('strval', $streams); + } + } + + public function project(PersistedEvent $event): void + { + if ($this->streams && !in_array((string) $event->streamEventId->streamId, $this->streams, true)) { + return; + } + $statement = $this->connection->prepare(<<tableName} (event_type, counter) + VALUES (?, 1) + ON CONFLICT (event_type) DO + UPDATE SET counter = {$this->tableName}.counter + 1 + WHERE {$this->tableName}.event_type = ? + SQL); + + $statement->execute([$event->event->type, $event->event->type]); + } + + /** + * @return array + */ + public function getCounters(): array + { + $statement = $this->connection->prepare(<<tableName} + SQL); + $statement->execute(); + + $counters = []; + while ($row = $statement->fetch()) { + $counters[$row['event_type']] = (int) $row['counter']; + } + return $counters; + } + + public function setUp(): void + { + $this->connection->prepare(<<tableName} ( + event_type VARCHAR(255) PRIMARY KEY, + counter INT NOT NULL DEFAULT 0 + ) + SQL)->execute(); + } + + public function tearDown(): void + { + $this->connection->prepare(<<tableName} + SQL)->execute(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresTableProjector.php b/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresTableProjector.php new file mode 100644 index 000000000..16a29bfc1 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/Fixtures/PostgresTableProjector.php @@ -0,0 +1,62 @@ +connection->prepare(<<tableName} (event_id) + VALUES (?) + SQL) + ->execute([$event->logEventId->sequenceNumber]); + } + + /** + * @return array + */ + public function getState(): array + { + $statement = $this->connection->prepare(<<tableName} + ORDER BY event_id + SQL); + $statement->execute(); + + $rows = []; + while ($row = $statement->fetch()) { + $rows[] = (int) $row['event_id']; + } + return $rows; + } + + public function setUp(): void + { + $this->connection->prepare(<<tableName} ( + id SERIAL PRIMARY KEY, + event_id BIGINT + ) + SQL)->execute(); + + } + + public function tearDown(): void + { + $this->connection->prepare("DROP TABLE {$this->tableName}")->execute(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/DatabaseConfig.php b/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/DatabaseConfig.php new file mode 100644 index 000000000..c748547a2 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/DatabaseConfig.php @@ -0,0 +1,122 @@ +connectionString)) { + return $this->connectionString; + } else { + $envKey = match ($this->db) { + 'pg' => 'DATABASE_POSTGRES', + 'mysql' => 'DATABASE_MYSQL', + default => throw new \InvalidArgumentException("Unknown db: {$this->db}"), + }; + return getenv($envKey) ?: throw new \InvalidArgumentException("Missing env var: {$envKey}"); + } + } + + public function bindConnectionString(): self + { + return new self( + db: $this->db, + driver: $this->driver, + connectionString: $this->getConnectionString(), + ); + } + + public function getConnection(): Connection + { + return match ($this->driver) { + 'pdo' => new PdoConnection($this->getPdoConnection()), + 'doctrine' => new DoctrineConnection($this->getDoctrineConnection()), + default => throw new \InvalidArgumentException("Unknown driver: {$this->driver}"), + }; + } + + public function createEventStore( + array $projectors = [], + bool $ignoreUnknownProjectors = true, + string $eventTableName = 'es_event', + string $streamTableName = 'es_stream', + string $subscriptionTableName = 'es_subscription', + string $projectionTableName = 'es_projection', + Connection $connection = null, + ): PostgresEventStore|MysqlEventStore + { + $className = $this->getEventStoreClass(); + $eventStore = new $className( + $connection ?? $this->getConnection(), + $projectors, + $ignoreUnknownProjectors, + $eventTableName, + $streamTableName, + $subscriptionTableName, + $projectionTableName + ); +// $eventStore->schemaUp(); + return $eventStore; + } + + /** + * @return class-string + */ + public function getEventStoreClass(): string + { + return match ($this->db) { + 'pg' => PostgresEventStore::class, + 'mysql' => MysqlEventStore::class, + default => throw new \InvalidArgumentException("Unknown db: {$this->db}"), + }; + } + + public function toString() + { + return "{$this->db}::{$this->driver}::{$this->getConnectionString()}"; + } + + public static function fromString(string $config): self + { + $parts = explode('::', $config); + return new self( + db: \array_shift($parts), + driver: \array_shift($parts), + connectionString: implode('::', $parts), + ); + } + + private function getDoctrineConnection(): \Doctrine\DBAL\Connection + { + $parser = new DsnParser([ + 'mysql' => 'pdo_mysql', + 'postgres' => 'pdo_pgsql', + 'postgresql' => 'pdo_pgsql', + 'pgsql' => 'pdo_pgsql', + ]); + return DriverManager::getConnection($parser->parse($this->getConnectionString())); + } + + private function getPdoConnection(): \PDO + { + return $this->getDoctrineConnection()->getNativeConnection(); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/PdoDsn.php b/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/PdoDsn.php new file mode 100644 index 000000000..ec9ae4112 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/SQL/Helpers/PdoDsn.php @@ -0,0 +1,102 @@ +dsn = $dsn; + $this->parseDsn($dsn); + } + + private function parseDsn(string $dsn): void + { + $dsn = trim($dsn); + + if (strpos($dsn, ':') === false) { + throw new \LogicException(sprintf('The DSN is invalid. It does not have scheme separator ":".')); + } + + list($prefix, $dsnWithoutPrefix) = preg_split('#\s*:\s*#', $dsn, 2); + + $this->protocol = $prefix; + + if (preg_match('/^[a-z\d]+$/', strtolower($prefix)) == false) { + throw new \LogicException('The DSN is invalid. Prefix contains illegal symbols.'); + } + + $dsnElements = preg_split('#\s*\;\s*#', $dsnWithoutPrefix); + + $elements = []; + foreach ($dsnElements as $element) { + if (strpos($dsnWithoutPrefix, '=') !== false) { + list($key, $value) = preg_split('#\s*=\s*#', $element, 2); + $elements[$key] = $value; + } else { + $elements = [ + $dsnWithoutPrefix, + ]; + } + } + $this->parameters = $elements; + } + + public function getDsn(): string + { + return $this->dsn; + } + + public function getProtocol(): ?string + { + return $this->protocol; + } + + public function getDatabase(): ?string + { + return $this->getAttribute('dbname') ?? null; + } + + public function getHost(): string + { + return $this->getAttribute('host'); + } + + public function getPort(): string + { + return $this->getAttribute('port'); + } + + public function getCharset(): string + { + return $this->getAttribute('charset'); + } + + /** + * Get an attribute from the $attributes array. + */ + private function getAttribute(string $key): mixed + { + if (isset($this->parameters[$key])) { + return $this->parameters[$key]; + } + + return null; + } + + public function getParameters(): array + { + return $this->parameters; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/SQL/MysqlDoctrineIntegrationTest.php b/packages/EventSourcingV2/tests/EventStore/SQL/MysqlDoctrineIntegrationTest.php new file mode 100644 index 000000000..47765c6cb --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/SQL/MysqlDoctrineIntegrationTest.php @@ -0,0 +1,15 @@ +createEventStore(...$args); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/SubscriptionTestCaseTrait.php b/packages/EventSourcingV2/tests/EventStore/SubscriptionTestCaseTrait.php new file mode 100644 index 000000000..71f25ae43 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/SubscriptionTestCaseTrait.php @@ -0,0 +1,47 @@ +config()->createEventStore(); + $eventStreamId = new StreamEventId(Uuid::uuid4()->toString()); + + $eventStore->deleteSubscription(__METHOD__); + $eventStore->createSubscription(__METHOD__, new SubscriptionQuery(streamIds: [$eventStreamId->streamId])); + + $page = $eventStore->readFromSubscription(__METHOD__); + self::assertEmpty($page->events); + + $eventStore->append($eventStreamId, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + $page = $eventStore->readFromSubscription(__METHOD__); + self::assertCount(2, $page->events); + + $eventStore->append($eventStreamId, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + + $page = $eventStore->readFromSubscription(__METHOD__); + self::assertCount(4, $page->events); + + $eventStore->ack($page); + $page = $eventStore->readFromSubscription(__METHOD__); + self::assertCount(0, $page->events); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/SubscriptionTransactionalTestCaseTrait.php b/packages/EventSourcingV2/tests/EventStore/SubscriptionTransactionalTestCaseTrait.php new file mode 100644 index 000000000..e94823e2e --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/SubscriptionTransactionalTestCaseTrait.php @@ -0,0 +1,72 @@ +config()->getConnection(); + $connection2 = $this->config()->getConnection(); + + $eventStore1 = $this->config()->createEventStore(connection: $connection1); + $eventStore2 = $this->config()->createEventStore(connection: $connection2); + + // Session 1 + { + $transaction1 = $connection1->beginTransaction(); + $eventStreamId1 = new StreamEventId(Uuid::uuid4()->toString()); + $eventStore1->append($eventStreamId1, [ + new Event('event_type', ['data' => 'value']), + new Event('event_type', ['data' => 'value']), + ]); + } + + // Session 2 must not see events from Session 1 + { + // When I add and commit new events to a different stream + $eventStore2->append($eventStreamId2 = new StreamEventId(Uuid::uuid4()->toString()), [ + new Event('event_type', ['data' => 'value']), + ]); + self::assertCount( + 0, + iterator_to_array($eventStore2->query(new SubscriptionQuery(streamIds: [$eventStreamId1->streamId, $eventStreamId2->streamId]))), + "Session 2 must not see neither events from Session 1 and Session 2 if no Gaps", + ); + + self::assertCount( + 1, + iterator_to_array($eventStore2->query(new SubscriptionQuery(streamIds: [$eventStreamId1->streamId, $eventStreamId2->streamId], allowGaps: true))), + "If gaps are allowed, Session 2 must not see events from Session 1 (not committed) but see events of Session 2", + ); + + } + + // Commit Session 1 + { + try { + $transaction1->commit(); + } catch (\Throwable $e) { + $transaction1->rollBack(); + throw $e; + } + } + + self::assertCount( + 3, + iterator_to_array($eventStore2->query(new SubscriptionQuery(streamIds: [$eventStreamId1->streamId, $eventStreamId2->streamId], allowGaps: true))), + "Session 2 must see events from Session 1 and Session 2 after Session 1 commits" + ); + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/EventStore/Test/InMemoryEventStoreTest.php b/packages/EventSourcingV2/tests/EventStore/Test/InMemoryEventStoreTest.php new file mode 100644 index 000000000..bab146529 --- /dev/null +++ b/packages/EventSourcingV2/tests/EventStore/Test/InMemoryEventStoreTest.php @@ -0,0 +1,22 @@ +getConnection(); + + return $config->createEventStore( + projectors: [ + 'base' => new PostgresTableProjector($connection, 'test_event_base'), + 'catchup' => new PostgresTableProjector($connection, 'test_event_catchup'), + ], + connection: $connection, + ); +} + +$application->register('long-running-append') + ->addOption('dbConfig', null, InputOption::VALUE_REQUIRED, 'Database configuration serialized') + ->addOption('streamId', null, InputOption::VALUE_OPTIONAL, 'Stream ID') + ->addOption('start_event_count', null, InputOption::VALUE_OPTIONAL, 'Number of events appended at start', 1) + ->addOption('event_count', null, InputOption::VALUE_OPTIONAL, 'Number of events to append', 1) + ->setCode(function (InputInterface $input, OutputInterface $output) { + $eventStore = createEventStore($input->getOption('dbConfig')); + $connection = $eventStore->connection(); + + $streamId = new StreamEventId($input->getOption('streamId') ?: Uuid::uuid4()); + $startEventCount = (int) $input->getOption('start_event_count'); + $eventCount = (int) $input->getOption('event_count'); + + $eventStore->append($streamId, array_map(fn () => new Event('start_event', ['data' => 'value']), range(1, $startEventCount))); + + $transaction = $connection->beginTransaction(); + + try { + $eventStore->append($streamId, array_map(fn () => new Event('long_running_event', ['data' => 'value']), range(1, $eventCount))); + + $output->writeln('Events appended, waiting some input to commit'); + + $questionHelper = new QuestionHelper(); + $question = new Question('Press enter to commit'); + + $questionHelper->ask($input, $output, $question); + + $transaction->commit(); + $output->writeln('Events committed'); + return Command::SUCCESS; + } catch (\Throwable $e) { + $output->writeln('Error occurred, rolling back'); + $transaction->rollBack(); + throw $e; + } + }); + +$application->register("catchup-projection") + ->addOption('dbConfig', null, InputOption::VALUE_REQUIRED, 'Database configuration serialized') + ->addOption('streamId', null, InputOption::VALUE_OPTIONAL, 'Stream ID') + ->setCode(function (InputInterface $input, OutputInterface $output) { + $eventStore = createEventStore($input->getOption('dbConfig')); + + $output->writeln('Running catchup projection'); + + try { + $eventStore->catchupProjection('catchup'); + } catch (\Throwable $e) { + $output->writeln(sprintf('Error occurred: %s', $e->getMessage())); + throw $e; + } + + $output->writeln('Catchup projection done'); + return Command::SUCCESS; + }); + +$application->register("init") + ->addOption('dbConfig', null, InputOption::VALUE_REQUIRED, 'Database configuration serialized') + ->addOption('streamId', null, InputOption::VALUE_OPTIONAL, 'Stream ID') + ->setCode(function (InputInterface $input, OutputInterface $output) { + $eventStore = createEventStore($input->getOption('dbConfig')); + + $eventStore->removeProjection('base'); + $eventStore->removeProjection('catchup'); + + $eventStore->addProjection('base'); + $eventStore->catchupProjection('base'); + + $eventStore->addProjection('catchup'); + + $output->writeln('Projections initialized'); + return Command::SUCCESS; + }); + +$application->register("create-schema") + ->addOption('dbConfig', null, InputOption::VALUE_REQUIRED, 'Database configuration serialized') + ->setCode(function (InputInterface $input, OutputInterface $output) { + $eventStore = createEventStore($input->getOption('dbConfig')); + + $eventStore->schemaUp(); + return Command::SUCCESS; + }); + +$application->register("drop-schema") + ->addOption('dbConfig', null, InputOption::VALUE_REQUIRED, 'Database configuration serialized') + ->setCode(function (InputInterface $input, OutputInterface $output) { + $eventStore = createEventStore($input->getOption('dbConfig')); + + $eventStore->schemaDown(); + return Command::SUCCESS; + }); + + +$application->run(); \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Fixture/CounterProjection/AsyncCounterProjection.php b/packages/EventSourcingV2/tests/Fixture/CounterProjection/AsyncCounterProjection.php new file mode 100644 index 000000000..0c7b12af6 --- /dev/null +++ b/packages/EventSourcingV2/tests/Fixture/CounterProjection/AsyncCounterProjection.php @@ -0,0 +1,50 @@ +counters[\get_class($event)] = ($this->counters[\get_class($event)] ?? 0) + 1; + } + + #[EventHandler(endpointId: 'onTicketWasAssigned')] + public function onTicketWasAssigned(TicketWasAssigned $event): void + { + $this->counters[\get_class($event)] = ($this->counters[\get_class($event)] ?? 0) + 1; + } + + public function getCounters(): array + { + return $this->counters; + } + + public function count(string $eventClass = null): int + { + if ($eventClass === null) { + return array_sum($this->counters); + } + return $this->counters[$eventClass] ?? 0; + } + + public function reset(): void + { + $this->counters = []; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Fixture/CounterProjection/CounterProjection.php b/packages/EventSourcingV2/tests/Fixture/CounterProjection/CounterProjection.php new file mode 100644 index 000000000..8525dc28f --- /dev/null +++ b/packages/EventSourcingV2/tests/Fixture/CounterProjection/CounterProjection.php @@ -0,0 +1,48 @@ +counters[\get_class($event)] = ($this->counters[\get_class($event)] ?? 0) + 1; + } + + #[EventHandler] + public function onTicketWasAssigned(TicketWasAssigned $event): void + { + $this->counters[\get_class($event)] = ($this->counters[\get_class($event)] ?? 0) + 1; + } + + public function getCounters(): array + { + return $this->counters; + } + + public function count(string $eventClass = null): int + { + if ($eventClass === null) { + return array_sum($this->counters); + } + return $this->counters[$eventClass] ?? 0; + } + + public function reset(): void + { + $this->counters = []; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Fixture/EventNotifier/EventNotifier.php b/packages/EventSourcingV2/tests/Fixture/EventNotifier/EventNotifier.php new file mode 100644 index 000000000..d49e9a386 --- /dev/null +++ b/packages/EventSourcingV2/tests/Fixture/EventNotifier/EventNotifier.php @@ -0,0 +1,30 @@ +notifiedEvents[] = $events; + } + + public function getNotifiedEvents(): array + { + return $this->notifiedEvents; + } + + public function clear(): void + { + $this->notifiedEvents = []; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Fixture/Ticket/AssignTicket.php b/packages/EventSourcingV2/tests/Fixture/Ticket/AssignTicket.php new file mode 100644 index 000000000..5d0ea9620 --- /dev/null +++ b/packages/EventSourcingV2/tests/Fixture/Ticket/AssignTicket.php @@ -0,0 +1,14 @@ +apply(new TicketWasCreated($command->ticketId)); + return $ticket; + } + + #[CommandHandler] + public function assign(AssignTicket $command): void + { + $this->apply(new TicketWasAssigned($command->ticketId, $command->assignee)); + } + + public function getTicketId(): string + { + return $this->ticketId; + } + + public function getAssignee(): string + { + return $this->assignee; + } + + #[EventSourcingHandler] + protected function applyTicketWasCreated(TicketWasCreated $event): void + { + $this->ticketId = $event->id; + } + + #[EventSourcingHandler] + protected function applyTicketWasAssigned(TicketWasAssigned $event): void + { + $this->assignee = $event->assignee; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Fixture/Ticket/TicketWasAssigned.php b/packages/EventSourcingV2/tests/Fixture/Ticket/TicketWasAssigned.php new file mode 100644 index 000000000..1daa397df --- /dev/null +++ b/packages/EventSourcingV2/tests/Fixture/Ticket/TicketWasAssigned.php @@ -0,0 +1,14 @@ +ticketId)]; + } + + #[CommandHandler] + public function assign(AssignTicket $command): array + { + return [new TicketWasAssigned($command->ticketId, $command->assignee)]; + } + + public function getTicketId(): string + { + return $this->ticketId; + } + + public function getAssignee(): string + { + return $this->assignee; + } + + #[EventSourcingHandler] + public function applyTicketWasCreated(TicketWasCreated $event): void + { + $this->ticketId = $event->id; + } + + #[EventSourcingHandler] + public function applyTicketWasAssigned(TicketWasAssigned $event): void + { + $this->assignee = $event->assignee; + } +} \ No newline at end of file diff --git a/packages/EventSourcingV2/tests/Integration/TicketAggregateTest.php b/packages/EventSourcingV2/tests/Integration/TicketAggregateTest.php new file mode 100644 index 000000000..278c83e13 --- /dev/null +++ b/packages/EventSourcingV2/tests/Integration/TicketAggregateTest.php @@ -0,0 +1,145 @@ +withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_V2_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])), + addInMemoryStateStoredRepository: false, + addInMemoryEventSourcedRepository: false + ); + } + + public function aggregateClassDataProvider(): array + { + return [ + [Ticket::class], + [TicketPure::class], + ]; + } + + /** + * @dataProvider aggregateClassDataProvider + */ + public function test_creating_ticket(string $aggregateClass): void + { + $ecotone = self::bootstrapFlowTesting([$aggregateClass]); + + $ecotone->sendCommand(new CreateTicket("1")); + + $ticket = $ecotone->getAggregate($aggregateClass, "1"); + + self::assertEquals("1", $ticket->getTicketId()); + self::assertEquals(null, $ticket->getAssignee()); + } + + /** + * @dataProvider aggregateClassDataProvider + */ + public function test_event_sourced_aggregate_events(string $aggregateClass): void + { + $ecotone = self::bootstrapFlowTesting([$aggregateClass, EventNotifier::class], [$eventNotifier = new EventNotifier()]); + + $ecotone->sendCommand(new CreateTicket("1")); + $ecotone->sendCommand(new AssignTicket("1", "John")); + + self::assertEquals([ + new TicketWasCreated("1"), + new TicketWasAssigned("1", "John") + ], $eventNotifier->getNotifiedEvents()); + } + + /** + * @dataProvider aggregateClassDataProvider + */ + public function test_projection(string $aggregateClass): void + { + $ecotone = self::bootstrapFlowTesting([$aggregateClass, CounterProjection::class], [$counterProjection = new CounterProjection()]); + $eventStore = $ecotone->getGateway(EventStore::class); + + $ecotone->sendCommand(new CreateTicket("1")); + + self::assertEquals(0, $counterProjection->count(), "Projection should not be run before initialization"); + + assert($eventStore instanceof InlineProjectionManager); + + $eventStore->addProjection('counter'); + + self::assertEquals(0, $counterProjection->count(), "Projection should not run without a catchup"); + + $eventStore->catchupProjection('counter'); + + self::assertEquals(1, $counterProjection->count(), "Projection should catchup existing events"); + + $ecotone->sendCommand(new CreateTicket("1")); + + self::assertEquals(2, $counterProjection->count(), "Projection should run new events"); + } + + /** + * @dataProvider aggregateClassDataProvider + */ + public function test_async_projection(string $aggregateClass): void + { + $ecotone = self::bootstrapFlowTesting([$aggregateClass, AsyncCounterProjection::class], [$counterProjection = new AsyncCounterProjection()]); + $eventStore = $ecotone->getGateway(EventStore::class); + /** @var EcotoneAsynchronousProjectionRunner $projectionRunner */ + $projectionRunner = $ecotone->getGateway(EcotoneAsynchronousProjectionRunner::class); + + $ecotone->sendCommand(new CreateTicket("1")); + + self::assertEquals(0, $counterProjection->count(), "Projection should not be run before initialization"); + + assert($eventStore instanceof PersistentSubscriptions); + assert($eventStore instanceof InlineProjectionManager); + + $eventStore->createSubscription('counter_async', new SubscriptionQuery()); + + self::assertEquals(0, $counterProjection->count(), "Projection should not run without a catchup"); + + $ecotone->run("async_channel"); + +// $projectionRunner->run(new EcotoneAsynchronousProjectionRunnerCommand('counter_async')); + + self::assertEquals(1, $counterProjection->count(), "Projection should catchup existing events"); + + $ecotone->sendCommand(new CreateTicket("1")); + + self::assertEquals(1, $counterProjection->count(), "Projection should not run new events synchronously"); + + $ecotone->run("async_channel"); + + self::assertEquals(2, $counterProjection->count(), "Projection should run new events after running async channel"); + } +} \ No newline at end of file diff --git a/packages/PdoEventSourcing/src/Config/StreamNameMapper.php b/packages/PdoEventSourcing/src/Config/StreamNameMapper.php index 453fb3485..3e2eed7a0 100644 --- a/packages/PdoEventSourcing/src/Config/StreamNameMapper.php +++ b/packages/PdoEventSourcing/src/Config/StreamNameMapper.php @@ -13,7 +13,7 @@ /** * licence Apache-2.0 */ -final class StreamNameMapper implements MessageProcessor, DefinedObject +final class StreamNameMapper implements MessageProcessor { public function process(Message $message): Message { @@ -22,9 +22,4 @@ public function process(Message $message): Message $message->getHeaders()->get(ProjectionEventHandler::PROJECTION_NAME) ))->build(); } - - public function getDefinition(): Definition - { - return new Definition(self::class); - } } diff --git a/packages/PdoEventSourcing/tests/Fixture/TicketWithCounterProjection/CounterProjection.php b/packages/PdoEventSourcing/tests/Fixture/TicketWithCounterProjection/CounterProjection.php new file mode 100644 index 000000000..8cd0d2b23 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/TicketWithCounterProjection/CounterProjection.php @@ -0,0 +1,29 @@ +counter; + } + + #[EventHandler] + public function onAny(object $event): void + { + $this->counter++; + } +} \ No newline at end of file diff --git a/packages/PdoEventSourcing/tests/InMemory/EcotoneLiteEventSourcingTest.php b/packages/PdoEventSourcing/tests/InMemory/EcotoneLiteEventSourcingTest.php index 105e933f5..070a3a67c 100644 --- a/packages/PdoEventSourcing/tests/InMemory/EcotoneLiteEventSourcingTest.php +++ b/packages/PdoEventSourcing/tests/InMemory/EcotoneLiteEventSourcingTest.php @@ -17,6 +17,7 @@ use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket; use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter; +use Test\Ecotone\EventSourcing\Fixture\TicketWithCounterProjection\CounterProjection; use Test\Ecotone\EventSourcing\Fixture\TicketWithInMemoryAsynchronousEventDrivenProjection\InProgressTicketList; use Test\Ecotone\EventSourcing\Fixture\TicketWithInMemoryAsynchronousEventDrivenProjection\ProjectionConfiguration; @@ -227,4 +228,24 @@ public function test_deleting_projection_table(): void $projectionManager->initializeProjection('inProgressTicketList'); self::assertCount(5, $connection->fetchAllAssociative('select * from in_progress_tickets'), 'Read model table should be rebuild after second initialization'); } + + public function test_it_CANNOT_project_object_type_event_handler(): void + { + $ecotoneTestSupport = EcotoneLite::bootstrapForTesting( + [Ticket::class, TicketEventConverter::class, CounterProjection::class], + [new TicketEventConverter(), $counter = new CounterProjection()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withEnvironment('test') + ->withExtensionObjects([ + EventSourcingConfiguration::createInMemory(), + ]), + ); + + $this->assertSame(0, $counter->getCounter()); + + $ecotoneTestSupport->getCommandBus()->send(new RegisterTicket('1', 'johny', 'alert')); + + $this->assertSame(0, $counter->getCounter(), "Projection should not be triggered"); + } } diff --git a/packages/local_packages.json b/packages/local_packages.json index b97dbf8d1..346990767 100644 --- a/packages/local_packages.json +++ b/packages/local_packages.json @@ -20,6 +20,10 @@ "type": "path", "url": "PdoEventSourcing" }, + { + "type": "path", + "url": "EventSourcingV2" + }, { "type": "path", "url": "JmsConverter" diff --git a/phpunit.xml.dist b/phpunit.xml.dist index f66d2c039..4d110173d 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -18,6 +18,7 @@ packages/Sqs/src packages/Redis/src packages/Kafka/src + packages/EventSourcing/src @@ -35,9 +36,12 @@ packages/Dbal/tests - + packages/PdoEventSourcing/tests + + packages/EventSourcingV2/tests + packages/JmsConverter/tests