📄 abstractprocessingcontroller.java
字号:
package org.trinet.pcs;
import java.sql.*;
/**
 * Skeletal implementation of {@link ProcessingControllerIF}.
 * <p>
 * The class keeps a JDBC {@link Connection}, lazily prepares and caches the
 * statements it needs on that connection, and provides the plumbing to fetch,
 * post, process and result ids for a group/thread/state name triple held in a
 * {@link ProcessControlIdentifier}. Concrete subclasses supply
 * {@link #processId(long)} and configure the two id-table SQL strings
 * ({@link #pcsCheckIdSQLString}, {@link #pcsGetPostIdsForDateRangeSQLString}).
 * <p>
 * All database methods dereference {@link #connection} directly, so a
 * connection must be set (constructor, {@link #setConnection(Connection)} or
 * {@link #setDefaultConnection()}) before they are called; otherwise a
 * {@code NullPointerException} results.
 * <p>
 * NOTE(review): cached statements stay bound to the connection they were
 * prepared on. Replacing the connection does not discard them; call
 * {@link #closeProcessing()} first when switching connections.
 */
public abstract class AbstractProcessingController implements ProcessingControllerIF {

    /** Sleep interval in milliseconds used by constructors that are not given one. */
    public static final int DEFAULT_THREAD_SLEEP_MILLIS = 10;

    /** Interval in milliseconds used by the no-argument {@link #sleep()}. */
    protected int defaultSleepTimeMillis;

    /** Group/thread/state names used by the overloads that take no explicit names; may be null. */
    protected ProcessControlIdentifier defaultPCID;

    /** JDBC connection on which all statements are prepared; may be null until set. */
    protected Connection connection;

    // Statements below are prepared on first use and released by closeProcessing().
    protected CallableStatement pcsGetNextIdForProcessingStmt;
    protected CallableStatement pcsResultIdProcessingStmt;
    protected CallableStatement pcsPostIdForProcessingStmt;
    protected PreparedStatement pcsGetPostIdsForDateRangeProcessingStmt;
    protected PreparedStatement pcsCheckIdTableStmt;

    // configure instances below in concrete subclass
    /** Id type table specific query, usually event; takes the id as parameter 1. */
    protected String pcsCheckIdSQLString;
    /** Id type table specific query, usually event; takes start and end time as parameters 1 and 2. */
    protected String pcsGetPostIdsForDateRangeSQLString;

    /**
     * Creates a controller with no default identifier, no connection and a
     * default sleep time of zero (the field is left at its initial value).
     */
    protected AbstractProcessingController() {}

    /**
     * Creates a controller with the given default names, the
     * {@link #DEFAULT_THREAD_SLEEP_MILLIS} sleep time and no connection.
     */
    protected AbstractProcessingController(String defaultGroupName, String defaultThreadName,
                                           String defaultStateName) {
        this(defaultGroupName, defaultThreadName, defaultStateName, DEFAULT_THREAD_SLEEP_MILLIS, null);
    }

    /** Creates a controller with the given default names and sleep time, and no connection. */
    protected AbstractProcessingController(String defaultGroupName, String defaultThreadName,
                                           String defaultStateName, int defaultSleepTimeMillis) {
        this(defaultGroupName, defaultThreadName, defaultStateName, defaultSleepTimeMillis, null);
    }

    /** Creates a controller with the given default names, sleep time and connection. */
    protected AbstractProcessingController(String defaultGroupName, String defaultThreadName,
                                           String defaultStateName, int defaultSleepTimeMillis,
                                           Connection connection) {
        this(new ProcessControlIdentifier(defaultGroupName, defaultThreadName, defaultStateName),
             defaultSleepTimeMillis,
             connection);
    }

    /**
     * Creates a controller with the given default identifier, the
     * {@link #DEFAULT_THREAD_SLEEP_MILLIS} sleep time and no connection.
     */
    protected AbstractProcessingController(ProcessControlIdentifier pcId) {
        this(pcId, DEFAULT_THREAD_SLEEP_MILLIS, null);
    }

    /**
     * Creates a controller with the given default identifier, sleep time and
     * connection. The values are stored through the public setters.
     */
    protected AbstractProcessingController(ProcessControlIdentifier pcId, int defaultSleepTimeMillis,
                                           Connection connection) {
        setDefaultProcessControlIdentifier(pcId);
        setDefaultSleepTimeMillis(defaultSleepTimeMillis);
        setConnection(connection);
    }

    /** Sleeps the calling thread for the default sleep time. */
    public void sleep() {
        sleep(defaultSleepTimeMillis);
    }

    /**
     * Sleeps the calling thread for the given number of milliseconds.
     * <p>
     * If the thread is interrupted the stack trace is printed and the
     * interrupt status is restored, so that callers can observe the
     * interruption instead of losing it.
     *
     * @param millisecs time to sleep, in milliseconds
     */
    public void sleep(int millisecs) {
        try {
            Thread.sleep(millisecs);
        }
        catch (InterruptedException ex) {
            ex.printStackTrace();
            // Thread.sleep clears the flag when it throws; put it back for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Sets the interval, in milliseconds, used by {@link #sleep()}. */
    public void setDefaultSleepTimeMillis(int millisecs) {
        defaultSleepTimeMillis = millisecs;
    }

    /** Uses the connection returned by {@code org.trinet.jasi.DataSource.getConnection()}. */
    public void setDefaultConnection() {
        this.connection = org.trinet.jasi.DataSource.getConnection();
        //org.trinet.jasi.DataSource.getDefaultConnection();
    }

    /** Returns the connection currently in use; may be null. */
    public Connection getConnection() {
        return connection;
    }

    /** Sets the connection on which statements will be prepared. */
    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    /** Returns the default group name, or null when no default identifier is set. */
    public String getDefaultGroupName() {
        return (defaultPCID == null) ? null : defaultPCID.getGroupName();
    }

    /** Sets the default group name, creating the default identifier if needed. */
    public void setDefaultGroupName(String groupName) {
        ensureDefaultPcid().setGroupName(groupName);
    }

    /** Returns the default thread name, or null when no default identifier is set. */
    public String getDefaultThreadName() {
        return (defaultPCID == null) ? null : defaultPCID.getThreadName();
    }

    /** Sets the default thread name, creating the default identifier if needed. */
    public void setDefaultThreadName(String threadName) {
        ensureDefaultPcid().setThreadName(threadName);
    }

    /** Returns the default state name, or null when no default identifier is set. */
    public String getDefaultStateName() {
        return (defaultPCID == null) ? null : defaultPCID.getStateName();
    }

    /** Sets the default state name, creating the default identifier if needed. */
    public void setDefaultStateName(String stateName) {
        ensureDefaultPcid().setStateName(stateName);
    }

    /** Returns the default identifier; may be null. */
    public ProcessControlIdentifier getDefaultProcessControlIdentifier() {
        return defaultPCID;
    }

    /** Replaces the default identifier with the given instance (which may be null). */
    public void setDefaultProcessControlIdentifier(ProcessControlIdentifier pcId) {
        defaultPCID = pcId;
    }

    /**
     * Sets all three default names on the existing default identifier,
     * creating it first if none is set.
     */
    public void setDefaultProcessControlIdentifier(String defaultGroupName, String defaultThreadName,
                                                   String defaultStateName) {
        ensureDefaultPcid().setProcessControlNames(defaultGroupName, defaultThreadName, defaultStateName);
    }

    /**
     * Fetches the next id for the default group/thread/state names.
     *
     * @return the value returned by {@link #getNextId(String, String, String)}
     * @throws NullPointerException if no default identifier has been set
     */
    public long getNextId() {
        ProcessControlIdentifier pcId = requireDefaultPcid();
        return getNextId(pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName());
    }

    /** Fetches the next id for the names held by the given identifier. */
    public long getNextId(ProcessControlIdentifier pcId) {
        return getNextId(pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName());
    }

    /**
     * Processes a single id; supplied by the concrete subclass.
     * <p>
     * Callers in this class treat a positive return as success (it is
     * counted), a negative return as an error status, and zero as "stop".
     *
     * @param id the id to process
     * @return a status code as described above
     */
    public abstract int processId(long id) ;

    /**
     * Fetches the next id for the default names and processes it.
     * <p>
     * NOTE(review): any non-positive value from {@link #getNextId()} yields 0
     * here, including the code it returns after a {@code SQLException}; if
     * that code is negative the error is indistinguishable from "no more ids".
     *
     * @return the result of {@link #processId(long)} when the fetched id is
     *         positive, otherwise 0
     */
    public int processNextId() {
        long id = getNextId();
        return (id > 0L) ? processId(id) : 0;
    }

    /**
     * Records a result code for the id under the default names.
     *
     * @throws NullPointerException if no default identifier has been set
     */
    public int resultId(long id, int resultCode) {
        ProcessControlIdentifier pcId = requireDefaultPcid();
        return resultId(id, pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName(), resultCode);
    }

    /** Records a result code for the id under the names held by the given identifier. */
    public int resultId(long id, ProcessControlIdentifier pcId, int resultCode) {
        return resultId(id, pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName(), resultCode);
    }

    /** Posts the id with the given rank under the names held by the given identifier. */
    public int postId(long id, ProcessControlIdentifier pcId, int rank) {
        return postId(id, pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName(), rank);
    }

    /**
     * Repeatedly calls {@link #processNextId()} until it returns a
     * non-positive status.
     *
     * @return the terminating status if it was negative, otherwise the number
     *         of calls that returned a positive status
     */
    public int processIds() {
        int status = 0;
        int count = 0;
        do {
            status = processNextId();
            if (status > 0) count++;
        } while (status > 0);
        return (status < 0) ? status : count;
    }

    /**
     * Posts and processes the ids selected for the given date range, using the
     * names held by the identifier and the range's minimum and maximum epoch
     * seconds as the time bounds.
     */
    public int processIds(ProcessControlIdentifier pcId, int rank, org.trinet.util.DateRange dateRange) {
        return processIds(pcId.getGroupName(), pcId.getThreadName(), pcId.getStateName(), rank,
                          dateRange.getMinEpochSecs(), dateRange.getMaxEpochSecs());
    }

    // Define these methods:

    /**
     * Executes the {@code PCS_GET_NEXT_ID_FOR_PROCESSING_FUNCTION} call with
     * the names bound to parameters 2-4 and reads the result from out
     * parameter 1 (registered as {@code BIGINT}).
     *
     * @return the value of out parameter 1, or
     *         {@code ProcessingResult.DB_SQL_EXCEPTION.getIdCode()} if a
     *         {@code SQLException} occurred (its stack trace is printed)
     */
    public long getNextId(String groupName, String threadName, String stateName) {
        long id = 0;
        try {
            if (pcsGetNextIdForProcessingStmt == null) {
                pcsGetNextIdForProcessingStmt = connection.prepareCall(PCS_GET_NEXT_ID_FOR_PROCESSING_FUNCTION);
            }
            pcsGetNextIdForProcessingStmt.registerOutParameter(1, java.sql.Types.BIGINT);
            pcsGetNextIdForProcessingStmt.setString(2, groupName);
            pcsGetNextIdForProcessingStmt.setString(3, threadName);
            pcsGetNextIdForProcessingStmt.setString(4, stateName);
            pcsGetNextIdForProcessingStmt.execute();
            id = pcsGetNextIdForProcessingStmt.getLong(1);
        }
        catch (SQLException ex) {
            ex.printStackTrace();
            return ProcessingResult.DB_SQL_EXCEPTION.getIdCode();
        }
        return id;
    }

    /**
     * Validates the id and rank, then executes the
     * {@code PCS_POST_ID_FOR_PROCESSING_FUNCTION} call with parameters
     * (group, thread, id, state, rank).
     *
     * @return the {@code INVALID_ID} code if {@link #isValidId(long)} fails,
     *         the {@code INVALID_POSTING_RANK} code if
     *         {@link #isValidRank(int)} fails, the {@code DB_SQL_EXCEPTION}
     *         code if a {@code SQLException} occurred, otherwise the
     *         {@code UNIT_SUCCESS} code
     */
    public int postId(long id, String groupName, String threadName, String stateName, int postingRank) {
        if (! isValidId(id)) {
            return ProcessingResult.INVALID_ID.getIdCode();
        }
        if (! isValidRank(postingRank)) {
            return ProcessingResult.INVALID_POSTING_RANK.getIdCode();
        }
        try {
            if (pcsPostIdForProcessingStmt == null) {
                pcsPostIdForProcessingStmt = connection.prepareCall(PCS_POST_ID_FOR_PROCESSING_FUNCTION);
            }
            pcsPostIdForProcessingStmt.setString(1, groupName);
            pcsPostIdForProcessingStmt.setString(2, threadName);
            pcsPostIdForProcessingStmt.setLong(3, id);
            pcsPostIdForProcessingStmt.setString(4, stateName);
            pcsPostIdForProcessingStmt.setInt(5, postingRank);
            pcsPostIdForProcessingStmt.execute();
        }
        catch (SQLException ex) {
            ex.printStackTrace();
            return ProcessingResult.DB_SQL_EXCEPTION.getIdCode();
        }
        return ProcessingResult.UNIT_SUCCESS.getIdCode();
    }

    /**
     * Executes the {@code PCS_RESULT_ID_PROCESSING_FUNCTION} call with
     * parameters (group, thread, id, state, result code).
     *
     * @return the {@code DB_SQL_EXCEPTION} code if a {@code SQLException}
     *         occurred, otherwise the {@code UNIT_SUCCESS} code
     */
    public int resultId(long id, String groupName, String threadName, String stateName, int resultCode) {
        try {
            if (pcsResultIdProcessingStmt == null) {
                pcsResultIdProcessingStmt = connection.prepareCall(PCS_RESULT_ID_PROCESSING_FUNCTION);
            }
            pcsResultIdProcessingStmt.setString(1, groupName);
            pcsResultIdProcessingStmt.setString(2, threadName);
            pcsResultIdProcessingStmt.setLong(3, id);
            pcsResultIdProcessingStmt.setString(4, stateName);
            pcsResultIdProcessingStmt.setInt(5, resultCode);
            pcsResultIdProcessingStmt.execute();
        }
        catch (SQLException ex) {
            ex.printStackTrace();
            return ProcessingResult.DB_SQL_EXCEPTION.getIdCode();
        }
        return ProcessingResult.UNIT_SUCCESS.getIdCode();
    }

    /**
     * Runs {@link #pcsGetPostIdsForDateRangeSQLString} with the start and end
     * times bound to parameters 1 and 2, then posts and processes each id
     * read from column 1 of the result.
     * <p>
     * As a side effect the default identifier is set to the given names.
     * Iteration stops when a post returns a non-positive status, or when
     * {@link #processId(long)} returns a negative status other than the
     * {@code INVALID_ID} code. The result set is always closed before
     * returning; a failure while closing it is only printed.
     *
     * @return the {@code DB_SQL_EXCEPTION} code if a {@code SQLException}
     *         occurred; otherwise the last status if it was negative,
     *         otherwise the number of ids whose processing returned a
     *         positive status
     */
    public int processIds(String groupName, String threadName, String stateName, int rank,
                          double startTime, double endTime) {
        int status = 0;
        int count = 0;
        ResultSet rs = null;
        try {
            if (pcsGetPostIdsForDateRangeProcessingStmt == null) {
                pcsGetPostIdsForDateRangeProcessingStmt =
                    connection.prepareStatement(pcsGetPostIdsForDateRangeSQLString);
            }
            pcsGetPostIdsForDateRangeProcessingStmt.setDouble(1, startTime);
            pcsGetPostIdsForDateRangeProcessingStmt.setDouble(2, endTime);
            rs = pcsGetPostIdsForDateRangeProcessingStmt.executeQuery();
            setDefaultProcessControlIdentifier(groupName, threadName, stateName);
            while (rs.next()) {
                long id = rs.getLong(1);
                status = postId(id, groupName, threadName, stateName, rank);
                if (status <= 0) break;
                status = processId(id);
                // An INVALID_ID status skips this id; any other negative status aborts the run.
                if (status < 0 && status != ProcessingResult.INVALID_ID.getIdCode()) break;
                if (status > 0) count++;
            }
        }
        catch (SQLException ex) {
            ex.printStackTrace();
            return ProcessingResult.DB_SQL_EXCEPTION.getIdCode();
        }
        finally {
            // Release the cursor on every exit path, including exceptions.
            closeResultSet(rs);
        }
        return (status < 0) ? status : count;
    }

    /**
     * Closes every cached statement and clears the cached references.
     * <p>
     * Each statement is closed independently: a failure to close one is
     * printed and does not prevent the others from being closed, and all
     * references are cleared regardless, so the statements are re-prepared on
     * next use.
     */
    public void closeProcessing() {
        closeStatement(pcsGetNextIdForProcessingStmt);
        closeStatement(pcsResultIdProcessingStmt);
        closeStatement(pcsPostIdForProcessingStmt);
        closeStatement(pcsGetPostIdsForDateRangeProcessingStmt);
        closeStatement(pcsCheckIdTableStmt);
        pcsGetNextIdForProcessingStmt = null;
        pcsResultIdProcessingStmt = null;
        pcsPostIdForProcessingStmt = null;
        pcsGetPostIdsForDateRangeProcessingStmt = null;
        pcsCheckIdTableStmt = null;
    }

    /** Returns true when the rank is zero or greater. */
    public boolean isValidRank(int rank) { return (rank > -1) ; }

    /**
     * Checks the id against the id table by running
     * {@link #pcsCheckIdSQLString} with the id bound to parameter 1.
     * <p>
     * Negative ids are rejected without a query and without a message. For
     * other ids the check succeeds only when the query returns a row whose
     * first column is a non-null value greater than zero. A
     * {@code SQLException} is printed and treated as "not found". The result
     * set is always closed before returning.
     *
     * @return true if the id was found, false otherwise
     */
    public boolean isValidId(long id) {
        if (id < 0L) return false;
        boolean status = false;
        ResultSet rs = null;
        try {
            if (pcsCheckIdTableStmt == null) {
                pcsCheckIdTableStmt = connection.prepareStatement(pcsCheckIdSQLString);
            }
            pcsCheckIdTableStmt.setLong(1, id);
            rs = pcsCheckIdTableStmt.executeQuery();
            if (rs.next()) {
                long resultId = rs.getLong(1);
                if (resultId > 0 && ! rs.wasNull()) status = true;
            }
        }
        catch (SQLException ex) {
            ex.printStackTrace();
        }
        finally {
            // Release the cursor whether or not a row was returned.
            closeResultSet(rs);
        }
        if (!status) System.out.println("ProcessingController isValidId unable to find id: " + id + " in table.");
        return status;
    }

    /** Returns the default identifier, creating an empty one first if none is set. */
    private ProcessControlIdentifier ensureDefaultPcid() {
        if (defaultPCID == null) defaultPCID = new ProcessControlIdentifier();
        return defaultPCID;
    }

    /** Returns the default identifier, or throws NullPointerException if none is set. */
    private ProcessControlIdentifier requireDefaultPcid() {
        if (defaultPCID == null) {
            throw new NullPointerException("Must initialize ProcessControlIdentifier names.");
        }
        return defaultPCID;
    }

    /** Closes the statement if it is non-null, printing (not propagating) any SQLException. */
    private static void closeStatement(Statement stmt) {
        if (stmt == null) return;
        try {
            stmt.close();
        }
        catch (SQLException ex) {
            ex.printStackTrace();
        }
    }

    /** Closes the result set if it is non-null, printing (not propagating) any SQLException. */
    private static void closeResultSet(ResultSet rs) {
        if (rs == null) return;
        try {
            rs.close();
        }
        catch (SQLException ex) {
            ex.printStackTrace();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -