类AbstractBootstrap
类定义
AbstractBootstrap是Bootstrap的基类, 类定义如下:
package io.netty.bootstrap;
public abstract class AbstractBootstrap
<B extends AbstractBootstrap<B, C>, C extends Channel>
implements Cloneable {}
类定义中的泛型B要求是AbstractBootstrap的子类, 而泛型C要求是Channel的子类.
注意这里的泛型的用法,非常的巧妙。
成员变量
group属性
volatile EventLoopGroup group;
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
public EventLooattrspGroup group() {
return group;
}
注意this.group只能设置一次, 这意味着group(group)方法只能被调用一次.
localAddress属性
localAddress用于绑定本地终端, 有多个设值的方法:
private volatile SocketAddress localAddress;
public B localAddress(SocketAddress localAddress) {
this.localAddress = localAddress;
return (B) this;
}
public B localAddress(int inetPort) {
return localAddress(new InetSocketAddress(inetPort));
}
public B localAddress(String inetHost, int inetPort) {
return localAddress(new InetSocketAddress(inetHost, inetPort));
}
public B localAddress(InetAddress inetHost, int inetPort) {
return localAddress(new InetSocketAddress(inetHost, inetPort));
}
final SocketAddress localAddress() {
return localAddress;
}
这些重载的localAddress(), 最终都指向了InetSocketAddress的几个构造函数.
options属性
options属性是一个LinkedHashMap, option()方法用于设置单个的key/value, 如果value为null则删除该key.
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
final Map<ChannelOption<?>, Object> options() {
return options;
}
attrs属性
attrs和options属性类似.
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
public <T> B attr(AttributeKey<T> key, T value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
synchronized (attrs) {
attrs.remove(key);
}
} else {
synchronized (attrs) {
attrs.put(key, value);
}
}
return (B) this;
}
final Map<AttributeKey<?>, Object> attrs() {
return attrs;
}
handler属性
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
final ChannelHandler handler() {
return handler;
}
channelFactory属性
新旧两个ChannelFactory
channelFactory这个属性有点麻烦, 根源在于ChannelFactory这个类,netty中有新旧两个ChannelFactory,具体介绍见 Channel Factory
混合使用
但是现在的情况是内部已经转为使用新类, 对外的接口还是继续保持使用原来的旧类, 因此代码有些混乱:
// 这里的channelFactory的类型定义用的是旧类,因此需要加SuppressWarnings
@SuppressWarnings("deprecation")
private volatile ChannelFactory< ? extends C> channelFactory;
// 返回的channelFactory也是用的旧类, 没的说, 继续SuppressWarnings
@SuppressWarnings("deprecation")
final ChannelFactory< ? extends C> channelFactory() {
return channelFactory;
}
// 这个方法的参数是旧的"io.netty.bootstrap.ChannelFactory",已经被标志为"@Deprecated",尽量用下面的方法
@Deprecated
public B channelFactory(ChannelFactory< ? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
// 这个方法是现在推荐使用的设置channelFactory的方法, 使用新类"io.netty.channel.ChannelFactory"
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory< ? extends C> channelFactory) {
// 但是底层的实现还是调用回上面被废弃的channelFactory()方法
// 因为新类是继承自旧类的,所有只要简单转一下类型就好
return channelFactory((ChannelFactory<C>) channelFactory);
}
此外还有一个channel()方法可以非常方便的设置channelFactory:
public B channel(Class< ? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
类方法
validate()
validate()用于检验所有的参数, 实际代码中检查的是group和channelFactory两个参数, 这两个参数必须设置不能为空:
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}
register()
register()方法创建一个新的Channel并将它注册到EventLoop, 在执行前会调用validate()方法做前置检查:
public ChannelFuture register() {
validate();
return initAndRegister();
}
initAndRegister()是关键代码, 细细读一下:
final ChannelFuture initAndRegister() {
// 创建一个新的Channel
final Channel channel = channelFactory().newChannel();
try {
// 调用抽象方法, 子类来做初始化
init(channel);
} catch (Throwable t) {
// 如果出错, 强行关闭这个channel
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 创建成功则将这个channel注册到eventloop中
ChannelFuture regFuture = group().register(channel);
// 如果注册出错
if (regFuture.cause() != null) {
// 判断是否已经注册
if (channel.isRegistered()) {
// channel已经注册的就关闭
channel.close();
} else {
// 还没有注册的就强行关闭
channel.unsafe().closeForcibly();
}
}
// 如果代码走到这里而且promise没有失败, 那么是下面两种情况之一:
// 1) 如果我们尝试了从event loop中注册, 那么现在注册已经完成
// 现在可以安全的尝试 bind()或者connect(), 因为channel已经注册成功
// 2) 如果我们尝试了从另外一个线程中注册, 注册请求已经成功添加到event loop的任务队列中等待后续执行
// 现在可以安全的尝试 bind()或者connect():
// 因为 bind() 或 connect() 会在安排的注册任务之后执行
// 而register(), bind(), 和 connect() 都被确认是同一个线程
return regFuture;
}
中途调用的init()方法定义如下, 后面看具体子类代码时再展开.
abstract void init(Channel channel) throws Exception;
bind()
bind()方法有多个重载, 差异只是bind操作所需的InetSocketAddress参数从何而来而已:
从属性
this.localAddress
来这个时候bind()方法无需参数, 直接使用属性this.localAddress, 当前调用之前
this.localAddress
必须有赋值(通过函数localAddress()):public ChannelFuture bind() { validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); }
从bind()方法的输入参数中来
在输入参数中来直接指定localAddress:
public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
另外为了方便, 重载了下面三个方法, 用不同的方式来创建InetSocketAddress而已:
public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(String inetHost, int inetPort) { return bind(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture bind(InetAddress inetHost, int inetPort) { return bind(new InetSocketAddress(inetHost, inetPort)); }
注: 使用带参数的bind()方法, 忽略了localAddress()设定的参数. 而且也没有设置localAddress属性. 这里的差异, 后面留意.
继续看doBind()方法的细节, 这个依然是这个类的核心内容:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 调用initAndRegister()方法, 先初始化channel,并注册到event loop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 检查注册的channel是否出错
if (regFuture.cause() != null) {
return regFuture;
}
// 检查注册操作是否完成
if (regFuture.isDone()) {
// 如果完成
// 在这个点上我们知道注册已经完成并且成功
// 继续bind操作, 创建一个ChannelPromise
ChannelPromise promise = channel.newPromise();
// 调用doBind0()方法来继续真正的bind操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 通常这个时候注册的future应该都已经完成,但是万一没有, 我们也需要处理
// 为这个channel创建一个PendingRegistrationPromise
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 然后给注册的future添加一个listener, 在operationComplete()回调时
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
// 检查是否出错
if (cause != null) {
// 在event loop上注册失败, 因此直接让ChannelPromise失败, 避免一旦我们试图访问这个channel的eventloop导致IllegalStateException
promise.setFailure(cause);
} else {
// 注册已经成功, 因此设置正确的executor以便使用
// 注: 这里已经以前有过一个bug, 有issue记录
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
// 调用doBind0()方法来继续真正的bind操作
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
关键的doBind0()方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// 这个方法在channelRegistered()方法触发前被调用.
// 让handler有机会在它的channelRegistered()实现中构建pipeline
// 给channel的event loop增加一个一次性任务
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
// 检查注册是否成功
if (regFuture.isSuccess()) {
// 如果成功则绑定localAddress到channel
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// 如果不成功则设置错误到promise
promise.setFailure(regFuture.cause());
}
}
});
}