📄 moinfobillhandler.java.svn-base
字号:
package com.onet.autobill.bill;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;
import com.onet.autobill.bill.common.ArrayTool;
import com.onet.autobill.bill.common.FileType;
import com.onet.autobill.bill.convert.ConvertFilter;
import com.onet.autobill.db.DbService;
import com.onet.autobill.model.MachineConfig;
import com.onet.autobill.util.CmdHelper;
import com.onet.pojomsg.onetmsg3.SM_Base_Msg;
import com.onet.pojomsg.onetmsg3.SM_Constant;
import com.onet.pojomsg.onetmsg3.SM_Submit_Resp_Msg;
/**
 * Imports a bill file into the database.
 *
 * <p>One run of {@link #importBillFile2DB} copies the source bill file to a
 * local working directory, converts every tab-separated line into an
 * {@code SM_Base_Msg}, keeps the submit-response messages keyed by message
 * id, writes the kept messages to a target file and finally asks the
 * {@code DbService} to load that target file.
 *
 * <p>The handler keeps its working state (message map and output stream) in
 * instance fields, so concurrent calls on the same instance would interfere
 * with each other. Sequential reuse of one instance is supported.
 *
 * @author mengwei
 */
public class MoInfoBillHandler {

    private static final Logger logger = Logger.getLogger(MoInfoBillHandler.class);

    /** Log code that {@link #getBillFileType} maps to {@code FileType.MOINFO}. */
    private static final int MOINFO_LOG_CODE = 400;

    /**
     * Log code whose bill file names carry the date in the second
     * underscore-separated segment instead of the third (MMS bills,
     * according to the original author's comment).
     */
    private static final int MMS_LOG_CODE = 320;

    /** Number of leading characters of the date segment passed to the table-name lookup. */
    private static final int DATE_PREFIX_LENGTH = 6;

    /**
     * Messages collected for the file currently being processed, keyed by
     * message id. A later message with the same id replaces the earlier one.
     */
    private final Map<String, SM_Base_Msg> msgMap = new LinkedHashMap<String, SM_Base_Msg>();

    /** Output stream for the converted messages; {@code null} while no file is open. */
    private DataOutputStream dataOutMsg = null;

    private DbService dbService;

    private List<ConvertFilter> convertFilterList;

    public MoInfoBillHandler() {
    }

    /**
     * Runs the complete import of one bill file.
     *
     * @param fileName      name of the bill file (without directory)
     * @param machineConfig configuration of the machine the file comes from
     * @param tmpCopyPath   directory prefix the source file is copied to; it is
     *                      concatenated directly with {@code fileName}
     * @param tmpTargetPath directory prefix the converted file is written to; it
     *                      is concatenated directly with {@code fileName}
     * @return {@code true} if the file was converted and loaded into the
     *         database; {@code false} on any failure or when the file yielded
     *         no convertible record
     */
    public boolean importBillFile2DB(String fileName,
            MachineConfig machineConfig, String tmpCopyPath,
            String tmpTargetPath) {
        if (convertFilterList == null) {
            logger.error("消息转换过滤器没有配置");
            return false;
        }
        // Fail fast instead of hitting a NullPointerException after the copy.
        if (dbService == null) {
            logger.error("数据库服务没有配置");
            return false;
        }
        if (!copyImportFile(fileName, machineConfig, tmpCopyPath)) {
            return false;
        }
        FileType fileType = getBillFileType(fileName, machineConfig);
        if (!init(fileName, machineConfig, tmpTargetPath, fileType)) {
            return false;
        }
        logger.debug("开始处理原始临时拷贝文件:" + tmpCopyPath + fileName);

        // Start from a clean slate so nothing from a previous run leaks in.
        msgMap.clear();
        int row = 0;
        int successCount = 0;
        boolean processed = false;
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(new File(tmpCopyPath + fileName));
            while (it.hasNext()) {
                String line = it.nextLine();
                if (line.length() == 0) {
                    continue;
                }
                row++;
                String[] fields = line.split("\t");
                SM_Base_Msg message = copyAndConvertMsg(fields, fileType);
                if (message == null) {
                    // Conversion failed; the helper has already logged the cause.
                    continue;
                }
                writeToMemory(message);
                successCount++;
            }
            logger.debug("写入内存成功");

            processed = true;
            for (Map.Entry<String, SM_Base_Msg> entry : msgMap.entrySet()) {
                if (!writeToFile(entry.getValue())) {
                    // A partially written target file must not be imported.
                    processed = false;
                    break;
                }
            }
        } catch (Exception e) {
            processed = false;
            logger.error("临时拷贝文件:" + tmpCopyPath + fileName + "处理失败", e);
        } finally {
            closeOutStream();
            if (it != null) {
                LineIterator.closeQuietly(it);
            }
            // Release the collected messages but keep the handler reusable.
            msgMap.clear();
        }
        logger.info("成功地完成二次加工的记录条数是:" + successCount);

        // Raise the alert both when processing broke and when there was no new bill record.
        if (!processed || successCount == 0) {
            errAlertUpdate(0, machineConfig);
            return false;
        }
        return importIntoDB(fileName, machineConfig, tmpCopyPath,
                tmpTargetPath, fileType, row);
    }

    /**
     * Copies the bill file from the machine's log path to the local working
     * directory.
     *
     * @return {@code true} if the copy succeeded
     */
    private boolean copyImportFile(String fileName,
            MachineConfig machineConfig, String tmpCopyPath) {
        String source = machineConfig.getMachineLogPath() + "\\" + fileName;
        logger.debug("开始复制文件:" + fileName + " FROM "
                + machineConfig.getMachineLogPath() + " TO " + tmpCopyPath
                + fileName);
        try {
            CmdHelper.copyFile(source, tmpCopyPath + fileName);
        } catch (Exception e) {
            logger.error(source + "文件复制到本地失败", e);
            return false;
        }
        logger.debug(source + "文件复制到本地成功");
        return true;
    }

    /**
     * Determines the bill file type from the machine's log code.
     *
     * @param fileName      bill file name (currently not used for the decision)
     * @param machineConfig configuration providing the log code
     * @return {@code FileType.MOINFO} for log code 400, otherwise
     *         {@code FileType.UNKNOW}
     */
    private FileType getBillFileType(String fileName,
            MachineConfig machineConfig) {
        if (machineConfig.getLogCode() == MOINFO_LOG_CODE) {
            logger.debug("BILL文件是路由中心记录上行信息类型");
            return FileType.MOINFO;
        }
        logger.warn("BILL文件类型无法识别");
        return FileType.UNKNOW;
    }

    /**
     * Opens the output stream for the converted messages, closing any stream
     * that is still open.
     *
     * @param msgFileName full path of the file to write
     * @param fileType    bill file type (currently not used)
     * @return {@code true} if the stream was opened
     */
    private boolean initOutStream(String msgFileName,
            FileType fileType) {
        closeOutStream();
        try {
            File msgFile = new File(msgFileName);
            dataOutMsg = new DataOutputStream(new FileOutputStream(msgFile));
        } catch (FileNotFoundException e) {
            logger.error("没有找到文件", e);
            return false;
        } catch (Exception e) {
            logger.error("", e);
            return false;
        }
        return true;
    }

    /**
     * Closes the output stream if one is open. The field is reset even when
     * {@code close()} fails, so a broken stream is never reused.
     */
    private void closeOutStream() {
        if (dataOutMsg == null) {
            return;
        }
        try {
            dataOutMsg.close();
        } catch (IOException ex) {
            logger.error("", ex);
        } finally {
            dataOutMsg = null;
        }
    }

    /**
     * Raises an error notification through the database service.
     *
     * @param node          node value forwarded to {@code DbService.errNotify}
     * @param machineConfig configuration of the affected machine
     */
    private void errAlertUpdate(int node, MachineConfig machineConfig) {
        dbService.errNotify(node, machineConfig);
    }

    /**
     * Prepares a run: rejects unknown file types and opens the target file.
     * An error notification is raised on either failure.
     *
     * @param fileName      bill file name
     * @param machineConfig configuration of the source machine
     * @param tmpTargetPath directory prefix of the target file
     * @param fileType      detected bill file type
     * @return {@code true} if processing may start
     */
    private boolean init(String fileName, MachineConfig machineConfig,
            String tmpTargetPath, FileType fileType) {
        if (fileType == FileType.UNKNOW) {
            errAlertUpdate(0, machineConfig);
            return false;
        }
        if (!initOutStream(tmpTargetPath + fileName, fileType)) {
            errAlertUpdate(0, machineConfig);
            return false;
        }
        return true;
    }

    /**
     * Builds a message from the fields of one line and runs every configured
     * convert filter over it.
     *
     * @param fields   tab-separated fields of one bill line
     * @param fileType bill file type
     * @return the converted message, or {@code null} if no message could be
     *         built or a filter threw an exception
     */
    private SM_Base_Msg copyAndConvertMsg(String[] fields, FileType fileType) {
        try {
            SM_Base_Msg message = ArrayTool.getSmMsgByArray(fields, fileType);
            if (message != null) {
                for (ConvertFilter convertFilter : convertFilterList) {
                    convertFilter.convert(message);
                }
            }
            return message;
        } catch (Exception e) {
            logger.warn("消息转换失败", e);
            return null;
        }
    }

    /**
     * Serialises one message and appends it to the target file.
     *
     * @param message message to write
     * @return {@code true} if the message was written, or if it had no
     *         serialised form; {@code false} on an I/O error
     */
    private boolean writeToFile(SM_Base_Msg message) {
        StringBuffer strBuff = ArrayTool.getStringBufferBySmMsg(message);
        try {
            if (strBuff != null) {
                // NOTE(review): getBytes() uses the platform default charset; kept
                // as-is because the encoding the database import expects is not visible here.
                dataOutMsg.write(strBuff.toString().getBytes());
                dataOutMsg.flush();
            }
            return true;
        } catch (IOException e) {
            logger.error("", e);
            return false;
        }
    }

    /**
     * Stores a submit-response message in the in-memory map under its message
     * id; messages with any other command id are ignored.
     *
     * @param message message to store
     * @return {@code true} if the message was stored
     */
    private boolean writeToMemory(SM_Base_Msg message) {
        if (message.getCommandID() == SM_Constant.TYPE_SM_SUBMIT_RESP) {
            SM_Submit_Resp_Msg moinfo = (SM_Submit_Resp_Msg) message;
            msgMap.put(moinfo.getMsgID(), message);
            return true;
        }
        return false;
    }

    /**
     * Loads the converted target file into the database and records the
     * processed line count for the machine.
     *
     * @param fileName      bill file name; the date is read from one of its
     *                      underscore-separated segments
     * @param machineConfig configuration of the source machine
     * @param tmpCopyPath   directory prefix of the local copy (not used here)
     * @param tmpTargetPath directory prefix of the converted file
     * @param fileType      detected bill file type
     * @param lineNode      number of non-empty lines read from the bill file
     * @return {@code true} if the database import reported status 0
     */
    private boolean importIntoDB(String fileName,
            MachineConfig machineConfig, String tmpCopyPath,
            String tmpTargetPath, FileType fileType, int lineNode) {
        int logCode = machineConfig.getLogCode();
        String[] splitName = fileName.split("_");
        // The date segment is at a different position for log code 320 than for the rest.
        int dateIndex = (logCode == MMS_LOG_CODE) ? 1 : 2;
        if (splitName.length <= dateIndex
                || splitName[dateIndex].length() < DATE_PREFIX_LENGTH) {
            logger.error("Bill文件名有误");
            return false;
        }
        String dateStr = splitName[dateIndex].substring(0, DATE_PREFIX_LENGTH);

        int statusCode = -1;
        try {
            if (fileType == FileType.MOINFO) {
                statusCode = dbService.importDB(tmpTargetPath + fileName,
                        dbService.getTableName(0, logCode, 0, dateStr));
                if (statusCode != 0) {
                    logger.warn("将文件:" + tmpTargetPath + fileName + "倒入数据库时发生异常!");
                    errAlertUpdate(lineNode, machineConfig);
                    return false;
                }
            }
            logger.debug("文件" + fileName + "导入完成");
            dbService.updateMachineConfigNodeAndTime(lineNode, machineConfig);
            return statusCode == 0;
        } catch (SQLException ex) {
            logger.error("文件导入数据库异常", ex);
            return false;
        }
    }

    public void setDbService(DbService dbService) {
        this.dbService = dbService;
    }

    public void setConvertFilterList(List<ConvertFilter> convertFilterList) {
        this.convertFilterList = convertFilterList;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -