Skip to content

Commit 77dd79d

Browse files
authored
Merge pull request #98 from Cafeine42/master
Improve SQS Transport Resolver to match standard configuration
2 parents 8cd7e1c + 9a4d547 commit 77dd79d

File tree

3 files changed

+121
-6
lines changed

3 files changed

+121
-6
lines changed

src/Resources/config/services.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ services:
2222
# SQS
2323
Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver:
2424
arguments:
25-
- '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration'
25+
$messengerTransportsConfiguration: '%messenger.transports%'
2626

2727
# EventBridge
2828
Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransportFactory:
Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
<?php
1+
<?php declare(strict_types=1);
22

33
namespace Bref\Symfony\Messenger\Service\Sqs;
44

55
use Bref\Event\Sqs\SqsRecord;
6-
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
76
use InvalidArgumentException;
87

98
/** @final */
@@ -12,18 +11,45 @@ class SqsTransportNameResolver
1211
private const TRANSPORT_PROTOCOL = 'sqs://';
1312

1413
public function __construct(
15-
private MessengerTransportConfiguration $configurationProvider
14+
private array $messengerTransportsConfiguration
1615
) {
1716
}
1817

1918
public function __invoke(SqsRecord $sqsRecord): string
2019
{
21-
if (!array_key_exists('eventSourceARN', $sqsRecord->toArray())) {
20+
if (! array_key_exists('eventSourceARN', $sqsRecord->toArray())) {
2221
throw new InvalidArgumentException('EventSourceArn is missing in sqs record.');
2322
}
2423

2524
$eventSourceArn = $sqsRecord->toArray()['eventSourceARN'];
25+
$eventSourceWithProtocol = self::TRANSPORT_PROTOCOL . $eventSourceArn;
2626

27-
return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn);
27+
foreach ($this->messengerTransportsConfiguration as $messengerTransport => $messengerOptions) {
28+
$dsn = $this->extractDsnFromTransport($messengerOptions);
29+
30+
if ($dsn === $eventSourceWithProtocol) {
31+
return $messengerTransport;
32+
}
33+
34+
// Rebuild SQS ARN from https://sqs.eu-west-3.amazonaws.com/0123456789/messages?key=value
35+
if (preg_match('/^https:\/\/sqs\.([^.]+)\.amazonaws\.com\/([^\/]+)\/([^?]+)/', (string) $dsn, $matches)) {
36+
$arn = 'arn:aws:sqs:' . $matches[1] . ':' . $matches[2] . ':' . $matches[3];
37+
38+
if ($eventSourceWithProtocol === 'sqs://' . $arn) {
39+
return $messengerTransport;
40+
}
41+
}
42+
}
43+
44+
throw new InvalidArgumentException(sprintf('No transport found for eventSource "%s".', $eventSourceWithProtocol));
45+
}
46+
47+
private function extractDsnFromTransport(string|array $messengerTransport): string
48+
{
49+
if (is_array($messengerTransport) && array_key_exists('dsn', $messengerTransport)) {
50+
return $messengerTransport['dsn'];
51+
}
52+
53+
return $messengerTransport;
2854
}
2955
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Bref\Symfony\Messenger\Test\Unit\Service\Sqs;
4+
5+
use Bref\Event\Sqs\SqsRecord;
6+
use Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver;
7+
use InvalidArgumentException;
8+
use PHPUnit\Framework\TestCase;
9+
use Prophecy\PhpUnit\ProphecyTrait;
10+
11+
final class SqsTransportNameResolverTest extends TestCase
12+
{
13+
use ProphecyTrait;
14+
15+
public function test_event_source_can_resolved_as_expected(): void
16+
{
17+
$messengerTransportsConfiguration = [
18+
'async' => [
19+
'dsn' => 'sqs://arn:aws:sqs:us-east-1:1234567890:some-queue-name',
20+
],
21+
];
22+
23+
$transportNameResolver = new SqsTransportNameResolver($messengerTransportsConfiguration);
24+
25+
$event = new SqsRecord([
26+
'messageId' => '19dd0b57-b21e-4ac1-bd88-01bbb068cb78',
27+
'body' => 'Test message.',
28+
'messageAttributes' => [],
29+
'attributes' => [
30+
'ApproximateReceiveCount' => '1',
31+
],
32+
'receiptHandle' => 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...',
33+
'eventSource' => 'aws:sqs',
34+
'eventSourceARN' => 'arn:aws:sqs:us-east-1:1234567890:some-queue-name',
35+
]);
36+
37+
self::assertSame('async', ($transportNameResolver)($event));
38+
}
39+
40+
public function test_event_source_can_resolved_as_expected_with_queue_url(): void
41+
{
42+
$messengerTransportsConfiguration = [
43+
'async' => [
44+
'dsn' => 'https://sqs.us-east-1.amazonaws.com/1234567890/some-queue-name',
45+
],
46+
];
47+
48+
$transportNameResolver = new SqsTransportNameResolver($messengerTransportsConfiguration);
49+
50+
$event = new SqsRecord([
51+
'messageId' => '19dd0b57-b21e-4ac1-bd88-01bbb068cb78',
52+
'body' => 'Test message.',
53+
'messageAttributes' => [],
54+
'attributes' => [
55+
'ApproximateReceiveCount' => '1',
56+
],
57+
'receiptHandle' => 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...',
58+
'eventSource' => 'aws:sqs',
59+
'eventSourceARN' => 'arn:aws:sqs:us-east-1:1234567890:some-queue-name',
60+
]);
61+
62+
self::assertEquals('async', ($transportNameResolver)($event));
63+
}
64+
65+
public function test_throws_exception_if_event_source_arn_does_not_exist(): void
66+
{
67+
$messengerTransportsConfiguration = [
68+
'transport1' => [
69+
'dsn' => 'sqs://arn:aws:sqs:us-east-1:0123456789:some-queue-name',
70+
],
71+
];
72+
73+
$transportNameResolver = new SqsTransportNameResolver($messengerTransportsConfiguration);
74+
75+
$eventWithMissingeventSourceARN = new SqsRecord([
76+
'messageId' => '19dd0b57-b21e-4ac1-bd88-01bbb068cb78',
77+
'body' => 'Test message.',
78+
'messageAttributes' => [],
79+
'attributes' => [
80+
'ApproximateReceiveCount' => '1',
81+
],
82+
'receiptHandle' => 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...',
83+
'eventSource' => 'aws:sqs',
84+
]);
85+
86+
$this->expectException(InvalidArgumentException::class);
87+
($transportNameResolver)($eventWithMissingeventSourceARN);
88+
}
89+
}

0 commit comments

Comments
 (0)