当前位置: 代码迷 >> 综合 >> Scala zio-streams 与 akka-stream 的集成 ZStream错误处理
  详细解决方案

Scala zio-streams 与 akka-stream 的集成 ZStream错误处理

热度:108   发布时间:2023-09-23 09:57:00.0

zio-streams 与 akka-stream 的集成

总的来说,我在 zim 中集成 akka-stream,其实只是为了集成 akka-http,众所周知,akka-http 是构建在 akka-stream 上的,而 akka-stream 依赖 akka-actor。

对于任意 zio 应用,无外乎就是返回ZIOZStream类型的数据。zim 中为了省事,所有接口都使用了ZStream,这点其实很不好,因为ZStreamZIO操作更多,更复杂。
所以 zim 代码中到处都是runHead,甚至不少runCollect,这都是因为过度使用ZStream导致的。一般而言,仅当数据返回(长)列表时,使用ZStream才有意义。

受限于第一次写 zio 应用,目前也只能把东西搞出来,至于搞得好不好目前看来还不行,所以请忽悠以下代码的丑陋,仅供学习和参考。

zim 第二个问题是,需要兼容旧的前端接口,旧返回结构ResultSet如下:

{
    "code": 0,"msg": "","data":[]
}{
    "code": 0,"msg": "","data":{
    }
}

由于最终在外层包裹了一层JSON,所以实际上是把ZStream作为一个普通集合来使用的,并没有使用流的特性。
个人认为,因为最终返回的是一个大对象而不是List生成的 stream,这意味即使使用流了,也是一次性传输的。

鉴于上面的大前提,下面仅是解决思路,不是最佳实践。

依赖

zio-interop-reactivestreams
zio-streams
akka-http
akka-stream

核心代码如下:

  /*** @tparam T 支持的多元素的类型* @return*/def buildFlowResponse[T <: Product]: stream.Stream[Throwable, T] => Future[Either[ZimError, Source[ByteString, Any]]] = respStream => {
    val resp = for {
    list <- respStream.runCollectresp = ResultSet[List[T]](data = list.toList).asJson.noSpacesr <- ZStream(resp).map(body => ByteString(body)).toPublisher} yield rFuture.successful(Right(Source.fromPublisher(unsafeRun(resp))))}

上面代码的一些注意事项:

  1. 该函数需要输入一个Stream[Throwable, T]ZStream的别名)
  2. 该函数需要泛型T,以便支持所有T的序列化,这里asJson来自Circe库。
  3. 该函数输出一个akka stream类型Future[Either[ZimError, Source[ByteString, Any]]],最外层加Future是为了给tapir使用。
    • 这里是从Publisher创建Source
    • 该函数直接在内存进行runCollect,以便生成ResultSet对象所需要的data数据,创建出完整的ResultSet结构
    • 使用 zio-interop-reactivestreams 提供的toPublisher函数将ZStream转换为 reactive Streams,也就是 akka stream

如果不需要包装一层ResultSet就更方便了。

使用

这里是使用了tapir,暂时忽略。

  lazy val getOffLineMessageRoute: Route =AkkaHttpServerInterpreter().toRoute(ZimUserEndpoint.getOffLineMessageEndpoint.serverLogic {
     user => _ =>// getOffLineMessage返回一个ZStreamval resultStream = apiApplication.getOffLineMessage(user.id)buildFlowResponse(resultStream)})

这样就创建了一个 akka-stream 的Route对象了。tapir 的好处是能使用 DSL 来描述请求和响应,并具备文档化的能力,同时兼容各大 Scala web平台。
需要注意的是,这里的Route和 akka http 自己的Route对使用者来说是没有任何区别。

ZStream 错误处理

在 ZIO 中,抛异常可以使用ZIO.fail(BusinessException()),或者在 stream 中使用ZStream.fail(BusinessException()),他们的处理方式相同。

以 zim 为例,有以下业务异常:

sealed trait ZimError extends Throwable with Product {
    val msg: Stringval code: Int
}object ZimError {
    case class BusinessException(override val code: Int = SystemConstant.ERROR,override val msg: String = SystemConstant.ERROR_MESSAGE) extends ZimError
}

在 akka-http 中可以使用ExceptionHandler统一处理Route的异常,如:

  // 注意这里是PartialFunction,不能使用`_`匹配// customExceptionHandler作为Route.seal(getOffLineMessageRoute)的隐式参数被使用。implicit def customExceptionHandler: ExceptionHandler = ExceptionHandler {
    case e: Unauthorized =>extractUri {
     uri =>Logger.root.error(s"Request to $uri could not be handled normally cause by ${e.toString}")getFromResource("static/html/403.html")}case e: Exception => extractUri {
     uri =>Logger.root.error(s"Request to $uri could not be handled normally cause by ${e.toString}")getFromResource("static/html/500.html")}}

还有一种方法是在 ZIO 处理完逻辑结束时,通过 ZIO 提供的一些操作把前面抛出的业务异常捕获为有效的返回数据,比如转成 json,如下:

  def buildFlowResponse[T <: Product]: stream.Stream[Throwable, T] => Future[Either[ZimError, Source[ByteString, Any]]] = respStream => {
    val resp = (for {
    list <- respStream.runCollect // 这样就变成了 ZIO 包含 List数据,不能没办法包一层 ResultSetresp = ResultSet[List[T]](data = list.toList).asJson.noSpacesr <- ZStream(resp).map(body => ByteString(body)).toPublisher } yield r).catchSome(catchStreamError) // 捕获一些特定异常,这样返回给akka-http的就不会出现错误,而是一个正常的 jsonFuture.successful(Right(Source.fromPublisher(unsafeRun(resp))))}// 匹配业务异常病获取错误信息和错误码,返回给前端// 如果需要返回错误页面或者重定向,哪还是得是有 akka-http 那种ExceptionHandler。@inline private def catchStreamError: PartialFunction[Throwable, ZIO[Any, Throwable, Publisher[ByteString]]] = {
    case BusinessException(ec, em) =>ZStream.succeed(ResultSet(code = ec, msg = em).asJson.noSpaces).map(body => ByteString(body)).toPublisher}: PartialFunction[Throwable, ZIO[Any, Throwable, Publisher[ByteString]]]

首次发布于 https://bitlap.org/lab/zim/zim_6

  相关解决方案