// Source listing: dataqueue.java
// (code-viewer page residue: font-size control; not part of the program)
/*
* DataQueue.java
*
* Created on March 10, 2003, 2:34 PM
*/
package collector.queue;
import java.util.*;
import collector.common.*;
import pbs.service.struct.*;
/**
*
* @author WanhJun
*/
public class DataQueue {
// Number of meterData slots in the data buffers: eight times
// CollectorDefine.MAX_DATA_BUFFER_NUM. It is also the list index at which the
// constructor stores the HeadPointInfo entry (directly after the data slots).
static int MAX_DATA_BUFFER_NUM_1 = CollectorDefine.MAX_DATA_BUFFER_NUM * 8;
// Data buffers. Only one slot is allocated; the constructor fills it with a
// synchronized list of MAX_DATA_BUFFER_NUM_1 meterData objects plus a trailing
// HeadPointInfo.
static List[] m_DataBuffer = new List[1];
// Earlier variant (disabled): one buffer per thread.
// static List[] m_DataBuffer = new List[CollectorDefine.MAX_THREAD_NUM+1];
// Display buffer: after construction, a synchronized list of
// CollectorDefine.MAX_DISPLAY_DATA_NUM meterData objects plus a trailing
// HeadPointInfo. Null until a DataQueue has been constructed.
static List m_DisplayBuffer = null;
// Finished-task buffer: after construction, a synchronized list of
// CollectorDefine.MAX_FINISH_TASK_NUM FinishTaskInfo objects plus a trailing
// HeadPointInfo. Null until a DataQueue has been constructed.
static List m_FinishTaskBuffer = null;
// Sent-task buffer: same layout and size as m_FinishTaskBuffer.
// Null until a DataQueue has been constructed.
static List m_SendedTaskBuffer = null;
// Disabled: per-data-type send buffers.
//static ArrayList[] m_SendDataBuffer = new ArrayList[Constant.
// MAX_DATA_TYPE_NUM];
// Data-content buffer: after construction, a plain ArrayList (not wrapped by
// Collections.synchronizedList) of MAX_DATA_BUFFER_NUM_1 meterData objects
// plus a trailing HeadPointInfo. Null until a DataQueue has been constructed.
static ArrayList m_DataContentBuffer = null;
// Disabled: shared "cleared" meterData instance.
// static meterData m_ClearmeterData = null;
public DataQueue() {
System.out.println(" 初始化数据存储缓冲区。。。");
for (int i = 0; i < 1; i++) {
// for (int i = 0; i < CollectorDefine.MAX_THREAD_NUM+1; i++) {
m_DataBuffer[i] = null;
m_DataBuffer[i] = Collections.synchronizedList(new ArrayList(
MAX_DATA_BUFFER_NUM_1 + 1));
for (int j = 0; j < MAX_DATA_BUFFER_NUM_1; j++) {
meterData m_meterData = new meterData();
m_DataBuffer[i].add(m_meterData);
}
HeadPointInfo m_HeadPointInfo = new HeadPointInfo();
m_DataBuffer[i].add(m_HeadPointInfo);
}
// System.out.println(" 初始化数据存储缓冲区 1。。。");
m_DisplayBuffer = null;
m_DisplayBuffer = Collections.synchronizedList(new ArrayList(
CollectorDefine.MAX_DISPLAY_DATA_NUM + 1));
for (int j = 0; j < CollectorDefine.MAX_DISPLAY_DATA_NUM; j++) {
meterData m_meterData = null;
m_meterData = new meterData();
m_DisplayBuffer.add(m_meterData);
}
// System.out.println(" 初始化数据存储缓冲区 2。。。");
HeadPointInfo m_HeadPointInfo = null;
m_HeadPointInfo = new HeadPointInfo();
m_DisplayBuffer.add(m_HeadPointInfo);
m_FinishTaskBuffer = null;
m_FinishTaskBuffer = Collections.synchronizedList(new ArrayList(
CollectorDefine.MAX_FINISH_TASK_NUM + 1));
for (int j = 0; j < CollectorDefine.MAX_FINISH_TASK_NUM; j++) {
FinishTaskInfo m_FinishTaskInfo = null;
m_FinishTaskInfo = new FinishTaskInfo();
m_FinishTaskBuffer.add(m_FinishTaskInfo);
}
// System.out.println(" 初始化数据存储缓冲区 3。。。");
m_HeadPointInfo = null;
m_HeadPointInfo = new HeadPointInfo();
m_FinishTaskBuffer.add(m_HeadPointInfo);
m_SendedTaskBuffer = null;
m_SendedTaskBuffer = Collections.synchronizedList(new ArrayList(
CollectorDefine.MAX_FINISH_TASK_NUM + 1));
for (int j = 0; j < CollectorDefine.MAX_FINISH_TASK_NUM; j++) {
FinishTaskInfo m_FinishTaskInfo = null;
m_FinishTaskInfo = new FinishTaskInfo();
m_SendedTaskBuffer.add(m_FinishTaskInfo);
}
// System.out.println(" 初始化数据存储缓冲区 4。。。");
m_HeadPointInfo = null;
m_HeadPointInfo = new HeadPointInfo();
m_SendedTaskBuffer.add(m_HeadPointInfo);
// System.out.println(" 初始化数据存储缓冲区 5。。。");
m_DataContentBuffer = null;
m_DataContentBuffer = new ArrayList(MAX_DATA_BUFFER_NUM_1 + 1);
for (int j = 0; j < MAX_DATA_BUFFER_NUM_1; j++) {
meterData m_meterData = null;
m_meterData = new meterData();
m_DataContentBuffer.add(m_meterData);
}
// System.out.println(" 初始化数据存储缓冲区 6。。。");
m_HeadPointInfo = null;
m_HeadPointInfo = new HeadPointInfo();
m_DataContentBuffer.add(m_HeadPointInfo);
System.out.println(" 初始化数据存储缓冲区 over。。。");
}
/**
 * Returns the number of entries currently queued in the data buffer.
 *
 * <p>The count is derived from the index and limit stored in the buffer's
 * trailing {@code HeadPointInfo}: {@code limit - index} when the limit has not
 * wrapped around, otherwise {@code MAX_DATA_BUFFER_NUM_1 - index + limit}.
 *
 * @param m_taskno task number; only validated against
 *                 {@code [0, CollectorDefine.MAX_THREAD_NUM)}. All tasks share
 *                 buffer 0, so the value does not select a buffer.
 * @return the number of queued entries, or {@code -1} if {@code m_taskno} is
 *         out of range or the buffer's {@code HeadPointInfo} entry is null
 */
public static int getDataBufferNum(int m_taskno) {
    if (m_taskno >= CollectorDefine.MAX_THREAD_NUM || m_taskno < 0) {
        CFunction.writeLog("dataqueue",
                " getDataBufferNum In DataQueue Error #1 , m_taskno:" +
                m_taskno, null);
        return -1;
    }

    // Only one data buffer is allocated, so every task maps to slot 0. The
    // reference is read once so the lock and the read use the same list.
    final List buffer = m_DataBuffer[0];
    synchronized (buffer) {
        HeadPointInfo head = (HeadPointInfo) buffer.get(MAX_DATA_BUFFER_NUM_1);
        if (head == null) {
            System.out.println("getDataBufferNum In DataQueue Error #1");
            return -1;
        }
        int limit = head.getLimit();
        int index = head.getIndex();
        // The former post-lock "synchronized Error" check was unreachable (the
        // flag was always set once the block completed) and has been removed.
        return (limit >= index)
                ? limit - index
                : MAX_DATA_BUFFER_NUM_1 - index + limit;
    }
}
/**
 * Validates a meter reading, appends it to the data buffer and, when a GUI
 * object is watching the reading's terminal, also hands it to
 * {@code putDisplayBuffer}.
 *
 * <p>The reading is only appended when {@code CollectorDefine.LOCAL_COLLECTOR}
 * equals {@code AUTO_MODE}, {@code SINGLE_MODE} or {@code SYSTEM_CONNECT_MODE};
 * in any other mode it is not queued, but the GUI hand-off below still runs.
 * The passed object itself is stored in the buffer (no copy is made).
 *
 * @param m_pmeterData the reading to queue
 * @return {@code 1} when processing completed; {@code -1} when sending is
 *         disabled ({@code CollectorDefine.sendflag}), the reading is null,
 *         its device id is not positive, or the buffer is full;
 *         {@code CollectorDefine.PROGRAM_ERROR_1} when the factory number is
 *         outside {@code [0, CollectorDefine.MAX_THREAD_NUM)} or the task type
 *         is not one of the three supported types
 */
public static int putDataBuffer(meterData m_pmeterData) {
    if (!CollectorDefine.sendflag) {
        CFunction.writeLog("dataqueue", " putDataBuffer In DataQueue Error #0.1", null);
        return -1;
    }
    if (m_pmeterData == null) {
        CFunction.writeLog("dataqueue", " putDataBuffer In DataQueue Error #1", null);
        return -1;
    }

    long m_devid = m_pmeterData.getDevId();
    if (m_devid <= 0) {
        CFunction.writeLog("dataqueue",
                " putDataBuffer In DataQueue Error #2, meterid =" +
                m_pmeterData.getMeterId() +
                "m_devid= " + m_devid + "facid= " +
                m_pmeterData.getFacId(), null);
        return -1;
    }

    // Readings older than the archive time are only logged, not rejected.
    // NOTE(review): if dataAchiveTime is an int, "* 1000" can overflow;
    // confirm the field's type before relying on the logged time string.
    if (CollectorDefine.dataAchiveTime > 0
            && m_pmeterData.getOccurTime() < CollectorDefine.dataAchiveTime) {
        CFunction.writeLog("exceed_dispatch", "id:" + m_pmeterData.toString() +
                "发布时间:" +
                CFunction.getFullTimeStr(CollectorDefine.dataAchiveTime * 1000) +
                "occ_time:" + m_pmeterData.getOccurTime() +
                "dataAchiveTime:" + CollectorDefine.dataAchiveTime, null);
    }

    long m_TaskType = m_pmeterData.getTaskType();
    int facNo = m_pmeterData.getFacNo();
    if (facNo < 0 || facNo >= CollectorDefine.MAX_THREAD_NUM) {
        CFunction.writeLog("dataqueue",
                " putDataBuffer In DataQueue Error # 3,getFacNo(putDataBuffer)", null);
        return CollectorDefine.PROGRAM_ERROR_1;
    }

    if (m_TaskType != CollectorDefine.ONTIME_TASK &&
            m_TaskType != CollectorDefine.SERVER_TASK &&
            m_TaskType != CollectorDefine.ONTIME_SERVER_TASK) {
        CFunction.writeLog("dataqueue", " putDataBuffer In DataQueue Error #4", null);
        return CollectorDefine.PROGRAM_ERROR_1;
    }

    if (CollectorDefine.LOCAL_COLLECTOR == CollectorDefine.AUTO_MODE
            || CollectorDefine.LOCAL_COLLECTOR == CollectorDefine.SINGLE_MODE
            || CollectorDefine.LOCAL_COLLECTOR == CollectorDefine.SYSTEM_CONNECT_MODE) {
        // The factory number is only validated above; all readings go into
        // the single buffer in slot 0. Read the reference once so the lock and
        // every access below use the same list.
        final List buffer = m_DataBuffer[0];
        synchronized (buffer) {
            HeadPointInfo head = (HeadPointInfo) buffer.get(MAX_DATA_BUFFER_NUM_1);
            int limit = head.getLimit();
            int index = head.getIndex();

            // Advance the write position, wrapping to 0 at the end of the
            // data slots.
            int next = limit + 1;
            if (next >= MAX_DATA_BUFFER_NUM_1) {
                next = 0;
            }
            // Landing on the read index means the buffer is full.
            if (next == index) {
                return -1;
            }

            buffer.set(next, m_pmeterData);
            // The index does not change on a put; it is written back as
            // before because putIndex's implementation is not visible here.
            head.putIndex(index);
            head.putLimit(next);
            // 'head' is the very object stored in the list, so it does not
            // need to be set() back into the list after being updated.
        }
    }

    // Forward the reading to the display buffer if any GUI object is
    // watching its terminal.
    if (CollectorDefine.GuiArray == null) {
        return 1;
    }
    if (CollectorDefine.GuiArray.size() <= 0) {
        return 1;
    }
    int size = CollectorDefine.GuiArray.size();
    for (int kk = 0; kk < size; kk++) {
        GuiObject go = (GuiObject) CollectorDefine.GuiArray.get(kk);
        if (go == null) {
            continue;
        }
        long dataTerminal = go.getDataTerminal();
        if (m_pmeterData.getTerminalId() == dataTerminal) {
            putDisplayBuffer(m_pmeterData);
            return 1;
        }
    }
    return 1;
}
/**
 * Validates a meter reading like {@code putDataBuffer} but does not append it
 * to the data buffer; the reading is only handed to {@code putDisplayBuffer}
 * when a GUI object is watching its terminal.
 *
 * <p>Unlike {@code putDataBuffer}, this method does not check
 * {@code CollectorDefine.sendflag}. The log messages deliberately keep the
 * "putDataBuffer" wording they have always used, so existing log searches
 * continue to match.
 *
 * @param m_pmeterData the reading to validate and possibly display
 * @return {@code 1} when processing completed; {@code -1} when the reading is
 *         null or its device id is not positive;
 *         {@code CollectorDefine.PROGRAM_ERROR_1} when the factory number is
 *         outside {@code [0, CollectorDefine.MAX_THREAD_NUM)} or the task type
 *         is not one of the three supported types
 */
public static int putDataBufferNotSave(meterData m_pmeterData) {
    if (m_pmeterData == null) {
        CFunction.writeLog("dataqueue", " putDataBuffer In DataQueue Error #1", null);
        return -1;
    }

    long m_devid = m_pmeterData.getDevId();
    if (m_devid <= 0) {
        CFunction.writeLog("dataqueue",
                " putDataBuffer In DataQueue Error #2, meterid =" +
                m_pmeterData.getMeterId() +
                "m_devid= " + m_devid + "facid= " +
                m_pmeterData.getFacId(), null);
        return -1;
    }

    // Readings older than the archive time are only logged, not rejected.
    // NOTE(review): if dataAchiveTime is an int, "* 1000" can overflow;
    // confirm the field's type before relying on the logged time string.
    if (CollectorDefine.dataAchiveTime > 0
            && m_pmeterData.getOccurTime() < CollectorDefine.dataAchiveTime) {
        CFunction.writeLog("exceed_dispatch", "id:" + m_pmeterData.toString() +
                "发布时间:" +
                CFunction.getFullTimeStr(CollectorDefine.dataAchiveTime * 1000) +
                "occ_time:" + m_pmeterData.getOccurTime() +
                "dataAchiveTime:" + CollectorDefine.dataAchiveTime, null);
    }

    long m_TaskType = m_pmeterData.getTaskType();
    // The factory number is validated only; nothing is stored per factory here.
    int facNo = m_pmeterData.getFacNo();
    if (facNo < 0 || facNo >= CollectorDefine.MAX_THREAD_NUM) {
        CFunction.writeLog("dataqueue",
                " putDataBuffer In DataQueue Error # 3,getFacNo(putDataBuffer)", null);
        return CollectorDefine.PROGRAM_ERROR_1;
    }

    if (m_TaskType != CollectorDefine.ONTIME_TASK &&
            m_TaskType != CollectorDefine.SERVER_TASK &&
            m_TaskType != CollectorDefine.ONTIME_SERVER_TASK) {
        CFunction.writeLog("dataqueue", " putDataBuffer In DataQueue Error #4", null);
        return CollectorDefine.PROGRAM_ERROR_1;
    }

    // Forward the reading to the display buffer if any GUI object is
    // watching its terminal.
    if (CollectorDefine.GuiArray == null) {
        return 1;
    }
    if (CollectorDefine.GuiArray.size() <= 0) {
        return 1;
    }
    int size = CollectorDefine.GuiArray.size();
    for (int kk = 0; kk < size; kk++) {
        GuiObject go = (GuiObject) CollectorDefine.GuiArray.get(kk);
        if (go == null) {
            continue;
        }
        long dataTerminal = go.getDataTerminal();
        if (m_pmeterData.getTerminalId() == dataTerminal) {
            putDisplayBuffer(m_pmeterData);
            return 1;
        }
    }
    return 1;
}
public static meterData getDataBuffer(int m_taskno) {
if (m_taskno >= CollectorDefine.MAX_THREAD_NUM || m_taskno < 0) {
CFunction.writeLog("dataqueue",
"getDataBuffer In DataQueue Error #1 , facno:" +
m_taskno, null);
return null;
}
//wj add
// m_taskno = m_taskno / 10;
m_taskno = 0;
//
int m_SynFlag = -1;
meterData m_meterData = null;
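// [Code-viewer page residue, not part of the program]
// Keyboard shortcuts:
// Copy code:
// Ctrl + C
// Search code:
// Ctrl + F
// Full-screen mode:
// F11
// Toggle theme:
// Ctrl + Shift + D
// Show keyboard shortcuts:
// ?
// Increase font size:
// Ctrl + =
// Decrease font size:
// Ctrl + -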