问题描述
我正在使用 Spring Webflux 进行评估,但我们必须支持期望 application/json 而不是 application/stream+json 的客户端。 我不清楚 Spring WebFlux 如何在需要 application/json 的客户端的情况下处理序列化 Flux。
如果 Flux 被序列化为 application/json 而不是 application/stream+json 是阻塞操作吗?
下面我将一个示例控制器放在一起来演示我所看到的。 当流是无限的并生成 application/json 时,不会向浏览器返回任何内容。 这似乎是合理的,因为它可能正在等待流终止。 当流是无限的并产生 application/stream+json 时,我会按预期在浏览器中连续看到 JSON 对象。 当 Flux 是有限的,比如 100 个元素,并且类型是 application/json 时,它会立即按预期呈现。 问题是,在序列化之前是否必须等待 Flux 终止,这是否会导致阻塞操作。 返回普通应用程序/json 时,使用 Flux 对性能和可扩展性有什么影响?
@RestController
public class ReactiveController {
/* Note: In the browser this sits forever and never renders */
@GetMapping(path = "/nonStreaming", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Person> getPeopleNonStreaming() {
return Flux.interval(Duration.ofMillis(100))
.map(tick -> new Person("Dude", "Dude", tick));
}
/* Note: This renders in the browser in chunks forever */
@GetMapping(path = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Person> getPeopleStreaming() {
return Flux.interval(Duration.ofMillis(100))
.map(tick -> new Person("Dude", "Dude", tick));
}
/* Note: This returns, but I can't tell if it is done in a non blocking manner. It
* appears to gather everything before serializing. */
@GetMapping(path = "/finiteFlux", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Person> finiteFlux() {
return Flux.range(0, 100)
.map(tick -> new Person("Dude", "Dude", tick));
}
}
更新:
我在下面添加了其他日志信息:
流媒体似乎使用了两个不同的线程
2019-02-13 16:53:07.363 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [dac80fd4] HTTP GET "/streaming"
2019-02-13 16:53:07.384 DEBUG 3416 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : [dac80fd4] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.getPeopleStreaming()
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler : Using 'application/stream+json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/stream+json]
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler : [dac80fd4] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:53:07.532 DEBUG 3416 --- [ parallel-1] o.s.http.codec.json.Jackson2JsonEncoder : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@6b3e843d]
2019-02-13 16:53:07.566 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;q=0.8;charset=UTF-8
2019-02-13 16:53:07.591 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object
2019-02-13 16:53:07.629 DEBUG 3416 --- [ parallel-1] o.s.http.codec.json.Jackson2JsonEncoder : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@217d62db]
2019-02-13 16:53:07.630 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object
2019-02-13 16:53:07.732 DEBUG 3416 --- [ parallel-1] o.s.http.codec.json.Jackson2JsonEncoder : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@741c0c88]
2019-02-13 16:53:07.732 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object
2019-02-13 16:53:07.832 DEBUG 3416 --- [ parallel-1] o.s.http.codec.json.Jackson2JsonEncoder : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@7b8532e5]
而有限的 JSON 仅使用单个线程。
2019-02-13 16:55:34.431 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [5b048f46] HTTP GET "/finiteFlux"
2019-02-13 16:55:34.432 DEBUG 3416 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : [5b048f46] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.finiteFlux()
2019-02-13 16:55:34.434 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler : Using 'application/json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/json]
2019-02-13 16:55:34.435 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler : [5b048f46] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:55:34.439 DEBUG 3416 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonEncoder : [5b048f46] Encoding [[io.jkratz.reactivedemo.Person@425c8296, io.jkratz.reactivedemo.Person@22ae73df, io.jkratz.reactived (truncated)...]
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json;q=0.8;charset=UTF-8
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [5b048f46] Completed 200 OK
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Last HTTP response frame
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object EmptyLastHttpContent
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Decreasing pending responses, now 0
2019-02-13 16:55:34.451 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] No ChannelOperation attached. Dropping: EmptyLastHttpContent
1楼
在流式 mimetype ( application/stream+json
) 的情况下,Spring WebFlux 中默认配置的 JSON 编解码器将序列化为 JSON 并在网络上刷新Flux
输入的每个元素。
当流是无限的,或者当您想在信息可用时立即将信息推送到客户端时,此行为很方便。
请注意,这会带来性能成本,因为多次调用序列化程序和刷新会占用资源。
对于非流式类型( application/json
),Spring WebFlux 中默认配置的 JSON 编解码器将序列化为 JSON 并一次性刷新到网络。
它将在内存中缓冲Flux<YourObject>
并一次性将其序列化。
这并不意味着操作是阻塞的,因为生成的Flux<Databuffer>
以反应方式写入网络。
这里没有什么阻碍。
这只是“流式传输数据和使用更多资源”与“更有效地缓冲和使用资源”之间的权衡。
在流的情况下,事情更有可能由不同的工作线程处理,因为工作项在不同的时间间隔可用。 在简单的 JSON 响应的情况下 - 它也可能由一个或多个线程处理:这取决于有效负载大小,如果远程客户端很慢。
2楼
似乎所有的魔法都发生在 AbstractJackson2Encoder#encode 方法中。
这是常规application/json
序列化的代码:
// non-streaming
return Flux.from(inputStream)
.collectList() // make Mono<List<YourClass>> from Flux<YourClass>
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) // serialize list to JSON and write to DataBuffer
.flux(); // get Flux<DataBuffer> from Mono<DataBuffer>
所以,是的,它在序列化之前等待 Flux 终止。
性能改进是有问题的,因为它总是必须等待所有数据序列化。
因此,在application/json
媒体类型的情况下,Flux 或常规List
之间没有太大区别