@@ -103,10 +103,11 @@ class Notificator {
103103 throw err ;
104104 } finally {
105105 await log . close ( ) ;
106- this . notif_to_connect . clear ( ) ;
107106 for ( const conn of this . connect_str_to_connection . values ( ) ) {
108107 conn . destroy ( ) ;
109108 }
109+ this . connect_str_to_connection . clear ( ) ;
110+ this . notif_to_connect . clear ( ) ;
110111 }
111112 }
112113 }
@@ -259,18 +260,25 @@ class KafkaNotificator {
259260
260261 async connect ( ) {
261262 this . connection = new Kafka . HighLevelProducer ( this . connect_obj . kafka_options_object ) ;
263+ dbg . log2 ( "Kafka producer connecting, connect =" , this . connect_obj ) ;
262264 await new Promise ( ( res , rej ) => {
263265 this . connection . on ( 'ready' , ( ) => {
266+ dbg . log2 ( "Kafka producer connected for connection =" , this . connect_obj ) ;
264267 res ( ) ;
265268 } ) ;
266269 this . connection . on ( 'connection.failure' , err => {
270+ dbg . error ( "Kafka producer failed to connect. connect = " , this . connect_obj , ", err =" , err ) ;
267271 rej ( err ) ;
268272 } ) ;
269273 this . connection . on ( 'event.log' , arg => {
270- dbg . log1 ( "event log" , arg ) ;
274+ dbg . log2 ( "event log" , arg ) ;
275+ } ) ;
276+ this . connection . on ( 'event.error' , arg => {
277+ dbg . error ( "event error =" , arg ) ;
271278 } ) ;
272279 this . connection . connect ( ) ;
273280 } ) ;
281+ dbg . log2 ( "Kafka producer's connect done, connect =" , this . connect_obj ) ;
274282 this . connection . setPollInterval ( 100 ) ;
275283 }
276284
@@ -283,10 +291,12 @@ class KafkaNotificator {
283291 Buffer . from ( JSON . stringify ( notif . notif ) ) ,
284292 null ,
285293 Date . now ( ) ,
286- ( err , offset ) => {
294+ err => {
287295 if ( err ) {
296+ dbg . error ( "Failed to notify. Connect =" , connect_obj , ", notif =" , notif ) ;
288297 promise_failure_cb ( notif , failure_ctxt , err ) . then ( resolve ) ;
289298 } else {
299+ dbg . log2 ( "Kafka notify successful. Connect =" , connect_obj , ", notif =" , notif ) ;
290300 resolve ( ) ;
291301 }
292302 }
@@ -295,8 +305,10 @@ class KafkaNotificator {
295305 }
296306
297307 destroy ( ) {
298- this . connection . flush ( 10000 ) ;
299- this . connection . disconnect ( ) ;
308+ if ( this . connection . isConnected ( ) ) {
309+ this . connection . flush ( 10000 ) ;
310+ this . connection . disconnect ( ) ;
311+ }
300312 }
301313}
302314
0 commit comments