⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingsocketserverservice.java.svn-base

📁 梦界家园程序开发基底框架
💻 SVN-BASE
字号:
package jm.net.sv;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 高级服务器程序
 * 
 * @author spook
 * @version 1.3.1
 * @see JDK 1.5.4
 */
public class NonBlockingSocketServerService {

	private Selector selector;

	private ServerSocketChannel server;

	private int port = 8000;

	private long connectionsNum = 0l;

	private long errorsNum = 0l;

	private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1000, 1000,
			60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

	public NonBlockingSocketServerService() {
		threadPool.prestartAllCoreThreads();
		System.out.println("Inside default ctor");
	}

	public NonBlockingSocketServerService(int port) {
		threadPool.prestartAllCoreThreads();
		System.out.println("Inside the other ctor");
		this.port = port;
	}

	public void initializeOperations() throws IOException {
		System.out.println("Inside initialization");
		selector = Selector.open();
		server = ServerSocketChannel.open();
		server.configureBlocking(false);
		InetSocketAddress isa = new InetSocketAddress(port);
		server.socket().bind(isa);
		System.out.println("Accepting connections on port "
				+ server.socket().getLocalPort());
	}

	public void startServer() {

		System.out.println("Inside startserver");
		try {
			initializeOperations();
		} catch (Exception e) {
			e.printStackTrace();
			return;
		}

		System.out.println("Abt to block on select()");
		try {
			@SuppressWarnings("unused")
			SelectionKey acceptKey = server.register(selector,
					SelectionKey.OP_ACCEPT);
		} catch (ClosedChannelException e) {
			e.printStackTrace();
			return;
		}

		while (true) {
			try {
				// this may block for a long time, upon return the
				// selected set contains keys of the ready channels
				int n = selector.select();

				if (n == 0) {
					continue; // nothing to do
				}

				// get an iterator over the set of selected keys
				Iterator it = selector.selectedKeys().iterator();

				// look at each key in the selected set
				while (it.hasNext()) {
					SelectionKey key = (SelectionKey) it.next();

					// remove key from selected set, it's been handled
					it.remove();

					// Is a new connection coming in?
					if (key.isAcceptable()) {
						ServerSocketChannel ssc = (ServerSocketChannel) key
								.channel();

						SocketChannel socketChannel = ssc.accept();

						registerChannel(selector, socketChannel,
								SelectionKey.OP_READ);

						connectionsNum++;
						System.out.println("Connection established with "
								+ socketChannel);
						System.out.println("******connectionsNum:"
								+ connectionsNum + "******");
						System.out.println("******errorNum:" + errorsNum
								+ "******");
					}

					// is there data to read on this channel?
					if (key.isReadable()) {
						key.interestOps(key.interestOps()
								& (~SelectionKey.OP_READ));
						Task task = new Task(key);
						threadPool.execute(task);
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				selector.wakeup();
				errorsNum++;
			}
		}
	}

	/**
	 * Register the given channel with the given selector for the given
	 * operations of interest
	 */
	protected void registerChannel(Selector selector,
			SelectableChannel channel, int ops) throws Exception {
		if (channel == null) {
			return; // could happen
		}

		// set the new channel non-blocking
		channel.configureBlocking(false);

		// register it with the selector
		channel.register(selector, ops);
	}

	class Task implements Runnable {
		SelectionKey key = null;

		public Task(SelectionKey key) {
			this.key = key;
		}

		public void run() {
			SocketChannel socketChannel = (SocketChannel) key.channel();
			ByteBuffer buffer = ByteBuffer.allocate(2048);
			int count;
			buffer.clear();

			try {
				Thread.sleep(5000);
				buffer.clear();
				while ((count = (socketChannel.read(buffer))) > 0) {
					buffer.flip();
					while (buffer.hasRemaining()) {
						socketChannel.write(buffer);
					}
					buffer.clear();
				}

				if (count < 0) {
					try {
						socketChannel.close();
						return;
					} catch (Exception e) {
						e.printStackTrace();
					}
				}

				// resume interest in OP_READ
				key.interestOps(key.interestOps() | SelectionKey.OP_READ);

				// cycle the selector so this key is active again
				key.selector().wakeup();

			} catch (Exception e) {
				e.printStackTrace();
				System.out.println("Caught '" + e + "' closing channel");

				// close channel and nudge selector
				try {
					socketChannel.close();
				} catch (IOException ex) {
					ex.printStackTrace();
				}

				key.selector().wakeup();
				key = null;
				errorsNum++;
			}
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -