⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationlocal.java

📁 大家共享愉快, 共享愉快, 共享愉快, 共享愉快,共享愉快
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/******************************************************************************
 * The contents of this file are subject to the   Compiere License  Version 1.1
 * ("License"); You may not use this file except in compliance with the License
 * You may obtain a copy of the License at http://www.compiere.org/license.html
 * Software distributed under the License is distributed on an  "AS IS"  basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
 * the specific language governing rights and limitations under the License.
 * The Original Code is Compiere ERP & CRM Smart Business Solution. The Initial
 * Developer of the Original Code is Jorg Janke. Portions created by Jorg Janke
 * are Copyright (C) 1999-2005 Jorg Janke.
 * All parts are Copyright (C) 1999-2005 ComPiere, Inc.  All Rights Reserved.
 * Contributor(s): ______________________________________.
 *****************************************************************************/
package org.compiere.process;

import java.math.*;
import java.sql.*;
import java.util.*;
import java.util.logging.*;
import javax.naming.*;
import javax.sql.*;
import org.compiere.db.*;
import org.compiere.interfaces.*;
import org.compiere.model.*;
import org.compiere.util.*;

/**
 * 	Local (Central) Data Replication.
 * 	Note: requires migration technology
 *
 *  @author Jorg Janke
 *  @version $Id: ReplicationLocal.java,v 1.18 2005/09/29 22:01:54 jjanke Exp $
 */
public class ReplicationLocal extends SvrProcess
{
	/**	System Record				*/
	private	MSystem				m_system = null;
	/**	Replication Info			*/
	private MReplication 		m_replication = null;
	/**	Replication Run				*/
	private MReplicationRun 	m_replicationRun = null;
	/**	Test Flag					*/
	private Boolean				m_test = Boolean.FALSE;
	/**	Replication Flag			*/
	private boolean				m_replicated = true;
	/**	Remote Server				*/
	private Server 				m_serverRemote = null;
	private long				m_start = System.currentTimeMillis();
	/**	Date Run on Remote			*/
	private Timestamp			m_replicationStart = new Timestamp (m_start);

	/**	Logger						*/
	private static CLogger		s_log = CLogger.getCLogger(ReplicationLocal.class);

	/**	Remote class				*/
	private static String	REMOTE = "org.compiere.process.ReplicationRemote";
	protected static String	START = "com.compiere.client.StartReplication";

	/**
	 *  Prepare - e.g., get Parameters.
	 */
	public void prepare()
	{
		ProcessInfoParameter[] para = getParameter();
		for (int i = 0; i < para.length; i++)
		{
			String name = para[i].getParameterName();
			if (para[i].getParameter() == null)
				;
			else if (name.equals("IsTest"))
				m_test = Boolean.valueOf("Y".equals (para[i].getParameter()));
			else
				log.log(Level.SEVERE, "prepare - Unknown Parameter: " + name);
		}
		m_system = MSystem.get (getCtx());
	}	//	prepare

	/**
	 *  Perrform process.
	 *  @return Message
	 *  @throws Exception if not successful
	 */
	public String doIt() throws Exception
	{
		if (m_system == null || !m_system.isValid())
			return ("SystemNotSetupForReplication");
		//
		log.info("doIt - Record_ID=" + getRecord_ID() + ", test=" + m_test);
		connectRemote();
		//
		setupRemote();
		mergeData();
		sendUpdates();

		//	Save Info
		log.info("doIt - Replicated=" + m_replicated + " - " + m_replicationStart);
		m_replicationRun.setIsReplicated(m_replicated);
		double sec = (System.currentTimeMillis() - m_start);
		sec /= 1000;
		m_replicationRun.setDescription(sec + " s");
		m_replicationRun.save();
		if (m_replicated)
		{
			m_replication.setDateLastRun (m_replicationStart);
			m_replication.save();
		}
		//
		exit();
		return m_replicated ? "Replicated" : "Replication Error";
	}	//	doIt


	/**
	 * 	Connect to Remote Server
	 *	@throws java.lang.Exception
	 */
	private void connectRemote() throws Exception
	{
		//	Replication Info
		m_replication = new MReplication (getCtx(), getRecord_ID(), get_TrxName());
		//
		String AppsHost = m_replication.getHostAddress();
		int AppsPort = m_replication.getHostPort();
		boolean RMIoverHTTP = m_replication.isRMIoverHTTP();
		log.info (AppsHost + ":" + AppsPort + " - HTTP Tunnel=" + RMIoverHTTP);
		InitialContext ic = CConnection.getInitialContext(
			CConnection.getInitialEnvironment(AppsHost, AppsPort, RMIoverHTTP));
		if (ic == null)
			throw new Exception ("NoInitialContext");

		try
		{
			ServerHome serverHome = (ServerHome)ic.lookup (ServerHome.JNDI_NAME);
		//	log.fine("- ServerHome: " + serverHome);
			if (serverHome == null)
				throw new Exception ("NoServer");
			m_serverRemote = serverHome.create();
		//	log.fine("- Server: " + m_serverRemote);
		//	log.fine("- Remote Status = " + m_serverRemote.getStatus());
		}
		catch (Exception ex)
		{
			log.log(Level.SEVERE, "connectRemote", ex);
			throw new Exception (ex);
		}
	}	//	connectRemote


	/**
	 *	Setup Remote AD_System/AD_Table/AD_Sequence for Remote Management.
	 * 	@throws Exception
	 */
	private void setupRemote() throws Exception
	{
		log.info("setupRemote");
		//
		String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName "
			+ "FROM AD_ReplicationTable rt"
			+ " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
			+ "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
			+ " AND AD_ReplicationStrategy_ID=? "	//	#1
			+ "ORDER BY t.LoadSeq";
		RowSet rowset = getRowSet(sql, new Object[]{new Integer(m_replication.getAD_ReplicationStrategy_ID())});
		if (rowset == null)
			throw new Exception("setupRemote - No RowSet Data");

		//	Data Info
		RemoteSetupVO data = new RemoteSetupVO();
		data.Test = m_test;
		data.ReplicationTable = rowset;	//	RowSet
		data.IDRangeStart = m_replication.getIDRangeStart();
		data.IDRangeEnd   = m_replication.getIDRangeEnd();
		data.AD_Client_ID = m_replication.getRemote_Client_ID();
		data.AD_Org_ID = m_replication.getRemote_Org_ID();
		data.Prefix = m_replication.getPrefix();
		data.Suffix = m_replication.getSuffix();
		//	Process Info
		ProcessInfo pi = new ProcessInfo(data.toString(), 0);
		pi.setClassName (REMOTE);
		pi.setSerializableObject(data);
		Object result = doIt(START, "init", new Object[]{m_system});
		if (result == null || !Boolean.TRUE.equals(result))
			throw new Exception("setupRemote - Init Error - " + result);
		//	send it
		pi = m_serverRemote.process (new Properties (), pi);
		ProcessInfoLog[] logs = pi.getLogs();
		Timestamp dateRun = null;
		if (logs != null && logs.length > 0)
			dateRun = logs[0].getP_Date();	//	User Remote Timestamp!
		//
		log.info ("setupRemote - " + pi + " - Remote Timestamp = " + dateRun);
		if (dateRun != null)
			m_replicationStart = dateRun;
		m_replicationRun = new MReplicationRun (getCtx(), m_replication.getAD_Replication_ID(), m_replicationStart, get_TrxName());
		m_replicationRun.save();
	}	//	setupRemote

	/*************************************************************************/

	/**
	 * 	Receive new Data from Remote.
	 * 	@throws Exception
	 */
	private void mergeData() throws Exception
	{
		log.info("mergeData");
		//
		String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID "
			+ "FROM AD_ReplicationTable rt"
			+ " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
			+ "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
			+ " AND AD_ReplicationStrategy_ID=?" 	//	#1
			+ " AND rt.ReplicationType='M' "		//	Merge
			+ "ORDER BY t.LoadSeq";
		RowSet rowset = getRowSet(sql, new Object[]{new Integer(m_replication.getAD_ReplicationStrategy_ID())});
		try
		{
			while (rowset.next())
				mergeDataTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4));
			rowset.close();
			rowset = null;
		}
		catch (SQLException ex)
		{
			log.log(Level.SEVERE, "mergeData", ex);
			m_replicated = false;
		}
	}	//	mergeData

	/**
	 * 	Receive New Data from Remote (and send local updates)
	 * 	@param AD_Table_ID table id
	 * 	@param TableName table name
	 * 	@param AD_ReplicationTable_ID id
	 * 	@return true if success
	 * 	@throws java.lang.Exception
	 */
	private boolean mergeDataTable (int AD_Table_ID, String TableName, int AD_ReplicationTable_ID) throws Exception
	{
		RemoteMergeDataVO data = new RemoteMergeDataVO();
		data.Test = m_test;
		data.TableName = TableName;
		//	Create SQL
		StringBuffer sql = new StringBuffer("SELECT * FROM ")
			.append(TableName)
			.append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID());
		if (m_replication.getRemote_Org_ID() != 0)
			sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")");
		if (m_replication.getDateLastRun() != null)
			sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false));
		sql.append(" ORDER BY ");
		data.KeyColumns = getKeyColumns(AD_Table_ID);
		if (data.KeyColumns == null || data.KeyColumns.length == 0)
		{
			log.log(Level.SEVERE, "mergeDataTable - No KeyColumns for " + TableName);
			m_replicated = false;
			return false;
		}
		for (int i = 0; i < data.KeyColumns.length; i++)
		{
			if (i > 0)
				sql.append(",");
			sql.append(data.KeyColumns[i]);
		}
		data.Sql = sql.toString();
		//	New Central Data
		data.CentralData = getRowSet(data.Sql, null);
		if (data.CentralData == null)
		{
			log.fine("mergeDataTable - CentralData is Null - " + TableName);
			m_replicated = false;
			return false;
		}

		//	Process Info
		ProcessInfo pi = new ProcessInfo("MergeData", 0);
		pi.setClassName (REMOTE);
		pi.setSerializableObject(data);
		//	send it
		pi = m_serverRemote.process (new Properties (), pi);
		ProcessInfoLog[] logs = pi.getLogs();
		String msg = "< ";
		if (logs != null && logs.length > 0)
			msg += logs[0].getP_Msg();	//	Remote Message
		log.info("mergeDataTable - " + pi);
		//
		MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg, get_TrxName());
		if (pi.isError())
		{
			log.severe ("mergeDataTable Error - " + pi);
			m_replicated = false;
			rLog.setIsReplicated(false);
		}
		else	//	import data fom remote

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -