📄 selectorthread.java
字号:
final SelectorHandler handlerInfo,
final CallbackErrorHandler errorHandler) {
invokeLater(new Runnable() {
public void run() {
try {
registerChannelNow(channel, selectionKeys, handlerInfo);
} catch (IOException e) {
errorHandler.handleError(e);
}
}
});
}
/**
* Registers a SelectableChannel with this selector. This channel will
* start to be monitored by the selector for the set of events associated
* with it. When an event is raised, the corresponding handler is
* called.
*
* This method can be called multiple times with the same channel
* and selector. Subsequent calls update the associated interest set
* and selector handler to the ones given as arguments.
*
* This method should only be called on the selector thread. Otherwise
* an exception is thrown. Use the registerChannelLater() when calling
* from another thread.
*
* @param channel The channel to be monitored.
* @param selectionKeys The interest set. Should be a combination of
* SelectionKey constants.
* @param handler The handler for events raised on the registered channel.
*/
public void registerChannelNow(SelectableChannel channel,
int selectionKeys,
SelectorHandler handlerInfo) throws IOException {
if (Thread.currentThread() != selectorThread) {
throw new IOException("Method can only be called from selector thread");
}
if (!channel.isOpen()) {
throw new IOException("Channel is not open.");
}
try {
if (channel.isRegistered()) {
SelectionKey sk = channel.keyFor(selector);
assert sk != null : "Channel is already registered with other selector";
sk.interestOps(selectionKeys);
Object previousAttach = sk.attach(handlerInfo);
assert previousAttach != null;
} else {
channel.configureBlocking(false);
channel.register(selector, selectionKeys, handlerInfo);
}
} catch (Exception e) {
IOException ioe = new IOException("Error registering channel.");
ioe.initCause(e);
throw ioe;
}
}
/**
* Executes the given task in the selector thread. This method returns
* as soon as the task is scheduled, without waiting for it to be
* executed.
*
* @param run The task to be executed.
*/
public void invokeLater(Runnable run) {
synchronized (pendingInvocations) {
pendingInvocations.add(run);
}
selector.wakeup();
}
/**
* Executes the given task synchronously in the selector thread. This
* method schedules the task, waits for its execution and only then
* returns.
*
* @param run The task to be executed on the selector's thread.
*/
public void invokeAndWait(final Runnable task)
throws InterruptedException
{
if (Thread.currentThread() == selectorThread) {
// We are in the selector's thread. No need to schedule
// execution
task.run();
} else {
// Used to deliver the notification that the task is executed
final Object latch = new Object();
synchronized (latch) {
// Uses the invokeLater method with a newly created task
this.invokeLater(new Runnable() {
public void run() {
task.run();
// Notifies
latch.notify();
}
});
// Wait for the task to complete.
latch.wait();
}
// Ok, we are done, the task was executed. Proceed.
}
}
/**
* Executes all tasks queued for execution on the selector's thread.
*
* Should be called holding the lock to <code>pendingInvocations</code>.
*
*/
private void doInvocations() {
synchronized (pendingInvocations) {
for (int i = 0; i < pendingInvocations.size(); i++) {
Runnable task = (Runnable) pendingInvocations.get(i);
task.run();
}
pendingInvocations.clear();
}
}
/**
* Main cycle. This is where event processing and
* dispatching happens.
*/
public void run() {
// For use with Log4j. Setups the logging context so that logging
// messages can be printed together with the identificate of the
// thread that generated them.
// String threadName = Thread.currentThread().getName();
// String nameParts[] = threadName.split("-");
// NDC.push("S" + nameParts[nameParts.length-1]);
// Here's where everything happens. The select method will
// return when any operations registered above have occurred, the
// thread has been interrupted, etc.
while (true) {
// Execute all the pending tasks.
doInvocations();
// Time to terminate?
if (closeRequested) {
return;
}
// [SSL] Give the secure sockets a chance to handle their events
sscManager.fireEvents();
int selectedKeys = 0;
try {
selectedKeys = selector.select();
} catch (IOException ioe) {
// Select should never throw an exception under normal
// operation. If this happens, print the error and try to
// continue working.
ioe.printStackTrace();
continue;
}
if (selectedKeys == 0) {
// Go back to the beginning of the loop
continue;
}
// Someone is ready for IO, get the ready keys
Iterator it = selector.selectedKeys().iterator();
// Walk through the collection of ready keys and dispatch
// any active event.
while (it.hasNext()) {
SelectionKey sk = (SelectionKey)it.next();
it.remove();
try {
// Obtain the interest of the key
int readyOps = sk.readyOps();
// Disable the interest for the operation that is ready.
// This prevents the same event from being raised multiple
// times.
sk.interestOps(sk.interestOps() & ~readyOps);
SelectorHandler handler =
(SelectorHandler) sk.attachment();
// Some of the operations set in the selection key
// might no longer be valid when the handler is executed.
// So handlers should take precautions against this
// possibility.
// Check what are the interests that are active and
// dispatch the event to the appropriate method.
if (sk.isAcceptable()) {
// A connection is ready to be completed
((AcceptSelectorHandler)handler).handleAccept();
} else if (sk.isConnectable()) {
// A connection is ready to be accepted
((ConnectorSelectorHandler)handler).handleConnect();
} else {
ReadWriteSelectorHandler rwHandler =
(ReadWriteSelectorHandler)handler;
// Readable or writable
if (sk.isReadable()) {
// It is possible to read
rwHandler.handleRead();
}
// Check if the key is still valid, since it might
// have been invalidated in the read handler
// (for instance, the socket might have been closed)
if (sk.isValid() && sk.isWritable()) {
// It is read to write
rwHandler.handleWrite();
}
}
} catch (Throwable t) {
// No exceptions should be thrown in the previous block!
// So kill everything if one is detected.
// Makes debugging easier.
closeSelectorAndChannels();
System.out.println("Selector caught an exception:");
t.printStackTrace();
return;
}
}
}
}
/**
* Closes all channels registered with the selector. Used to
* clean up when the selector dies and cannot be recovered.
*/
private void closeSelectorAndChannels() {
Set keys = selector.keys();
for (Iterator iter = keys.iterator(); iter.hasNext();) {
SelectionKey key = (SelectionKey)iter.next();
try {
key.channel().close();
} catch (IOException e) {
// Ignore
}
}
try {
selector.close();
} catch (IOException e) {
// Ignore
}
}
/**
* @return Returns the sscManager.
*/
public SSLChannelManager getSscManager() {
return sscManager;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -