replicationmanager.java
来自「JGRoups源码」· Java 代码 · 共 343 行 · 第 1/2 页
JAVA
343 行
// $Id: ReplicationManager.java,v 1.8 2006/09/29 21:49:02 bstansberry Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.RspList;import java.io.Serializable;/** * Class to propagate updates to a number of nodes in various ways: * <ol> * <li>Asynchronous * <li>Synchronous * <li>Synchronous with locking * </ol> * * <br/><em>Note: This class is experimental as of Oct 2002</em> * * @author Bela Ban Oct 2002 */public class ReplicationManager implements RequestHandler { Address local_addr=null; ReplicationReceiver receiver=null; /** Used to broadcast updates and receive responses (latter only in synchronous case) */ protected MessageDispatcher disp=null; protected final Log log=LogFactory.getLog(this.getClass()); /** * Creates an instance of ReplicationManager on top of a Channel */ public ReplicationManager(Channel channel, MessageListener ml, MembershipListener l, ReplicationReceiver receiver) { setReplicationReceiver(receiver); if(channel != null) { local_addr=channel.getLocalAddress(); disp=new MessageDispatcher(channel, ml, l, this, // ReplicationManager is RequestHandler true); // use deadlock detection } } /** * Creates an instance of ReplicationManager on top of a PullPushAdapter */ public ReplicationManager(PullPushAdapter adapter, Serializable id, MessageListener ml, MembershipListener l, ReplicationReceiver receiver) { if(adapter != null && adapter.getTransport() != null && adapter.getTransport() instanceof Channel) local_addr=((Channel)adapter.getTransport()).getLocalAddress(); setReplicationReceiver(receiver); disp=new MessageDispatcher(adapter, id, // FIXME ml, l, this); // ReplicationManager is RequestHandler disp.setDeadlockDetection(true); } public void stop() { if(disp != null) disp.stop(); } /** * Create a new transaction. 
The transaction will be used to send updates, identify updates in the same transaction, * and eventually commit or rollback the changes associated with the transaction. * @return Xid A unique transaction * @exception Exception Thrown when local_addr is null */ public Xid begin() throws Exception { return begin(Xid.DIRTY_READS); } /** * Create a new transaction. The tracsion will be used to send updates, identify updates in the same transaction, * and eventually commit or rollback the changes associated with the transaction. * @param transaction_mode Mode in which the transaction should run. Possible values are Xid.DIRTY_READS, * Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE * @return Xid A unique transaction * @exception Exception Thrown when local_addr is null */ public Xid begin(int transaction_mode) throws Exception { return Xid.create(local_addr, transaction_mode); } public void setReplicationReceiver(ReplicationReceiver handler) { this.receiver=handler; } public void setMembershipListener(MembershipListener l) { if(l == null) return; if(disp == null) { if(log.isErrorEnabled()) log.error("dispatcher is null, cannot set MembershipListener"); } else { disp.setMembershipListener(l); } } /** * Sends a request to all members of the group. Sending is asynchronous (return immediately) or * synchronous (wait for all members to respond). If <code>use_locking</code> is true, then locking * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row * in a table, then we need to acquire a lock for that table.<p> * In case of using locks, if the transaction associated with update already has a lock for a given resource, * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. 
If the lock * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p> * We have 3 main use cases for this method: * <ol> * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>synchronous</code> * needs to be false. All other arguments except <code>data</code> are ignored and can be null. Will call * <code>update()</code> on the registered ReplicationReceiver at each receiver. * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes * first). Argument <code>synchronous</code> needs to be true. Argument <code>synchronous_timeout</code> * needs to be >= 0. If it is 0 the call will not time out, but wait for all responses. * All other arguments (besides <code>data</code>) are ignored. * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes * first). At the receiver's side we have to acquire a lock for the resource to be updated, if the * acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways: * either <code>data</code> contains the resource(s) to be acquired implicitly, or <code>lock_info</code> * lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated * with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit * the modifications to the resource and release all locks. When a <code>rollback()</code> is received, * the receiver should remove all (temporary) modifications and release all locks associated with * <code>transaction</code>. 
* </ol> * In both the synchronous cases a List of byte[] will be returned if the data was sent to all receivers * successfully, containing byte buffers. The list may be empty. * @param dest The destination to which to send the message. Will be sent to all members if null. * @param data The data to be sent to all members. It may contain information about the resource to be locked. * @param synchronous If false the call is asynchronous, i.e. non-blocking. If true, the method will wait * until responses from all members have been received (unless a timeout is defined, see below) * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until * <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes first). 0 means * to wait forever. * @param transaction The transaction under which all locks for resources should be acquired. The receiver * will probably maintain a lock table with resources as keys and transactions as values. * When an update is received, the receiver checks its lock table: if the resource is * not yet taken, the resource/transaction pair will be added to the lock table. Otherwise, * we check if the transaction's owner associated with the resource is the same as the caller. * If this is the case, the lock will be considered granted, otherwise we will wait for the
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?