-
Notifications
You must be signed in to change notification settings - Fork 187
异步编程——Promise
Java 提供了一套 Future
异步模型,但是在 Java 8 之前只能通过 Get
方法来同步获取结果,而不能通过回调方式来异步获取结果。Java 8 增加了 CompletableFuture
,可以通过回调来获取结果,这种就是 Promise
异步模型。但是仅能在 Java 8 中才能使用。
当然 Guava、Netty 4+ 等 Java 框架也提供了 Promise
异步模型的实现。但是最低支持的 JDK 版本是 Java 6,对 Java 5 并不支持。
Hprose 为了更好的实现异步服务和异步调用,也提供了一套 Promise
异步模型实现。它基本上是参照 Promise/A+(中文版) 规范实现的。
Hprose for Java 的 Promise
实现在 hprose.util.concurrent
包中,该包中包含有以下接口:
Callback
Action
Func
Thenable
Rejector
Resolver
Executor
Handler
Reducer
类:
Promise
PromiseFuture
Threads
Timer
TypeException
另外还有一个枚举类型:
State
其中,Callback
是一个空接口,它是 Action
和 Func
的父接口,Action
表示一个有参数无返回值的回调函数,Func
表示一个有参数有返回值的回调函数。
Thenable
接口包含有一个 then
方法,Promise
类是 Thenable
接口的一个实现。
Resolver
接口包含有一个 resolve
方法,Rejector
接口包含有一个 reject
方法,Executor
接口包含有一个 exec
方法,该方法的两个参数分别是 Resolver
和 Rejector
接口的对象,Promise
类是 Resolver
和 Rejector
接口的一个实现,并且包含有一个 Executor
参数的构造器。在介绍 Promise
构造器时会举例介绍该接口的使用方法。
Handler
接口是 Promise
类中的 forEach
,every
,some
,filter
,map
方法的回调参数类型。
Reducer
接口是 Promise
类中的 reduce
和 reduceRight
方法的回调参数类型。
Promise
类是 Promise
异步模型的主要实现类,后面将对该类进行详细的介绍。
PromiseFuture
类是对 Promise
类的一个 Future
接口包装。
Threads
是一个静态类,提供了一些关于线程管理的静态帮助方法,比如:获取所有线程,获取主线程,获取根线程组,注册 JVM 退出时回调函数并在主线程推出时结束整个程序。该类主要用于 Hprose 内部使用,用户可以不用关心该类,所以本文不对该类做详细介绍。
Timer
是一个计时器类,也是用于 Hprose 内部使用,本文也不对该类做详细介绍。
TypeException
是一个异常类,当 Promise
对象以它自己作为任务结束结果时,抛出该异常。
State
表示 Promise
对象的状态,包含有结果待定(PENDING
),已接受(FULFILLED
),已拒绝(REJECTED
)三种状态。
该类是一个泛型类,其泛型参数表示其接受的结果类型。它是 Promise/A+(中文版) 规范的一个实现,不过也有略微不同之处:
在 Promise/A+ 规范中,then
方法的 onFulfilled
和 onRejected
回调方法是异步执行的,且在 then
方法被调用的那一轮事件循环之后的新执行栈中执行。而在 Hprose for Java 的实现中,为了减少线程切换引起的不必要的性能损失,这部分并未严格遵守该规范。当执行 then
方法时,如果任务尚未完成,onFulfilled
和 onRejected
回调方法会在结束任务的线程中执行,如果任务已完成,则在执行 then
方法时,立即执行。
除了规范中的 then
方法以外,Promise
类还提供了许多方法,下面将详细介绍。
hprose 中提供了多种方法来创建 Promise
对象。除了可以使用构造器创建之外,还可以通过一些 Promise
类上的静态方法来创建。
Promise<T> promise = new Promise<T>();
无参构造函数创建一个结果待定的 Promise
对象,其泛型参数表示其能够接受的结果类型,其泛型参数可以省略。可以将来通过 resolve
方法来设定其成功值,或通过 reject
方法来设定其失败原因。
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;
public class Exam1 {
public static void main(String[] args) {
Promise<String> promise = new Promise<String>(new Callable<String>() {
public String call() throws Exception {
return "hprose";
}
});
promise.then(new Action<String>() {
public void call(String value) throws Throwable {
System.out.println(value);
}
});
}
}
该 promise
对象中已经包含了成功值,可以使用 then
方法来得到它。运行结果为:
hprose
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam1 {
public static void main(String[] args) {
Promise<String> promise = new Promise<>(() -> "hprose");
promise.then((String value) -> System.out.println(value));
}
}
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;
public class Exam2 {
public static void main(String[] args) {
Promise<?> promise = new Promise(new Callable() {
public Object call() throws Exception {
throw new Exception("hprose");
}
});
promise.catchError(new Action<Throwable>() {
public void call(Throwable value) throws Throwable {
System.out.println(value);
}
});
}
}
该 promise
对象中已经包含了失败值,可以使用 catchError
方法来得到它。该程序运行结果为:
java.lang.Exception: hprose
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam2 {
public static void main(String[] args) {
Promise<?> promise = new Promise(() -> {
throw new Exception("hprose");
});
promise.catchError((Throwable value) -> {
System.out.println(value);
});
}
}
new Promise(executor);
new Promise((Resolver<V> resolver, Rejector rejector) -> { ... });
executor
是一个 Executor
接口的实现,它只有一个 exec
方法,该方法有两个参数:
-
Resolver resolver
: 当接受结果时,调用该对象上的 resolve 方法。 -
Rejector rejector
: 当拒绝结果时,调用该对象上的 rejector 方法。
例如:
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Executor;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;
public class Exam3 {
public static void main(String[] args) {
Promise<Integer> promise = new Promise<Integer>(new Executor<Integer>() {
@Override
public void exec(Resolver<Integer> resolver, Rejector rejector) {
resolver.resolve(100);
}
});
promise.then(new Action<Integer>() {
public void call(Integer value) throws Throwable {
System.out.println(value);
}
});
}
}
该程序运行结果为:
100
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;
public class Exam3 {
public static void main(String[] args) {
Promise<Integer> promise = new Promise<>(
(Resolver<Integer> resolver, Rejector rejector) -> {
resolver.resolve(100);
}
);
promise.then((Integer value) -> {
System.out.println(value);
});
}
}
Promise
上提供了 4 个工厂方法,它们分别是:
value
error
sync
delayed
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
public class Exam4 {
public static void main(String[] args) {
Promise<String> promise = Promise.value("hprose");
promise.then(new Action<String>() {
public void call(String value) throws Throwable {
System.out.println(value);
}
});
}
}
使用 value
来创建一个成功(fulfilled)状态的 promise
对象效果跟前面用 Promise
构造器创建的效果一样,但是写起来更加简单,不再需要把结果放入一个函数中作为返回值返回了。
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam4 {
public static void main(String[] args) {
Promise<String> promise = Promise.value("hprose");
promise.then((String value) -> {
System.out.println(value);
});
}
}
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
public class Exam5 {
public static void main(String[] args) {
Promise<?> promise = Promise.error(new Exception("hprose"));
promise.catchError(new Action<Throwable>() {
public void call(Throwable value) throws Throwable {
System.out.println(value);
}
});
}
}
使用 error
来创建一个失败(rejected)状态的 promise
对象效果跟前面用 Promise
构造器创建的效果也一样,但是写起来也更加简单,不再需要把失败原因放入一个函数中作为异常抛出了。
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam5 {
public static void main(String[] args) {
Promise<?> promise = Promise.error(new Exception("hprose"));
promise.catchError((Throwable value) -> {
System.out.println(value);
});
}
}
Promise
上提供了一个:
Promise.sync(computation)
方法可以让我们同步的创建一个 promise
对象。
这里“同步”的意思是指 computation
的执行是同步执行的。而通过 Promise
构造器创建 promise
对象时,computation
是异步执行的。为了可以更好地理解这一点,我们来看一个具体的例子:
package hprose.example.promise;
import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;
public class Exam6 {
public static void async() {
System.out.println("before Promise constructor");
Promise<String> promise = new Promise<String>(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(100);
System.out.println("running Promise constructor");
return "promise from Promise constructor";
}
});
promise.then(new Action<String>() {
public void call(String value) throws Throwable {
System.out.println(value);
}
});
System.out.println("after Promise constructor");
}
public static void sync() {
System.out.println("before Promise.sync");
Promise<String> promise = Promise.sync(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(100);
System.out.println("running Promise.sync");
return "promise from Promise.sync";
}
});
promise.then(new Action<String>() {
public void call(String value) throws Throwable {
System.out.println(value);
}
});
System.out.println("after Promise.sync");
}
public static void main(String[] args) {
async();
sync();
}
}
运行结果如下:
before Promise constructor
after Promise constructor
before Promise.sync
running Promise constructor
promise from Promise constructor
running Promise.sync
promise from Promise.sync
after Promise.sync
该代码在 Java 8 下,可以简写为:
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam6 {
public static void async() {
System.out.println("before Promise constructor");
Promise<String> promise = new Promise<>(() -> {
Thread.sleep(100);
System.out.println("running Promise constructor");
return "promise from Promise constructor";
});
promise.then((String value) -> {
System.out.println(value);
});
System.out.println("after Promise constructor");
}
public static void sync() {
System.out.println("before Promise.sync");
Promise<String> promise = Promise.sync(() -> {
Thread.sleep(100);
System.out.println("running Promise.sync");
return "promise from Promise.sync";
});
promise.then((String value) -> {
System.out.println(value);
});
System.out.println("after Promise.sync");
}
public static void main(String[] args) {
async();
sync();
}
}
从结果我们可以看出,Promise.sync
方法中的 computation
是同步执行的,而 Promise
构造器中的 computation
是异步执行的。另外,我们还可以看出这里的 then
跟 JavaScript 的不一样,JavaScript 的 then
是异步的,而 Java 版本的 then
是同步的。
虽然通过 Promise 构造器来创建一个 promise 对象跟使用 Promise.sync 方法来比是异步的,但只是在执行顺序上能看出差别来,但是它并不会让你感到有明显的延时。如果你需要创建一个 promise 对象并且延迟一段时间后再执行 computation 函数,那么你可以使用
Promise.delayed(long duration, TimeUnit timeunit, final Object value)
Promise.delayed(long duration, final Object value)
方法。
delayed
方法的第一个参数 duration
是一个延时时长,第二个参数 timeunit
是延时单位,如果省略默认为毫秒。最后一个参数 value
既可以是一个 Callable
的回调,也可以是一个其它类型的值(包括 promise
对象)。当 value
不是函数时,在延时之后,直接将 value
设置为 promise
对象的值。
例如(为了代码简洁,后面代码一律采用 Java8 的形式):
package hprose.example.promise;
import hprose.util.concurrent.Promise;
public class Exam7 {
public static void normal() {
System.out.println(System.currentTimeMillis() + ": before Promise constructor");
Promise<String> promise = new Promise<>(() -> {
System.out.println(System.currentTimeMillis() + ": running Promise constructor");
return "promise from Promise constructor";
});
promise.then((String value) -> {
System.out.println(System.currentTimeMillis() + ": " + value);
});
System.out.println(System.currentTimeMillis() + ": after Promise constructor");
}
public static void delayed() {
System.out.println(System.currentTimeMillis() + ": before Promise.delayed");
Promise<String> promise = Promise.delayed(300, () -> {
System.out.println(System.currentTimeMillis() + ": running Promise.delayed");
return "promise from Promise.delayed";
});
promise.then((String value) -> {
System.out.println(System.currentTimeMillis() + ": " + value);
});
System.out.println(System.currentTimeMillis() + ": after Promise.delayed");
}
public static void main(String[] args) throws InterruptedException {
delayed();
normal();
Thread.sleep(400);
}
}
该程序运行结果为:
1466322667552: before Promise.delayed
1466322667589: after Promise.delayed
1466322667589: before Promise constructor
1466322667590: running Promise constructor
1466322667590: promise from Promise constructor
1466322667590: after Promise constructor
1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed
注意,该程序最后加了 400 毫秒延时是为了能够显示最后的结果,否则还等不到输出:
1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed
程序就结束了。