@@ -29,10 +29,13 @@ def __init__(
29
29
exchange_params = None ,
30
30
* ,
31
31
default_send_properties = None ,
32
+ mq_url = None ,
33
+ mq_exchange = None ,
34
+ logger = None
32
35
):
33
- self .mq_url = None
34
- self .mq_exchange = None
35
- self .logger = None
36
+ self .mq_url = mq_url
37
+ self .mq_exchange = mq_exchange
38
+ self .logger = logger
36
39
self .body_parser = body_parser
37
40
self .msg_parser = msg_parser
38
41
self .exchange_params = exchange_params or ExchangeParams ()
@@ -106,15 +109,17 @@ def send(
106
109
retries : int = 5 ,
107
110
message_version : str = "v1.0.0" ,
108
111
debug_exchange : bool = False ,
112
+ exchange_name : str = None ,
109
113
** properties ,
110
114
):
111
115
filterwarnings (action = "ignore" , message = "unclosed" , category = ResourceWarning )
112
- exchange_name = (
113
- f"{ self .mq_exchange } -development" if self .development else self .mq_exchange
116
+ exchange_name = self .mq_exchange if exchange_name is None else exchange_name
117
+ exchange = (
118
+ f"{ exchange_name } -development" if self .development else exchange_name
114
119
)
115
120
self ._validate_channel_connection ()
116
121
self .channel .exchange .declare (
117
- exchange = f"{ exchange_name } -debug" if debug_exchange else exchange_name ,
122
+ exchange = f"{ exchange } -debug" if debug_exchange else exchange ,
118
123
exchange_type = exchange_type ,
119
124
passive = self .exchange_params .passive ,
120
125
durable = self .exchange_params .durable ,
@@ -123,7 +128,7 @@ def send(
123
128
124
129
retry_call (
125
130
self ._publish_to_channel ,
126
- (body , routing_key , message_version , debug_exchange ),
131
+ (body , routing_key , message_version , debug_exchange , exchange_name ),
127
132
properties ,
128
133
exceptions = (AMQPConnectionError , AssertionError ),
129
134
tries = retries ,
@@ -137,6 +142,7 @@ def _publish_to_channel(
137
142
routing_key : str ,
138
143
message_version : str ,
139
144
debug_exchange : bool = False ,
145
+ exchange_name : str = None ,
140
146
** properties ,
141
147
):
142
148
encoded_body = json .dumps (body , cls = self .json_encoder ).encode ("utf-8" )
@@ -149,9 +155,6 @@ def _publish_to_channel(
149
155
properties ["headers" ] = {}
150
156
properties ["headers" ]["x-message-version" ] = message_version
151
157
filterwarnings (action = "ignore" , message = "unclosed" , category = ResourceWarning )
152
- exchange_name = (
153
- f"{ self .mq_exchange } -development" if self .development else self .mq_exchange
154
- )
155
158
156
159
self ._validate_channel_connection ()
157
160
self .channel .basic .publish (
0 commit comments