-
Notifications
You must be signed in to change notification settings - Fork 187
异步编程——Promise
Java 提供了一套 Future
异步模型,但是在 Java 8 之前只能通过 Get
方法来同步获取结果,而不能通过回调方式来异步获取结果。Java 8 增加了 CompletableFuture
,可以通过回调来获取结果,这种就是 Promise
异步模型。但是仅能在 Java 8 中才能使用。
当然 Guava、Netty 4+ 等 Java 框架也提供了 Promise
异步模型的实现。但是最低支持的 JDK 版本是 Java 6,对 Java 5 并不支持。
Hprose 为了更好的实现异步服务和异步调用,也提供了一套 Promise
异步模型实现。它基本上是参照 Promise/A+(中文版) 规范实现的。
Hprose for Java 的 Promise
实现在 hprose.util.concurrent
包中,该包中包含有以下接口:
Callback
Action
Func
Thenable
Rejector
Resolver
Executor
Handler
Reducer
类:
Promise
PromiseFuture
Threads
Timer
TypeException
另外还有一个枚举类型:
State
其中,Callback
是一个空接口,它是 Action
和 Func
的父接口,Action
表示一个有参数无返回值的回调函数,Func
表示一个有参数有返回值的回调函数。
Thenable
接口包含有一个 then
方法,Promise
类是 Thenable
接口的一个实现。
Resolver
接口包含有一个 resolve
方法,Rejector
接口包含有一个 reject
方法,Executor
接口包含有一个 exec
方法,该方法的两个参数分别是 Resolver
和 Rejector
接口的对象,Promise
类是 Resolver
和 Rejector
接口的一个实现,并且包含有一个 Executor
参数的构造器。在介绍 Promise
构造器时会举例介绍该接口的使用方法。
Handler
接口是 Promise
类中的 forEach
,every
,some
,filter
,map
方法的回调参数类型。
Reducer
接口是 Promise
类中的 reduce
和 reduceRight
方法的回调参数类型。
Promise
类是 Promise
异步模型的主要实现类,后面将对该类进行详细的介绍。
PromiseFuture
类是对 Promise
类的一个 Future
接口包装。
Threads
是一个静态类,提供了一些关于线程管理的静态帮助方法,比如:获取所有线程,获取主线程,获取根线程组,注册 JVM 退出时回调函数并在主线程推出时结束整个程序。该类主要用于 Hprose 内部使用,用户可以不用关心该类,所以本文不对该类做详细介绍。
Timer
是一个计时器类,也是用于 Hprose 内部使用,本文也不对该类做详细介绍。
TypeException
是一个异常类,当 Promise
对象以它自己作为任务结束结果时,抛出该异常。
State
表示 Promise
对象的状态,包含有结果待定(PENDING
),已接受(FULFILLED
),已拒绝(REJECTED
)三种状态。
该类是一个泛型类,其泛型参数表示其接受的结果类型。它是 Promise/A+(中文版) 规范的一个实现,不过也有略微不同之处:
在 Promise/A+ 规范中,then
方法的 onFulfilled
和 onRejected
回调方法是异步执行的,且在 then
方法被调用的那一轮事件循环之后的新执行栈中执行。而在 Hprose for Java 的实现中,为了减少线程切换引起的不必要的性能损失,这部分并未严格遵守该规范。当执行 then
方法时,如果任务尚未完成,onFulfilled
和 onRejected
回调方法会在结束任务的线程中执行,如果任务已完成,则在执行 then
方法时,立即执行。
除了规范中的 then
方法以外,Promise
类还提供了许多方法,下面将详细介绍。
hprose 中提供了多种方法来创建 Promise
对象。除了可以使用构造器创建之外,还可以通过一些 Promise
类上的静态方法来创建。
Promise<T> promise = new Promise<T>();
无参构造函数创建一个结果待定的 Promise
对象,其泛型参数表示其能够接受的结果类型,其泛型参数可以省略。可以将来通过 resolve
方法来设定其成功值,或通过 reject
方法来设定其失败原因。
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;
public class Exam1 {
public static void main(String[] args) {
Promise<String> promise = new Promise<String>(new Callable<String>() {
public String call() throws Exception {
return "hprose";
}
});
promise.then(new Action<String>() {
public void call(String value) throws Throwable {
System.out.println(value);
}
});
}
}
该 promise
对象中已经包含了成功值,可以使用 then
方法来得到它。运行结果为:
hprose
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam1 {
public static void main(String[] args) {
Promise<String> promise = new Promise<>(() -> "hprose");
promise.then((String value) -> System.out.println(value));
}
}
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;
public class Exam2 {
public static void main(String[] args) {
Promise<?> promise = new Promise(new Callable() {
public Object call() throws Exception {
throw new Exception("hprose");
}
});
promise.catchError(new Action<Throwable>() {
public void call(Throwable value) throws Throwable {
System.out.println(value);
}
});
}
}
该 promise
对象中已经包含了失败值,可以使用 catchError
方法来得到它。该程序运行结果为:
java.lang.Exception: hprose
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam2 {
public static void main(String[] args) {
Promise<?> promise = new Promise(() -> {
throw new Exception("hprose");
});
promise.catchError((Throwable value) -> {
System.out.println(value);
});
}
}
new Promise(executor);
new Promise((Resolver resolver, Rejector rejector) -> { ... });
executor
是一个 Executor
接口的实现,它只有一个 exec
方法,该方法有两个参数:
-
Resolver resolver
: 当接受结果时,调用该对象上的 resolve 方法。 -
Rejector rejector
: 当拒绝结果时,调用该对象上的 rejector 方法。
例如:
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Executor;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;
public class Exam3 {
public static void main(String[] args) {
Promise<Integer> promise = new Promise<Integer>(new Executor() {
@Override
public void exec(Resolver resolver, Rejector rejector) {
resolver.resolve(100);
}
});
promise.then(new Action<Integer>() {
public void call(Integer value) throws Throwable {
System.out.println(value);
}
});
}
}
该程序运行结果为:
100
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;
public class Exam3 {
public static void main(String[] args) {
Promise<Integer> promise = new Promise<>((Resolver resolver, Rejector rejector) -> {
resolver.resolve(100);
});
promise.then((Integer value) -> {
System.out.println(value);
});
}
}