1. 我向 Apache Dubbo 提的一个 PR

我为 Apache Dubbo 提了如下的一个 PR,试图为 Telnet 通信模式提供一个 Idle 连接计时关闭特性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.qos.server.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;

@ChannelHandler.Sharable
  public class TelnetIdleEventHandler extends ChannelDuplexHandler {
    private static final Logger log = LoggerFactory.getLogger(IdleEventHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      // server will close channel when server don't receive any request from client util timeout.
      if (evt instanceof IdleStateEvent) {
        Channel channel = ctx.channel();
        log.info("IdleStateEvent triggered, close channel " + channel);
        channel.close();
      } else {
        super.userEventTriggered(ctx, evt);
      }
    }
  }

但是 Reviewer 要求我将 @ChannelHandler.Sharable 注解删去,他说道:

It’s alway create new TelnetIdleEventHandler for channel.

我想我应该是对 @ChannelHandler.Sharable 注解学习地不到位,那么 @ChannelHandler.Sharable 注解究竟能够起到什么作用呢?

2. Netty 多线程并发模型与 Sharable 注解

我们可以注意到 Netty 中的多线程并发问题即:workerGroup 内的多 EventLoop 线程并发模型。因此我们可以做一个等式:

Netty 的多线程并发模型 = workerGroup 中的多个 EventLoop 线程试图使用同一个实例,并同时试图修改该实例的状态(同时意味着没有使用良好的锁机制)。

那么,什么时候会出现同一个 Handler 被多线程同时并发使用呢?

下面是一个例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//1.先构造一个 Handler 实例
MyHandler handler = new MyHanlder();

serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

  @Override
  protected void initChannel(Channel ch) throws Exception {
    //向 Pipeline 中加入此 Handler 实例
    ch.pipeline().addLast(handler);
  }
});

每一个 TCP 请求到来时,生成一个 Channel 实例,然后利用 ChannelInitializer#initChannel 方法来初始化此 Channel 对应的 Pipeline,此时同一个 MyHandler 类实例就会被加入到 Pipeline 中。

由于不同的 TCP 连接将会分别被不同的 EventLoop 线程负责处理,因此会出现这样一个问题:

  • 多个 Pipeline 分别被多个 EventLoop 线程负责执行;
  • 多个 Pipeline 中却共享同一个 MyHandler 类实例;

显而易见,此时出现了多线程并发模型。如下图所示:

SharableHandler

其中黄、绿代表被不同 EventLoop 线程独占的 Handler 实例,而粉色代表被所有 EventLoop 线程共享的 Handler 实例。可见,多个线程并发共享 MyHandler 实例,自然会出现并发安全问题。

如果你试图共享上述 MyHandler,那么你必须给 MyHandler 加上 @ChannelHandler.Sharable 注解,否则在第二个连接到来时,Netty 就会抛出 MyHandler is not a @Sharable handler, so can't be added or removed multiple times. 的异常。

注意:即使你这个类就是无状态的,百分百并发安全,在没有使用 @ChannelHandler.Sharable 注解的条件下,又在多 Pipeline 中共享此实例,就会抛出异常。

但是如果这是一个有状态的 Handler,例如下面这个例子:

1
2
3
4
5
6
7
8
@ChannelHandler.Sharable
public class ConcurrencyHandler extends SimpleChannelInboundHandler {
    int count =0;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(count++);
    }
}

ConcurrencyHandler 类显然是一个非并发安全的类,但是因为使用 @ChannelHandler.Sharable 注解,Netty 并不会抛出任何异常。

因此,使用 @ChannelHandler.Sharable 注解仅仅会使 Netty 对多 Pipeline 共享同一 Handler 实例这一情况放弃抛出异常,但是 Handler 的并发安全性仍然依赖于程序员自行确保,Netty 本身并不做任何额外检查。

现在可以回答为什么 Dubbo Reviewer 要求我删去 @ChannelHandler.Sharable 注解了。其原因在于如果使用如下方式添加 Handler,无论是否有 @ChannelHandler.Sharable 注解,Netty 始终会为每一个 ChannelPipeline 创建一个单独的 Channel。

1
2
3
4
5
6
7
8
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

  @Override
  protected void initChannel(Channel ch) throws Exception {
    //向 Pipeline 中加入新构造的 Handler 实例
    ch.pipeline().addLast(new MyHanlder(););
  }
});

3. 总结

@ChannelHandler.Sharable 注解的作用非常小:如果 Handler 使用这个注解,那么当 Netty 发现多 Pipeline 将使用同一个 Handler 时就不会抛出异常,仅此而已。

但是,单例 Handler 的线程并发安全性 Netty 框架本身并不做任何假设与支持,其完全依赖于程序员自行的设计。

即使是被 @ChannelHandler.Sharable 注解标记的 Handler,如果是通过 new 构造器方式加入到 Pipeline,那么 Handler 将不会是单例模式,每一个 Pipeline 将使用不同的 Handler 实例。

谨记,@ChannelHandler.Sharable 注解作用非常小,几乎仅仅起到标记作用。

补充:什么时候该使用 @ChannelHandler.Sharable 注解?

  • 有需求:试图利用单例来减少 GC、内存,毕竟为每一个连接重新构造一个 Handler 是一笔不小的开销;
  • 有场景:需要跨 Channel 收集一些统计信息,例如总的 TCP 连接数;
  • 有状态时并发安全:自己确保 Handler 并发安全,了解 Netty 的并发安全模型;
  • 无状态:如果 Handler 与连接状态无关,那么也无需线程安全确保机制;