Skip to content

推送服务

小马哥 edited this page Jul 1, 2016 · 22 revisions

Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端也可以使用。

当然,在旧版本的客户端调用推送服务,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。

下面我们来介绍一下服务器端增加的关于推送的 API。

timeout 属性

public int getTimeout();
public void setTimeout(int timeout);

该属性设置推送空闲超时的。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 null,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

heartbeat 属性

public int getHeartbeat();
public void setHeartbeat(int heartbeat);

该属性用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒,即 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 heartbeat 时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

timeoutheartbeat 属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat 时间可以检测到客户端以掉线。

timeoutheartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout 设置的过短,否则会严重增加服务器的负担。

因此,timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat 的时间设置的过长。

如果 heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

pushEvent 属性

public PushEvent getPushEvent();
public void setPushEvent(PushEvent pushEvent);

该属性用来设置推送事件。推送事件接口 PushEvent 中包含两个事件方法:

void subscribe(String topic, int id, HproseService service);
void unsubscribe(String topic, int id, HproseService service);

当编号为 id 的客户端订阅主题 topic 时,触发 subscribe 事件。

当编号为 id 的客户端退订主题 topic 时,触发 unsubscribe 事件。

publish 方法

public final void publish(String topic);
public final void publish(String topic, int timeout);
public final void publish(String topic, int timeout, int heartbeat);

public final void publish(String[] topics);
public final void publish(String[] topics, int timeout);
public final void publish(String[] topics, int timeout, int heartbeat);

该方法用于发布一个或一组推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

topic 为主题名,topics 为一组主题名。

这里 timeoutheartbeat 参数在前面的属性介绍里已经说明过了,这里不再重复。

publish 方法仅仅是告诉客户端,现在有一个叫做 topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

广播

public final void broadcast(String topic, Object result) throws HproseException;
public final void broadcast(String topic, Object result, Action<Integer[]> callback) throws HproseException;

public final Promise<Integer[]> push(String topic, Object result) throws HproseException;

这里有两个推送方法:broadcastpush

这两个方法功能相同,但是 broadcast 方法支持回调,该回调方法的参数是一个整数数组,该数组中保存的是所有推送成功的客户端 id。而 push 方法是将这个数组作为一个 Promise<Integer[]> 结果返回。

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

时间推送服务器

package hprose.example.push;

import hprose.common.HproseException;
import hprose.server.HproseTcpServer;
import hprose.util.concurrent.Timer;
import java.io.IOException;
import java.net.URISyntaxException;

public class TimePushServer {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8080");
        server.publish("time");
        server.start();
        Timer timer = new Timer(() -> {
            try {
                server.push("time", java.util.Calendar.getInstance());
            }
            catch (HproseException ex) {}
        });
        timer.setInterval(1000);
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

时间显示客户端

package hprose.example.push;

import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class TimePushClient {
    public static void main(String[] args) throws IOException, URISyntaxException, Throwable {
        final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
        final CountDownLatch counter = new CountDownLatch(10);
        client.subscribe("time", (Date time) -> {
            if (counter.getCount() > 0) {
                counter.countDown();
                System.out.println(time);
            }
            else {
                client.unsubscribe("time");
            }
        }, Date.class);
        Thread.sleep(12000);
    }
}

该程序运行结果为:

Thu Jun 30 22:43:46 CST 2016
Thu Jun 30 22:43:47 CST 2016
Thu Jun 30 22:43:48 CST 2016
Thu Jun 30 22:43:49 CST 2016
Thu Jun 30 22:43:50 CST 2016
Thu Jun 30 22:43:51 CST 2016
Thu Jun 30 22:43:52 CST 2016
Thu Jun 30 22:43:53 CST 2016
Thu Jun 30 22:43:54 CST 2016
Thu Jun 30 22:43:55 CST 2016

推送客户端同样支持使用接口方式调用,例如上面的时间显示客户端例子可以改为:

package hprose.example.push;

import hprose.client.HproseClient;
import hprose.util.concurrent.Action;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

interface IPushTime {
    public void time(Action<Date> callback);
}

public class TimePushClient2 {
    public static void main(String[] args) throws IOException, URISyntaxException, Throwable {
        final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
        IPushTime pushTime = client.useService(IPushTime.class);
        final CountDownLatch counter = new CountDownLatch(10);
        pushTime.time((Date time) -> {
            if (counter.getCount() > 0) {
                counter.countDown();
                System.out.println(time);
            }
            else {
                client.unsubscribe("time");
            }
        });
        Thread.sleep(12000);
    }
}

运行结果是一样的。通过接口方式调用推送时,方法名就是订阅的主题名,推送方法可以接收一个或两个参数,如果只有一个参数,那么参数应该为 Action<T> callback,如果有两个参数,那么第一个参数为 Integer id, 第二个参数为 Action<T> callback。推送方法的返回值类型应为 void。

如果需要设置超时,使用 @Timeout(timeout) 注解来标注即可。使用接口方式的好处是,如果推送的类型是一个嵌套的泛型,可以更准确的将该类型传递给客户端进行反序列化。

另外,你可能会注意到上面两个客户端的最后加上 Thread.sleep(12000);,这是因为推送是异步的,如果不加这个,程序会直接结束,看不到任何输出。如果是在图形界面程序中,就是完全不必要的了。

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server 对象。那这时还能进行推送吗?

答案是可以,没问题。我们前面说过,在服务方法中我们可以得到一个 context 参数,这个 context 参数中就包含有一个 clients 对象,这个对象上就包含了所有跟推送有关的方法,这些方法跟 server 对象上的推送方法是完全一样的。

Clone this wiki locally