|
14 | 14 | -compile(export_all).
|
15 | 15 |
|
16 | 16 | -import(rabbit_federation_test_util,
|
17 |
| - [wait_for_federation/2, expect/3, expect/4, |
| 17 | + [wait_for_federation/2, expect/3, expect/4, expect/2, |
18 | 18 | set_upstream/4, set_upstream/5, clear_upstream/3, set_upstream_set/4, clear_upstream_set/3,
|
19 | 19 | set_policy/5, clear_policy/3,
|
20 | 20 | set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3,
|
21 | 21 | maybe_declare_queue/3, delete_queue/2,
|
22 | 22 | federation_links_in_vhost/3]).
|
23 | 23 |
|
24 | 24 | -define(INITIAL_WAIT, 6000).
|
25 |
| --define(EXPECT_FEDERATION_TIMEOUT, 30000). |
| 25 | +-define(EXPECT_FEDERATION_TIMEOUT, 60000). |
26 | 26 |
|
27 | 27 | all() ->
|
28 | 28 | [
|
@@ -368,7 +368,20 @@ publish_expect(Config, Ch, X, Key, Q, Payload, Timeout) ->
|
368 | 368 | Status = rabbit_ct_broker_helpers:rpc(Config, 0,
|
369 | 369 | rabbit_federation_status, status, []),
|
370 | 370 | ct:pal("Federation status ~p", [Status]),
|
371 |
| - expect(Ch, Q, [Payload], Timeout). |
| 371 | + expect0(Config, Ch, Q, [Payload], Timeout). |
| 372 | + |
| 373 | +expect0(Config, Ch, Q, Fun) when is_function(Fun) -> |
| 374 | + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, |
| 375 | + no_ack = true}, self()), |
| 376 | + CTag = receive |
| 377 | + #'basic.consume_ok'{consumer_tag = CT} -> CT |
| 378 | + end, |
| 379 | + ct:pal("After subscribe, messages and consumers in broker ~p ", [rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "messages", "messages_ready", "consumers"])]), |
| 380 | + Fun(), |
| 381 | + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}). |
| 382 | + |
| 383 | +expect0(Config, Ch, Q, Payloads, Timeout) -> |
| 384 | + expect0(Config, Ch, Q, fun() -> expect(Payloads, Timeout) end). |
372 | 385 |
|
373 | 386 | %% Doubled due to our strange basic.get behaviour.
|
374 | 387 | expect_empty(Ch, Q) ->
|
|
0 commit comments