⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadedeventdispatcher.java

📁 是关于smpp通信协议的程序,本人用java编写的,可以测试和其他调试
💻 JAVA
字号:
package ie.omk.smpp.event;import ie.omk.smpp.Connection;import ie.omk.smpp.message.SMPPPacket;import ie.omk.smpp.util.APIConfig;import ie.omk.smpp.util.PropertyNotFoundException;import java.util.ArrayList;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** An event dispatcher that does not block the receiver daemon thread. * <p> * <font size="+2"><b>This class is highly experimental</b></font>. * </p> * This dispatcher class has a pool of dispatcher threads. When an event is * delivered to this dispatcher by the receiver thread, it is added to a queue * and the method returns immediately. One dispatcher thread is then selected * and that thread is responsible for delivering the event to all registered * observers.  The receiver daemon is then free to continue adding new events to * the event queue, which will be processed by a thread in the thread pool. * @author Oran Kelly */public class ThreadedEventDispatcher implements EventDispatcher, Runnable {    private Log logger = LogFactory.getLog(ThreadedEventDispatcher.class);    /** Runner flag. If set to false, all dispatcher threads will exit on next     * iteration (some may be blocked on the queue).     */    private boolean running = true;    /** Size of the thread pool.     */    private int poolSize = 0;    /** Pool of event dispatcher threads.     */    private ThreadGroup threadPool = new ThreadGroup("DispatcherPool");    /** FIFO queue of packets and SMPP events.     */    private FIFOQueue queue = null;    /** Number of threads currently blocked on the queue.     */    private int threadsWaiting = 0;    /** List of observers registered for event delivery.     */    private ArrayList observers = new ArrayList();    /** Create a new threaded event dispatcher object.     */    public ThreadedEventDispatcher() {    }    /** Initialise this event dispatcher. This method will retrieve the size of     * the thread pool and FIFO queue from the API configuration and initialise     * both. See {@link ie.omk.smpp.util.APIConfig} class documentation for the     * appropriate configuration properties to use. If the properties are not     * found in the configuration, the current defaults are a thread pool size     * of <code>3</code> and a FIFO queue size of <code>100</code>.     */    public void init() {	int queueSize;	try {	    APIConfig cfg = APIConfig.getInstance();	    poolSize = cfg.getInt(APIConfig.EVENT_THREAD_POOL_SIZE);	    queueSize = cfg.getInt(APIConfig.EVENT_THREAD_FIFO_QUEUE_SIZE);	} catch (PropertyNotFoundException x) {	    poolSize = 3;	    queueSize = 100;	}	// The queue must be created before the thread pool is initialised!	queue = new FIFOQueue(queueSize);	initialiseThreadPool();    }    private void initialiseThreadPool() {	Thread t;	for (int i = 0; i < poolSize; i++) {	    t = new Thread(threadPool, this, "EventDispatch" + i);	    t.start();	}    }    /** Shut down all threads in the thread pool. This method will block until     * all threads have terminated properly. Applications should be careful not     * to use one of the thread pool's own threads to call this method as this     * will cause a runtime exception. How can this method wait for all the     * pool's threads to die if one of the pool's threads is executing this     * method?     */    public void destroy() {	logger.debug("Shutting down dispatch threads.");	// This could happen if an application attempts to set a new event	// dispatcher during event processing. There are probably many other	// ways this call-back could happen but it shouldn't!	if (Thread.currentThread().getThreadGroup() == threadPool) {	    logger.error("Cannot shut down the thread pool with one of it's own threads.");	    throw new RuntimeException();	}	running = false;	synchronized (queue) {	    queue.notifyAll();	}	logger.info("Waiting for threads in pool to die.");	final int waitTime = 50;	final int times = 1000 / waitTime; // allow a full second of waiting!	int time = 0;	Thread[] pool = new Thread[poolSize];	while (true) {	    try {		pool[0] = null;		threadPool.enumerate(pool, false);		if (pool[0] == null)		    break;		else		    logger.debug("There's still some threads running. Doing another loop..");		// Break out if it looks like we're stuck in an infinite loop		if (time >= times)		    break;		// What's a good time to wait for more threads to terminate?		Thread.sleep(waitTime);		synchronized (queue) {		    queue.notifyAll();		}	    } catch (InterruptedException x) {	    }	}	if (pool[0] != null)	    forceThreadExit();    }    private void forceThreadExit() {	logger.debug("Interrupting all remaining dispatcher threads.");	// this should wake any threads blocked on an object or sleeping..	threadPool.interrupt();	try {	    // 500 milliseconds - an eternity	    Thread.sleep(500);	} catch (InterruptedException x) {	}	synchronized (queue) {	    queue.notifyAll();	}	if (threadPool.activeCount() > 0) {	    logger.error("Some dispatcher threads are refusing to die. I give up.");	    if (logger.isDebugEnabled()) {		Thread[] pool = new Thread[threadPool.activeCount()];		threadPool.enumerate(pool, false);		logger.debug("Still-active threads:");		for (int i = 0; i < pool.length; i++) {		    logger.debug("  " + pool[i].getName());		}	    }	}    }    public void addObserver(ConnectionObserver observer) {	synchronized (observers) {	    if (!observers.contains(observer))		observers.add(observer);	}    }    public void removeObserver(ConnectionObserver observer) {	synchronized (observers) {	    observers.remove(observer);	}    }    public Iterator observerIterator() {	return (((ArrayList)observers.clone()).iterator());    }    public boolean contains(ConnectionObserver observer) {	synchronized (observers) {	    return (observers.contains(observer));	}    }    // notifyObservers is always single-threaded access as there's only 1    // receiver thread!    public void notifyObservers(Connection conn, SMPPEvent e) {	if (logger.isDebugEnabled())	    logger.debug("Notifying observers of a new SMPP event " + e.getType());	queue.put(conn, e);	if (threadsWaiting > 0) {	    synchronized (queue) {		queue.notify();	    }	}    }    public void notifyObservers(Connection conn, SMPPPacket pak) {	if (logger.isDebugEnabled()) {	    logger.debug("Notifying observers of a new SMPP packet ("		    + Integer.toHexString(pak.getCommandId()) + ","		    + Integer.toString(pak.getSequenceNum()) + ")");	}	queue.put(conn, pak);	if (threadsWaiting > 0) {	    synchronized (queue) {		queue.notify();	    }	}    }    public void run() {	NotificationDetails nd;	ConnectionObserver observer;	try {	    logger.debug("Thread " + Thread.currentThread().getName() + " started");	    while (running) {		nd = null;		synchronized (queue) {		    if (queue.isEmpty()) {			threadsWaiting++;			queue.wait();			threadsWaiting--;		    }		    nd = queue.get();		}		if (nd == null)		    continue;		// Get a shallow copy of the registered observers and iterate		// over it		Iterator i = ((ArrayList)observers.clone()).iterator();		while (i.hasNext()) {		    observer = (ConnectionObserver)i.next();		    if (nd.event == null)			observer.packetReceived(nd.conn, nd.pak);		    else			observer.update(nd.conn, nd.event);		} // end while	    } // end while	    logger.debug("Thread " + Thread.currentThread().getName() + " exiting");	} catch (Exception x) {	    logger.warn("Exception in dispatcher thread", x);	}    }    private class NotificationDetails {	public Connection conn = null;	public SMPPEvent event = null;	public SMPPPacket pak = null;	public NotificationDetails() {	}	public void setDetails(Connection c, SMPPEvent e, SMPPPacket p) {	    conn = c;	    event = e;	    pak = p;	}    }    /** A simple implementation of a FIFO queue. Need this to be as minimal as     * possible so it's zippidy quick. No synchronization is done here, it's     * handled by the relevant ThreadedEventDispatcher methods.     */    private class FIFOQueue {	private int head = 0, tail = 0;	private NotificationDetails[] queue = null;	public FIFOQueue(int queueSize) {	    if (queueSize < 1)		queueSize = 100;	    queue = new NotificationDetails[queueSize];	    for (int i = 0; i < queueSize; i++)		queue[i] = new NotificationDetails();	}	public void put(Connection c, SMPPPacket p) throws QueueFullException {	    if (isFull()) {		throw new QueueFullException();	    }	    queue[tail++].setDetails(c, null, p);	    if (tail >= queue.length)		tail = 0;	}	public void put(Connection c, SMPPEvent e) throws QueueFullException {	    if (isFull()) {		throw new QueueFullException();	    }	    queue[tail++].setDetails(c, e, null);	    if (tail >= queue.length)		tail = 0;	}	public NotificationDetails get() {	    NotificationDetails nd = null;	    if (!isEmpty()) {		nd = queue[head++];		if (head >= queue.length)		    head = 0;	    }	    return (nd);	}	public boolean isEmpty() {	    return (tail == head);	}	public boolean isFull() {	    if (tail > head)		return ((tail == queue.length - 1) && head == 0);	    else		return (tail == (head - 1));	}    }    private class QueueFullException extends RuntimeException {	public QueueFullException() {	}    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -