2020import java .net .http .HttpClient ;
2121import java .net .http .WebSocket ;
2222import java .util .*;
23- import java .util .concurrent .CompletableFuture ;
24- import java .util .concurrent .CompletionStage ;
25- import java .util .concurrent .Future ;
23+ import java .util .concurrent .*;
2624
2725public final class WebSocketConnection implements WebSocket .Listener {
2826 /**
@@ -50,6 +48,8 @@ public final class WebSocketConnection implements WebSocket.Listener {
5048 */
5149 private final ArrayList <String > messages = new ArrayList <>();
5250
51+ private CompletableFuture <Void > messageQueueCleared = new CompletableFuture <>();
52+
5353 /**
5454 * is the connection ready
5555 */
@@ -108,10 +108,12 @@ public void unsubscribe(@NotNull StreamType type) {
108108 return ;
109109 }
110110
111- Stream <?> stream = this .streams .get (type .getStreamClass ());
112- if (stream != null ) {
113- stream .stop ();
114- this .streams .remove (type .getStreamClass ());
111+ synchronized (streams ) {
112+ Stream <?> stream = this .streams .get (type .getStreamClass ());
113+ if (stream != null ) {
114+ stream .stop ();
115+ this .streams .remove (type .getStreamClass ());
116+ }
115117 }
116118 }
117119
@@ -121,14 +123,8 @@ public void unsubscribe(@NotNull StreamType type) {
121123 * @param command minecraft command
122124 * @return was the command executed
123125 */
124- public boolean executeCommand (String command ) {
125- ConsoleStream stream = this .getStream (ConsoleStream .class );
126- if (stream != null ) {
127- stream .executeCommand (command );
128- return true ;
129- }
130-
131- return false ;
126+ public CompletableFuture <Void > executeCommand (String command ) {
127+ return this .getOrCreateStream (ConsoleStream .class ).executeCommand (command );
132128 }
133129
134130 /**
@@ -191,9 +187,24 @@ public Future<Server> waitForStatus(Set<ServerStatus> statuses) {
191187 return new WaitForStatusSubscriber (statuses , this .getStream (ServerStatusStream .class ));
192188 }
193189
194- private <T extends Stream <?>> T getStream (Class <T > c ) {
195- @ SuppressWarnings ("unchecked" ) T stream = (T ) this .streams .get (c );
196- return stream ;
190+ private <T extends Stream <?>> @ NotNull T getOrCreateStream (Class <T > clazz ) {
191+ synchronized (streams ) {
192+ @ SuppressWarnings ("unchecked" ) T stream = (T ) this .streams .computeIfAbsent (clazz , this ::createAndStartStream );
193+ return stream ;
194+ }
195+ }
196+
197+ private <T extends Stream <?>> @ Nullable T getStream (Class <T > clazz ) {
198+ synchronized (streams ) {
199+ @ SuppressWarnings ("unchecked" ) T stream = (T ) this .streams .get (clazz );
200+ return stream ;
201+ }
202+ }
203+
204+ private @ NotNull Stream <?> createAndStartStream (Class <? extends Stream <?>> clazz ) {
205+ var created = StreamType .get (clazz ).construct (this , gson );
206+ created .start ();
207+ return created ;
197208 }
198209
199210 private void connect () {
@@ -214,11 +225,7 @@ private void connect() {
214225 */
215226 @ ApiStatus .Internal
216227 public <T > void addStreamSubscriber (Class <? extends Stream <T >> c , T subscriber ) {
217- if (!this .streams .containsKey (c )) {
218- this .streams .put (c , StreamType .get (c ).construct (this , gson ));
219- }
220-
221- getStream (c ).addSubscriber (subscriber );
228+ getOrCreateStream (c ).addSubscriber (subscriber );
222229 }
223230
224231 /**
@@ -229,25 +236,28 @@ public <T> void addStreamSubscriber(Class<? extends Stream<T>> c, T subscriber)
229236 */
230237 @ ApiStatus .Internal
231238 public <T > void removeStreamSubscriber (Class <? extends Stream <T >> c , T subscriber ) {
232- if (!this .streams .containsKey (c )) {
239+ var stream = getStream (c );
240+ if (stream == null ) {
233241 return ;
234242 }
235243
236- getStream ( c ) .removeSubscriber (subscriber );
244+ stream .removeSubscriber (subscriber );
237245 unsubscribeFromEmptyStreams ();
238246 }
239247
240248 @ ApiStatus .Internal
241249 public void unsubscribeFromEmptyStreams () {
242- for (Stream <?> stream : streams .values ()) {
243- if (stream .hasNoSubscribers ()) {
244- this .unsubscribe (stream .getType ());
250+ synchronized (streams ) {
251+ for (Stream <?> stream : new ArrayList <>(streams .values ())) {
252+ if (stream .hasNoSubscribers ()) {
253+ this .unsubscribe (stream .getType ());
254+ }
245255 }
246- }
247256
248- // server status stream is always active
249- if (streams .size () == 1 && getStream (ServerStatusStream .class ).hasNoSubscribers ()) {
250- this .server .unsubscribe ();
257+ // server status stream is always active
258+ if (streams .size () == 1 && getOrCreateStream (ServerStatusStream .class ).hasNoSubscribers ()) {
259+ this .server .unsubscribe ();
260+ }
251261 }
252262 }
253263
@@ -282,11 +292,13 @@ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean
282292 webSocket .sendText (x , true );
283293 }
284294 this .messages .clear ();
295+ this .messageQueueCleared .complete (null );
296+ this .messageQueueCleared = new CompletableFuture <>();
285297 break ;
286298
287299 default :
288300 final StreamType name = StreamType .get (message .get ("stream" ).getAsString ());
289- final Stream <?> stream = streams . get (name .getStreamClass ());
301+ final Stream <?> stream = getStream (name .getStreamClass ());
290302 if (stream != null ) {
291303 stream .onMessage (type , message );
292304 }
@@ -298,8 +310,10 @@ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean
298310 @ ApiStatus .Internal
299311 public void onStatusChange () {
300312 // start/stop streams based on status
301- for (Stream <?> s : streams .values ()) {
302- s .onStatusChange ();
313+ synchronized (streams ) {
314+ for (Stream <?> s : streams .values ()) {
315+ s .onStatusChange ();
316+ }
303317 }
304318 }
305319
@@ -308,8 +322,10 @@ public void onStatusChange() {
308322 public CompletionStage <?> onClose (WebSocket webSocket , int statusCode , String reason ) {
309323 logger .info ("Websocket connection to {} closed: {} {}" , uri , statusCode , reason );
310324
311- for (Stream <?> stream : streams .values ()) {
312- stream .onDisconnected ();
325+ synchronized (streams ) {
326+ for (Stream <?> stream : streams .values ()) {
327+ stream .onDisconnected ();
328+ }
313329 }
314330
315331 if (this .shouldAutoReconnect ()) {
@@ -337,12 +353,12 @@ public void onError(WebSocket webSocket, Throwable error) {
337353 * @param data web socket message
338354 */
339355 @ ApiStatus .Internal
340- public void sendWhenReady (String data ) {
356+ public CompletableFuture < Void > sendWhenReady (String data ) {
341357 if (this .client == null || !this .ready ) {
342358 this .messages .add (data );
343- return ;
359+ return messageQueueCleared ;
344360 }
345- this .client .sendText (data , true );
361+ return this .client .sendText (data , true ). thenAccept ( x -> {} );
346362 }
347363
348364 /**
0 commit comments