37
37
import org .apache .inlong .audit .service .consume .KafkaConsume ;
38
38
import org .apache .inlong .audit .service .consume .PulsarConsume ;
39
39
import org .apache .inlong .audit .service .consume .TubeConsume ;
40
+ import org .apache .inlong .common .constant .MQType ;
40
41
import org .apache .inlong .common .pojo .audit .AuditConfigRequest ;
41
42
import org .apache .inlong .common .pojo .audit .MQInfo ;
42
43
import org .slf4j .Logger ;
45
46
import org .springframework .beans .factory .annotation .Autowired ;
46
47
import org .springframework .stereotype .Service ;
47
48
48
- import java .io .IOException ;
49
49
import java .io .InputStream ;
50
50
import java .util .ArrayList ;
51
51
import java .util .List ;
@@ -70,6 +70,9 @@ public class AuditMsgConsumerServer implements InitializingBean {
70
70
71
71
private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties" ;
72
72
73
+ // interval time of getting mq config
74
+ private static final int INTERVAL_MS = 5000 ;
75
+
73
76
private final CloseableHttpClient httpClient = HttpClientBuilder .create ().build ();
74
77
75
78
private final Gson gson = new Gson ();
@@ -83,24 +86,25 @@ public void afterPropertiesSet() {
83
86
List <InsertData > insertServiceList = this .getInsertServiceList ();
84
87
85
88
for (MQInfo mqInfo : mqInfoList ) {
86
- if (mqConfig .isPulsar ()) {
89
+ if (mqConfig .isPulsar () && MQType . PULSAR . equals ( mqInfo . getMqType ()) ) {
87
90
mqConfig .setPulsarServerUrl (mqInfo .getUrl ());
88
91
mqConsume = new PulsarConsume (insertServiceList , storeConfig , mqConfig );
89
92
break ;
90
- } else if (mqConfig .isTube ()) {
93
+ } else if (mqConfig .isTube () && MQType . TUBEMQ . equals ( mqInfo . getMqType ()) ) {
91
94
mqConfig .setTubeMasterList (mqInfo .getUrl ());
92
95
mqConsume = new TubeConsume (insertServiceList , storeConfig , mqConfig );
93
96
break ;
94
- } else if (mqConfig .isKafka ()) {
97
+ } else if (mqConfig .isKafka () && MQType . KAFKA . equals ( mqInfo . getMqType ()) ) {
95
98
mqConfig .setKafkaServerUrl (mqInfo .getUrl ());
96
99
mqConsume = new KafkaConsume (insertServiceList , storeConfig , mqConfig );
97
100
break ;
98
- } else {
99
- LOG .error ("Unknown MessageQueue {}" , mqConfig .getMqType ());
100
- return ;
101
101
}
102
102
}
103
103
104
+ if (mqConsume == null ) {
105
+ LOG .error ("Unknown MessageQueue {}" , mqConfig .getMqType ());
106
+ }
107
+
104
108
if (storeConfig .isElasticsearchStore ()) {
105
109
esService .startTimerRoutine ();
106
110
}
@@ -133,19 +137,23 @@ private List<InsertData> getInsertServiceList() {
133
137
134
138
private List <MQInfo > getClusterFromManager () {
135
139
Properties properties = new Properties ();
140
+ List <MQInfo > mqConfig ;
136
141
try (InputStream inputStream = getClass ().getClassLoader ().getResourceAsStream (DEFAULT_CONFIG_PROPERTIES )) {
137
142
properties .load (inputStream );
138
143
String managerHosts = properties .getProperty ("manager.hosts" );
139
144
String clusterTag = properties .getProperty ("proxy.cluster.tag" );
140
145
String [] hostList = StringUtils .split (managerHosts , "," );
141
146
for (String host : hostList ) {
142
- List <MQInfo > mqConfig = getMQConfig (host , clusterTag );
143
- if (ObjectUtils .isNotEmpty (mqConfig )) {
144
- LOG .info ("return mqConfig" );
145
- return mqConfig ;
147
+ while (true ) {
148
+ mqConfig = getMQConfig (host , clusterTag );
149
+ if (ObjectUtils .isNotEmpty (mqConfig )) {
150
+ return mqConfig ;
151
+ }
152
+ LOG .info ("MQ config may not be registered yet, wait for 5s and try again" );
153
+ Thread .sleep (INTERVAL_MS );
146
154
}
147
155
}
148
- } catch (IOException e ) {
156
+ } catch (Exception e ) {
149
157
throw new RuntimeException (e );
150
158
}
151
159
return null ;
0 commit comments