11from time import sleep
22
33from kubemq .queues import *
4+
5+
46def example_send_receive ():
57 client = Client (address = "localhost:50000" )
68 send_result = client .send_queues_message (
@@ -21,6 +23,7 @@ def example_send_receive():
2123 message .ack ()
2224 client .close ()
2325
26+
2427def example_send_receive_with_auto_ack ():
2528 client = Client (address = "localhost:50000" )
2629 send_result = client .send_queues_message (
@@ -32,10 +35,7 @@ def example_send_receive_with_auto_ack():
3235 print (f"Queue Message Sent: { send_result } " )
3336
3437 auto_ack_result = client .receive_queues_messages (
35- channel = "auto_ack" ,
36- max_messages = 1 ,
37- wait_timeout_in_seconds = 10 ,
38- auto_ack = True
38+ channel = "auto_ack" , max_messages = 1 , wait_timeout_in_seconds = 10 , auto_ack = True
3939 )
4040 for message in auto_ack_result .messages :
4141 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -44,44 +44,51 @@ def example_send_receive_with_auto_ack():
4444
4545def example_send_receive_with_dlq ():
4646 client = Client (address = "localhost:50000" )
47- for i in range (10 ):
48- send_result = client .send_queues_message (
49- QueueMessage (
50- channel = "before_dlq" ,
51- body = f"Message { i + 1 } " .encode ('utf-8' ),
52- metadata = "some-metadata" ,
53- attempts_before_dead_letter_queue = 1 ,
54- dead_letter_queue = "dlq"
55- ))
56- print (f"Queue Message Sent: { send_result } " )
57-
58- dlq_result = client .receive_queues_messages (
59- channel = "before_dlq" ,
60- max_messages = 10 ,
61- wait_timeout_in_seconds = 10 ,
47+ send_result = client .send_queues_message (
48+ QueueMessage (
49+ channel = "python_process_queue" ,
50+ body = f"Message" .encode ("utf-8" ),
51+ metadata = "some-metadata" ,
52+ attempts_before_dead_letter_queue = 4 ,
53+ dead_letter_queue = "dlq_python_process_queue" ,
54+ )
6255 )
63- for message in dlq_result .messages :
64- print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
65- message .reject ()
56+ print (f"Queue Message Sent: { send_result } " )
6657
67- dlq_result = client .receive_queues_messages (
68- channel = "dlq" ,
69- max_messages = 10 ,
70- wait_timeout_in_seconds = 10 ,
58+ for i in range (2 ):
59+ receive_result = client .receive_queues_messages (
60+ channel = "python_process_queue" ,
61+ max_messages = 1 ,
62+ wait_timeout_in_seconds = 2 ,
63+ )
64+ if len (receive_result .messages ) == 0 :
65+ print ("No more messages" )
66+ break
67+ for message in receive_result .messages :
68+ print (
69+ f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} , Receive Count:{ message .receive_count } "
70+ )
71+ message .reject ()
72+ receive_result = client .receive_queues_messages (
73+ channel = "python_process_queue" ,
74+ max_messages = 1 ,
75+ wait_timeout_in_seconds = 2 ,
7176 )
72- for message in dlq_result .messages :
73- print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
74- message .ack ()
77+ if len (receive_result .messages ) == 0 :
78+ print ("No more messages" )
79+ for message in receive_result .messages :
80+ print (
81+ f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} , Receive Count:{ message .receive_count } "
82+ )
83+ message .re_queue ("python_process_re_queue" )
84+
7585 client .close ()
7686
87+
7788def example_send_receive_with_delay ():
7889 client = Client (address = "localhost:50000" )
7990 send_result = client .send_queues_message (
80- QueueMessage (
81- channel = "delay" ,
82- body = b"message with delay" ,
83- delay_in_seconds = 5
84- )
91+ QueueMessage (channel = "delay" , body = b"message with delay" , delay_in_seconds = 5 )
8592 )
8693 print (f"Queue Message Sent: { send_result } " )
8794
@@ -98,13 +105,14 @@ def example_send_receive_with_delay():
98105 message .ack ()
99106 client .close ()
100107
108+
101109def example_send_receive_with_expiration ():
102110 client = Client (address = "localhost:50000" )
103111 send_result = client .send_queues_message (
104112 QueueMessage (
105113 channel = "expiration" ,
106114 body = b"message with expiration" ,
107- expiration_in_seconds = 5
115+ expiration_in_seconds = 5 ,
108116 )
109117 )
110118 print (f"Queue Message Sent: { send_result } " )
@@ -117,6 +125,8 @@ def example_send_receive_with_expiration():
117125 )
118126 print (f"Received { len (expiration_result .messages )} messages" )
119127 client .close ()
128+
129+
120130def example_with_message_ack ():
121131 client = Client (address = "localhost:50000" )
122132 send_result = client .send_queues_message (
@@ -131,7 +141,7 @@ def example_with_message_ack():
131141 channel = "message_ack" ,
132142 max_messages = 1 ,
133143 wait_timeout_in_seconds = 10 ,
134- auto_ack = False
144+ auto_ack = False ,
135145 )
136146 for message in message_ack_result .messages :
137147 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -153,7 +163,7 @@ def example_with_message_reject():
153163 channel = "message_reject" ,
154164 max_messages = 1 ,
155165 wait_timeout_in_seconds = 10 ,
156- auto_ack = False
166+ auto_ack = False ,
157167 )
158168 for message in message_reject_result .messages :
159169 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -162,13 +172,14 @@ def example_with_message_reject():
162172 channel = "message_reject" ,
163173 max_messages = 1 ,
164174 wait_timeout_in_seconds = 10 ,
165- auto_ack = False
175+ auto_ack = False ,
166176 )
167177 for message in message_reject_result .messages :
168178 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
169179 message .ack ()
170180 client .close ()
171181
182+
172183def example_with_message_requeue ():
173184 client = Client (address = "localhost:50000" )
174185 send_result = client .send_queues_message (
@@ -183,7 +194,7 @@ def example_with_message_requeue():
183194 channel = "message_requeue" ,
184195 max_messages = 1 ,
185196 wait_timeout_in_seconds = 10 ,
186- auto_ack = False
197+ auto_ack = False ,
187198 )
188199 for message in message_requeue_result .messages :
189200 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
@@ -192,18 +203,20 @@ def example_with_message_requeue():
192203 channel = "requeue_channel" ,
193204 max_messages = 1 ,
194205 wait_timeout_in_seconds = 10 ,
195- auto_ack = False
206+ auto_ack = False ,
196207 )
197208 for message in message_requeue_result .messages :
198209 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
199210 message .ack ()
211+
212+
200213def example_with_ack_all ():
201214 client = Client (address = "localhost:50000" )
202215 for i in range (10 ):
203216 send_result = client .send_queues_message (
204217 QueueMessage (
205218 channel = "ack_all" ,
206- body = f"Message { i + 1 } " .encode (' utf-8' ),
219+ body = f"Message { i + 1 } " .encode (" utf-8" ),
207220 metadata = "some-metadata" ,
208221 )
209222 )
@@ -225,7 +238,7 @@ def example_with_reject_all():
225238 send_result = client .send_queues_message (
226239 QueueMessage (
227240 channel = "reject_all" ,
228- body = f"Message { i + 1 } " .encode (' utf-8' ),
241+ body = f"Message { i + 1 } " .encode (" utf-8" ),
229242 )
230243 )
231244 print (f"Queue Message Sent: { send_result } " )
@@ -249,13 +262,14 @@ def example_with_reject_all():
249262 reject_all_result .ack_all ()
250263 client .close ()
251264
265+
252266def example_with_requeue_all ():
253267 client = Client (address = "localhost:50000" )
254268 for i in range (10 ):
255269 send_result = client .send_queues_message (
256270 QueueMessage (
257271 channel = "requeue_all" ,
258- body = f"Message { i + 1 } " .encode (' utf-8' ),
272+ body = f"Message { i + 1 } " .encode (" utf-8" ),
259273 )
260274 )
261275 print (f"Queue Message Sent: { send_result } " )
@@ -295,14 +309,15 @@ def example_with_visibility():
295309 max_messages = 1 ,
296310 wait_timeout_in_seconds = 10 ,
297311 auto_ack = False ,
298- visibility_seconds = 2
312+ visibility_seconds = 2 ,
299313 )
300314 for message in result .messages :
301315 print (f"Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
302316 time .sleep (1 )
303317 message .ack ()
304318 client .close ()
305319
320+
306321def example_with_visibility_expired ():
307322 client = Client (address = "localhost:50000" )
308323 send_result = client .send_queues_message (
@@ -318,7 +333,7 @@ def example_with_visibility_expired():
318333 max_messages = 1 ,
319334 wait_timeout_in_seconds = 10 ,
320335 auto_ack = False ,
321- visibility_seconds = 2
336+ visibility_seconds = 2 ,
322337 )
323338 for message in result .messages :
324339 try :
@@ -329,6 +344,7 @@ def example_with_visibility_expired():
329344 print (err )
330345 client .close ()
331346
347+
332348def example_with_visibility_extension ():
333349 client = Client (address = "localhost:50000" )
334350 send_result = client .send_queues_message (
@@ -344,7 +360,7 @@ def example_with_visibility_extension():
344360 max_messages = 1 ,
345361 wait_timeout_in_seconds = 10 ,
346362 auto_ack = False ,
347- visibility_seconds = 2
363+ visibility_seconds = 2 ,
348364 )
349365 for message in result .messages :
350366 try :
@@ -357,6 +373,7 @@ def example_with_visibility_extension():
357373 print (err )
358374 client .close ()
359375
376+
360377def example_with_wait_pull ():
361378 client = Client (address = "localhost:50000" )
362379 send_result = client .send_queues_message (
@@ -384,12 +401,13 @@ def example_with_wait_pull():
384401 print (f"Pull Id:{ message .id } , Body:{ message .body .decode ('utf-8' )} " )
385402 client .close ()
386403
404+
387405if __name__ == "__main__" :
388406 try :
389407
390- example_send_receive ()
408+ # example_send_receive()
391409 # example_send_receive_with_auto_ack()
392- # example_send_receive_with_dlq()
410+ example_send_receive_with_dlq ()
393411 # example_send_receive_with_delay()
394412 # example_send_receive_with_expiration()
395413 # example_with_message_ack()
0 commit comments