📄 reportrespqueue.java
字号:
package com.sms.platform.switchcenter.inner.reportrespqueue;
import java.util.ArrayList;
import org.apache.log4j.Logger;
import com.pub.berkeleydb.BklyDatabase;
import com.pub.berkeleydb.BklyEnv;
import com.pub.berkeleydb.BklyIndex;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryCursor;
import com.sleepycat.je.Transaction;
import com.sms.protocol.standard12.Standard_Inner_Response;
public class ReportRespQueue extends BklyDatabase {
private static Logger log = Logger.getLogger(ReportRespQueue.class);
private BklyIndex _idx = null;
public ReportRespQueue() {
super._recordClass = Standard_Inner_Response.class;
super._rc = new ReportRespCreator();
}
public int open(String path, String name) {
BklyEnv env = new BklyEnv();
if (env.open(path, false, false, true, true, true) != 0) {
return -10;
}
return this.open(name, env);
}
public int open(String name, BklyEnv env) {
if (super.open(name, env) != 0) {
return -1;
}
_idx = new BklyIndex(new ReportRespTimeIndexCreator());
if (_idx.open("reportrespsendtime", this, true) != 0) {
this.close();
return -2;
}
return 0;
}
public int open(String path, String name, String indexname) {
BklyEnv env = new BklyEnv();
if (env.open(path, false, false, true, true, true) != 0) {
return -10;
}
return this.open(name, env, indexname);
}
public int open(String name, BklyEnv env, String indexname) {
if (super.open(name, env) != 0) {
return -1;
}
_idx = new BklyIndex(new ReportRespTimeIndexCreator());
if (_idx.open(indexname, this, true) != 0) {
this.close();
return -2;
}
return 0;
}
/**
* MT入队列
* @param sr MT
* @return int
*/
public /*synchronized*/ int put(Standard_Inner_Response sr) {
int result = super.put(sr);
// log.debug("put : " + sr.getMessageID() + "|" + sr.getSrcClientID() + "|"+sr.getSequenceId()+"|"+result);
return result;
}
/**
* 直接获取MT,获取队列中的第一个
* @return MT
*/
public synchronized Standard_Inner_Response pop() {
Standard_Inner_Response sr = new Standard_Inner_Response();
Standard_Inner_Response result = super.get(sr, false, true) == 0 ? sr : null;
sr = null;
return result;
}
/**
* 直接获取MT,获取队列中的第一个,赋值
* @param sr MT
* @return int
*/
public synchronized int pop(Standard_Inner_Response sr) {
int result = 0;
if (sr == null) {
result = -1;
} else {
result = super.get(sr, false, true);
}
return result;
}
/**
* MT赋key值后,在队列中找到相应的MT
* @param sr MT
* @return int
*/
public /*synchronized*/ int searchPop(Standard_Inner_Response sr) {
int result = 0;
if (sr == null) {
result = -1;
} else {
result = super.get(sr, true, true);
// log.debug("searchPop : " + sr.getMessageID() + "|" + sr.getSrcClientID() + "|"+sr.getSequenceId()+"|"+result);
}
return result;
}
private SecondaryCursor _cur = null;
private int _minDelay = 0; //second
private Transaction tnx = null;
DatabaseEntry pkey = null; // new DatabaseEntry();
/**
* 根据超时时间获取MT,每次获取一个
* @param delay int
* @return MT
*/
public Standard_Inner_Response dropFirstTimeout(int delay) throws Exception{
Standard_Inner_Response sr = null;
if (openCursor(delay) == 0) {
sr = nextCursor();
if (sr != null)
delCurKey();
closeCursor();
}
return sr;
}
/**
* 查看或删除超时数据
* @param delay int 超时时间 单位秒
* @param count int 准备查看或删除的数量
* @param del boolean 是否删除
* @return ArrayList
*/
public ArrayList dropTimeout(int delay,int count,boolean del) throws Exception{
Standard_Inner_Response sr = null;
ArrayList mtlist = new ArrayList();
if (openCursor(delay) == 0) {
for(int i = 0 ; i < count ; i++){
sr = nextCursor();
if (sr != null){
mtlist.add(sr);
if(del)
delCurKey();
} else {
break;
}
}
closeCursor();
}
return mtlist;
}
public int openCursor(int delay) {
_minDelay = delay;
try {
tnx = super.getOriginEnv().beginTransaction(null, null);
_cur = _idx.getOriginIndex().openSecondaryCursor(tnx, null);
} catch (Exception ex) {
log.error(null, ex);
}
return _cur == null ? -1 : 0;
}
public Standard_Inner_Response nextCursor() throws Exception{
if (_cur != null) {
DatabaseEntry key, /*pkey,*/ value;
key = new DatabaseEntry();
pkey = new DatabaseEntry();
value = new DatabaseEntry();
OperationStatus ret = OperationStatus.NOTFOUND;
try {
ret = _cur.getNext(key, pkey, value, LockMode.DEFAULT); //swan
} catch (DatabaseException ex) {
}
if (ret == OperationStatus.SUCCESS) {
Standard_Inner_Response sir = new Standard_Inner_Response();
_rc.fromKey(sir, pkey);
_rc.fromValue(sir, value);
if (System.currentTimeMillis() - sir.getSendTime() > _minDelay * 1000L ||
System.currentTimeMillis() - sir.getSendTime() < -100) {
return sir;
} else
closeCursor(); //swan
} else
closeCursor(); //swan
}
return null;
}
public int delCurKey() {
OperationStatus ret = OperationStatus.NOTFOUND;
if (_cur != null) {
try {
//ret = _cur.delete();
ret = this.getOriginDB().delete(tnx, pkey); //
} catch (DatabaseException ex) {
}
}
return ret == OperationStatus.SUCCESS ? 0 : -1;
}
public int closeCursor() {
if (_cur != null) {
try {
_cur.close();
tnx.commit();
} catch (DatabaseException ex) {
}
_cur = null;
tnx = null;
}
return 0;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -