📄 replicationlocal.java
字号:
{
RowSet sourceRS = (RowSet)pi.getSerializableObject();
RowSet targetRS = getRowSet(data.Sql, null);
Object result = doIt (START, "sync", new Object[] // Merge
{data.TableName, data.KeyColumns, sourceRS, targetRS, m_test, Boolean.TRUE});
boolean replicated = isReplicated(result);
if (replicated)
log.fine("mergeDataTable -> " + TableName + " - " + result);
else
{
m_replicated = false;
log.severe ("mergeDataTable -> " + TableName + " - " + result);
}
rLog.setIsReplicated(replicated);
if (result != null)
rLog.setP_Msg("< " + result.toString());
sourceRS.close();
sourceRS = null;
targetRS.close();
targetRS = null;
}
rLog.save();
return !pi.isError();
} // mergeDataTable
/**
* Get Key Columns (PK or FK) of Table
* @param AD_Table_ID id
* @return Key Columns
*/
public String[] getKeyColumns (int AD_Table_ID)
{
ArrayList<String> list = new ArrayList<String>();
PreparedStatement pstmt = null;
try
{
// Get Keys
String sql = "SELECT ColumnName FROM AD_Column "
+ "WHERE AD_Table_ID=?"
+ " AND IsKey='Y'";
pstmt = DB.prepareStatement(sql, get_TrxName());
pstmt.setInt(1, AD_Table_ID);
ResultSet rs = pstmt.executeQuery();
while (rs.next())
list.add(rs.getString(1));
rs.close();
// no keys - search for parents
if (list.size() == 0)
{
sql = "SELECT ColumnName FROM AD_Column "
+ "WHERE AD_Table_ID=?"
+ " AND IsParent='Y'";
pstmt = DB.prepareStatement(sql, get_TrxName());
pstmt.setInt(1, AD_Table_ID);
rs = pstmt.executeQuery();
while (rs.next())
list.add(rs.getString(1));
rs.close();
}
pstmt.close();
pstmt = null;
}
catch (Exception e)
{
log.log(Level.SEVERE, "getKeyColumns", e);
}
try
{
if (pstmt != null)
pstmt.close();
}
catch (Exception e)
{
}
// Convert to Array
String[] retValue = new String[list.size()];
list.toArray(retValue);
return retValue;
} // getKeyColumns
/*************************************************************************/
/**
* Send Updates to Remote (i.e. r/o on remote)
* @throws Exception
*/
private void sendUpdates() throws Exception
{
log.info("sendUpdates");
//
String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID "
+ "FROM AD_ReplicationTable rt"
+ " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
+ "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
+ " AND AD_ReplicationStrategy_ID=?" // #1
+ " AND rt.ReplicationType='R' " // Reference
+ "ORDER BY t.LoadSeq";
RowSet rowset = getRowSet(sql, new Object[]{new Integer(m_replication.getAD_ReplicationStrategy_ID())});
try
{
while (rowset.next())
sendUpdatesTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4));
rowset.close();
}
catch (SQLException ex)
{
log.log(Level.SEVERE, "sendUpdates", ex);
m_replicated = false;
}
} // sendUpdates
/**
* Send UPdates to Remote
* @param AD_Table_ID table id
* @param TableName table
* @param AD_ReplicationTable_ID id
* @return true if success
* @throws Exception
*/
private boolean sendUpdatesTable (int AD_Table_ID, String TableName, int AD_ReplicationTable_ID) throws Exception
{
RemoteUpdateVO data = new RemoteUpdateVO();
data.Test = m_test;
data.TableName = TableName;
// Create SQL
StringBuffer sql = new StringBuffer ("SELECT * FROM ")
.append(TableName)
.append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID());
if (m_replication.getRemote_Org_ID() != 0)
sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")");
if (m_replication.getDateLastRun() != null)
sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false));
sql.append(" ORDER BY ");
data.KeyColumns = getKeyColumns(AD_Table_ID);
if (data.KeyColumns == null || data.KeyColumns.length == 0)
{
log.log(Level.SEVERE, "sendUpdatesTable - No KeyColumns for " + TableName);
m_replicated = false;
return false;
}
for (int i = 0; i < data.KeyColumns.length; i++)
{
if (i > 0)
sql.append(",");
sql.append(data.KeyColumns[i]);
}
data.Sql = sql.toString();
// New Data
data.CentralData = getRowSet(data.Sql, null);
if (data.CentralData == null)
{
log.fine("sendUpdatesTable - Null - " + TableName);
m_replicated = false;
return false;
}
int rows = 0;
try
{
if (data.CentralData.last())
rows = data.CentralData.getRow();
data.CentralData.beforeFirst(); // rewind
}
catch (SQLException ex)
{
log.fine("RowCheck " + ex);
m_replicated = false;
return false;
}
if (rows == 0)
{
log.fine("No Rows - " + TableName);
return true;
}
else
log.fine(TableName + " #" + rows);
// Process Info
ProcessInfo pi = new ProcessInfo("SendUpdates", 0);
pi.setClassName (REMOTE);
pi.setSerializableObject(data);
// send it
pi = m_serverRemote.process (new Properties (), pi);
log.info("sendUpdatesTable - " + pi);
ProcessInfoLog[] logs = pi.getLogs();
String msg = "> ";
if (logs != null && logs.length > 0)
msg += logs[0].getP_Msg(); // Remote Message
//
MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg, get_TrxName());
if (pi.isError())
m_replicated = false;
rLog.setIsReplicated(!pi.isError());
rLog.save();
return !pi.isError();
} // sendUpdatesTable
/**
* Clean up resources (connections)
*/
private void exit()
{
log.info ("exit");
Object result = doIt(START, "exit", null);
ProcessInfo pi = new ProcessInfo("Exit", 0);
pi.setClassName (REMOTE);
pi.setSerializableObject(m_replicationStart);
// send it
try
{
m_serverRemote.process (new Properties (), pi);
}
catch (Exception ex)
{
}
} // exit
/**************************************************************************
* Get RowSet of Local Connection
* @param sql sql
* @param args optional argument array - supported: Integer, Timestamp, BigDecimal - rest is concerted to String
* @return row set
*/
public static RowSet getRowSet (String sql, Object[] args)
{
// shared connection
Connection conn = DB.getConnectionRO();
PreparedStatement pstmt = null;
RowSet rowSet = null;
//
try
{
pstmt = conn.prepareStatement(sql,
ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
// Set Parameters
if (args != null)
{
for (int i = 0; i < args.length; i++)
{
if (args[i] == null)
s_log.log(Level.SEVERE, "NULL Argument " + i);
else if (args[i] instanceof Integer)
pstmt.setInt(i+1, ((Integer)args[i]).intValue());
else if (args[i] instanceof Timestamp)
pstmt.setTimestamp(i+1, (Timestamp)args[i]);
else if (args[i] instanceof BigDecimal)
pstmt.setBigDecimal(i+1, (BigDecimal)args[i]);
else
pstmt.setString(i+1, args[i].toString());
}
}
//
ResultSet rs = pstmt.executeQuery();
rowSet = CCachedRowSet.getRowSet(rs);
pstmt.close();
pstmt = null;
}
catch (Exception ex)
{
s_log.log(Level.SEVERE, sql, ex);
throw new RuntimeException (ex);
}
// Close Cursor
try
{
if (pstmt != null)
pstmt.close();
pstmt = null;
}
catch (Exception e)
{
s_log.log(Level.SEVERE, "close pstmt", e);
}
return rowSet;
} // getRowSet
/**
* Is data successful replicated
* @param result sync return value
* @return true if replicated
*/
public static boolean isReplicated (Object result)
{
boolean replicated = result != null && !Boolean.FALSE.equals(result);
if (replicated)
replicated = result.toString().endsWith("Errors=0");
return replicated;
} // isReplicated
} // ReplicationLocal
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -