// File: billhandlermanager.java.svn-base
// (Web code-viewer residue: font-size control label; not part of the source.)
package com.onet.autobill.bill;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.onet.autobill.bill.common.ResourceCollect;
import com.onet.autobill.bill.convert.ConvertFilter;
import com.onet.autobill.db.DbService;
import com.onet.autobill.model.MachineConfig;
import com.onet.autobill.util.CmdHelper;
import com.onet.autobill.util.DateFormat;
/**
 * Manages the import of BILL call-record files: it is responsible for loading
 * every BILL file stored on the registered BILL servers into the database.
 *
 * <p>One {@code BillHandlerThread} is built per machine configuration. The
 * handlers are either submitted to a fixed-size thread pool or invoked one
 * after another on the calling thread, depending on
 * {@link #setBeWantMutiThread(boolean)}.
 *
 * <p>NOTE(review): the class keeps mutable state without synchronization;
 * it looks like it is meant to be driven by a single caller at a time - confirm.
 *
 * @author mengwei
 */
public class BillHandlerManager {

    private static final Logger logger = Logger.getLogger(BillHandlerManager.class);

    /** Maximum time, in seconds, to wait for a single import task to finish. */
    private int timeOutForImportBill = 90 * 60;

    private DbService dbService;

    private ResourceCollect resourceCollect = new ResourceCollect();

    /** Pool used by the most recent multi-threaded run; {@code null} before the first one. */
    private ExecutorService executorService;

    private List<ConvertFilter> convertFilterList;

    /** {@code true} to run the handlers on a thread pool, {@code false} to run them serially. */
    private boolean beWantMutiThread = true;

    /** When {@code true}, {@code dbService.doAutoTestStat} is called after a run. */
    private boolean totelRun = false;

    /** When {@code true}, {@code dbService.updateSubmitStatusAndProductId} is called after a run. */
    private boolean updateRun = false;

    /**
     * Earliest "last import time" seen so far. Starts at construction time and
     * only ever moves earlier.
     *
     * <p>NOTE(review): this value is never reset between runs, so a long-lived
     * instance keeps passing the oldest timestamp it has ever seen to the
     * post-import updates - confirm whether that is intended.
     */
    private Date minTime = new Date();

    /**
     * Temporary directory used to store files.
     */
    private String copyPath;

    /**
     * Directory holding the converted log files (the files loaded into the database).
     */
    private String targetPath;

    public BillHandlerManager() {
    }

    /**
     * Imports the BILL files of all registered BILL servers into the database
     * and then runs the post-import database updates.
     *
     * @return {@code false} when no {@code DbService} has been set or when no
     *         machine configuration is available; {@code true} otherwise.
     *         NOTE(review): {@code true} is also returned when the import could
     *         not be started because resource initialization failed (the
     *         post-import updates are skipped in that case) - confirm this is
     *         what callers expect.
     */
    public boolean importBills2DB() {
        if (dbService == null) {
            return false;
        }
        // Find out which servers have BILL files to import.
        List<MachineConfig> machineConfigList = dbService.getAllMachineConfigs();
        if (machineConfigList == null || machineConfigList.isEmpty()) {
            // No import configuration available.
            return false;
        }
        if (importBills2DB(machineConfigList)) {
            dbService.updateDeliverInfo(this.minTime);
            logger.debug(updateRun + "" + totelRun);
            if (updateRun) {
                dbService.updateSubmitStatusAndProductId(this.minTime);
            }
            if (totelRun) {
                dbService.doAutoTestStat(this.minTime);
            }
        }
        return true;
    }

    /**
     * Runs one import handler per machine configuration, on a thread pool or
     * serially depending on the configured mode.
     *
     * @param machineConfigList gateway configuration entries; must not be empty
     * @return {@code false} when the conversion resources could not be
     *         initialized, {@code true} once the handlers have been run
     *         (individual handler failures are logged, not reported here)
     */
    private boolean importBills2DB(List<MachineConfig> machineConfigList) {
        logger.debug("取得需要倒库的服务器个数:" + machineConfigList.size());
        if (!initResourceCollect()) {
            return false;
        }
        logger.info("开始倒日志任务服务,时间为:"
                + DateFormat.DateFormatToString(new Date(), DateFormat.TYPE_S));

        // The earliest last-import time feeds the post-import updates, so it
        // has to be computed in both modes, not only when a pool is created.
        for (MachineConfig machineConfig : machineConfigList) {
            initMinTime(machineConfig.getLastImportTime());
        }

        // Read the mode once so a concurrent setter call cannot switch it mid-run.
        boolean multiThreaded = beWantMutiThread;
        if (!multiThreaded) {
            runSerially(machineConfigList);
            return true;
        }

        createThreadPool(machineConfigList);
        try {
            waitImportThreadsFinished(startThread(machineConfigList));
        } finally {
            // Release the worker threads of this run. shutdown() does not
            // interrupt tasks that are still executing (e.g. after a wait
            // timeout); it only lets the pool terminate once they are done.
            // Without this, every run would leak a complete thread pool.
            executorService.shutdown();
        }
        return true;
    }

    /**
     * Creates the thread pool for one run, sized by the number of distinct
     * machine names in the configuration list.
     *
     * @param machineConfigList gateway configuration entries
     */
    private void createThreadPool(List<MachineConfig> machineConfigList) {
        Set<String> machines = new HashSet<String>();
        for (MachineConfig machineConfig : machineConfigList) {
            machines.add(machineConfig.getMachineName());
        }
        logger.debug("machines.size() is :" + machines.size());
        executorService = Executors.newFixedThreadPool(machines.size());
    }

    /**
     * Submits one import handler per machine configuration to the thread pool.
     *
     * @param machineConfigList gateway configuration entries
     * @return the futures of the submitted handlers, in configuration order
     */
    private List<Future<Boolean>> startThread(List<MachineConfig> machineConfigList) {
        List<Future<Boolean>> futures =
                new ArrayList<Future<Boolean>>(machineConfigList.size());
        for (int count = 0; count < machineConfigList.size(); count++) {
            BillHandlerThread billHandlerThread =
                    createHandler(machineConfigList.get(count), count);
            futures.add(executorService.submit(billHandlerThread));
        }
        return futures;
    }

    /**
     * Runs the import handlers one after another on the calling thread. A
     * failing handler is logged and does not stop the remaining ones.
     *
     * @param machineConfigList gateway configuration entries
     */
    private void runSerially(List<MachineConfig> machineConfigList) {
        for (int count = 0; count < machineConfigList.size(); count++) {
            MachineConfig machineConfig = machineConfigList.get(count);
            BillHandlerThread billHandlerThread = createHandler(machineConfig, count);
            try {
                billHandlerThread.call();
            } catch (Exception e) {
                logger.error("MachineId为:" + machineConfig.getMachineId()
                        + "线程调用失败", e);
            }
        }
    }

    /**
     * Creates the per-configuration working folders and builds the handler
     * that imports the BILL files of one machine configuration.
     *
     * @param machineConfig configuration the handler works on
     * @param index position of the configuration in the list; used as the
     *        sub-folder name under the copy and target directories
     * @return the configured handler, not yet started
     */
    private BillHandlerThread createHandler(MachineConfig machineConfig, int index) {
        logger.debug(machineConfig.getMachineLogPath() + "路径线程开始启动");
        String fileCopyPath = copyPath + "\\" + index + "\\";
        String fileTargetPath = targetPath + "\\" + index + "\\";
        // NOTE(review): the result of makeFolder is ignored here, unlike in
        // makeLogFolder - confirm whether a failure should abort this handler.
        CmdHelper.makeFolder(fileCopyPath);
        CmdHelper.makeFolder(fileTargetPath);
        BillHandlerThread billHandlerThread = new BillHandlerThread(
                machineConfig, fileCopyPath, fileTargetPath);
        billHandlerThread.setDbService(dbService);
        billHandlerThread.setConvertFilterList(convertFilterList);
        return billHandlerThread;
    }

    /**
     * Loads the data resources and hands them to every filter of the message
     * conversion queue.
     *
     * @return {@code false} when no conversion filter list is set or the
     *         resources could not be initialized, {@code true} otherwise
     */
    private boolean initResourceCollect() {
        if (convertFilterList == null) {
            logger.error("无消息转换队列,无法进行数据转换操作");
            return false;
        }
        logger.debug("加载路由,号段,省份,产品!");
        if (!resourceCollect.initResource(dbService)) {
            return false;
        }
        for (ConvertFilter convertFilter : convertFilterList) {
            convertFilter.initResource(resourceCollect);
        }
        return true;
    }

    /**
     * Clears and (re)creates the temporary copy directory and the directory
     * that stores the post-processed files.
     *
     * @return {@code true} when both directories were created, {@code false}
     *         when a path is not configured or a directory could not be created
     */
    private boolean makeLogFolder() {
        if (copyPath == null || targetPath == null) {
            logger.error("拷贝路径或目标路径没有配置,无法启动倒库");
            return false;
        }
        logger.debug("copyPath is :" + copyPath);
        logger.debug("targetPath is :" + targetPath);
        // Clear the directories first ...
        CmdHelper.clearDirectory(copyPath);
        CmdHelper.clearDirectory(targetPath);
        // ... then create them.
        if (!CmdHelper.makeFolder(copyPath) || !CmdHelper.makeFolder(targetPath)) {
            logger.error("创建目录结构失败");
            return false;
        }
        return true;
    }

    /**
     * Waits for the submitted import tasks, giving each of them at most
     * {@code timeOutForImportBill} seconds.
     *
     * <p>A task that fails or times out is logged and the wait moves on to the
     * next one. If the waiting thread is interrupted, its interrupt status is
     * restored and the wait stops immediately.
     *
     * @param futures futures of the submitted import tasks; may be {@code null}
     * @return {@code true} when every task completed normally within its
     *         timeout, {@code false} otherwise
     */
    private boolean waitImportThreadsFinished(List<Future<Boolean>> futures) {
        if (futures == null || futures.isEmpty()) {
            return true;
        }
        logger.debug("线程个数:" + futures.size());
        boolean allFinished = true;
        for (int i = 0; i < futures.size(); i++) {
            try {
                logger.debug("查看倒库任务是否已经完成");
                futures.get(i).get(timeOutForImportBill, TimeUnit.SECONDS);
                logger.debug("倒库线程已经完成任务");
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.error("等待倒库线程时被中断,线程序号:" + i, e);
                return false;
            } catch (Exception e) {
                // Task failure (ExecutionException) or wait timeout (TimeoutException).
                logger.error("倒库线程未能正常结束,线程序号:" + i, e);
                allFinished = false;
            }
        }
        return allFinished;
    }

    /**
     * Prepares the manager for use: creates the working directories and loads
     * the conversion resources.
     *
     * @return {@code true} when both steps succeeded, {@code false} otherwise
     */
    public boolean initService() {
        return makeLogFolder() && initResourceCollect();
    }

    /**
     * Closes the manager: stops the thread pool of the most recent run, if
     * any, and clears the copy and target directories when they are configured.
     */
    public void closeManager() {
        if (executorService != null) {
            executorService.shutdownNow();
        }
        if (copyPath != null && !"".equals(copyPath)) {
            CmdHelper.clearDirectory(copyPath);
        }
        if (targetPath != null && !"".equals(targetPath)) {
            CmdHelper.clearDirectory(targetPath);
        }
    }

    /**
     * Moves {@code minTime} back to {@code lastDateTime} when that time is earlier.
     *
     * @param lastDateTime candidate time; {@code null} is ignored
     */
    private void initMinTime(Date lastDateTime) {
        if (lastDateTime != null && lastDateTime.before(this.minTime)) {
            this.minTime = lastDateTime;
        }
    }

    public void setDbService(DbService dbService) {
        this.dbService = dbService;
    }

    public void setCopyPath(String copyPath) {
        this.copyPath = copyPath;
    }

    public void setTargetPath(String targetPath) {
        this.targetPath = targetPath;
    }

    public void setBeWantMutiThread(boolean beWantMutiThread) {
        this.beWantMutiThread = beWantMutiThread;
    }

    public void setTimeOutForImportBill(int timeOutForImportBill) {
        this.timeOutForImportBill = timeOutForImportBill;
    }

    public void setConvertFilterList(List<ConvertFilter> convertFilterList) {
        this.convertFilterList = convertFilterList;
    }

    public void setTotelRun(boolean totelRun) {
        this.totelRun = totelRun;
    }

    public void setUpdateRun(boolean updateRun) {
        this.updateRun = updateRun;
    }
}
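// Web code-viewer residue (keyboard shortcut help); not part of the source:
//   Copy code: Ctrl + C
//   Search code: Ctrl + F
//   Full-screen mode: F11
//   Toggle theme: Ctrl + Shift + D
//   Show shortcuts: ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -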