函数式编程 & 反应式编程

# 函数式编程

在函数式编程里面, 函数为第一等公民, 或者 纯函数是唯一的第一等公民(Pure function is the one and only first-class citizen). 函数式是无副作用的, 在函数是编程里面, 只能够是无状态的纯函数, 同时变量也应该是常量化的. 借此可以得到更加可预测和更可靠的编程代码.

# 反应式编程

反应式编程是一种编写异步数据流的编程方式. 利用函数式编程组合数据流的验算, 整个程序将成为事件和行为的组合. 它可以提高代码的抽象级别, 因而可以专注于定义业务逻辑的事件的相互依赖性.

# Project Reactor

PR 是一个实现以上思想的基于JVM的反应式编程库.

(以下机翻官网) 特点

  • REACTIVE CORE

    Reactor是一个具有高效的需求管理的完全无阻塞的基础. 它直接直接与Java Functional API, Completable Future, Stream和Duration进行交互

  • TYPED [0|1|N] SEQUENCES

    Reactor提供2个反应式组合Publisher, Flux[0:N]和Mono[0:1] , 它们广泛地实现了反应性扩展

  • NON BLOCKING IO

    reactor适用于微服务体系结构, 为HTTP(包括WebSockets), TCP和UDP提供背压式网络引擎.

# 对比传统

# Future

        CountDownLatch countDownLatch = new CountDownLatch(1);
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            try {
                Thread.sleep(2 * 1000);
                log.info("doing something");
                return "done";
            } finally {
                countDownLatch.countDown();
            }
        });
        executor.execute(futureTask);

        log.info("task is done? {}", futureTask.isDone());
        countDownLatch.await();
        log.info("task is done? {}", futureTask.isDone());
        log.info("task end. {}", futureTask.get());

# Callback

        CountDownLatch countDownLatch = new CountDownLatch(1);
        Callback<String, Void> callback = param -> {
            log.info("task end. {}", param);
            countDownLatch.countDown();
            return null;
        };

        FutureTask<Void> futureTask = new FutureTask<>(() -> {
            try {
                Thread.sleep(2 * 1000);
                log.info("doing something");
                callback.call("done");
                return null;
            } finally {
                countDownLatch.countDown();
            }
        });

        executor.execute(futureTask);

        log.info("task is done? {}", futureTask.isDone());
        countDownLatch.await();
        log.info("task is done? {}", futureTask.isDone());
        log.info("task end. {}", futureTask.get());

# CompletableFuture

        CountDownLatch countDownLatch = new CountDownLatch(1);

        CompletableFuture completableFuture = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("doing something");
                    return "done";
                }, executor)
                .thenAccept(str -> {
                    log.info("task end. {}", str);
                    countDownLatch.countDown();
                });

        log.info("task is done? {}", completableFuture.isDone());
        countDownLatch.await();
        log.info("task is done? {}", completableFuture.isDone());

# Reactor

        CountDownLatch countDownLatch = new CountDownLatch(1);

        Disposable disposable = Mono
                .fromSupplier(() -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("doing something");
                    return "done";
                })
                .cast(String.class)
                .subscribe(str -> {
                    log.info("task end. {}", str);
                    countDownLatch.countDown();
                });

        log.info("task is done? {}", disposable.isDisposed());
        countDownLatch.await();
        log.info("task is done? {}", disposable.isDisposed());

Reactor 和 CompletableFuture 在这里很相似, 这也印证了上文特点之一. 你可以很方便的通过大部分方式创建你的Publisher

  • Mono.just 单纯创建
  • Mono.create 由一个Consumer<MonoSink<T>>Callback 创建
  • defer延迟创建Publisher
  • delay通过Duration空延迟
  • empty
  • from从另一个Publisher
  • fromCallable从一个Callable
  • fromFuture从一个CompletableFuture
  • fromRunnable, fromSupplier,using
  • ......
        Mono.using(
                () -> Mono.just(1),
                param -> Mono.just(String.valueOf(param)),
                integer -> log.info("cleanup"),
                true
        );