type
status
date
slug
summary
tags
category
icon
password
AI summary
出站--->Socket通道--->入站--->客户端
客户端发数据给服务端:
客户端--->出站--->Socket通道--->入站--->服务端

下面是Netty官方源码给的图,我个人觉的不是太好理解,上面的图好理解一些
ByteToMessageDecoder的小细节
- 由于发送的字符串是16字节,根据上面注释说的内容,decode会被调用两次
如下图验证结果:

- 同时又引出了一个小问题

当我们
MyClientHandler
传一个Long时,会调用我们的MyLongToByteEncoder
的编码器。那么控制台就会打印这样一句话:MyLongToByteEncoder encode 被调用。但是这里并没有调用编码器,这是为什么呢?MyClientHandler
这个处理器的后一个处理器是MyLongToByteEncoder
MyLongToByteEncoder
的父类是MessageToByteEncoder
,在MessageToByteEncoder
中有下面的一个方法
3.当我们以这样的形式发送数据

这两个类型并不匹配,也就不会走编码器。因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
结论:
- 不论解码器
handler
还是编码器handler
即接收的消息类型必须与待处理的消息类型一致,否则该handler
不会被执行
- 在解码器进行数据解码时,需要判断缓存区(
ByteBuf
)的数据是否足够,否则接收到的结果会期望结果可能不一致。
解码器 - ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
扩展了ByteToMessageDecoder
类,使用这个类,我们不必调用readableBytes()
方法,也就不用判断还有没有足够的数据来读取。参数S
指定了用户状态管理的类型,其中Void
代表不需要状态管理
- 应用实例:使用
ReplayingDecoder
编写解码器,对前面的案例进行简化[案例演示]
ReplayingDecoder
- 并不是所有的
ByteBuf
操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException
。 ReplayingDecoder
在某些情况下可能稍慢于ByteToMessageDecoder
,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
使用方便,但它也有一些局限性:
其它编解码器

LineBasedFrameDecoder
:这个类在Netty
内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
DelimiterBasedFrameDecoder
:使用自定义的特殊字符作为消息的分隔符。
HttpObjectDecoder
:一个HTTP
数据的解码器
- 在
Maven
中添加对Log4j
的依赖在pom.xml
- 配置
Log4j
,在resources/log4j.properties
- 演示整合

TCP 粘包和拆包及解决方案
TCP 粘包和拆包基本介绍
TCP
是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket
,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle
算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
- 由于
TCP
无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图
TCP
粘包、拆包图解

假设客户端分别发送了两个数据包
D1
和 D2
给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:- 服务端分两次读取到了两个独立的数据包,分别是
D1
和D2
,没有粘包和拆包
- 服务端一次接受到了两个数据包,
D1
和D2
粘合在一起,称之为TCP
粘包
- 服务端分两次读取到了数据包,第一次读取到了完整的
D1
包和D2
包的部分内容,第二次读取到了D2
包的剩余内容,这称之为TCP
拆包
- 服务端分两次读取到了数据包,第一次读取到了
D1
包的部分内容D1_1
,第二次读取到了D1
包的剩余部分内容D1_2
和完整的D2
包。
TCP 粘包和拆包现象实例
在编写
Netty
程序时,如果没有做处理,就会发生粘包和拆包的问题看一个具体的实例:
MyServer
MyServerInitializer
MyServerHandler
MyClient
MyClientInitializer
MyClientHandler
效果
第一次运行:
Client

Server

第二次运行:
Client

Server

可以看到第一次运行时,服务器一次性将10个数据都接收了,第二次运行时分六次接收的,这就很形象的看出了TCP的粘包现象。
TCP 粘包和拆包解决方案
- 常用方案:使用自定义协议+编解码器来解决
- 关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的
TCP
粘包、拆包。
看一个具体的实例
- 要求客户端发送
5
个Message
对象,客户端每次发送一个Message
对象
- 服务器端每次接收一个
Message
,分5
次进行解码,每读取到一个Message
,会回复一个Message
对象给客户端。

MessageProtocol
MyServer
MyServerInitializer
MyServerHandler
MyClient
MyClientInitializer
MyClientHandler
MyMessageEncoder
MyByteToLongDecoder2
效果
Client输出
Server输出
无论运行几次,Server都是分5次接收的,这样就解决了TCP粘包问题。
Netty 心跳(heartbeat)服务源码剖析
源码剖析目的
作为一个网络框架,提供了诸多功能,比如编码解码等,
还提供了非常重 要的一个服务-----心跳机制
。通过心跳检查对方是否有效,这是
框架 中是必不可少的功能。下面我们分析一下Netty内部 心跳服务源码实现。

源码剖析
说明
- Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三 个Handler 检测连接的有效性,重点分析
IdleStateHandler
.
- 如图 3)
ReadTimeout
事件和WriteTimeout
事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以下,我们重点看IdleStateHandler。

IdleStateHandler分析
handlerAdded
方法 当该handler
被添加到pipeline
中时,则调用initialize
方法只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化调用initOutputChanged方法,初始化“监控出站数据属性”。 三个任务类

这3个定时任务分别对应读,写,读或者写事件。共有一个父类(
AbstractIdleTask
)。这个父类提供了一个模板方法说明: 1)得到用户设置的超时时间。 2)如果读取操作结束了(执行了channelReadComplete方法设置),就用当前时间减去给定时间和最后一次读(执操作的时间行了channelReadComplete方法设置),如果小于O,就触发事件。反之,继续放入队 列。间隔时间是新的计算时间。 3)触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个promise对象,用于做 取消操作。然后,设置first属性为false,表示,下一次读取不再是第一次了,这个属性在channelRead方 法会被改成rue。 4)创建一个IdleStateEvent类型的写事件对象,将此对象传递给用户的UserEventTriggered方法。完成触 发事件的操作。 5)总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered方法。∥前面介绍IdleStateHandler说过,可以看一下
写事件的run方法(即VriterIdleTimeoutTask的run方法)分析
说明: 写任务的代码逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对出站较慢数据的判断 hasOutputChanged
所有事件的run方法(即AllldleTimeoutTask的rum方法)分析
说明: 1)表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致: 2)需要大家注意的地方是 long nextDelay allldleTimeNanos; if (!reading){ ∥当前时间减去最后一次写或读的时间,若大于0,说明超时了 nextDelay -ticksInNanos()-Math.max(lastReadTime,last Write Time); 3)这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。
10.小结Nety的心跳机制 l)IdleStateHandler可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会 触发用户handler的userEventTriggered方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关 闭连接。 2)IdleStateHandler的实现基于EventLoop的定时任务,每次读写都会记录一个值,在定时任务运行的时候, 通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。 3)内部有3个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。 4)同时,IdleStateHandler内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的 空闲时间。Netty通过构造方法中的observeOutput属性来决定是否对出站缓冲区的情况进行判断。 5)如果出站缓慢,Ny不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一 次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成OOM,OOM比空闲的问题更大。 6)所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了observeOutput为true), 那么就需要注意是不是数据出站速度过慢。 7)还有一个注意的地方:就是ReadTimeoutHandler,它继承自IdleStateHandler,当触发读空闲事件的时候, 就触发ctx.fireExceptionCaught方法,并传入一个ReadTimeoutException,然后关闭Socket。 8)而WriteTimeoutHandler的实现不是基于IdleStateHandler的,他的原理是,当调用write方法的时候,会 创建一个定时任务,任务内容是根据传入的promise的完成情况来判断是否超出了写的时间。当定时任务根据指 定时间开始运行,发现promise的isDone方法返回false,表明还没有写完,说明超时了,则抛出异常。当write 方法完成后,会打断定时任务。
用 Netty 自己实现简单的RPC
RPC 基本介绍
RPC(Remote Procedure Call)
—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)

过程:
- 调用者(
Caller
),调用远程API(Remote API
)
- 调用远程API会通过一个RPC代理(
RpcProxy
)
- RPC代理再去调用
RpcInvoker
(这个是PRC的调用者)
RpcInvoker
通过RPC连接器(RpcConnector
)
- RPC连接器用两台机器规定好的PRC协议(
RpcProtocol
)把数据进行编码
- 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
- PRC接收器通过PRC协议进行解码拿到数据
- 然后将数据传给
RpcProcessor
RpcProcessor
再传给RpcInvoker
RpcInvoker
调用Remote API
- 最后推给被调用者(Callee)
- 常见的
RPC
框架有:比较知名的如阿里的Dubbo
、Google
的gRPC
、Go
语言的rpcx
、Apache
的thrift
,Spring
旗下的SpringCloud
。

我们的RPC 调用流程图

RPC 调用流程说明
- 服务消费方(
client
)以本地调用方式调用服务
client stub
接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
client stub
将消息进行编码并发送到服务端
server stub
收到消息后进行解码
server stub
根据解码结果调用本地的服务
- 本地服务执行并将结果返回给
server stub
server stub
将返回导入结果进行编码并发送至消费方
client stub
接收到消息并进行解码
- 服务消费方(
client
)得到结果
小结:
RPC
的目标就是将 2 - 8
这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用己实现 Dubbo RPC(基于 Netty)
需求说明
Dubbo
底层使用了Netty
作为网络通讯框架,要求用Netty
实现一个简单的RPC
框架
- 模仿
Dubbo
,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.1.20
设计说明
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用
Netty
请求提供者返回数据
- 开发的分析图

代码
封装的RPC
可以把这块代码理解成封装的dubbo
NettyServer
NettyServerHandler
NettyClientHandler
NettyClient
接口HelloService
HelloServiceImpl
ServerBootStrap
ClientBootStrap
调用过程
ClientBootstrap#main
发起调用
- 走到下面这一行代码后
- 调用
NettyClient#getBean
,在此方法里与服务端建立链接。
- 于是就执行
NettyClientHandler#channelActive
- 接着回到
NettyClient#getBean
调用NettyClientHandler#setPara
,调用完之后再回到NettyClient#getBean
,用线程池提交任务
- 因为用线程池提交了任务,就准备执行
NettyClientHandler#call
线程任务
- 在
NettyClientHandler#call
中发送数据给服务提供者
由于还没收到服务提供者的数据结果,所以wait住
- 来到了服务提供者这边,从Socket通道中收到了数据,所以执行
NettyServerHandler#channelRead
,然后因为此方法中执行了
- 就去
HelloServiceImpl#hello
中执行业务逻辑,返回数据给NettyServerHandler#channelRead
,NettyServerHandler#channelRead
再把数据发给客户端
NettyClientHandler#channelRead
收到服务提供者发来的数据,唤醒之前wait的线程
- 所以之前wait的线程从
NettyClientHandler#call
苏醒,返回result给NettyClient#getBean
NettyClient#getBean
get()到数据,ClientBootstrap#main
中的此函数调用返回,得到服务端提供的数据。
13.至此,一次RPC调用结束。
效果
ClientBootstrap打印
ServerBootstrap打印
- 作者:IT小舟
- 链接:https://www.codezhou.top/article/JAVA%E3%80%90Netty%E3%80%91%E7%AC%AC%E4%B8%89%E8%AE%B2%EF%BC%88%E6%8C%81%E7%BB%AD%E6%9B%B4%E6%96%B0%EF%BC%89
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。