Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subscribe & unsubscribe combine stream function #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface WebSocketStreamClient {
int listenUserStream(String listenKey, WebSocketOpenCallback onOpenCallback, WebSocketMessageCallback onMessageCallback, WebSocketClosingCallback onClosingCallback, WebSocketClosedCallback onClosedCallback, WebSocketFailureCallback onFailureCallback);
int combineStreams(ArrayList<String> streams, WebSocketMessageCallback callback);
int combineStreams(ArrayList<String> streams, WebSocketOpenCallback onOpenCallback, WebSocketMessageCallback onMessageCallback, WebSocketClosingCallback onClosingCallback, WebSocketClosedCallback onClosedCallback, WebSocketFailureCallback onFailureCallback);
void subscribeCombineStreams(ArrayList<String> streams, int connectionId);
void unsubscribeCombineStreams(ArrayList<String> streams, int connectionId);
void closeConnection(int streamId);
void closeAllConnections();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Iterator;
import java.util.Map;

import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -563,6 +564,58 @@ public int combineStreams(ArrayList<String> streams, WebSocketOpenCallback onOpe
return createConnection(onOpenCallback, onMessageCallback, onClosingCallback, onClosedCallback, onFailureCallback, request);
}

/**
* Subscribe stream names using combine stream connection that already initiated
*
* @param streams ArrayList of stream names to be subscribed <br>
* @param connectionId Id of initiated combined stream connection that will be used for sending subscribe request
*
* @see <a href="https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams">
* https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams</a>
*/
@Override
public void subscribeCombineStreams(ArrayList<String> streams, int connectionId) {
if (connections.containsKey(connectionId)) {
WebSocketConnection connection = connections.get(connectionId);

JSONObject params = new JSONObject();
params.put("method", "SUBSCRIBE");
params.put("params", streams);
params.put("id", System.currentTimeMillis());

logger.info("Sending subscribe request to connection id {} with stream {}", connectionId, streams);
connection.send(params.toString());
} else {
logger.info("Connection ID {} does not exist!", connectionId);
}
}

/**
* Unsubscribe stream names using combine stream connection that already initiated
*
* @param streams ArrayList of stream names to be unsubscribed <br>
* @param connectionId Id of initiated combined stream connection that will be used for sending unsubscribe request
*
* @see <a href="https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams">
* https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams</a>
*/
@Override
public void unsubscribeCombineStreams(ArrayList<String> streams, int connectionId) {
if (connections.containsKey(connectionId)) {
WebSocketConnection connection = connections.get(connectionId);

JSONObject params = new JSONObject();
params.put("method", "UNSUBSCRIBE");
params.put("params", streams);
params.put("id", System.currentTimeMillis());

logger.info("Sending unsubscribe request to connection id {} with stream {}", connectionId, streams);
connection.send(params.toString());
} else {
logger.info("Connection ID {} does not exist!", connectionId);
}
}

/**
* Closes a specific stream based on stream ID.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package examples.websocketstream;

import com.binance.connector.client.WebSocketStreamClient;
import com.binance.connector.client.impl.WebSocketStreamClientImpl;
import java.util.ArrayList;

public final class SubscribeCombineStreams {
private SubscribeCombineStreams() {
}

public static void main(String[] args) throws InterruptedException {
final long sleepTime = 3000;
WebSocketStreamClient client = new WebSocketStreamClientImpl();

ArrayList<String> streams = new ArrayList<>();
streams.add("btcusdt@trade");
streams.add("bnbusdt@trade");

int connectionId = client.combineStreams(streams, ((event) -> {
System.out.println(event);
}));

ArrayList<String> streamsToSubscribe = new ArrayList<>();
streamsToSubscribe.add("solusdt@trade");
streamsToSubscribe.add("ltcusdt@trade");
client.subscribeCombineStreams(streamsToSubscribe, connectionId);

Thread.sleep(sleepTime);
client.closeAllConnections();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package examples.websocketstream;

import com.binance.connector.client.WebSocketStreamClient;
import com.binance.connector.client.impl.WebSocketStreamClientImpl;
import java.util.ArrayList;

public final class UnsubscribeCombineStreams {
private UnsubscribeCombineStreams() {
}

public static void main(String[] args) throws InterruptedException {
final long sleepTime = 3000;
WebSocketStreamClient client = new WebSocketStreamClientImpl();

ArrayList<String> streams = new ArrayList<>();
streams.add("btcusdt@trade");
streams.add("bnbusdt@trade");

int connectionId = client.combineStreams(streams, ((event) -> {
System.out.println(event);
}));

client.unsubscribeCombineStreams(streams, connectionId);

Thread.sleep(sleepTime);
client.closeAllConnections();
}
}