3
3
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4
4
5
5
using System ;
6
+ using System . Collections . Generic ;
6
7
using System . Linq ;
7
8
using System . Text ;
8
9
using System . Threading . Tasks ;
@@ -22,8 +23,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect
22
23
Assert . NotNull ( _management ) ;
23
24
24
25
await Assert . ThrowsAsync < InvalidAddressException > ( ( ) =>
25
- _connection . PublisherBuilder ( ) . Queue ( "queue_and_exchange_cant_set_together" ) .
26
- Exchange ( "queue_and_exchange_cant_set_together" ) . BuildAsync ( ) ) ;
26
+ _connection . PublisherBuilder ( ) . Queue ( "queue_and_exchange_cant_set_together" )
27
+ . Exchange ( "queue_and_exchange_cant_set_together" ) . BuildAsync ( ) ) ;
27
28
28
29
await _connection . CloseAsync ( ) ;
29
30
Assert . Empty ( _connection . Publishers ) ;
@@ -192,6 +193,7 @@ public async Task PublisherSendingShouldThrowWhenExchangeHasBeenDeleted()
192
193
publishOutcome = nextPublishResult . Outcome ;
193
194
break ;
194
195
}
196
+
195
197
await Task . Delay ( TimeSpan . FromMilliseconds ( 100 ) ) ;
196
198
}
197
199
@@ -243,6 +245,7 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
243
245
publishOutcome = nextPublishResult . Outcome ;
244
246
break ;
245
247
}
248
+
246
249
await Task . Delay ( TimeSpan . FromMilliseconds ( 100 ) ) ;
247
250
}
248
251
@@ -256,4 +259,56 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
256
259
await publisher . CloseAsync ( ) ;
257
260
publisher . Dispose ( ) ;
258
261
}
262
+
263
+ [ Theory ]
264
+ [ InlineData ( QueueType . QUORUM ) ]
265
+ [ InlineData ( QueueType . CLASSIC ) ]
266
+ public async Task MessageShouldBeDurableByDefault ( QueueType queueType )
267
+ {
268
+ Assert . NotNull ( _connection ) ;
269
+ Assert . NotNull ( _management ) ;
270
+
271
+ IQueueSpecification queueSpec = _management . Queue ( _queueName ) . Type ( queueType ) ;
272
+ await queueSpec . DeclareAsync ( ) ;
273
+
274
+ IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( queueSpec ) . BuildAsync ( ) ;
275
+ List < IMessage > messages = new ( ) ;
276
+ TaskCompletionSource < List < IMessage > > tcs = new ( ) ;
277
+ IConsumer consumer = await _connection . ConsumerBuilder ( )
278
+ . Queue ( queueSpec )
279
+ . MessageHandler ( ( context , message ) =>
280
+ {
281
+ messages . Add ( message ) ;
282
+ context . Accept ( ) ;
283
+ if ( messages . Count == 2 )
284
+ {
285
+ tcs . SetResult ( messages ) ;
286
+ }
287
+
288
+ return Task . CompletedTask ;
289
+ } ) . BuildAndStartAsync ( ) ;
290
+
291
+ // the first message should be durable by default
292
+ AmqpMessage durable = new ( "Hello wold!" ) ;
293
+ PublishResult pr = await publisher . PublishAsync ( durable ) ;
294
+ Assert . Equal ( OutcomeState . Accepted , pr . Outcome . State ) ;
295
+ Assert . True ( durable . Durable ( ) ) ;
296
+
297
+ // the second message should be not durable set by the user
298
+
299
+ AmqpMessage notDurable = new ( "Hello wold!" ) ;
300
+ notDurable . Durable ( false ) ;
301
+ PublishResult pr2 = await publisher . PublishAsync ( notDurable ) ;
302
+ Assert . Equal ( OutcomeState . Accepted , pr2 . Outcome . State ) ;
303
+ Assert . False ( notDurable . Durable ( ) ) ;
304
+ var r = await tcs . Task . WaitAsync ( TimeSpan . FromSeconds ( 10 ) ) ;
305
+ Assert . True ( r [ 0 ] . Durable ( ) ) ;
306
+ Assert . False ( r [ 1 ] . Durable ( ) ) ;
307
+
308
+ await consumer . CloseAsync ( ) ;
309
+ await publisher . CloseAsync ( ) ;
310
+ await queueSpec . DeleteAsync ( ) ;
311
+
312
+ Assert . Empty ( _connection . Publishers ) ;
313
+ }
259
314
}
0 commit comments