Skip to content

Commit 4f75c10

Browse files
committed
added example 6 rpc client/server
1 parent 9f0aec5 commit 4f75c10

File tree

2 files changed

+197
-0
lines changed

2 files changed

+197
-0
lines changed

php-amqp/rpc_client.php

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#!/usr/bin/env php
2+
<?php
3+
/**
4+
rpc_client.php
5+
@author: Chimdi Azubuike <[email protected]>
6+
*/
7+
8+
class FibonacciRpcClient {
9+
private $connection;
10+
private $channel;
11+
12+
private $callbackQueueName;
13+
private $queueName = 'rpc_queue';
14+
private $rpcQueue = 'rpc_queue';
15+
16+
17+
private $response;
18+
19+
protected $queue;
20+
protected $corrId;
21+
22+
public function __construct() {
23+
$this->connection = $this->getAMQPConnection();
24+
$this->setChannel();
25+
$this->setExchange();
26+
}
27+
28+
/**
29+
AMQP Connection
30+
*/
31+
protected function getAMQPConnection() {
32+
$connection = new AMQPConnection();
33+
$connection->setHost('127.0.0.1');
34+
$connection->setLogin('guest');
35+
$connection->setPassword('guest');
36+
$connection->connect();
37+
return $connection;
38+
}
39+
40+
/**
41+
Declare Channel
42+
*/
43+
protected function setChannel() {
44+
$this->channel = new AMQPChannel($this->connection);
45+
$this->channel->setPrefetchCount(1);
46+
}
47+
48+
/**
49+
Declare Exchange
50+
*/
51+
protected function setExchange() {
52+
$this->exchange = new AMQPExchange($this->channel);
53+
}
54+
55+
public function on_response(AMQPEnvelope $message, AMQPQueue $queue) {
56+
print_r(func_get_args());
57+
}
58+
59+
public function call($value) {
60+
$this->response = NULL;
61+
$this->corrId = uniqid();
62+
63+
try {
64+
//Declare an nonymous channel
65+
$this->queue = new AMQPQueue($this->channel);
66+
$this->queue->setFlags(AMQP_EXCLUSIVE);
67+
$this->queue->declareQueue();
68+
$this->callbackQueueName = $this->queue->getName();
69+
70+
//Set Publish Attributes
71+
$attributes = array(
72+
'correlation_id' => $this->corrId,
73+
'reply_to' => $this->callbackQueueName
74+
);
75+
76+
$this->exchange->publish(
77+
$value,
78+
$this->rpcQueue,
79+
AMQP_NOPARAM,
80+
$attributes
81+
);
82+
83+
$callback = function(AMQPEnvelope $message, AMQPQueue $q) {
84+
if($message->getCorrelationId() == $this->corrId) {
85+
//echo sprintf("CorrelationID: %s",$message->getCorrelationId()), PHP_EOL;
86+
//echo sprintf("Response: %s",$message->getBody()), PHP_EOL;
87+
$this->response = $message->getBody();
88+
$q->nack($message->getDeliveryTag());
89+
return false;
90+
}
91+
};
92+
93+
$this->queue->consume($callback);
94+
95+
//Return RPC Results
96+
return $this->response;
97+
} catch(AMQPQueueException $ex) {
98+
print_r($ex);
99+
} catch(Exception $ex) {
100+
print_r($ex);
101+
}
102+
}
103+
}
104+
105+
$value = (isset($argv[1]))? $argv[1] : 5;
106+
$fibonacciRpc = new FibonacciRpcClient();
107+
echo sprintf(" [x] Requesting fib(%s)",$value), PHP_EOL;
108+
$response = $fibonacciRpc->call($value);
109+
echo sprintf(" [.] Received: %s",$response), PHP_EOL;

php-amqp/rpc_server.php

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#!/usr/bin/env php
2+
<?php
3+
/**
4+
rpc_server.php
5+
@author: Chimdi Azubuike <[email protected]>
6+
*/
7+
8+
function fib($n) {
9+
if($n == 0)
10+
return 0;
11+
if($n == 1)
12+
return 1;
13+
return fib($n - 1) + fib($n - 2);
14+
}
15+
16+
function fast_fib($n) {
17+
if ($n < 0)
18+
throw new Exception('Negative number not implemented');
19+
else
20+
return fast_fib_calc($n)[0];
21+
}
22+
23+
24+
function fast_fib_calc($n) {
25+
if ($n == 0)
26+
return array(0, 1);
27+
else {
28+
list($a,$b) = fast_fib_calc(floor($n/2));
29+
$c = $a * ($b * 2 - $a);
30+
$d = $a * $a + $b * $b;
31+
if (($n % 2) == 0)
32+
return array($c, $d);
33+
else
34+
return array($d, $c + $d);
35+
}
36+
}
37+
38+
//Establish connection to AMQP
39+
$connection = new AMQPConnection();
40+
$connection->setHost('127.0.0.1');
41+
$connection->setLogin('guest');
42+
$connection->setPassword('guest');
43+
$connection->connect();
44+
45+
46+
//Declare Channel
47+
$channel = new AMQPChannel($connection);
48+
$channel->setPrefetchCount(1);
49+
50+
$exchange = new AMQPExchange($channel);
51+
52+
$queueName = 'rpc_queue';
53+
$queue = new AMQPQueue($channel);
54+
$queue->setName($queueName);
55+
$queue->declareQueue();
56+
57+
58+
59+
echo " [x] Awaiting RPC requests", PHP_EOL;
60+
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) use (&$exchange) {
61+
$n = intval($message->getBody());
62+
echo " [.] fib({$n})", PHP_EOL;
63+
64+
$attributes = array(
65+
'correlation_id' => $message->getCorrelationId()
66+
);
67+
68+
echo sprintf(" QueueName: %s", $q->getName()), PHP_EOL;
69+
echo sprintf(" ReplyTo: %s", $message->getReplyTo()), PHP_EOL;
70+
echo sprintf(" CorrelationID: %s", $message->getCorrelationId()), PHP_EOL;
71+
72+
$exchange->publish( (string)fast_fib($n),
73+
$message->getReplyTo(),
74+
AMQP_NOPARAM,
75+
$attributes
76+
);
77+
78+
$q->nack($message->getDeliveryTag());
79+
};
80+
81+
82+
try {
83+
$queue->consume($callback_func);
84+
} catch(AMQPQueueException $ex) {
85+
print_r($ex);
86+
} catch(Exception $ex) {
87+
print_r($ex);
88+
}

0 commit comments

Comments
 (0)