当前位置: 代码迷 >> 综合 >> Netty(编解码器框架)
  详细解决方案

Netty(编解码器框架)

热度:34   发布时间:2023-11-20 01:27:16.0

    每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器  由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。

    如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列—它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。

解码器

因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以Netty的解码器实现了ChannelInboundHandler。

  • 将字节解码为消息——ByteToMessageDecoder和ReplayingDecoder;
  • 将一种消息类型解码为另一种——MessageToMessageDecoder。

抽象类ByteToMessageDecoder 

    将字节解码为消息(或者另一个字节序列),Netty为它提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。

方法

描述

decode(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object> out)

必须实现的唯一抽象方法。

方法被调用时传入一个包含传入数据的ByteBuf,和一个添加解码消息的List。

对方法的调用会重复进行,知道没有新元素被添加到List,或ByteBuf中没有更多可读取的字节。

如果List不为空,它的内容会被传递给ChannelPipeline中的下一个ChannelInboundHandler

decodeLast(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object> out)

简单调用decode()方法,

当Channel状态为非活动时,这个方法会被调用一次。

可以重写该方法已提供特殊处理

   假设接收了一个包含简单int的字节流,每个int都需要被单独处理。在这种情况下,需要从入站ByteBuf中读取每个int,并将它传递给ChannelPipeline中的下一个ChannelInboundHandler。为了解码这个字节流,需要扩展ByteToMessageDecoder类。 

public class ToIntegerDecoder extends ByteToMessageDecoder {@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() >= 4) {out.add(in.readInt());}}
}

编解码器中的引用计数

    引用计数需要特别的注意。对于编码器和解码器来说,其过程也是相当的简单:一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放。如果你需要保留引用以便稍后使用,那么你可以调用ReferenceCountUtil.retain(message)方法。这将会增加该引用计数,从而防止该消息被释放。

抽象类ReplayingDecoder

    ReplayingDecoder扩展了ByteToMessageDecoder类, 使得我们不必调用readableBytes()方法。它通过使用一个自定义的ByteBuf实现,ReplayingDecoderByteBuf,包装传入的ByteBuf实现了这一点,其将在内部执行该调用。

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

类型参数S指定了用于状态管理的类型,其中Void代表不需要状态管理。

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {//扩展Replaying-Decoder<Void>以将字节解码为消息@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)//传入的ByteBuf是ReplayingDecoderByteBufthrowsException {out.add(in.readInt());//从入站ByteBuf中读取一个int,并将其添加到解码消息的List中}
}

    和之前一样,从ByteBuf中提取的int将会被添加到List中。如果没有足够的字节可用,这个readInt()方法的实现将会抛出一个Error,其将在基类中被捕获并处理。当有更多的数据可供读取时,该decode()方法将会被再次调用。

注意ReplayingDecoderByteBuf的下面这些方面:

  • 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException;
  • ReplayingDecoder稍慢于ByteToMessageDecoder。

这里有一个简单的准则:如果使用ByteToMessageDecoder不会引入太多的复杂性,那么请使用它;否则,请使用ReplayingDecoder。 

更多的解码器

io.netty.handler.codec.LineBasedFrameDecoder—这个类在Netty内部也有使用,它使用了行尾控制字符(\n或者\r\n)来解析消息数据;

io.netty.handler.codec.http.HttpObjectDecoder—一个 HTTP数据的解码器。

在io.netty.handler.codec子包下面,你将会发现更多用于特定用例的编码器和解码器实现。

抽象类MessageToMessageDecoder

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

类型参数I指定了decode()方法的输入参数msg的类型,它是你必须实现的唯一方法。

方法

描述

decode (

          ChannelHandlerContext ctx,

          I msg,

          List<Object> out)

对于每个需要被解码为另一种格式的入站消息来说,该方法都将被调用。

解码消息随后会传递给 ChannelPipeline中的下一个ChannelInboundHandler

     示例,编写一个IntegerToStringDecoder解码器来扩展MessageTo-MessageDecoder<Integer>。它的decode()方法会把Integer参数转换为它的String表示:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer>//扩展了MessageToMessageDecoder<Integer>
{@Overrideprotected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {out.add(String.valueOf(msg));//将Integer消息转换为它的String表示,并将其添加到输出的List中}
}

io.netty.handler.codec.http.HttpObjectAggregator类,它扩展了MessageToMessageDecoder<HttpObject>

TooLongFrameException类

    由于Netty是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常  见的顾虑,Netty提供了TooLongFrameException类,其将由解码器在帧超出指定的大小限制时抛出。

    为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个TooLongFrameException(随后会被ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。

public class ToIntegerDecoder extends ByteToMessageDecoder {private static final int MAX_FRAME_SIZE= 1024;@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int readable = in.readableBytes();if (readable  > MAX_FRAME_SIZE)//检查缓冲区中是否有超过MAX_FRAME_SIZE个字节{in.skipBytes(readable);//130第10章  编解码器框架将Integer消息转换为它的String表示,并将其添加到输出的List中跳过所有的可读字节,抛出TooLongFrame-Exception并通知ChannelHandlerthrow new TooLongFrameException("Frametoo  big!");}if (readable >= 4) {out.add(in.readInt());}}
}

编码器

编码器实现了ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式,和解码器的功能正好相反。

  • 将消息编码为字节;
  • 将消息编码为消息。

抽象类MessageToByteEncoder

方法

描述

encode(

          ChannelHandlerContext ctx,

          I msg,

          ByteBuf out)

encode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为ByteBuf的(类型为I的)出站消息。该ByteBuf随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

    这个类只有一个方法,而解码器有两个。原因是解码器通常需要在Channel关闭之后产生最后一个消息(因此也就有了decodeLast()方法)。这显然不适用于编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的。

    示例,ShortToByteEncoder,其接受一个Short类型的实例作为消息,将它编码为Short的原子类型值,并将它写入ByteBuf中,其将随后被转发给ChannelPipeline中的下一个ChannelOutboundHandler。每个传出的Short值都将会占用ByteBuf中的2字节。

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {@Overridepublic voidencode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {out.writeShort(msg);//将Short写入ByteBuf中}
}

 抽象类MessageToMessageEncoder

方法

描述

encode(ChannelHandlerContext ctx,I msg,List<Object> out) 这是你需要实现的唯一方法。每个通过write()方法写入的消息都将会被传递给encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

示例,编码器将每个出站Integer的String表示添加到了该List中。 

public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {@Overridepublic void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {out.add(String.valueOf(msg));}
}

抽象的编解码器类

    它结合了ByteToMessageDecoder以及它的逆向——MessageToByteEncoder

方法名称

描述

decode(

            ChannelHandlerContext ctx,

             ByteBuf in,

            List<Object>)

只要有字节可以被消费,这个方法就将被调用。它将入站ByteBuf 转换为指定的消息格式,并将其转发给ChannelPipeline 中的下一个ChannelInboundHandler

decodeLast(

            ChannelHandlerContext ctx,

            ByteBuf in,

            List<Object>  out)

这个方法默认实现委托给decode()方法。在Channel状态变为非活动时被调用一次。可以被重写以实现特殊处理。

encode(

            ChannelHandlerContext ctx,

            I msg,

            ByteBuf  out)

对每个将被编码并写入出站ByteBuf的消息来说,这个方法都会被调用。

抽象类MessageToMessageCodec 

通过使用MessageToMessageCodec,我们可以在一个单个的类中实现该转换的往返过程。

MessageToMessageCodec是一个参数化的类:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler

方法名称

描述

protected abstract decode(

            ChannelHandlerContext ctx,

            INBOUND_IN msg,

            List<Object> out)

这个方法被调用时会被传入INBOUND_IN类型的消息。它将把它们解码为OUTBOUND_IN类型的消息,这些消息将被转发给ChannelPipeline中的下一个Channel-InboundHandler

protected abstract encode(

            ChannelHandlerContext ctx,

           OUTBOUND_IN msg,

            List<Object> out)

对于每个OUTBOUND_IN类型的消息,这个方法都将会被调用。这些消息将会被编码为INBOUND_IN类型的消息,然后被转发给ChannelPipeline中的下一个ChannelOutboundHandler

    decode()方法是  将INBOUND_IN类型的消息转换为OUTBOUND_IN类型的消息,而encode()方法则进行它的逆向操作。将INBOUND_IN类型的消息看作是通过网络发送的类型,而将OUTBOUND_IN类型的消息看作是应用程序所处理的类型,将可能有助于理解。

CombinedChannelDuplexHandler类

    这个类充当了ChannelInboundHandler和ChannelOutboundHandler(该类的类型参数I和O)的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。

public class ByteToCharDecoder extends ByteToMessageDecoder {@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {while (in.readableBytes() >= 2) {out.add(in.readChar());}}
}

这里的decode()方法一次将从ByteBuf中提取2字节,并将它们作为char写入到List中,其将会被自动装箱为Character对象。

public class CharToByteEncoder extends MessageToByteEncoder<Character>
{@Overridepublic void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception{out.writeChar(msg);//将Character解码为char,并将其写入到出站ByteBuf中}
}

有了解码器和编码器,将它们来构建一个编解码器。 

public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder>
{public CombinedByteCharCodec(){super(new ByteToCharDecoder(), new CharToByteEncoder());}
}

参考《Netty实战》