📄 nonblockingsocketserverservice.java.svn-base
字号:
package jm.net.sv;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 高级服务器程序
*
* @author spook
* @version 1.3.1
* @see JDK 1.5.4
*/
public class NonBlockingSocketServerService {
private Selector selector;
private ServerSocketChannel server;
private int port = 8000;
private long connectionsNum = 0l;
private long errorsNum = 0l;
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1000, 1000,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
public NonBlockingSocketServerService() {
threadPool.prestartAllCoreThreads();
System.out.println("Inside default ctor");
}
public NonBlockingSocketServerService(int port) {
threadPool.prestartAllCoreThreads();
System.out.println("Inside the other ctor");
this.port = port;
}
public void initializeOperations() throws IOException {
System.out.println("Inside initialization");
selector = Selector.open();
server = ServerSocketChannel.open();
server.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(port);
server.socket().bind(isa);
System.out.println("Accepting connections on port "
+ server.socket().getLocalPort());
}
public void startServer() {
System.out.println("Inside startserver");
try {
initializeOperations();
} catch (Exception e) {
e.printStackTrace();
return;
}
System.out.println("Abt to block on select()");
try {
@SuppressWarnings("unused")
SelectionKey acceptKey = server.register(selector,
SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
return;
}
while (true) {
try {
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
int n = selector.select();
if (n == 0) {
continue; // nothing to do
}
// get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// remove key from selected set, it's been handled
it.remove();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key
.channel();
SocketChannel socketChannel = ssc.accept();
registerChannel(selector, socketChannel,
SelectionKey.OP_READ);
connectionsNum++;
System.out.println("Connection established with "
+ socketChannel);
System.out.println("******connectionsNum:"
+ connectionsNum + "******");
System.out.println("******errorNum:" + errorsNum
+ "******");
}
// is there data to read on this channel?
if (key.isReadable()) {
key.interestOps(key.interestOps()
& (~SelectionKey.OP_READ));
Task task = new Task(key);
threadPool.execute(task);
}
}
} catch (Exception e) {
e.printStackTrace();
selector.wakeup();
errorsNum++;
}
}
}
/**
* Register the given channel with the given selector for the given
* operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops);
}
class Task implements Runnable {
SelectionKey key = null;
public Task(SelectionKey key) {
this.key = key;
}
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(2048);
int count;
buffer.clear();
try {
Thread.sleep(5000);
buffer.clear();
while ((count = (socketChannel.read(buffer))) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
buffer.clear();
}
if (count < 0) {
try {
socketChannel.close();
return;
} catch (Exception e) {
e.printStackTrace();
}
}
// resume interest in OP_READ
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
// cycle the selector so this key is active again
key.selector().wakeup();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Caught '" + e + "' closing channel");
// close channel and nudge selector
try {
socketChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
key.selector().wakeup();
key = null;
errorsNum++;
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -