-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAbstractEndpoint.php
296 lines (250 loc) · 8.16 KB
/
AbstractEndpoint.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
<?php
/**
* @author Marwan Al-Soltany <[email protected]>
* @copyright Marwan Al-Soltany 2020
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/
declare(strict_types=1);
namespace MAKS\AmqpAgent\RPC;
use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface;
use MAKS\AmqpAgent\Helper\EventTrait;
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
use MAKS\AmqpAgent\Exception\RPCEndpointException;
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters;
/**
* An abstract class implementing the basic functionality of an endpoint.
* @since 2.0.0
* @api
*/
abstract class AbstractEndpoint implements AbstractEndpointInterface
{
use MagicMethodsExceptionsTrait;
use EventTrait;
/**
* The connection options of the RPC endpoint.
* @var array
*/
protected $connectionOptions;
/**
* The queue name of the RPC endpoint.
* @var string
*/
protected $queueName;
/**
* Whether the endpoint is connected to RabbitMQ server or not.
* @var bool
*/
protected $connected;
/**
* The endpoint connection.
* @var AMQPStreamConnection
*/
protected $connection;
/**
* The endpoint channel.
* @var AMQPChannel
*/
protected $channel;
/**
* The request body.
* @var string
*/
protected $requestBody;
/**
* Requests conveyor.
* @var string
*/
protected $requestQueue;
/**
* The response body.
* @var string
*/
protected $responseBody;
/**
* Responses conveyor.
* @var string
*/
protected $responseQueue;
/**
* Correlation ID of the last request/response.
* @var string
*/
protected $correlationId;
/**
* Class constructor.
* @param array $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
* @param string $queueName [optional] The override for the default queue name of the RPC endpoint.
*/
public function __construct(?array $connectionOptions = [], ?string $queueName = null)
{
$this->connectionOptions = Parameters::patch($connectionOptions, 'RPC_CONNECTION_OPTIONS');
$this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName;
}
/**
* Closes the connection with RabbitMQ server before destroying the object.
*/
public function __destruct()
{
$this->disconnect();
}
/**
* Opens a connection with RabbitMQ server.
* @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
* @return self
* @throws RPCEndpointException If the endpoint is already connected.
*/
public function connect(?array $connectionOptions = [])
{
$this->connectionOptions = Parameters::patchWith(
$connectionOptions ?? [],
$this->connectionOptions
);
if ($this->isConnected()) {
throw new RPCEndpointException('Endpoint is already connected!');
}
$parameters = array_values($this->connectionOptions);
$this->connection = new AMQPStreamConnection(...$parameters);
$this->trigger('connection.after.open', [$this->connection]);
$this->channel = $this->connection->channel();
$this->trigger('channel.after.open', [$this->channel]);
return $this;
}
/**
* Closes the connection with RabbitMQ server.
* @return void
*/
public function disconnect(): void
{
if ($this->isConnected()) {
$this->connected = null;
$this->trigger('channel.before.close', [$this->channel]);
$this->channel->close();
$this->trigger('connection.before.close', [$this->connection]);
$this->connection->close();
}
}
/**
* Returns whether the endpoint is connected or not.
* @return bool
*/
public function isConnected(): bool
{
$this->connected = (
isset($this->connection) &&
isset($this->channel) &&
$this->connection->isConnected() &&
$this->channel->is_open()
);
return $this->connected;
}
/**
* Returns the connection used by the endpoint.
* @return AMQPStreamConnection
*/
public function getConnection(): AMQPStreamConnection
{
return $this->connection;
}
/**
* The time needed for the round-trip to RabbitMQ server in milliseconds.
* Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
* @return float A two decimal points rounded float.
*/
final public function ping(): float
{
try {
$pingConnection = $this->connection;
if (!isset($pingConnection) || !$pingConnection->isConnected()) {
$parameters = array_values($this->connectionOptions);
$pingConnection = new AMQPStreamConnection(...$parameters);
}
$pingChannel = $pingConnection->channel();
[$pingQueue] = $pingChannel->queue_declare(
null,
false,
false,
true,
true
);
$pingChannel->basic_qos(
null,
1,
null
);
$pingEcho = null;
$pingChannel->basic_consume(
$pingQueue,
null,
false,
false,
false,
false,
function ($message) use (&$pingEcho) {
$message->ack();
$pingEcho = $message->body;
}
);
$pingStartTime = microtime(true);
$pingChannel->basic_publish(
new AMQPMessage(__FUNCTION__),
null,
$pingQueue
);
while (!$pingEcho) {
$pingChannel->wait();
}
$pingEndTime = microtime(true);
$pingChannel->queue_delete($pingQueue);
if ($pingConnection === $this->connection) {
$pingChannel->close();
} else {
$pingChannel->close();
$pingConnection->close();
}
return round(($pingEndTime - $pingStartTime) * 1000, 2);
} catch (Exception $error) {
RPCEndpointException::rethrow($error);
}
}
/**
* Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.
* Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
*
* The parameters will be passed to the callback as follows:
* 1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
* 2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
* 3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
* ```
* $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
* ...
* });
* ```
* @param string $event The event to listen on.
* @param callable $callback The callback to execute.
* @return self
*/
final public function on(string $event, callable $callback)
{
$this->bind($event, function (...$arguments) use ($event, $callback) {
call_user_func_array(
$callback,
array_merge(
$arguments,
[$this, $event]
)
);
});
return $this;
}
/**
* Hook method to manipulate the message (request/response) when extending the class.
* @param AMQPMessage $message
* @return string
*/
abstract protected function callback(AMQPMessage $message): string;
}