Java 响应式编程【Reactive Programming in Java】

学习来源:Exploring reactive programming in Java by Miro Cupak

最近学习RxJava。RxJava 在 GitHub 主页上的自我介绍:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻译:RxJava是Reactive Extensions(响应式扩展)在 Java VM 上的实现,是一个使用可观测的序列来组成异步的、基于事件的程序的库。

两个关键词:Reactive Extensionsasynchronous

RxJava 的核心是响应式编程,所解决的问题是“异步”

所以在学习Rx库之前,有必要了解下什么是响应式编程


RX

Reactive Extensions是指什么。

Rx取自于ReactiveX,官网对其的介绍如下。

An API for asynchronous programming with observable streams

一个带有可观察的流的异步编程的API

还是离不开“异步”,提到异步就不得不提到线程

所以,线程是响应式编程的第一步!


响应式的由来

故事从上古时期说起:

Java 1:

众所周知,Java在设计之初就是一门支持多线程的语言。在Java1.0版本中,要想开辟一个新线程,需使用Thread:

1
2
Thread thread = new Thread(() -> System.out.println("hello world"));
thread.start()

除此之外,它几乎不能做任何复杂的事情。

你可以说它有异步,但是他的整个异步系统处于“瘫痪”状态,简陋至极。当然也没有“响应”可言,可以叫他“零响应式”。

Java 5:

这个版本中为我们新增了三个很实用的接口:ExcutorService、Future和Callable。

Callable使Runnable有了返回值;ExecutorService.submit(callable)方法返回一个Future,便可以从Future中获取异步执行返回的结果。

1
2
3
ExecutorService e = Executors.newSingleThreadExecutor();
Future<String> future = e.submit(() -> "hello world");
future.get();

这样一来,我们有能力提供一个复杂的“异步”系统,但是“被动”且“低廉”。不管怎么说,它有了自己的一个异步系统,我们姑且称之为“一级响应式”。

Java 7

Java 7开始引入了一种新的Fork/Join线程池(Pool),它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

同样的代码可以如下执行:

1
2
3
ExecutorService e = ForkJoinPool.commonPool();
Future<String> future = e.submit(() -> "hello world");
future.get();

在此支持下,我们可以很好的做到各个线程的同步。我们称之为“二级响应式”。

但是饶是如此,使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,主线程会被迫等待。

Java 8

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

1
2
3
4
CompletableFuture cf = new CompletableFuture<String>();
//cf.complete("hello");
cf.completeExceptionally(new Exception("error"));
cf.get();

等待、成功、异常,都可以作为方法传给CompletableFuture。

CompletableFuture中还内置很多方法,分为三类;

  1. What kind of task is this;
  2. What kind of an Opreation is it support;
  3. What thread should run for the task.

其中,task可以是runnable、consumer或者function。 3种。

Opretion可以是Chain(链接操作)、compose(组合)、combine AND( AND结合)、combine OR(OR结合)。4种。

thread可以是:当前线程、主线程 和 自己定义的某线程。3种。

一番排列组合下来,一共有3 × 4 × 3 = 36 种方法。

高效,漂亮,易使用,非阻塞。可以是“三级响应式”了

后续版本种还加入了延时方法,completeOnTimeOut()和orTime(),已经做到了很好的异步操作。

Java 9

到此为止这个异步系统还差点什么。比如当出线“生产者”比“消费者”快的情况下【背压问题】,不能很好的解决。

所以,Java 9 引入了Flow接口。

1
2
3
4
5
6
SimpleSubscriber sub = new SimpleSubscriber();
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(sub);

publisher.submit("msg");
publisher.close();

这一套接口使用下来,越发接近我们想要的模型了,是“四级响应式

但是它并不完美,因为一般异步都涉及到网络请求。

Java 9 到 Java 11

这其中新增了Http2接口,除了支持webSocket外,还对Flow进行了很好的适配,使得网络的响应更加容易了。至此我们称之为“五级响应式”

这些在API的种种变化,使得我们操作异步更加方便优雅,响应式的程度也就越高。

不过这些改变都在JDK的层面上。

Reactive Libraries

在往上走就是Library层面的故事了。我们的RxJava库,就属于这个层面。

框架

Library往上走就是框架层面,框架中也有响应的支持。


综上,最后结论如图:

image-20201205010159396

响应式编程使得我们操作异步更加方便。

相关文章:

给 Android 开发者的 RxJava 详解

廖雪峰的官方网站–Java篇