一、NIOServer示例代码
[code]package io.netty.example.javanio_test;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;public class NIOServer {private Selector selector;public static void main(String[] args) throws IOException {NIOServer server = new NIOServer();//初始化服务器,绑定8888端口server.initServer(8888); //监听客户端请求server.listen();}public void initServer(int port) throws IOException{// 打开ServerSocket通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式serverSocketChannel.configureBlocking(false);//绑定方式是将ServerSocketChannel对应的FileDescriptor与本地的地址和端口进行绑定serverSocketChannel.socket().bind(new InetSocketAddress(port));// 获取一个选择器this.selector = Selector.open();// 将通道管理器与该通道进行绑定,并为该通道注册SelectionKey.OP_ACCEPT事件// 注册事件后,当该事件触发时会使selector.select()返回,// 否则selector.select()一直阻塞serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}public void listen() throws IOException{System.out.println(\"启动服务器!\");while (true) {// select()方法一直阻塞直到有注册的通道准备好了才会返回selector.select();Iterator<?> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = (SelectionKey) iterator.next();// 删除已选的key,防止重复处理iterator.remove();handler(key);}}}public void handler(SelectionKey key)throws IOException{if (key.isAcceptable()) {//接收客户端请求handlerAccept(key);}else if (key.isReadable()){//处理读handlerRead(key);}else if (key.isWritable()){//处理写System.out.println(\"can write!\");}else if (key.isConnectable()){//连接System.out.println(\"is connectable\");}}public void handlerAccept(SelectionKey key) throws IOException{// 从SelectionKey中获取ServerSocketChannelServerSocketChannel server = (ServerSocketChannel) key.channel();// 获取SocketChannelSocketChannel socketChannel = server.accept();// 设置成非阻塞socketChannel.configureBlocking(false);System.out.println(\"与客户端建立连接\");// 为socketChannel通道建立 OP_READ 读操作,使客户端发送的内容可以被读到socketChannel.register(selector, SelectionKey.OP_READ);// 往客户端发送发送信息socketChannel.write(ByteBuffer.wrap(\"connected\\n\".getBytes()));}public void handlerRead(SelectionKey key)throws IOException{SocketChannel socketChannel = (SocketChannel) key.channel();// 创建读取缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(512);// 从通道读取可读取的字节数try {int readcount = socketChannel.read(byteBuffer);if (readcount > 0) {byte[] data = byteBuffer.array();String msg = new String(data);System.out.println(\"服务端收到的信息为:\\n\" + msg);ByteBuffer outBuffer = ByteBuffer.wrap(\"收到\\n\".getBytes());socketChannel.write(outBuffer);} else {System.out.println(\"客户端异常退出\");}} catch (IOException e) {System.out.println(\"异常信息:\\n\" + e.getMessage());key.cancel();}}}
二、代码分析
1、initServer方法分析
(1)打开ServerSocketChannel
[code]ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
1)调用ServerSocketChannel.open()方法
[code]public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}
- 创建SelectorPriverder,调用SelectorProvider的provider方法。本例是在windows系统上运行的
[code]public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider; //安全访问return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() { //按property方式加载provider,不做具体分析if (loadProviderFromProperty())return provider; //按service方式加载provider,不做具体分析if (loadProviderAsService())return provider; //默认的provider创建方式,本例采用的是如下的创建方式provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}}
- 调用DefaultSelectorProvider的create()方法,使用new方式创建WindowsSelectorProvider
[code]public static SelectorProvider create() {return new WindowsSelectorProvider();}
- 回过头来看openServerSocketChannel方法,实际上调用到了SelectorProviderImpl的对应的方法。使用new方式创建ServerSocketChannelImpl
[code]public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);}
- 接下来看ServerSocketChannelImpl的构造方法
[code]ServerSocketChannelImpl(SelectorProvider var1) throws IOException { //调用父类构造器,将SelectorProvider赋值给父类的属性super(var1); //新建FileDescriptor,此时即将fd与ServerSocketChannel进行绑定this.fd = Net.serverSocket(true); //给fdVal赋值this.fdVal = IOUtil.fdVal(this.fd); //初始化通道状态为0this.state = 0;}
2)配置ServerSocketChannel为非阻塞模式
[code]serverSocketChannel.configureBlocking(false);
3)ServerSocketChannel的绑定
[code]//绑定本地8888端口serverSocketChannel.socket().bind(new InetSocketAddress(port));
- socket的创建,调用ServerSocketChannelImpl的socket()方法
[code]public ServerSocket socket() {synchronized(this.stateLock) {if (this.socket == null) {//此处ServerSocketAdaptor继承了ServerSocket,并将ServerSocketChannelImpl赋值给ServerSocketAdaptor属性this.socket = ServerSocketAdaptor.create(this);}return this.socket;}}
[code]public void bind(SocketAddress endpoint) throws IOException {bind(endpoint, 50);}
[code]public void bind(SocketAddress endpoint, int backlog) throws IOException { //判断Socket是否关闭if (isClosed())throw new SocketException(\"Socket is closed\"); //判断是否已经绑定if (!oldImpl && isBound())throw new SocketException(\"Already bound\"); //判断绑定的地址是否为空,如果为空则新建地址,并绑定0端口if (endpoint == null)endpoint = new InetSocketAddress(0); //判断是否为InetSocketAddress类型实例if (!(endpoint instanceof InetSocketAddress))throw new IllegalArgumentException(\"Unsupported address type\");InetSocketAddress epoint = (InetSocketAddress) endpoint; //判断地址是否未解析if (epoint.isUnresolved())throw new SocketException(\"Unresolved address\");if (backlog < 1)backlog = 50;try {SecurityManager security = System.getSecurityManager();if (security != null)security.checkListen(epoint.getPort()); //getImpl()方法所创建的实例未SocksSocketImpl,SocksSocketImpl继承了PlainSocketImpl,PlainSocketImpl又继承了 AbstractPlainSocketImplgetImpl().bind(epoint.getAddress(), epoint.getPort()); //监听getImpl().listen(backlog);bound = true;} catch(SecurityException e) {bound = false;throw e;} catch(IOException e) {bound = false;throw e;}}
- PlainSocketImpl的构造方法,本例中impl创建为DualStackPlainSocketImpl
[code]PlainSocketImpl() {if (useDualStackImpl) {impl = new DualStackPlainSocketImpl(exclusiveBind);} else {impl = new TwoStacksPlainSocketImpl(exclusiveBind);}}
- 回头看getImpl().bind(…)方法,实际上调用到AbstractSocksSocketImpl的bind方法
[code]protected synchronized void bind(InetAddress address, int lport)throws IOException{synchronized (fdLock) {if (!closePending && (socket == null || !socket.isBound())) {NetHooks.beforeTcpBind(fd, address, lport);}}socketBind(address, lport);if (socket != null)socket.setBound();if (serverSocket != null)serverSocket.setBound();}
- socketBind(address,lport)方法分析,实际调用到DualStackPlainSocketImpl的socketBind(…)方法
[code]void socketBind(InetAddress address, int port) throws IOException { //校验并获取到本地的FileDescriptorint nativefd = checkAndReturnNativeFD();if (address == null)throw new NullPointerException(\"inet address argument is null.\");//调用native的bind0方法将FileDescriptor的fd与指定的地址和端口绑定bind0(nativefd, address, port, exclusiveBind);if (port == 0) {localport = localPort0(nativefd);} else {localport = port;}this.address = address;}
[code]static native void bind0(int fd, InetAddress localAddress, int localport,boolean exclBind)throws IOException;
4)打开选择器
[code]// 获取一个选择器this.selector = Selector.open();
|
[code]public static Selector open() throws IOException {return SelectorProvider.provider().openSelector();}
|
- 上文讲过,此例在windows系统,已经创建了SelectorProvider为WindowsSelectorProvider,调用WindowsSelectorProvider的openSelector方法,使用new创建了WindowsSelectorImpl实例。对于WindowsSelectorImpl的构造方法,可以自行研究,这里不再赘述。
[code]public AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this);}
|
5)进行ServerSocketChannel的注册
[code]serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
|
- 调用到SelectableChannel的注册方法
[code]public final SelectionKey register(Selector sel, int ops)throws ClosedChannelException{return register(sel, ops, null);}
|
- 继续调用到AbstractSelectableChannel的register方法
[code]public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException{synchronized (regLock) {//判断通道是否打开if (!isOpen())throw new ClosedChannelException(); //判断通道感兴趣的事件是否为0if ((ops & ~validOps()) != 0)throw new IllegalArgumentException(); //判断是否阻塞if (blocking)throw new IllegalBlockingModeException(); //在给定的Selector中寻找SelectionKeySelectionKey k = findKey(sel); //key不为空,则将感兴趣的事件设置为Acceptif (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException(); //key为空,则新建key并注册到selector上k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}}
|
|
[code]private SelectionKey findKey(Selector sel) {synchronized (keyLock) { //keys为通道中维护的SelectionKey数组if (keys == null)return null; //如果keys数组中找个该key所对应的selector为所给定的selector,则返回该keyfor (int i = 0; i < keys.length; i++)if ((keys[i] != null) && (keys[i].selector() == sel))return keys[i];return null;}}
|
- SelectorImpl对应的register新的SelectionKey的逻辑
[code]protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {if (!(var1 instanceof SelChImpl)) {throw new IllegalSelectorException();} else { //新建一个SelectionKeySelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);var4.attach(var3);synchronized(this.publicKeys) { //进行注册this.implRegister(var4);}//设置key感兴趣的事件var4.interestOps(var2);return var4;}}
|
- 调用WindowsSelectorImpl的implRegister方法
[code]protected void implRegister(SelectionKeyImpl var1) {synchronized(this.closeLock) {if (this.pollWrapper == null) {throw new ClosedSelectorException();} else {this.growIfNeeded(); //channelArray数组中新增一个Channelthis.channelArray[this.totalChannels] = var1; //设置Selectionkey的indexvar1.setIndex(this.totalChannels); //将SelectionKey放入fdMap中this.fdMap.put(var1); //将SelectionKey放入key set 中this.keys.add(var1); //将SelectionKey放入pollWrapper中this.pollWrapper.addEntry(this.totalChannels, var1); //selector的channel总数加1++this.totalChannels;}}}
|
2、listen方法分析
(1)selector.select()方法分析
[code]public int select() throws IOException {return this.select(0L);}
|
- 继续调用SelectorImpl的register重载方法
[code]public int select(long var1) throws IOException {if (var1 < 0L) {throw new IllegalArgumentException(\"Negative timeout\");} else { //进行真正的选择return this.lockAndDoSelect(var1 == 0L ? -1L : var1);}}
|
- 继续调用SelectorImpl的lockAndDoSelect方法
[code]private int lockAndDoSelect(long var1) throws IOException { //对该Selector加锁synchronized(this) {if (!this.isOpen()) {throw new ClosedSelectorException();} else {int var10000; //对publicKeys加锁synchronized(this.publicKeys) {synchronized(this.publicSelectedKeys) {var10000 = this.doSelect(var1);}}return var10000;}}}
|
- 继续调用到WindowsSelectorImpl的doSelect方法
[code]protected int doSelect(long var1) throws IOException {if (this.channelArray == null) {throw new ClosedSelectorException();} else {this.timeout = var1; //处理已经注销的SelectionKeythis.processDeregisterQueue();if (this.interruptTriggered) {this.resetWakeupSocket();return 0;} else { //调整线程数this.adjustThreadsCount(); //重置完成锁this.finishLock.reset(); //启动线程,在windows系统中,每个线程最多轮询1024个句柄this.startLock.startThreads();try { //标识开始,与end方法成对使用this.begin();try { //进行真正的轮询this.subSelector.poll();} catch (IOException var7) {this.finishLock.setException(var7);}if (this.threads.size() > 0) {this.finishLock.waitForHelperThreads();}} finally { //结束this.end();}//检查异常this.finishLock.checkForException(); //处理注销的SelectionKeythis.processDeregisterQueue(); //更新被选择的keyint var3 = this.updateSelectedKeys();this.resetWakeupSocket();return var3;}}}
|
|
- processDeregisterQueue方法分析
[code]void processDeregisterQueue() throws IOException { //获取selector中的canceled setSet var1 = this.cancelledKeys(); //对canceledSet加锁synchronized(var1) {if (!var1.isEmpty()) {Iterator var3 = var1.iterator();//循环set中的元素while(var3.hasNext()) {SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();try { //具体的注销实现,可以自行了解this.implDereg(var4);} catch (SocketException var11) {throw new IOException(\"Error deregistering key\", var11);} finally { //将SelectionKey从set中移除var3.remove();}}}}}
|
[code]private void adjustThreadsCount() {int var1; //如果threadCount大于threads的size,则新建一个线程,并将线程加入到threads中,然后启动if (this.threadsCount > this.threads.size()) {for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) {WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1);this.threads.add(var2);var2.setDaemon(true);var2.start();} //如果threadCount小于threds的size,则需要将threads的数目缩减到与threadCount相同。} else if (this.threadsCount < this.threads.size()) {for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) {((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie();}}}
|
[code]private void reset() { //设置需要完成的线程数this.threadsToFinish = WindowsSelectorImpl.this.threads.size();}
|
- this.startLock.startThreads方法
[code]private synchronized void startThreads() { //对运行的线程数加1++this.runsCounter; //进行广播通知this.notifyAll();}
|
[code]protected final void begin() {if (interruptor == null) {interruptor = new Interruptible() {public void interrupt(Thread ignore) {AbstractSelector.this.wakeup();}};} //将interruptor赋值给给Thread的Interrupbible(即blocker)AbstractInterruptibleChannel.blockedOn(interruptor);Thread me = Thread.currentThread();if (me.isInterrupted()) //如果线程中断则进行中断操作interruptor.interrupt(me);}
|
- this.subSelector.poll()方法
[code]private int poll() throws IOException { //调用native的poll0方法,重点关注readFds,writeFds,exceptFds,后续会用到return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);}
|
- this.updateSelectedKeys()方法
[code]private int updateSelectedKeys() {++this.updateCount;byte var1 = 0; //处理当前线程轮询的keyint var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);WindowsSelectorImpl.SelectThread var3; //处理其他线程轮循到SelectionKeyfor(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {var3 = (WindowsSelectorImpl.SelectThread)var2.next();}return var4;}
|
- this.subSelector.processSelectedKeys(…)
[code]private int processSelectedKeys(long var1) {byte var3 = 0; //处理可读的FileDescriptorint var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false); //处理可写的FileDescriptorvar4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false); //处理例外的FileDescriptorvar4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true);return var4;}private int processFDSet(long var1, int[] var3, int var4, boolean var5) {int var6 = 0;for(int var7 = 1; var7 <= var3[0]; ++var7) {int var8 = var3[var7];if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) {synchronized(WindowsSelectorImpl.this.interruptLock) {WindowsSelectorImpl.this.interruptTriggered = true;}} else { //根据FileDescriptor从fdMap中获取到MapEntryWindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8); //如果MapEntry 不为空if (var9 != null) { //获取SelectionKeySelectionKeyImpl var10 = var9.ski; //!var5 或者SelectionKey所对应的channel不是SocketChannelImpl的实例或者不丢弃紧急数据if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) { //清除数不等于更新数if (var9.clearedCount != var1) { //转换并设置准备好的感兴趣事件if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) { //更新更新数var9.updateCount = var1;++var6;}} else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}var9.clearedCount = var1;} else {if (var9.clearedCount != var1) { //转换并设置准备好的感兴趣事件var10.channel.translateAndSetReadyOps(var4, var10);if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { //将准备好的SelectionKey添加到selectedKeys中WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}} else {var10.channel.translateAndUpdateReadyOps(var4, var10);if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { //将准备好的SelectionKey添加到selectedKeys中WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}}var9.clearedCount = var1;}}}}}return var6;}
|
- processFDSet(…)方法的作用即是将准备好的SelectionKey放入到selectedKeys中