Skip to content

Latest commit

 

History

History
859 lines (650 loc) · 49.7 KB

File metadata and controls

859 lines (650 loc) · 49.7 KB

🦝 CompletableFuture-Fu(CF-Fu)

Fast Build CI Strong Build CI Codecov Qodana Code Inspections Java support License Javadocs Maven Central GitHub Releases GitHub Stars GitHub Forks GitHub Issues GitHub Contributors GitHub repo size gitpod: Ready to Code

📖 English Documentation | 📖 中文文档


👉 cffuCompletableFuture-Fu 🦝)是一个小小的CompletableFuture(CF)辅助增强库,提升CF使用体验并减少误用,在业务中更方便高效安全地使用CF。😋🚀🦺

欢迎 👏 💖

shifu



🔧 功能

☘️ 补全应用开发中缺失的功能

  • 🏪 更方便的功能,如
    • 支持返回多个输入CF的运行结果,而不是返回CF<Void>没有包含输入CF的结果(CompletableFuture#allOf
      • 如方法allResultsFailFastOf / mSupplyFailFastAsync / thenMApplyMostSuccessAsync
    • 支持直接运行多个Action,而不是要先包装成CompletableFuture
      • 如方法mSupplyAsync / mRunFailFastAsync / thenMApplyAllSuccessAsync
      • 即 多指令单数据(MISD)风格处理
    • 支持异步并行处理集合数据,而不是先包装数据与ActionCompletableFuture
      • 如方法CfParallelUtils#parApplyFailFastAsync / CfParallelUtils#thenParAcceptAnySuccessAsync
      • 即 多指令单数据(MISD)风格处理
    • 支持输入CFAction的集合,而不是先转换集合成数据类型
      • 如方法CfIterableUtils#allResultsFailFastOf / CfIterableUtils#mSupplyFailFastAsync / CfIterableUtils#thenMApplyMostSuccessAsync
    • 支持设置缺省的业务线程池,CffuFactory#builder(executor)方法,而不是在异步执行时反复传入业务线程池参数
    • 支持处理指定异常类型的catching方法,而不是处理所有异常ThrowableCompletableFuture#exceptionally
  • 🚦 更高效灵活的并发执行策略,如
    • AllFailFast策略:当输入的多个CF有失败时快速失败返回,而不再于事无补地等待所有CF运行完成(CompletableFuture#allOf
    • AnySuccess策略:返回首个成功的CF结果,而不是首个完成但可能失败的CFCompletableFuture#anyOf
    • AllSuccess策略:返回多个CF中成功的结果,对于失败的CF返回指定的缺省值
    • MostSuccess策略:指定时间内返回多个CF中成功的结果,对于失败或超时的CF返回指定的缺省值
    • All(Complete) / Any(Complete)策略:这2个是CompletableFuture已有支持的策略
  • 🦺 更安全的使用方式,如
    • 超时执行安全的orTimeout / completeOnTimeout方法新实现
      • CF#orTimeout / CF#completeOnTimeout方法会导致CF的超时与延迟执行基础功能失效❗️
    • 一定不会修改CF结果的peek处理方法
      • whenComplete方法可能会修改CF的结果,返回CF的结果与输入并不一定一致
    • 支持超时的join(timeout, unit)方法
    • 支持禁止强制篡改,CffuFactoryBuilder#forbidObtrudeMethods方法
    • 在类方法附加完善的代码质量注解,在编码时IDE能尽早提示出问题
      • @NonNull@Nullable@CheckReturnValue@Contract
  • 🧩 缺失的基本功能,除了上面面向安全而新实现的方法,还有
    • 异步异常完成,completeExceptionallyAsync方法
    • 非阻塞地获取成功结果,对于失败的或还在运行中的CF则返回指定的缺省值,getSuccessNow方法
    • 解包装CF异常成业务异常,unwrapCfException方法

Backport支持Java 8Java 9+高版本的所有CF新功能方法在Java 8低版本直接可用,如

  • 超时控制:orTimeout / completeOnTimeout
  • 延迟执行:delayedExecutor
  • 工厂方法:failedFuture / completedStage / failedStage
  • 处理操作:completeAsync / exceptionallyAsync / exceptionallyCompose / copy
  • 非阻塞读:resultNow / exceptionNow / state

💪 已有功能的增强,如

  • anyOf方法:返回具体类型T(类型安全),而不是返回ObjectCompletableFuture#anyOf
  • allOf / anyOf方法:输入更宽泛的CompletionStage参数类型,而不是CompletableFuture类(CompletableFuture#allOf/anyOf

更多cffu功能及其使用方式的说明参见 cffu功能介绍

关于CompletableFuture

如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。

并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单

其中CompletableFuture(CF)有其优点:

  • 广为人知广泛使用,有一流的群众基础
    • CompletableFuture在2014年发布的Java 8提供,有10年了
    • CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有20年了。虽然Future接口不支持运行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
  • 功能强大、但不会非常庞大复杂
    • 足以应对日常业务的异步并发需求
    • 其它大型并发框架(比如AkkaRxJava)在使用上需要理解的内容要多很多。当然基本的并发关注方面及其复杂性,与具体使用哪个工具无关,都是要理解与注意的
  • 高层抽象
  • Java标准库内置
    • 无需额外依赖,几乎总是可用
    • 相信有极高的实现质量

与其它并发工具、框架一样,CompletableFuture用于

  • 并发执行业务逻辑,或说编排并发处理流程或异步任务
  • 多核并行处理,充分利用资源
  • 缩短请求响应时间,提升业务响应性

值得更深入地了解和应用。 💕

👥 User Guide

1. cffu的使用方式

  • 🦝 使用Cffu
  • 🔧 使用CompletableFutureUtils工具类

1.1 推荐Cffu类的使用方式 🌟

相比调用CompletableFutureUtils工具类的静态方法,

  • 使用Cffu类就像使用CompletableFuture一样,新功能作为Cffu类的实例方法,可以自然方便地调用
    • Cffu类之于CompletableFuture的工具类CompletableFutureUtils,就像GuavaFluentFuture之于ListenableFuture的工具类Futures
  • Java语言不支持在已有类(CompletableFuture)上扩展方法,所以需要一个新的包装类(Cffu

如果你不想在项目中引入新类(Cffu类)、觉得这样增加了复杂性的话,完全可以将cffu库作为一个工具类来用:

  • 优化CompletableFuture使用的工具方法在业务项目中很常见
  • CompletableFutureUtils提供了一系列实用高效安全可靠的工具方法
  • 这种使用方式有些cffu功能没有提供(也没有想到合适的实现方案)
    如支持设置缺省的业务线程池、禁止强制篡改

1.2 迁移使用CompletableFuture类的代码到使用Cffu

为了方便自然地使用cffu库的增强功能与方法,可以迁移使用CompletableFuture类的已有代码到Cffu类。

1) 如果可以修改使用CompletableFuture的代码

迁移到Cffu类,包含2步简单的修改:

  • 在类型声明地方,将CompletableFuture类改成Cffu
  • CompletableFuture静态方法调用的地方,将类名CompletableFuture改成cffuFactory实例

之所以可以这样迁移,是因为:

  • CompletableFuture类的所有实例方法都在Cffu类中有实现,且有相同的方法签名与功能
  • CompletableFuture类的所有静态方法都在CffuFactory类中有实现,且有相同的方法签名与功能

2) 如果不能修改使用CompletableFuture的代码(如在外部库中返回的CF

使用CffuFactory.toCffu(CompletionStage)方法,将CompletableFutureCompletionStage转换成Cffu类型。

1.3 库依赖(包含CompletableFutureUtils工具类)

  • For Maven projects:

    <dependency>
      <groupId>io.foldright</groupId>
      <artifactId>cffu2</artifactId>
      <version>2.0.7</version>
    </dependency>
  • For Gradle projects:

    Gradle Kotlin DSL

    implementation("io.foldright:cffu2:2.0.7")

    Gradle Groovy DSL

    implementation 'io.foldright:cffu2:2.0.7'

2. cffu功能介绍

2.1 支持返回多个输入CF的整体运行结果

CompletableFutureallOf方法的返回类型是CF<Void>,没有包含输入CF的结果。为了获取输入CF的运行结果,需要:

  • allOf方法之后再通过入参CF的读操作(如join / get)来获取结果
  • 或是在传入的CompletableFuture Action中设置外部的变量
    • 需要注意多线程读写的线程安全问题 ⚠️🔀
      多线程读写涉及多线程数据传递的复杂性,遗漏并发逻辑的数据读写的正确处理是业务代码中的常见问题❗️
    • 并发深坑勿入,并发逻辑复杂易出Bug 🐞
      如果涉及超时则会更复杂,JDK CompletableFuture自身在Java 21中也有这方面的Bug修复

cffuallResultsFailFastOf / mSupplyFailFastAsync / thenMApplyMostSuccessAsync等方法提供了返回输入CF结果的功能。使用这些方法获取输入CF的整体运行结果:

  • 方便直接
  • 因为返回的是有整体结果的CF,可以继续串接非阻塞的操作,所以自然减少了阻塞读方法(如join / get)的使用,尽量降低业务逻辑的死锁风险
  • 规避了在业务逻辑中直接实现多线程读写逻辑的复杂线程安全问题与逻辑错误
  • 使用「可靠实现与测试的」库所提供的并发功能而不是去直接实现 是 最佳实践 🏆✅

示例代码如下:

public class AllResultsOfDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);

    CompletableFuture<Void> all2 = CompletableFuture.allOf(cf1, cf2);
    // result type is Void!
    //
    // the result can be got by input argument `cf1.get()`, but it's cumbersome.
    // so we can see a lot of util methods to enhance `allOf` with the results in our project.

    CompletableFuture<List<Integer>> allResults2 = CompletableFutureUtils.allResultsOf(cf1, cf2);
    System.out.println(allResults2.get());
    // output: [21, 42]
  }
}

# 完整可运行的Demo代码参见AllResultsOfDemo.java

2.2 获取多个CF的所有结果,支持快速失败,而不是做于事无补的等待降低了业务响应性

CompletableFutureallOf方法会等待所有输入CF运行完成;即使有CF失败了也要等待后续CF都运行完成,再返回一个失败的CF

对于业务逻辑来说,这样失败且继续等待的策略(AllComplete),降低了业务响应性。

业务需要的是,当有输入CF失败了则快速失败不再做于事无补的等待(AllFailFast)。

  • cffu提供了相应的allResultsFailFastOf等方法,支持AllFailFast并发执行策略
  • AllFailFast并发执行策略是异步任务编排中最有用常用的模式
  • AllFailFast / AllComplete两者都是,仅当所有的输入都成功时,才返回成功的结果

更多说明可以看看文章CompletableFuture如何实现异步任务编排中最常用的模式 —— 快速失败

示例代码如下:

public class AllFastFailDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
      // a simulating long-running computation...
      sleep(2_000);
      return 42;
    });
    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
      // a simulating fast-failure computation...
      sleep(1);
      throw new RuntimeException();
    });

    CompletableFuture<List<Integer>> allResultsFailFastCf = CompletableFutureUtils.allResultsFailFastOf(cf1, cf2);
    // fail-fast without waiting long-running cf1
    try {
      allResultsFailFastCf.join();
    } catch (Exception e) {
      System.out.println(e);
    }
    // output: RuntimeException

    CompletableFuture<List<Integer>> allCf = CompletableFutureUtils.allResultsOf(cf1, cf2);
    // same failure result as allResultsFailFastCf but waiting long-running cf1...
    try {
      allCf.join();
    } catch (Exception e) {
      System.out.println(e);
    }
    // output: RuntimeException
  }
}

# 完整可运行的Demo代码参见AllFastFailDemo.java

2.3 获取多个CF的任一结果,支持首个成功的CF结果,而不是首个完成但失败的CF

CompletableFutureanyOf方法返回首个完成的CF,不会等待后续没有完成的CF;即使首个完成的CF是失败的,也会返回这个失败的CF结果。

业务逻辑一般需要的是首个成功的CF结果(AnySuccess),而不是首个完成但失败的CFAnyComplete)。

  • AnySuccess是异步任务编排中最有用常用的模式
  • cffu提供了相应的anySuccessOf等方法,支持AnySuccess并发执行策略
  • anySuccessOf只有当所有的输入CF都失败时,才返回失败结果

示例代码如下:

public class AnySuccessDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> cf1 = CompletableFutureUtils.failedFuture(new RuntimeException());
    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
      // a simulating long-running computation...
      sleep(2_000);
      return 42;
    });

    CompletableFuture<Integer> anyCf = CompletableFutureUtils.anyOf(cf1, cf2);
    // first completed CF: cf1(but failed)
    try {
      anyCf.join();
    } catch (Exception e) {
      System.out.println(e);
    }
    // output: RuntimeException

    CompletableFuture<Integer> anySuccessCf = CompletableFutureUtils.anySuccessOf(cf1, cf2);
    // first completed and successful: cf2
    System.out.println(anySuccessCf.get());
    // output: 42
  }
}

# 完整可运行的Demo代码参见AnySuccessDemo.java

2.4 支持设置缺省的业务线程池

CompletableFuture异步执行(即*Async方法)使用的缺省线程池是ForkJoinPool.commonPool();业务中使用这个缺省线程池是很危险的❗

  • ForkJoinPool.commonPool()差不多是CPU个线程,合适执行CPU密集的任务;对于业务逻辑,往往有很多等待操作(如网络IO、阻塞等待)并不是CPU密集的,导致业务处理能力低下 🐌
  • ForkJoinPool使用的是无界队列;当大流量时任务会堆积,导致内存耗尽服务崩溃 🚨
    关于这个问题及原因的更多说明可以看看这篇文章

结果就是,在业务逻辑中,调用CompletableFuture*Async方法时,几乎每次都要反复传入指定的业务线程池;这让CompletableFuture的使用很繁琐易错 🤯❌

另外,当在底层逻辑底层操作回调业务时(如RPC回调),不合适或方便为业务提供线程池;使用Cffu设置缺省的上层业务指定的线程池既方便又合理安全。
这个使用场景的更多说明可以看看CompletableFuture原理与实践 - 4.2.3 异步RPC调用注意不要阻塞IO线程池

示例代码如下:

public class NoDefaultExecutorSettingForCompletableFuture {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    CompletableFuture<Void> cf1 = CompletableFuture.runAsync(
        () -> System.out.println("doing a long time work!"),
        myBizExecutor);

    CompletableFuture<Void> cf2 = CompletableFuture
        .supplyAsync(
            () -> {
              System.out.println("doing another long time work!");
              return 42;
            },
            myBizExecutor)
        .thenAcceptAsync(
            i -> System.out.println("doing third long time work!"),
            myBizExecutor);

    CompletableFuture.allOf(cf1, cf2).join();
  }
}

# 完整可运行的Demo代码参见NoDefaultExecutorSettingForCompletableFuture.java

Cffu类支持设置缺省的业务线程池,规避上面的繁琐与危险。示例代码如下:

public class DefaultExecutorSettingForCffu {
  private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
  private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) {
    Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));

    Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
      System.out.println("doing another long time work!");
      return 42;
    }).thenAcceptAsync(i -> System.out.println("doing third long time work!"));

    cffuFactory.allOf(cf1, cf2).join();
  }
}

# 完整可运行的Demo代码参见DefaultExecutorSettingForCffu.java

2.5 高效灵活的并发执行策略(AllFailFast / AnySuccess / AllSuccess / MostSuccess

除了上面提到AllFailFastAnySuccess这2个业务最有用常用并发执行策略,cffu库还支持AllSuccessMostSuccess

汇总说明如下:

  • CompletableFutureallOf方法会等待所有输入CF运行完成;即使有CF失败了也要等待后续CF都运行完成,再返回一个失败的CF
    • 对于业务逻辑来说,这样失败且继续等待的策略(AllComplete),减慢了业务响应性;会希望当有输入CF失败了,则快速失败不再做于事无补的等待
    • cffu提供了相应的allResultsFailFastOf等方法,支持AllFailFast并发执行策略
    • allOf / allResultsFailFastOf两者都是,只有当所有的输入CF都成功时,才返回成功结果
  • CompletableFutureanyOf方法返回首个完成的CF,不会等待后续没有完成的CF赛马模式;即使首个完成的CF是失败的,也会返回这个失败的CF结果。
    • 对于业务逻辑来说,想要的往往不是首个完成但失败的CF结果(AnyComplete),会希望赛马模式返回首个成功的CF结果
    • cffu提供了相应的anySuccessOf等方法,支持AnySuccess并发执行策略
    • anySuccessOf只有当所有的输入CF都失败时,才返回失败结果
  • 返回多个CF中成功的结果,对于失败的CF返回指定的缺省值
    • 业务逻辑包含容错时,当某些CF处理出错时可以使用成功的那部分结果,而不是整体失败
    • cffu提供了相应的allSuccessOf等方法,支持AllSuccess并发执行策略
  • 返回指定时间内多个CF中成功的结果,对于失败或超时的CF返回指定的缺省值
    • 业务是最终一致性时,尽量返回有的结果;对于没能及时返回还在运行中的CF,结果会写到分布式缓存中下次业务请求就有了,以避免重复计算
    • 这是个常见业务使用模式,cffu提供了相应的mostSuccessResultsOf等方法,支持MostSuccess并发执行策略

📔 关于多个CF的并发执行策略,可以看看JavaScript规范Promise Concurrency;在JavaScript中,Promise即对应CompletableFuture

JavaScript Promise提供了4个并发执行方法:

  • Promise.all():等待所有Promise运行成功,只要有一个失败就立即返回失败(AllFailFast
  • Promise.allSettled():等待所有Promise运行完成,不管成功失败(AllComplete
  • Promise.any():赛马模式,立即返回首个成功的PromiseAnySuccess
  • Promise.race():赛马模式,立即返回首个完成的PromiseAnyComplete

PS:JavaScript Promise的方法命名考究~ 👍

cffu的新方法支持了JavaScript Promise规范的并发执行方式~

示例代码如下:

public class ConcurrencyStrategyDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> successCf = CompletableFuture.supplyAsync(() -> {
      sleep(300); // sleep SHORT time
      return 42;
    });
    CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
      sleep(3000); // sleep LONG time
      return 4242;
    });
    CompletableFuture<Integer> failedCf = failedFuture(new RuntimeException("Bang!"));

    CompletableFuture<List<Integer>> mostSuccessCf = mostSuccessResultsOf(
        -1, 100, TimeUnit.MILLISECONDS, successCf, successAfterLongTimeCf, failedCf);
    System.out.println(mostSuccessCf.get());
    // output: [42, -1, -1]

    CompletableFuture<List<Integer>> allSuccessCf = CompletableFutureUtils.allSuccessResultsOf(
        -1, successCf, successAfterLongTimeCf, failed);
    System.out.println(allSuccessCf.get());
    // output: [42, -1, 4242]
  }
}

# 完整可运行的Demo代码参见ConcurrencyStrategyDemo.java

2.6 支持直接运行多个Action,而不是要先包装成CompletableFuture

CompletableFutureallOf/anyOf方法输入的是CompletableFuture;当业务直接有要编排业务逻辑方法时,仍然需要先包装成CompletableFuture再运行:

  • 繁琐
  • 模糊了业务流程
  • 简单包装多个ActionCF提交给allOf/anyOf的做法(业务代码往往是这样实现的),会呑异常❗️
    • 当输入Action的运行抛出多个异常时,这些异常至多只能有一个能通过返回CF反馈给业务,其它的异常则被默默地呑掉,影响业务问题的排查

cffu提供了直接运行多个Action的方法,解决上述问题:

  • 方便直接明了地表达与编排业务流程
  • 不呑异常,方便排查业务问题
    • 当多个输入Action的运行抛出多个异常时,会打印日志报告出没有在返回CF中反馈给业务的异常

这些多Action的方法也配套实现了「不同的并发执行策略」与「返回多个输入CF结果」的支持。

示例代码如下:

public class MultipleActionsDemo {
  private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
  private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  static void mRunAsyncDemo() {
    // wrap actions to CompletableFutures first, AWKWARD! 😖
    CompletableFuture.allOf(
        CompletableFuture.runAsync(() -> System.out.println("task1")),
        CompletableFuture.runAsync(() -> System.out.println("task2")),
        CompletableFuture.runAsync(() -> System.out.println("task3"))
    );
    completedFuture("task").thenCompose(v ->
        CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> System.out.println(v + "1")),
            CompletableFuture.runAsync(() -> System.out.println(v + "2")),
            CompletableFuture.runAsync(() -> System.out.println(v + "3"))
        )
    );

    // just run multiple actions, fresh and cool 😋
    CompletableFutureUtils.mRunAsync(
        () -> System.out.println("task1"),
        () -> System.out.println("task2"),
        () -> System.out.println("task3")
    );
    cffuFactory.completedFuture("task").thenMAcceptAsync(
        (String v) -> System.out.println(v + "1"),
        v -> System.out.println(v + "2"),
        v -> System.out.println(v + "3")
    );
  }
}

# 完整可运行的Demo代码参见MultipleActionsDemo.java

2.7 支持异步并行处理集合数据,而不是先包装数据与ActionCompletableFuture

对于多个数据进行异步并行处理是业务常见需求,但使用CompletableFuture实现会比较繁琐复杂; 也模糊了业务流程,且简单实现会呑异常

cffu提供了异步并行处理的方法,解决上述问题。

这些异步并行处理的方法也配套实现了「不同的并发执行策略」的支持。

示例代码如下:

public class CfParallelDemo {
  static void parApplyFailFastAsyncDemo() {
    ////////////////////////////////////////////////////////////////////////
    // wrap data with action to CompletableFutures first, AWKWARD and COMPLEX! 😖
    ////////////////////////////////////////////////////////////////////////
    Function<Integer, Integer> fn = x -> x + 1;
    List<Integer> list = asList(42, 43, 44);

    CompletableFuture<Integer>[] cfs = new CompletableFuture[list.size()];
    for (int i = 0; i < list.size(); i++) {
      Integer e = list.get(i);
      cfs[i] = CompletableFuture.supplyAsync(() -> fn.apply(e));
    }
    CompletableFutureUtils.allResultsFailFastOf(cfs).thenAccept(System.out::println);
    // output: [43, 44, 45]

    ////////////////////////////////////////////////////////////////////////
    // just parallel process multiple data, fresh and cool 😋
    ////////////////////////////////////////////////////////////////////////
    CfParallelUtils.parApplyFailFastAsync(
        asList(42, 43, 44),
        x -> x + 1
    ).thenAccept(System.out::println);
    // output: [43, 44, 45]
  }
}

# 完整可运行的Demo代码参见CfParallelDemo.java

2.8 支持处理指定异常类型,而不是处理所有异常Throwable

在业务处理的try-catch语句中,catch所有异常(Throwable)往往不是最佳实践。

类似的,CompletableFuture#exceptionally方法,也是处理了所有异常(Throwable);
应该只处理当前业务自己清楚明确能恢复的具体异常,由外层处理其它的异常;避免掩盖Bug或是错误地处理了自己不能恢复的异常。

cffu提供了相应的catching*方法,支持指定要处理异常类型;相比CF#exceptionally方法新加了一个异常类型参数,使用方式类似,不附代码示例。

2.9 Backport支持Java 8

Java 9+高版本的所有CF新功能方法在Java 8低版本直接可用。

其中重要的backport功能有:

  • 超时控制:orTimeout / completeOnTimeout
  • 延迟执行:delayedExecutor
  • 工厂方法:failedFuture / completedStage / failedStage
  • 处理操作:completeAsync / exceptionallyAsync / exceptionallyCompose / copy
  • 非阻塞读:resultNow / exceptionNow / state

这些backport方法是CompletableFuture的已有功能,不附代码示例。

2.10 超时执行安全的orTimeout / completeOnTimeout新实现

CF#orTimeout() / CF#completeOnTimeout()方法当超时时使用CF内部的单线程ScheduledThreadPoolExecutor来触发业务逻辑执行,会导致CF的超时与延迟执行基础功能失效❗️

因为超时与延迟执行是基础功能,一旦失效会导致:

  • 业务功能的正确性问题,设置超时的触发不准延后
  • 系统稳定性问题,如线程中等待操作不能返回、其它有依赖的CF不能完成、线程池耗尽与内存泄露

cffu库提供了超时执行安全的新实现方法:

保证业务逻辑不会在CF的单线程ScheduledThreadPoolExecutor中执行。

更多说明参见:

2.11 支持超时的join方法

cf.join()方法「没有超时会永远等待」,在业务中很危险❗️当意外出现长时间等待时,会导致:

  • 主业务逻辑阻塞,没有机会做相应的处理,以及时响应用户
  • 会费掉一个线程,线程是很有限的资源(一般几百个),耗尽线程意味着服务瘫痪故障

join(timeout, unit)方法即支持超时的join方法;就像cf.get(timeout, unit)之于cf.get()

这个新方法使用简单类似,不附代码示例。

2.12 返回具体类型的anyOf方法

CompletableFutureanyOf()方法返回类型是Object,丢失具体类型,使用返回值时需要转型操作不方便,也不类型安全。

cffu提供的anySuccessOf() / anyOf()方法,返回具体类型T,而不是返回Object

这个方法使用简单类似,不附代码示例。

2.13 输入宽泛类型的allOf/anyOf方法

CompletableFutureallOf() / anyOf()方法输入参数类型是CompletableFuture,而不是更宽泛的CompletionStage类型;对于CompletionStage类型的输入,则需要调用CompletionStage#toCompletableFuture方法做转换。

cffu提供的allOf() / anyOf()方法输入更宽泛的CompletionStage参数类型,使用更方便。

方法使用简单类似,不附代码示例。

更多功能说明

可以参见:

3. cffu库提供的编排方法及其最佳实践

编排方法 指 有多个输入的方法,其中输入 指 需要并发执行的逻辑

cffu库支持3种形式的输入:

  1. Action
  2. 多数据(用相同Action处理各个数据)
  3. CompletableFuture

相比其它更简单的并发编程方式(包含结构化并发),能对多个输入进行灵活高效的编排,是CompletableFuture的优势。

关于编排的不同并发执行策略,参见上面的文档 2.5 高效灵活的并发执行策略(AllFailFast / AnySuccess / AllSuccess / MostSuccess

3.1 编排方法分组

1) 输入多Action

支持3种表示多Action的参数类型:变参数组、集合 和 Tuple(多个输入的泛型参数类型不同)。对应3组变体方法:

  • 多参数变参输入,输入类型是数组类型
    • 对应方法分组:
      • CompletableFutureUtils.M*方法,即Multi-Actions(M*) Methods
      • CompletableFutureUtils.thenM*方法,即Then-Multi-Actions(thenM*) Methods
  • 集合参数输入,输入类型是 Iterable
    • 对应方法分组:
      • CfIterableUtils.M*,即Multi-Actions(M*) Methods
      • CfIterableUtils.thenM*,即Then-Multi-Actions(thenM*) Methods
    • 这组方法的方法名与功能与上一组「多参数变参输入」一样,但多Action输入的参数类型不同(Iterable vs. 数组)
  • 泛型参数类型不同的多Action输入,输入类型是 Tuple
    • 对应方法分组:
      • CfTupleUtils.MTuple*,即Multi-Actions-Tuple(MTuple*) Methods
      • CfTupleUtils.thenMTuple*,即Then-Multi-Actions-Tuple(thenMTuple*) Methods

多个Action对(单个相同的)数据进行异步并行处理,即多指令单数据(MISD)。

2) 输入多个数据

对多个数据通过单个相同Action进行异步并行处理,即多指令单数据(MISD)。

对应方法分组:

  • CfParallelUtils.Par*方法,即Multi-Data(Par*) Methods
  • CfParallelUtils.thenPar*方法,即Then-Multi-Data(thenPar*) Methods

在业务逻辑中,应该使用集合持有多个数据而不是数组;如果业务逻辑持有的是数组类型的多个数据,也可以简单转换成集合类型,如通过方法Arrays.asList(...)cffu库不再提供多参数变参数组类型输入的方法变体。

3) 输入多CompletableFuture

与输入多Action一样,支持3种表示多Action的参数类型:变参数组、集合 和 Tuple(多个输入的泛型参数类型不同)。对应3组变体方法:

  • 多参数变参输入,输入类型是数组类型
    • 对应方法分组 CompletableFutureUtils.*Of
  • 输入集合,输入类型是Iterable
    • 对应方法分组CfIterableUtils.*Of
    • 这组方法的方法名与功能与上一组「多参数变参输入」一样,但多CompletableFuture输入的参数类型不同(Iterable vs. 数组)
  • 输入异质的不同类型,输入类型是Tuple
    • 对应方法分组CfTupleUtils.*TupleOf

3.2 编排方法选用的最佳实践 🏆

1) 当业务处理逻辑直接有多个Action

包含直接写的Lambda表达式形式的Action

  • Action个数固定/已知时,使用「多参数变参Action」方法,对应方法分组:
    • CompletableFutureUtils.M*方法,即Multi-Actions(M*) Methods
    • CompletableFutureUtils.thenM*方法,即Then-Multi-Actions(thenM*) Methods
  • Action个数不固定时,使用「Action集合」方法,对应方法分组:
    • CfIterableUtils.M*,即Multi-Actions(M*) Methods
    • CfIterableUtils.thenM*,即Then-Multi-Actions(thenM*) Methods

2) 当业务处理逻辑有多个数据进行异步并行处理时

使用「输入多个数据」方法,对应方法分组:

  • CfParallelUtils.Par*方法,即Multi-Data(Par*) Methods
  • CfParallelUtils.thenPar*方法,即Then-Multi-Data(thenPar*) Methods

3) 当业务处理逻辑输入只有多个CompletableFuture

如其它模块或三方库中方法返回的是CompletableFuture,要编排时只能使用输入多CompletableFuture的方法。

  • CompletableFuture个数固定/已知时,使用「多参数变参CompletableFuture」方法,对应方法分组:
    • 对应方法分组CfIterableUtils.*Of
  • CompletableFuture个数不固定时,使用「CompletableFuture集合」方法,对应方法分组:
    • 对应方法分组CfIterableUtils.*Of

相比上面的方法分组(多Action/多数据),这些输入多个CompletableFuture的方法:

  • 会呑异常❗️
    • 当输入CompletableFuture的运行抛出多个异常时,这些异常至多只能有一个能通过返回CF反馈给业务,其它的异常则被默默地呑掉,影响业务问题的排查
  • 额外的包装逻辑代码繁琐,并且模糊了业务流程

在业务开发中,可以将这些输入多个CompletableFuture的方法当作下层基础方法,仅在必要时才使用。

在关键业务逻辑中,使用这些方法注意实现好异常报告逻辑(即不要呑异常):

  • 如何实现可以参考cffu实现代码,如CompletableFutureUtils.mSupplyFailFastAsync()
  • cffu库提供了实现编排异常报告的支持工具类SwallowedExceptionHandleUtils

🔌 API Docs

🍪依赖

可以在 central.sonatype.com 查看最新版本与可用版本列表。

  • cffu库(包含Java CompletableFuture的增强CompletableFutureUtils):
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu2</artifactId>
        <version>2.0.7</version>
      </dependency>
    • For Gradle projects:

      Gradle Kotlin DSL

      implementation("io.foldright:cffu2:2.0.7")

      Gradle Groovy DSL

      implementation 'io.foldright:cffu2:2.0.7'
  • 📌 TransmittableThreadLocal(TTL)cffu executor wrapper SPI实现
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu2-ttl-executor-wrapper</artifactId>
        <version>2.0.7</version>
        <scope>runtime</scope>
      </dependency>
    • For Gradle projects:

      Gradle Kotlin DSL

      runtimeOnly("io.foldright:cffu2-ttl-executor-wrapper:2.0.7")

      Gradle Groovy DSL

      runtimeOnly 'io.foldright:cffu2-ttl-executor-wrapper:2.0.7'
  • cffu bom:
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu2-bom</artifactId>
        <version>2.0.7</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    • For Gradle projects:

      Gradle Kotlin DSL

      implementation(platform("io.foldright:cffu2-bom:2.0.7"))

      Gradle Groovy DSL

      implementation platform('io.foldright:cffu2-bom:2.0.7')

📚 更多资料

👋 关于库名

cffuCompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅

嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝

shifu