AI智能
改变未来

以NIOServer示例代码分析java NIO的底层原理

一、NIOServer示例代码

[code]package io.netty.example.javanio_test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Minimal single-threaded NIO server used as the running example of this article.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every accepted
 * client channel. Text is exchanged as UTF-8.
 */
public class NIOServer {

    /** Capacity, in bytes, of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 512;

    private Selector selector;

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        // Initialise the server and bind it to port 8888.
        server.initServer(8888);
        // Serve client requests.
        server.listen();
    }

    /**
     * Opens the listening channel, binds it to {@code port} and registers it
     * with a newly opened selector for accept events.
     *
     * @param port local TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened, bound or registered
     */
    public void initServer(int port) throws IOException {
        // Open the ServerSocket channel.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Switch to non-blocking mode; register() rejects blocking channels.
        serverSocketChannel.configureBlocking(false);
        // Bind the channel's underlying socket to the local address and port.
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // Obtain a selector.
        this.selector = Selector.open();
        // Register interest in OP_ACCEPT. Once that event fires, selector.select()
        // returns; until then it keeps blocking.
        serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop: blocks in {@code select()} and dispatches every
     * selected key to {@link #handler(SelectionKey)}. Never returns normally.
     *
     * @throws IOException if selecting or handling a key fails
     */
    public void listen() throws IOException {
        System.out.println("启动服务器!");
        while (true) {
            // select() blocks until at least one registered channel is ready.
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove the key from the selected set so it is not handled twice.
                iterator.remove();
                handler(key);
            }
        }
    }

    /**
     * Dispatches a selected key according to the operation it is ready for.
     *
     * @param key the selected key
     * @throws IOException if handling the event fails
     */
    public void handler(SelectionKey key) throws IOException {
        // The readiness queries below throw CancelledKeyException on a cancelled key.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // Accept the client connection.
            handlerAccept(key);
        } else if (key.isReadable()) {
            // Handle readable data.
            handlerRead(key);
        } else if (key.isWritable()) {
            // Handle writability.
            System.out.println("can write!");
        } else if (key.isConnectable()) {
            // Handle connection completion.
            System.out.println("is connectable");
        }
    }

    /**
     * Accepts a pending connection, registers it for reads and sends a greeting.
     *
     * @param key the key of the listening channel
     * @throws IOException if the new connection cannot be configured, registered or greeted
     */
    public void handlerAccept(SelectionKey key) throws IOException {
        // Get the ServerSocketChannel from the SelectionKey.
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // Get the SocketChannel of the new connection.
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            // A non-blocking accept returns null when no connection is pending.
            return;
        }
        try {
            // Switch the client channel to non-blocking mode.
            socketChannel.configureBlocking(false);
            System.out.println("与客户端建立连接");
            // Register OP_READ so that data sent by the client can be read.
            socketChannel.register(selector, SelectionKey.OP_READ);
            // Send a greeting to the client. NOTE: a write on a non-blocking channel
            // may be partial; that is tolerated here for this short message.
            socketChannel.write(ByteBuffer.wrap("connected\n".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            // Do not leak the half-initialised connection; still report the failure.
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Reads the available bytes from a client, prints them and sends an acknowledgement.
     * The connection is closed when the peer has closed its side or when I/O fails.
     *
     * @param key the key of the readable client channel
     * @throws IOException declared for interface compatibility; I/O failures are
     *     reported on standard output and end in closing the connection
     */
    public void handlerRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // Create the read buffer.
        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        try {
            // Number of bytes read from the channel; -1 signals end-of-stream.
            int readcount = socketChannel.read(byteBuffer);
            if (readcount > 0) {
                // Decode only the bytes actually read, not the whole backing array.
                String msg = new String(byteBuffer.array(), 0, readcount, StandardCharsets.UTF_8);
                System.out.println("服务端收到的信息为:\n" + msg);
                ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes(StandardCharsets.UTF_8));
                socketChannel.write(outBuffer);
            } else if (readcount < 0) {
                // Peer closed the connection. Without closing, the channel would stay
                // readable and select() would return immediately forever.
                System.out.println("客户端异常退出");
                closeChannel(key);
            }
            // readcount == 0: nothing available right now; wait for the next event.
        } catch (IOException e) {
            System.out.println("异常信息:\n" + e.getMessage());
            closeChannel(key);
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) a close failure. */
    private void closeChannel(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            System.out.println("异常信息:\n" + e.getMessage());
        }
    }
}

 

二、代码分析

1、initServer方法分析

(1)打开ServerSocketChannel

[code]ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        1)调用ServerSocketChannel.open()方法

[code]public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}
  • 创建SelectorProvider,调用SelectorProvider的provider方法。本例是在Windows系统上运行的
[code]public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;        //安全访问return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {                        //按property方式加载provider,不做具体分析if (loadProviderFromProperty())return provider;                        //按service方式加载provider,不做具体分析if (loadProviderAsService())return provider;                        //默认的provider创建方式,本例采用的是如下的创建方式provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}}
  •    调用DefaultSelectorProvider的create()方法,使用new方式创建WindowsSelectorProvider
[code]public static SelectorProvider create() {return new WindowsSelectorProvider();}
  • 回过头来看openServerSocketChannel方法,实际上调用到了SelectorProviderImpl的对应的方法。使用new方式创建ServerSocketChannelImpl
[code]public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);}
  • 接下来看ServerSocketChannelImpl的构造方法
[code]ServerSocketChannelImpl(SelectorProvider var1) throws IOException {    //调用父类构造器,将SelectorProvider赋值给父类的属性super(var1);    //新建FileDescriptor,此时即将fd与ServerSocketChannel进行绑定this.fd = Net.serverSocket(true);    //给fdVal赋值this.fdVal = IOUtil.fdVal(this.fd);    //初始化通道状态为0this.state = 0;}

2)配置ServerSocketChannel为非阻塞模式

[code]serverSocketChannel.configureBlocking(false);

3)ServerSocketChannel的绑定

[code]//绑定本地8888端口serverSocketChannel.socket().bind(new InetSocketAddress(port));
  • socket的创建,调用ServerSocketChannelImpl的socket()方法
[code]public ServerSocket socket() {synchronized(this.stateLock) {if (this.socket == null) {//此处ServerSocketAdaptor继承了ServerSocket,并将ServerSocketChannelImpl赋值给ServerSocketAdaptor属性this.socket = ServerSocketAdaptor.create(this);}return this.socket;}}
  • 再看ServerSocket的bind方法
[code]public void bind(SocketAddress endpoint) throws IOException {bind(endpoint, 50);}
  • 继续分析重载的bind方法
[code]public void bind(SocketAddress endpoint, int backlog) throws IOException {    //判断Socket是否关闭if (isClosed())throw new SocketException(\"Socket is closed\");    //判断是否已经绑定if (!oldImpl && isBound())throw new SocketException(\"Already bound\");    //判断绑定的地址是否为空,如果为空则新建地址,并绑定0端口if (endpoint == null)endpoint = new InetSocketAddress(0);    //判断是否为InetSocketAddress类型实例if (!(endpoint instanceof InetSocketAddress))throw new IllegalArgumentException(\"Unsupported address type\");InetSocketAddress epoint = (InetSocketAddress) endpoint;    //判断地址是否未解析if (epoint.isUnresolved())throw new SocketException(\"Unresolved address\");if (backlog < 1)backlog = 50;try {SecurityManager security = System.getSecurityManager();if (security != null)security.checkListen(epoint.getPort());        //getImpl()方法所创建的实例未SocksSocketImpl,SocksSocketImpl继承了PlainSocketImpl,PlainSocketImpl又继承了                AbstractPlainSocketImplgetImpl().bind(epoint.getAddress(), epoint.getPort());        //监听getImpl().listen(backlog);bound = true;} catch(SecurityException e) {bound = false;throw e;} catch(IOException e) {bound = false;throw e;}}
  • PlainSocketImpl的构造方法,本例中impl创建为DualStackPlainSocketImpl
[code]PlainSocketImpl() {if (useDualStackImpl) {impl = new DualStackPlainSocketImpl(exclusiveBind);} else {impl = new TwoStacksPlainSocketImpl(exclusiveBind);}}
  • 回头看getImpl().bind(…)方法,实际上调用到AbstractSocksSocketImpl的bind方法
[code]protected synchronized void bind(InetAddress address, int lport)throws IOException{synchronized (fdLock) {if (!closePending && (socket == null || !socket.isBound())) {NetHooks.beforeTcpBind(fd, address, lport);}}socketBind(address, lport);if (socket != null)socket.setBound();if (serverSocket != null)serverSocket.setBound();}
  • socketBind(address,lport)方法分析,实际调用到DualStackPlainSocketImpl的socketBind(…)方法
[code]void socketBind(InetAddress address, int port) throws IOException {    //校验并获取到本地的FileDescriptorint nativefd = checkAndReturnNativeFD();if (address == null)throw new NullPointerException(\"inet address argument is null.\");//调用native的bind0方法将FileDescriptor的fd与指定的地址和端口绑定bind0(nativefd, address, port, exclusiveBind);if (port == 0) {localport = localPort0(nativefd);} else {localport = port;}this.address = address;}
  • bind0方法
[code]static native void bind0(int fd, InetAddress localAddress, int localport,boolean exclBind)throws IOException;

4)打开选择器

[code]// 获取一个选择器this.selector = Selector.open();
  • 调用Selector的open方法
[code]public static Selector open() throws IOException {return SelectorProvider.provider().openSelector();}
  • 上文讲过,此例在windows系统,已经创建了SelectorProvider为WindowsSelectorProvider,调用WindowsSelectorProvider的openSelector方法,使用new创建了WindowsSelectorImpl实例。对于WindowsSelectorImpl的构造方法,可以自行研究,这里不再赘述。
[code]public AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this);}

5)进行ServerSocketChannel的注册

[code]serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
  • 调用到SelectableChannel的注册方法
[code]public final SelectionKey register(Selector sel, int ops)throws ClosedChannelException{return register(sel, ops, null);}
  • 继续调用到AbstractSelectableChannel的register方法
[code]public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException{synchronized (regLock) {//判断通道是否打开if (!isOpen())throw new ClosedChannelException();        //判断通道感兴趣的事件是否为0if ((ops & ~validOps()) != 0)throw new IllegalArgumentException();        //判断是否阻塞if (blocking)throw new IllegalBlockingModeException();        //在给定的Selector中寻找SelectionKeySelectionKey k = findKey(sel);        //key不为空,则将感兴趣的事件设置为Acceptif (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();                //key为空,则新建key并注册到selector上k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}}
 
  • findKey(…)方法的逻辑
[code]private SelectionKey findKey(Selector sel) {synchronized (keyLock) {        //keys为通道中维护的SelectionKey数组if (keys == null)return null;        //如果keys数组中找个该key所对应的selector为所给定的selector,则返回该keyfor (int i = 0; i < keys.length; i++)if ((keys[i] != null) && (keys[i].selector() == sel))return keys[i];return null;}}
  • SelectorImpl对应的register新的SelectionKey的逻辑
[code]protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {if (!(var1 instanceof SelChImpl)) {throw new IllegalSelectorException();} else {        //新建一个SelectionKeySelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);var4.attach(var3);synchronized(this.publicKeys) {            //进行注册this.implRegister(var4);}//设置key感兴趣的事件var4.interestOps(var2);return var4;}}
  • 调用WindowsSelectorImpl的implRegister方法
[code]protected void implRegister(SelectionKeyImpl var1) {synchronized(this.closeLock) {if (this.pollWrapper == null) {throw new ClosedSelectorException();} else {this.growIfNeeded();            //channelArray数组中新增一个Channelthis.channelArray[this.totalChannels] = var1;            //设置Selectionkey的indexvar1.setIndex(this.totalChannels);            //将SelectionKey放入fdMap中this.fdMap.put(var1);            //将SelectionKey放入key set 中this.keys.add(var1);            //将SelectionKey放入pollWrapper中this.pollWrapper.addEntry(this.totalChannels, var1);            //selector的channel总数加1++this.totalChannels;}}}

2、listen方法分析

(1)selector.select()方法分析

  • 调用到SelectorImpl的select方法
[code]public int select() throws IOException {return this.select(0L);}
  • 继续调用SelectorImpl的select重载方法
[code]public int select(long var1) throws IOException {if (var1 < 0L) {throw new IllegalArgumentException(\"Negative timeout\");} else {        //进行真正的选择return this.lockAndDoSelect(var1 == 0L ? -1L : var1);}}
  • 继续调用SelectorImpl的lockAndDoSelect方法
[code]private int lockAndDoSelect(long var1) throws IOException {    //对该Selector加锁synchronized(this) {if (!this.isOpen()) {throw new ClosedSelectorException();} else {int var10000;            //对publicKeys加锁synchronized(this.publicKeys) {synchronized(this.publicSelectedKeys) {var10000 = this.doSelect(var1);}}return var10000;}}}
  • 继续调用到WindowsSelectorImpl的doSelect方法
[code]protected int doSelect(long var1) throws IOException {if (this.channelArray == null) {throw new ClosedSelectorException();} else {this.timeout = var1;        //处理已经注销的SelectionKeythis.processDeregisterQueue();if (this.interruptTriggered) {this.resetWakeupSocket();return 0;} else {            //调整线程数this.adjustThreadsCount();            //重置完成锁this.finishLock.reset();            //启动线程,在windows系统中,每个线程最多轮询1024个句柄this.startLock.startThreads();try {                //标识开始,与end方法成对使用this.begin();try {                    //进行真正的轮询this.subSelector.poll();} catch (IOException var7) {this.finishLock.setException(var7);}if (this.threads.size() > 0) {this.finishLock.waitForHelperThreads();}} finally {                //结束this.end();}//检查异常this.finishLock.checkForException();            //处理注销的SelectionKeythis.processDeregisterQueue();            //更新被选择的keyint var3 = this.updateSelectedKeys();this.resetWakeupSocket();return var3;}}}
 
  • processDeregisterQueue方法分析
[code]void processDeregisterQueue() throws IOException {    //获取selector中的canceled setSet var1 = this.cancelledKeys();    //对canceledSet加锁synchronized(var1) {if (!var1.isEmpty()) {Iterator var3 = var1.iterator();//循环set中的元素while(var3.hasNext()) {SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();try {                    //具体的注销实现,可以自行了解this.implDereg(var4);} catch (SocketException var11) {throw new IOException(\"Error deregistering key\", var11);} finally {                    //将SelectionKey从set中移除var3.remove();}}}}}
  • adjustThreadsCount方法分析
[code]private void adjustThreadsCount() {int var1;    //如果threadCount大于threads的size,则新建一个线程,并将线程加入到threads中,然后启动if (this.threadsCount > this.threads.size()) {for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) {WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1);this.threads.add(var2);var2.setDaemon(true);var2.start();}    //如果threadCount小于threds的size,则需要将threads的数目缩减到与threadCount相同。} else if (this.threadsCount < this.threads.size()) {for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) {((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie();}}}
  • this.finishLock.reset()
[code]private void reset() {    //设置需要完成的线程数this.threadsToFinish = WindowsSelectorImpl.this.threads.size();}
  • this.startLock.startThreads方法
[code]private synchronized void startThreads() {    //对运行的线程数加1++this.runsCounter;    //进行广播通知this.notifyAll();}
  • this.begin方法
[code]protected final void begin() {if (interruptor == null) {interruptor = new Interruptible() {public void interrupt(Thread ignore) {AbstractSelector.this.wakeup();}};}    //将interruptor赋值给给Thread的Interrupbible(即blocker)AbstractInterruptibleChannel.blockedOn(interruptor);Thread me = Thread.currentThread();if (me.isInterrupted())        //如果线程中断则进行中断操作interruptor.interrupt(me);}
  • this.subSelector.poll()方法
[code]private int poll() throws IOException {    //调用native的poll0方法,重点关注readFds,writeFds,exceptFds,后续会用到return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);}
  • this.updateSelectedKeys()方法
[code]private int updateSelectedKeys() {++this.updateCount;byte var1 = 0;    //处理当前线程轮询的keyint var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);WindowsSelectorImpl.SelectThread var3;    //处理其他线程轮循到SelectionKeyfor(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {var3 = (WindowsSelectorImpl.SelectThread)var2.next();}return var4;}
  • this.subSelector.processSelectedKeys(…)
[code]private int processSelectedKeys(long var1) {byte var3 = 0;    //处理可读的FileDescriptorint var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false);    //处理可写的FileDescriptorvar4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false);    //处理例外的FileDescriptorvar4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true);return var4;}private int processFDSet(long var1, int[] var3, int var4, boolean var5) {int var6 = 0;for(int var7 = 1; var7 <= var3[0]; ++var7) {int var8 = var3[var7];if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) {synchronized(WindowsSelectorImpl.this.interruptLock) {WindowsSelectorImpl.this.interruptTriggered = true;}} else {            //根据FileDescriptor从fdMap中获取到MapEntryWindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);            //如果MapEntry 不为空if (var9 != null) {                //获取SelectionKeySelectionKeyImpl var10 = var9.ski;                //!var5 或者SelectionKey所对应的channel不是SocketChannelImpl的实例或者不丢弃紧急数据if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {                        //清除数不等于更新数if (var9.clearedCount != var1) {                            //转换并设置准备好的感兴趣事件if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {                               //更新更新数var9.updateCount = var1;++var6;}} else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}var9.clearedCount = var1;} else {if (var9.clearedCount != var1) {                            //转换并设置准备好的感兴趣事件var10.channel.translateAndSetReadyOps(var4, var10);if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {                                //将准备好的SelectionKey添加到selectedKeys中WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}} else 
{var10.channel.translateAndUpdateReadyOps(var4, var10);if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {                                //将准备好的SelectionKey添加到selectedKeys中WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}}var9.clearedCount = var1;}}}}}return var6;}
  • processFDSet(…)方法的作用即是将准备好的SelectionKey放入到selectedKeys中
赞(0) 打赏
未经允许不得转载:爱站程序员基地 » 以NIOServer示例代码分析java NIO的底层原理