Skip to the content.

Quarkus - 开始响应式编程

原文链接 : https://quarkus.io/guides/getting-started-reactive

📢 : 本篇教程将会带你进入一个你可能从来没有接触过的世界, 即响应式编程, 这个响应式并不是指我们平时在网页上看到的响应式UI布局, 而是指程序通过异步, 非阻塞地对I/O事件做出反应与处理. 在Spring家族中, 与之对应的是Spring WebFlux, 如果你以前没有听过响应式编程, 那我强烈建议你先上网搜一下什么是”Spring WebFlux”, 或者看一看这一篇入门小文章.

这是一个和传统java开发完全不一样的世界, 一个完全异步非阻塞的美妙世界, 开发响应式程序使用的语法, 代码风格等和以前的传统java编程模式是完全不一样, 希望你在学习前有心理准备😁.

需要注意以下两点 :

  1. quarkus并不强制你学习响应式编程, 你使用quarkus开发的代码既可以是同步的, 也可以是异步的; 既可以是响应式的, 也可以是非响应式的.
  2. 响应式编程并不会让你的程序变得更显著的快, 它最显著的作用是能让程序在相同硬件性能下提高并发数, 但是如果你不差钱, 直接更换性能更好的机器也能够提高并发数. 所以在我看来, 学习响应式编程最大的作用是, 能让你享受收获知识的快乐😁.

了解如何用Quarkus创建一个响应式应用程序, 并探索Quarkus提供的不同响应式功能. 本教程包括 :

先决条件

要完成这个教程, 你需要 :

项目源码

我们建议你按照从Bootstrapping项目开始的说明, 一步一步地创建应用程序.

不过, 你可以直接进入已完成的例子.

下载源码或克隆git仓库.

git clone https://github.com/quarkusio/quarkus-quickstarts.git

代码位于getting-started-reactive目录和getting-started-reactive-crud下.

Quarkus响应式的多个方面

Quarkus是响应式的. 如果你打开引擎盖看一下, 会发现一个响应式引擎为你的Quarkus应用程序提供动力. 这个引擎就是Eclipse Vert.x (https://vertx.io). 所有的网络I/O都通过非阻塞和响应式的Vert.x引擎.

Quarkus is based on a reactive engine

让我们举两个例子来解释它是如何工作的. 想象传入一个HTTP请求. 嵌入Quarkus的 (Vert.x)HTTP服务器接口接收到请求, 然后将其路由到应用程序. 如果请求的目标是一个命令式(📢 : 命令式是指传统的java开发代码)方法 (传统的JAX-RS接口, 用@Blocking注释的代码……), 路由层会在一个工作线程worker thread中调用资源方法, 并在数据可用时写入响应. 到目前为止, 和传统的开发方法没有区别, 没有什么新的或突出的问题. 下面的图片描述了这种行为. 在这种情况下, 应用程序代码在工作线程中被调用, 而业务逻辑可以阻塞该线程.

Behavior when using the imperative routes

但是, 如果HTTP请求的目标是一个响应式方法 (使用RESTEasy Reactive的JAX-RS, 响应式路由, @Incoming方法不注有@Blocking…), 路由层在I/O线程上调用路由, 这会带来很多好处, 比如更高的并发性和性能.

Behavior when using the reactive routes

这时Quarkus使用I/O线程来调用你的代码, 这可以节省(📢 : save这里我不确定是指节省还是指保存)上下文切换, 避免大量的线程池管理, 从而提高资源利用率. 但是, 代码绝不能阻塞该线程. 为什么呢?因为, I/O线程是用来处理多个并发请求的. 一旦一个请求因为需要执行一些I/O而不能取得进展, quarkus就会调度这些I/O并继续程序剩下的部分 (📢 : passes a continuation真心难翻译). 它会释放线程以处理其他请求. 当刚刚没有进展的I/O完成后, 会继续被执行, 回到I/O线程上.

因此, 许多Quarkus组件的设计都考虑到了响应式, 如数据库访问 (PostgreSQL、MySQL、Mongo等)、应用服务 (邮件、模板引擎等)、消息传递 (Kafka、AMQP等)等等. 但是, 为了充分受益于这种模式, 应用程序代码应该以非阻塞的方式编写 (📢 : 指你的代码从接口层, service层, dao层这一整条链路的所有代码得是响应式的, 这也是目前响应式编程难以落地的原因). 在这里,响应式API是一种终极武器.

Mutiny - 一个响应式编程库

📢 : 这个部分是整个quarkus响应式编程的最重要的地方, 可以说, 学习怎么在quarkus中响应式编程的本质, 就是学习怎么调用Mutiny这个类库, 所以这里我要先介绍一下Mutiny是什么,Vert.x又是什么.

Vert.x是一个基于全异步Java服务器Netty (Spring WebFlux也是基于Netty)的JVM、轻量级、高性能应用平台, 但是quarkus团队觉得Vert.x的上手门槛太高, 而且API设计过于拉了, 所以开发了一个Mutiny库, 来封装和统一Vert.x的API. 这里我强烈建议先看一遍Mutiny的官网教程, 为此我也写了一篇Mutiny入门教程博客, 可以先看了那篇入门教程, 再看下面的内容.

Mutiny是一个响应式编程库, 允许表达和编排异步动作. 它提供两种类型 :

这两种类型都是惰性的, 并遵循一个订阅模式. 只有在有实际需要的时候才开始计算 (即有订阅者加入).

uni.subscribe().with(
    result -> System.out.println("result is " + result),
    failure -> failure.printStackTrace()
);

multi.subscribe().with(
    item -> System.out.println("Got " + item),
    failure -> failure.printStackTrace()
);

Uni和Multi都提供了事件驱动的API: 你可以表达你想在一个给定的事件 (成功、失败等)中做什么. 这些API被分为几组 (操作类型), 以使其更具表现力, 并避免在一个单一的类上有100多个方法. 主要的操作类型是关于对失败的反应, 完成, 操作项目, 提取, 或收集它们. 它通过可导航的API提供了一个流畅的编码体验, 因此不需要太多关于响应式的知识.

httpCall
    .onFailure().recoverWithItem("my fallback");

你可能想知道Reactive Streams (https://www.reactive-streams.org/). Multi实现了Reactive Streams Publisher, 因此实现了Reactive Streams的反压机制 (也可翻译成背压, 背压的意思是当消费者消费的速度跟不上生产者生产速度时所采取的一种机制). Uni没有实现Publisher, 因为对Uni的订阅就足以表明你对结果感兴趣. 这又是考虑到了更简单、更顺畅的API的想法, 因为Reactive Streams的订阅/请求仪式更复杂.

拥抱Quarkus的响应式和命令式统一的支柱吧, Uni和Multi都提供了通往命令式结构 (📢 : 命令式结构即传统的java代码形式)的桥梁. 例如, 你可以将Multi转化为Iterable或者await Uni产出的item.

// 阻塞直到结果可用
String result = uni.await().indefinitely();

// 将异步的流转换为同步的迭代器
stream.subscribe().asIterable().forEach(s -> System.out.println("Item is " + s));

在这一点上, 如果你是RxJava或Reactor的用户, 你可能会想如何使用你熟悉的Flowable、Single、Flux、Mono…Mutiny允许将Unis和Multis从RX Java和Reactor类型中转换出来.

Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Flux<String> flux = multi.convert().with(MultiReactorConverters.toFlux());

但是, Vert.x呢?Vert.x的API也可以使用Mutiny类型. 下面的片段显示了Vert.x Web客户端的用法.

// Use io.vertx.mutiny.ext.web.client.WebClient
client = WebClient.create(vertx,
                new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
                        .setTrustAll(true));
// ...
Uni<JsonObject> uni =
    client.get("/api/fruit/" + name)
        .send()
        .onItem().transform(resp -> {
            if (resp.statusCode() == 200) {
                return resp.bodyAsJsonObject();
            } else {
                return new JsonObject()
                        .put("code", resp.statusCode())
                        .put("message", resp.bodyAsString());
            }
        });

最后但并非最不重要的是, Mutiny内置了与SmallRye Context Propagation的集成, 因此你可以在你的响应式管道中传播事务、可追溯数据等.

不过, 说得够多了, 让我们动手吧!

运行程序

有几种方法可以用Quarkus实现响应式应用. 在本教程中, 我们将使用RESTEasy Reactive, 这是一个受益于Quarkus响应式引擎的RESTEasy实现. 默认情况下, 它在I/O线程上调用HTTP端点.

虽然可以使用传统的RESTEasy和Mutiny搭配, 但你需要添加quarkus-resteasy-mutiny扩展, 而且该方法仍然会在一个工作线程上被调用. 因此, 虽然这种搭配照样可以进行响应式编程, 但却还是需要工作线程, 这就违背了响应式的目的.

对于Linux和MacOS用户,

mvn io.quarkus:quarkus-maven-plugin:2.0.2.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started-reactive \
    -DclassName="org.acme.getting.started.ReactiveGreetingResource" \
    -Dpath="/hello" \
    -Dextensions="resteasy-reactive"
cd getting-started-reactive

对于Win用户,

mvn io.quarkus:quarkus-maven-plugin:2.0.2.Final:create -DprojectGroupId=org.acme -DprojectArtifactId=getting-started-reactive -DclassName="org.acme.getting.started.ReactiveGreetingResource" -Dpath="/hello" -Dextensions="resteasy-reactive"
mvn io.quarkus:quarkus-maven-plugin:2.0.2.Final:create "-DprojectGroupId=org.acme" "-DprojectArtifactId=getting-started-reactive" "-DclassName=org.acme.getting.started.ReactiveGreetingResource" "-Dpath=/hello" "-Dextensions=resteasy-reactive"

项目会生成在./getting-started-reactive, 里面包含了如下内容 :

响应式JAX-RS资源

在项目创建过程中, src/main/java/org/acme/getting/started/ReactiveGreetingResource.java文件已经被创建, 内容如下 :

package org.acme.getting.started;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class ReactiveGreetingResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "Hello RESTEasy Reactive";
    }
}

这是一个非常简单的REST端点, 对/hello的请求返回 Hello RESTEasy Reactive字符串. 由于它使用了RESTEAsy Reactive, 这个方法是在I/O线程上调用的.

如果想强行让Quarkus在一个工作线程上调用这个方法, 请用io.smallrye.common.annotation.Blocking注解来注解它. 你可以在一个方法、一个类上使用@Blocking, 也可以通过注解一个应用类来为整个应用程序启用它.

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
import io.smallrye.common.annotation.Blocking;

@ApplicationPath("/")
@Blocking
public class RestBlockingApplication extends Application {
}

现在让我们创建一个ReactiveGreetingService类, 内容如下 :

package org.acme.getting.started;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class ReactiveGreetingService {

    public Uni<String> greeting(String name) {
        return Uni.createFrom().item(name)
                .onItem().transform(n -> String.format("hello %s", n));
    }
}

然后, 编辑ReactiveGreetingResource类以匹配以下内容 :

package org.acme.getting.started;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.reactivestreams.Publisher;

@Path("/hello")
public class ReactiveGreetingResource {

    @Inject
    ReactiveGreetingService service;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/greeting/{name}")
    public Uni<String> greeting(String name) {
        return service.greeting(name);
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
}

ReactiveGreetingService类包含一个产生Uni的简单方法. 虽然在这个例子中, 产生的item被立即返回, 但你可以想象到的是任何异步API的返回值都是一个Uni或者Multi. 我们将在本教程的后面介绍这一点.

现在, 用以下方式启动应用程序 :

mvn quarkus:dev

一旦运行, 通过打开http://localhost:8080/hello/greeting/neo, 检查你是否得到预期的问候信息.

处理数据流

到目前为止, 我们只返回一个异步的结果. 在这一节中, 我们用传达多个项目的流来扩展这个应用程序. 这些流可以来自Kafka或任何其他数据源, 但为了保持简单, 我们只是生成定期的问候信息.

ReactiveGreetingService中, 添加以下方法 :

public Multi<String> greetings(int count, String name) {
  return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .onItem().transform(n -> String.format("hello %s - %d", name, n))
        .select().first(count);
}

你可能需要添加 import io.smallrye.mutiny.Multi; 和import java.time.Duration; 语句.

它每秒钟生成一条问候信息, 在计数信息后停止.

ReactiveGreetingResource中添加以下方法.

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/greeting/{count}/{name}")
public Multi<String> greetings(int count, String name) {
  return service.greetings(count, name);
}

这个端点将项目以JSON数组的形式流向客户端. 消息的名称和数量是用路径参数来设定的.

因此, 调用该端点会产生类似的结果 :

$ curl http://localhost:8080/hello/greeting/3/neo
["hello neo - 0", "hello neo - 1", "hello neo - 2"]

我们还可以通过返回一个Multi来生成服务器发送的事件响应:

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(int count, String name) {
    return service.greetings(count, name);
}

与前面的代码片段唯一不同的是@Produces的类型和 @RestSseElementType 注解代表的每个事件的类型. 由于 @Produces 注解定义了 SERVER_SENT_EVENTS, JAX-RS 需要它来知道每个 (嵌套)事件的内容类型.

你可能需要添加 import org.jboss.resteasy.reactive.RestSseElementType; 语句.

你可以通过以下方式看到结果 :

$ curl -N http://localhost:8080/hello/stream/5/neo
data: hello neo - 0

data: hello neo - 1

data: hello neo - 2

data: hello neo - 3

data: hello neo - 4

使用响应式API

使用Quarkus响应式API

Quarkus使用Mutiny模型提供了许多响应式API. 在这一节中, 我们将看到如何使用Reactive PostgreSQL驱动, 以非阻塞和响应式的方式与你的数据库交互.

使用创建一个新的项目 :

mvn io.quarkus:quarkus-maven-plugin:2.0.2.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started-reactive-crud \
    -DclassName="org.acme.reactive.crud.FruitResource" \
    -Dpath="/fruits" \
    -Dextensions="resteasy-reactive,resteasy-reactive-jackson,reactive-pg-client"
cd getting-started-reactive-crud

这个应用程序是与PostgreSQL数据库交互的, 所以你需要一个数据库, 这里直接用docker启动一个, 如果没安装docker则需要本机安装一个数据库 :

docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 \
           --name postgres-quarkus-reactive -e POSTGRES_USER=quarkus_test \
           -e POSTGRES_PASSWORD=quarkus_test -e POSTGRES_DB=quarkus_test \
           -p 5432:5432 postgres:11.2

然后, 让我们来配置我们的数据源. 打开src/main/resources/application.properties, 添加以下内容 :

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus_test
quarkus.datasource.password=quarkus_test
quarkus.datasource.reactive.url=postgresql://localhost:5432/quarkus_test
myapp.schema.create=true

前面的3行定义了数据源. 最后一行将在应用程序中使用, 以表明我们是否在应用程序被初始化时插入一些项目.

现在, 让我们来创建我们的实体. 创建org.acme.reactive.crud.Fruit类, 内容如下 :

package org.acme.reactive.crud;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;

import java.util.stream.StreamSupport;

public class Fruit {

    public Long id;

    public String name;

    public Fruit() {
        // default constructor.
    }

    public Fruit(String name) {
        this.name = name;
    }

    public Fruit(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public static Multi<Fruit> findAll(PgPool client) {
        return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
                // Create a Multi from the set of rows:
                .onItem().transformToMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
                // For each row create a fruit instance
                .onItem().transform(Fruit::from);
    }

    public static Uni<Fruit> findById(PgPool client, Long id) {
        return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id))
                .onItem().transform(RowSet::iterator)
                .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
    }

    public Uni<Long> save(PgPool client) {
        return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)").execute(Tuple.of(name))
                .onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
    }

    public Uni<Boolean> update(PgPool client) {
        return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2").execute(Tuple.of(name, id))
                .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
    }

    public static Uni<Boolean> delete(PgPool client, Long id) {
        return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
                .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
    }

    private static Fruit from(Row row) {
        return new Fruit(row.getLong("id"), row.getString("name"));
    }
}

这个实体包含一些字段和方法, 用于从数据库中查找、更新和删除行. 这些方法返回Unis或Multis, 因为当结果被检索到时, 所产生的项目会被异步地返回出来. 注意, quarkus已经封装好了响应式PostgreSQL客户端以兼容Mutiny, 所以你只需将数据库中的结果转化为业务友好的对象.

为了在应用程序启动时初始化数据库, 我们将创建一个名为DBInit的类, 内容如下.

package org.acme.reactive.crud;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.pgclient.PgPool;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class DBInit {

    private final PgPool client;
    private final boolean schemaCreate;

    public DBInit(PgPool client, @ConfigProperty(name = "myapp.schema.create", defaultValue = "true") boolean schemaCreate) {
        this.client = client;
        this.schemaCreate = schemaCreate;
    }

    void onStart(@Observes StartupEvent ev) {
        if (schemaCreate) {
            initdb();
        }
    }

    private void initdb() {
        client.query("DROP TABLE IF EXISTS fruits").execute()
                .flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')").execute())
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')").execute())
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')").execute())
                .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')").execute())
                .await().indefinitely();
    }
}

然后, 让我们在FruitResource中使用这个Fruit类. 编辑FruitResource类, 使其与以下内容相匹配 :

package org.acme.reactive.crud;

import java.net.URI;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;

@Path("fruits")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {

    private final PgPool client;

    public FruitResource(PgPool client) {
        this.client = client;
    }

    @GET
    public Multi<Fruit> get() {
        return Fruit.findAll(client);
    }

    @GET
    @Path("{id}")
    public Uni<Response> getSingle(Long id) {
        return Fruit.findById(client, id)
                .onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
                .onItem().transform(ResponseBuilder::build);
    }

    @POST
    public Uni<Response> create(Fruit fruit) {
        return fruit.save(client)
                .onItem().transform(id -> URI.create("/fruits/" + id))
                .onItem().transform(uri -> Response.created(uri).build());
    }

    @PUT
    @Path("{id}")
    public Uni<Response> update(Long id, Fruit fruit) {
        return fruit.update(client)
                .onItem().transform(updated -> updated ? Status.OK : Status.NOT_FOUND)
                .onItem().transform(status -> Response.status(status).build());
    }

    @DELETE
    @Path("{id}")
    public Uni<Response> delete(Long id) {
        return Fruit.delete(client, id)
                .onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
                .onItem().transform(status -> Response.status(status).build());
    }
}

该资源根据Fruit类产生的结果返回UniMulti实例.

使用Vert.x客户端

前面的例子使用的是Quarkus提供的服务. 同时, 你也可以直接使用Vert.x客户端.

首先, 确保quarkus-vertx扩展已经存在. 如果没有, 通过执行下面的命令激活该扩展 :

mvn io.quarkus:quarkus-maven-plugin:2.0.2.Final:add-extensions \
    -Dextensions=vertx

或者手动添加quarkus-vertx到你的依赖项中 :

<dependency>
	<groupId>io.quarkus</groupId>
	<artifactId>quarkus-vertx</artifactId>
</dependency>

这里是一个Mutiny版本的Vert.x API列表. 这个API被分为几个工件, 你可以独立导入.

groupId:artifactId Description
io.smallrye.reactive:smallrye-mutiny-vertx-core Mutiny API for Vert.x Core
io.smallrye.reactive:smallrye-mutiny-vertx-mail-client Mutiny API for the Vert.x Mail Client
io.smallrye.reactive:smallrye-mutiny-vertx-web-client Mutiny API for the Vert.x Web Client
io.smallrye.reactive:smallrye-mutiny-vertx-mongo-client Mutiny API for the Vert.x Mongo Client
io.smallrye.reactive:smallrye-mutiny-vertx-redis-client Mutiny API for the Vert.x Redis Client
io.smallrye.reactive:smallrye-mutiny-vertx-cassandra-client Mutiny API for the Vert.x Cassandra Client
io.smallrye.reactive:smallrye-mutiny-vertx-consul-client Mutiny API for the Vert.x Consul Client
io.smallrye.reactive:smallrye-mutiny-vertx-kafka-client Mutiny API for the Vert.x Kafka Client
io.smallrye.reactive:smallrye-mutiny-vertx-amqp-client Mutiny API for the Vert.x AMQP Client
io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client Mutiny API for the Vert.x RabbitMQ Client

你也可以在http://smallrye.io/smallrye-reactive-utils/apidocs/, 查看可用的API.

让我们举个例子. 在你的应用程序中添加以下依赖关系 :

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>

它提供了Vert.x Web客户端的Mutiny API. 然后, 你可以按以下方式使用网络客户端 :

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    public ResourceUsingWebClient(Vertx vertx) {
        this.client = WebClient.create(vertx,
                new WebClientOptions().setDefaultHost("fruityvice.com").setDefaultPort(443).setSsl(true)
                        .setTrustAll(true));
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(@PathParam("name") String name) {
        return client.get("/api/fruit/" + name)
                .send()
                .map(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

有两个重要的点.

Mutiny版本的Vert.x APIs还提供了.

使用RxJava或Reactor APIs

Mutiny提供了将RxJava 2Project Reactor类型转换为Uni和Multi的实用程序.

RxJava 2转换器在以下依赖性中可用 :

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-rxjava</artifactId>
</dependency>

因此, 如果你有一个返回RxJava 2类型 (Completable, Single, Maybe, Observable, Flowable)的API, 你可以按以下方式创建Unis和Multis :

mport io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.mutiny.converters.uni.UniRxConverters;
// ...
Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);

Multi<Void> multiFromCompletable = Multi.createFrom().converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom().converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);

你也可以将Unis和Multis转化为RxJava类型 :

Completable completable = uni.convert().with(UniRxConverters.toCompletable());
Single<Optional<String>> single = uni.convert().with(UniRxConverters.toSingle());
Single<String> single2 = uni.convert().with(UniRxConverters.toSingle().failOnNull());
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Observable<String> observable = uni.convert().with(UniRxConverters.toObservable());
Flowable<String> flowable = uni.convert().with(UniRxConverters.toFlowable());
// ...
Completable completable = multi.convert().with(MultiRxConverters.toCompletable());
Single<Optional<String>> single = multi.convert().with(MultiRxConverters.toSingle());
Single<String> single2 = multi.convert().with(MultiRxConverters
        .toSingle().onEmptyThrow(() -> new Exception("D'oh!")));
Maybe<String> maybe = multi.convert().with(MultiRxConverters.toMaybe());
Observable<String> observable = multi.convert().with(MultiRxConverters.toObservable());
Flowable<String> flowable = multi.convert().with(MultiRxConverters.toFlowable());

Project Reactor转换器在以下依赖性中可用 :

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-reactor</artifactId>
</dependency>

因此, 如果你有一个返回Reactor类型的API (Mono, Flux), 你可以按以下方式创建Unis和Multis :

import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
// ...
Uni<String> uniFromMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), mono);
Uni<String> uniFromFlux = Uni.createFrom().converter(UniReactorConverters.fromFlux(), flux);

Multi<String> multiFromMono = Multi.createFrom().converter(MultiReactorConverters.fromMono(), mono);
Multi<String> multiFromFlux = Multi.createFrom().converter(MultiReactorConverters.fromFlux(), flux);

你也可以将Unis和Multis转化为反应器类型 :

Mono<String> mono = uni.convert().with(UniReactorConverters.toMono());
Flux<String> flux = uni.convert().with(UniReactorConverters.toFlux());

Mono<String> mono2 = multi.convert().with(MultiReactorConverters.toMono());
Flux<String> flux2 = multi.convert().with(MultiReactorConverters.toFlux());

使用 CompletionStages 或 Publisher API

如果你面对一个使用CompletionStage、CompletableFuture或Publisher的API, 你可以来回转换. 首先, Uni和Multi都可以从一个CompletionStage或从一个Supplier< CompletionStage >创建. 比如说 :

CompletableFuture<String> future = Uni
        // Create from a Completion Stage
        .createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"));

在Uni上, 你也可以使用subscribeAsCompletionStage()产生一个CompletionStage, 它将获得Uni所发出的item或失败.

你还可以使用createFrom().publisher(Publisher)从Publisher的实例中创建Unis和Multis. 你可以使用toMulti将Uni转换为Publisher. 事实上, Multi 实现了 Publisher.

总结

📢 : 如果你已经看到这里了, 说明你已经学会1+1=2了, 接下来可以去解微分方程了👏

简单归纳一下这篇教程的重点 :

  1. 什么是响应式编程 ?
  2. 怎么使用Mutiny这个库 ?

quarkus厉害就厉害在它官方原生统一了命令式编程和响应式编程, 并且所有的响应式API都会统一返回一个Uni或者Multi, 因此你只要学会怎么用Mutiny即可.

What’s next?

本教程是对Quarkus中的reactive的介绍. 有很多Quarkus的功能已经是响应式的了. 下面的列表给你几个例子 :