replicationmanager.java
来自「JGRoups源码」· Java 代码 · 共 343 行 · 第 1/2 页
JAVA
343 行
     * resource to become available (for a certain amount of time). When a transaction is
     * committed or rolled back, all resources associated with this transaction will be released.
     * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information
     *                  is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>
     *                  may be used to define the set of resources to be acquired.
     * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is
     *                  considered failed (causing a LockingException). If 0 we will wait forever.
     *                  (Note that this may lead to deadlocks).
     * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After
     *                  this time has elapsed, the lock will be released. If 0 we won't release the lock(s).
     * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and
     *                  not use locks at all.
     * @return RspList A list of Rsps ({@link org.jgroups.util.Rsp}), one for each member. Each one is the result of
     *                  {@link ReplicationReceiver#receive}. If a member didn't send a response, the
     *                  <code>received</code> field will be false. If the member was suspected while waiting for a
     *                  response, the <code>suspected</code> field will be true. If the <code>receive()</code>
     *                  method in the receiver returned a value, it will be in field <code>retval</code>. If the
     *                  receiver threw an exception, it will also be in this field.
*/ public RspList send(Address dest, byte[] data, boolean synchronous, long synchronous_timeout, Xid transaction, byte[] lock_info, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks) { // throws UpdateException, TimeoutException, LockingException { Message msg=null; ReplicationData d=new ReplicationData(ReplicationData.SEND, data, transaction, lock_info, lock_acquisition_timeout, lock_lease_timeout, use_locks); if(log.isInfoEnabled()) log.info("data is " + d + " (synchronous=" + synchronous + ')'); msg=new Message(dest, null, d); if(synchronous) { return disp.castMessage(null, msg, GroupRequest.GET_ALL, synchronous_timeout); } else { disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); return null; } } /** * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with * this transaction. If modifications were made to stable storage (but not to resource), those modifications * would now need to be transferred to the resource (e.g. database). */ public void commit(Xid transaction) { sendMessage(ReplicationData.COMMIT, transaction); } /** * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with * this transaction. 
*/ public void rollback(Xid transaction) { sendMessage(ReplicationData.ROLLBACK, transaction); } /* ------------------------------- RequestHandler interface ------------------------------ */ public Object handle(Message msg) { Object retval=null; ReplicationData data; if(msg == null) { if(log.isErrorEnabled()) log.error("received message was null"); return null; } if(msg.getLength() == 0) { if(log.isErrorEnabled()) log.error("payload of received message was null"); return null; } try { data=(ReplicationData)msg.getObject(); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("failure unmarshalling message: " + ex); return null; } switch(data.getType()) { case ReplicationData.SEND: try { return handleSend(data); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("failed handling update: " + ex); return ex; } case ReplicationData.COMMIT: handleCommit(data.getTransaction()); break; case ReplicationData.ROLLBACK: handleRollback(data.getTransaction()); break; default: if(log.isErrorEnabled()) log.error("received incorrect replication message: " + data); return null; } return retval; } /* --------------------------- End of RequestHandler interface---------------------------- */ protected Object handleSend(ReplicationData data) throws UpdateException, LockingException { try { if(receiver == null) { if(log.isWarnEnabled()) log.warn("receiver is not set"); return null; } return receiver.receive(data.getTransaction(), data.getData(), data.getLockInfo(), data.getLockAcquisitionTimeout(), data.getLockLeaseTimeout(), data.useLocks()); } catch(Throwable ex) { return ex; } } protected void handleCommit(Xid transaction) { if(receiver == null) { if(log.isWarnEnabled()) log.warn("receiver is not set"); } else receiver.commit(transaction); } protected void handleRollback(Xid transaction) { if(receiver == null) { if(log.isWarnEnabled()) log.warn("receiver is not set"); } else receiver.rollback(transaction); } /* -------------------------------------- Private methods 
------------------------------------ */

    /**
     * Builds a {@link ReplicationData} of the given type that carries only the transaction (no data, no lock
     * info, zero timeouts, locks disabled) and hands it to the dispatcher with
     * <code>GroupRequest.GET_NONE</code>.
     *
     * @param type        The request type, e.g. <code>ReplicationData.COMMIT</code> or
     *                    <code>ReplicationData.ROLLBACK</code>.
     * @param transaction The transaction the request refers to.
     */
    void sendMessage(int type, Xid transaction) {
        Message msg=new Message(null, null, new ReplicationData(type, null, transaction, null, 0, 0, false));

        // send commit message asynchronously
        disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);
    }

    /* ---------------------------------- End of Private methods --------------------------------- */
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?