⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 processorpool.java

📁 国内很牛的软件公司花费两年半开发的用EJB3开发的代码,采用STRUTS和EJB3,目前系统进行第二版.所以拿出来共享
💻 JAVA
字号:
package com.ufmobile.platform.mstreet.processor;

import java.util.ArrayList;

import javax.persistence.PersistenceException;

import org.hibernate.exception.GenericJDBCException;

import com.ufmobile.platform.log.RunTimeLogger;
import com.ufmobile.platform.log.SysTimerLogger;

public class ProcessorPool extends Thread{
	private ArrayList<ProcessorThread> threadList;
	private int max_thread_count;
	private MOProcessor moProcessor;
	private MTProcessor mtProcessor;
	private boolean isMO;
	private long minID;
	private int localId;
	public ProcessorPool(int localId, MOProcessor processor, int max){
		this.max_thread_count = max;
		this.moProcessor = processor;
		isMO = true;
		threadList = new ArrayList<ProcessorThread>();
		this.localId = localId;
	}
	public ProcessorPool(int localId, MTProcessor processor, int max){
		this.max_thread_count = max;
		this.mtProcessor = processor;
		isMO = false;
		threadList = new ArrayList<ProcessorThread>();
		this.localId = localId;
	}
	public ArrayList<ProcessorThread> getThreadList(){
		return threadList;
	}
	public synchronized void activeByHand(){
		boolean isExist = false;
		for(ProcessorThread thread : threadList){
			if(thread.isRun()){
				isExist = true;
				break;
			}
		}
		if(!isExist){
			notifyAll();
		}
	}
	public void run(){
		while(true){
			try{
				if(isMO){
					this.minID = moProcessor.getMinID(localId);
				}
				else{
					this.minID = mtProcessor.getMinID(localId);
				}
				RunTimeLogger.info(this, "the minId is " + this.minID);				
				if(minID > 0){
					if(threadList.size() == 0){					
						for(int i = threadList.size(); i < max_thread_count; i++){		
							if(isMO){
								ProcessorThread thread = new ProcessorThread(localId, moProcessor, i, this);
								threadList.add(thread);
								thread.start();
							}
							else{
								ProcessorThread thread = new ProcessorThread(localId, mtProcessor, i, this);
								threadList.add(thread);
								thread.start();
							}						
						}
					}
					else{
						for(ProcessorThread thread : threadList){
							thread.setRun(true);
						}
					}					
					synchronized(this){
						while(true){
							this.wait();
							break;
						}
					}
				}				
			}
			catch(Exception e){
				RunTimeLogger.error(this, "ProcessorPool.run因为异常停止了 ", e);				
				if(e instanceof PersistenceException){
					try{
						PersistenceException pe = (PersistenceException)e;
						if(pe.getCause() != null && pe.getCause() instanceof GenericJDBCException){
							ProcessorFactory.getEntityManager(true, localId);
						}
					}
					catch(Exception ee){
						RunTimeLogger.error(this, "getEntityManager(true)", ee);
					}					
				}
			}
			try {
				sleep(1000);
			} catch (InterruptedException e) {				
				e.printStackTrace();
				break;
			}
		}
	}
	synchronized long getNextID(){
		long ret = minID;
		minID++;
		return ret;
	}
}
/**
 * Worker thread owned by a {@link ProcessorPool}.
 *
 * <p>A pass repeatedly takes the next id from the pool and hands it to the MO
 * or MT processor until the processor returns {@code false} or the run flag is
 * cleared. After a pass the worker pauses one second, clears its run flag,
 * tells the pool it is done and parks until {@link #setRun(boolean)} sets the
 * flag again.
 */
class ProcessorThread extends Thread{
	private MOProcessor moProcessor;
	private MTProcessor mtProcessor;
	private boolean isMO;
	private int index;
	private ProcessorPool pool;
	// volatile: read by the pool thread via isRun() and by this thread's
	// processing loop without holding this object's monitor.
	private volatile boolean isRun;
	private int localId;

	/**
	 * @return {@code true} while the worker is flagged as running
	 */
	public boolean isRun() {
		return isRun;
	}

	/**
	 * Creates a worker that processes ids through an MO processor.
	 *
	 * @param localId   identifier passed through to the processor
	 * @param processor MO processor used for each id
	 * @param index     worker number, used in log messages
	 * @param pool      owning pool that supplies ids and is notified after each pass
	 */
	public ProcessorThread(int localId, MOProcessor processor, int index, ProcessorPool pool) {
		this.moProcessor = processor;
		this.isMO = true;
		this.index = index;
		this.pool = pool;
		this.isRun = true;
		this.localId = localId;
	}

	/**
	 * Creates a worker that processes ids through an MT processor.
	 *
	 * @param localId   identifier passed through to the processor
	 * @param processor MT processor used for each id
	 * @param index     worker number, used in log messages
	 * @param pool      owning pool that supplies ids and is notified after each pass
	 */
	public ProcessorThread(int localId, MTProcessor processor, int index, ProcessorPool pool) {
		this.mtProcessor = processor;
		this.isMO = false;
		this.index = index;
		this.pool = pool;
		this.isRun = true;
		this.localId = localId;
	}

	/**
	 * Sets the run flag and wakes the worker if it is parked.
	 *
	 * @param b new value of the run flag
	 */
	public synchronized void setRun(boolean b){
		this.isRun = b;
		notifyAll();
	}

	/**
	 * Worker loop: process ids, pause, report to the pool, park, repeat.
	 * There is no exit path; the loop runs for the life of the thread.
	 */
	public void run(){
		while(true){
			try{
				long id = pool.getNextID();
				if(isMO){
					while(MOProcessor.process(localId, moProcessor,id) && isRun){
						id = pool.getNextID();
					}
				}
				else{
					while(MTProcessor.process(mtProcessor,id, localId) && isRun){
						id = pool.getNextID();
					}
				}
			}
			catch(Exception e){
				RunTimeLogger.error(this, e.getMessage(), e);
				if(isMO) {
					SysTimerLogger.log("MO分发线程"+ index +"停止: " + e.getMessage());
				}
				else {
					SysTimerLogger.log("MT分发线程"+ index +"停止:" + e.getMessage());
				}
			}
			finally{
				try{
					sleep(1000);
				}
				catch(Exception e){
					RunTimeLogger.error(this, e.getMessage(), e);
				}
				isRun = false;
				// Called without holding this worker's monitor: the pool may take
				// this monitor (setRun) while holding its own.
				pool.activeByHand();
				synchronized(this){
					// Park on the flag instead of a bare wait(): a setRun(true) that
					// arrived before this point is not lost, and a spurious wakeup
					// does not start a pass.
					while(!isRun){
						try {
							this.wait();
						} catch (InterruptedException e) {
							// As before, an interrupt ends the park and lets one more
							// pass begin; the flag is deliberately not restored so the
							// next sleep/wait does not fail immediately.
							RunTimeLogger.error(this, e.getMessage(), e);
							break;
						}
					}
				}
			}
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -