Skip to content

Commit e892e82

Browse files
authored
Projecting improvements 1 (#522)
* fix DbalProjectionStateStorage.php for manager registry connection factory * init all projections in one command * better gap timeout processing
1 parent af65f40 commit e892e82

File tree

10 files changed

+80
-21
lines changed

10 files changed

+80
-21
lines changed

packages/Ecotone/src/Messaging/Scheduling/Duration.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public static function seconds(int|float $seconds): self
2929
return new self((int) round($seconds * 1_000_000));
3030
}
3131

32+
public static function minutes(int|float $seconds): self
33+
{
34+
return new self((int) round($seconds * 60 * 1_000_000));
35+
}
36+
3237
public static function zero(): self
3338
{
3439
return new self(0);

packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
namespace Ecotone\Projecting\Config;
99

1010
use Ecotone\Messaging\Attribute\ConsoleCommand;
11+
use Ecotone\Messaging\Attribute\ConsoleParameterOption;
1112
use Ecotone\Projecting\ProjectionRegistry;
1213
use InvalidArgumentException;
1314

@@ -18,12 +19,21 @@ public function __construct(private ProjectionRegistry $registry)
1819
}
1920

2021
#[ConsoleCommand('ecotone:projection:init')]
21-
public function initProjection(string $name): void
22+
public function initProjection(?string $name = null, #[ConsoleParameterOption] bool $all = false): void
2223
{
23-
if (! $this->registry->has($name)) {
24-
throw new InvalidArgumentException("There is no projection with name {$name}");
24+
if ($name === null) {
25+
if (! $all) {
26+
throw new InvalidArgumentException('You need to provide projection name or use --all option');
27+
}
28+
foreach ($this->registry->projectionNames() as $projection) {
29+
$this->registry->get($projection)->init();
30+
}
31+
} else {
32+
if (! $this->registry->has($name)) {
33+
throw new InvalidArgumentException("There is no projection with name {$name}");
34+
}
35+
$this->registry->get($name)->init();
2536
}
26-
$this->registry->get($name)->init();
2737
}
2838

2939
#[ConsoleCommand('ecotone:projection:backfill')]

packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionRegistry.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ public function get(string $id): ProjectingManager
3636

3737
return $this->projectionManagers[$id];
3838
}
39+
40+
public function projectionNames(): iterable
41+
{
42+
return array_keys($this->projectionManagers);
43+
}
3944
}

packages/Ecotone/src/Projecting/ProjectingManager.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public function __construct(
1919
private PartitionProvider $partitionProvider,
2020
private string $projectionName,
2121
private int $batchSize = 1000,
22+
private bool $autoInit = true,
2223
) {
2324
if ($batchSize < 1) {
2425
throw new InvalidArgumentException('Batch size must be at least 1');
@@ -28,7 +29,9 @@ public function __construct(
2829
// This is the method that is linked to the event bus routing channel
2930
public function execute(?string $partitionKey = null): void
3031
{
31-
$this->init();
32+
if ($this->autoInit) {
33+
$this->init();
34+
}
3235

3336
do {
3437
$transaction = $this->projectionStateStorage->beginTransaction();

packages/Ecotone/src/Projecting/ProjectionRegistry.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@
1212
interface ProjectionRegistry extends ContainerInterface
1313
{
1414
public function get(string $id): ProjectingManager;
15+
/** @return iterable<string> */
16+
public function projectionNames(): iterable;
1517
}

packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Ecotone\Projecting\Transaction;
1616
use Enqueue\Dbal\DbalConnectionFactory;
1717

18+
use Enqueue\Dbal\ManagerRegistryConnectionFactory;
1819
use function json_decode;
1920
use function json_encode;
2021

@@ -28,7 +29,7 @@ class DbalProjectionStateStorage implements ProjectionStateStorage
2829
private bool $initialized = false;
2930

3031
public function __construct(
31-
DbalConnectionFactory $connectionFactory,
32+
DbalConnectionFactory|ManagerRegistryConnectionFactory $connectionFactory,
3233
private string $stateTable = 'ecotone_projection_state',
3334
) {
3435
$this->connection = $connectionFactory->createContext()->getDbalConnection();

packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public function __construct(
2525
private EcotoneClockInterface $clock,
2626
private string $streamName,
2727
private int $maxGapOffset = 5_000,
28-
private ?Duration $gapTimeout = null,
28+
private ?Duration $gapTimeout = null,
2929
) {
3030
}
3131

@@ -39,6 +39,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =
3939
} else {
4040
$eventsInGaps = $this->eventStore->load(
4141
$this->streamName,
42+
count: count($tracking->getGaps()),
4243
metadataMatcher: (new MetadataMatcher())
4344
->withMetadataMatch('no', Operator::IN(), $tracking->getGaps(), FieldType::MESSAGE_PROPERTY()),
4445
deserialize: false,
@@ -55,9 +56,13 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =
5556

5657
$allEvents = [...$eventsInGaps, ...$events];
5758

59+
$now = $this->clock->now();
60+
$cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0;
5861
foreach ($allEvents as $event) {
5962
$position = $event->getMetadata()['_position'] ?? throw new RuntimeException('Event does not have a position');
60-
$tracking->advanceTo((int) $position);
63+
$timestamp = $event->getMetadata()['timestamp'] ?? throw new RuntimeException('Event does not have a timestamp');
64+
$insertGaps = $timestamp > $cutoffTimestamp;
65+
$tracking->advanceTo((int) $position, $insertGaps);
6166
}
6267

6368
$tracking->cleanByMaxOffset($this->maxGapOffset);
@@ -77,31 +82,41 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking): void
7782
return;
7883
}
7984

80-
$minGap = min($gaps);
81-
$maxGap = max($gaps);
85+
$minGap = $gaps[0];
86+
$maxGap = $gaps[count($gaps) - 1];
8287

8388
// Query interleaved events in the gap range
8489
$interleavedEvents = $this->eventStore->load(
8590
$this->streamName,
91+
count: count($gaps),
8692
metadataMatcher: (new MetadataMatcher())
8793
->withMetadataMatch('no', Operator::GREATER_THAN_EQUALS(), $minGap, FieldType::MESSAGE_PROPERTY())
88-
->withMetadataMatch('no', Operator::LOWER_THAN_EQUALS(), $maxGap, FieldType::MESSAGE_PROPERTY()),
94+
->withMetadataMatch('no', Operator::LOWER_THAN_EQUALS(), $maxGap + 1, FieldType::MESSAGE_PROPERTY()),
8995
deserialize: false,
9096
);
9197

92-
$timestampThreshold = $this->clock->now()->unixTime()->sub($this->gapTimeout)->inSeconds();
98+
$timestampThreshold = $this->clock->now()->sub($this->gapTimeout)->unixTime()->inSeconds();
9399

94100
// Find the highest position with timestamp < timeThreshold
95101
$cutoffPosition = $minGap; // default: keep all gaps
96102
foreach ($interleavedEvents as $event) {
97103
$metadata = $event->getMetadata();
98-
$position = $metadata['_position'] ?? null;
104+
$interleavedEventPosition = ((int)$metadata['_position']) ?? null;
99105
$timestamp = $metadata['timestamp'] ?? null;
100106

101-
if ($position !== null && $timestamp !== null) {
102-
if ($timestamp < $timestampThreshold && $position > $cutoffPosition) {
103-
$cutoffPosition = $position + 1; // Remove gaps below this position
104-
}
107+
if ($interleavedEventPosition === null || $timestamp === null) {
108+
break;
109+
}
110+
if ($timestamp > $timestampThreshold) {
111+
// Event is recent, do not remove any gaps below this position
112+
break;
113+
}
114+
if (in_array($interleavedEventPosition, $gaps, true)) {
115+
// This position is a gap, stop cleaning
116+
break;
117+
}
118+
if ($timestamp < $timestampThreshold && $interleavedEventPosition > $cutoffPosition) {
119+
$cutoffPosition = $interleavedEventPosition + 1; // Remove gaps below this position
105120
}
106121
}
107122

packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Ecotone\Messaging\Config\Container\Definition;
1212
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
1313
use Ecotone\Messaging\Config\Container\Reference;
14+
use Ecotone\Messaging\Scheduling\Duration;
1415
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
1516
use Ecotone\Projecting\Config\ProjectionComponentBuilder;
1617
use Ecotone\Projecting\StreamSource;
@@ -36,6 +37,8 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
3637
Reference::to(EventStore::class),
3738
Reference::to(EcotoneClockInterface::class),
3839
$this->streamName,
40+
5_000,
41+
new Definition(Duration::class, [60], 'seconds')
3942
],
4043
);
4144
}

packages/PdoEventSourcing/src/Projecting/StreamSource/GapAwarePosition.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,18 @@ public function getGaps(): array
7474
return $this->gaps;
7575
}
7676

77-
public function advanceTo(int $position): void
77+
public function advanceTo(int $position, bool $processGaps = true): void
7878
{
79+
if ($processGaps === false) {
80+
if ($position > $this->position) {
81+
$this->position = $position;
82+
} elseif ($position <= $this->position) {
83+
throw new InvalidArgumentException('Cannot advance to a position less than or equal to the current position. Current position: ' . $this->position . ', new position: ' . $position);
84+
}
85+
return;
86+
}
87+
88+
// With gap processing
7989
if ($position === $this->position + 1) {
8090
$this->position++;
8191
} elseif (in_array($position, $this->gaps, true)) {

packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ public function test_gap_timeout_cleaning(): void
131131
self::$eventStore,
132132
self::$clock,
133133
Ticket::STREAM_NAME,
134-
maxGapOffset: 1000,
135134
gapTimeout: Duration::seconds(5)
136135
);
137136

@@ -142,15 +141,21 @@ public function test_gap_timeout_cleaning(): void
142141
$tracking = GapAwarePosition::fromString($result->lastPosition);
143142
self::assertSame([2, 4, 6], $tracking->getGaps());
144143

145-
// Delay 2 more seconds to exceed timeout for first gaps
144+
// Delay 2 more seconds to exceed timeout for first gaps (6 seconds since insertion)
146145
self::$clock->sleep(Duration::seconds(2));
147146

148147
// Execute
149148
$result = $streamSource->load(null, 100);
150149

151-
// Verify: Gaps 2, 7 should be removed (old timestamps), gaps 5, 9 should remain (recent timestamps)
150+
// Verify: Gaps 2, 4 should be removed (old timestamps), gap 6 should remain (recent timestamps)
152151
$newTracking = GapAwarePosition::fromString($result->lastPosition);
153152
self::assertSame([6], $newTracking->getGaps());
153+
154+
// Delay 3 more second to exceed timeout for all gaps (6 seconds since insertion of the last event)
155+
self::$clock->sleep(Duration::seconds(4));
156+
$result = $streamSource->load($result->lastPosition, 100);
157+
$newTracking = GapAwarePosition::fromString($result->lastPosition);
158+
self::assertSame([], $newTracking->getGaps());
154159
}
155160

156161
public function test_gap_cleaning_noop_when_no_gaps(): void

0 commit comments

Comments
 (0)