📄 processorpool.java
字号:
package com.ufmobile.platform.mstreet.processor;
import java.util.ArrayList;
import javax.persistence.PersistenceException;
import org.hibernate.exception.GenericJDBCException;
import com.ufmobile.platform.log.RunTimeLogger;
import com.ufmobile.platform.log.SysTimerLogger;
public class ProcessorPool extends Thread{
private ArrayList<ProcessorThread> threadList;
private int max_thread_count;
private MOProcessor moProcessor;
private MTProcessor mtProcessor;
private boolean isMO;
private long minID;
private int localId;
public ProcessorPool(int localId, MOProcessor processor, int max){
this.max_thread_count = max;
this.moProcessor = processor;
isMO = true;
threadList = new ArrayList<ProcessorThread>();
this.localId = localId;
}
public ProcessorPool(int localId, MTProcessor processor, int max){
this.max_thread_count = max;
this.mtProcessor = processor;
isMO = false;
threadList = new ArrayList<ProcessorThread>();
this.localId = localId;
}
public ArrayList<ProcessorThread> getThreadList(){
return threadList;
}
public synchronized void activeByHand(){
boolean isExist = false;
for(ProcessorThread thread : threadList){
if(thread.isRun()){
isExist = true;
break;
}
}
if(!isExist){
notifyAll();
}
}
public void run(){
while(true){
try{
if(isMO){
this.minID = moProcessor.getMinID(localId);
}
else{
this.minID = mtProcessor.getMinID(localId);
}
RunTimeLogger.info(this, "the minId is " + this.minID);
if(minID > 0){
if(threadList.size() == 0){
for(int i = threadList.size(); i < max_thread_count; i++){
if(isMO){
ProcessorThread thread = new ProcessorThread(localId, moProcessor, i, this);
threadList.add(thread);
thread.start();
}
else{
ProcessorThread thread = new ProcessorThread(localId, mtProcessor, i, this);
threadList.add(thread);
thread.start();
}
}
}
else{
for(ProcessorThread thread : threadList){
thread.setRun(true);
}
}
synchronized(this){
while(true){
this.wait();
break;
}
}
}
}
catch(Exception e){
RunTimeLogger.error(this, "ProcessorPool.run因为异常停止了 ", e);
if(e instanceof PersistenceException){
try{
PersistenceException pe = (PersistenceException)e;
if(pe.getCause() != null && pe.getCause() instanceof GenericJDBCException){
ProcessorFactory.getEntityManager(true, localId);
}
}
catch(Exception ee){
RunTimeLogger.error(this, "getEntityManager(true)", ee);
}
}
}
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
synchronized long getNextID(){
long ret = minID;
minID++;
return ret;
}
}
class ProcessorThread extends Thread{
private MOProcessor moProcessor;
private MTProcessor mtProcessor;
private boolean isMO;
private int index;
private ProcessorPool pool;
private boolean isRun;
private int localId;
public boolean isRun() {
return isRun;
}
public ProcessorThread(int localId, MOProcessor processor, int index, ProcessorPool pool) {
this.moProcessor = processor;
this.isMO = true;
this.index = index;
this.pool = pool;
this.isRun = true;
this.localId = localId;
}
public ProcessorThread(int localId, MTProcessor processor, int index, ProcessorPool pool) {
this.mtProcessor = processor;
this.isMO = false;
this.index = index;
this.pool = pool;
this.isRun = true;
this.localId = localId;
}
public synchronized void setRun(boolean b){
this.isRun = b;
notifyAll();
}
public void run(){
while(true){
try{
long id = pool.getNextID();
if(isMO){
while(MOProcessor.process(localId, moProcessor,id) && isRun){
id = pool.getNextID();
}
}
else{
while(MTProcessor.process(mtProcessor,id, localId) && isRun){
id = pool.getNextID();
}
}
}
catch(Exception e){
RunTimeLogger.error(this, e.getMessage(), e);
if(isMO) {
SysTimerLogger.log("MO分发线程"+ index +"停止: " + e.getMessage());
}
else {
SysTimerLogger.log("MT分发线程"+ index +"停止:" + e.getMessage());
}
}
finally{
try{
sleep(1000);
}
catch(Exception e){
RunTimeLogger.error(this, e.getMessage(), e);
}
isRun = false;
pool.activeByHand();
synchronized(this){
while(true){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -