Skip to content

Commit 866a7f5

Browse files
committed
Increase timeouts in ZmqTransport tests to reduce flakiness
1 parent 4d89c85 commit 866a7f5

File tree

1 file changed

+31
-31
lines changed

1 file changed

+31
-31
lines changed

src/Abc.Zebus.Tests/Transport/ZmqTransportTests.cs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void should_not_filter_received_messages_when_environment_is_not_specifie
7272
var message = new FakeCommand(1).ToTransportMessage();
7373
transport1.Send(message, new[] { transport2Peer });
7474

75-
Wait.Until(() => transport2ReceivedMessages.Count >= 1, 2.Seconds());
75+
Wait.Until(() => transport2ReceivedMessages.Count >= 1, 30.Seconds());
7676
transport2ReceivedMessages.Single().Id.ShouldEqual(message.Id);
7777
}
7878

@@ -90,7 +90,7 @@ public void should_not_let_the_outbound_thread_die_if_a_peer_cannot_be_resolved(
9090
senderTransport.Send(message, new[] { nonExistingPeer });
9191
senderTransport.Send(message, new[] { destinationPeer });
9292

93-
Wait.Until(() => receivedMessages.Count >= 1, 2.Seconds(), "The outbound thread was killed and couldn't connect to the next peer");
93+
Wait.Until(() => receivedMessages.Count >= 1, 30.Seconds(), "The outbound thread was killed and couldn't connect to the next peer");
9494
}
9595

9696
[Test]
@@ -111,7 +111,7 @@ public void should_not_dispatch_messages_received_from_wrong_environment()
111111
transport2.Configure(transport2Peer.Id, _environment);
112112
transport1.Send(message2, new[] { transport2Peer }); //should arrive
113113

114-
Wait.Until(() => transport2ReceivedMessages.Count >= 1, 2.Seconds());
114+
Wait.Until(() => transport2ReceivedMessages.Count >= 1, 30.Seconds());
115115
transport2ReceivedMessages.Single().Id.ShouldEqual(message2.Id);
116116
}
117117

@@ -129,7 +129,7 @@ public void should_send_messages()
129129
var message1 = new FakeCommand(1).ToTransportMessage();
130130
transport1.Send(message1, new[] { transport2Peer });
131131

132-
Wait.Until(() => transport2ReceivedMessages.Count == 1, 2.Seconds());
132+
Wait.Until(() => transport2ReceivedMessages.Count == 1, 30.Seconds());
133133
var transport2ReceivedMessage = transport2ReceivedMessages.ExpectedSingle();
134134
transport2ReceivedMessage.ShouldHaveSamePropertiesAs(message1, "Environment", "WasPersisted");
135135
transport2ReceivedMessage.Environment.ShouldEqual("Test");
@@ -138,7 +138,7 @@ public void should_send_messages()
138138
var message2 = new FakeCommand(2).ToTransportMessage();
139139
transport2.Send(message2, new[] { transport1Peer });
140140

141-
Wait.Until(() => transport1ReceivedMessages.Count == 1, 2.Seconds());
141+
Wait.Until(() => transport1ReceivedMessages.Count == 1, 30.Seconds());
142142
var transport1ReceivedMessage = transport1ReceivedMessages.ExpectedSingle();
143143
transport1ReceivedMessage.ShouldHaveSamePropertiesAs(message2, "Environment", "WasPersisted");
144144
transport1ReceivedMessage.Environment.ShouldEqual("Test");
@@ -164,13 +164,13 @@ public void should_send_message_to_peer_and_persistence()
164164
var message = new FakeCommand(999).ToTransportMessage();
165165
senderTransport.Send(message, new[] { receiverPeer }, new SendContext { PersistentPeerIds = { receiverPeer.Id }, PersistencePeer = persistencePeer });
166166

167-
Wait.Until(() => receiverMessages.Count == 1, 2.Seconds());
167+
Wait.Until(() => receiverMessages.Count == 1, 30.Seconds());
168168
var messageFromReceiver = receiverMessages.ExpectedSingle();
169169
messageFromReceiver.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted");
170170
messageFromReceiver.Environment.ShouldEqual("Test");
171171
messageFromReceiver.WasPersisted.ShouldEqual(true);
172172

173-
Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
173+
Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
174174
var messageFromPersistence = persistenceMessages.ExpectedSingle();
175175
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted", "PersistentPeerIds", "IsPersistTransportMessage");
176176
messageFromPersistence.Environment.ShouldEqual("Test");
@@ -194,7 +194,7 @@ public void should_send_message_to_persistence()
194194
var message = new FakeCommand(999).ToTransportMessage();
195195
senderTransport.Send(message, Enumerable.Empty<Peer>(), new SendContext { PersistentPeerIds = { receiverPeerId }, PersistencePeer = persistencePeer });
196196

197-
Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
197+
Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
198198
var messageFromPersistence = persistenceMessages.ExpectedSingle();
199199
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted", "PersistentPeerIds", "IsPersistTransportMessage");
200200
messageFromPersistence.Environment.ShouldEqual("Test");
@@ -217,7 +217,7 @@ public void should_send_persist_transport_message_to_persistence()
217217
var message = new FakeCommand(999).ToTransportMessage().ToPersistTransportMessage(receiverPeerId);
218218
senderTransport.Send(message, new[] { persistencePeer });
219219

220-
Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
220+
Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
221221
var messageFromPersistence = persistenceMessages.ExpectedSingle();
222222
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted");
223223
messageFromPersistence.Environment.ShouldEqual("Test");
@@ -238,7 +238,7 @@ public void should_write_WasPersisted_when_requested()
238238
sender.Send(message, new[] { receivingPeer }, new SendContext { PersistentPeerIds = { receivingPeer.Id } });
239239
sender.Send(otherMessage, new[] { receivingPeer }, new SendContext());
240240

241-
Wait.Until(() => receivedMessages.Count >= 2, 2.Seconds());
241+
Wait.Until(() => receivedMessages.Count >= 2, 30.Seconds());
242242
receivedMessages.Single(x => x.Id == message.Id).WasPersisted.ShouldEqual(true);
243243
receivedMessages.Single(x => x.Id == otherMessage.Id).WasPersisted.ShouldEqual(false);
244244
}
@@ -259,7 +259,7 @@ public void should_send_message_to_both_persisted_and_non_persisted_peers()
259259

260260
sender.Send(message, new[] { receivingPeer1, receivingPeer2 }, new SendContext { PersistentPeerIds = { receivingPeer1.Id } });
261261

262-
Wait.Until(() => receivedMessages.Count >= 2, 2.Seconds());
262+
Wait.Until(() => receivedMessages.Count >= 2, 30.Seconds());
263263
receivedMessages.ShouldContain(x => x.Id == message.Id && x.WasPersisted == true);
264264
receivedMessages.ShouldContain(x => x.Id == message.Id && x.WasPersisted == false);
265265
}
@@ -275,13 +275,13 @@ public void should_support_peer_endpoint_modifications()
275275
var receiver = receiverTransport1.GetPeer();
276276

277277
senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
278-
Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
278+
Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
279279

280280
receiverTransport1.Stop();
281281
receiver.EndPoint = receiverTransport2.InboundEndPoint;
282282

283283
senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
284-
Wait.Until(() => receivedMessages.Count == 2, 2.Seconds(), "unable to receive message");
284+
Wait.Until(() => receivedMessages.Count == 2, 30.Seconds(), "unable to receive message");
285285
}
286286

287287
[Test]
@@ -301,7 +301,7 @@ public void should_terminate_zmq_connection_of_a_forgotten_peer_after_some_time(
301301

302302
var message = new FakeCommand(1).ToTransportMessage();
303303
senderTransport.Send(message, new[] { receiverPeer });
304-
Wait.Until(() => senderTransport.OutboundSocketCount == 1, 2.Seconds());
304+
Wait.Until(() => senderTransport.OutboundSocketCount == 1, 30.Seconds());
305305

306306
senderTransport.OnPeerUpdated(receiverPeer.Id, PeerUpdateAction.Decommissioned);
307307

@@ -311,7 +311,7 @@ public void should_terminate_zmq_connection_of_a_forgotten_peer_after_some_time(
311311

312312
using (SystemDateTime.PauseTime(SystemDateTime.UtcNow.Add(30.Seconds())))
313313
{
314-
Wait.Until(() => senderTransport.OutboundSocketCount == 0, 1.Seconds(), "Socket should be disconnected");
314+
Wait.Until(() => senderTransport.OutboundSocketCount == 0, 30.Seconds(), "Socket should be disconnected");
315315
}
316316
}
317317

@@ -324,11 +324,11 @@ public void should_terminate_zmq_connection_of_a_started_peer_with_no_delay()
324324

325325
var message = new FakeCommand(1).ToTransportMessage();
326326
senderTransport.Send(message, new[] { receiverPeer });
327-
Wait.Until(() => senderTransport.OutboundSocketCount == 1, 2.Seconds());
327+
Wait.Until(() => senderTransport.OutboundSocketCount == 1, 30.Seconds());
328328

329329
senderTransport.OnPeerUpdated(receiverPeer.Id, PeerUpdateAction.Started);
330330

331-
Wait.Until(() => senderTransport.OutboundSocketCount == 0, 2.Seconds(), "Socket should be disconnected");
331+
Wait.Until(() => senderTransport.OutboundSocketCount == 0, 30.Seconds(), "Socket should be disconnected");
332332
}
333333

334334
[Test]
@@ -346,7 +346,7 @@ public void should_receive_many_messages()
346346
senderTransport.Send(message, new[] { receiver });
347347
}
348348

349-
Wait.Until(() => receivedMessages.Count == 10, 1.Second());
349+
Wait.Until(() => receivedMessages.Count == 10, 30.Seconds());
350350

351351
for (var i = 0; i < 10; ++i)
352352
{
@@ -375,7 +375,7 @@ public void should_send_message_to_multiple_peers(int peerCount)
375375
var message = new FakeCommand(999).ToTransportMessage();
376376
senderTransport.Send(message, receiverTransports.Select(x => x.GetPeer()));
377377

378-
Wait.Until(() => Volatile.Read(ref receivedMessagesCount) == peerCount, 30.Second());
378+
Wait.Until(() => Volatile.Read(ref receivedMessagesCount) == peerCount, 30.Seconds());
379379
}
380380

381381
[Test]
@@ -399,7 +399,7 @@ public void should_not_support_more_than_maximum_sockets()
399399
var message = new FakeCommand(999).ToTransportMessage();
400400
senderTransport.Send(message, receiverTransports.Select(x => x.GetPeer()));
401401

402-
Wait.Until(() => receivedMessages.Count == maximumSocketCount - 1, 10.Seconds());
402+
Wait.Until(() => receivedMessages.Count == maximumSocketCount - 1, 30.Seconds());
403403

404404
Thread.Sleep(1.Second());
405405

@@ -431,7 +431,7 @@ public void should_not_block_when_hitting_high_water_mark()
431431
senderTransport.Send(message, new[] { upReceiver, downReceiver });
432432

433433
var expectedMessageCount = i;
434-
Wait.Until(() => receivedMessages.Count == expectedMessageCount, 2.Seconds(), "Failed to send message after " + i + " successful sent");
434+
Wait.Until(() => receivedMessages.Count == expectedMessageCount, 30.Seconds(), "Failed to send message after " + i + " successful sent");
435435
}
436436
}
437437

@@ -467,7 +467,7 @@ public void should_not_wait_blocked_peers_on_every_send()
467467
}
468468

