Skip to content

Commit 769883d

Browse files
committed
Add RPC tutorial
1 parent 85f7e55 commit 769883d

File tree

5 files changed

+123
-2
lines changed

5 files changed

+123
-2
lines changed

erlang/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,8 @@ You need Erlang Client binaries:
4747

4848
rebar3 shell --eval 'receive_logs_topic:start(["*.rabbit"]), init:stop().'
4949
rebar3 shell --eval 'emit_log_topic:start(["red.rabbit", "Hello"]), init:stop().'
50+
51+
[Tutorial Six: RPC](https://www.rabbitmq.com/tutorials/tutorial-six-python.html):
52+
53+
rebar3 shell --eval 'rpc_server:start(), init:stop().'
54+
rebar3 shell --eval 'rpc_client:start(["10"]), init:stop().'

erlang/rebar.config

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{erl_opts, [debug_info]}.
22

33
{deps, [
4-
{amqp_client, "3.12.2"}
4+
{amqp_client, "3.12.2"},
5+
{uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.7"}}}
56
]}.
67

78
{shell, [

erlang/rebar.lock

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@
33
{<<"credentials_obfuscation">>,
44
{pkg,<<"credentials_obfuscation">>,<<"3.4.0">>},
55
1},
6+
{<<"quickrand">>,
7+
{git,"https://github.com/okeuday/quickrand.git",
8+
{ref,"65332de501998764f437c3ffe05d744f582d7622"}},
9+
1},
610
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.12.2">>},1},
711
{<<"recon">>,{pkg,<<"recon">>,<<"2.5.3">>},2},
8-
{<<"thoas">>,{pkg,<<"thoas">>,<<"1.0.0">>},2}]}.
12+
{<<"thoas">>,{pkg,<<"thoas">>,<<"1.0.0">>},2},
13+
{<<"uuid">>,
14+
{git,"https://github.com/okeuday/uuid.git",
15+
{ref,"7c2d1320c8e61e0fe25a66ecf4761e4b5b5803d6"}},
16+
0}]}.
917
[
1018
{pkg_hash,[
1119
{<<"amqp_client">>, <<"19770F1075FE697BEA8AA77E29DF38BD8A439F8D9F1D8A8FCCB56AE0C7AF73CD">>},

erlang/src/rpc_client.erl

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
-module(rpc_client).
2+
-export([start/1]).
3+
4+
-include_lib("amqp_client/include/amqp_client.hrl").
5+
6+
start(Argv) ->
7+
Num = case Argv of
8+
[] -> 10;
9+
[Arg] -> list_to_integer(Arg)
10+
end,
11+
io:format(" [x] Requesting fib(~p)~n", [Num]),
12+
Response = call(Num),
13+
io:format(" [.] Got ~p~n", [Response]),
14+
ok.
15+
16+
call(Num) ->
17+
{ok, Connection} =
18+
amqp_connection:start(#amqp_params_network{host = "localhost"}),
19+
{ok, Channel} = amqp_connection:open_channel(Connection),
20+
RequestQueue = <<"rpc_queue">>,
21+
CorrelationId = uuid:get_v4(),
22+
23+
amqp_channel:call(Channel, #'queue.declare'{queue = RequestQueue}),
24+
25+
#'queue.declare_ok'{queue = ReplyQueue} =
26+
amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
27+
28+
Method = #'basic.consume'{queue = ReplyQueue, no_ack = true},
29+
amqp_channel:subscribe(Channel, Method, self()),
30+
31+
amqp_channel:cast(Channel,
32+
#'basic.publish'{
33+
exchange = <<>>,
34+
routing_key = RequestQueue},
35+
#amqp_msg{
36+
payload = integer_to_binary(Num),
37+
props = #'P_basic'{
38+
reply_to = ReplyQueue,
39+
correlation_id = CorrelationId}
40+
}),
41+
42+
Response = wait_for_messages(CorrelationId),
43+
44+
amqp_channel:close(Channel),
45+
amqp_connection:close(Connection),
46+
47+
Response.
48+
49+
wait_for_messages(CorrelationId) ->
50+
receive
51+
{#'basic.deliver'{}, #amqp_msg{payload = Body, props = Props}} ->
52+
#'P_basic'{correlation_id = CorrId} = Props,
53+
if CorrelationId == CorrId ->
54+
binary_to_integer(Body);
55+
true ->
56+
-1
57+
end
58+
end.

erlang/src/rpc_server.erl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
-module(rpc_server).
2+
-export([start/0]).
3+
4+
-include_lib("amqp_client/include/amqp_client.hrl").
5+
6+
start() ->
7+
{ok, Connection} =
8+
amqp_connection:start(#amqp_params_network{host = "localhost", heartbeat = 30}),
9+
{ok, Channel} = amqp_connection:open_channel(Connection),
10+
11+
amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
12+
io:format(" [*] Waiting for messages. To exit press CTRL+C~n"),
13+
14+
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),
15+
16+
Method = #'basic.consume'{queue = <<"rpc_queue">>},
17+
amqp_channel:subscribe(Channel, Method, self()),
18+
loop(Channel).
19+
20+
loop(Channel) ->
21+
receive
22+
{#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{payload = Body, props = Props}} ->
23+
#'P_basic'{reply_to = ReplyTo, correlation_id = CorrelationId} = Props,
24+
Num = binary_to_integer(Body),
25+
io:format(" [.] fib(~p)~n", [Num]),
26+
Response = fib(Num),
27+
28+
amqp_channel:cast(Channel,
29+
#'basic.publish'{
30+
exchange = <<>>,
31+
routing_key = ReplyTo},
32+
#amqp_msg{
33+
payload = integer_to_binary(Response),
34+
props = #'P_basic'{
35+
correlation_id = CorrelationId}
36+
}),
37+
38+
amqp_channel:cast(Channel,
39+
#'basic.ack'{
40+
delivery_tag = DeliveryTag
41+
}),
42+
43+
loop(Channel)
44+
end.
45+
46+
fib(0) -> 0;
47+
fib(1) -> 1;
48+
fib(N) when N > 1 -> fib(N-1) + fib(N-2);
49+
fib(N) when N =< 0 -> -1.

0 commit comments

Comments
 (0)