⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaultdispatcher.java

📁 银行项目为后台socket通信写的程序
💻 JAVA
字号:
/****************************************************************************
 * Package		: com.ecSolutions.ecAppServer.server.session.dispatcher
 * File			: DefaultDispatcher.java
 * Create Date  : 2007-7-20
 * Author		: Steven Chen
 * 
 * Copyright(C) 2006 ecSolutions(shanghai) Co.,Limited.All Rights Reserved.
 *			
 ***************************************************************************/
package com.ecSolutions.ecAppServer.server.session.dispatcher;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;

import com.ecSolutions.ecAppServer.server.Session;
import com.ecSolutions.ecAppServer.server.util.Configuration;
import com.ecSolutions.ecAppServer.server.util.ElapsedTime;
import com.ecSolutions.ecAppServer.server.util.LogThreadGroup;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;

/**
 * Default dispatcher implementation, support block operation such as
 * Future.complete.
 * <p>
 * Events are handed to worker threads through bounded queues. A session keeps
 * being served by the worker it was last assigned to (tracked in a weak-keyed
 * map) for as long as that worker is not blocked. At most
 * <code>concurrent</code> workers are active at once; once that limit is
 * reached, sessions are spread over the active workers round-robin. A worker
 * whose queue stays empty for <code>keepAliveTime</code> milliseconds retires.
 * <p>
 * All bookkeeping state (<code>activeWorkers</code>, <code>idleWorkers</code>,
 * <code>sessionMap</code>, <code>currentConcurrent</code> and
 * <code>index</code>) is guarded by <code>mainLock</code>.
 * 
 * @author Steven Chen
 * @version $Id: DefaultDispatcher.java,v 1.3 2007/07/26 05:18:09 stevenchen Exp $
 */
public class DefaultDispatcher implements Dispatcher {

	private static final Log log = LogFactory.getLog(DefaultDispatcher.class);

	private static final ThreadGroup THREAD_GROUP = new ThreadGroup(LogThreadGroup.CINDY_THREAD_GROUP, "Dispatcher");

	/** Source of the numeric suffix used in worker thread names. */
	private static final AtomicInteger COUNTER = new AtomicInteger();

	/** Fallback used when the calling thread is itself the chosen worker, or is not a worker at all. */
	private final Dispatcher dispatcher = new DirectDispatcher();

	/** Capacity of each worker queue; never less than 1. */
	private final int capacity = Math.max(1, Configuration.getDispatcherCapacity());

	/** Milliseconds a worker waits for work before retiring; never negative. */
	private final int keepAliveTime = Math.max(0, Configuration.getDispatcherKeepAliveTime());

	/** Maximum number of simultaneously active workers; never less than 1. */
	private final int concurrent = Math.max(1, Configuration.getDispatcherConcurrent());

	/** Guards all mutable bookkeeping state of this dispatcher. */
	private final Object mainLock = new Object();

	/** Blocked workers that finished their task and are waiting to be reused. */
	private final List idleWorkers = new LinkedList();

	/** Session -> Worker that last served it; keys are held weakly. */
	private final Map sessionMap = new WeakHashMap();

	/** Slots of the currently active workers; a free slot holds null. */
	private final Worker[] activeWorkers = new Worker[concurrent];

	/** Number of occupied slots in activeWorkers. */
	private int currentConcurrent = 0;

	/**
	 * Finds the slot of the given worker in activeWorkers. Passing
	 * <code>null</code> finds the first free slot. Every caller in this class
	 * holds mainLock.
	 * 
	 * @param worker
	 *            the worker to look for, or null to look for a free slot
	 * @return the slot index, or -1 if there is no such slot
	 */
	private int indexOf(Worker worker) {
		for (int i = 0; i < concurrent; i++) {
			if (activeWorkers[i] == worker)
				return i;
		}
		return -1;
	}

	/**
	 * Dispatch session events.
	 * 
	 * @author Steven Chen
	 * @version $Id: DefaultDispatcher.java,v 1.3 2007/07/26 05:18:09 stevenchen Exp $
	 */
	private class Worker extends Thread {

		/** Queue this worker drains; null while the worker sits in idleWorkers. */
		private volatile BlockingQueue queue;

		/**
		 * True once this worker must no longer be chosen by getWorker: either
		 * it has called block() or it is retiring.
		 */
		private volatile boolean blocked;

		public Worker() {
			super(THREAD_GROUP, "Dispatcher-" + COUNTER.incrementAndGet());
		}

		/**
		 * Assigns the queue to drain; a null argument creates a fresh queue of
		 * the configured capacity.
		 */
		private void setQueue(BlockingQueue queue) {
			this.queue = (queue == null ? new ArrayBlockingQueue(capacity) : queue);
		}

		/**
		 * Fetches the next task, waiting up to keepAliveTime milliseconds.
		 * 
		 * @return the next task, or null if none arrived, in which case this
		 *         worker has been removed from the active set and must exit
		 */
		private Runnable getTask() {
			Runnable runnable = null;
			try {
				runnable = (Runnable) queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
			} catch (InterruptedException ignored) {
				// An interrupt is handled like an empty poll; the re-check
				// below decides whether this worker retires.
			}

			// Re-check under the lock: dispatch() offers under the same lock,
			// so an event queued just before the timeout is still picked up.
			if (runnable == null) {
				synchronized (mainLock) {
					runnable = (Runnable) queue.poll();
					if (runnable == null) {
						// thus getWorker method will not use this thread
						blocked = true;

						activeWorkers[indexOf(this)] = null;
						currentConcurrent--;
					}
				}
			}
			return runnable;
		}

		/**
		 * Parks a blocked worker in idleWorkers until newWorker() hands it a
		 * queue or keepAliveTime milliseconds elapse.
		 * <p>
		 * The wait happens WITHOUT holding mainLock. Object.wait releases only
		 * this worker's own monitor, so waiting inside mainLock would stall
		 * every dispatch()/block() call and would prevent newWorker() from
		 * ever reaching its notify().
		 * 
		 * @return true if the worker was reused and has a new queue, false if
		 *         it timed out and must exit
		 */
		private boolean awaitReuse() {
			synchronized (mainLock) {
				queue = null;
				idleWorkers.add(this);
			}

			synchronized (this) {
				// newWorker() sets the queue before notifying under this
				// monitor, so checking the queue here cannot miss a wake-up.
				long deadline = System.currentTimeMillis() + keepAliveTime;
				long remaining = keepAliveTime;
				while (queue == null && remaining > 0) {
					try {
						wait(remaining);
					} catch (InterruptedException ignored) {
						// Stop waiting; the outcome is decided below.
						break;
					}
					remaining = deadline - System.currentTimeMillis();
				}
			}

			synchronized (mainLock) {
				if (idleWorkers.remove(this))
					return false; // timeout, nobody claimed this worker
				blocked = false; // have new task
				return true;
			}
		}

		public void run() {
			for (Runnable task = null; (task = getTask()) != null; task = null) {
				try {
					task.run();
				} catch (Throwable e) { // protect catch
					log.error(e, e);
				}

				// blocked is only true here if the task called block(), in
				// which case another worker has taken over the queue.
				if (blocked && !awaitReuse())
					break;
			}
		}

	}

	private int index = -1; // choose worker index

	/**
	 * Chooses the worker that should handle events of the given session and
	 * records the choice in sessionMap. Must be called with mainLock held.
	 * 
	 * @param session
	 *            the session an event is being dispatched for
	 * @return the session's previous worker if it is not blocked, otherwise a
	 *         new or reused worker while below the concurrency limit, otherwise
	 *         the next active worker in round-robin order
	 */
	private Worker getWorker(Session session) {
		// get last worker
		Worker worker = (Worker) sessionMap.get(session);
		if (worker != null && !worker.blocked)
			return worker;

		if (currentConcurrent >= concurrent) {
			// reach concurrent limit
			index = (index + 1) % concurrent;
			worker = activeWorkers[index];
		} else {
			// create new worker
			worker = newWorker(null);
			activeWorkers[indexOf(null)] = worker;
			currentConcurrent++;
		}

		sessionMap.put(session, worker);
		return worker;
	}

	/**
	 * Provides a running worker bound to the given queue, reusing the most
	 * recently parked idle worker when there is one and starting a new thread
	 * otherwise. Must be called with mainLock held.
	 * 
	 * @param queue
	 *            the queue the worker should drain, or null for a fresh queue
	 * @return the worker, already started or woken up
	 */
	private Worker newWorker(BlockingQueue queue) {
		Worker worker = null;
		if (idleWorkers.isEmpty()) {
			worker = new Worker();
			worker.setQueue(queue);
			worker.start();
		} else { // reuse idle worker
			worker = (Worker) idleWorkers.remove(idleWorkers.size() - 1);
			worker.setQueue(queue);
			synchronized (worker) {
				worker.notify();
			}
		}
		return worker;
	}

	/** Throttles the flow-control warning in dispatch(). */
	private final ElapsedTime elapsedTime = new ElapsedTime();

	/**
	 * Hands an event to the worker responsible for the session.
	 * <p>
	 * If the calling thread is that worker, the event is passed to the direct
	 * dispatcher instead of being queued. Otherwise the event is offered to
	 * the worker's queue; when the queue is full the caller blocks until space
	 * becomes available (flow control).
	 * <p>
	 * If the caller is interrupted while blocked, the event is discarded, the
	 * interrupt status is restored and a warning is logged.
	 * 
	 * @param session
	 *            the session the event belongs to
	 * @param event
	 *            the event to run
	 */
	public void dispatch(Session session, Runnable event) {
		Worker worker = null;
		BlockingQueue queue = null;
		boolean queued = false;
		synchronized (mainLock) {
			worker = getWorker(session);
			if (Thread.currentThread() != worker) {
				// Read the queue and offer while still holding the lock: the
				// worker can neither retire (getTask re-polls under this lock)
				// nor null its queue (awaitReuse) in between.
				queue = worker.queue;
				queued = queue.offer(event);
			}
		}

		if (Thread.currentThread() == worker) {
			dispatcher.dispatch(session, event);
			return;
		}
		if (queued)
			return;

		// flow control
		if (elapsedTime.getElapsedTime() >= 10000) {
			elapsedTime.reset();
			log.warn("dispatcher flow control");
		}
		try {
			// The queue was full, so its worker still has work pending.
			// NOTE(review): in theory the worker could still drain the queue
			// and retire before this put completes - confirm whether that
			// needs handling.
			queue.put(event);
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the caller and report the loss
			// instead of dropping the event silently.
			Thread.currentThread().interrupt();
			log.warn("dispatch interrupted, event discarded: " + event);
		}
	}

	/**
	 * Announces that the calling thread is about to block.
	 * <p>
	 * When called on an active worker thread, the worker is marked blocked and
	 * a replacement worker takes over its queue and its slot in the active
	 * set. Further calls during the same task do nothing, because the worker
	 * no longer occupies a slot. On any other thread the call is forwarded to
	 * the direct dispatcher.
	 */
	public void block() {
		Thread currentThread = Thread.currentThread();
		if (currentThread instanceof Worker) {
			Worker worker = (Worker) currentThread;

			synchronized (mainLock) {
				int slot = indexOf(worker);
				if (slot < 0)
					return; // not in the active set (e.g. already replaced)

				worker.blocked = true;
				activeWorkers[slot] = newWorker(worker.queue);
			}
		} else {
			dispatcher.block();
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -