-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.py
128 lines (97 loc) · 3.56 KB
/
publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import asyncio
import aiormq
import logging
import json
import inspect
log = logging.getLogger(__name__)
class BasicPublisher:
"""
Encapsulates the AMQP publishing mechanism
Use this class directly if you need a generic publisher or use any of the subclasses for more specific publishers
"""
PUBLISH_TIMEOUT = 5
MAX_LOG_LEN = 500
def __init__(self, ch):
self._ch = ch
# Default reply_to
self.reply_to = None
async def publish(self, message, exchange='', routing_key='', reply_to=None, persistent=False, corr_id=None, timeout=None, mandatory=False, headers={}):
"""
Publishes a message. Only only publishes with a timeout
:raises TimeoutError: if a timeout was defined
:raises TypeError: if the message is not JSON serializable
If you set mandatory=True and the message can't be delivered, aiormq.exceptions.PublishError will be raised
:param message: an object that will be encoded as JSON
:param timeout:
"""
msg_str = str(message)
if len(msg_str) > self.MAX_LOG_LEN:
msg_str = msg_str[:self.MAX_LOG_LEN-3]+"..."
if not headers:
log.info('Publish message: %s', msg_str)
else:
log.info('Publish message: %s (headers: %s)', msg_str, str(headers))
body = json.dumps(message).encode()
reply_to = reply_to or self.reply_to
delivery_mode = 1
if persistent:
delivery_mode = 2
timeout = timeout or self.PUBLISH_TIMEOUT
frame = inspect.currentframe()
args, _, _, locals = inspect.getargvalues(frame)
props = {arg : locals[arg] for arg in args[2:]}
log.debug('Message properties: %s', str(props))
try:
await self._ch.basic_publish(
body,
exchange = exchange,
routing_key = routing_key,
properties = aiormq.spec.Basic.Properties(
correlation_id = corr_id,
delivery_mode = delivery_mode,
reply_to = reply_to,
headers = headers
),
mandatory = mandatory,
timeout = timeout
)
except asyncio.TimeoutError:
log.warning("Timeout while publishing")
raise
class DirectPublisher(BasicPublisher):
"""
Publish directly to a queue using the default exchange
"""
QUEUE_DECLARE_TIMEOUT = 1
def __init__(self, ch, dest, reply_to=None):
"""
:param dest: name of the destination queue (str)
"""
self._ch = ch
self._dest = dest
self.reply_to = reply_to
# TODO: Do you use this? or instead use the resource class BasicQueue directly to avoid redundancies?
async def init_dest_queue(self, **kwargs):
"""
Initialize the destination queue (make sure that it exists)
:raises: TimeoutError
"""
# TODO: passive=true should only ensure that it exists (throw an error if not)
kwargs['queue'] = self._dest
if 'timeout' not in kwargs:
kwargs['timeout'] = self.QUEUE_DECLARE_TIMEOUT
result = await self._ch.queue_declare(**kwargs)
async def publish(self, message, **kwargs):
await super().publish(message, exchange='', routing_key=self._dest, **kwargs)
class ExchangePublisher(BasicPublisher):
# TODO: check whether exchange exists? Or assume that?
def __init__(self, ch, exchange: str, reply_to=None):
self._ch = ch
assert isinstance(exchange, str)
self._exchange = exchange
self.reply_to = reply_to
async def publish(self, message, **kwargs):
await super().publish(message, exchange=self._exchange, **kwargs)
class HeadersExchangePublisher(ExchangePublisher):
async def publish(self, message, headers={}, **kwargs):
await super().publish(message, headers=headers, **kwargs)