Skip to content

推送服务

小马哥 edited this page Nov 23, 2017 · 22 revisions

简介

Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端或者服务器也可以使用。

当然,在旧版本的客户端调用推送服务,或者在旧版本的服务器上自己实现推送,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。

下面我们来分别介绍一下客户端和服务器端增加的关于推送的 API。

客户端

客户端关于推送的方法只有两个,它们分别是:

subscribe 方法

public final void subscribe(String name, Action<Object> callback);
public final void subscribe(String name, Action<Object> callback, int timeout);
public final <T> void subscribe(String name, Action<T> callback, Type type);
public final <T> void subscribe(String name, Action<T> callback, Type type, int timeout);

public final void subscribe(String name, String id, Action<Object> callback);
public final void subscribe(String name, String id, Action<Object> callback, int timeout);
public final <T> void subscribe(String name, String id, Action<T> callback, Type type);
public final <T> void subscribe(String name, String id, Action<T> callback, Type type, int timeout);

public final void subscribe(String name, Action<Object> callback, boolean failswitch);
public final void subscribe(String name, Action<Object> callback, int timeout, boolean failswitch);
public final <T> void subscribe(String name, Action<T> callback, Type type, boolean failswitch);
public final <T> void subscribe(String name, Action<T> callback, Type type, int timeout, boolean failswitch);

public final void subscribe(String name, String id, Action<Object> callback, boolean failswitch);
public final void subscribe(String name, String id, Action<Object> callback, int timeout, boolean failswitch);
public final <T> void subscribe(String name, String id, Action<T> callback, Type type, boolean failswitch);
public final <T> void subscribe(String name, String id, Action<T> callback, Type type, int timeout, boolean failswitch);

subscribe 方法的用处是订阅服务器端的推送服务。该方法有两种方式,一种是自动获取设置客户端 id,另一种是手动设置客户端 id

参数 name 是订阅的主题名,它实际上也是一个服务器端的方法,该方法与普通方法的区别是,它只有一个参数 id,该参数表示客户端的唯一编号,该方法的返回值即推送信息,当返回值为 null 或者抛出异常时,客户端会忽略并再次调用该 name 对应的方法。当该方法返回推送消息时,callback 回调函数会执行,并同时再次调用该 name 对应的方法。因此当没有推送消息时,该方法不应该立即返回值,而应该挂起等待,直到超时或者有推送消息时再返回结果。

当然,对于开发者来说,自己实现一个完善的推送方法还是有一定难度的。因此,Hprose 2.0 的服务器端已经提供了一整套的专门用于推送的 API,通过这些 API,可以方便的自动实现用于推送的服务方法。在后面介绍服务器端时,我们再介绍这部分内容。

参数 id 是客户端的唯一编号,如果省略的话,客户端会使用自动编号机制,如果该自动编号未初始化,会自动调用一个名字为 # 的服务器端远程方法,之所以使用这个特殊的名字是为了防止跟用户发布的普通方法发生冲突。Hprose 2.0 服务器已经自动实现了该方法,但是用户也可以用自己的实现来替换它,它现在的默认实现是一个 UUID 字符串。当用户指定了 id 参数时,客户端会将它作为该 name 对应方法的参数值传给服务器端,但不会修改客户端的 id 属性值。

参数 callback 是用来处理推送消息的回调函数,该参数不能省略。

参数 type 是服务器端推送数据的类型,跟回调参数的类型一致,这里使用 Type 而不是 Class 是为了支持泛型。

参数 timeout 是等待推送消息的超时时间,单位是毫秒(ms),可以省略,默认值与 timeout 属性值相同。超时之后并不会产生异常,而是重新请求推送。因此,如果用户要在服务器端自己实现推送方法,应当注意处理好同一个客户端对同一个推送方法可能会进行重复调用的问题。如果使用 Hprose 2.0 提供的推送 API,则不需要关心这一点。

参数 failswitch 表示当客户端与服务器端通讯中发生网络故障,是否自动切换服务器。默认值是 false,表示不切换。

对于同一个推送主题,subscribe 方法允许被多次调用,这样可以对同一个推送主题指定多个不同的回调方法。但通常没有必要也不推荐这样做。

unsubscribe 方法

public void unsubscribe(String name)
public <T> void unsubscribe(String name, Action<T> callback)
public void unsubscribe(String name, String id)
public <T> void unsubscribe(String name, String id, Action<T> callback)

该方法用于取消订阅推送主题。当调用该方法时,带有 callback 参数,将只取消对该 callback 方法的回调,除非这是该主题上最后一个 callback,否则对该主题远程方法的调用并不会中断。当所有的 callback 都被取消之后,或者当调用该方法时,没有指定 callback 参数时,将会中断对该主题远程方法的循环调用。

如果 id 参数未指定,那么当客户端 id 属性有值时,将只取消对该 id 属性值对应的推送主题的订阅。当客户端 id 属性未初始化时,将会取消该主题上所有的订阅。

通常来说,当你调用 subscribe 方法时如果指定了 id 参数,那么当调用 unsubscribe 方法时你也应该指定相同的 id 参数。当你调用 subscribe 方法时没有指定 id 参数,那么当调用 unsubscribe 方法时你也不需要指定 id 参数。

autoId 方法

public synchronized String autoId();

该方法返回当前客户端在进行推送订阅时自动获取的唯一编号。如果自动编号没有初始化,则自动调用远程服务初始化。

##isSubscribed 方法

public boolean isSubscribed(String name)

name 所对应的主题已被订阅时,返回 true,否则返回 false

subscribedList 方法

public String[] subscribedList()

返回已被订阅的主题的列表,返回值是一个字符串数组。数组元素为已订阅的主题名称。

服务器端

服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。

timeout 属性

public int getTimeout();
public void setTimeout(int timeout);

该属性设置推送空闲超时的。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 null,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

heartbeat 属性

public int getHeartbeat();
public void setHeartbeat(int heartbeat);

该属性用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒,即 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 heartbeat 时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

timeoutheartbeat 属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat 时间可以检测到客户端以掉线。

timeoutheartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout 设置的过短,否则会严重增加服务器的负担。

因此,timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat 的时间设置的过长。

如果 heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

pushEvent 属性

public PushEvent getPushEvent();
public void setPushEvent(PushEvent pushEvent);

该属性用来设置推送事件。推送事件接口 PushEvent 中包含两个事件方法:

void subscribe(String topic, String id, HproseService service);
void unsubscribe(String topic, String id, HproseService service);

当编号为 id 的客户端订阅主题 topic 时,触发 subscribe 事件。

当编号为 id 的客户端退订主题 topic 时,触发 unsubscribe 事件。

publish 方法

public final void publish(String topic);
public final void publish(String topic, int timeout);
public final void publish(String topic, int timeout, int heartbeat);

public final void publish(String[] topics);
public final void publish(String[] topics, int timeout);
public final void publish(String[] topics, int timeout, int heartbeat);

该方法用于发布一个或一组推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

topic 为主题名,topics 为一组主题名。

这里 timeoutheartbeat 参数在前面的属性介绍里已经说明过了,这里不再重复。

publish 方法仅仅是告诉客户端,现在有一个叫做 topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

广播

public final void broadcast(String topic, Object result);
public final void broadcast(String topic, Object result, Action<String[]> callback);

public final Promise<String[]> push(String topic, Object result);

这里有两个推送方法:broadcastpush

这两个方法功能相同,但是 broadcast 方法支持回调,该回调方法的参数是一个整数数组,该数组中保存的是所有推送成功的客户端 id。而 push 方法是将这个数组作为一个 Promise<String[]> 结果返回。

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

时间推送服务器

package hprose.example.push;

import hprose.server.HproseTcpServer;
import hprose.util.concurrent.Timer;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Calendar;

public class TimePushServer {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8080");
        server.publish("time");
        server.start();
        Timer timer = new Timer(() -> server.push("time", Calendar.getInstance()));
        timer.setInterval(1000);
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

时间显示客户端

package hprose.example.push;

import hprose.client.HproseClient;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class TimePushClient {
    public static void main(String[] args) throws Exception {
        final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
        final CountDownLatch counter = new CountDownLatch(10);
        client.subscribe("time", (Date time) -> {
            if (counter.getCount() > 0) {
                counter.countDown();
                System.out.println(time);
            }
            else {
                client.unsubscribe("time");
            }
        }, Date.class);
        Thread.sleep(12000);
    }
}

该程序运行结果为:

Thu Jun 30 22:43:46 CST 2016
Thu Jun 30 22:43:47 CST 2016
Thu Jun 30 22:43:48 CST 2016
Thu Jun 30 22:43:49 CST 2016
Thu Jun 30 22:43:50 CST 2016
Thu Jun 30 22:43:51 CST 2016
Thu Jun 30 22:43:52 CST 2016
Thu Jun 30 22:43:53 CST 2016
Thu Jun 30 22:43:54 CST 2016
Thu Jun 30 22:43:55 CST 2016

推送客户端同样支持使用接口方式调用,例如上面的时间显示客户端例子可以改为:

package hprose.example.push;

import hprose.client.HproseClient;
import hprose.util.concurrent.Action;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

interface IPushTime {
    public void time(Action<Date> callback);
}

public class TimePushClient2 {
    public static void main(String[] args) throws Exception {
        final HproseClient client = HproseClient.create("tcp://127.0.0.1:8080");
        IPushTime pushTime = client.useService(IPushTime.class);
        final CountDownLatch counter = new CountDownLatch(10);
        pushTime.time((Date time) -> {
            if (counter.getCount() > 0) {
                counter.countDown();
                System.out.println(time);
            }
            else {
                client.unsubscribe("time");
            }
        });
        Thread.sleep(12000);
    }
}

运行结果是一样的。通过接口方式调用推送时,方法名就是订阅的主题名,推送方法可以接收一个或两个参数,如果只有一个参数,那么参数应该为 Action<T> callback,如果有两个参数,那么第一个参数为 String id, 第二个参数为 Action<T> callback。推送方法的返回值类型应为 void。

如果需要设置超时,使用 @Timeout(timeout) 注解来标注即可。使用接口方式的好处是,如果推送的类型是一个嵌套的泛型,可以更准确的将该类型传递给客户端进行反序列化。

另外,你可能会注意到上面两个客户端的最后加上 Thread.sleep(12000);,这是因为推送是异步的,如果不加这个,程序会直接结束,看不到任何输出。如果是在图形界面程序中,就是完全不必要的了。

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server 对象。那这时还能进行推送吗?

答案是:可以的。我们前面说过,在服务方法中我们可以得到一个 context 参数,这个 context 参数中就包含有一个 clients 对象,这个对象上就包含了所有跟推送有关的方法,这些方法跟 server 对象上的推送方法是完全一样的。

例如:

服务器

package hprose.example.push;

import hprose.common.HproseException;
import hprose.server.HproseTcpServer;
import hprose.server.TcpContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;

public class BroadcastServer {
    public static String hello(String name) {
        TcpContext context = HproseTcpServer.getCurrentContext();
        System.out.println(Arrays.toString(context.clients.idlist("news")));
        context.clients.broadcast("news", "this is a pushed message:" + name);
        return "Hello " + name + "! -- " +
                context.getSocket().getRemoteSocketAddress().toString();
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8081");
        server.add("hello", BroadcastServer.class);
        server.publish("news");
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

客户端

package hprose.example.push;

import hprose.client.HproseClient;
import hprose.util.concurrent.Action;

interface IBroadcast {
    public void news(Action<String> callback);
    public String hello(String name);
}

public class BroadcastClient {
    public static void main(String[] args) throws Exception {
        final HproseClient client = HproseClient.create("tcp://127.0.0.1:8081");
        IBroadcast bc = client.useService(IBroadcast.class);
        bc.news((String news) -> System.out.println(news));
        Thread.sleep(100);
        System.out.println(bc.hello("Hprose"));
        Thread.sleep(10000);
    }
}

现在我们先启动服务器,然后再运行两个客户端,则第一个客户端显示:

Hello Hprose! -- /127.0.0.1:51673
this is a pushed message:Hprose
this is a pushed message:Hprose

第二个客户端显示:

Hello Hprose! -- /127.0.0.1:51675
this is a pushed message:Hprose

这里我们在订阅推送主题 news 之后,做了一个 100ms 的延时,是因为订阅是异步的,而且要先从服务器端获取一个客户端唯一编号 id,然后再发起订阅请求,如果不加这个 100ms 延时,那么后面执行的 hello 调用就会先于订阅执行,这样每个客户端是收不到推送给自己的那条信息的。

从这个例子里,我们还可以看到如何服务方法中获取客户端的 IP 信息等信息。

注意:虽然上面的例子都是使用的 TCP 服务器和客户端,但是并不是说只有 Hprose 的 TCP 实现才支持推送服务,实际上 Hprose 的 HTTP 和 WebSocket 实现也支持推送。只是要配置 Servlet,不方便举例,所以这里只用 TCP 来举例。

多播

public final void multicast(String topic, String[] ids, Object result);
public final void multicast(String topic, String[] ids, Object result, Action<String[]> callback);
public final Promise<String[]> push(String topic, String[] ids, Object result);

跟广播类似,多播也有这样几种形式。跟广播相比,多播多了一个 ids 参数,它是一个客户端 id 的数组。也就是说,你可以向指定的一组客户端推送消息。

单播

public final void unicast(String topic, String id, Object result);
public final void unicast(String topic, String id, Object result, Action<Boolean> callback);
public final Promise<Boolean> push(String topic, String id, Object result);

单播是跟多播的形式也类似,只不过客户端 ids 数组参数变成了一个客户端 id 参数。

但是还有一点要注意,unicast 的回调方法跟 broadcastmulticast 不同,unicast 的回调方法的参数 Boolean 类型,该值为 true 是表示推送成功,为 false 表示推送失败。单播的 push 的返回值也同多播和广播不同,是 Promise<Boolean> 类型,其中的 Boolean 值含义同 unicast 回调参数一致。

idlist 方法

public final String[] idlist(String topic);

该方法用于获取当前在线的订阅了主题 topic 的所有客户端的 id 列表。

exist 方法

public final boolean exist(String topic, String id);

该方法用于快速判断 id 是否在当前在线的订阅了主题 topic 的客户端列表中。

注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。