Skip to content

异步编程——Promise

小马哥 edited this page Jun 19, 2016 · 19 revisions
Promises/A+ logo

概述

Java 提供了一套 Future 异步模型,但是在 Java 8 之前只能通过 Get 方法来同步获取结果,而不能通过回调方式来异步获取结果。Java 8 增加了 CompletableFuture,可以通过回调来获取结果,这种就是 Promise 异步模型。但是仅能在 Java 8 中才能使用。

当然 Guava、Netty 4+ 等 Java 框架也提供了 Promise 异步模型的实现。但是最低支持的 JDK 版本是 Java 6,对 Java 5 并不支持。

Hprose 为了更好的实现异步服务和异步调用,也提供了一套 Promise 异步模型实现。它基本上是参照 Promise/A+(中文版) 规范实现的。

Hprose for Java 的 Promise 实现在 hprose.util.concurrent 包中,该包中包含有以下接口:

  • Callback
  • Action
  • Func
  • Thenable
  • Rejector
  • Resolver
  • Executor
  • Handler
  • Reducer

类:

  • Promise
  • PromiseFuture
  • Threads
  • Timer
  • TypeException

另外还有一个枚举类型:

  • State

其中,Callback 是一个空接口,它是 ActionFunc 的父接口,Action 表示一个有参数无返回值的回调函数,Func 表示一个有参数有返回值的回调函数。

Thenable 接口包含有一个 then 方法,Promise 类是 Thenable 接口的一个实现。

Resolver 接口包含有一个 resolve 方法,Rejector 接口包含有一个 reject 方法,Executor 接口包含有一个 exec 方法,该方法的两个参数分别是 ResolverRejector 接口的对象,Promise 类是 ResolverRejector 接口的一个实现,并且包含有一个 Executor 参数的构造器。在介绍 Promise 构造器时会举例介绍该接口的使用方法。

Handler 接口是 Promise 类中的 forEacheverysomefiltermap 方法的回调参数类型。

Reducer 接口是 Promise 类中的 reducereduceRight 方法的回调参数类型。

Promise 类是 Promise 异步模型的主要实现类,后面将对该类进行详细的介绍。

PromiseFuture 类是对 Promise 类的一个 Future 接口包装。

Threads 是一个静态类,提供了一些关于线程管理的静态帮助方法,比如:获取所有线程,获取主线程,获取根线程组,注册 JVM 退出时回调函数并在主线程推出时结束整个程序。该类主要用于 Hprose 内部使用,用户可以不用关心该类,所以本文不对该类做详细介绍。

Timer 是一个计时器类,也是用于 Hprose 内部使用,本文也不对该类做详细介绍。

TypeException 是一个异常类,当 Promise 对象以它自己作为任务结束结果时,抛出该异常。

State 表示 Promise 对象的状态,包含有结果待定(PENDING),已接受(FULFILLED),已拒绝(REJECTED)三种状态。

Promise 类

简介

该类是一个泛型类,其泛型参数表示其接受的结果类型。它是 Promise/A+(中文版) 规范的一个实现,不过也有略微不同之处:

Promise/A+ 规范中,then 方法的 onFulfilledonRejected 回调方法是异步执行的,且在 then 方法被调用的那一轮事件循环之后的新执行栈中执行。而在 Hprose for Java 的实现中,为了减少线程切换引起的不必要的性能损失,这部分并未严格遵守该规范。当执行 then 方法时,如果任务尚未完成,onFulfilledonRejected 回调方法会在结束任务的线程中执行,如果任务已完成,则在执行 then 方法时,立即执行。

除了规范中的 then 方法以外,Promise 类还提供了许多方法,下面将详细介绍。

创建 Promise 对象

hprose 中提供了多种方法来创建 Promise 对象。除了可以使用构造器创建之外,还可以通过一些 Promise 类上的静态方法来创建。

使用 Promise 构造器

创建一个待定(pending)状态 promise 对象

Promise<T> promise = new Promise<T>();

无参构造函数创建一个结果待定的 Promise 对象,其泛型参数表示其能够接受的结果类型,其泛型参数可以省略。可以将来通过 resolve 方法来设定其成功值,或通过 reject 方法来设定其失败原因。

创建一个成功(fulfilled)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<String>(new Callable<String>() {
            public String call() throws Exception {
                return "hprose";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了成功值,可以使用 then 方法来得到它。运行结果为:

hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<>(() -> "hprose");
        promise.then((String value) -> System.out.println(value));
    }
}

创建一个失败(rejected)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(new Callable() {
            public Object call() throws Exception {
                throw new Exception("hprose");
            }
        });
        promise.catchError(new Action<Throwable>() {
            public void call(Throwable value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了失败值,可以使用 catchError 方法来得到它。该程序运行结果为:

java.lang.Exception: hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(() -> {
            throw new Exception("hprose");
        });
        promise.catchError((Throwable value) -> {
            System.out.println(value);
        });
    }
}

通过 executor 回调来创建 promise 对象

new Promise(executor);
new Promise((Resolver resolver, Rejector rejector) -> { ... });

executor 是一个 Executor 接口的实现,它只有一个 exec 方法,该方法有两个参数:

  • Resolver resolver: 当接受结果时,调用该对象上的 resolve 方法。
  • Rejector rejector: 当拒绝结果时,调用该对象上的 rejector 方法。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Executor;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<Integer>(new Executor() {
            @Override
            public void exec(Resolver resolver, Rejector rejector) {
                resolver.resolve(100);
            }
        });
        promise.then(new Action<Integer>() {
            public void call(Integer value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

该程序运行结果为:

100

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<>((Resolver resolver, Rejector rejector) -> {
            resolver.resolve(100);
        });
        promise.then((Integer value) -> {
            System.out.println(value);
        });
    }
}
Clone this wiki locally