469469
var receiverStopwatch = Stopwatch.StartNew();
470-
Wait.Until(() => receivedMessages.Count == 10, 10.Seconds(), "Timed out while waiting for messages");
470+
Wait.Until(() => receivedMessages.Count == 10, 30.Seconds(), "Timed out while waiting for messages");
471471
receiverStopwatch.Stop();
472472
Console.WriteLine("Elapsed time to get messages: " + receiverStopwatch.Elapsed);
473473
receiverStopwatch.ElapsedMilliseconds.ShouldBeLessOrEqualThan(1000, "Throughput is too low");
@@ -488,7 +488,7 @@ public void should_not_wait_for_unknown_peer_on_every_send()
488488
senderTransport.Send(message, new[] { invalidPeer, receiver });
489489
}
490490

491-
Wait.Until(() => receivedMessageCount == 1000, 5.Seconds());
491+
Wait.Until(() => receivedMessageCount == 1000, 30.Seconds());
492492
}
493493

494494
[Test]
@@ -509,14 +509,14 @@ public void should_send_various_sized_messages()
509509
var bigMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), messageBytes, new PeerId("X"), senderTransport.InboundEndPoint);
510510
senderTransport.Send(bigMessage, new[] { receiver });
511511

512-
Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
512+
Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
513513

514514
receivedMessages[0].ShouldHaveSamePropertiesAs(bigMessage, "Environment", "WasPersisted");
515515

516516
var smallMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new byte[1], new PeerId("X"), senderTransport.InboundEndPoint);
517517
senderTransport.Send(smallMessage, new[] { receiver });
518518

519-
Wait.Until(() => receivedMessages.Count == 2, 2.Seconds());
519+
Wait.Until(() => receivedMessages.Count == 2, 30.Seconds());
520520

521521
receivedMessages[1].ShouldHaveSamePropertiesAs(smallMessage, "Environment", "WasPersisted");
522522
}
@@ -530,7 +530,7 @@ public void should_send_message_to_self()
530530

531531
transport.Send(new FakeCommand(1).ToTransportMessage(), new[] { self });
532532

533-
Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
533+
Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
534534
}
535535

536536
[Test]
@@ -560,7 +560,7 @@ public void should_not_forward_messages_to_upper_layer_when_stopping()
560560
Console.WriteLine($"{sendCount} messages sent");
561561
});
562562

563-
Wait.Until(() => receivedMessages.Count > 1, 10.Seconds());
563+
Wait.Until(() => receivedMessages.Count > 1, 30.Seconds());
564564
Console.WriteLine("Message received");
565565

566566
receivingTransport.Stop();
@@ -601,7 +601,7 @@ public void should_process_all_messages_in_buffer_on_stop()
601601
});
602602

603603
senderTask.Start();
604-
Wait.Until(() => receivedMessageCount != 0, 2.Seconds());
604+
Wait.Until(() => receivedMessageCount != 0, 30.Seconds());
605605

606606
Log($"Stopping the sender");
607607
shouldSend[0] = false;
@@ -631,12 +631,12 @@ public void should_disconnect_peer_socket_of_a_stopped_peer_after_some_time()
631631

632632
transport1.Send(new FakeCommand(0).ToTransportMessage(), new[] { peer2 });
633633
transport2.Send(new FakeCommand(0).ToTransportMessage(), new[] { peer1 });
634-
Wait.Until(() => transport1.OutboundSocketCount == 1, 10.Seconds());
635-
Wait.Until(() => transport2.OutboundSocketCount == 1, 10.Seconds());
634+
Wait.Until(() => transport1.OutboundSocketCount == 1, 30.Seconds());
635+
Wait.Until(() => transport2.OutboundSocketCount == 1, 30.Seconds());
636636

637637
transport2.Stop();
638638

639-
Wait.Until(() => transport1.OutboundSocketCount == 0, 10.Seconds());
639+
Wait.Until(() => transport1.OutboundSocketCount == 0, 30.Seconds());
640640
}
641641

642642
private ZmqTransport CreateZmqTransport(string endPoint = "tcp://*:*", Action<TransportMessage> onMessageReceived = null, string peerId = null, string environment = _environment, ZmqSocketOptions socketOptions = null)

0 commit comments

Comments
 (0)