📄 deltamanager.java
字号:
//request session state
cluster.send(msg, mbr);
log.warn("Manager["+getName()+"], requesting session state from "+mbr+
". This operation will timeout if no session state has been received within "+
"60 seconds");
long reqStart = System.currentTimeMillis();
long reqNow = 0;
boolean isTimeout=false;
do {
try {
Thread.currentThread().sleep(100);
}catch ( Exception sleep) {}
reqNow = System.currentTimeMillis();
isTimeout=((reqNow-reqStart)>(1000*60));
} while ( (!getStateTransferred()) && (!isTimeout));
if ( isTimeout || (!getStateTransferred()) ) {
log.error("Manager["+getName()+"], No session state received, timing out.");
}else {
log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
}
} else {
log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
}//end if
} catch (Throwable t) {
log.error(sm.getString("standardManager.managerLoad"), t);
}
}
/**
 * Gracefully terminate the active use of the public methods of this
 * component. This method should be the last one called on a given
 * instance of this component.
 * <p>
 * Fires the stop lifecycle event, marks the manager as not started,
 * expires every still-valid session when
 * {@link #getExpireSessionsOnShutdown()} is true, discards the random
 * number generator and finally calls {@code destroy()} if the manager
 * was initialized.
 *
 * @exception LifecycleException if this component has not been started,
 *  or detects a fatal error that needs to be reported
 */
public void stop() throws LifecycleException {
    if (log.isDebugEnabled())
        log.debug("Stopping");

    // Validate and update our current component state
    if (!started)
        throw new LifecycleException
            (sm.getString("standardManager.notStarted"));
    lifecycle.fireLifecycleEvent(STOP_EVENT, null);
    started = false;

    // Expire all active sessions
    if (this.getExpireSessionsOnShutdown()) {
        log.info("Expiring sessions upon shutdown");
        Session sessions[] = findSessions();
        for (int i = 0; i < sessions.length; i++) {
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isValid())
                continue;
            try {
                session.expire();
            } catch (Throwable t) {
                // Best effort: keep expiring the remaining sessions, but
                // record the failure instead of silently dropping it.
                log.error(sm.getString("standardManager.expireException"), t);
            }
        }
    }

    // Require a new random number generator if we are restarted
    this.random = null;

    if (initialized) {
        destroy();
    }
}
// ----------------------------------------- PropertyChangeListener Methods
/**
 * Process property change events from our associated Context.
 * <p>
 * Events whose source is not a {@code Context} are ignored. Only the
 * {@code "sessionTimeout"} property is handled: its new {@code Integer}
 * value is multiplied by 60 and passed to {@code setMaxInactiveInterval}
 * (presumably a minutes-to-seconds conversion; confirm against the
 * Context contract). A missing or non-Integer value is logged and
 * otherwise ignored.
 *
 * @param event The property change event that has occurred
 */
public void propertyChange(PropertyChangeEvent event) {
    // Validate the source of this event
    if (!(event.getSource() instanceof Context))
        return;

    // Process a relevant property change. The literal is the receiver of
    // equals() because a PropertyChangeEvent may carry a null property name.
    if ("sessionTimeout".equals(event.getPropertyName())) {
        Object newValue = event.getNewValue();
        if (!(newValue instanceof Integer)) {
            // Covers both null and wrongly typed values, which would
            // otherwise surface as NullPointerException/ClassCastException.
            log.error(sm.getString("standardManager.sessionTimeout",
                                   String.valueOf(newValue)));
            return;
        }
        try {
            setMaxInactiveInterval(((Integer) newValue).intValue() * 60);
        } catch (NumberFormatException e) {
            // NOTE(review): retained from the original code; nothing visible
            // here throws it. Confirm before removing.
            log.error(sm.getString("standardManager.sessionTimeout",
                                   String.valueOf(newValue)));
        }
    }
}
// -------------------------------------------------------- Replication Methods
/**
 * Callback invoked when a replication message arrives from another node.
 * The sender is taken from the message's address (null when the message
 * carries none) and both are handed to
 * {@link #messageReceived(SessionMessage, Member)}.
 *
 * @param msg - the message received.
 */
public void messageDataReceived(SessionMessage msg) {
    Member sender = null;
    if (msg.getAddress() != null) {
        sender = (Member) msg.getAddress();
    }
    messageReceived(msg, sender);
}
/**
 * When the request has been completed, the replication valve
 * will notify the manager, and the manager will decide whether
 * any replication is needed or not.
 * If there is a need for replication, the manager will
 * create a session message and that will be replicated.
 * The cluster determines where it gets sent.
 * <p>
 * A delta message is produced when the session has recorded changes; an
 * "accessed" message is produced when the session was not primary on this
 * node, or when nothing has been replicated for longer than the max
 * inactive interval. The session is marked primary on this node.
 *
 * @param sessionId - the sessionId that just completed.
 * @return a SessionMessage to be sent, or null when nothing needs to be
 *         replicated, the session cannot be found, or the delta could not
 *         be serialized
 */
public SessionMessage requestCompleted(String sessionId) {
    try {
        DeltaSession session = (DeltaSession) findSession(sessionId);
        if (session == null) {
            // findSession may yield null for an unknown id (for example a
            // session that no longer exists); there is nothing to replicate.
            return null;
        }
        DeltaRequest deltaRequest = session.getDeltaRequest();
        SessionMessage msg = null;
        if (deltaRequest.getSize() > 0) {
            byte[] data = unloadDeltaRequest(deltaRequest);
            msg = new SessionMessage(name, SessionMessage.EVT_SESSION_DELTA,
                                     data, sessionId);
            session.resetDeltaRequest();
        } else if (!session.isPrimarySession()) {
            msg = new SessionMessage(getName(),
                                     SessionMessage.EVT_SESSION_ACCESSED,
                                     null,
                                     sessionId);
        }
        session.setPrimarySession(true);

        //check to see if we need to send out an access message
        if (msg == null) {
            long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
            // 1000L forces long arithmetic; int multiplication would
            // overflow for very large max inactive intervals.
            if (replDelta > (getMaxInactiveInterval() * 1000L)) {
                msg = new SessionMessage(getName(),
                                         SessionMessage.EVT_SESSION_ACCESSED,
                                         null,
                                         sessionId);
            }
        }

        //update last replicated time
        if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
        return msg;
    } catch (IOException x) {
        log.error("Unable to serialize delta request", x);
        return null;
    }
}
/**
 * Sends an {@code EVT_SESSION_EXPIRED} message for the given session id
 * through the cluster. The message is built with this manager's name and
 * a null data argument.
 *
 * @param id the id of the session that expired
 */
protected void sessionExpired(String id) {
    cluster.send(new SessionMessage(getName(),
                                    SessionMessage.EVT_SESSION_EXPIRED,
                                    null,
                                    id));
}
/**
 * When the manager expires session not tied to a request.
 * The cluster will periodically ask for a list of sessions
 * that should expire and that should be sent across the wire.
 *
 * @return the ids of the invalidated sessions; this implementation
 *         always returns a new, empty array
 */
public String[] getInvalidatedSessions() {
    return new String[] {};
}
/**
 * This method is called by the receiver thread when a SessionMessage has
 * been received from one of the other nodes in the cluster.
 * <p>
 * Dispatches on the message's event type:
 * <ul>
 * <li>{@code EVT_GET_ALL_SESSIONS}: unloads all sessions and sends them
 *     back to the sender only;</li>
 * <li>{@code EVT_ALL_SESSION_DATA}: loads the received sessions and marks
 *     the state as transferred;</li>
 * <li>{@code EVT_SESSION_CREATED}: creates a non-primary session with the
 *     id carried by the message;</li>
 * <li>{@code EVT_SESSION_EXPIRED}, {@code EVT_SESSION_ACCESSED},
 *     {@code EVT_SESSION_DELTA}: applied to the local session, if one with
 *     that id exists.</li>
 * </ul>
 * Unknown event types are ignored. Any exception raised while handling a
 * message is logged and not propagated.
 *
 * @param msg - the message received
 * @param sender - the sender of the message, this is used if we receive a
 *                 EVT_GET_ALL_SESSION message, so that we only reply to
 *                 the requesting node
 */
protected void messageReceived(SessionMessage msg, Member sender) {
    try {
        // Guarded so the message string is only built when it will be logged;
        // this method runs for every replication message.
        if (log.isDebugEnabled()) {
            log.debug("Manager ("+name+") Received SessionMessage of type=" + msg.getEventTypeString()+" from "+sender);
        }
        switch (msg.getEventType()) {
            case SessionMessage.EVT_GET_ALL_SESSIONS: {
                //get a list of all the session from this manager
                byte[] data = doUnload();
                SessionMessage newmsg = new SessionMessage(name,
                    SessionMessage.EVT_ALL_SESSION_DATA,
                    data, "");
                cluster.send(newmsg, sender);
                break;
            }
            case SessionMessage.EVT_ALL_SESSION_DATA: {
                byte[] data = msg.getSession();
                doLoad(data);
                // Only reached when doLoad completed without throwing.
                stateTransferred = true;
                break;
            }
            case SessionMessage.EVT_SESSION_CREATED: {
                DeltaSession session = (DeltaSession)createSession(false);
                session.setId(msg.getSessionID());
                session.setNew(false);
                session.setPrimarySession(false);
                session.resetDeltaRequest();
                break;
            }
            case SessionMessage.EVT_SESSION_EXPIRED: {
                DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                if (session != null) {
                    session.expire(true,false);
                } //end if
                break;
            }
            case SessionMessage.EVT_SESSION_ACCESSED: {
                DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                if (session != null) {
                    session.access();
                    session.setPrimarySession(false);
                }
                break;
            }
            case SessionMessage.EVT_SESSION_DELTA : {
                byte[] delta = msg.getSession();
                DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                if (session != null) {
                    DeltaRequest dreq = loadDeltaRequest(session, delta);
                    dreq.execute(session);
                    session.setPrimarySession(false);
                    session.access();
                }
                break;
            }
            default: {
                //we didn't recognize the message type, do nothing
                break;
            }
        } //switch
    }
    catch (Exception x) {
        log.error("Unable to receive message through TCP channel", x);
    }
}
// ------------------------------------ Background Processing and Accessors
/**
 * Periodic housekeeping hook: logs the invocation at debug level and
 * then runs {@link #processExpires()}.
 */
public void backgroundProcess() {
    // Guarded so the message is only built when debug logging is on.
    if (log.isDebugEnabled()) {
        log.debug("DeltaManager.backgroundProcess invoked at "+System.currentTimeMillis());
    }
    processExpires();
}
/**
 * Sweep every session known to this manager, counting those that are no
 * longer valid in {@code expiredSessions} and adding the elapsed time of
 * the sweep to {@code processingTime}.
 * <p>
 * NOTE(review): no explicit expire call is made here; presumably
 * {@code isValid()} expires timed-out sessions as a side effect. Confirm
 * against DeltaSession.
 */
public void processExpires() {
    long timeNow = System.currentTimeMillis();
    Session sessions[] = findSessions();

    for (int i = 0; i < sessions.length; i++) {
        DeltaSession session = (DeltaSession) sessions[i];
        if (!session.isValid()) {
            // A plain counter increment cannot throw, so the former
            // try/catch around it was unreachable and has been dropped.
            expiredSessions++;
        }
    }

    long timeEnd = System.currentTimeMillis();
    processingTime += (timeEnd - timeNow);
}
/**
 * Reports whether the session state has been transferred to this manager.
 * The flag is set once an {@code EVT_ALL_SESSION_DATA} message has been
 * loaded in {@code messageReceived}.
 * <p>
 * NOTE(review): the flag appears to be written by the message-receiving
 * thread and polled from another one; confirm the field is declared
 * {@code volatile}.
 *
 * @return the current value of the state-transferred flag
 */
public boolean getStateTransferred() {
    return this.stateTransferred;
}
/**
 * Sets the flag returned by {@code getStateTransferred()}.
 *
 * @param stateTransferred the new value of the state-transferred flag
 */
public void setStateTransferred(boolean stateTransferred) {
this.stateTransferred = stateTransferred;
}
/**
 * Returns the cluster this manager sends its session messages through.
 *
 * @return the cluster currently associated with this manager
 */
public CatalinaCluster getCluster() {
    return this.cluster;
}
/**
 * Associates this manager with the cluster used to send session messages.
 *
 * @param cluster the cluster to use
 */
public void setCluster(CatalinaCluster cluster) {
this.cluster = cluster;
}
/**
 * No-op: the method body is empty in this manager.
 * NOTE(review): presumably overrides a persistence hook of the parent
 * manager; confirm against the class hierarchy.
 */
public void load() {
}
/**
 * No-op: the method body is empty in this manager.
 * NOTE(review): presumably overrides a persistence hook of the parent
 * manager; confirm against the class hierarchy.
 */
public void unload() {
}
/**
 * Returns the {@code useDirtyFlag} setting of this manager.
 *
 * @return the current value of {@code useDirtyFlag}
 */
public boolean getUseDirtyFlag() {
    return this.useDirtyFlag;
}
/**
 * Sets the {@code useDirtyFlag} setting of this manager.
 *
 * @param useDirtyFlag the new value of the setting
 */
public void setUseDirtyFlag(boolean useDirtyFlag) {
this.useDirtyFlag = useDirtyFlag;
}
/**
 * Tells whether {@code stop()} expires all still-valid sessions before
 * shutting the manager down.
 *
 * @return true if sessions are expired on shutdown
 */
public boolean getExpireSessionsOnShutdown() {
    return this.expireSessionsOnShutdown;
}
/**
 * Controls whether {@code stop()} expires all still-valid sessions.
 *
 * @param expireSessionsOnShutdown true to expire sessions on shutdown
 */
public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
this.expireSessionsOnShutdown = expireSessionsOnShutdown;
}
/**
 * Returns the {@code printToScreen} setting of this manager.
 *
 * @return the current value of {@code printToScreen}
 */
public boolean getPrintToScreen() {
    return this.printToScreen;
}
/**
 * Sets the {@code printToScreen} setting of this manager.
 *
 * @param printToScreen the new value of the setting
 */
public void setPrintToScreen(boolean printToScreen) {
this.printToScreen = printToScreen;
}
/**
 * Sets the name of this manager. The name is passed as the first argument
 * of outgoing session messages and appears in this manager's log output.
 *
 * @param name the new manager name
 */
public void setName(String name) {
this.name = name;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -