-
2022-05-31 19:32:23
先说结论:
ENSP模拟环境下,限制vlan b访问 vlan a的某个地址,只能在vlan b的inbound 方向上做策略,或者在vlan a的视图下做inbound,outbound无论是在vlan a或者vlan b视图下,都不起作用。
实际环境中,我今天下午测试的时候,交换机真机环境下没有问题。
以下是测试的过程:
最近工作中用到了traffic-policy ,之前用traffic-filter的时候在Inbound和outbound 的时候,在哪个方向上应用,没有迷惑,可是在用traffic-policy 的时候,发现只有在inbound方向上才有用,在vlan视图下用outbound,不起作用,
我用ensp做了个模拟,做了个最严格,或者说极端情况下的acl deny,结果还是能ping通,说明traffic-policy在vlan视图下,outbound是不起作用的。
正常情况:要想做到不让192.168.2.0网段Ping通192.168.1.2,只需要在vlan20视图下,做
traffic-policy inbound就可可以,配置如图,
acl number 3000 rule 5 deny icmp destination 192.168.1.2 0 # traffic classifier 3000 operator and if-match acl 3000 # traffic behavior deny deny # traffic policy 3000 classifier 3000 behavior deny # drop-profile default # vlan 20 traffic-policy 3000 inbound # interface Vlanif10 ip address 192.168.1.1 255.255.255.0 # interface Vlanif20 ip address 192.168.2.1 255.255.255.0 # interface MEth0/0/1 # interface GigabitEthernet0/0/1 port link-type access port default vlan 10 # interface GigabitEthernet0/0/2 port link-type access port default vlan 10 # interface GigabitEthernet0/0/3 port link-type access port default vlan 20 # interface GigabitEthernet0/0/4 port link-type access port default vlan 20
这样的配置下,在vlan 20 下的电脑是不能ping通192.168.1.2的
------------------------------------------------------------------------------------
异常情况:vlan 视图下outbound方向
基于上面正确的配置,是应用在vlan 20的inbound方向的,我考虑,我将数据包从vlan20送过去,用的inbound,那么到达vlan10以后,在发送给电脑,应该是outbound,于是我在vlan10下面配置了traffic-policy 3000 outbound,下面就只贴不一样的关键配置了
测试1:
测试1: acl number 3000 rule 5 deny icmp destination 192.168.1.2 0 # traffic classifier 3000 operator and if-match acl 3000 # traffic behavior deny deny # traffic policy 3000 classifier 3000 behavior deny # vlan 10 traffic-policy 3000 outbound # 结果这样的配置,vlan 20的电脑还是能Ping通192.168.1.2
基于上面的情况,我在想是不是destination和source不对,于是我就改成了这样
测试2:vlan10视图下的outbound
测试2: acl number 3000 rule 5 deny icmp destination 192.168.1.2 0 rule 10 deny icmp source 192.168.1.2 0 # traffic classifier 3000 operator and if-match acl 3000 # traffic behavior deny deny # traffic policy 3000 classifier 3000 behavior deny # drop-profile default # vlan 10 traffic-policy 3000 outbound 结果这样,vlan 20 还是能ping通192.168.1.2
我已经在acl里禁止了source 和destination地址都是192.168.1.2的地址,然后应用在了vlan 10的outbound方向,结果在pc3上Ping 192.168.1.2还是可以通的。
后来我想干脆我给vlan10的icmp全禁止不就行了,
于是我做了极端情况下的acl配置,结果,vlan20还是能ping通192.168.1.2
测试3:禁止icmp的情况下,vlan10 outbound依然不起作用
【测试3:极端情况】 acl number 3000 rule 5 deny icmp rule 10 deny ip # traffic classifier 3000 operator and if-match acl 3000 # traffic behavior deny deny # traffic policy 3000 classifier 3000 behavior deny # drop-profile default # vlan 10 traffic-policy 3000 outbound # 结果,这样的acl,vlan 20下的pc3依然能ping通192.168.1.2 到这里我就开始怀疑是不是ensp有Bug,从理论上来说,vlan10 不会跟外界有icmp通信的
同样的 ,将这个极端情况的acl应用在vlan 20视图下,依然不起作用。
测试4:禁止vlan 20视图下的icmp
测试4: acl number 3000 rule 5 deny icmp rule 10 deny ip # traffic classifier 3000 operator and if-match acl 3000 # traffic behavior deny deny # traffic policy 3000 classifier 3000 behavior deny # vlan 20 traffic-policy 3000 outbound 结果,这样的情况下,vlan 20的PC3依然能ping通192.168.1.2 ^_^
到这里,traffic-policy 在vlan视图下的outbound,彻底给我整不会了,于是开始大量的查找资料,目前查到的资料里都说是在目的vlan下做outbound就可以,可是测试上却不可以,目前还没更好的例子,有大神知道这是为什么吗?
2022.5.31
更多相关内容 -
H3C应用交付产品维护指导-服务器与outbound.pptx
2021-10-11 14:02:05H3C应用交付产品维护指导-服务器与outbound.pptx -
outbound-link-checker:一个有用的工具,用于检查和列出网站的出站链接
2021-05-11 00:06:52介绍 这个简单的工具可让您列出您网域中的所有出站链接,以捕获可疑和垃圾链接。 您可以交互地将域列入白名单,以将其标记为安全域,并忽略所有指向... go get github.com/ashishb/outbound-link-checker 样品用量 -
abiquo-api-outbound-java-client:一个Java客户端,用于连接和使用Abiquo API出站功能
2021-05-13 15:22:05Abiquo出站API客户端 Abiquo云平台的2.6版引入了Outbound API,这是一个事件流,它将使客户和第三方软件提供商可以将外部系统(例如备份软件)与Abiquo平台集成在一起。 可以使用标准的服务器发送事件(SSE)协议... -
SAP PI 配置 从配置到开发测试 Inbound outbound都覆盖
2020-08-05 09:27:281. SLD(System landscape directory)中创建software component 2. ESR(Enterprise service repository)中创建software component version 3. 创建namespace 4. 创建folder 4. 创建date type DT 并激活 ... -
Pipeline outbound
2021-03-15 21:07:06netty源码死磕8Pipeline outbound 出站流程揭秘1. Pipeline outbound流程1.1. 出站的定义简单回顾一下。出站(outbound) 操作,通常是处于上层的Netty channel,去操作底层Java NIO channel/OIO Channel。主要出站...netty源码死磕8
Pipeline outbound 出站流程揭秘
1. Pipeline outbound流程
1.1. 出站的定义
简单回顾一下。
出站(outbound) 操作,通常是处于上层的Netty channel,去操作底层Java NIO channel/OIO Channel。
主要出站(outbound)操作如下:
1. 端口绑定 bind
2. 连接服务端 connect
3. write写通道
4. flush刷新通道
5. read读通道
6. 主动断开连接 disconnect
7. 主动关闭通道 close
最为常见,也是最容易理解的出站操作,是第3个操作 —— write写通道。
一个Netty Channel的write 出站操作 实例:
// server向 Channel写登录响应
ctx.channel().write(“恭喜,登录成功”);
//....
1.2. 出站处理器Handler
对于出站操作,有相应的出站Handler处理器。
有四个比较重要的出站Handler类。
这个四个 Handler 相关的类结构图如下:
在抽象的ChannelOutboundHandler 接口中,定义了所有的出站操作的方法声明。
在ChannelOutboundHandlerAdapter 出站适配器中,提供了出站操作的默认实现。如果要实现定制的出站业务逻辑,继承ChannelOutboundHandlerAdapter 适配器即可。ChannelOutboundHandlerAdapter 面向的是通用的出站处理场景。
有一个特殊场景的出站处理器——HeadContext。先按下不表,稍候重点介绍。
1.3. 出站的上下文包裹器Context
虽然有专门的Handler,但是,并没有专门的出站Context上下文包裹器。
强调一下:
没有单独的出站上下文Context基类。出站和入站,复用了同一个上下文Context基类。它就是AbstractChannelHandlerContext。
在这个AbstractChannelHandlerContext基类中,定义了每一个出站操作的默认实现。
基本的出站方法如下:
AbstractChannelHandlerContext.bind(SocketAddress, ChannelPromise)
AbstractChannelHandlerContext.connect(SocketAddress,SocketAddress, hannelPromise)
AbstractChannelHandlerContext.write(Object, ChannelPromise)
AbstractChannelHandlerContext.flush()
AbstractChannelHandlerContext.read()
AbstractChannelHandlerContext.disconnect(ChannelPromise)
AbstractChannelHandlerContext.close(ChannelPromise)
赘述一遍:
Context类型的接口是ChannelHandlerContext,抽象的基类是AbstractChannelHandlerContext。
出站和入站的区分,通过基类AbstractChannelHandlerContext的两个属性来完成——outbound、intbound。
出站Context的两个属性的值是:
(1)AbstractChannelHandlerContext基类对象的属性outbound的值为false
(2)AbstractChannelHandlerContext基类对象的属性intbound值为true
Pipeline 的第一个节点HeadContext,outbound属性值为true,所以一个典型的出站上下文。
1.4. 流逼哄哄的HeadContext
为什么说流逼哄哄呢?
因为:HeadContext不光是一个出站类型的上下文Context, 而且它完成整个出站流程的最后一棒。
不信,看源码:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler
{
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline)
{
//父类构造器
super(pipeline, null, HEAD_NAME, false, true);
//...
}
}
在HeadContext的构造器中,调用super 方法去初始化基类AbstractChannelHandlerContext实例。
注意,第四个、第五个参数非常重要。
第四个参数false,此参数对应是是基类inbound的值,表示Head不是入站Context。
第五个参数true,此参数对应是是基类outbound的值,表示Head是一个出站Context。
所以,在作为上下文Context角色的时候,HeadContext是黑白分明的、没有含糊的。它就是一个出站上下文。
但是,顺便提及一下,HeadContext还承担了另外的两个角色:
(1)入站处理器
(2)出站处理器
所以,总计一下,其实HeadContext 承担了3个角色。
HeadContext作为Handler处理器的角色使用的时候,HeadContext整个Handler是一个两面派,承担了两个Handler角色:
(1)HeadContext是一个入站Handler。HeadContext 是入站流水线处理处理的起点。是入站Handler队列的排头兵。
(2)HeadContext是一个出站Handler。HeadContext 是出站流水线处理的终点,完成了出站的最后棒—— 执行最终的Java IO Channel底层的出站方法。
整个出站处理的流水线,是如何一步一步,流转到最后一棒的呢?
1.5. TailContext出站流程的起点
出站处理,起点在TailContext。
这一点,和入站处理的流程刚好反过来。
在Netty 的Pipeline流水线上,出站流程的起点是TailContext,终点是HeadContext,流水线执行的方向是从尾到头。
强调再强调:
出站流程,只有outbound 类型的 Context 参与。inbound 类型的上下文Context不参与(TailContext另说)。
上图中,橙色的是outbound Context ,是出站类型,肯定参与出站流程的。紫色的是inbound context,是入站类型的上下文Context。
上图中,橙色的Context有两个,分别是EncoderContext和HeaderContext两个Context。EncoderContext 负责出站报文的编码,一般将Java 对象编码成特定格式的传输数据包data package。HeaderContext 负责将数据包写出到Channel 通道。
在Pipeline创建的时候,加入Handler之前,Pipeline就是有两个默认的Context——HeadContext,和TailContext。
最初的Pipeline结构,如下图所示:
TailContext是出站起点,HeadContext是出站的终点。也就是说,Pipeline 从创建开始,就具已经备了Channel出站操作的能力的。
关键的问题是:作为出站的起点,为什么TailContext不是橙色呢?
首先,TailContext不是outbound类型,反而,是inbound入站类型的上下文包裹器。
其次,TailContext 在出站流水线上,仅仅是承担了一个启动工作,寻找出第一个真正的出站Context,并且,将出站的第一棒交给他。
总之,在出站流程上,TailContext作用,只是一把钥匙,仅此而已。
1.6. 出站write操作的流程实例
老规则,先上例子。
以最为常见、最好理解的出站操作——Netty Channel 出站write操作为例,将outbound处理出站流程做一个详细的描述。
整个写出站的入站处理流程图,如下:
1.7. 出站操作的最初源头
再看一次Netty Channel的write出站实例:
// server向客户 Channel写登录响应
ctx.channel().write(“恭喜,登录成功”);
//....
写操作一般的源头是从Netty 的通道channel开始的。当服务器需要发送一个业务消息到客户端,会使用到之前打开的客户端通道channel,调用通道channel的出站写操作write方法,完成写操作。
这个write方法的Netty 源码,在基类AbstractChannel 实现了一个基础的版本。
代码如下:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel
{
//…
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
//…
}
回忆一下通道和流水线的关系:一个通道一个pipeline流水线。一个流水线串起来一系列的Handler。
所以,通道将出站的操作,直接委托给了自己的成员——pipeline流水线。直接调用pipeline流水线的出站操作去完成。
也就是说,Channel 是甩手掌柜,将出站操作委托给了Pipeline。然而,Pipeline还是一个甩手掌柜。
Pipeline直接甩给了谁呢?
Pipeline 将出站操作,甩给双向链表的最后一个节点—— tail 节点。Pipeline的源码如下:
public class ChannelPipeline …..
{
//…
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
//…
}于是乎,在pipeline的链表上,tail节点,偏偏就是出站操作的启动节点。
1.8. Tail是出站操作的起点
TailContext 类的定义中,并没有实现 write写出站的方法。这个write(Object msg) 方法,定义在TailContext的基类——AbstractChannelHandlerContext 中。
代码如下:
abstract class AbstractChannelHandlerContext
extends DefaultAttributeMap implements ChannelHandlerContext
{
//……
@Override
public ChannelFuture write(Object msg) {
//….
return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
//…
write(msg, false, promise);
return promise;
}
//……
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
//第三个重载的write
private void write(Object msg, boolean flush, ChannelPromise promise){
//...
//找出下一棒 next
AbstractChannelHandlerContext next = findContextOutbound();
//....
//执行下一棒 next.invoke
next.invokeWrite(msg, promise);
//...
}
}
有三个版本的write重载方法:
ChannelFuture write(Object msg)
ChannelFuture write(Object msg,ChannelPromise promise)
ChannelFuture write(Object msg, boolean flush,ChannelPromise promise)
调用的次序是:
第一个调用第二个,第二个调用第三个。
第一个write 创建了一个ChannelPromise 对象,这个对象实例非常重要。因为Netty的write出站操作,并不一定是一调用write就立即执行,更多的时候是异步执行的。write返回的这个ChannelPromise 对象,是专门提供给业务程序,用来干预异步操作的过程。
可以通过ChannelPromise 实例,监听异步处理的是否结束,完成write出站正真执行后的一些业务处理,比如,统计出站操作执行的时间等等。
ChannelPromise 接口,继承了 Netty 的Future的接口,使用了Future/Promise 模式。这个是一种异步处理干预的经典的模式。疯狂创客圈另外开了一篇文章,专门讲述Future/Promise 模式。
第二个write,最为单薄。简单的直接调用第三个write,调用前设置flush 参数的值为false。flush 参数,表示是否要将缓冲区ByteBuf中的数据,立即写入Java IO Channel底层套接字,发送出去。一般情况下,第二个write设置了false,表示不立即发出,尽量减少底层的发送,提升性能。
第三个write,最为重要,也最为复杂。分成两步,第一步是找出下一棒 next,下一棒next也是一个出站Context。第二步是,执行下一棒的invoke方法,也即是next.invokeWrite(msg, promise);
完成以上的三步操作,TailContext 终于将write出站的实际工作,交到了第一棒outbound Context的手中。
至此,TailContext终于完成的启动write流程的使命。
1.9. 出站流程小迭代的五个动作
一般来说,Pipeline上会有多个OutBound Context(包裹着Handler),每一个OutBound Context 的处理,可以看成是大的流水处理中的一次小迭代。
每一个小迭代,有五个动作。
五个动作,具体如下:
(1)context.write(msg,promise)
(2)context.write(msg,flush,promise)
(3)context.findContextOutbound();
(4)next.invokeWrite(msg,promise)
(5)handler.write(this,msg,promise)
Context中的write(msg,promise)方法,是整个小迭代的起点。局部的流程图如下:
整个五个动作中,只有第五步在Handler中定义。其他的四步,都在Context中定义。
第一步、第二步的 write 方法在前面已经详细介绍过了。这两步主要完成promise实例的 创建,flush 参数的设置。
现在到了比较关键的步骤:第三步。这一步是寻找出站的下一棒。
1.10. findContextOutbound找出下一棒
出站流程的寻找下一棒的工作,和入站处理的寻找下一棒的方向,刚好反过来。
出站流程,查找的方向是从尾到头。这就用到的双向链表的指针是prev——向前的指针。具体来说,从当前的context开始,不断的使用prev指针,进行循环迭代查找。一直找到终点HeadContext,结束。
Netty源码如下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext
{
//…
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
//…
}这个是一个标准的链表的前向查询。
每一次的查找,this表示当前的查询所在的context 节点,this.prev表示前一个context节点。
查找时,用到的双向链表的指针是prev——向前的指针。通过while循环,一直往Pipeline的前面查找,如果前面的context不是outbound出站上下文,则一直向前。直到,直到,一直到查找出下一个出站Context上下文为止。
最初的查找,从TailContext开始,this就是TailContext。后续的每一次查找,都是从当前的Context上下文开始的。
1.11. context的invokeWrite
找到下一棒出站Context后,执行context的invokeWrite的操作。
源码如下:
abstract class AbstractChannelHandlerContext
extends DefaultAttributeMap implements ChannelHandlerContext
{
//……
private void invokeWrite(Object msg, ChannelPromise promise) {
//...
invokeWrite0(msg, promise);
//...
}
//……
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
}
context的invokeWrite操作,最终调用到了其所包裹的handler的write方法。完成 定义在handler中的业务处理动作。
1.12. 默认的write出站实现
默认的write 出站方法的实现,定义在ChannelOutboundHandlerAdapter 中。write方法的源码如下:
public class ChannelOutboundHandlerAdapter
extends ChannelHandlerAdapter
implements ChannelOutboundHandler
{
//……
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) throws Exception
{
ctx.write(msg, promise);
}
//……
}
Handler的write出站操作,已经到了一轮出站小迭代的最后一步。这个默认的write方法,简单的调用context. write方法,回到了小迭代的第一步。
换句话说,默认的ChannelOutboundHandlerAdapter 中的handler方法,是流水线的迭代一个一个环节前后连接起来,的关键的一小步,保证了流水线不被中断掉。
反复进行小迭代,迭代处理完中间的业务handler之后,就会走到流水线的HeadContext。
1.13. HeadContext出站的最后一棒
在出站迭代处理pipeline的最后一步, 会来到HeadContext。
HeadContext是如何完成最后一棒的呢?
上源码:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler
{
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
//父类构造器
super(pipeline, null, HEAD_NAME, false, true);
this.unsafe = pipeline.channel().unsafe();
//...
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
this.unsafe.write(msg, promise);
}
}
HeadContext 包含了一个unsafe 成员。 这个unsafe 成员,是一个只供在netty内部使用的类型。
unsafe 主要功能,就是完成对 Java NIO 底层Channel的写入 。
由此可见,在Netty中的三大上下文包裹器HeadContext、TailContext 、 DefaultChannelHandlerContext中,HeadContext是离 Java NIO 底层Channel最近的地方。三大包裹器,除了HeadContext,也没有谁包含Unsafe。对完成出站的最终操作职能来说,没有谁比HeadContext 更加直接。所以,这个出站处理的最后一棒,只能是HeadContext 了,呵呵。
至此为止,write出站的整个流水线流程,已经全部讲完。
从具体到抽象,我们再回到出站处理的通用流程。
1.14. 出站操作的全流程
基本上,在流程上,所有的出站事件的处理过程,是一致的。
为了方便说明,使用OUT_EVT符号代替一个通用出站操作。
通用的出站Outbound操作处理过程,大致如下:
(1)channel.OUT_EVT(msg);
(2)pipeline.OUT_EVT(msg);
(3)context.OUT_EVT(msg);
(4)context.OUT_EVT(msg,promise);
(5)context.OUT_EVT(msg,flush,promise);
(6)context.findContextOutbound();
(7)next.invoke(msg,flush,promise);
(8)handle.OUT_EVT(context,msg,promise);
(9)context.OUT_EVT(msg,promise);
上面的流程,如果短时间内看不懂,可以在回头看看write出站的实例。
无编程不创客,无案例不学习。疯狂创客圈,一大波高手正在交流、学习中!
疯狂创客圈 Netty 死磕系列10多篇深度文章:【博客园 总入口】 QQ群:104131248
-
Outbound-Blocker:使用 Windows 防火墙 API(从 Windows Vista 开始实现)和 netsh 允许您轻松阻止(并随后...
2021-06-07 00:12:57出站拦截器使用 Windows 防火墙 API(从 Windows Vista 开始实现)和 netsh 允许您轻松阻止(并随后取消阻止)指定应用程序的出站流量。 对此的灵感归功于 Rockstar 为单人模式制定了 GTA 5 多人游戏禁令预编译的二... -
SAP WM 高阶之2-Step Picking for Outbound Delivery
2022-03-03 17:24:14SAP WM 高阶之2-Step Picking Outbound Delivery Part I:相关说明 本文主要展示使用SAP WM的2-Step Picking功能为外向交货单做拣配的功能。关于2-Step Picking功能相关的后台配置,请参见我之前的文章,...SAP WM 高阶之2-Step Picking for Outbound Delivery
Part I:相关说明
本文主要展示使用SAP WM的2-Step Picking功能为外向交货单做拣配的功能。关于2-Step Picking功能相关的后台配置,请参见我之前的文章,这里不再重复说明。
物料号760启用了2-Step Picking.
如下销售订单736,
凭证流,
有2张OBD(outbound delivery).
本文就是展示如何通过SAP WM模块里的2-step Picking功能为这2张OBD做拣配。
Part II:功能展示
1, 执行事务代码VL06P为交货单创建Group。
输入shipping point NMDC等参数,参数,进入如下界面,
通过菜单 Subsequent Functions ->Group->Create with WM reference, 进入如下界面,
输入group的描述信息,回车,进入如下界面,
选中item,点击“Activate 2-Step Picking”按钮,
系统提示:Group 316 with reference to WM saved. 如上图。
2, 执行事务代码LX39为group做Pick & Allocation 两个步骤。
在Group字段里选择刚刚创建的group number(注意不能直接输入group 号码316),
按照Group description,我们找到了Group number是12。
实际上,在Table T311里,刚刚我们创建的所谓group 316的记录如下:
其真实的group号码是12,而非系统提示的316!笔者在测试过程中反复被提示说group number 3##不存在。郁闷了很久!
我们继续执行事务代码LX39,
执行,进入如下的界面:
第一步,为pick创建TO。
TO 42 被成功创建。
检查TO 42的数据,
注意:destination storage type是200。
刷新数据,
Confirm TOs,
Pick相关的TO单据被确认。
刷新数据,
第二步, 为Allocation创建TO单据。鼠标点击Allocation,然后Create TOs按钮,进入如下界面:
输入仓库号,组号,点击’Start Multiple proc.’按钮,进入如下界面:
系统提示:2 TO created.
看细节信息,
系统显示如下TO相关的信息,
看这2个TO单据,
注意:这2个TO都是将物料的库存从200存储区转入916存储区。
将Allocation相关的TO全部confirm. 刷新数据,
至此,对SO# 736的后续2个OBD已经通过2-step picking的方式完成了拣配。再看其凭证流,
TO单据都是completed的状态。不过Pick相关的TO#42并没有显示在凭证流里。
注:本文基于SAP S4/HANA 1909系统。
-完-
写于2022-3-3
-
Netty(十一)源码解析 之 Channel 的 inBound 与 outBound 处理器
2020-08-17 18:05:37Channel 的 inBound 与 outBound 处理器 1. 先了解一下相关核心类的API ChannelHandler:包含所有处理器的共性方法,主要是handlerAdded和handlerRemoved方法,一但当前处理器被添加成功或移除都会触发该回调方法...Channel 的 inBound 与 outBound 处理器
1. 先了解一下相关核心类的API
ChannelHandler、ChannelInbound/OutboundHandler
- ChannelHandler:包含所有处理器的共性方法,主要是handlerAdded和handlerRemoved方法,一但当前处理器被添加成功或移除都会触发该回调方法。
- ChannelInboundHandler:包含的都是一些
被动调用
的方法,数据流向是从外面传进来的
例如channelRead,外面传数据来了,管道有数据了就会触发;channelActive,外面请求连接,管道被激活了就会触发… - ChannelOutboundHandler:包含的都是一些
主动调用
的方法,数据流向是从内向外发的
例如bind,绑定端口;connect,连接;disconnect,断开连接;close关闭通道;write写数据…
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter在实现ChannelInboundHandler和ChannelOutboundHandler的时候,默认全部都是直接调用ChannelHandlerContext各种fireXXX方法,这些方法的含义都是触发下一个节点的处理器对应的方法的执行
// 简单看几个方法 public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { @Skip @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } @Skip @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } ... }
ChannelHandlerContext
我们看一下ChannelHandlerContext ,它代表的是ChannelPipline链上的节点:
/** * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} * and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the * {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically. * 可以使当前节点对象的{@link ChannelHandler}和它的{@link ChannelPipeline}以及其他处理器交互。 * 除此之外,处理器可以通知{@link ChannelPipeline}中的下一个{@link ChannelHandler},并动态地 * 修改它所属的{@link ChannelPipeline}。 * * ... */ public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { //列了一些自己本身的方法,简单看名字就知道啥意思了,就不写注释了 Channel channel(); EventExecutor executor(); String name(); ChannelHandler handler(); boolean isRemoved(); //省略了一些重写父类AttributeMap,ChannelInboundInvoker,ChannelOutboundInvoker的方法 //重写父类方法原因是返回值的类改为了自己 ... ChannelPipeline pipeline(); //返回ByteBuf分配器 ByteBufAllocator alloc(); ... }
继承了三个接口,先看AttributeMap
public interface AttributeMap { //获取指定key的属性值,就是我们通过bootstrap.attr设置的那些属性值 <T> Attribute<T> attr(AttributeKey<T> key); //判断有没有这个属性值 <T> boolean hasAttr(AttributeKey<T> key); }
ChannelInboundInvoker:ChannelInbound调用者,包含的方法都是为了触发ChannelInbound处理器对应的方法
public interface ChannelInboundInvoker { ChannelInboundInvoker fireChannelRegistered(); ChannelInboundInvoker fireChannelUnregistered(); ChannelInboundInvoker fireChannelActive(); ChannelInboundInvoker fireChannelInactive(); ChannelInboundInvoker fireExceptionCaught(Throwable cause); ChannelInboundInvoker fireUserEventTriggered(Object event); ChannelInboundInvoker fireChannelRead(Object msg); ChannelInboundInvoker fireChannelReadComplete(); ChannelInboundInvoker fireChannelWritabilityChanged(); }
ChannelOutboundInvoker :同理,可以触发ChannelOutbound处理器相关的方法
public interface ChannelOutboundInvoker { ChannelFuture bind(SocketAddress localAddress); ChannelFuture connect(SocketAddress remoteAddress); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); ChannelFuture disconnect(); ChannelFuture close(); ChannelFuture deregister(); ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); ChannelFuture disconnect(ChannelPromise promise); ChannelFuture close(ChannelPromise promise); ChannelFuture deregister(ChannelPromise promise); ChannelOutboundInvoker read(); ChannelFuture write(Object msg); ChannelFuture write(Object msg, ChannelPromise promise); ... }
ChannelHandlerContext 实现了这两个接口,也就是说节点具有触发Inboud和outbound处理器相关方法的能力,同时也表明ChannelHandlerContext 既可以封装Inoud处理器,也可以封装Outbound处理器
ChannelPipeline
看下ChannelPipeline:
/** * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the * <a href="http://www.oracle.com/technetwork/java/interceptingfilter-142169.html">Intercepting Filter</a> pattern * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline * interact with each other. * 一组{@link ChannelHandler}的列表,用于处理或拦截{@link Channel}的入站事件和出站操作。 * {@link ChannelPipeline}实现的一种高级形式的拦截过滤器模式让用户完全控制如何处理事件以及管道中的{@link ChannelHandler}如何相互交互。 * * ... * <pre> * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * +---------------------------------------------------+---------------+ * | ChannelPipeline | | * | \|/ | * | +---------------------+ +-----------+----------+ | * | | Inbound Handler N | | Outbound Handler 1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler N-1 | | Outbound Handler 2 | | * | +----------+----------+ +-----------+----------+ | * | /|\ . | * | . . | * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| * | [ method call] [method call] | * | . . | * | . \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 2 | | Outbound Handler M-1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 1 | | Outbound Handler M | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * +---------------+-----------------------------------+---------------+ * | \|/ * +---------------+-----------------------------------+---------------+ * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * +-------------------------------------------------------------------+ * ... */ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> { ... }
ChannelPipeline 中有各种add方法用来添加节点
除了继承ChannelInboundInvoker和ChannelOutboundInvoker,它还实现了Iterable接口,可以迭代pipline中的ChannelHandler
public class DefaultChannelPipeline implements ChannelPipeline { ... //返回一个迭代器 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() { return toMap().entrySet().iterator(); } //将ChannelPipeline 中的所有ChannelHandler转成一个map public final Map<String, ChannelHandler> toMap() { Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>(); AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == tail) { return map; } // 节点名称是作为map的key出现的 map.put(ctx.name(), ctx.handler()); ctx = ctx.next; } } //发现pipline中的所有fire方法实现,调用都是从head头结点开始的!! @Override public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; } @Override public final ChannelPipeline fireChannelInactive() { AbstractChannelHandlerContext.invokeChannelInactive(head); return this; } @Override public final ChannelPipeline fireExceptionCaught(Throwable cause) { AbstractChannelHandlerContext.invokeExceptionCaught(head, cause); return this; } ... //在看实现的ChannelOutboundInvoker里面的方法,都是从尾节点开始的! @Override public final ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); } @Override public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); } @Override public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); } ... }
发现pipline中的所有fire方法实现,如果实现的是ChannelInboundInvoker的方法,调用都是从head头结点开始的!!如果实现的是ChannelOutboundInvoker的方法,调用都是从tail尾结点开始的!!
其实也很好理解- 如果数据是从外面传进来的,则按照从头节点到尾节点的顺序依次拦截处理
- 如果数据从内向外发出的,则按照尾节点到头节点的顺序依次拦截处理
接下来跟踪三个例子,看事件是如何在Inbound和Outbond处理器传递的
2. Inbound 事件的传递
(1) 创建工程 16-channelInboundHandler
复制 02-socket 工程,在其基础上进行修改:16-channelInboundHandler
(2) 删除类
将其客户端代码全部删除,再将服务端原来自定义的处理器类删除。
(3) 定义服务端处理器
在这里定义三个处理器。
public class ChannelInboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1111 " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("Hello World 1111"); } }
public class ChannelInboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2222 " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("Hello World 2222"); } }
public class ChannelInboundHandler3 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("3333 " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("Hello World 3333"); } }
(4) 定义服务端启动类
public class SomeServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChannelInboundHandler1()); pipeline.addLast(new ChannelInboundHandler2()); pipeline.addLast(new ChannelInboundHandler3()); } }); ChannelFuture future = bootstrap.bind(8888).sync(); System.out.println("服务器已启动。。。"); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } }
(5) 演示
启动服务端后,用telnet命令进行访问:
服务端显示结果:
可以看出来执行链路是
ChannelInboundHandler1.channelActive -> ChannelInboundHandler1.channelRead -> ChannelInboundHandler2.channelRead ->ChannelInboundHandler3.channelRead我们把处理器1,2,3顺序颠倒一下在测试:
pipeline.addLast(new ChannelInboundHandler2()); pipeline.addLast(new ChannelInboundHandler1()); pipeline.addLast(new ChannelInboundHandler3());
这次的结果是:
执行链路:ChannelInboundHandler2.channelActive -> ChannelInboundHandler2.channelRead -> ChannelInboundHandler1.channelRead ->ChannelInboundHandler3.channelRead
可以发现channelActive 方法只触发了一次,并且是第一个处理器,因为我们实现的3个处理器中channelActive里都是调用了pipeline.fireChannelRead方法,即channelActive拦截成功以后,后续节点的channelActive就不会在执行了,直接从pipline上第一个节点重新开始触发ChannelRead方法
如果我们把第一个处理器的channelActive方法注释掉呢?我们把处理器的顺序还原,1,2,3,并把处理器1和3的channelActive方法注释掉,再次演示:
执行链路:ChannelInboundHandler2.channelActive -> ChannelInboundHandler1.channelRead -> ChannelInboundHandler2.channelRead ->ChannelInboundHandler3.channelRead
虽然ChannelInboundHandler1处理器器排在第一个,但是它没有实现channelActive方法,所以执行到了第二个处理器的channelActive方法,第二个处理器channelActive里又调用了pipline.fireChannelRead方法,从pipline上第一个节点重新开始触发ChannelRead方法
(6) 源码分析:
其实主要分析就是这两行代码:
ctx.channel().pipeline().fireChannelRead("Hello World 2222");
ctx.fireChannelRead(msg);
之前介绍过ChannelPipeline和ChannelHandlerContext都实现了ChannelInboundInvoker接口,所以都具有触发Inboud处理器对应方法的功能,两者有啥区别?其实上面演示已经能看出来了,ChannelPipeline的fire方法,触发的方法是从pipline链上第一个节点开始的,而ChannelHandlerContext的fire方法,触发的方法是当前节点的下一个节点的(
其实是下一个对应方法标记为允许执行的节点,之前有分析过标记逻辑
)源码分析,先从ctx.channel().pipeline().fireChannelRead开始:
public class ChannelInboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2222 " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 准备触发pipeline中第一个节点,即head节点的channelRead()方法 ctx.channel().pipeline().fireChannelRead("Hello World 2222"); } }
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline fireChannelRead(Object msg) { // 准备触发head节点的channelRead()方法 AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } ... }
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //注意此时,next就是head节点 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); //获取节点执行器,希望通过执行器的线程执行 //如果当前线程就是执行器的线程执行执行,否则封装成任务异步,让执行器线程执行 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 准备触发当前next节点的channelRead() next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } ... }
先看下final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, “msg”), next);干嘛的?
//io.netty.channel.DefaultChannelPipeline#touch private final boolean touch = ResourceLeakDetector.isEnabled(); //io.netty.channel.DefaultChannelPipeline#touch final Object touch(Object msg, AbstractChannelHandlerContext next) { //touch:如果资源泄漏检测启用,返回{@code true}。 return touch ? ReferenceCountUtil.touch(msg, next) : msg; } //io.netty.util.ReferenceCountUtil#touch(T, java.lang.Object) public static <T> T touch(T msg, Object hint) { if (msg instanceof ReferenceCounted) { return (T) ((ReferenceCounted) msg).touch(hint); } return msg; } //io.netty.util.ReferenceCounted#touch(java.lang.Object)方法注释: /** * Records the current access location of this object with an additional arbitrary information * for debugging purposes. If this object is determined to be leaked, the information * recorded by this operation will be provided to you via {@link ResourceLeakDetector}. * 记录此对象的当前访问位置以及用于调试的附加任意信息。如果确定该对象被泄露, * 此操作记录的信息将通过{@link ResourceLeakDetector}提供给您。 */ ReferenceCounted touch(Object hint);
其实我们不用关注,这个东西应该是编码做调试的时候用的,开启以后,会向msg里面传一些信息,正常运行一般不会用,这个m就是原来的数据。
继续跟invokeChannelRead,这个是触发当前节点channelRead方法
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... private void invokeChannelRead(Object msg) { // 判断当前节点封装的处理器状态是否为已添加 if (invokeHandler()) { try { //已经添加了处理器,则直接调用处理器的channelRead方法 //注意此时是Head节点 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { // 引发异常的处理 notifyHandlerException(t); } } else { fireChannelRead(msg); } } //判断当前节点处理器状态,判断处理器是否已经添加了 private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); } ... }
注意此时是Head节点,我们跟Head节点的channelRead:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... // 这是head节点的channelRead() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 触发当前节点的下一个节点的channelRead() ctx.fireChannelRead(msg); } ... }
看到又回到了节点的AbstractChannelHandlerContext,之前invokeChannelRead(Object)是触发当前节点channelRead方法,而fireChannelRead是触发当前节点的下一个节点的channelRead方法
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... @Override public ChannelHandlerContext fireChannelRead(final Object msg) { // findContextInbound(MASK_CHANNEL_READ) 从当前节点开始查看“包含mask所标识的方法的”节点 // invokeChannelRead() 准备调用指定节点的channelRead() invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; } private AbstractChannelHandlerContext findContextInbound(int mask) { // 从当前节点开始查看“包含mask所标识的方法的”节点 AbstractChannelHandlerContext ctx = this; do { // Nio的SELECTIONKEY也是这么玩的 // 与运算,11得1,其他情况都是0 // 0001 1101 // 0000 0010 // 0000 0000(结果) // 与预算为0,说明不包含 // 0001 1101 // 0000 0100 // 0000 0100(结果) // 与预算不为0,说明不包含 ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; } ... }
findContextInbound方法的含义就是查找当前节点之后,“包含mask所标识的方法的”的第一个节点
看到执行invokeChannelRead,这个静态方法又回到我们之前跟DefaultChannelPipeline.fireChannelRead时,通过这个方法调用HeadContext节点的时候了,只不过这个时候next是我们的第一个处理器节点ChannelInboundHandler1:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... //准备调用指定节点的channelRead() static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //注意此时next就是我们的第一个处理器节点ChannelInboundHandler1,因为它实现了channelRead方法 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 准备触发当前next节点的channelRead() next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //这个handler就是1号节点的处理器 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } ... }
我们追踪ChannelInboundHandler1 的channelRead方法:
public class ChannelInboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1111 " + msg); // 触发当前节点的下一个节点的channelRead() ctx.fireChannelRead(msg); } // 在一个channel的pipeline中,若有多个处理器都重写的channelActive()方法, // 则只有一个处理器的channelActive()会被触发,哪一个?只有第一个重写的channelActive()会执行 // @Override // public void channelActive(ChannelHandlerContext ctx) throws Exception { // pipeline的fireChannelRead()是触发head节点的channelRead() // ctx.channel().pipeline().fireChannelRead("Hello World 1111"); // } }
看到这里逻辑就和Head节点一样了,只要执行ctx.fireChannelRead(msg),就会触发当前节点之后第一个实现channelRead()的方法的节点(实现标准是方法没有被@Skip标识,这个之前也讲过),后面就不跟了意思是一样的,这一次会找到二号节点,执行channelRead方法,然后再次调用ctx.fireChannelRead(msg),找到三号节点调用channelRead方法,最后到tail节点:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { onUnhandledInboundMessage(msg); } ... } //TailContext 是DefaultChannelPipeline的内部类 //io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage protected void onUnhandledInboundMessage(Object msg) { try { //日志:丢弃inbound的消息,该消息已经到达pipline的尾部,请检查pipline配置 //最后走到tail节点,让tail节点释放msg不推荐 logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { // 释放msg ReferenceCountUtil.release(msg); } } //io.netty.util.ReferenceCountUtil#release(java.lang.Object) public static boolean release(Object msg) { if (msg instanceof ReferenceCounted) { return ((ReferenceCounted) msg).release(); } return false; }
之前入门的时候讲过SimpleChannelInboundHandler,它和ChannelInboundHandlerAdapter区别就是:
- ChannelInboundHandlerAdapter:不会自动释放channelRead()中的msg,
除非执行ctx.fireChannelRead(msg)方法一直传到尾节点
- SimpleChannelInboundHandler:会自动释放channelRead()中的msg
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { //释放 ReferenceCountUtil.release(msg); } } } ... }
3. Outbound 事件的传递
(1) 创建工程 17-channelOutboundHandler
复制 16-channelInboundHandler 工程,在其基础上进行修改:17-channelOutboundHandler
(2) 删除类
删除之前的三个 ChannelInboundHandler 类。
(3) 定义服务端处理器
在这里定义三个处理器。
public class ChannelOutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("1111 " + msg); ctx.write(msg, promise); } } public class ChannelOutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("2222 " + msg); ctx.write(msg, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(() -> { // 调用pipeline的write() ctx.channel().write("Hello World 2222"); }, 1, TimeUnit.SECONDS); } } public class ChannelOutboundHandler3 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("3333 " + msg); // 调用当前节点的下一个节点的write() ctx.write(msg, promise); } }
(4) 修改服务端启动类
public class SomeServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChannelOutboundHandler1()); pipeline.addLast(new ChannelOutboundHandler2()); pipeline.addLast(new ChannelOutboundHandler3()); } }); ChannelFuture future = bootstrap.bind(8888).sync(); System.out.println("服务器已启动。。。"); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } }
(5) 演示
看到顺序是3,2,1,这就是Outbound和Inbound区别,Outbound是从tail节点开始的!
注意有个细节,在ChannelOutboundHandler2.handlerAdded方法里,我们添加的是一个定时任务
!public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { ... @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled0(delay, unit); // 将任务command封装为一个定时任务 // schedule() 将定时任务添加到定时任务队列 return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { // 将定时任务添加到定时任务队列 // 至与添加完以后在哪执行的,之前已经分析过了 scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; } ... }
为什么要加定时任务?如果不加,演示效果:
public class ChannelOutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("2222 " + msg); ctx.write(msg, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("Hello World 2222"); //ctx.executor().schedule(() -> { // 调用pipeline的write() // ctx.channel().write("Hello World 2222"); // }, 1, TimeUnit.SECONDS); } }
为什么没有处理器3呢?
主线程在添加处理器的时候,添加到第二个处理器以后,添加成功触发handlerAdded回调的时候,这些回调方法都是通过执行器的线程处理的,是两个线程,所以在触发handlerAdded写数据的时候,此时处理器3可能还没添加成功,所以就出现如上情况。(6) 源码分析:
我们从ChannelOutboundHandler2 开始追踪:
public class ChannelOutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("2222 " + msg); ctx.write(msg, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //一旦ChannelOutboundHandler2 添加成功会触发该回调 //因为是定时任务,所以执行的时候已经延迟1秒,这个时候处理器3已经添加成功了 ctx.executor().schedule(() -> { // 调用pipeline的write() ctx.channel().write("Hello World 2222"); }, 1, TimeUnit.SECONDS); } }
然后执行ctx.channel().write,
注意这是Channel.write方法不是ChannelHandlerContext.write方法
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... @Override public ChannelFuture write(Object msg) { // 调用tail节点的write() return pipeline.write(msg); } ... }
继续看pipeline.write:
public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelFuture write(Object msg) { // 调用tail节点的write() return tail.write(msg); } ... }
看到调用的是tail节点的write,tail节点没有重写write方法,用的父类AbstractChannelHandlerContext 的write:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... // tail节点重写的write() @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { // 若promise失效,则释放msg if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } // 从尾节点的顺序开始查找下一个节点 // 如果flush是true,则(MASK_WRITE | MASK_FLUSH)或运算,就是writeAndFlush方法 final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); //之前有介绍过不说了,一般不用关注 final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //开启flush就是writeAndFlush next.invokeWriteAndFlush(m, promise); } else { //我们的Demo应该走的是这里,注意此时next应该是节点3 // 准备调用当前next节点的write() next.invokeWrite(m, promise); } } else { final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } } private AbstractChannelHandlerContext findContextOutbound(int mask) { // 从当前节点开始向前查找包含mask所指定方法的节点 AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while ((ctx.executionMask & mask) == 0); return ctx; } ... }
继续跟invokeWrite:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... private void invokeWrite(Object msg, ChannelPromise promise) { //invokeHandler判断处理器状态是否已经添加成功 if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { //调用处理器的write方法,此时是3号节点的处理器 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } ... }
public class ChannelOutboundHandler3 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("3333 " + msg); // 调用当前节点的上一个节点的write() ctx.write(msg, promise); } }
3号节点再次调用ChannelHandlerContext .write方法:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { write(msg, false, promise); return promise; } ... }
这里和tail节点执行write逻辑基本一样了,就不继续追踪了,一直会向上传递,最终会走到Head节点的write方法:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { // 调用底层unsafe的write(),完成真正的写操作 unsafe.write(msg, promise); } }
结论:InboudHandle传消息的时候是从Head节点开始往后传一直传到Tail,而OutboundHandle消息传递是从tail节点往前传,一直传到Head
4. 异常的传递与处理
(1) 创建工程 18-exceptionHandler
复制 16-channelInboundHandler 工程,在其基础上进行修改:18-exceptionHandler
(2) 修改 2 号 InboundHandler
在 2 号 InboundHandler 中产生异常。
public class ChannelInboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandler222 - 正常读取客户端数据"); throw new ArrayIndexOutOfBoundsException("InboundHandler222 发生异常"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- InboundHandler222"); // 调用当前节点的下一个节点的exceptionCaught() ctx.fireExceptionCaught(cause); } }
(3) 修改1号与3号 InboundHandler
public class ChannelInboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- InboundHandler111"); ctx.fireExceptionCaught(cause); } } public class ChannelInboundHandler3 extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- InboundHandler333"); ctx.fireExceptionCaught(cause); } }
(4) 添加两个 OutboundHandler
我们发现 ChannelOutboundHandlerAdapter 中的 exceptionCaught()方法已经过时,但我们在这里仅仅就是为了演示异常信息的传递方向,所以没有妨碍。
public class ChannelOutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- OutboundHandler111"); ctx.fireExceptionCaught(cause); } } public class ChannelOutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- OutboundHandler222"); ctx.fireExceptionCaught(cause); } }
(5) 定义异常处理器
// 异常处理器 public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { String msg = "发生异常"; if (cause instanceof ArrayIndexOutOfBoundsException) { msg = "发生边界索引溢出异常"; } System.out.println(msg); // 不再向下传递 // ctx.fireExceptionCaught(cause); } }
(6) 修改启动类
将异常处理器添加到 ChannelPipeline 的最后。
public class SomeServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChannelInboundHandler1()); pipeline.addLast(new ChannelInboundHandler2()); // 异常抛出 pipeline.addLast(new ChannelInboundHandler3()); pipeline.addLast(new ChannelOutboundHandler1()); pipeline.addLast(new ChannelOutboundHandler2()); pipeline.addLast(new ExceptionCaughtHandler()); } }); ChannelFuture future = bootstrap.bind(8888).sync(); System.out.println("服务器已启动。。。"); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } }
(7) 演示
ExceptionCaughtHandler先不添加时的效果:
服务端启动,向服务端发送一个消息
效果:
可以看出来调用链:
ChannelInboundHandler2.channelRead -> ChannelInboundHandler2.exceptionCaught -> ChannelInboundHandler3.exceptionCaught -> ChannelOutboundHandler1.exceptionCaught -> ChannelOutboundHandler2.exceptionCaught也就是说异常在传递的时候是不分Inbound、Outbound,通通是从发生异常的节点往后传
(实际上是先执行当前处理器的exceptionCaught方法,而我们实现是又调用了ctx.fireExceptionCaught(cause),才导致往后传的)(8)源码分析:
我们从抛出异常的地方ChannelInboundHandler2.channelRead开始:
public class ChannelInboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandler222 - 正常读取客户端数据"); throw new ArrayIndexOutOfBoundsException("InboundHandler222 发生异常"); } ... }
channelRead抛出异常,肯定有捕获的地方,是在AbstractChannelHandlerContext.invokeChannelRead执行处理器read方法的时候:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... private void invokeChannelRead(Object msg) { if (invokeHandler()) { // 判断当前节点封装的处理器状态是否为已添加 try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { // 引发异常的处理 notifyHandlerException(t); } } else { fireChannelRead(msg); } } private void notifyHandlerException(Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event", cause); } return; } // 调用当前发生异常的节点的exceptionCaught() invokeExceptionCaught(cause); } private void invokeExceptionCaught(final Throwable cause) { if (invokeHandler()) { try { //调用当前节点处理器的exceptionCaught,注意当前节点是ChannelInboundHandler2 handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", error, cause); } } } else { fireExceptionCaught(cause); } } ... }
可以看出来会调用当前处理器的exceptionCaught方法,即ChannelInboundHandler2.exceptionCaught:
public class ChannelInboundHandler2 extends ChannelInboundHandlerAdapter { ... @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exception -- InboundHandler222"); // 调用当前节点的下一个节点的exceptionCaught() ctx.fireExceptionCaught(cause); } }
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { ... @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { // findContextInbound之前讲Inbound消息传递讲过,实际上就是从当前节点的后面进行查询 // 此时会查询出ChannelInboundHandler3节点 // 调用指定节点的exceptionCaught() invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause); return this; } //调用指定节点的exceptionCaught() static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 调用当前next节点的exceptionCaught() // 此时next就是ChannelInboundHandler3节点 next.invokeExceptionCaught(cause); } else { try { executor.execute(new Runnable() { @Override public void run() { next.invokeExceptionCaught(cause); } }); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); logger.warn("The exceptionCaught() event that was failed to submit was:", cause); } } } } ... }
后面不跟了,逻辑基本一样,最终会执行到tail节点的exceptionCaught:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { ... @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { onUnhandledInboundException(cause); } protected void onUnhandledInboundException(Throwable cause) { try { //警告:异常传递到pipline尾部,通常意味着最后一个处理器都没有处理这个异常 //不建议 logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.", cause); } finally { // 释放异常信息cause ReferenceCountUtil.release(cause); } } ... }
一般异常处理我们会再定义一个专门的异常处理器,放到pipline链的最后,即上面第五步,直接演示结果:
-
outbound:Go 的简单网络客户端,灵感来自 python-requests
2021-06-20 13:46:31outbound 是 Golang 的多功能 http 客户端。 还有一个,如果你愿意的话。 它借鉴了其他各种软件包和客户端的许多最佳实践。 它将他们的范例组合成一个易于使用但功能齐全的出站 http 客户端。 大部分灵感(和口号)... -
test_outbound_socket_bridge.c
2021-12-30 17:02:26freeswitch 外绑定案例 -
Envoy流量劫持后outbound部分处理逻辑
2021-09-25 00:35:11因为iptables劫持流量的时候,只会在Envoy中基于outbound流量出口15006创建一个真实的物理连接socket,所有的流量到了这个socket之后,会先匹配到对应Listener。 Listener可以理解成一个逻辑连接,存储在内存中,... -
mandrill-inbound-outbound-forward-php:用于使用 Mandrill 入站出站服务转发电子邮件的 PHP 库
2021-05-31 00:41:16使用 Mandrill Outbound 转发另一个电子邮件/邮箱(如 GMAIL)上的电子邮件 收到的每封电子邮件都将占用 1 个发送槽(被转发到您指定的地址)。 包括 带有域 example.com 的配置示例 composer.json 配置文件 先决... -
plugin-flex-outbound-dialpad:用于拨号盘解决方案的示例Twilio Flex插件,可用于会议和主管监控以及外部...
2021-05-13 09:50:29插件-flex-outbound-dialpad 该插件旨在演示如何使用本机呼叫业务流程从发出出站呼叫,因此入站呼叫功能(如主管监视以及冷热转移)也适用于出站呼叫。 该插件还提供了执行外部会议的能力,从而可以利用上的工作这个... -
Lite Outbound Manager-开源
2021-05-02 04:49:16基于PHP的Web应用程序易于管理呼叫中心出站活动。 使用外部XML文件可自定义的通话结果和电话采访。 -
PyPI 官网下载 | outbound-1.1.0.tar.gz
2022-02-10 18:34:27资源来自pypi官网。 资源全名:outbound-1.1.0.tar.gz -
如何使用jdbc-outbound-channel-adapter过滤Spring Integration流?
2021-03-15 21:06:37我有一个要求,我必须FTP从远程服务器读取XML文件并将其转储到本地目录中 . 之后,我必须使用传入的有效负载中的值对数据库发出SQL查询,然后 - 根据查询的结果 - 决定是否要继续流 .channel="inboundFTPFileChannel... -
Outbound New Tab-crx插件
2021-04-07 05:44:45语言:English 从新标签中的出站的冒险经历。 在每个新选项卡中,我们将分享来自世界某个地方的令人难以置信的冒险。 照片来自出境集体,旅行者社区分享当地冒险,旅行故事等。 想在出站选项卡中看到您的照片?... -
IDoc DESADV trigger inbound delivery after PGI for inter-company STO‘s outbound delivery
2021-11-22 17:35:45IDoc DESADV trigger inbound delivery after PGI for inter-company STO's outbound delivery In the inter-company purchase (inter-company stock transfer) process, a common solution is to automatically ... -
Netty Inbound/Outbound通道处理器定义
2017-07-27 09:05:00实际上消息编码器为Outbound通道处理器,下面我们来看一下Outbound处理器的定义。 package io.netty.channel; import java.net.SocketAddress; /** * {@link ChannelHandler} which will get notified for IO-... -
template-sdfc-outbound-message-to-gsheet:收到Salesforce Outbound消息时将行添加到Google表格
2021-03-08 22:07:21模板sdfc出站消息到gsheet 收到Salesforce Outbound消息时将行添加到Google表格 -
identity-outbound-auth-duo
2021-04-12 21:17:01欢迎使用WSO2身份服务器(IS)Duo身份验证器。 WSO2 IS是最好的Identity Server之一,它使您可以完全从应用程序中减轻身份和用户权利管理负担。 它具有许多功能,支持许多行业标准,最重要的是,它允许您根据安全... -
交换机调用ACL时候的inbound和outbound该怎么用?
2020-04-09 00:01:25我们知道,简单的ACL(访问控制列表)配置以后,通常在物理接口或者vlanif虚接口下调用,但是我们时常不明白,到底什么时候该用inbound,什么时候该用outbound,下面是我自己简单的理解。 一:物理接口下调用 ... -
Spring Integration Java DSL-如何调用int-http:outbound-gateway?
2021-07-17 02:48:53I have a piece in the flow where a ReST API call is made:reply-channel="logger"url="${api.base.uri}/data"http-method="PUT"expected-response-type="java.lang.String"/>logger-name="logger"expression=... -
核心9303经常有外国的暴力尝试登录-可以用traffic-policy gongji global outbound
2022-03-07 16:09:54# 只能用traffic-policy gongji global outbound 这个策略的 outbound的方向来实现 用 in不行 traffic-policy gongji global 这个 匹配的 acl 不分deny 和 permit 只要 能被 acl 匹配 然后就看 流行为的动作 但是在... -
outbound_diary:IIT KGP IR CELL出港日记我在DJANGO的版本
2021-03-02 03:40:25outbound_diary IIT KGP红外线电池我的版本在DJANGO的日记在命令行中打开向下定位文件夹的目录,在窗口中运行“ py manage.py runserver”,在浏览器中的服务器上,运行“ / diary /”以查看页面 -
Netty之ChannelPipeline(三)Outbound与Inbound事件的传播
2020-02-11 00:06:54Outbound事件是请求事件。 Outbound事件的发起者是Channel,处理者是Unsafe。 Outbound事件在Pipeline中传输方向是tail -> head。 Outbount事件其中之一bind,以bind为例: AbstractChannel bind(SocketAddress ... -
Web应用的 outbound TCP连接到达一定数量后,客户代码继续创建连接出错 1
2022-08-08 18:18:43如果您一个实例上部署了多个应用和web作业,那么总连接数是这些应用和web作业所发连接之和。如果是web app应用,是以定价层为计量单位的,如果一个定价层中包