Skip to content

异步编程——Promise

小马哥 edited this page Jun 21, 2016 · 19 revisions
Promises/A+ logo

概述

Java 提供了一套 Future 异步模型,但是在 Java 8 之前只能通过 Get 方法来同步获取结果,而不能通过回调方式来异步获取结果。Java 8 增加了 CompletableFuture,可以通过回调来获取结果,这种就是 Promise 异步模型。但是仅能在 Java 8 中才能使用。

当然 Guava、Netty 4+ 等 Java 框架也提供了 Promise 异步模型的实现。但是最低支持的 JDK 版本是 Java 6,对 Java 5 并不支持。

Hprose 为了更好的实现异步服务和异步调用,也提供了一套 Promise 异步模型实现。它基本上是参照 Promise/A+(中文版) 规范实现的。

Hprose for Java 的 Promise 实现在 hprose.util.concurrent 包中,该包中包含有以下接口:

  • Callback
  • Action
  • Func
  • Thenable
  • Rejector
  • Resolver
  • Executor
  • Handler
  • Reducer

类:

  • Promise
  • PromiseFuture
  • Threads
  • Timer
  • TypeException

另外还有一个枚举类型:

  • State

其中,Callback 是一个空接口,它是 ActionFunc 的父接口,Action 表示一个有参数无返回值的回调函数,Func 表示一个有参数有返回值的回调函数。

Thenable 接口包含有一个 then 方法,Promise 类是 Thenable 接口的一个实现。

Resolver 接口包含有一个 resolve 方法,Rejector 接口包含有一个 reject 方法,Executor 接口包含有一个 exec 方法,该方法的两个参数分别是 ResolverRejector 接口的对象,Promise 类是 ResolverRejector 接口的一个实现,并且包含有一个 Executor 参数的构造器。在介绍 Promise 构造器时会举例介绍该接口的使用方法。

Handler 接口是 Promise 类中的 forEacheverysomefiltermap 方法的回调参数类型。

Reducer 接口是 Promise 类中的 reducereduceRight 方法的回调参数类型。

Promise 类是 Promise 异步模型的主要实现类,后面将对该类进行详细的介绍。

PromiseFuture 类是对 Promise 类的一个 Future 接口包装。

Threads 是一个静态类,提供了一些关于线程管理的静态帮助方法,比如:获取所有线程,获取主线程,获取根线程组,注册 JVM 退出时回调函数并在主线程推出时结束整个程序。该类主要用于 Hprose 内部使用,用户可以不用关心该类,所以本文不对该类做详细介绍。

Timer 是一个计时器类,也是用于 Hprose 内部使用,本文也不对该类做详细介绍。

TypeException 是一个异常类,当 Promise 对象以它自己作为任务结束结果时,抛出该异常。

State 表示 Promise 对象的状态,包含有结果待定(PENDING),已接受(FULFILLED),已拒绝(REJECTED)三种状态。

Promise 类

简介

该类是一个泛型类,其泛型参数表示其接受的结果类型。它是 Promise/A+(中文版) 规范的一个实现,不过也有略微不同之处:

Promise/A+ 规范中,then 方法的 onFulfilledonRejected 回调方法是异步执行的,且在 then 方法被调用的那一轮事件循环之后的新执行栈中执行。而在 Hprose for Java 的实现中,为了减少线程切换引起的不必要的性能损失,这部分并未严格遵守该规范。当执行 then 方法时,如果任务尚未完成,onFulfilledonRejected 回调方法会在结束任务的线程中执行,如果任务已完成,则在执行 then 方法时,立即执行。

除了规范中的 then 方法以外,Promise 类还提供了许多方法,下面将详细介绍。

hprose 中提供了多种方法来创建 Promise 对象。除了可以使用构造器创建之外,还可以通过一些 Promise 类上的静态方法来创建。

创建 Promise 对象

使用 Promise 构造器

创建一个待定(pending)状态 promise 对象

Promise<T> promise = new Promise<T>();

无参构造函数创建一个结果待定的 Promise 对象,其泛型参数表示其能够接受的结果类型,其泛型参数可以省略。可以将来通过 resolve 方法来设定其成功值,或通过 reject 方法来设定其失败原因。

创建一个成功(fulfilled)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<String>(new Callable<String>() {
            public String call() throws Exception {
                return "hprose";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了成功值,可以使用 then 方法来得到它。运行结果为:

hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam1 {
    public static void main(String[] args) {
        Promise<String> promise = new Promise<>(() -> "hprose");
        promise.then((String value) -> System.out.println(value));
    }
}

创建一个失败(rejected)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(new Callable() {
            public Object call() throws Exception {
                throw new Exception("hprose");
            }
        });
        promise.catchError(new Action<Throwable>() {
            public void call(Throwable value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

promise 对象中已经包含了失败值,可以使用 catchError 方法来得到它。该程序运行结果为:

java.lang.Exception: hprose

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam2 {
    public static void main(String[] args) {
        Promise<?> promise = new Promise(() -> {
            throw new Exception("hprose");
        });
        promise.catchError((Throwable value) -> {
            System.out.println(value);
        });
    }
}

通过 executor 回调来创建 promise 对象

new Promise(executor);
new Promise((Resolver<V> resolver, Rejector rejector) -> { ... });

executor 是一个 Executor 接口的实现,它只有一个 exec 方法,该方法有两个参数:

  • Resolver resolver: 当接受结果时,调用该对象上的 resolve 方法。
  • Rejector rejector: 当拒绝结果时,调用该对象上的 rejector 方法。

例如:

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Executor;
import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<Integer>(new Executor<Integer>() {
            @Override
            public void exec(Resolver<Integer> resolver, Rejector rejector) {
                resolver.resolve(100);
            }
        });
        promise.then(new Action<Integer>() {
            public void call(Integer value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

该程序运行结果为:

100

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;
import hprose.util.concurrent.Rejector;
import hprose.util.concurrent.Resolver;

public class Exam3 {
    public static void main(String[] args) {
        Promise<Integer> promise = new Promise<>((Resolver<Integer> resolver, Rejector rejector) -> {
            resolver.resolve(100);
        });
        promise.then((Integer value) -> {
            System.out.println(value);
        });
    }
}

使用 Promise 上的工厂方法

Promise 上提供了 4 个工厂方法,它们分别是:

  • value
  • error
  • sync
  • delayed

创建一个成功(fulfilled)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;

public class Exam4 {
    public static void main(String[] args) {
        Promise<String> promise = Promise.value("hprose");
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

使用 value 来创建一个成功(fulfilled)状态的 promise 对象效果跟前面用 Promise 构造器创建的效果一样,但是写起来更加简单,不再需要把结果放入一个函数中作为返回值返回了。

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam4 {
    public static void main(String[] args) {
        Promise<String> promise = Promise.value("hprose");
        promise.then((String value) -> {
            System.out.println(value);
        });
    }
}

创建一个失败(rejected)状态的 promise 对象

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;

public class Exam5 {
    public static void main(String[] args) {
        Promise<?> promise = Promise.error(new Exception("hprose"));
        promise.catchError(new Action<Throwable>() {
            public void call(Throwable value) throws Throwable {
                System.out.println(value);
            }
        });
    }
}

使用 error来创建一个失败(rejected)状态的 promise 对象效果跟前面用 Promise 构造器创建的效果也一样,但是写起来也更加简单,不再需要把失败原因放入一个函数中作为异常抛出了。

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam5 {
    public static void main(String[] args) {
        Promise<?> promise = Promise.error(new Exception("hprose"));
        promise.catchError((Throwable value) -> {
            System.out.println(value);
        });
    }
}

同步创建一个 promise 对象

Promise 上提供了一个:

Promise.sync(computation)

方法可以让我们同步的创建一个 promise 对象。

这里“同步”的意思是指 computation 的执行是同步执行的。而通过 Promise 构造器创建 promise 对象时,computation 是异步执行的。为了可以更好地理解这一点,我们来看一个具体的例子:

package hprose.example.promise;

import hprose.util.concurrent.Action;
import hprose.util.concurrent.Promise;
import java.util.concurrent.Callable;

public class Exam6 {
    public static void async() {
        System.out.println("before Promise constructor");
        Promise<String> promise = new Promise<String>(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(100);
                System.out.println("running Promise constructor");
                return "promise from Promise constructor";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
        System.out.println("after Promise constructor");
    }
    public static void sync() {
        System.out.println("before Promise.sync");
        Promise<String> promise = Promise.sync(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(100);
                System.out.println("running Promise.sync");
                return "promise from Promise.sync";
            }
        });
        promise.then(new Action<String>() {
            public void call(String value) throws Throwable {
                System.out.println(value);
            }
        });
        System.out.println("after Promise.sync");
    }
    public static void main(String[] args) {
        async();
        sync();
    }
}

运行结果如下:

before Promise constructor
after Promise constructor
before Promise.sync
running Promise constructor
promise from Promise constructor
running Promise.sync
promise from Promise.sync
after Promise.sync

该代码在 Java 8 下,可以简写为:

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam6 {
    public static void async() {
        System.out.println("before Promise constructor");
        Promise<String> promise = new Promise<>(() -> {
            Thread.sleep(100);
            System.out.println("running Promise constructor");
            return "promise from Promise constructor";
        });
        promise.then((String value) -> {
            System.out.println(value);
        });
        System.out.println("after Promise constructor");
    }
    public static void sync() {
        System.out.println("before Promise.sync");
        Promise<String> promise = Promise.sync(() -> {
            Thread.sleep(100);
            System.out.println("running Promise.sync");
            return "promise from Promise.sync";
        });
        promise.then((String value) -> {
            System.out.println(value);
        });
        System.out.println("after Promise.sync");
    }
    public static void main(String[] args) {
        async();
        sync();
    }
}

从结果我们可以看出,Promise.sync 方法中的 computation 是同步执行的,而 Promise 构造器中的 computation 是异步执行的。另外,我们还可以看出这里的 then 跟 JavaScript 的不一样,JavaScript 的 then 是异步的,而 Java 版本的 then 是同步的。

创建一个延迟 promise 对象

虽然通过 Promise 构造器来创建一个 promise 对象跟使用 Promise.sync 方法来比是异步的,但只是在执行顺序上能看出差别来,但是它并不会让你感到有明显的延时。如果你需要创建一个 promise 对象并且延迟一段时间后再执行 computation 函数,那么你可以使用

Promise.delayed(long duration, TimeUnit timeunit, final Object value)
Promise.delayed(long duration, final Object value)

方法。

delayed 方法的第一个参数 duration 是一个延时时长,第二个参数 timeunit 是延时单位,如果省略默认为毫秒。最后一个参数 value 既可以是一个 Callable 的回调,也可以是一个其它类型的值(包括 promise 对象)。当 value 不是函数时,在延时之后,直接将 value 设置为 promise 对象的值。

例如(为了代码简洁,后面代码一律采用 Java8 的形式):

package hprose.example.promise;

import hprose.util.concurrent.Promise;

public class Exam7 {
    public static void normal() {
        System.out.println(System.currentTimeMillis() + ": before Promise constructor");
        Promise<String> promise = new Promise<>(() -> {
            System.out.println(System.currentTimeMillis() + ": running Promise constructor");
            return "promise from Promise constructor";
        });
        promise.then((String value) -> {
            System.out.println(System.currentTimeMillis() + ": " + value);
        });
        System.out.println(System.currentTimeMillis() + ": after Promise constructor");
    }
    public static void delayed() {
        System.out.println(System.currentTimeMillis() + ": before Promise.delayed");
        Promise<String> promise = Promise.delayed(300, () -> {
            System.out.println(System.currentTimeMillis() + ": running Promise.delayed");
            return "promise from Promise.delayed";
        });
        promise.then((String value) -> {
            System.out.println(System.currentTimeMillis() + ": " + value);
        });
        System.out.println(System.currentTimeMillis() + ": after Promise.delayed");
    }
    public static void main(String[] args) throws InterruptedException {
        delayed();
        normal();
        Thread.sleep(400);
    }
}

该程序运行结果为:

1466322667552: before Promise.delayed
1466322667589: after Promise.delayed
1466322667589: before Promise constructor
1466322667590: running Promise constructor
1466322667590: promise from Promise constructor
1466322667590: after Promise constructor
1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed

注意,该程序最后加了 400 毫秒延时是为了能够显示最后的结果,否则还等不到输出:

1466322667890: running Promise.delayed
1466322667890: promise from Promise.delayed

程序就结束了。

Clone this wiki locally