登录
登录 注册新账号
注册
已有账号登录
Java读源码之Netty深入剖析
schelling 阅读 162 次
8月5日发布

如果要阅读源码,首先就要学会基本的设计模式。
设计模式是前人总结出来的软件设计方法,有利于使代码更加简洁优雅。
了解了netty的设计模式,再去看源码,会有一种焕然大悟的感觉。
想要学习全部的全部的《Java读源码之Netty深入剖析》可以看网盘地址
一、单例模式
单例模式是最常见的设计模式:
1、忽略反射的影响,全局只有一个实例
2、有可能会出现延迟创建实例对象,要使用的时候才创建3、这种设计模式能够避免线程安全问题
netty单例模式分析详情-----------------------------------------------------------------------------------------------

netty设计模式-单例模式
单例模式是最常见的设计模式:
1、忽略反射的影响,全局只有一个实例
2、有可能会出现延迟创建实例对象,要使用的时候才创建
3、这种设计模式能够避免线程安全问题

最常见的单例模式实现方法:
懒汉模式(单例对象 volatile + 双重检测机制 -> 禁止指令重排)
public class SingletonExample5 {
// 私有构造函数
private SingletonExample5() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// 单例对象 volatile + 双重检测机制 -> 禁止指令重排
private volatile static SingletonExample5 instance = null;

   // 静态的工厂方法
   public static SingletonExample5 getInstance() {
       if (instance == null) { // 双重检测机制        // B
           synchronized (SingletonExample5.class) { // 同步锁
               if (instance == null) {
                   instance = new SingletonExample5(); // A - 3
               }
           }
       }
       return instance;
   }

用私有构造函数实现全局一个对象实例,用静态工厂方法实现延迟加载,用各种锁的机制,实现线程安全。
netty里面的单例模式有MqttEncoder和ReadTimeoutException,代码如下:
ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {

   public static final MqttEncoder INSTANCE = new MqttEncoder();

   private MqttEncoder() { }

public final class ReadTimeoutException extends TimeoutException {

   private static final long serialVersionUID = 169287984113283421L;

   public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();

   private ReadTimeoutException() { }

}
他们都是使用私有构造函数实现全局单个对象实例,用public static final实现延迟加载和线程安全。

二、策略模式
1、封装一系列可替换的算法家族
2、支持动态选择某一个策略
netty策略模式分析详情-------------------------------------------------------------------------------------------------------------------

netty设计模式-策略模式
策略模式的特点大致如下:
1、封装一系列可替换的算法家族
2、支持动态选择某一个策略

常见的策略模式实现方式
/**
* see DefaultEventExecutorChooserFactory#newChooser(EventExecutor[])
*/
public class Strategy {
private Cache cacheMemory = new CacheMemoryImpl();
private Cache cacheRedis = new CacheRedisImpl();

   public interface Cache {
       boolean add(String key, Object object);
   }

   public class CacheMemoryImpl implements Cache {
       Override
       public boolean add(String key, Object object) {
           // 保存到map
           return false;
       }
   }

   public class CacheRedisImpl implements Cache {
       Override
       public boolean add(String key, Object object) {
           // 保存到redis
           return false;
       }
   }

   public Cache getCache(String key) {
       if (key.length() < 10) {
           return cacheRedis;
       }
       return cacheMemory;
   }

}
在netty的DefaultEventExecutorChooserFactory 的里面,就是如此实现策略模式的:
UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

   public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

   private DefaultEventExecutorChooserFactory() { }

   SuppressWarnings("unchecked")
   Override
   public EventExecutorChooser newChooser(EventExecutor[] executors) {
       if (isPowerOfTwo(executors.length)) {
           return new PowerOfTowEventExecutorChooser(executors);
       } else {
           return new GenericEventExecutorChooser(executors);
       }
   }

   private static boolean isPowerOfTwo(int val) {
       return (val & -val) == val;
   }

   private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
       private final AtomicInteger idx = new AtomicInteger();
       private final EventExecutor[] executors;

       PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
           this.executors = executors;
       }

       Override
       public EventExecutor next() {
           return executors[idx.getAndIncrement() & executors.length - 1];
       }
   }

   private static final class GenericEventExecutorChooser implements EventExecutorChooser {
       private final AtomicInteger idx = new AtomicInteger();
       private final EventExecutor[] executors;

       GenericEventExecutorChooser(EventExecutor[] executors) {
           this.executors = executors;
       }

       Override
       public EventExecutor next() {
           return executors[Math.abs(idx.getAndIncrement() % executors.length)];
       }
   }

}

三、装饰器模式
装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。
1、装饰者和被装饰者继承同一个接口
2、装饰者给被装饰者动态修改行为
netty装饰器模式分析详情---------------------------------------------------------------------------------------------------------

netty设计模式-装饰器模式

装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。
1、装饰者和被装饰者继承同一个接口
2、装饰者给被装饰者动态修改行为

首先我们一生活中的例子来看一看装饰器模式:
/**
* see io.netty.buffer.WrappedByteBuf;
* see io.netty.buffer.UnreleasableByteBuf
* see io.netty.buffer.SimpleLeakAwareByteBuf
*/
public class Decorate {

   // 优惠方案
   public interface OnSalePlan {
       float getPrice(float oldPrice);
   }

   // 无优惠
   public static class NonePlan implements OnSalePlan {
       static final OnSalePlan INSTANCE = new NonePlan();

       private NonePlan() {

       }

       public float getPrice(float oldPrice) {
           return oldPrice;
       }
   }

   // 立减优惠
   public static class KnockPlan implements OnSalePlan {
       // 立减金额
       private float amount;

       public KnockPlan(float amount) {
           this.amount = amount;
       }

       public float getPrice(float oldPrice) {
           return oldPrice - amount;
       }
   }


   // 打折优惠
   public static class DiscountPlan implements OnSalePlan {
       // 折扣
       public int discount;
       private OnSalePlan previousPlan;

       public DiscountPlan(int discount, OnSalePlan previousPlan) {
           this.discount = discount;
           this.previousPlan = previousPlan;
       }

       public DiscountPlan(int discount) {
           this(discount, NonePlan.INSTANCE);
       }

       public float getPrice(float oldPrice) {
           return previousPlan.getPrice(oldPrice) * discount / 10;
       }
   }

   public static void main(String[] args) {
       DiscountPlan simpleDiscountPlan = new DiscountPlan(5);
       System.out.println(simpleDiscountPlan.getPrice(100));

       KnockPlan previousPlan = new KnockPlan(50);
       DiscountPlan complexDiscountPlan = new DiscountPlan(5, previousPlan);
       System.out.println(complexDiscountPlan.getPrice(100));
   }

}
我们以OnSalePlan 作为接口,装饰者或者被装饰者都继承或者实现这个接口。
NonePlan是一种实现方案、KnockPlan也是一种实现方案、DiscountPlan也是一种实现方案,但是它可以实现我们的装饰器模式。
在main函数里面,我们首先实现了一个简单的打折方案,这里没有用到装饰器模式,打印出50;重点在第二个方案,首先实例出一个立减方案,立减方案KnockPlan作为被装饰者传入到DiscountPlan打折方案里面,DiscountPlan就是装饰者,它装饰了KnockPlan的行为,使得它的行为得以改变。
在netty里面,我们以WrappedByteBuf、UnreleasableByteBuf 、SimpleLeakAwareByteBuf这三个类讲解分析里面用到的设计模式。
WrappedByteBuf是所有装饰器的基类,它继承自ByteBuf
class WrappedByteBuf extends ByteBuf {

   protected final ByteBuf buf;

   protected WrappedByteBuf(ByteBuf buf) {
       if (buf == null) {
           throw new NullPointerException("buf");
       }
       this.buf = buf;
   }

   Override
   public final boolean hasMemoryAddress() {
       return buf.hasMemoryAddress();
   }

   Override
   public final long memoryAddress() {
       return buf.memoryAddress();
   }
   ...

}
首先看构造函数,传入一个ByteBuf实例,这个传入的实例就是被装饰者,它的行为可以被当前这个类,也就是WrappedByteBuf(也就是装饰者)动态改变。因为这个WrappedByteBuf它只是装饰器的基类,所以他只对传入的被装饰者的行为做简单的返回,没做任何修改,…后面是更多的方法,都是直接调用被装饰者的方法。

接下来看WrappedBuf的一个子类UnreleasableByteBuf,这个类一看就是什么什么不释放的类,具体什么我们先不管,我们找到源代码:
final class UnreleasableByteBuf extends WrappedByteBuf {

   private SwappedByteBuf swappedBuf;

   UnreleasableByteBuf(ByteBuf buf) {
       super(buf);
   }

   ...

   Override
   public boolean release() {
       return false;
   }

   ...

}
首先调用父类WrappedByteBuf的构造方法,前面我们分析过,就是把被装饰者传进来,以供以后使用。然后看里面有一行 public boolean release() { return false;},我们不管它release释放了什么,反正它release就是返回false,装饰或者说是改变了被装饰者buf的行为。
另外一个WrappedByteBuf的子类SimpleLeakAwareByteBuf,这个类顾名思义就是内存泄漏自动感知的一个ByteBuf,源码:
final class SimpleLeakAwareByteBuf extends WrappedByteBuf {

   private final ResourceLeak leak;

   SimpleLeakAwareByteBuf(ByteBuf buf, ResourceLeak leak) {
       super(buf);
       this.leak = leak;
   }

   ...

   Override
   public boolean release() {
       boolean deallocated =  super.release();
       if (deallocated) {
           leak.close();
       }
       return deallocated;
   }

   ...

}
构造器还是调用父类的方法,在release 这个方法里面,如果发现内存泄漏了,就执行leak.close()这个方法,然后在返回,其实也是修饰了被装饰者,动态改变了被装饰着的行为。
当然WrappedByteBuf还有很多子类的,我们目前只分析这两个来实现分析netty装饰器模式。

四、观察者模式
1、观察者和被观察者
2、观察者订阅消息,被观察者发布消息
3、订阅则能收到,取消订阅收不到
netty观察者模式分析详情---------------------------------------------------------------------------------------------------------------

netty设计模式-观察者模式

这个模式可以说在java的源码里面应用很广泛了,各种addListener,future这些,最终都是观察者模式的体现。
1、观察者和被观察者
2、观察者订阅消息,被观察者发布消息
3、订阅则能收到,取消订阅收不到
首先我们看看观察者模式的一个生活中的例子。
/**
* 被观察者
*/
public interface Observerable {
void registerObserver(Observer o);

       void removeObserver(Observer o);

       void notifyObserver();
   }

   /**
    * 观察者
    */
   public interface Observer {
       void notify(String message);
   }

   public static class Girl implements Observerable {
       private String message;

       List<Observer> observerList;

       public Girl() {
           observerList = new ArrayList<>();
       }


       Override
       public void registerObserver(Observer o) {
           observerList.add(o);
       }

       Override
       public void removeObserver(Observer o) {
           observerList.remove(o);
       }

       Override
       public void notifyObserver() {
           for (Observer observer : observerList) {
               observer.notify(message);
           }
       }

       public void hasBoyFriend() {
           message = "女神有男朋友了";
           notifyObserver();
       }

       public void getMarried() {
           message = "女神结婚了,你们都死心吧!";
           notifyObserver();
       }

       public void getSingled() {
           message = "女神单身了,你们有机会了!";
           notifyObserver();
       }
   }

   /**
    * 男孩
    */
   public static class Boy implements Observer {
       public void notify(String message) {
           System.out.println("男孩收到消息:" + message);
       }
   }

   /**
    * 男人
    */
   public static class Man implements Observer {
       public void notify(String message) {
           System.out.println("男人收到消息:" + message);
       }
   }

   /**
    * 老男人
    */
   public static class OldMan implements Observer {
       public void notify(String message) {
           System.out.println("老男人收到消息:" + message);
       }
   }

   public static void main(String[] args) {
       Girl girl = new Girl();
       Boy boy = new Boy();
       Man man = new Man();
       OldMan oldMan = new OldMan();

       // 通知男孩、男人、老男人,女神有男朋友了
       girl.registerObserver(boy);
       girl.registerObserver(man);
       girl.registerObserver(oldMan);
       girl.hasBoyFriend();
       System.out.println("====================");

       // 通知男孩,男人,女神结婚了
       girl.removeObserver(oldMan);
       girl.getMarried();
       System.out.println("====================");


       girl.registerObserver(oldMan);
       girl.getSingled();
   }

流程就是,观察者注册到被观察者(也就是被观察者的列表里面添加观察者),然后发生了事情,被观察者就调用列表里面观察者的方法。
对于netty,我们首先查看一下我们使用的例子
public void write(NioSocketChannel channel, Object object) {
ChannelFuture channelFuture = channel.writeAndFlush(object);
channelFuture.addListener(future -> {
if (future.isSuccess()) {

           } else {

           }
       });
       channelFuture.addListener(future -> {
           if (future.isSuccess()) {

           } else {

           }
       });
   }

首先创建一个channelFuture,这个就是被观察者,addListener把观察者都加进去。
首先查看被观察者创建的过程,writeAndFlush一直跟进去,可以看到这样一段代码:
Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
newPromise()这个函数其实就是创建一个被观察者。newPromise()跟进去就是这样一行代码:
Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
ChannelPromise就是被观察者。

创建被观察者之后,就要把观察者添加进去。跟进去ChannelPromise的实现者DefaultChannelPromise的addListener方法,一直跟进去,会看到这样一串有趣的代码:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
添加listener就添加listener,为什么还要分三段?netty在这里做了一个优化。首先我们要知道的是,listeners定义为一个Object,意味着它可以装换成任意对象。
/**
* One or more listeners. Can be a {link GenericFutureListener} or a {link DefaultFutureListeners}.
* If {code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private Object listeners;
那么,如果第一个listener进来,直接定义listeners为listener;第二个进来,就把它定义为DefaultFutureListeners,把之前的listener和现在进来的包装进去,其实这个时候listeners就变成了一个列表,点进去DefaultFutureListeners就能看到如下代码:
SuppressWarnings("unchecked")
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
第三次就把新加进来的添加到DefaultFutureListeners里面就行了。有兴趣自己看源码。

把观察者添加进去之后,什么时候通知呢。netty的NioSocketChannel的WriteAndFlush执行完,成功或者失败,都会通知到listener。继续看writeAndFlush方法,跟进去会看到:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
首先看invokeWrite0,这个代码中可能会有失败,会告知listener;最终如果成功在invokeFlush0里面体现,也会通知给listener;
先看invokeWrite0,跟进去看到 write方法,用HeadContext的实现,再跟进去,调用DefaultPromiser的tryFailure方法就能看到notifyListener()了。notifyListeners();
notifyListeners();这个才是是通知linstener的方法。
再一直跟进去会看到这段代码:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
l.operationComplete(future)这个方法就会通知到观察者。
同理,invokeFlush0进去,使用HeadContext的flush方法,一直进去,使用NioSocketChannel的doWrite方法,一直进去,看到这样的代码:
Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;

       boolean setOpWrite = false;
       for (;;) {
           Object msg = in.current();
           if (msg == null) {
               // Wrote all messages.
               clearOpWrite();
               // Directly return here so incompleteWrite(...) is not called.
               return;
           }

           if (msg instanceof ByteBuf) {
               ByteBuf buf = (ByteBuf) msg;
               int readableBytes = buf.readableBytes();
               if (readableBytes == 0) {
                   in.remove();
                   continue;
               }

               boolean done = false;
               long flushedAmount = 0;
               if (writeSpinCount == -1) {
                   writeSpinCount = config().getWriteSpinCount();
               }
               for (int i = writeSpinCount - 1; i >= 0; i --) {
                   int localFlushedAmount = doWriteBytes(buf);
                   if (localFlushedAmount == 0) {
                       setOpWrite = true;
                       break;
                   }

                   flushedAmount += localFlushedAmount;
                   if (!buf.isReadable()) {
                       done = true;
                       break;
                   }
               }

               in.progress(flushedAmount);

               if (done) {
                   in.remove();
               } else {
                   // Break the loop and so incompleteWrite(...) is called.
                   break;
               }
           } else if (msg instanceof FileRegion) {
               FileRegion region = (FileRegion) msg;
               boolean done = region.transferred() >= region.count();

               if (!done) {
                   long flushedAmount = 0;
                   if (writeSpinCount == -1) {
                       writeSpinCount = config().getWriteSpinCount();
                   }

                   for (int i = writeSpinCount - 1; i >= 0; i--) {
                       long localFlushedAmount = doWriteFileRegion(region);
                       if (localFlushedAmount == 0) {
                           setOpWrite = true;
                           break;
                       }

                       flushedAmount += localFlushedAmount;
                       if (region.transferred() >= region.count()) {
                           done = true;
                           break;
                       }
                   }

                   in.progress(flushedAmount);
               }

               if (done) {
                   in.remove();
               } else {
                   // Break the loop and so incompleteWrite(...) is called.
                   break;
               }
           } else {
               // Should not reach here.
               throw new Error();
           }
       }
       incompleteWrite(setOpWrite);
   }

这里有个in.remove()的代码,当读完了之后,就可以删除Buffer,然后通知listener成功了。
in.remove()进去,有个safeSuccess()方法,一直trySuccess(),使用DefaultPromise的trySuccess(),最后又看到这个熟悉的方法了:

   Override
   public boolean trySuccess(V result) {
       if (setSuccess0(result)) {
           notifyListeners();
           return true;
       }
       return false;
   }

最后都同 invokeWrite0一样。

netty就通过Promise模式或者说ChannelFuture模式实现了观察者模式。netty就通过实现观察者模式达到异步通知,每次写成功写失败,都会回调给listener,也就是回调到future.isSuccess()等方法。其实就是变种的观察者模式。

五、迭代器模式
1、有一个迭代器接口。
2、对容器里面各个对象进行访问。
netty迭代器模式分析详情---------------------------------------------------------------------------------------------------

迭代器模式使用很多,但是被我们经常忽略它居然也是一种模式。
1、有一个迭代器接口。
2、对容器里面各个对象进行访问。
netty里面的CompositeByteBuf这个零拷贝的实现,就使用了迭代器模式。首先看一段代码:
public static void main(String[] args) {
ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});

       ByteBuf merge = merge(header, body);
       merge.forEachByte(value -> {
           System.out.println(value);
           return true;
       });
   }

   public static ByteBuf merge(ByteBuf header, ByteBuf body) {
       CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
       byteBuf.addComponent(true, header);
       byteBuf.addComponent(true, body);

       return byteBuf;
   }

这段代码是把两个ByteBuf添加到一起,forEachByte就是实现了迭代器模式。那么怎么说它是零拷贝呢?
找forEachByte的实现,在AbstractByteBuf里面。
有这样一段代码:
Override
public int forEachByte(ByteProcessor processor) {
ensureAccessible();
try {
return forEachByteAsc0(readerIndex, writerIndex, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
从readerIndex开始读,读到writeIndex。继续点进去查看:
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) {
if (!processor.process(_getByte(start))) {
return start;
}
}

       return -1;
   }

查看_getByte的实现,当然是找CompositeByteBuf的实现了:
Override
protected byte _getByte(int index) {
Component c = findComponent(index);
return c.buf.getByte(index - c.offset);
}
先找出是哪个componet,然后迭代的时候其实是直接返回这个componet的byte内容就可以,这样就实现了零拷贝。别的类迭代的话,可能会把所有的数据都复制一遍。

六、责任链模式
1、责任处理器接口
2、创建链,添加删除责任处理器接口
3、上下文
4、责任链终止机制。
netty责任链模式分析详情-------------------------------------------------------------------------------------------------------------

netty设计模式-责任链模式

责任链模式的定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。
首先来看看责任链模式的四个要素:
1、责任处理器接口
2、创建链,添加删除责任处理器接口
3、上下文
4、责任链终止机制。
在netty里面,很明显channelHandler和Pipeline构成了责任链模式。让我们通过上面的要素,一个一个分析

1、首先是责任处理器接口:
ChannelHandler就是责任处理器接口,ChannelInboundHandler、ChannelOuntboundHandler是它的两个增强。

2、找到创建链,添加删除责任处理器接口ChannelPipeline:
里面有各种add和remove的方法

3、然后是上下文ChannelHandlerContext:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

   /**
    * Return the {link Channel} which is bound to the {link ChannelHandlerContext}.
    */
   Channel channel();

   /**
    * Returns the {link EventExecutor} which is used to execute an arbitrary task.
    */
   EventExecutor executor();

   ...

}
里面有两个最重要的方法,一个返回绑定的channel,一个返回executor来执行任务。

4、最后我们看责任链终止机制
现在我们自定义一个InBoundHandlerC
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
ctx.fiteChannelRead方法就是为了把责任传递下去。如果注释掉了,消息就不会传递。另外如果不重写channelRead方法,默认会传递,让我们查看ChannelInboundHandlerAdapter的channelRead方法:
Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
默认会传递。这里另外提一句,java里面的很多filter,是否继续向下传递,都是return true还是return 方法来实现的。

最后,消息是如何一步步向下传递的呢,让我们看AbstractChannelHandlerContext的fireChannelRead方法:
Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
继续点击findContextInbound(),可以看到:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
也就是,这里是不停地指向下一个对象实现的。