⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadedeventdispatcher.java

📁 开放源码的smpp协议开发包
💻 JAVA
字号:
package ie.omk.smpp.event;import ie.omk.smpp.Connection;import ie.omk.smpp.message.SMPPPacket;import ie.omk.smpp.util.APIConfig;import ie.omk.smpp.util.PropertyNotFoundException;import java.util.ArrayList;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * An event dispatcher that does not block the receiver daemon thread. * <p> * <font size="+2"> <b>This class is highly experimental </b> </font>. * </p> * This dispatcher class has a pool of dispatcher threads. When an event is * delivered to this dispatcher by the receiver thread, it is added to a queue * and the method returns immediately. One dispatcher thread is then selected * and that thread is responsible for delivering the event to all registered * observers. The receiver daemon is then free to continue adding new events to * the event queue, which will be processed by a thread in the thread pool. *  * @author Oran Kelly */public class ThreadedEventDispatcher implements EventDispatcher, Runnable {    private Log logger = LogFactory.getLog(ThreadedEventDispatcher.class);    /**     * Runner flag. If set to false, all dispatcher threads will exit on next     * iteration (some may be blocked on the queue).     */    private boolean running = true;    /**     * Size of the thread pool.     */    private int poolSize = 0;    /**     * Pool of event dispatcher threads.     */    private ThreadGroup threadPool = new ThreadGroup("DispatcherPool");    /**     * FIFO queue of packets and SMPP events.     */    private FIFOQueue queue = null;    /**     * Number of threads currently blocked on the queue.     */    private int threadsWaiting = 0;    /**     * List of observers registered for event delivery.     */    private ArrayList observers = new ArrayList();    /**     * Create a new threaded event dispatcher object.     */    public ThreadedEventDispatcher() {    }    /**     * Initialise this event dispatcher. This method will retrieve the size of     * the thread pool and FIFO queue from the API configuration and initialise     * both. See {@link ie.omk.smpp.util.APIConfig}class documentation for the     * appropriate configuration properties to use. If the properties are not     * found in the configuration, the current defaults are a thread pool size     * of <code>3</code> and a FIFO queue size of <code>100</code>.     */    public void init() {        int queueSize;        try {            APIConfig cfg = APIConfig.getInstance();            poolSize = cfg.getInt(APIConfig.EVENT_THREAD_POOL_SIZE);            queueSize = cfg.getInt(APIConfig.EVENT_THREAD_FIFO_QUEUE_SIZE);        } catch (PropertyNotFoundException x) {            poolSize = 3;            queueSize = 100;        }        // The queue must be created before the thread pool is initialised!        queue = new FIFOQueue(queueSize);        initialiseThreadPool();    }    private void initialiseThreadPool() {        Thread t;        for (int i = 0; i < poolSize; i++) {            t = new Thread(threadPool, this, "EventDispatch" + i);            t.start();        }    }    /**     * Shut down all threads in the thread pool. This method will block until     * all threads have terminated properly. Applications should be careful not     * to use one of the thread pool's own threads to call this method as this     * will cause a runtime exception. How can this method wait for all the     * pool's threads to die if one of the pool's threads is executing this     * method?     */    public void destroy() {        logger.debug("Shutting down dispatch threads.");        // This could happen if an application attempts to set a new event        // dispatcher during event processing. There are probably many other        // ways this call-back could happen but it shouldn't!        if (Thread.currentThread().getThreadGroup() == threadPool) {            logger                    .error("Cannot shut down the thread pool with one of it's own threads.");            throw new RuntimeException();        }        running = false;        synchronized (queue) {            queue.notifyAll();        }        logger.info("Waiting for threads in pool to die.");        final int waitTime = 50;        final int times = 1000 / waitTime; // allow a full second of waiting!        int time = 0;        Thread[] pool = new Thread[poolSize];        while (true) {            try {                pool[0] = null;                threadPool.enumerate(pool, false);                if (pool[0] == null)                    break;                else                    logger                            .debug("There's still some threads running. Doing another loop..");                // Break out if it looks like we're stuck in an infinite loop                if (time >= times)                    break;                // What's a good time to wait for more threads to terminate?                Thread.sleep(waitTime);                synchronized (queue) {                    queue.notifyAll();                }            } catch (InterruptedException x) {            }        }        if (pool[0] != null)            forceThreadExit();    }    private void forceThreadExit() {        logger.debug("Interrupting all remaining dispatcher threads.");        // this should wake any threads blocked on an object or sleeping..        threadPool.interrupt();        try {            // 500 milliseconds - an eternity            Thread.sleep(500);        } catch (InterruptedException x) {        }        synchronized (queue) {            queue.notifyAll();        }        if (threadPool.activeCount() > 0) {            logger                    .error("Some dispatcher threads are refusing to die. I give up.");            if (logger.isDebugEnabled()) {                Thread[] pool = new Thread[threadPool.activeCount()];                threadPool.enumerate(pool, false);                logger.debug("Still-active threads:");                for (int i = 0; i < pool.length; i++) {                    logger.debug("  " + pool[i].getName());                }            }        }    }    public void addObserver(ConnectionObserver observer) {        synchronized (observers) {            if (!observers.contains(observer))                observers.add(observer);        }    }    public void removeObserver(ConnectionObserver observer) {        synchronized (observers) {            observers.remove(observer);        }    }    public Iterator observerIterator() {        return (((ArrayList) observers.clone()).iterator());    }    public boolean contains(ConnectionObserver observer) {        synchronized (observers) {            return (observers.contains(observer));        }    }    // notifyObservers is always single-threaded access as there's only 1    // receiver thread!    public void notifyObservers(Connection conn, SMPPEvent e) {        if (logger.isDebugEnabled())            logger.debug("Notifying observers of a new SMPP event "                    + e.getType());        queue.put(conn, e);        if (threadsWaiting > 0) {            synchronized (queue) {                queue.notify();            }        }    }    public void notifyObservers(Connection conn, SMPPPacket pak) {        if (logger.isDebugEnabled()) {            logger.debug("Notifying observers of a new SMPP packet ("                    + Integer.toHexString(pak.getCommandId()) + ","                    + Integer.toString(pak.getSequenceNum()) + ")");        }        queue.put(conn, pak);        if (threadsWaiting > 0) {            synchronized (queue) {                queue.notify();            }        }    }    public void run() {        NotificationDetails nd;        ConnectionObserver observer;        try {            logger.debug("Thread " + Thread.currentThread().getName()                    + " started");            while (running) {                nd = null;                synchronized (queue) {                    if (queue.isEmpty()) {                        threadsWaiting++;                        queue.wait();                        threadsWaiting--;                    }                    nd = queue.get();                }                if (nd == null)                    continue;                // Get a shallow copy of the registered observers and iterate                // over it                Iterator i = ((ArrayList) observers.clone()).iterator();                while (i.hasNext()) {                    observer = (ConnectionObserver) i.next();                    if (nd.event == null)                        observer.packetReceived(nd.conn, nd.pak);                    else                        observer.update(nd.conn, nd.event);                } // end while            } // end while            logger.debug("Thread " + Thread.currentThread().getName()                    + " exiting");        } catch (Exception x) {            logger.warn("Exception in dispatcher thread", x);        }    }    private class NotificationDetails {        public Connection conn = null;        public SMPPEvent event = null;        public SMPPPacket pak = null;        public NotificationDetails() {        }        public void setDetails(Connection c, SMPPEvent e, SMPPPacket p) {            conn = c;            event = e;            pak = p;        }    }    /**     * A simple implementation of a FIFO queue. Need this to be as minimal as     * possible so it's zippidy quick. No synchronization is done here, it's     * handled by the relevant ThreadedEventDispatcher methods.     */    private class FIFOQueue {        private int head = 0, tail = 0;        private NotificationDetails[] queue = null;        public FIFOQueue(int queueSize) {            if (queueSize < 1)                queueSize = 100;            queue = new NotificationDetails[queueSize];            for (int i = 0; i < queueSize; i++)                queue[i] = new NotificationDetails();        }        public void put(Connection c, SMPPPacket p) throws QueueFullException {            if (isFull()) {                throw new QueueFullException();            }            queue[tail++].setDetails(c, null, p);            if (tail >= queue.length)                tail = 0;        }        public void put(Connection c, SMPPEvent e) throws QueueFullException {            if (isFull()) {                throw new QueueFullException();            }            queue[tail++].setDetails(c, e, null);            if (tail >= queue.length)                tail = 0;        }        public NotificationDetails get() {            NotificationDetails nd = null;            if (!isEmpty()) {                nd = queue[head++];                if (head >= queue.length)                    head = 0;            }            return (nd);        }        public boolean isEmpty() {            return (tail == head);        }        public boolean isFull() {            if (tail > head)                return ((tail == queue.length - 1) && head == 0);            else                return (tail == (head - 1));        }    }    private class QueueFullException extends RuntimeException {        public QueueFullException() {        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -