当前位置: 代码迷 >> 综合 >> 从零开始手写 dubbo rpc 框架-05-serial 序列化
  详细解决方案

从零开始手写 dubbo rpc 框架-05-serial 序列化

热度:92   发布时间:2024-01-06 10:35:17.0

序列化

为什么需要序列化

netty 底层都是基于 ByteBuf 进行通讯的。

前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。

但是有一个问题,如果想把我们的项目抽象为框架,那就需要为所有的对象编写编码器/解码器。

显然,直接通过每一个对象写一对的方式是不现实的,而且用户如何使用,也是未知的。

序列化的方式

基于字节的实现,性能好,可读性不高。

基于字符串的实现,比如 json 序列化,可读性好,性能相对较差。

实现思路

可以将我们的 Pojo 全部转化为 byte,然后 Byte 转换为 ByteBuf 即可。

反之亦然。

代码实现

序列化 jar 依赖

<dependency><groupId>com.github.houbb</groupId><artifactId>json</artifactId><version>0.1.1</version>
</dependency>

服务端

服务端的序列化/反序列化调整为直接使用 JsonBs 实现。

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author binbin.hou* @since 0.0.1*/
public class RpcServerHandler extends SimpleChannelInboundHandler {
    private static final Log log = LogFactory.getLog(RpcServerHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
    final String id = ctx.channel().id().asLongText();log.info("[Server] channel {} connected " + id);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    final String id = ctx.channel().id().asLongText();ByteBuf byteBuf = (ByteBuf)msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);log.info("[Server] receive channel {} request: {} from ", id, request);Calculator calculator = new CalculatorService();CalculateResponse response = calculator.sum(request);// 回写到 client 端byte[] responseBytes = JsonBs.serializeBytes(response);ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);ctx.writeAndFlush(responseBuffer);log.info("[Server] channel {} response {}", id, response);}}

客户端

客户端的序列化/反序列化调整为直接使用 JsonBs 实现。

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** <p> 客户端处理类 </p>** <pre> Created: 2019/10/16 11:30 下午 </pre>* <pre> Project: rpc </pre>** @author houbinbin* @since 0.0.2*/
public class RpcClientHandler extends SimpleChannelInboundHandler {
    private static final Log log = LogFactory.getLog(RpcClient.class);/*** 响应信息* @since 0.0.4*/private CalculateResponse response;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf)msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);log.info("[Client] response is :{}", response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)// 个人理解:如果不关闭,则永远会被阻塞。ctx.flush();ctx.close();}public CalculateResponse getResponse() {
    return response;}}
  相关解决方案