@@ -179,26 +179,37 @@ async def consume_unban_messages(connection, channel, queue, api_clients):
179
179
await message .ack ()
180
180
except json .JSONDecodeError :
181
181
logging .error ("Fehler beim Parsen der JSON-Daten" )
182
- await message .nack (requeue ( True ) )
182
+ await message .nack (requeue = True )
183
183
except Exception as e :
184
184
logging .error (f"Unerwarteter Fehler beim Verarbeiten der Nachricht: { e } " )
185
- await message .nack (requeue ( True ) )
185
+ await message .nack (requeue = True )
186
186
187
187
async def connect_to_tempban_rabbitmq (client_id ):
188
- logging .info (f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client { client_id } ..." )
189
- tempban_connection = await aio_pika .connect_robust (
190
- f"amqp://{ RABBITMQ_USER } :{ RABBITMQ_PASS } @{ RABBITMQ_HOST } :{ RABBITMQ_PORT } /" ,
191
- loop = asyncio .get_running_loop (),
192
- heartbeat = 600 ,
193
- client_properties = {'connection_name' : f'tempban_connection_{ client_id } ' }
194
- )
195
- tempban_channel = await tempban_connection .channel ()
196
- exchange_name = f'tempbans_fanout_{ client_id } '
197
- tempban_exchange = await tempban_channel .declare_exchange (exchange_name , ExchangeType .FANOUT , durable = True )
198
- tempban_queue = await tempban_channel .declare_queue (f'tempbans_queue_{ client_id } ' , durable = True )
199
- await tempban_queue .bind (tempban_exchange , routing_key = '' )
200
- logging .info (f"RabbitMQ Queue tempbans_queue_{ client_id } deklariert und gebunden." )
201
- return tempban_connection , tempban_channel , tempban_queue
188
+ try :
189
+ logging .info (f"Versuche, eine Verbindung zu RabbitMQ für Tempban-Nachrichten herzustellen für Client { client_id } ..." )
190
+
191
+ tempban_connection = await aio_pika .connect_robust (
192
+ f"amqp://{ RABBITMQ_USER } :{ RABBITMQ_PASS } @{ RABBITMQ_HOST } :{ RABBITMQ_PORT } /" ,
193
+ loop = asyncio .get_running_loop (),
194
+ heartbeat = 600 ,
195
+ client_properties = {'connection_name' : f'tempban_connection_{ client_id } ' }
196
+ )
197
+
198
+ tempban_channel = await tempban_connection .channel ()
199
+ exchange_name = f'tempbans_fanout_{ client_id } '
200
+
201
+ tempban_exchange = await tempban_channel .declare_exchange (exchange_name , ExchangeType .FANOUT , durable = True )
202
+ queue_name = f'tempbans_queue_{ client_id } '
203
+ tempban_queue = await tempban_channel .declare_queue (queue_name , durable = True )
204
+
205
+ await tempban_queue .bind (tempban_exchange , routing_key = '' )
206
+ logging .info (f"RabbitMQ Queue { queue_name } deklariert und gebunden." )
207
+
208
+ return tempban_connection , tempban_channel , tempban_queue
209
+
210
+ except Exception as e :
211
+ logging .error (f"Fehler beim Verbinden mit RabbitMQ für Tempban-Nachrichten: { e } " )
212
+ raise # Optional: Weiterleiten des Fehlers oder Versuch eines erneuten Verbindungsaufbaus
202
213
203
214
async def consume_tempban_messages (connection , channel , queue , api_clients ):
204
215
logging .info ("Beginne mit dem Empfang von Tempban-Nachrichten..." )
@@ -210,15 +221,16 @@ async def consume_tempban_messages(connection, channel, queue, api_clients):
210
221
ban_data = json .loads (message .body .decode ())
211
222
logging .info (f"Empfangene Tempban-Daten: { ban_data } " )
212
223
213
- player_name = ban_data .get ('player_name' , ban_data .get ('player' ))
214
- player_id = ban_data .get ('player_id' , ban_data .get ('steam_id_64' ))
224
+ # Dynamische Zuordnung von player_id und player_name basierend auf der API-Version
225
+ player_name = ban_data .get ('player_name' ) or ban_data .get ('player' )
226
+ player_id = ban_data .get ('player_id' ) or ban_data .get ('steam_id_64' )
215
227
216
228
if not player_name :
217
229
logging .error (f"Fehlendes 'player_name' in den Tempban-Daten: { ban_data } " )
218
230
if not player_id :
219
231
logging .error (f"Fehlendes 'player_id' in den Tempban-Daten: { ban_data } " )
220
232
if not player_name or not player_id :
221
- await message .nack (requeue ( True ) )
233
+ await message .nack (requeue = True )
222
234
continue
223
235
224
236
for api_client in api_clients :
@@ -234,7 +246,7 @@ async def consume_tempban_messages(connection, channel, queue, api_clients):
234
246
await message .nack (requeue = True )
235
247
except Exception as e :
236
248
logging .error (f"Unerwarteter Fehler beim Verarbeiten der Nachricht: { e } " )
237
- await message .nack (requeue ( True ) )
249
+ await message .nack (requeue = True )
238
250
239
251
async def connect_to_watchlist_rabbitmq (client_id ):
240
252
logging .info (f"Versuche, eine Verbindung zu RabbitMQ für Watchlist-Nachrichten herzustellen für Client { client_id } ..." )
@@ -258,36 +270,58 @@ async def consume_watchlist_messages(connection, channel, queue, api_clients):
258
270
try :
259
271
watchlist_data = json .loads (message .body .decode ())
260
272
logging .info (f"Empfangene Watchlist-Daten: { watchlist_data } " )
261
-
262
- player_name = watchlist_data .get ('player_name' , watchlist_data .get ('player' ))
263
- player_id = watchlist_data .get ('player_id' , watchlist_data .get ('steam_id_64' ))
264
-
265
- # Überprüfung auf fehlende oder ungültige player_id
266
- if not player_id :
267
- logging .error (f"Fehlendes 'player_id' in den Watchlist-Daten: { watchlist_data } " )
268
- await message .nack (requeue = False ) # Nachricht nicht erneut in die Queue einreihen
269
- continue
270
273
271
- # Überprüfung auf fehlende erforderliche Felder
272
- if not player_name :
273
- logging .error (f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: { watchlist_data } " )
274
- await message .nack (requeue (False ))
275
- continue
274
+ processed = False # Flag to track if the message has been processed
276
275
276
+ # Iteriere über die API-Clients, um die Nachricht an den richtigen API-Client zu senden
277
277
for api_client in api_clients :
278
278
version = api_client .api_version
279
279
280
+ if version .startswith ("v10" ):
281
+ # v10 API verwendet player_name und player_id
282
+ player_name = watchlist_data .get ('player_name' )
283
+ player_id = watchlist_data .get ('player_id' )
284
+ else :
285
+ # v9.x API verwendet player und steam_id_64
286
+ player_name = watchlist_data .get ('player' )
287
+ player_id = watchlist_data .get ('steam_id_64' )
288
+
289
+ # Überprüfung auf fehlende oder ungültige player_id
290
+ if not player_id :
291
+ logging .error (f"Fehlendes 'player_id' oder 'steam_id_64' in den Watchlist-Daten: { watchlist_data } " )
292
+ break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten
293
+
294
+ # Überprüfung auf fehlende erforderliche Felder
295
+ if not player_name :
296
+ logging .error (f"Fehlende erforderliche Datenfelder in den Watchlist-Daten: { watchlist_data } " )
297
+ break # Verlassen der Schleife, um die Nachricht nicht weiter zu verarbeiten
298
+
299
+ # Call the appropriate API method based on the client version
280
300
if api_client .do_watch_player (player_name , player_id , watchlist_data ['reason' ], watchlist_data ['by' ]):
281
301
logging .info (f"Spieler erfolgreich zur Watchlist hinzugefügt: { player_id } " )
302
+ processed = True
282
303
else :
283
304
logging .error (f"Fehler beim Hinzufügen zur Watchlist für Player ID: { player_id } " )
284
- await message .ack ()
305
+ break # Nachricht nicht weiterverarbeiten
306
+
307
+ if processed :
308
+ await message .ack ()
309
+ else :
310
+ await message .nack (requeue = False )
311
+
285
312
except json .JSONDecodeError :
286
313
logging .error ("Fehler beim Parsen der JSON-Daten" )
287
- await message .nack (requeue (True ))
314
+ try :
315
+ await message .nack (requeue = True )
316
+ except aio_pika .exceptions .MessageProcessError :
317
+ logging .error ("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde." )
318
+
288
319
except Exception as e :
289
320
logging .error (f"Unerwarteter Fehler beim Verarbeiten der Nachricht: { e } " )
290
- await message .nack (requeue (True ))
321
+ try :
322
+ await message .nack (requeue = True )
323
+ except aio_pika .exceptions .MessageProcessError :
324
+ logging .error ("Nachricht konnte nicht zurückgestellt werden, da sie bereits verarbeitet wurde." )
291
325
292
326
async def connect_to_unwatch_rabbitmq (client_id ):
293
327
logging .info (f"Versuche, eine Verbindung zu RabbitMQ für Unwatch-Nachrichten herzustellen für Client { client_id } ..." )
0 commit comments