replicationmanager.java

来自「JGRoups源码」· Java 代码 · 共 343 行 · 第 1/2 页

JAVA
343
字号
// $Id: ReplicationManager.java,v 1.8 2006/09/29 21:49:02 bstansberry Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.RspList;import java.io.Serializable;/** * Class to propagate updates to a number of nodes in various ways: * <ol> * <li>Asynchronous * <li>Synchronous * <li>Synchronous with locking * </ol> *  * <br/><em>Note: This class is experimental as of Oct 2002</em> * * @author Bela Ban Oct 2002 */public class ReplicationManager implements RequestHandler {    Address                     local_addr=null;    ReplicationReceiver         receiver=null;    /** Used to broadcast updates and receive responses (latter only in synchronous case) */    protected MessageDispatcher disp=null;    protected final Log log=LogFactory.getLog(this.getClass());    /**     * Creates an instance of ReplicationManager on top of a Channel     */    public ReplicationManager(Channel channel,                              MessageListener ml,                              MembershipListener l,                              ReplicationReceiver receiver) {        setReplicationReceiver(receiver);	if(channel != null) {            local_addr=channel.getLocalAddress();            disp=new MessageDispatcher(channel,                                       ml,                                       l,                                       this,      // ReplicationManager is RequestHandler                                       true);     // use deadlock detection        }    }        /**     * Creates an instance of ReplicationManager on top of a PullPushAdapter     */    public ReplicationManager(PullPushAdapter adapter,                              Serializable id,                              MessageListener ml,                              MembershipListener l,                              ReplicationReceiver receiver) {        if(adapter != null && adapter.getTransport() != null && 
adapter.getTransport() instanceof Channel)            local_addr=((Channel)adapter.getTransport()).getLocalAddress();        setReplicationReceiver(receiver);        disp=new MessageDispatcher(adapter,                                   id,     // FIXME                                   ml,                                   l,                                   this);  // ReplicationManager is RequestHandler        disp.setDeadlockDetection(true);    }        public void stop() {        if(disp != null)            disp.stop();    }        /**     * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,     * and eventually commit or rollback the changes associated with the transaction.     * @return Xid A unique transaction     * @exception Exception Thrown when local_addr is null     */    public Xid begin() throws Exception {        return begin(Xid.DIRTY_READS);    }    /**     * Create a new transaction. The tracsion will be used to send updates, identify updates in the same transaction,     * and eventually commit or rollback the changes associated with the transaction.     * @param transaction_mode Mode in which the transaction should run. 
Possible values are Xid.DIRTY_READS,     *                         Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE     * @return Xid A unique transaction     * @exception Exception Thrown when local_addr is null     */    public Xid begin(int transaction_mode) throws Exception {        return Xid.create(local_addr, transaction_mode);    }        public void setReplicationReceiver(ReplicationReceiver handler) {        this.receiver=handler;    }    public void setMembershipListener(MembershipListener l) {        if(l == null)            return;        if(disp == null) {            if(log.isErrorEnabled()) log.error("dispatcher is null, cannot set MembershipListener");        }        else {            disp.setMembershipListener(l);        }    }        /**     * Sends a request to all members of the group. Sending is asynchronous (return immediately) or     * synchronous (wait for all members to respond). If <code>use_locking</code> is true, then locking     * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be     * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter     * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row     * in a table, then we need to acquire a lock for that table.<p>     * In case of using locks, if the transaction associated with update already has a lock for a given resource,     * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. If the lock     * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to     * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p>     * We have 3 main use case for this method:     * <ol>     * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>asynchronous</code>     *     needs to be true. 
     *     All other arguments except <code>data</code> are ignored and can be null. Will call
     *     <code>update()</code> on the registered ReplicationReceiver at each receiver.
     * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members
     *     have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
     *     first). Argument <code>asynchronous</code> needs to be false. Argument <code>synchronous_timeout</code>
     *     needs to be &gt;= 0. If it is 0 the call will not time out, but wait for all responses.
     *     All other arguments (besides <code>data</code>) are ignored.
     * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members
     *     have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
     *     first). At the receiver's side we have to acquire a lock for the resource to be updated, if the
     *     acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways:
     *     either <code>data</code> contains the resource(s) to be acquired implicitly, or <code>lock_info</code>
     *     lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated
     *     with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit
     *     the modifications to the resource and release all locks. When a <code>rollback()</code> is received,
     *     the receiver should remove all (temporary) modifications and release all locks associated with
     *     <code>transaction</code>.
     * </ol>
     * In both the synchronous cases a List of byte[] will be returned if the data was sent to all receivers
     * successfully, containing byte buffers. The list may be empty.
     * @param dest The destination to which to send the message. Will be sent to all members if null.
     * @param data The data to be sent to all members. It may contain information about the resource to be locked.
     * @param synchronous If false the call is asynchronous, i.e. non-blocking. If true, the method will wait
     *                    until responses from all members have been received (unless a timeout is defined, see below)
     * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until
     *                            <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes first). 0 means
     *                            to wait forever.
     * @param transaction The transaction under which all locks for resources should be acquired. The receiver
     *                    will probably maintain a lock table with resources as keys and transactions as values.
     *                    When an update is received, the receiver checks its lock table: if the resource is
     *                    not yet taken, the resource/transaction pair will be added to the lock table. Otherwise,
     *                    we check if the transaction's owner associated with the resource is the same as the caller.
     *                    If this is the case, the lock will be considered granted, otherwise we will wait for the

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?