Functional Reactive Programming

Functional Reactive Programming,以下简称FRP。中文译作函数式响应式编程,又称为函数式反应式编程,是一种采用函数式编程,进行面向异步数据流的编程范式。

它不是某一个语言的工具,而是一种更高抽象级别的编程范式。在很多领域中,已经诞生了不少成熟的响应式编程框架,比如RxJS、RxJava、Project Reactor……

在这个通用的编程范式中,其核心的异步事件流叫做Observerble/Stream。如果使用到Java Stream API、EventBus、观察者模式的实现等工具的话,应该会对这两个名词很熟悉。所以在讲FRP之前。先介绍函数式编程、观察者模式,以及事件驱动模式。

本文会以一些前端以及后端的案例,去讲一些函数式响应式编程的应用场景。

# 函数式编程

在函数式编程里面,函数为第一等公民,或者,纯函数是唯一的第一等公民(Pure function is the one and only first-class citizen)。函数式应该是无副作用的,在函数式编程里面,一般只能够是无状态的纯函数,同时变量也应该是常量化的。借此可以得到更加可预测和更可靠的编程代码。

# 命令式编程

int[] arr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9};
List<String> stringValueList = new ArrayList<>();

for(int idx = 0; idx < arr.length; idx++) {
    stringValueList.add(String.valueOf(arr[idx]));
}

for(int idx = 0; idx < stringValueList.size(); idx++) {
    log.info(stringValueList.get(idx));
}

# 函数式编程

int[] arr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9};

// 一般来说要求 ImmutableList
List<String> stringValueList = Arrays.stream(arr)
        .mapToObj(String::valueOf)
        .collect(Collectors.toList());

// or
Arrays.stream(arr)
        .mapToObj(String::valueOf)
        .forEach(log::info);

借助函数式编程的特性,我们可以实现很多天然支持高并发的需求,因为只要遵循函数式的编程范式,只要都是常量,就不存在共享内存、死锁等情景。这也是纯函数式编程中确定性/不变性带来的优势。

而且函数式编程还拥有很多丰富的特性,比如lambda表达式、高阶函数、柯里化、闭包(Java也有)……

# 观察者模式 & 发布订阅模式

观察者模式,是一种设计模式。意为建立一种对象与对象之间的依赖关系,一个对象发生改变时将自动通知其他对象,其他对象将相应做出反应。

发布订阅模式和观察者模式的一个小小拓展。

  • 观察者模式

    核心是一个Subject,其维护一组Observers(观察者),为特定的event(实践)来notify(通知)观察者。

  • 发布订阅模式

    有小小的不同,在观察者模式中的Subject叫做Publisher,而观察者变成Subscriber订阅者。它们的区别是发布者和订阅者并不知道对方的存在,它们之间通过消息中介连接起来。

具体的函数差别就是。观察者中的Subject通过fireEvent直接通知所有的Observer,而发布订阅中Publisher通过publishEvent到一个特定的Channel,然后订阅者订阅这个Channel收到Event。

# 事件驱动 & 消息驱动

事件驱动的定义和解释很长,但是以一个GUI典型用例来讲就很简单了。一个按钮被点击,产生了一个Client Event,然后以这个Event来驱动业务的运行,这就是事件驱动。

而观察者模式就是我们最常用的事件驱动模型。

消息驱动和事件驱动类似的,区别是事件/消息的传递,一个是Push、一个是Pull。事件驱动是主动推送事件的,而消息驱动是由第三方(系统等……)侦测捕获的。

# FRP编程范式

事件驱动,基于观察者,借助函数式基于异步数据流进行编程。

# 假设案例一

体现事件驱动的思想,响应式流控缓冲,以及函数式编程的思想。

# 点击一个button打印一条log

# Normal HTML
<script>
    const log = () => {
        console.log("click");
    };
</script>

<button onclick="log()" name="button">button</button>
# Binding
<script>
    document
        .querySelector("#button")
        .addEventListener('click', () => console.log("click"));
</script>

<button id="button" name="button">button</button>
# Reactive
<script>
    const button = document.querySelector("#button");
    Rx
        .Observable
        .fromEvent(button, "click");
        .subscribe(() => console.log("click"))
    
</script>

<button id="button" name="button">button</button>

在这里看起来越来越长,好像FRP很啰嗦。

# 点击该button的时候,不仅要打印一条log了,我还想再弹出一个对话框

# Normal HTML

在这里你不能再添加多一个onclick,你只能绑定一个0级事件处理函数。

你不得不写一个大而全的函数,或者写一个分发函数。

<script>
    const log = () => {
        console.log("click");
        alert("click");
    };
</script>

<button onclick="log()" name="button">button</button>
# Binding

很轻松,加多一个Listener即可

<script>
    const button = document.querySelector("#button");
    button.addEventListener('click', () => console.log("click"));
    button.addEventListener('click', () => alert("click"));
</script>

<button id="button" name="button">button</button>
# Reactive

很轻松,加多一个Subscriber即可

<script>
    const button = document.querySelector("#button");
    const clickStream = Rx.Observable.fromEvent(button, "click");
    
    clickStream.subscribe(() => console.log("click"));
    clickStream.subscribe(() => alert("click"));
</script>

<button id="button" name="button">button</button>

然后变更一下需求

# 限制打印log的频率,即1s内最多打印一条日志

# Normal HTML

这问题老大了,你可能需要一个工具库。自己写一个实现,或者使用lodash工具来帮助。

我这么做了,但是顺带做了1s内只能弹出对话框一次。那么正确的方法怎么做呢?懒得写了

<script src="lodash.js"></script>
<script>
    const log =  _.throttle(() => {
        console.log("click");
        alert("click");
    }, 1000);
</script>

<button onclick="log()" name="button">button</button>
# Binding

也不算很难,引入lodash,wrap一下原函数即可

<script>
    const button = document.querySelector("#button");
    button.addEventListener('click', _.throttle(() => console.log("click"), 1000));
    button.addEventListener('click', () => alert("click"));
</script>

<button id="button" name="button">button</button>
# Reactive

Rx自带流控,当然,这是Rx在JS中的实现。可能没有那么好看,不过已经很方便了。

<script>
    const button = document.querySelector("#button");
    const clickStream = Rx.Observable.fromEvent(button, "click");
    
    clickStream
        .pipe(Rx.throttleTime(1000))
        .subscribe(() => console.log("click"));
    clickStream.subscribe(() => alert("click"));
    
</script>

<button id="button" name="button">button</button>

# 当点击button-b的时候同样打印这条日志

这里只对比普通的事件驱动的JS,和RxJS响应式的区别。

# Binding
<script>
    const button = document.querySelector("#button");
    const buttonB = document.querySelector("#button-b");   
    const logFunc = _.throttle(() => console.log("click"), 1000);
    
    button.addEventListener('click', () => alert("click"));
    button.addEventListener('click', logFunc);
    buttonB.addEventListener('click', logFunc);
</script>

<button id="button" name="button">button</button>
<button id="button-b" name="button-b">button-b</button>
# Reactive
<script>
    const button = document.querySelector("#button");
    const buttonB = document.querySelector("#button-b");   
    const clickStream = Rx.Observable.fromEvent(button, "click");
    const clickBStream = Rx.Observable.fromEvent(button, "click");
    
    clickStream
        .merge(clickBStream)
        .pipe(Rx.throttleTime(1000))
        .subscribe(() => console.log("click"));
    clickStream.subscribe(() => alert("click"));
    
</script>

<button id="button" name="button">button</button>
<button id="button-b" name="button-b">button-b</button>

# 点击button后延迟1s再打印日志

到这里Reactive的优势完全体现出来了。对于Reactive来讲,编程只是对事件源进行不断的进行map/reduce,以及过滤、流控、缓冲……

而普通的Binding,虽然一样基于事件驱动,但是它以命令式的编程去实现了我们的需求。

# Binding
<script>
    const button = document.querySelector("#button");
    const buttonB = document.querySelector("#button-b");   
    const logFunc = _.throttle(
        () => setTimeout(
            () => console.log("click"),
            1000
        ),
        1000
    );
    
    button.addEventListener('click', () => alert("click"));
    button.addEventListener('click', logFunc);
    buttonB.addEventListener('click', logFunc);
</script>

<button id="button" name="button">button</button>
<button id="button-b" name="button-b">button-b</button>
# Reactive
<script>
    const button = document.querySelector("#button");
    const buttonB = document.querySelector("#button-b");   
    const clickStream = Rx.Observable.fromEvent(button, "click");
    const clickBStream = Rx.Observable.fromEvent(button, "click");
    
    clickStream
        .merge(clickBStream)
        .pipe(Rx.throttleTime(1000))
        .pipe(Rx.delay(1000))
        .subscribe(() => console.log("click"));
    clickStream.subscribe(() => alert("click"));
</script>

<button id="button" name="button">button</button>
<button id="button-b" name="button-b">button-b</button>

# 假设案例二

体现异步数据流编程。案例为典型的Callback Hell

# Normal JS

const param = null;

getDataAsync(param, (res1, err1) => {
    if (err1 != null) return
    getDataAsync(param, (res2, err2) => {
        if (err2 != null) return
        getDataAsync(param, (res3, err3) => {
            if (err3 != null) return
            getDataAsync(param, (res4, err4) => {
                if (err4 != null) return
                console.log(`res=${res1 + res2 + res3 + res4}`);
            })
        })
    })
})

# Promise

Promise 其实有一定的FRP思想在里面。

const getData = param => new Promise((resolve, reject) => {
    getDataAsync(param, (res, err) => {
        if (err4 != null) 
            reject(err)
        else 
            resolve(res);
    })
})

const param = null;
getData(param)
    .then(res => getData(param).then(res2 => res + res2))
    .then(res => getData(param).then(res2 => res + res2))
    .then(res => getData(param).then(res2 => res + res2))
    .then(res => console.log(`res=${res}`));

# Reactive

const param = null;

// 转换为Observable
const ob = Rx.Observable.create(sink => {
    getDataAsync(param, (res, err) => {
        if (err4 != null) return;
        sink.next(res);
    })
});

ob
    .flatMap(res => ob.map(res2 => res + res2))
    .flatMap(res => ob.map(res2 => res + res2))
    .flatMap(res => ob.map(res2 => res + res2))
    .flatMap(res => ob.map(res2 => res + res2))
    .subscribe(res => console.log(`res=${res}`))

# 假设案例三

体现异步IO,后端的一大难点。对于NIO编程,Netty是Java里面绝对的主流框架。尽管Netty已经对原生NIO进行了改良、封装、进一步设计(应用事件驱动)等方法,力求提供给程序员便捷、安全地进行网络编程。但是Netty依旧没有那么容易入门和学习。而且Netty作为基准开发的微服务框架、数据库驱动、网络客户端、网络服务器等……为了易于开发和理解,依然没有完全利用到NIO的优势。

这都是因为同步编程模型和开发难度与普通的异步编程的开发难度相差甚远。

这里有一个典型后端案例

进行一次HTTP交互,以TCP为最底层,解码到HTTP Request,然后进行业务逻辑,然后持久到数据库中,返回HTTP Response,编码发出

# BIO

在Tomcat旧版本的同步阻塞通信模型中

  1. 它会分配一条线程进行TCP应答、解码。

  2. 通过Servlet技术,在当前线程中进行业务逻辑。

  3. 借助JDBC技术,在当前线程中进行数据库读写。

完成一次完整的业务,每一条TCP连接都占有一条线程,而且每一条线程都会进行的同步阻塞的IO。在这个最简单的业务中,总共就有4次IO。假设40ms完成这一次请求响应,其中可能95%的时间都浪费在阻塞等待中。更不要讲一条TCP连接绝大部分时间都在空闲中,却一直占有一条线程。

在这样的通信模型中,注定承受不住很大的并发。甚至TCP连接数一多,很容易就出现拒绝应答、服务器忙的情景。但是服务器却远远没到真正忙的时候

# NIO

Servlet 3.0 之后引入了异步模式,Tomcat高版本也引入了NIO的应答线程和业务线程。主流网络编程框架Netty也是一款NIO框架。为了高性能,必须选择更强大的NIO模式。

以SpringMVC技术开发的框架,目前支持直接使用Tomcat、Netty、Undertow……作为其Web应用服务器。这一切都系Spring Boot 的AutoConfiguration中帮你做好了。

不过你真的能完美应用到NIO吗?

借助SpringMVC,HTTP协议层面确实是基于NIO实现了。但是你的业务、数据库、HTTP客户端、RPC客户端,却大部分都是同步阻塞的框架,大部分做法都是将这样的耗时任务作为一个Task投进单独的一个业务线程池。虽然不会出现拒绝应答的情景了,但是当业务线程繁忙的时候,一样会出现服务器忙的情景。

BIO

因为无论在习惯不习惯了同步编程的时候,异步编程实在是太难了!

以普通的异步模式去解决这个问题

@RestController
public class helloController() {
    @Resource 
    private AsyncDb asyncDb;

    @GetMapping("/hello")
    public DeferredResult<String> hello() {
        DeferredResult<String> deferredResult = new DeferredResult<>();
        asyncDb.query(
            "select value from db where id = 1",
            (res) -> {
                deferredResult.setData(res);
            });
        return deferredResult;
    }
}

看起来没有什么问题,但是在实际的生产环境中,除了数据库,还要缓存处理、事务处理、日志处理、消息队列、并发任务、AOP切面、动态代理……数不清的逻辑。

稍微拓展一下,以下是一个错误的代码

@RestController
public class helloController() {
    @Resource 
    private AsyncDb asyncDb;

    @GetMapping("/hello")
    @Transational
    public DeferredResult<Integer> hello() {
        DeferredResult<Integer> deferredResult = new DeferredResult<>();
        AtomicInteger i = new AtomicInteger(0);
        new Thread(() -> {
            CountdownLatch cdl = new CountdownLatch(1);
            cdl.await();
            deferredResult.setData(i.get());
        })
        asyncDb.query(
            "select value from db where id = 1",
            (res) -> {
                i.addAndGet(Integer.valueOf(res));
            });
        asyncDb.query(
            "select value from db where id = 2",
            (res) -> {
                i.addAndGet(Integer.valueOf(res));
            }); 
        asyncDb.exec("insert into db(id, value) values (3, '3')",); 
        asyncDb.exec("insert into db(id, value) values (4, '4')",); 
        return deferredResult;
    }
}

这里模拟了同时读取两次数据库并聚合运算,这部分没有问题,是传统的异步编程范式。

然后开启了事务插入两行,这部分是绝对不会成功的。因为每一条exec都是一条即时返回的异步指令,@Transational标记的方法区会立即退出,执行指令的根本不是当前方法、当前线程。

除了事务,还有AOP无法拦截正常代码,ThreadLocal线程变量失去作用,代理出现意外的线程安全问题,动态数据源因为异步无法进行……

# Project Reactor

Project Reactor 是一个实现响应式编程思想的JVM响应式编程库。结合Spring Reactive技术栈,可以很方便的使用响应式编程轻松的实现以往复杂晦涩的逻辑。

WebFlux是Spring5中,基于Project Reactor编程库的,MVC的实现。与之对应的传统实现是SpringMVC。

public class helloWebHandler() {
    @Resource 
    private ReactiveDb reactiveDb;
    
    private Mono<Integer> queryDb(Integer id) {
        return reactiveDb
            .query("select value from db where id = {}", id)
            .map(Integer::valueOf);
    }

    public Mono<ServerResponse> hello(ServerRequest serverRequest) {
        return serverRequest
            .queryParam
            .get("id")
            .map(Integer::valueOf)
            // flatten 数据流
            .flatMap(::queryDb)
            // zip 另外一条数据流
            .zipWith(queryDb(2))
            .map(tuple2 -> tuple2.getT1() + tuple2.get(T2))
            .map(res -> ServerResponse.ok().syncBody(res).build());
    }
}

这里把一整个业务转换为数据流的转换,而其中的数据是异步的。我们的代码只是声明了数据流的转换而已,真正的触发是在最终的订阅中实现(此处是Web Server)。

而且我们还使用到了一些更高级的方式。flatMap的意思是展平数据流,在函数式编程里面有一种特殊的Stream叫Meta Stream ,它是一种包含Stream的Stream,类型是Stream<Stream<T>>。用指针的话来说也就是指向一个Stream的Stream。在响应式编程里,意味着未来的数据通过map出一个未来的未来的数据。

在函数式里面,我们可以通过flatten方式去展平这样一个Stream,在响应式里面一样可以使用flatten方式去展平数据流。这样可以很轻易的在异步数据流在使用异步数据流,在异步数据计算中根本没有异步调用的难点。

# IO!!

更重要的IO中,DB不是阻塞的。当DB发出SQL之后,线程就会解放出来,这时候没有占用任何一条线程,仅仅是保留一些堆栈上下文,真正的数据已经流出去了。当响应到来的时候,再从线程池冲抽出一条线程处理IO读,然后再继续数据的流动。

NIO

得益于Project Reactor的设计,可以几乎无难度进行线程切换,和线程调度。以函数式的编程范式,无差别的编写同步和异步的代码。"