📄 deltamanager.java
字号:
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
}
}
}
//update last replicated time
if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
return msg;
} catch (IOException x) {
log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
return null;
}
}
/**
* Reset manager statistics
*/
public synchronized void resetStatistics() {
processingTime = 0 ;
expiredSessions = 0 ;
rejectedSessions = 0 ;
sessionReplaceCounter = 0 ;
counterNoStateTransfered = 0 ;
maxActive = getActiveSessions() ;
sessionCounter = getActiveSessions() ;
counterReceive_EVT_ALL_SESSION_DATA = 0;
counterReceive_EVT_GET_ALL_SESSIONS = 0;
counterReceive_EVT_SESSION_ACCESSED = 0 ;
counterReceive_EVT_SESSION_CREATED = 0 ;
counterReceive_EVT_SESSION_DELTA = 0 ;
counterReceive_EVT_SESSION_EXPIRED = 0 ;
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
counterSend_EVT_ALL_SESSION_DATA = 0;
counterSend_EVT_GET_ALL_SESSIONS = 0;
counterSend_EVT_SESSION_ACCESSED = 0 ;
counterSend_EVT_SESSION_CREATED = 0 ;
counterSend_EVT_SESSION_DELTA = 0 ;
counterSend_EVT_SESSION_EXPIRED = 0 ;
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
}
// -------------------------------------------------------- persistence handler
public void load() {
}
public void unload() {
}
// -------------------------------------------------------- expire
/**
* send session expired to other cluster nodes
*
* @param id
* session id
*/
protected void sessionExpired(String id) {
counterSend_EVT_SESSION_EXPIRED++ ;
SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
send(msg);
}
/**
* Exipre all find sessions.
*/
public void expireAllLocalSessions()
{
long timeNow = System.currentTimeMillis();
Session sessions[] = findSessions();
int expireDirect = 0 ;
int expireIndirect = 0 ;
if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
for (int i = 0; i < sessions.length; i++) {
if (sessions[i] instanceof DeltaSession) {
DeltaSession session = (DeltaSession) sessions[i];
if (session.isPrimarySession()) {
if (session.isValid()) {
session.expire();
expireDirect++;
} else {
expireIndirect++;
}//end if
}//end if
}//end if
}//for
long timeEnd = System.currentTimeMillis();
if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
}
/**
* When the manager expires session not tied to a request. The cluster will
* periodically ask for a list of sessions that should expire and that
* should be sent across the wire.
*
* @return The invalidated sessions array
*/
public String[] getInvalidatedSessions() {
return new String[0];
}
// -------------------------------------------------------- message receive
/**
* Test that sender and local domain is the same
*/
protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
boolean sameDomain= true;
if (!sameDomain && log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
new Object[] {getName(),
msg.getEventTypeString(),
sender,
"",
"" }));
}
return sameDomain ;
}
/**
* This method is called by the received thread when a SessionMessage has
* been received from one of the other nodes in the cluster.
*
* @param msg -
* the message received
* @param sender -
* the sender of the message, this is used if we receive a
* EVT_GET_ALL_SESSION message, so that we only reply to the
* requesting node
*/
protected void messageReceived(SessionMessage msg, Member sender) {
if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
return;
}
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader[] loaders = getClassLoaders();
if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
switch (msg.getEventType()) {
case SessionMessage.EVT_GET_ALL_SESSIONS: {
handleGET_ALL_SESSIONS(msg,sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_DATA: {
handleALL_SESSION_DATA(msg,sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_CREATED: {
handleSESSION_CREATED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_EXPIRED: {
handleSESSION_EXPIRED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_ACCESSED: {
handleSESSION_ACCESSED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_DELTA: {
handleSESSION_DELTA(msg,sender);
break;
}
default: {
//we didn't recognize the message type, do nothing
break;
}
} //switch
} catch (Exception x) {
log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
} finally {
Thread.currentThread().setContextClassLoader(contextLoader);
}
}
// -------------------------------------------------------- message receiver handler
/**
* handle receive session state is complete transfered
* @param msg
* @param sender
*/
protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
stateTransferCreateSendTime = msg.getTimestamp() ;
stateTransfered = true ;
}
/**
* handle receive session delta
* @param msg
* @param sender
* @throws IOException
* @throws ClassNotFoundException
*/
protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
counterReceive_EVT_SESSION_DELTA++;
byte[] delta = msg.getSession();
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
DeltaRequest dreq = deserializeDeltaRequest(session, delta);
dreq.execute(session, notifyListenersOnReplication);
session.setPrimarySession(false);
}
}
/**
* handle receive session is access at other node ( primary session is now false)
* @param msg
* @param sender
* @throws IOException
*/
protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
counterReceive_EVT_SESSION_ACCESSED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
session.access();
session.setPrimarySession(false);
session.endAccess();
}
}
/**
* handle receive session is expire at other node ( expire session also here)
* @param msg
* @param sender
* @throws IOException
*/
protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
counterReceive_EVT_SESSION_EXPIRED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
session.expire(notifySessionListenersOnReplication, false);
}
}
/**
* handle receive new session is created at other node (create backup - primary false)
* @param msg
* @param sender
*/
protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
counterReceive_EVT_SESSION_CREATED++;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
DeltaSession session = (DeltaSession) createEmptySession();
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false);
session.setCreationTime(msg.getTimestamp());
session.access();
if(notifySessionListenersOnReplication)
session.setId(msg.getSessionID());
else
session.setIdInternal(msg.getSessionID());
session.resetDeltaRequest();
session.endAccess();
}
/**
* handle receive sessions from other not ( restart )
* @param msg
* @param sender
* @throws ClassNotFoundException
* @throws IOException
*/
protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
counterReceive_EVT_ALL_SESSION_DATA++;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
byte[] data = msg.getSession();
deserializeSessions(data);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
//stateTransferred = true;
}
/**
* handle receive that other node want all sessions ( restart )
* a) send all sessions with one message
* b) send session at blocks
* After sending send state is complete transfered
* @param msg
* @param sender
* @throws IOException
*/
protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_GET_ALL_SESSIONS++;
//get a list of all the session from this manager
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
// Write the number of active sessions, followed by the details
// get all sessions and serialize without sync
Session[] currentSessions = findSessions();
long findSessionTimestamp = System.currentTimeMillis() ;
if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
} else {
// send session at blocks
int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions,findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0) {
try {
Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
}//end if
}//for
}//end if
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
newmsg.setTimestamp(findSessionTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
cluster.send(newmsg, sender);
}
/**
* send a block of session to sender
* @param sender
* @param currentSessions
* @param sendTimestamp
* @throws IOException
*/
protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
byte[] data = serializeSessions(currentSessions);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
newmsg.setTimestamp(sendTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
counterSend_EVT_ALL_SESSION_DATA++;
cluster.send(newmsg, sender);
}
public ClusterManager cloneFromTemplate() {
DeltaManager result = new DeltaManager();
result.name = "Clone-from-"+name;
result.cluster = cluster;
result.replicationValve = replicationValve;
result.maxActiveSessions = maxActiveSessions;
result.expireSessionsOnShutdown = expireSessionsOnShutdown;
result.notifyListenersOnReplication = notifyListenersOnReplication;
result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
result.stateTransferTimeout = stateTransferTimeout;
result.sendAllSessions = sendAllSessions;
result.sendClusterDomainOnly = sendClusterDomainOnly ;
result.sendAllSessionsSize = sendAllSessionsSize;
result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
result.receiverQueue = receiverQueue ;
result.stateTimestampDrop = stateTimestampDrop ;
result.stateTransferCreateSendTime = stateTransferCreateSendTime;
return result;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -