📄 deltamanager.java
字号:
/**
 * Unregister a previously added lifecycle event listener.
 * <p>
 * The call is forwarded unchanged to this component's lifecycle support
 * object.
 *
 * @param listener
 *            the listener that should no longer receive lifecycle events
 */
public void removeLifecycleListener(LifecycleListener listener) {
    this.lifecycle.removeLifecycleListener(listener);
}
/**
 * Start this manager and join it to its cluster.
 * <p>
 * The manager is initialized on demand, marked as started, and a
 * <code>START_EVENT</code> is fired. A session id is generated once so that
 * the random number generator is initialized. The cluster is then taken
 * from {@link #getCluster()} or, if none is configured, looked up on the
 * parent Host and then the parent Engine. When a cluster is available the
 * manager registers itself with it and requests the current sessions via
 * {@link #getAllClusterSessions()}.
 * <p>
 * Calling this method on an already started manager does nothing. Any
 * failure during the cluster set-up is logged and not propagated.
 *
 * @exception LifecycleException
 *                if this component detects a fatal error that prevents this
 *                component from being used
 */
public void start() throws LifecycleException {
    if (!initialized) {
        init();
    }
    // A second start is a no-op.
    if (started) {
        return;
    }
    started = true;
    lifecycle.fireLifecycleEvent(START_EVENT, null);

    // Generating one id up front forces the random number generator to
    // initialize.
    generateSessionId();

    try {
        // The channel is already running at this point.
        Cluster activeCluster = getCluster();
        if (activeCluster == null) {
            activeCluster = resolveParentCluster();
        }

        if (activeCluster == null) {
            log.error(sm.getString("deltaManager.noCluster", getName()));
            return;
        }

        if (log.isInfoEnabled()) {
            String ownerType;
            if (activeCluster.getContainer() instanceof Host) {
                ownerType = "Host";
            } else if (activeCluster.getContainer() instanceof Engine) {
                ownerType = "Engine";
            } else {
                ownerType = "unknown";
            }
            log.info(sm.getString("deltaManager.registerCluster", getName(), ownerType, activeCluster.getClusterName()));
        }
        if (log.isInfoEnabled()) {
            log.info(sm.getString("deltaManager.startClustering", getName()));
        }

        // Register again on every start: a context reload only calls
        // stop/start, not createManager.
        activeCluster.registerManager(this);
        getAllClusterSessions();
    } catch (Throwable t) {
        log.error(sm.getString("deltaManager.managerLoad"), t);
    }
}

/**
 * Look for a cluster on the containers above this manager: first the Host
 * that is the parent of our Context, then the Engine that is the parent of
 * that Host. A cluster that is a <code>CatalinaCluster</code> is also
 * stored on this manager via <code>setCluster</code>.
 *
 * @return the cluster found on the Host or Engine, or <code>null</code> if
 *         the container chain is not Context/Host(/Engine) or no cluster
 *         is configured there
 */
private Cluster resolveParentCluster() {
    Container context = getContainer();
    if (!(context instanceof Context)) {
        return null;
    }
    Container host = context.getParent();
    if (!(host instanceof Host)) {
        return null;
    }

    Cluster candidate = host.getCluster();
    if (candidate instanceof CatalinaCluster) {
        setCluster((CatalinaCluster) candidate);
        return candidate;
    }

    Container engine = host.getParent();
    if (!(engine instanceof Engine)) {
        return null;
    }
    candidate = engine.getCluster();
    if (candidate instanceof CatalinaCluster) {
        setCluster((CatalinaCluster) candidate);
    }
    return candidate;
}
/**
 * Ask the session master member for a backup of all clustered sessions and
 * wait for the transfer to finish.
 * <p>
 * While the request is outstanding, incoming session messages are parked in
 * <code>receivedMessageQueue</code>. Once waiting ends (successfully or
 * not) the parked messages are replayed through <code>messageReceived</code>
 * and the queue is switched off again. When <code>stateTimestampDrop</code>
 * is set, parked messages that are <code>EVT_GET_ALL_SESSIONS</code>
 * requests or that are older than the request time are dropped instead.
 * <p>
 * Nothing is sent when there is no cluster, the cluster has no members, or
 * no master member can be found.
 *
 * @see #findSessionMasterMember()
 */
public synchronized void getAllClusterSessions() {
    if (cluster == null || cluster.getMembers().length == 0) {
        if (log.isInfoEnabled()) {
            log.info(sm.getString("deltaManager.noMembers", getName()));
        }
        return;
    }

    long beforeSendTime = System.currentTimeMillis();
    Member master = findSessionMasterMember();
    if (master == null) {
        // No domain member found
        return;
    }

    SessionMessage request = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
    // Remember when the request was created; used to filter queued messages.
    stateTransferCreateSendTime = beforeSendTime;
    counterSend_EVT_GET_ALL_SESSIONS++;
    stateTransfered = false;

    // FIXME This send call block the deploy thread, when sender waitForAck is enabled
    try {
        synchronized (receivedMessageQueue) {
            receiverQueue = true;
        }
        cluster.send(request, master);
        if (log.isWarnEnabled()) {
            log.warn(sm.getString("deltaManager.waitForSessionState",getName(), master));
        }
        // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
        waitForSendAllSessions(beforeSendTime);
    } finally {
        synchronized (receivedMessageQueue) {
            Iterator parked = receivedMessageQueue.iterator();
            while (parked.hasNext()) {
                SessionMessage smsg = (SessionMessage) parked.next();
                // Deliver everything unless timestamp dropping is enabled;
                // in that case deliver only non-GET_ALL messages that are
                // not older than our request.
                // FIXME handle EVT_GET_ALL_SESSIONS later
                boolean deliver = !stateTimestampDrop
                        || (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
                                && smsg.getTimestamp() >= stateTransferCreateSendTime);
                if (deliver) {
                    messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                } else if (log.isWarnEnabled()) {
                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
                }
            }
            receivedMessageQueue.clear();
            receiverQueue = false;
        }
    }
}
/**
 * Register a cross context session with the cluster's replication valve.
 * <p>
 * The valve is looked up on first use: only when the container is a
 * <code>StandardContext</code> with cross context enabled and the cluster
 * is a <code>CatalinaCluster</code>, the first <code>ReplicationValve</code>
 * among the cluster's valves is remembered. If no valve is known after the
 * lookup, the session is not registered.
 *
 * @param session cross context session
 */
protected void registerSessionAtReplicationValve(DeltaSession session) {
    boolean lookupNeeded = replicationValve == null
            && container instanceof StandardContext
            && ((StandardContext) container).getCrossContext();
    if (lookupNeeded) {
        Cluster owner = getCluster();
        if (owner instanceof CatalinaCluster) {
            Valve[] candidates = ((CatalinaCluster) owner).getValves();
            if (candidates != null && candidates.length > 0) {
                int idx = 0;
                // Stop at the first replication valve.
                while (replicationValve == null && idx < candidates.length) {
                    if (candidates[idx] instanceof ReplicationValve) {
                        replicationValve = (ReplicationValve) candidates[idx];
                    }
                    idx++;
                }
                if (replicationValve == null && log.isDebugEnabled()) {
                    log.debug("no ReplicationValve found for CrossContext Support");
                }
            }
        }
    }

    if (replicationValve != null) {
        replicationValve.registerReplicationSession(session);
    }
}
/**
 * Find the master of the session state.
 * <p>
 * The master is simply the first entry of the cluster's member array. A
 * warning is logged when the cluster has no members; the selected member is
 * logged at debug level.
 *
 * @return master member of sessions, or <code>null</code> if the cluster
 *         currently has no members
 */
protected Member findSessionMasterMember() {
    Member mbr = null;
    Member mbrs[] = cluster.getMembers();
    if (mbrs.length != 0) {
        mbr = mbrs[0];
    }
    if (mbr == null && log.isWarnEnabled()) {
        log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
    }
    // Log at the level that is actually checked: this is diagnostic output,
    // not a warning.
    if (mbr != null && log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
    }
    return mbr;
}
/**
 * Block the calling thread until the cluster session state has been
 * transferred, polling every 100 ms.
 * <ul>
 * <li><code>getStateTransferTimeout() &gt; 0</code>: wait at most that many
 * seconds.</li>
 * <li><code>getStateTransferTimeout() == -1</code>: wait without a timeout
 * until the state has been transferred.</li>
 * <li>any other value: do not wait at all.</li>
 * </ul>
 * If the waiting thread is interrupted, waiting stops immediately and the
 * thread's interrupt status is restored for the caller.
 * <p>
 * The outcome is logged; when the state was not transferred,
 * <code>counterNoStateTransfered</code> is incremented.
 *
 * @param beforeSendTime
 *            time in milliseconds used as the reference point in the log
 *            messages
 */
protected void waitForSendAllSessions(long beforeSendTime) {
    long reqStart = System.currentTimeMillis();
    long reqNow = reqStart;
    boolean isTimeout = false;
    boolean interrupted = false;
    if (getStateTransferTimeout() > 0) {
        // wait that state is transfered with timeout check
        do {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            reqNow = System.currentTimeMillis();
            // 1000L forces long arithmetic so large timeouts cannot overflow
            isTimeout = ((reqNow - reqStart) > (1000L * getStateTransferTimeout()));
        } while (!interrupted && (!getStateTransfered()) && (!isTimeout));
    } else if (getStateTransferTimeout() == -1) {
        // wait that state is transfered
        do {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (!interrupted && (!getStateTransfered()));
        reqNow = System.currentTimeMillis();
    }
    if (interrupted) {
        // Do not swallow the interrupt: hand it back to the caller.
        Thread.currentThread().interrupt();
    }
    if (isTimeout || (!getStateTransfered())) {
        counterNoStateTransfered++;
        log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
    } else {
        if (log.isInfoEnabled()) {
            log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
        }
    }
}
/**
 * Gracefully terminate the active use of the public methods of this
 * component. This method should be the last one called on a given instance
 * of this component.
 * <p>
 * A <code>STOP_EVENT</code> is fired, every still valid session is expired
 * (failures of individual sessions are ignored), the random number
 * generator is discarded, the manager is removed from its cluster if one is
 * set, and the component is destroyed if it had been initialized.
 *
 * @exception LifecycleException
 *                if this component has not been started, or if it detects a
 *                fatal error that needs to be reported
 */
public void stop() throws LifecycleException {
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.stopped", getName()));
    }
    // Validate and update our current component state
    if (!started) {
        throw new LifecycleException(sm.getString("deltaManager.notStarted"));
    }
    lifecycle.fireLifecycleEvent(STOP_EVENT, null);
    started = false;
    // Expire all active sessions
    if (log.isInfoEnabled()) {
        log.info(sm.getString("deltaManager.expireSessions", getName()));
    }
    Session sessions[] = findSessions();
    for (int i = 0; i < sessions.length; i++) {
        DeltaSession session = (DeltaSession) sessions[i];
        if (!session.isValid()) {
            continue;
        }
        try {
            session.expire(true, isExpireSessionsOnShutdown());
        } catch (Throwable ignore) {
            // Best effort: one failing session must not prevent the
            // remaining sessions from being expired.
        }
    }
    // Require a new random number generator if we are restarted
    this.random = null;
    // start() leaves the manager in the started state even when no cluster
    // could be found, so the cluster may legitimately be null here.
    if (getCluster() != null) {
        getCluster().removeManager(this);
    }
    replicationValve = null;
    if (initialized) {
        destroy();
    }
}
// ----------------------------------------- PropertyChangeListener Methods
/**
 * Process property change events from our associated Context.
 * <p>
 * Only the <code>sessionTimeout</code> property is handled: its new value
 * (minutes, as an <code>Integer</code>) is converted to seconds and applied
 * as the maximum inactive interval. Events from sources other than a
 * Context, events for other properties, and events without a property name
 * are ignored. A new value that is not an <code>Integer</code> is logged as
 * an error and otherwise ignored.
 *
 * @param event
 *            The property change event that has occurred
 */
public void propertyChange(PropertyChangeEvent event) {
    // Validate the source of this event
    if (!(event.getSource() instanceof Context)) {
        return;
    }
    // Constant-first comparison: the property name of an event may be null.
    if ("sessionTimeout".equals(event.getPropertyName())) {
        Object newValue = event.getNewValue();
        if (newValue instanceof Integer) {
            try {
                setMaxInactiveInterval(((Integer) newValue).intValue() * 60);
            } catch (NumberFormatException e) {
                // Kept from the original code so callers see the same
                // handling should the setter ever raise this exception.
                log.error(sm.getString("deltaManager.sessionTimeout", newValue));
            }
        } else {
            // A null or non-Integer value used to escape as a
            // NullPointerException/ClassCastException; report it instead.
            log.error(sm.getString("deltaManager.sessionTimeout", newValue));
        }
    }
}
// -------------------------------------------------------- Replication
// Methods
/**
 * Callback invoked when a message from another node has been received.
 * <p>
 * Messages that are not session messages are ignored. While a session
 * state transfer is in progress (<code>receiverQueue</code> is set),
 * get-all-sessions, created, expired, accessed and delta events are parked
 * in <code>receivedMessageQueue</code> instead of being processed. All
 * other session messages are handed to <code>messageReceived</code>
 * directly.
 *
 * @param cmsg -
 *            the message received.
 */
public void messageDataReceived(ClusterMessage cmsg) {
    if (!(cmsg instanceof SessionMessage)) {
        return;
    }
    SessionMessage msg = (SessionMessage) cmsg;

    int eventType = msg.getEventType();
    boolean queueable = eventType == SessionMessage.EVT_GET_ALL_SESSIONS
            || eventType == SessionMessage.EVT_SESSION_CREATED
            || eventType == SessionMessage.EVT_SESSION_EXPIRED
            || eventType == SessionMessage.EVT_SESSION_ACCESSED
            || eventType == SessionMessage.EVT_SESSION_DELTA;

    if (queueable) {
        synchronized (receivedMessageQueue) {
            if (receiverQueue) {
                // State transfer in progress: park the message for later.
                receivedMessageQueue.add(msg);
                return;
            }
        }
    }

    // Not queued, so process immediately.
    messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
/**
* When the request has been completed, the replication valve will notify
* the manager, and the manager will decide whether any replication is
* needed or not. If there is a need for replication, the manager will
* create a session message and that will be replicated. The cluster
* determines where it gets sent.
*
* @param sessionId -
* the sessionId that just completed.
* @return a SessionMessage to be sent,
*/
public ClusterMessage requestCompleted(String sessionId) {
try {
DeltaSession session = (DeltaSession) findSession(sessionId);
DeltaRequest deltaRequest = session.getDeltaRequest();
SessionMessage msg = null;
boolean isDeltaRequest = false ;
synchronized(deltaRequest) {
isDeltaRequest = deltaRequest.getSize() > 0 ;
if (isDeltaRequest) {
counterSend_EVT_SESSION_DELTA++;
byte[] data = serializeDeltaRequest(deltaRequest);
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_DELTA,
data,
sessionId,
sessionId + "-" + System.currentTimeMillis());
session.resetDeltaRequest();
}
}
if(!isDeltaRequest) {
if(!session.isPrimarySession()) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
}
}
} else { // log only outside synch block!
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
}
}
session.setPrimarySession(true);
//check to see if we need to send out an access message
if ((msg == null)) {
long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
if (replDelta > (getMaxInactiveInterval() * 1000)) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -