⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 selectorimpl.java

📁 java1.6众多例子参考
💻 JAVA
字号:
/* * @(#)SelectorImpl.java	1.18 05/11/17 *  * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.transport;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import com.sun.corba.se.pept.broker.Broker;import com.sun.corba.se.pept.transport.Acceptor;import com.sun.corba.se.pept.transport.Connection;import com.sun.corba.se.pept.transport.EventHandler;import com.sun.corba.se.pept.transport.ListenerThread;import com.sun.corba.se.pept.transport.ReaderThread;import com.sun.corba.se.spi.logging.CORBALogDomains;import com.sun.corba.se.spi.orb.ORB;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;import com.sun.corba.se.impl.logging.ORBUtilSystemException;import com.sun.corba.se.impl.orbutil.ORBUtility;/** * @author Harold Carr */public class SelectorImpl    extends	Thread    implements	com.sun.corba.se.pept.transport.Selector{    private ORB orb;    private Selector selector;    private long timeout;    private List deferredRegistrations;    private List interestOpsList;    private HashMap listenerThreads;    private HashMap readerThreads;    private boolean selectorStarted;    private boolean closed;    private ORBUtilSystemException wrapper ;    public SelectorImpl(ORB orb)    {	this.orb = orb;	selector = null;	selectorStarted = false;	timeout = 60000;	deferredRegistrations = new ArrayList();	interestOpsList = new ArrayList();	listenerThreads = new HashMap();	readerThreads = new HashMap();	closed = false;        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);    }    public void setTimeout(long timeout)     {	this.timeout = timeout;    }    public long getTimeout()    {	return timeout;    }    public void registerInterestOps(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".registerInterestOps:-> " + eventHandler);	}	SelectionKey selectionKey = eventHandler.getSelectionKey();	if (selectionKey.isValid()) {            int ehOps = eventHandler.getInterestOps();            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);	    synchronized(interestOpsList) {		interestOpsList.add(keyAndOp);	    }            // tell Selector Thread there's an update to a SelectorKey's Ops            selector.wakeup();	}	else {            wrapper.selectionKeyInvalid(eventHandler.toString());	    if (orb.transportDebugFlag) {		dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);	    }	}	if (orb.transportDebugFlag) {	    dprint(".registerInterestOps:<- ");	}    }    public void registerForEvent(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".registerForEvent: " + eventHandler);	}	if (isClosed()) {	    if (orb.transportDebugFlag) {		dprint(".registerForEvent: closed: " + eventHandler);	    }	    return;	}	if (eventHandler.shouldUseSelectThreadToWait()) {	    synchronized (deferredRegistrations) {		deferredRegistrations.add(eventHandler);	    }	    if (! selectorStarted) {		startSelector();	    }	    selector.wakeup();	    return;	}	switch (eventHandler.getInterestOps()) {	case SelectionKey.OP_ACCEPT :	    createListenerThread(eventHandler);	    break;	case SelectionKey.OP_READ :	    createReaderThread(eventHandler);	    break;	default:	    if (orb.transportDebugFlag) {		dprint(".registerForEvent: default: " + eventHandler);	    }	    throw new RuntimeException(                "SelectorImpl.registerForEvent: unknown interest ops");	}    }    public void unregisterForEvent(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".unregisterForEvent: " + eventHandler);	}	if (isClosed()) {	    if (orb.transportDebugFlag) {		dprint(".unregisterForEvent: closed: " + eventHandler);	    }	    return;	}	if (eventHandler.shouldUseSelectThreadToWait()) {	    SelectionKey selectionKey = eventHandler.getSelectionKey();	    selectionKey.cancel();	    selector.wakeup();	    return;	}	switch (eventHandler.getInterestOps()) {	case SelectionKey.OP_ACCEPT :	    destroyListenerThread(eventHandler);	    break;	case SelectionKey.OP_READ :	    destroyReaderThread(eventHandler);	    break;	default:	    if (orb.transportDebugFlag) {		dprint(".unregisterForEvent: default: " + eventHandler);	    }	    throw new RuntimeException(                "SelectorImpl.uregisterForEvent: unknown interest ops");	}    }    public void close()    {	if (orb.transportDebugFlag) {	    dprint(".close");	}	if (isClosed()) {	    if (orb.transportDebugFlag) {		dprint(".close: already closed");	    }	    return;	}	setClosed(true);	Iterator i;	// Kill listeners.	i = listenerThreads.values().iterator();	while (i.hasNext()) {	    ListenerThread listenerThread = (ListenerThread) i.next();	    listenerThread.close();	}	// Kill readers.	i = readerThreads.values().iterator();	while (i.hasNext()) {	    ReaderThread readerThread = (ReaderThread) i.next();	    readerThread.close();	}	// Selector	try {	    if (selector != null) {		// wakeup Selector thread to process close request		selector.wakeup();	    }	} catch (Throwable t) {	    if (orb.transportDebugFlag) {		dprint(".close: selector.close: " + t);	    }	}    }    ///////////////////////////////////////////////////    //    // Thread methods.    //    public void run()    {	setName("SelectorThread");	while (!closed) {	    try {		int n = 0;		if (timeout == 0 && orb.transportDebugFlag) {		    dprint(".run: Beginning of selection cycle");		}		handleDeferredRegistrations();		enableInterestOps();		try {		    n = selector.select(timeout);		} catch (IOException  e) {		    if (orb.transportDebugFlag) {			dprint(".run: selector.select: " + e);		    }		}		if (closed) {		    selector.close();		    if (orb.transportDebugFlag) {			dprint(".run: closed - .run return");		    }		    return;		}		/*		  if (timeout == 0 && orb.transportDebugFlag) {		  dprint(".run: selector.select() returned: " + n);		  }		  if (n == 0) {		  continue;		  }		*/		Iterator iterator = selector.selectedKeys().iterator();		if (orb.transportDebugFlag) {		    if (iterator.hasNext()) {			dprint(".run: n = " + n);		    }		}		while (iterator.hasNext()) {		    SelectionKey selectionKey = (SelectionKey) iterator.next();		    iterator.remove();		    EventHandler eventHandler = (EventHandler)			selectionKey.attachment();		    try {			eventHandler.handleEvent();		    } catch (Throwable t) {			if (orb.transportDebugFlag) {			    dprint(".run: eventHandler.handleEvent", t);			}		    }		}		if (timeout == 0 && orb.transportDebugFlag) {		    dprint(".run: End of selection cycle");		}	    } catch (Throwable t) {		// IMPORTANT: ignore all errors so the select thread keeps running.		// Otherwise a guaranteed hang.		if (orb.transportDebugFlag) {		    dprint(".run: ignoring", t);		}	    }	}    }    /////////////////////////////////////////////////////    //    // Implementation.    //    private synchronized boolean isClosed ()    {	return closed;    }    private synchronized void setClosed(boolean closed)    {	this.closed = closed;    }    private void startSelector()    {	try {	    selector = Selector.open();	} catch (IOException e) {	    if (orb.transportDebugFlag) {		dprint(".startSelector: Selector.open: IOException: " + e);	    }	    // REVISIT - better handling/reporting	    RuntimeException rte =		new RuntimeException(".startSelector: Selector.open exception");	    rte.initCause(e);	    throw rte;	}	setDaemon(true);	start();	selectorStarted = true;	if (orb.transportDebugFlag) {	    dprint(".startSelector: selector.start completed.");	}    }    private void handleDeferredRegistrations()    {	synchronized (deferredRegistrations) {            int deferredListSize = deferredRegistrations.size();            for (int i = 0; i < deferredListSize; i++) {                EventHandler eventHandler = 		    (EventHandler)deferredRegistrations.get(i);                if (orb.transportDebugFlag) {                    dprint(".handleDeferredRegistrations: " + eventHandler);                }                SelectableChannel channel = eventHandler.getChannel();                SelectionKey selectionKey = null;                try {                    selectionKey =                        channel.register(selector,                                         eventHandler.getInterestOps(),                                         (Object)eventHandler);                } catch (ClosedChannelException e) {                    if (orb.transportDebugFlag) {                        dprint(".handleDeferredRegistrations: " + e);                    }                }                eventHandler.setSelectionKey(selectionKey);            }            deferredRegistrations.clear();        }    }    private void enableInterestOps()    {	synchronized (interestOpsList) {	    int listSize = interestOpsList.size();	    if (listSize > 0) {                if (orb.transportDebugFlag) {                    dprint(".enableInterestOps:->");                }                SelectionKey selectionKey = null;		SelectionKeyAndOp keyAndOp = null;		int keyOp, selectionKeyOps = 0;		for (int i = 0; i < listSize; i++) {		    keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);		    selectionKey = keyAndOp.selectionKey;		    // Need to check if the SelectionKey is valid because a		    // connection's SelectionKey could be put on the list to		    // have its OP enabled and before it's enabled be reclaimed.		    // Otherwise, the enabling of the OP will throw an exception		    // here and exit this method an potentially not enable all		    // registered ops.		    //		    // So, we ignore SelectionKeys that are invalid. They will get		    // cleaned up on the next Selector.select() call.		    if (selectionKey.isValid()) {                        if (orb.transportDebugFlag) {                            dprint(".enableInterestOps: " + keyAndOp);                        }		        keyOp = keyAndOp.keyOp;		        selectionKeyOps = selectionKey.interestOps();		        selectionKey.interestOps(selectionKeyOps | keyOp);		    }		}		interestOpsList.clear();                if (orb.transportDebugFlag) {                    dprint(".enableInterestOps:<-");                }	    }	}    }    private void createListenerThread(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".createListenerThread: " + eventHandler);	}	Acceptor acceptor = eventHandler.getAcceptor();	ListenerThread listenerThread =	    new ListenerThreadImpl(orb, acceptor, this);	listenerThreads.put(eventHandler, listenerThread);	Throwable throwable = null;	try {	    orb.getThreadPoolManager().getThreadPool(0)		.getWorkQueue(0).addWork((Work)listenerThread);	} catch (NoSuchThreadPoolException e) {	    throwable = e;	} catch (NoSuchWorkQueueException e) {	    throwable = e;	}	if (throwable != null) {	    RuntimeException rte = new RuntimeException(throwable.toString());	    rte.initCause(throwable);	    throw rte;	}    }    private void destroyListenerThread(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".destroyListenerThread: " + eventHandler);	}	ListenerThread listenerThread = (ListenerThread)	    listenerThreads.get(eventHandler);	if (listenerThread == null) {	    if (orb.transportDebugFlag) {		dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");	    }	    return;	}	listenerThreads.remove(eventHandler);	listenerThread.close();    }    private void createReaderThread(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".createReaderThread: " + eventHandler);	}	Connection connection = eventHandler.getConnection();	ReaderThread readerThread = 	    new ReaderThreadImpl(orb, connection, this);	readerThreads.put(eventHandler, readerThread);	Throwable throwable = null;	try {	    orb.getThreadPoolManager().getThreadPool(0)		.getWorkQueue(0).addWork((Work)readerThread);	} catch (NoSuchThreadPoolException e) {	    throwable = e;	} catch (NoSuchWorkQueueException e) {	    throwable = e;	}	if (throwable != null) {	    RuntimeException rte = new RuntimeException(throwable.toString());	    rte.initCause(throwable);	    throw rte;	}    }    private void destroyReaderThread(EventHandler eventHandler)    {	if (orb.transportDebugFlag) {	    dprint(".destroyReaderThread: " + eventHandler);	}	ReaderThread readerThread = (ReaderThread)	    readerThreads.get(eventHandler);	if (readerThread == null) {	    if (orb.transportDebugFlag) {		dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");	    }	    return;	}	readerThreads.remove(eventHandler);	readerThread.close();    }    private void dprint(String msg)    {	ORBUtility.dprint("SelectorImpl", msg);    }    protected void dprint(String msg, Throwable t)    {	dprint(msg);	t.printStackTrace(System.out);    }    // Private class to contain a SelectionKey and a SelectionKey op.    // Used only by SelectorImpl to register and enable SelectionKey    // Op.    // REVISIT - Could do away with this class and use the EventHanlder    //           directly.    private class SelectionKeyAndOp    {        // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]        public int keyOp;        public SelectionKey selectionKey;        // constructor        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {	    this.selectionKey = selectionKey;	    this.keyOp = keyOp;	}    }// End of file.}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -