-
Notifications
You must be signed in to change notification settings - Fork 187
推送服务
Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端也可以使用。
当然,在旧版本的客户端调用推送服务,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。
下面我们来介绍一下服务器端增加的关于推送的 API。
public int getTimeout();
public void setTimeout(int timeout);
该属性设置推送空闲超时的。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。
当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 null
,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。
public int getHeartbeat();
public void setHeartbeat(int heartbeat);
该属性用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒,即 3 秒钟。
当服务器端推送数据给客户端后,如果客户端在 heartbeat
时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。
timeout
和 heartbeat
属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout
+ heartbeat
的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat
时间可以检测到客户端以掉线。
timeout
和 heartbeat
设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:
timeout
时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout
设置的过短,否则会严重增加服务器的负担。
因此,timeout
的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。
对于推送频繁的服务器来说,heartbeat
时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat
的时间设置的过长。
如果 heartbeat
的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。
因此,heartbeat
的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。
public PushEvent getPushEvent();
public void setPushEvent(PushEvent pushEvent);
该属性用来设置推送事件。推送事件接口 PushEvent
中包含两个事件方法:
void subscribe(String topic, int id, HproseService service);
void unsubscribe(String topic, int id, HproseService service);
当编号为 id
的客户端订阅主题 topic
时,触发 subscribe
事件。
当编号为 id
的客户端退订主题 topic
时,触发 unsubscribe
事件。
public final void publish(String topic);
public final void publish(String topic, int timeout);
public final void publish(String topic, int timeout, int heartbeat);
public final void publish(String[] topics);
public final void publish(String[] topics, int timeout);
public final void publish(String[] topics, int timeout, int heartbeat);
该方法用于发布一个或一组推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。
topic
为主题名,topics
为一组主题名。
这里 timeout
和 heartbeat
参数在前面的属性介绍里已经说明过了,这里不再重复。
publish
方法仅仅是告诉客户端,现在有一个叫做 topic
的推送主题可以订阅。
而要真正推送数据给客户端,则需要使用以下几个方法。
public final void broadcast(String topic, Object result) throws HproseException;
public final void broadcast(String topic, Object result, Action<Integer[]> callback) throws HproseException;
public final Promise<Integer[]> push(String topic, Object result) throws HproseException;
这里有两个推送方法:broadcast
和 push
。
这两个方法功能相同,但是 broadcast
方法支持回调,该回调方法的参数是一个整数数组,该数组中保存的是所有推送成功的客户端 id
。而 push
方法是将这个数组作为一个 Promise<Integer[]>
结果返回。
一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:
时间推送服务器
package hprose.example.push;
import hprose.common.HproseException;
import hprose.server.HproseTcpServer;
import hprose.util.concurrent.Timer;
import java.io.IOException;
import java.net.URISyntaxException;
public class TimePushServer {
public static void main(String[] args) throws URISyntaxException, IOException {
HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8080");
server.publish("time");
server.start();
Timer timer = new Timer(() -> {
try {
server.push("time", java.util.Calendar.getInstance());
}
catch (HproseException ex) {}
});
timer.setInterval(1000);
System.out.println("START");
System.in.read();
server.stop();
System.out.println("STOP");
}
}
时间显示客户端
package hprose.example.push;
import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class TimePushClient {
public static void main(String[] args) throws IOException, URISyntaxException, Throwable {
final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
final CountDownLatch counter = new CountDownLatch(10);
client.subscribe("time", (Date time) -> {
if (counter.getCount() > 0) {
counter.countDown();
System.out.println(time);
}
else {
client.unsubscribe("time");
}
}, Date.class);
Thread.sleep(12000);
}
}
该程序运行结果为:
Thu Jun 30 22:43:46 CST 2016
Thu Jun 30 22:43:47 CST 2016
Thu Jun 30 22:43:48 CST 2016
Thu Jun 30 22:43:49 CST 2016
Thu Jun 30 22:43:50 CST 2016
Thu Jun 30 22:43:51 CST 2016
Thu Jun 30 22:43:52 CST 2016
Thu Jun 30 22:43:53 CST 2016
Thu Jun 30 22:43:54 CST 2016
Thu Jun 30 22:43:55 CST 2016
推送客户端同样支持使用接口方式调用,例如上面的时间显示客户端例子可以改为:
package hprose.example.push;
import hprose.client.HproseClient;
import hprose.util.concurrent.Action;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
interface IPushTime {
public void time(Action<Date> callback);
}
public class TimePushClient2 {
public static void main(String[] args) throws IOException, URISyntaxException, Throwable {
final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
IPushTime pushTime = client.useService(IPushTime.class);
final CountDownLatch counter = new CountDownLatch(10);
pushTime.time((Date time) -> {
if (counter.getCount() > 0) {
counter.countDown();
System.out.println(time);
}
else {
client.unsubscribe("time");
}
});
Thread.sleep(12000);
}
}
运行结果是一样的。通过接口方式调用推送时,方法名就是订阅的主题名,推送方法可以接收一个或两个参数,如果只有一个参数,那么参数应该为 Action<T> callback
,如果有两个参数,那么第一个参数为 Integer id
, 第二个参数为 Action<T> callback
。推送方法的返回值类型应为 void。
如果需要设置超时,使用 @Timeout(timeout)
注解来标注即可。使用接口方式的好处是,如果推送的类型是一个嵌套的泛型,可以更准确的将该类型传递给客户端进行反序列化。
另外,你可能会注意到上面两个客户端的最后加上 Thread.sleep(12000);
,这是因为推送是异步的,如果不加这个,程序会直接结束,看不到任何输出。如果是在图形界面程序中,就是完全不必要的了。
有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server
对象。那这时还能进行推送吗?
答案是可以,没问题。我们前面说过,在服务方法中我们可以得到一个 context
参数,这个 context
参数中就包含有一个 clients
对象,这个对象上就包含了所有跟推送有关的方法,这些方法跟 server
对象上的推送方法是完全一样的。