// objectdbsender.java
// (Residue from a web code viewer: font-size control.)
package com.wireless.sms.gwif.smsagent.workthread;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;
import org.apache.log4j.xml.DOMConfigurator;
import com.wireless.gwif.socketconn.Client;
import com.wireless.gwif.socketconn.ClientDefaultSendTemplet;
import com.wireless.gwif.socketconn.ObjectSendAdapter;
import com.wireless.sms.gwif.smsagent.global.LoggerConstant;
import com.wireless.sms.gwif.smsagent.global.SmsGWIFGlobal;
import com.wireless.sms.pub.entity.MT;
import com.wireless.sms.pub.entity.Stat;
import com.wireless.sms.pub.mq.ObjectQueue;
public class ObjectDbSender {
/** Shared queue of outbound objects; the send templets created in start() take their work from it. */
private static final ObjectQueue objqueue = ObjectQueue.getInstance();
/** Maps a send templet to the time (Long, epoch millis) at which restart() last accepted a restart request for it. */
private static final Map map = new Hashtable();
/** Every ObjectSendAdapter created by start(), one per host/port pair. */
private static final Vector vosa = new Vector();
/**
 * True from the moment start() begins creating connections until stop() has finished.
 * Declared volatile because start() and stop() may be called from different threads.
 */
private static volatile boolean startFlag = false;
/**
 * Delay, in milliseconds, before a failed connection is restarted. Half of this value
 * is the window inside which repeated restart requests for the same templet are dropped.
 */
private static final long INTERVAL = 60000;
/**
 * Opens one object-send connection per host/port pair and registers each adapter
 * so that restart() and stop() can find it later.
 *
 * <p>Apart from the start-up log line, the call does nothing when the sender has
 * already been started. Pairs are formed index by index; when the two arrays differ
 * in length the surplus entries of the longer one are ignored (a warning is logged).
 *
 * @param hosts target hosts, one per connection
 * @param ports target ports (as strings), paired with {@code hosts} by index
 * @throws NullPointerException if either array is null and the sender is not
 *         already running
 */
public static void start(String[] hosts, String[] ports) {
    LoggerConstant.stat_log.info("启动入库线程....");
    if (startFlag) {
        return;
    }
    // Validate before flipping the flag: failing afterwards would leave the sender
    // marked as started with no connections, and every later start() would be ignored.
    if (hosts == null || ports == null) {
        throw new NullPointerException("hosts and ports must not be null");
    }
    if (hosts.length != ports.length) {
        LoggerConstant.stat_log.warn("hosts/ports length mismatch (" + hosts.length + " vs "
            + ports.length + "), surplus entries are ignored");
    }
    startFlag = true;

    for (int i = 0; i < hosts.length && i < ports.length; i++) {
        ObjectSendAdapter osa = new ObjectSendAdapter();
        ObjectSendAdapter.Templet send = osa.new Templet() {
            /**
             * Takes at most one object from the shared queue and sends it.
             * The object is put back on the queue when the send reports failure
             * or is cut short by an exception, so it is not lost.
             */
            public void processSend() {
                // Object that has left the queue but whose send has not completed yet.
                Object inFlight = null;
                try {
                    SendMonitor.getInstance().setSendObjFlag(true);
                    if (objqueue.size() > 0 && !this.isClosed()) {
                        Object obj = objqueue.removeNoWait();
                        if (obj == null) {
                            return;
                        }
                        inFlight = obj;
                        boolean sendFlag = send(obj);
                        inFlight = null;
                        if (!sendFlag) {
                            objqueue.add(obj);
                            SmsGWIFGlobal.sendMonitor("K000101");
                            log.error("gateway : " + SmsGWIFGlobal.getInstance().GATEWAY +
                                " fail send obj to db module!");
                        }
                    }
                } catch (Exception e) {
                    log.error("Exception : ", e);
                    // send() threw before returning a result: re-queue the object
                    // instead of silently dropping it.
                    if (inFlight != null) {
                        objqueue.add(inFlight);
                    }
                }
            }

            /**
             * Logs the failure and, for I/O or illegal-state errors, stops the send
             * thread and requests a delayed restart when idleFlag is set.
             *
             * @param cause Throwable reported for this connection
             */
            public void processException(Throwable cause) {
                log.error("Exception : ", cause);
                if (cause instanceof java.io.IOException || cause instanceof java.lang.IllegalStateException) {
                    log.info("Exception occur, restart ... " + this);
                    stopSendT();
                    // NOTE(review): idleFlag is inherited; its exact meaning is not visible here.
                    if (this.idleFlag) {
                        SmsGWIFGlobal.sendMonitor("K000101");
                        restart(this);
                    }
                }
            }

            /**
             * Stops the send thread and requests a delayed restart when idleFlag is set.
             */
            public void processClosed() {
                stopSendT();
                if (this.idleFlag) {
                    log.info("Connection closed, restart ... " + this);
                    SmsGWIFGlobal.sendMonitor("K000101");
                    restart(this);
                }
            }
        };
        send.setLog(LoggerConstant.mo_log);
        osa.start(send, hosts[i], ports[i]);
        vosa.add(osa);
    }
}
/**
 * Schedules a delayed restart of the client whose connection templet is
 * {@code templet}.
 *
 * <p>A request arriving less than INTERVAL/2 milliseconds after the previously
 * accepted request for the same templet is discarded. An accepted request spawns
 * a daemon thread that looks the client up in the registered adapters, waits
 * INTERVAL milliseconds and then restarts it. The restart is skipped when the
 * waiting thread is interrupted or when the sender has been stopped in the
 * meantime, so a connection is not re-opened after stop().
 *
 * @param templet the templet whose client should be restarted
 */
private synchronized static void restart(ClientDefaultSendTemplet templet) {
    // Hashtable never stores null values, so one lookup answers both
    // "was there a previous request" and "when was it".
    Long lastAccepted = (Long) map.get(templet);
    if (lastAccepted != null) {
        long time_interval = System.currentTimeMillis() - lastAccepted.longValue();
        if (time_interval < (INTERVAL / 2)) {
            LoggerConstant.mo_log.info("Same templet restart in " + (INTERVAL/2) + " milliseconds, discard ...");
            return;
        }
    }
    map.put(templet, new Long(System.currentTimeMillis()));

    final ClientDefaultSendTemplet tmpTemplet = templet;
    Thread restartThread = new Thread() {
        public void run() {
            try {
                for (int i = 0; i < vosa.size(); i++) {
                    Client client = ((ObjectSendAdapter) vosa.elementAt(i)).getClient();
                    if (client.getConnTemplet() == tmpTemplet) {
                        try {
                            Thread.sleep(INTERVAL);
                        } catch (InterruptedException interrupted) {
                            // Keep the interrupt status visible and give up the restart.
                            Thread.currentThread().interrupt();
                            LoggerConstant.mo_log.info("Restart interrupted, give up ... " + tmpTemplet);
                            return;
                        }
                        if (!startFlag) {
                            // stop() ran while this thread was waiting; do not reconnect.
                            LoggerConstant.mo_log.info("Sender stopped, skip restart ... " + tmpTemplet);
                            return;
                        }
                        client.restart();
                        LoggerConstant.mo_log.info("Client witch has templet " + client.getConnTemplet() + " restart over ...");
                        break;
                    }
                }
            } catch (Exception e) {
                LoggerConstant.mo_log.error("DBModule Exception", e);
            }
        }
    };
    restartThread.setDaemon(true);
    restartThread.start();
}
/**
 * Stops every adapter created by start() and marks the sender as stopped.
 *
 * <p>Does nothing when the sender is not running. Each adapter is stopped
 * independently: a failure while stopping one is logged and the remaining
 * adapters are still stopped. The adapter registry and the restart bookkeeping
 * are then cleared so that a later start() begins from a clean state instead of
 * accumulating adapters from earlier runs.
 */
public static void stop() {
    if (!startFlag) {
        return;
    }
    for (int i = 0; i < vosa.size(); i++) {
        try {
            ObjectSendAdapter tmposa = (ObjectSendAdapter) vosa.elementAt(i);
            if (tmposa != null) {
                tmposa.stop();
            }
        } catch (Exception e) {
            // Keep going: one adapter failing to stop must not leave the others running.
            LoggerConstant.mo_log.error("Exception : ", e);
        }
    }
    vosa.clear();
    map.clear();
    startFlag = false;
}
/**
 * Manual test driver.
 *
 * <p>Configures log4j from the file named by the {@code log4j} system property,
 * initialises the gateway globals (a failure is printed and otherwise ignored),
 * starts five sender connections against a fixed test host, and then fills the
 * shared queue from a background thread with 1000 MT records followed by
 * 1000 Stat records.
 *
 * @param args unused
 */
public static void main(String args[]) {
    DOMConfigurator.configure(System.getProperty("log4j"));

    try {
        SmsGWIFGlobal.getInstance().initYD_System();
    } catch (Throwable initFailure) {
        initFailure.printStackTrace();
    }

    final String testHost = "192.168.1.87";
    String[] testHosts = new String[] {testHost, testHost, testHost, testHost, testHost};
    String[] testPorts = new String[] {"9035", "9036", "9037", "9038", "9039"};
    ObjectDbSender.start(testHosts, testPorts);

    Runnable producer = new Runnable() {
        public void run() {
            queueTestMts(1000);
            // The "error" batch is currently switched off (count 0).
            queueTestStats(0, "999", "TestForInsertErrorDataToDatabase ");
            queueTestStats(1000, "888", "TestForInsertDataToDatabase ");
        }
    };
    new Thread(producer).start();
}

/** Adds {@code count} generated MT test records to the shared object queue. */
private static void queueTestMts(int count) {
    for (int n = 0; n < count; n++) {
        MT record = new MT();
        record.setMsgContent("MT 信息 : " + n + " 测试状态");
        record.setFeeTermID("01011111112");
        record.setMtType(String.valueOf(n % 7));
        record.setMsgID("TestForInsertDataToDatabase " + n);
        ObjectQueue.getInstance().add(record);
    }
}

/** Adds {@code count} generated DELIVRD Stat test records for the given gateway id to the shared object queue. */
private static void queueTestStats(int count, String gatewayId, String msgIdPrefix) {
    for (int n = 0; n < count; n++) {
        Stat report = new Stat();
        report.setStat("DELIVRD");
        report.setGatewayID(gatewayId);
        report.setMsgContent("测试信息 " + n + " 发送成功");
        report.setMsgID(msgIdPrefix + n);
        ObjectQueue.getInstance().add(report);
    }
}
// public static void main(String args[]){
// DOMConfigurator.configure(System.getProperty("log4j"));
//
// try {
// SmsGWIFGlobal agent = SmsGWIFGlobal.getInstance();
// agent.initYD_System();
// } catch (Throwable ex) {
// ex.printStackTrace();
// return;
// }
//
// ObjectDbSender sender = new ObjectDbSender();
// sender.doReSave();
//
// ObjectDbSender.start(SmsGWIFGlobal.OBJOUT_IP, SmsGWIFGlobal.OBJOUT_PORT);
// }
/**
 * Starts replaying the default log file into the shared object queue.
 *
 * <p>Equivalent to {@link #doReSave(File)} with the historical hard-coded
 * location {@code D:\work\sms\javasms\gwifnewest\resave\mtlog.txt}.
 */
public void doReSave() {
    doReSave(new File("D:\\work\\sms\\javasms\\gwifnewest\\resave\\mtlog.txt"));
}

/**
 * Starts a background thread that replays the given log file into the shared
 * object queue. The thread is started by the ReSaveThread constructor, so this
 * method returns immediately.
 *
 * @param file log file to replay
 * @throws IllegalArgumentException if {@code file} is null
 */
public void doReSave(File file) {
    if (file == null) {
        throw new IllegalArgumentException("file must not be null");
    }
    ObjectQueue queue = ObjectQueue.getInstance();
    new ReSaveThread(queue, file, 0);
}
class ReSaveThread extends Thread{
private ObjectQueue queue = null;
private java.io.File file = null;
private long lastModify = 0;
private int counter = 0;
public ReSaveThread(ObjectQueue queue,java.io.File file, long lastModify){
this.queue = queue;
this.file = file;
this.lastModify = lastModify;
start();
}
public void run(){
while(true){
if( file.lastModified() > lastModify ){
BufferedReader reader = null;
try{
reader = new BufferedReader(new FileReader(file));
String msg = null;
while( (msg = reader.readLine() ) != null ){
int index = msg.indexOf("<?xml");
if( index != -1 ){
msg = msg.substring(index);
Object obj = null;
if( msg.indexOf("<mt><mtID>") != -1 ){
obj = MT.getInstance(msg);
}
else{
obj = getStat(msg);
}
if( obj != null ){
LoggerConstant.mo_log.info("msg " + (++counter) + " : " + obj.toString());
queue.add(obj);
if( queue.size() > 5000 ){
try{
Thread.sleep(10000);
}catch(Exception e){}
}
}
}
else{
LoggerConstant.mo_log.info("当前匹配失败 : " + msg);
}
}
lastModify = file.lastModified();
}
catch(Exception e){
e.printStackTrace();
}
finally{
if( reader != null ){
try { reader.close(); } catch (IOException ex) { }
}
}
}
System.out.println("处理完一轮 ...");
try{
Thread.sleep(5000);
}catch(Exception e){}
}
}
public Stat getStat(String xmlContent){
Stat stat = null;
org.apache.commons.betwixt.io.BeanReader beanReader = new org.apache.commons.betwixt.io.BeanReader();
beanReader.getXMLIntrospector().setAttributesForPrimitives(false);
beanReader.setMatchIDs(false);
try {
beanReader.registerBeanClass("stat", com.wireless.sms.pub.entity.Stat.class);
stat = (Stat) beanReader.parse(new StringReader(new String(xmlContent)));
stat.setSubmitTime(null);
stat.setDoneTime("");
}
catch (java.beans.IntrospectionException ex2) {
ex2.printStackTrace();
}
catch (org.xml.sax.SAXException ex) {
ex.printStackTrace();
}
catch (IOException ex) {
ex.printStackTrace();
}
return stat;
}
}
}
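// (Residue from a web code viewer, not part of the program: keyboard shortcut help.
//  Copy code: Ctrl + C; search code: Ctrl + F; full-screen mode: F11;
//  toggle theme: Ctrl + Shift + D; show shortcuts: ?;
//  increase font size: Ctrl + =; decrease font size: Ctrl + -)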