Skip to content

异步编程——Promise

小马哥 edited this page Jun 21, 2016 · 19 revisions
Promises/A+ logo

概述

Java 提供了一套 Future 异步模型,但是在 Java 8 之前只能通过 Get 方法来同步获取结果,而不能通过回调方式来异步获取结果。Java 8 增加了 CompletableFuture,可以通过回调来获取结果,这种就是 Promise 异步模型。但是仅能在 Java 8 中才能使用。

当然 Guava、Netty 4+ 等 Java 框架也提供了 Promise 异步模型的实现。但是最低支持的 JDK 版本是 Java 6,对 Java 5 并不支持。

Hprose 为了更好的实现异步服务和异步调用,也提供了一套 Promise 异步模型实现。它基本上是参照 Promise/A+(中文版) 规范实现的。

Hprose for Java 的 Promise 实现在 hprose.util.concurrent 包中,该包中包含有以下接口:

  • Callback
  • Action
  • Func
  • Thenable
  • Rejector
  • Resolver
  • Executor
  • Handler
  • Reducer

类:

  • Promise
  • PromiseFuture
  • Threads
  • Timer
  • TypeException

另外还有一个枚举类型:

  • State

其中,Callback 是一个空接口,它是 ActionFunc 的父接口,Action 表示一个有参数无返回值的回调函数,Func 表示一个有参数有返回值的回调函数。

Thenable 接口包含有一个 then 方法,Promise 类是 Thenable 接口的一个实现。

Resolver 接口包含有一个 resolve 方法,Rejector 接口包含有一个 reject 方法,Executor 接口包含有一个 exec 方法,该方法的两个参数分别是 ResolverRejector 接口的对象,Promise 类是 ResolverRejector 接口的一个实现,并且包含有一个 Executor 参数的构造器。在介绍 Promise 构造器时会举例介绍该接口的使用方法。

Handler 接口是 Promise 类中的 forEacheverysomefiltermap 方法的回调参数类型。

Reducer 接口是 Promise 类中的 reducereduceRight 方法的回调参数类型。

Promise 类是 Promise 异步模型的主要实现类,后面将对该类进行详细的介绍。

PromiseFuture 类是对 Promise 类的一个 Future 接口包装。

Threads 是一个静态类,提供了一些关于线程管理的静态帮助方法,比如:获取所有线程,获取主线程,获取根线程组,注册 JVM 退出时回调函数并在主线程推出时结束整个程序。该类主要用于 Hprose 内部使用,用户可以不用关心该类,所以本文不对该类做详细介绍。

Timer 是一个计时器类,也是用于 Hprose 内部使用,本文也不对该类做详细介绍。

TypeException 是一个异常类,当 Promise 对象以它自己作为任务结束结果时,抛出该异常。

State 表示 Promise 对象的状态,包含有结果待定(PENDING),已接受(FULFILLED),已拒绝(REJECTED)三种状态。

Promise 类

简介

该类是一个泛型类,其泛型参数表示其接受的结果类型。它是 Promise/A+(中文版) 规范的一个实现,不过也有略微不同之处:

Promise/A+ 规范中,then 方法的 onFulfilledonRejected 回调方法是异步执行的,且在 then 方法被调用的那一轮事件循环之后的新执行栈中执行。而在 Hprose for Java 的实现中,为了减少线程切换引起的不必要的性能损失,这部分并未严格遵守该规范。当执行 then 方法时,如果任务尚未完成,onFulfilledonRejected 回调方法会在结束任务的线程中执行,如果任务已完成,则在执行 then 方法时,立即执行。

除了规范中的 then 方法以外,Promise 类还提供了许多方法,下面将详细介绍。

hprose 中提供了多种方法来创建 Promise 对象。除了可以使用构造器创建之外,还可以通过一些 Promise 类上的静态方法来创建。

Promise 构造器

创建一个待定(pending)状态 promise 对象

Promise<T> promise = new Promise<T>();

无参构造函数创建一个结果待定的 Promise 对象,其泛型参数表示其能够接受的结果类型,其泛型参数可以省略。可以将来通过 resolve 方法来设定其成功值,或通过 reject 方法来设定其失败原因。

创建一个成功(fulfilled)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<String>(new Callable<String>() {
            public String call() throws Exception {
                return "hprose";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了成功值,可以使用 then 方法来得到它。运行结果为:

hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<>(() -> "hprose");
        promise.then((String value) -> System.out.println(value));
    }
}

创建一个失败(rejected)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(new Callable() {
            public Object call() throws Exception {
                throw new Exception("hprose");
            }
        });
        promise.catchError(new Action<Throwable>() {
            public void call(Throwable value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了失败值,可以使用 catchError 方法来得到它。该程序运行结果为:

java.lang.Exception: hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(() -> {
            throw new Exception("hprose");
        });
        promise.catchError((Throwable value) -> {
            System.out.println(value);
        });
    }
}

通过 executor 回调来创建 promise 对象

new Promise(executor);
new Promise((Resolver<V> resolver, Rejector rejector) -> { ... });

executor 是一个 Executor 接口的实现,它只有一个 exec 方法,该方法有两个参数:

  • Resolver resolver: 当接受结果时,调用该对象上的 resolve 方法。
  • Rejector rejector: 当拒绝结果时,调用该对象上的 rejector 方法。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Executor;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<Integer>(new Executor<Integer>() {
            @Override
            public void exec(Resolver<Integer> resolver, Rejector rejector) {
                resolver.resolve(100);
            }
        });
        promise.then(new Action<Integer>() {
            public void call(Integer value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

该程序运行结果为:

100

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<>(
            (Resolver<Integer> resolver, Rejector rejector) -> {
                resolver.resolve(100);
            }
        );
        promise.then((Integer value) -> {
            System.out.println(value);
        });
    }
}

Promise 上的工厂方法

Promise 上提供了 4 个工厂方法,它们分别是:

  • value
  • error
  • sync
  • delayed

创建一个成功(fulfilled)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;

public class Exam4 {
    public static void main(String[] args) {
        Promise<String> promise = Promise.value("hprose");
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

使用 value 来创建一个成功(fulfilled)状态的 promise 对象效果跟前面用 Promise 构造器创建的效果一样,但是写起来更加简单,不再需要把结果放入一个函数中作为返回值返回了。

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam4 {
    public static void main(String[] args) {
        Promise<String> promise = Promise.value("hprose");
        promise.then((String value) -> {
            System.out.println(value);
        });
    }
}

创建一个失败(rejected)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;

public class Exam5 {
    public static void main(String[] args) {
        Promise<?> promise = Promise.error(new Exception("hprose"));
        promise.catchError(new Action<Throwable>() {
            public void call(Throwable value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

使用 error来创建一个失败(rejected)状态的 promise 对象效果跟前面用 Promise 构造器创建的效果也一样,但是写起来也更加简单,不再需要把失败原因放入一个函数中作为异常抛出了。

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam5 {
    public static void main(String[] args) {
        Promise<?> promise = Promise.error(new Exception("hprose"));
        promise.catchError((Throwable value) -> {
            System.out.println(value);
        });
    }
}

同步创建一个 promise 对象

Promise 上提供了一个:

Promise.sync(computation)

方法可以让我们同步的创建一个 promise 对象。

这里“同步”的意思是指 computation 的执行是同步执行的。而通过 Promise 构造器创建 promise 对象时,computation 是异步执行的。为了可以更好地理解这一点,我们来看一个具体的例子:

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam6 {
    public static void async() {
        System.out.println("before Promise constructor");
        Promise<String> promise = new Promise<String>(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(100);
                System.out.println("running Promise constructor");
                return "promise from Promise constructor";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
        System.out.println("after Promise constructor");
    }
    public static void sync() {
        System.out.println("before Promise.sync");
        Promise<String> promise = Promise.sync(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(100);
                System.out.println("running Promise.sync");
                return "promise from Promise.sync";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
        System.out.println("after Promise.sync");
    }
    public static void main(String[] args) {
        async();
        sync();
    }
}

运行结果如下:

before Promise constructor
after Promise constructor
before Promise.sync
running Promise constructor
promise from Promise constructor
running Promise.sync
promise from Promise.sync
after Promise.sync

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam6 {
    public static void async() {
        System.out.println("before Promise constructor");
        Promise<String> promise = new Promise<>(() -> {
            Thread.sleep(100);
            System.out.println("running Promise constructor");
            return "promise from Promise constructor";
        });
        promise.then((String value) -> {
            System.out.println(value);
        });
        System.out.println("after Promise constructor");
    }
    public static void sync() {
        System.out.println("before Promise.sync");
        Promise<String> promise = Promise.sync(() -> {
            Thread.sleep(100);
            System.out.println("running Promise.sync");
            return "promise from Promise.sync";
        });
        promise.then((String value) -> {
            System.out.println(value);
        });
        System.out.println("after Promise.sync");
    }
    public static void main(String[] args) {
        async();
        sync();
    }
}

从结果我们可以看出,Promise.sync 方法中的 computation 是同步执行的,而 Promise 构造器中的 computation 是异步执行的。另外,我们还可以看出这里的 then 跟 JavaScript 的不一样,JavaScript 的 then 是异步的,而 Java 版本的 then 是同步的。

创建一个延迟 promise 对象

虽然通过 Promise 构造器来创建一个 promise 对象跟使用 Promise.sync 方法来比是异步的,但只是在执行顺序上能看出差别来,但是它并不会让你感到有明显的延时。如果你需要创建一个 promise 对象并且延迟一段时间后再执行 computation 函数,那么你可以使用

Promise.delayed(long duration, TimeUnit timeunit, final Object value)
Promise.delayed(long duration, final Object value)

方法。

delayed 方法的第一个参数 duration 是一个延时时长,第二个参数 timeunit 是延时单位,如果省略默认为毫秒。最后一个参数 value 既可以是一个 Callable 的回调,也可以是一个其它类型的值(包括 promise 对象)。当 value 不是函数时,在延时之后,直接将 value 设置为 promise 对象的值。

例如(为了代码简洁,后面代码一律采用 Java8 的形式):

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam7 {
    public static void normal() {
        System.out.println(System.currentTimeMillis() + ": before Promise constructor");
        Promise<String> promise = new Promise<>(() -> {
            System.out.println(System.currentTimeMillis() + ": running Promise constructor");
            return "promise from Promise constructor";
        });
        promise.then((String value) -> {
            System.out.println(System.currentTimeMillis() + ": " + value);
        });
        System.out.println(System.currentTimeMillis() + ": after Promise constructor");
    }
    public static void delayed() {
        System.out.println(System.currentTimeMillis() + ": before Promise.delayed");
        Promise<String> promise = Promise.delayed(300, () -> {
            System.out.println(System.currentTimeMillis() + ": running Promise.delayed");
            return "promise from Promise.delayed";
        });
        promise.then((String value) -> {
            System.out.println(System.currentTimeMillis() + ": " + value);
        });
        System.out.println(System.currentTimeMillis() + ": after Promise.delayed");
    }
    public static void main(String[] args) throws InterruptedException {
        delayed();
        normal();
        Thread.sleep(400);
    }
}

该程序运行结果为:

1466322667552: before Promise.delayed
1466322667589: after Promise.delayed
1466322667589: before Promise constructor
1466322667590: running Promise constructor
1466322667590: promise from Promise constructor
1466322667590: after Promise constructor
1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed

注意,该程序最后加了 400 毫秒延时是为了能够显示最后的结果,否则还等不到输出:

1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed

程序就结束了。

Promise 的基本方法

then 方法

then 方法是 Promise 的核心和精髓所在。它有两个参数:onfulfill, onreject。这两个参数皆为 Callback 类型(ActionFunc 都是 Callback 的子接口)。当它们为 null 时,它们将会被忽略。当 promise 对象状态为待定(pending)时,这两个回调方法都不会执行,直到 promise 对象的状态变为成功(fulfilled)或失败(rejected)。当 promise 对象状态为成功(fulfilled)时,onfulfill 会被回调,参数值为成功值。当 promise 对象状态为失败(rejected)时,onreject 会被回调,参数值为失败原因。

then 方法的返回值是一个新的 promise 对象,它的值由 onfulfillonreject 的返回值或抛出的异常来决定。如果 onfulfillonreject 在执行过程中没有抛出异常,那么新的 promise 对象的状态为成功(fulfilled),其值为 onfulfillonreject 的返回值。如果这两个回调中抛出了异常,那么新的 promise 对象的状态将被设置为失败(rejected),抛出的异常作为新的 promise 对象的失败原因。

同一个 promise 对象的 then 方法可以被多次调用,其值不会因为调用 then 方法而改变。当 then 方法被多次调用时,所有的 onfulfill, onreject 将按照原始的调用顺序被执行。

因为 then 方法的返回值还是一个 promise 对象,因此可以使用链式调用的方式实现异步编程串行化。

promise 的成功值被设置为另一个 promise 对象(为了区分,将其命名为 promise2)时,then 方法中的两个回调函数得到的参数是 promise2 对象的最终展开值,而不是 promise2 对象本身。当 promise2 的最终展开值为成功值时,onfulfill 会被调用,当 promise2 的最终展开值为失败原因时,onreject 会被调用。

done 方法

then 方法类似,但 done 方法没有返回值,不支持链式调用,因此在 done 方法的回调函数中,通常不会返回值。如果在 done 方法的回调中发生异常,会直接抛出,并且无法被捕获。

catchError 方法

promise.catchError(final Action<Throwable> onreject);
promise.catchError<T>(final Func<T, Throwable> onreject);

该方法是 then(null, onreject) 的简化写法。

promise.catchError(final Action<Throwable> onreject, final Func<Boolean, Throwable> test);
promise.catchError<T>(final Func<T, Throwable> onreject, final Func<Boolean, Throwable> test);

两个参数的 catchError 是一个参数的 catchError 的增强版,第一个参数 onreject 跟一个参数版本的 onreject 参数相同,第二个参数是一个测试函数。例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import hprose.util.concurrent.TypeException;
import java.util.concurrent.TimeoutException;

public class Exam8 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(() -> {
            throw new TypeException("typeException");
        });
        promise.catchError((Throwable reason) -> {
            return "this is a TimeoutException";
        }, (Throwable reason) -> reason instanceof TimeoutException)
                .catchError((Throwable reason) -> {
            return "this is a TypeException";
        }, (Throwable reason) -> reason instanceof TypeException)
                .then((String value) -> System.out.println(value));
    }
}

该程序运行结果为:

this is a TypeException

fail 方法

该方法是 done(null, onreject) 的简化方法。

resolve 方法

该方法可以将状态为待定(pending)的 promise 对象变为成功(fulfilled)状态。

该方法的参数值可以为任意类型。

该方法已绑定到它所在的 promise 对象,因此可以安全的作为回调函数进行传递。

reject 方法

该方法可以将状态为待定(pending)的 promise 对象变为失败(rejected)状态。

该方法的参数值可以为任意类型。

该方法已绑定到它所在的 promise 对象,因此可以安全的作为回调函数进行传递。

whenComplete 方法

有时候,你不但想要在成功(fulfilled)时执行某段代码,而且在失败(rejected)时也想执行这段代码,那你可以使用 whenComplete 方法。该方法的参数为一个无参回调函数。该方法执行后会返回一个新的 promise 对象,除非在回调函数中抛出异常,否则返回的 promise 对象的值跟原 promise 对象的值相同。

complete 方法

该方法的回调函数 oncomplete 在不论成功还是失败的情况下都会执行,并且支持链式调用。相当于:then(oncomplete, oncomplete) 的简化写法。

always 方法

该方法的回调函数 oncomplete 在不论成功还是失败的情况下都会执行,但不支持链式调用。相当于:done(oncomplete, oncomplete) 的简化写法。

fill 方法

将当前 promise 对象的值充填到参数所表示的 promise 对象中。

Promise 上的静态辅助方法

isPromise 方法

Promise.isPromise(obj)

用来判断是否是 Promise 实例对象。

toPromise 方法

Promise.toPromise(obj)

如果 obj 是一个 Promise 对象,那么直接返回 obj,否则返回 Promise.value(obj)

all 方法

Promise.all(array[, type])

该方法的参数 array 为数组或者值为数组的 promise 对象,最后的可选参数 type 表示返回结果的数组元素类型。该方法返回一个 promise 对象,该 promise 对象会在数组参数内的所有 promise 都被设置为成功(fulfilled)状态时,才被设置为成功(fulfilled)状态,其值为数组参数中所有 promise 对象的最终展开值组成的数组,其数组元素与原数组元素一一对应。

race 方法

Promise.race(array[, type])

该方法返回一个 promise 对象,这个 promise 在数组参数中的任意一个 promise 被设置为成功(fulfilled)或失败(rejected)后,立刻以相同的成功值被设置为成功(fulfilled)或以相同的失败原因被设置为失败(rejected)。

join 方法

Promise.join([arg1[, arg2[, arg3...]]]);

该方法的功能同 all 方法类似,但它与 all 方法的参数不同,我们来举例看一下它们的差别:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam9 {
    public static void main(String[] args) {
        Promise<Integer> promise = Promise.value(100);
        Promise.all(new Object[] { true, promise }).then((Object[] values) -> {
            System.out.print(values[0]);
            System.out.println(values[1]);
        });
        Promise.join(true, promise).then((Object[] values) -> {
            System.out.print(values[0]);
            System.out.println(values[1]);
        });
    }
}

输出结果如下:

true100
true100

any 方法

Promise.any(array[, type])

该方法是 race 方法的改进版。

对于 race 方法,如果输入的数组为空,返回的 promise 对象将永远保持为待定(pending)状态。

而对于 any 方法,如果输入的数组为空,返回的 promise 对象将被设置为失败状态,失败原因是一个 IllegalArgumentException 对象。

对于 race 方法,数组参数中的任意一个 promise 被设置为成功(fulfilled)或失败(rejected)后,返回的 promise 对象就会被设定为成功(fulfilled)或失败(rejected)状态。

而对于 any 方法,只有当数组参数中的所有 promise 被设置为失败状态时,返回的 promise 对象才会被设定为失败状态。否则,返回的 promise 对象被设置为第一个被设置为成功(fulfilled)状态的成功值。

run 方法

Promise.run(handler[, arg1[, arg2[, arg3...]]]);

run 方法的作用是异步执行 handler 函数并返回一个包含执行结果的 promise 对象,handler 的参数分别为 arg1, arg2, arg3 ...。参数可以是普通值,也可以是 promise 对象,如果是 promise 对象,则等待其变为成功(fulfilled)状态时再将其成功值代入 handler 函数。如果变为失败(rejected)状态,run 返回的 promise 对象被设置为该失败原因。如果参数中,有多个 promise 对象变为失败(rejected)状态,则第一个变为失败状态的 promise 对象的失败原因被设置为 run 返回的 promise 对象的失败原因。当参数中的 promise 对象都变为成功(fulfilled)状态时,handler 函数才会执行,如果在 handler 执行的过程中,抛出了异常,则该异常作为 run 返回的 promise 对象的失败原因。如果没有异常,则 handler 函数的返回值,作为 run 返回的 promise 对象的成功值。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam10 {
    public static void main(String[] args) throws InterruptedException {
        Promise<Integer> promise = Promise.value(100);
        Promise.run((Object[] values) -> {
            return (Integer)values[0] + (Integer)values[1];
        }, promise, 200).then((value) -> {
            System.out.print(value);
        });
    }
}

输出结果为:

300

forEach 方法

Promise.forEach(callback[, arg1[, arg2[, arg3...]]])
Promise.forEach(array, callback);

该方法有两种形式,功能是遍历数组(或参数)中的每一个元素并执行回调 callback。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam11 {
    public static void main(String[] args) throws InterruptedException {
        Promise.forEach((Integer element) -> {
            System.out.println(element);
        }, 1, Promise.value(2), 3);
        Promise.forEach(new Object[] {1, Promise.value(2), 3},
                (Integer element, int index) -> {
                    System.out.println("a[" + index + "] = " + element);
                    return null;
                }
        );
    }
}

输出结果为:

1
2
3
a[0] = 1
a[1] = 2
a[2] = 3

every 方法

Promise.every(callback[, arg1[, arg2[, arg3...]]])
Promise.every(array, callback);

该方法有两种形式,功能是遍历数组(或参数)中的每一个元素并执行回调 callback,当所有 callback 的返回值都为 true 时,结果为 true,否则为 false。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam12 {
    public static void main(String[] args) throws InterruptedException {
        Promise.every((Integer element) -> element > 10,
                12, Promise.value(5), 8, Promise.value(130), 44)
                .then((Boolean value) -> System.out.println(value));
        Promise.every(new Object[] {12, Promise.value(54), 18, Promise.value(130), 44},
                (Integer element, int index) -> element > 10)
                .then((Boolean value) -> System.out.println(value));
    }
}

运行结果为:

false
true

some 方法

Promise.some(callback[, arg1[, arg2[, arg3...]]])
Promise.some(array, callback);

该方法有两种形式,功能是遍历数组(或参数)中的每一个元素并执行回调 callback,当任意一个 callback 的返回值为 true 时,结果为 true,否则为 false。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam13 {
    public static void main(String[] args) throws InterruptedException {
        Promise.some((Integer element) -> element > 10,
                12, Promise.value(5), 8, Promise.value(130), 44)
                .then((Boolean value) -> System.out.println(value));
        Promise.some(new Object[] {12, Promise.value(54), 18, Promise.value(130), 44},
                (Integer element, int index) -> element > 10)
                .then((Boolean value) -> System.out.println(value));
        Promise.some(new Object[] {1, Promise.value(5), 8, Promise.value(3), 4},
                (Integer element, int index) -> element > 10)
                .then((Boolean value) -> System.out.println(value));
    }
}

运行结果为:

true
true
false

filter 方法

Promise.filter(callback[, arg1[, arg2[, arg3...]]])
Promise.filter(array, callback[, type]);

该方法有两种形式,功能是遍历数组(或参数)中的每一个元素并执行回调 callbackcallback 的返回值为 true 的元素所组成的数组将作为 filter 返回结果的 promise 对象所包含的值。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import java.util.Arrays;

public class Exam14 {
    public static void main(String[] args) throws InterruptedException {
        Promise.filter((Integer element) -> element > 10,
                12, Promise.value(5), 8, Promise.value(130), 44)
                .then((Object[] value) -> System.out.println(Arrays.toString(value)));
        Promise.filter(new Object[] {12, Promise.value(54), 18, Promise.value(130), 44},
                (Integer element, int index) -> element > 10, Integer.class)
                .then((Integer[] value) -> System.out.println(Arrays.toString(value)));
        Promise.filter(new Object[] {1, Promise.value(5), 8, Promise.value(3), 4},
                (Integer element, int index) -> element > 10)
                .then((Object[] value) -> System.out.println(Arrays.toString(value)));
    }
}

结果为:

[12, 130, 44]
[12, 54, 18, 130, 44]
[]

map 方法

Promise.map(callback[, arg1[, arg2[, arg3...]]])
Promise.map(array, callback[, type]);

该方法有两种形式,功能是遍历数组(或参数)中的每一个元素并执行回调 callbackcallback 的返回值所组成的数组将作为 map 返回结果的 promise 对象所包含的值。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import java.util.Arrays;

public class Exam15 {
    public static void main(String[] args) throws InterruptedException {
        Promise.map(Math::sqrt, 1.0, Promise.value(4.0), Promise.value(9.0), 16.0)
                .then((Object[] value) -> System.out.println(Arrays.toString(value)));
        Promise.map(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer element, int index) -> element * 2, Integer.class)
                .then((Integer[] value) -> System.out.println(Arrays.toString(value)));
        Promise.map(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer element, int index) -> element * element)
                .then((Object[] value) -> System.out.println(Arrays.toString(value)));
    }
}

执行结果为:

[1.0, 2.0, 3.0, 4.0]
[2, 4, 6, 8, 10]
[1, 4, 9, 16, 25]

reduce 方法

Promise.reduce(array, callback[, initialValue])

该方法的功能是遍历数组(或参数)中的每一个元素,并把前一次执行 callback 的结果(第一次是初始值)和此元素作为参数传入 callback 执行,最后一次执行 callback 的返回值作为 reduce 的返回结果的 promise 对象所包含的值。其中参数 array 可以是一个包含了 promise 元素的数组,也可以是一个包含了数组的 promise 对象。返回值是一个 promise 对象。如果参数数组中的 promise 对象为失败(rejected)状态,则该方法返回的 promise 对象被设置为失败(rejected)状态,且设为相同失败原因。如果在 callback 回调中抛出了异常,则该方法返回的 promise 对象也被设置为失败(rejected)状态,失败原因被设置为抛出的异常值。

initialValue 的值也可以是一个 promise 对象。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam16 {
    public static void main(String[] args) throws InterruptedException {
        Promise.reduce(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer prev, Integer element, int index) -> prev + element)
                .then((Integer value) -> System.out.println(value));
        Promise.reduce(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer prev, Integer element, int index) -> prev * element, 10)
                .then((Integer value) -> System.out.println(value));
    }
}

该程序运行结果为:

15
1200

reduceRight 方法

Promise.reduceRight(array, callback[, initialValue])

该方法的功能跟 reduce 方法类似,只是遍历顺序不同,reduceRight 是从右向左遍历数组的。

initialValue 的值也可以是一个 promise 对象。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam17 {
    public static void main(String[] args) throws InterruptedException {
        Promise.reduce(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer prev, Integer element, int index) -> prev - element)
                .then((Integer value) -> System.out.println(value));
        Promise.reduceRight(new Object[] {1, Promise.value(2), 3, Promise.value(4), 5},
                (Integer prev, Integer element, int index) -> prev - element)
                .then((Integer value) -> System.out.println(value));
    }
}

该程序执行结果为:

-13
-5

Promise 上的辅助方法

timeout 方法

promise.timeout(duration, timeunit, reason)
promise.timeout(duration, reason)
promise.timeout(duration)

创建一个新的 promise 对象,当超过设定的时间 duration(默认单位 timeunit 是毫秒),源 promise 对象如果还未被设置为成功(fulfilled)或失败(rejected),则新的 promise 对象被设置为一个 TimeoutException 或者自定义的 reason。否则,其值跟源 promise 相同。

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam18 {
    public static void main(String[] args) throws InterruptedException {
        Promise<Integer> p1 = Promise.delayed(200, 2).timeout(300);
        Promise<Integer> p2 = Promise.delayed(500, 5).timeout(300);
        p1.then((Integer i) -> System.out.println(i),
                (Throwable e) -> System.out.println(e));
        p2.then((Integer i) -> System.out.println(i),
                (Throwable e) -> System.out.println(e));
        Thread.sleep(500);
    }
}

该程序运行结果为:

2
java.util.concurrent.TimeoutException: timeout

delay 方法

promise.delay(duration[, timeunit])

创建一个新的 promise 对象,当经过时间 duration(默认单位 timeunit 是毫秒)后,该 promise 对象的值将被设置为跟源 promise 对象相同的成功值。如果源 promise 对象被设置为失败(rejected)状态,新的 promise 对象将立即被设为相同的失败原因,而无等待。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam19 {
    public static void main(String[] args) throws InterruptedException {
        Promise<Integer> p1 = Promise.value(2).delay(200).timeout(300);
        Promise<Integer> p2 = Promise.value(5).delay(500).timeout(300);
        p1.then((Integer i) -> System.out.println(i),
                (Throwable e) -> System.out.println(e));
        p2.then((Integer i) -> System.out.println(i),
                (Throwable e) -> System.out.println(e));
        Thread.sleep(500);
    }
}

该程序运行结果为:

2
java.util.concurrent.TimeoutException: timeout

tap 方法

promise.tap(onfulfilledSideEffect)

以下两种写法是等价的:

promise.then(new Func<V, V>() {
    public V call(V value) throws Throwable {
        onfulfilledSideEffect.call(value);
        return value;
    }
});

promise.tap(onfulfilledSideEffect);

toFuture 方法

promise.toFuture();

promise 对象转换为 Future 对象,返回的是一个 PromiseFuture 实例。

getState 方法

promise.getState();

返回 promsie 对象的状态值。

getValue 方法

promise.getValue();

返回 promise 对象的值,只有 promise 的状态为 FULFILLED 时,getValue 才有有效值,但是该值也可能是一个 promise 对象。因此,最好不要使用该方法,要同步获取结果,使用 toFuture 方法更好。

getReason 方法

promise.getReason();

返回 promise 对象的失败原因,只有 promise 的状态为 REJECTED 时,getReason 才有有效值。

all 方法

promise.all([type]);

该方法与静态方法 all 功能一致,promise 对象本身即为要操作的数组。

race 方法

promise.race([type]);

该方法与静态方法 race 功能一致,promise 对象本身即为要操作的数组。

any 方法

promise.any([type]);

该方法与静态方法 any 功能一致,promise 对象本身即为要操作的数组。

forEach 方法

promise.forEach(callback);

该方法与静态方法 forEach 功能一致,promise 对象本身即为要操作的数组。

every 方法

promise.every(callback);

该方法与静态方法 every 功能一致,promise 对象本身即为要操作的数组。

some 方法

promise.some(callback);

该方法与静态方法 some 功能一致,promise 对象本身即为要操作的数组。

filter 方法

promise.filter(callback[, type]);

该方法与静态方法 filter 功能一致,promise 对象本身即为要操作的数组。

map 方法

promise.map(callback[, type]);

该方法与静态方法 map 功能一致,promise 对象本身即为要操作的数组。

reduce 方法

promise.reduce(callback[, initialValue]);

该方法与静态方法 reduce 功能一致,promise 对象本身即为要操作的数组。

reduceRight 方法

promise.reduceRight(callback[, initialValue]);

该方法与静态方法 reduceRight 功能一致,promise 对象本身即为要操作的数组。

Clone this wiki locally