亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

【NIO】Chapter 4. Selectors

系統 1931 0

A single thread can monitor large numbers of channels with readiness selection.

The Selectorand related classes provide the APIs todo readiness selection on channels.

?

Selector Basics

You register one or more previously created selectable channels with a selector object.

?

A key that represents the relationship between one channel and oneselector is returned.

?

Selection keys remember what you are interested in for eachchannel.

?

They also track the operations of interest that their channel is currently ready to perform.

?

When you invoke select() on a selector object, the associated keys are updated by checking all the channels registered with that selector.

?

You can obtain a set of the keys whose channels were found to be ready at that point.

?

By iterating over these keys, you can service each channel that has become ready since the last time you invoked select().

?

?

事件驅動模型

注冊感興趣的事件到Selector中,當某個channel上發生了注冊的事件,將會得到通知。

?

The real power of readiness selection is that a potentially large number of channels can bechecked for readiness simultaneously.

?

Optionally, the invoking thread can ask to be put to sleep until one or more of the channels registered with the Selectoris ready, or it can periodically poll the selector to see if anything has become ready since the last check.

?

睡眠或者輪詢,以便發現某個事件已處于準備完成的狀態

?

True readiness selection must be done by the operating system.

One of the most important functions performed by an operating system is to handle I/O requests and notify processes when their data is ready.

Soit only makes sense to delegate this function down to the operating system.

The Selectorclass provides the abstraction by which Java code can request readiness selection service from the underlying operating system in a portable way.

?

由底層操作系統通知Java進程發生了某個事件需要處理,然后再傳遞到Selection中。

?

The Selector, SelectableChannel, and SelectionKey

?

Selector

The Selectorclass manages information about a set of registered channels and
their readiness states. Channels are registered with selectors, and a selector can be
asked to update the readiness states of the channels currently registered with it.
When doing so, the invoking thread can optionally indicate that it would prefer to
be suspended until one of the registered channels is ready.?

?

SelectableChannel

This abstract class provides the common methods needed to implement channel
selectability. It's the superclass of all channel classes that support readiness
selection. FileChannelobjects are not selectable because they don't extend from
SelectableChannel. All the socket channel classes are selectable,
as well as the channels obtained from a Pipeobject. SelectableChannelobjects
can be registered with Selectorobjects, along with an indication of which
operations on that channel are of interestfor that selector. A channel can be
registered with multiple selectors, but only once per selector.

?

SelectionKey

A SelectionKeyencapsulates the registration relationship between a specific
channel and a specific selector. A SelectionKeyobject is returned from
SelectableChannel.register()and serves as a token representing the registration.
SelectionKeyobjects contain two bit sets (encoded as integers) indicating which
channel operations the registrant has aninterest in and which operations the
channel is ready to perform.

?

A channel must first be placed in nonblocking mode (by calling configureBlocking(false)) before it can be registered with a selector.


【NIO】Chapter 4. Selectors
?

A selector maintains a set of channels to monitor.

The important thing is to remember that the Selector object controls the selection process for
the channels registered with it.

?

Selectors are the managing objects, not the selectable channel objects.

The Selectorobject performs readiness selection of channels registered with it and manages selection keys.

?

Setting Up Selectors

      Selector selector = Selector.open(); 
channel1.register (selector, SelectionKey.OP_READ); 
channel2.register (selector, SelectionKey.OP_WRITE); 
channel3.register (selector, SelectionKey.OP_READ | 
SelectionKey.OP_WRITE); 
// Wait up to 10 seconds for a channel to become ready 
readyCount = selector.select (10000); 
    

?

There are four defined selectable operations: read , write , connect , and accept .

      public static final int OP_READ
public static final int OP_WRITE
public static final int OP_CONNECT
public static final int OP_ACCEPT 
    

?

Not all operations are supported on all selectable channels. A SocketChannel
cannot do an accept, for example.

?

Channels are not immediately deregistered when the associated key is cancelled. They remain
registered until the next selection operation occurs .

?

Using Selection Keys

A key represents the registration of a particular channel object with a particular selector object.

?

When it's time to terminate that relationship, call the cancel()method on the SelectionKeyobject.

A key can be checked to see if it still represents a valid registration by calling its isValid()method. When a key is cancelled, it's placed in the cancelled set of the associated selector.

The registration is not immediately terminated, but the key is immediately invalidated,any cancelled keys will be cleared from the cancelled key set, and the corresponding deregistrations will be completed.

?

A channel can be registered with many selectors

?

When a channel is closed:

all keys associated with it are automatically cancelled

When a selector is closed:

all channels registered with that selector are deregistered, and the associated keys are
invalidated (cancelled).

When a key is cancelled:

calling any of its methods related to selection will throw a CancelledKeyException.

?

      if ((key.readyOps() & SelectionKey.OP_READ) != 0) 
{ 
myBuffer.clear(); 
key.channel().read (myBuffer); 
doSomethingWithBuffer (myBuffer.flip()); 
} 

if (key.isWritable()) 
//is equivalent to: 
if ((key.readyOps() & SelectionKey.OP_WRITE) != 0)
    

?

?

attach (Object ob)

This is a convenience that allows you to associate an arbitrary object with a key. This object can be a reference to anything meaningful to you, such as a business object , session handle , another channel, etc . This allows you to iterate through the keys associated with a selector, using the attached object handle on each as a reference to retrieve the associated context.

?

The attach()method stores the provided object reference in the key object.

?

If the selection key is long-lived, but the object you attach should not be, remember to clear the attachment when you're done. Otherwise, your attached object will not be garbage collected, and you may have a memory leak .

?

      SelectionKey key = channel.register (selector, SelectionKey.OP_READ, 
myObject); 

//is equivalent to this
SelectionKey key = channel.register (selector, SelectionKey.OP_READ); 
key.attach (myObject); 
    

?

?

The Selection Process

Each Selectorobject maintains three sets of keys:?

Registered key set

The set of currently registered keys associated with the selector. Not every
registered key is necessarily still valid. This set is returned by the keys()method
and may be empty. The registered key set is not directly modifiable;

?

Selected key set

A subset of the registered key set. Each member of this set is a key whose
associated channel was determined by the selector (during a prior selection
operation) to be ready for at least one of the operations in the key's interest set.
This set is returned by the selectedKeys()method (and may be empty).?

Keys can be directly removed from this set, but not added.

?

Cancelled key set

A subset of the registered key set, this set contains keys whose cancel()methods
have been called (the key has been invalidated), but they have not been
deregistered. This set is private to the selector object and cannot be accessed
directly.

?

Managing Selection Keys

The way to clear the ready set of a SelectionKeyis to remove the key itself from the set
of selected keys.

The ready set of a selection key is modified only by the Selectorobject

?

The conventional approach is to perform a select() call on the selector (which updates the selected key set) then iterate over the set of keys returned by selectedKeys().

As each key is examined in turn, the associated channel is dealt with according to the key's ready set.

The key is then removed from the selected key set (by calling remove() on the Iterator object), and the next key is examined.
When complete, the cycle repeats by calling select()again.

?

This example is bad:

reading the data synchronously in the main thread.

      package com.java.nio;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Logger;

public class SelectSockets {
	
	private final Logger logger = Logger.getLogger(getClass().getName());  

	private static int PORT_NUMBER = 1234;
	
	public static void main(String[] argv) throws Exception {
		new SelectSockets().go(argv);
	}

	private void go(String[] argv) throws Exception  {
		int port = PORT_NUMBER;
		if(argv.length>0) {
			port = Integer.parseInt(argv[0]);
		}
		logger.info("Listen on port " + port);
		
		//Allocate an unbound server socket channel
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		
		//Get the associated SeverSocket to bind it with
		ServerSocket serverSocket = serverChannel.socket();
		
		//Create a new Selector for use below
		Selector selector = Selector.open();
		
		//Set the port the server channel will listen to 
		serverSocket.bind(new InetSocketAddress(port));
		
		//Set nonblocking mode for the listening socket
		serverChannel.configureBlocking(false);
		
		//Register the ServerSocketChannle with the Selector
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		
		while(true) {
			//This may block for a long time. Upon returning, the selected set contains keys of the ready channels.
			int n = selector.select();
			
			if(n==0)
				continue;
			
			//Get an iterator over the set of selected key
			Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
			
			//Look for each key in the selected set
			while(iter.hasNext()) {
				SelectionKey key = (SelectionKey)iter.next();
				
				//Is a new connection come in ?
				if(key.isAcceptable()) {
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					registerChannel(selector, channel, SelectionKey.OP_READ);
					
					sayHello(channel);
				} else if(key.isReadable()) {
					readDataFromSocket(key);
				}
				
				//Remove key from selected set; because it's been handled!
				iter.remove();
			}
		}
		
	}
	
	/**
	 * Use the same byte buffer for all channels. 
	 * A single thread is servicing all the channels, so no danger of concurrent acccess.
	 */
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	
	/**
	 * Sample data handler method for a channel with data ready to read.
	 */
	protected void readDataFromSocket(SelectionKey key) throws Exception {
		SocketChannel socketChannel = (SocketChannel)key.channel();
		int count;
		
		buffer.clear();//Empty buffer
		//Loop while data is available; channel is nonblocking
		while((count=socketChannel.read(buffer))>0) {
			buffer.flip();//Make buffer readable
			//Send the data;don't assume it goes all at once
			while(buffer.hasRemaining()) {
				socketChannel.write(buffer);//change it!
			}
			
			buffer.clear();//Empty buffer
		}
              	if(count<0) 
		     socketChannel.close();// Close channel on EOF, invalidates the key
	}

	/**
	 * A greeting to the incoming client connection. 
	 * @throws Exception 
	 */
	private void sayHello(SocketChannel channel) throws Exception {
		buffer.clear();
		buffer.put("Hi there!\r\n".getBytes());
		
		buffer.flip();
		channel.write(buffer);
	}

	/**
	 * Register the given channel with the given selector for the given operations of interest
	 */
	protected void registerChannel(Selector selector, SocketChannel channel,
			int ops) throws Exception {
		if(channel == null)
			return; //could happen
		
		//Set the new channel nonblocking
		channel.configureBlocking(false);
		
		//Register it with the selector
		channel.register(selector, ops);
	}
	
}

    

?

This example is good:

Uses a thread pool to service channels with data to read.

Passes the SelectionKey object to a worker thread for servicing.

SelectionKeySet被多線程操作是不安全的,但是可以把key分配給不同的線程去執行。

?

A better approach is to use one selector for all selectable channels and delegate the
servicing of ready channels to other threads.

You have a single point to monitor channel readiness and a decoupled pool of worker threads to handle the incoming data. The thread pool size can be tuned (or tune itself, dynamically) according to deployment conditions.
Management of selectable channels remains simple, and simple is good.?

使用一個selector對所有通道進行監測,委派“就緒”狀態的通道給其它線程去執行。

1個線程負責監聽channel是否處于某種就緒狀態

1個線程池負責與就緒狀態的通道進行交互(接受請求,讀取數據,寫出數據)

?

示例中的線程池部分需要使用Concurrent包中的線程池進行替換

?

服務端:

      package com.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;

public class SelectSocketsThreadPool {
	
	private final Logger logger = Logger.getLogger(getClass().getName());
	
	private static final int MAX_THREADS = 5;
	
	private static int PORT_NUMBER = 1234;
	
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	
	private ThreadPool pool = new ThreadPool(MAX_THREADS);
	
	public static void main(String[] args) throws Exception {
		new SelectSocketsThreadPool().go(args);
	}
	

	private void go(String[] argv) throws Exception  {
		int port = PORT_NUMBER;
		if(argv.length>0) {
			port = Integer.parseInt(argv[0]);
		}
		logger.info("------>>>Listen on port " + port);
		
		//Allocate an unbound server socket channel
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		
		//Get the associated SeverSocket to bind it with
		ServerSocket serverSocket = serverChannel.socket();
		
		//Create a new Selector for use below
		Selector selector = Selector.open();
		
		//Set the port the server channel will listen to 
		serverSocket.bind(new InetSocketAddress(port));
		
		//Set nonblocking mode for the listening socket
		serverChannel.configureBlocking(false);
		
		//Register the ServerSocketChannle with the Selector
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		
		while(true) {
			//This may block for a long time. Upon returning, the selected set contains keys of the ready channels.
			int n = selector.select();
			
			if(n==0)
				continue;
			
			//Get an iterator over the set of selected key
			Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
			
			//Look for each key in the selected set
			while(iter.hasNext()) {
				SelectionKey key = (SelectionKey)iter.next();
				
				//Is a new connection come in ?
				if(key.isAcceptable()) {
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					//注冊當前被接受的channel的"可讀事件"到selector中
					registerChannel(selector, channel, SelectionKey.OP_READ);
					
					sayHello(channel);//客戶端連接成功后便發送1條消息給客戶端
				} else if(key.isReadable()) {
					//當channel發生"可讀事件",開始從當前channel中讀取數據(具體什么時候數據到達利用了底層操作系統的功能)
					readDataFromSocket(key);
				}
				
				//Remove key from selected set; because it's been handled!
				iter.remove();
			}
		}
		
	}
	
	/**
	 * 將channel注冊到selector對象中,由selector負責監聽此通道上"請求連接"事件
	 */
	private void registerChannel(Selector selector, SocketChannel channel,
			int ops) throws Exception {
		if(channel == null)
			return; //could happen
		
		//Set the new channel nonblocking
		channel.configureBlocking(false);
		
		//Register it with the selector
		channel.register(selector, ops);
	}


	/**
	 * 通過線程池中的線程進行數據的讀取
	 */
	private void readDataFromSocket(SelectionKey key) throws Exception {
		Worker worker = pool.getWorker();
		if(worker==null)
			return;
		//Invoking this wakes up the worker thread, then returns
		worker.serviceChannel(key);
	}

	private void sayHello(SocketChannel channel) throws Exception {
		buffer.clear();
		buffer.put("Hi ~ welcome you!\r\n".getBytes());
		
		buffer.flip();
		channel.write(buffer);
	}
	
	
	/**
	 * 內部維護1個線程池
	 */
	private class ThreadPool {
		//Threads are cycled through a FIFO idle queue. 
		List<Worker> idle = new LinkedList<>();
		
		ThreadPool(int poolSize) {
			for(int i=0; i<poolSize; i++) {
				Worker thread = new Worker(this);
				thread.setName("【Woker " + (i+1) + "】");
				thread.start();
				idle.add(thread);
			}
		}
		
		/**
		 * Find an idle worker thread, if any. Could return null.
		 */
		Worker getWorker() {
			Worker worker = null;
			synchronized (idle) {
				if(!idle.isEmpty())
					worker = idle.remove(0);
			}
			return worker;
		}
		
		/**
		 * return itself to the idle pool. 
		 */
		void returnWoker(Worker worker) {
			synchronized (idle) {
				idle.add(worker);
			}
		}
	}
	
	/**
	 * 線程類
	 */
	private class Worker extends Thread {
		private ByteBuffer buffer = ByteBuffer.allocate(1024);
		private ThreadPool pool;
		private SelectionKey key;
		
		Worker(ThreadPool pool) {
			this.pool = pool;
		}
		
		synchronized void serviceChannel(SelectionKey key) {
			this.key = key;
			//This will cause the selector to ignore read-readiness for this channel while the worker thread is servicing it.
			key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
			this.notify();
		}
		
		@Override
		public synchronized void run() {
			
			logger.info(this.getName() + " is ready");
			
			//Loop forever waiting for work to do
			while(true) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
					System.out.println(this.isInterrupted());
					//clear interrupt status
					Worker.interrupted();
				}
				
				if(key==null)
					return; // just in case
				
				logger.info(this.getName()+" has been awakened");
				
				try {
					drainChannel(key);
				} catch (Exception e) {
					e.printStackTrace();
					try {
						key.channel().close();//遇到異常關閉channel
					} catch (IOException e1) {
						e1.printStackTrace();
					}
					key.selector().wakeup();
				}
				
				key = null;
				
				// Done. Ready for more. Return to pool
				this.pool.returnWoker(this);
			}
		}
		
		
		/**
		 * 從channel中讀取數據
		 */
		void drainChannel(SelectionKey key) throws Exception {
			SocketChannel channel = (SocketChannel) key.channel();
			int count;
			buffer.clear();
			// Loop while data is available; channel is nonblocking 
			while((count=channel.read(buffer))>0) {
				buffer.flip();
				while(buffer.hasRemaining()) 
					channel.write(buffer);
				buffer.clear();
			}
			if(count<0) {
				logger.info(channel.toString() + " closed");
				channel.close();// Close channel on EOF; invalidates the key 
				return;
			}
			logger.info(key.toString() + " register OP_READ again!");
			key.interestOps(key.interestOps() | SelectionKey.OP_READ);
			key.selector().wakeup();
		}
	}
	
	
}

    

?

?

客戶端:

      package com.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.Random;

public class SocketChannelClient {

	private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
	private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
	
	private final int SERVER_PORT = 1234;
	
	private Selector selector = null;
	private SocketChannel socket = null;
	private SelectionKey clientKey = null;
	
	public static void main(String[] args) {
		new SocketChannelClient().new ClientThread().start();
	}
	
	public SocketChannelClient() {
		init();
	}

	private void init() {
		try {
			//create a selector
			selector = Selector.open();
			
			//create socket and register
			socket = SocketChannel.open();
			socket.configureBlocking(false);
			
			clientKey = socket.register(selector, SelectionKey.OP_CONNECT);
			
			InetSocketAddress remote = new InetSocketAddress("localhost", SERVER_PORT);
			//connect to remote server
			socket.connect(remote);

		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	private class ClientThread extends Thread {
		
		int times = 0;
		
		@Override
		public void run() {
			try {
				//listening for event
				for(;;) {
					selector.select();
					
					Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
					while(iter.hasNext()) {
						SelectionKey key = iter.next();
						iter.remove();
						
						if(key.isConnectable()) {
							//connection event
							SocketChannel channel = (SocketChannel)key.channel();
							if(channel.isConnectionPending())
								channel.finishConnect();
							channel.register(selector, SelectionKey.OP_READ);
							send("Hello Server!");
						} else if(key.isReadable()) {
							//read event
							SocketChannel channel = (SocketChannel)key.channel();
							
							//read data
							ByteBuffer buffer = ByteBuffer.allocate(100);
							channel.read(buffer);
							buffer.flip();
							String msg = decoder.decode(buffer).toString();
							System.out.println("Receive :" + msg);
							Thread.sleep(3000);
							if(++times==10)
								throw new RuntimeException("達到最大通信次數,程序終止");
							send("abcdefghijklmnopqrst".substring(new Random().nextInt(10)));
						}
					}
				}
			} catch(Exception e) {
				e.printStackTrace(System.err);
			} finally {
				close();
			}
		}
		
		//send message to server
		public void send(String msg) {
			try {
				SocketChannel client = (SocketChannel)clientKey.channel();
				client.write(encoder.encode(CharBuffer.wrap(msg)));
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		//shut down
		public void close() {
			try {
				selector.close();
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}

    

?

?

【NIO】Chapter 4. Selectors


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 国产一区二区三区高清视频 | 四虎影院黄色 | 激情浪荡yin乱之合集 | 久久久久视频精品网 | 视色视频 | 亚洲天堂久久精品成人 | 国产成人亚洲综合91精品555 | 国内偷自视频区视频综合 | 996热精品视频在线观看 | 高清一区二区 | 久久影院一区二区三区 | 欧美另类第一页 | 欧美深夜影院 | 欧美精品免费在线观看 | 草久久| 久久综合九色 | 香蕉久久综合 | 日本不卡一区二区 | 九草视频在线 | 精品久久香蕉国产线看观看亚洲 | 婷婷亚洲五月色综合 | 欧美成人aaa大片 | 亚洲韩国日本一级二级r级 亚洲韩精品欧美一区二区三区 | 日本欧美一区二区三区在线 | 亚洲精品国产suv一区88 | 奇米青青草 | 国产精品视频一区国模私拍 | 国产成人性毛片aaww | 欧美xxx视频 | 国产一二三区在线观看 | 天天操天天看 | 午夜时刻免费实验区观看 | 亚洲精品98久久久久久中文字幕 | 国产精品亚洲国产 | 非常色的视频 | 香蕉色香蕉在线视频 | 特级黄色毛片 | 亚洲 欧美 另类 天天更新影院 | 奇米影视7777久久精品人人爽 | 亚洲成人观看 | 天天操天天插天天射 |