📄 deltamanager.java
字号:
sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId)); } } } else { // log only outside synch block! if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId)); } } session.setPrimarySession(true); //check to see if we need to send out an access message if ((msg == null)) { long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated(); if (replDelta > (getMaxInactiveInterval() * 1000)) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId)); } } } //update last replicated time if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis()); return msg; } catch (IOException x) { log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x); return null; } } /** * Reset manager statistics */ public synchronized void resetStatistics() { processingTime = 0 ; expiredSessions = 0 ; rejectedSessions = 0 ; sessionReplaceCounter = 0 ; counterNoStateTransfered = 0 ; maxActive = getActiveSessions() ; sessionCounter = getActiveSessions() ; counterReceive_EVT_ALL_SESSION_DATA = 0; counterReceive_EVT_GET_ALL_SESSIONS = 0; counterReceive_EVT_SESSION_ACCESSED = 0 ; counterReceive_EVT_SESSION_CREATED = 0 ; counterReceive_EVT_SESSION_DELTA = 0 ; counterReceive_EVT_SESSION_EXPIRED = 0 ; counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0; counterSend_EVT_ALL_SESSION_DATA = 0; counterSend_EVT_GET_ALL_SESSIONS = 0; counterSend_EVT_SESSION_ACCESSED = 0 ; counterSend_EVT_SESSION_CREATED = 0 ; counterSend_EVT_SESSION_DELTA = 0 ; counterSend_EVT_SESSION_EXPIRED = 0 ; counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0; } // -------------------------------------------------------- persistence handler public void load() { } public void unload() { } // -------------------------------------------------------- expire /** * send session expired to other cluster nodes * * @param id * session id */ protected void sessionExpired(String id) { counterSend_EVT_SESSION_EXPIRED++ ; SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG"); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id)); send(msg); } /** * Exipre all find sessions. */ public void expireAllLocalSessions() { long timeNow = System.currentTimeMillis(); Session sessions[] = findSessions(); int expireDirect = 0 ; int expireIndirect = 0 ; if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); for (int i = 0; i < sessions.length; i++) { if (sessions[i] instanceof DeltaSession) { DeltaSession session = (DeltaSession) sessions[i]; if (session.isPrimarySession()) { if (session.isValid()) { session.expire(); expireDirect++; } else { expireIndirect++; }//end if }//end if }//end if }//for long timeEnd = System.currentTimeMillis(); if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); } /** * When the manager expires session not tied to a request. The cluster will * periodically ask for a list of sessions that should expire and that * should be sent across the wire. * * @return The invalidated sessions array */ public String[] getInvalidatedSessions() { return new String[0]; } // -------------------------------------------------------- message receive /** * Test that sender and local domain is the same */ protected boolean checkSenderDomain(SessionMessage msg,Member sender) { String localMemberDomain = cluster.getLocalMember().getDomain(); boolean sameDomain= localMemberDomain.equals(sender.getDomain()); if (!sameDomain && log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain", new Object[] {getName(), msg.getEventTypeString(), sender, sender.getDomain(), localMemberDomain })); } return sameDomain ; } /** * This method is called by the received thread when a SessionMessage has * been received from one of the other nodes in the cluster. * * @param msg - * the message received * @param sender - * the sender of the message, this is used if we receive a * EVT_GET_ALL_SESSION message, so that we only reply to the * requesting node */ protected void messageReceived(SessionMessage msg, Member sender) { if(isSendClusterDomainOnly() && !checkSenderDomain(msg,sender)) { return; } try { if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender)); switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: { handleGET_ALL_SESSIONS(msg,sender); break; } case SessionMessage.EVT_ALL_SESSION_DATA: { handleALL_SESSION_DATA(msg,sender); break; } case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: { handleALL_SESSION_TRANSFERCOMPLETE(msg,sender); break; } case SessionMessage.EVT_SESSION_CREATED: { handleSESSION_CREATED(msg,sender); break; } case SessionMessage.EVT_SESSION_EXPIRED: { handleSESSION_EXPIRED(msg,sender); break; } case SessionMessage.EVT_SESSION_ACCESSED: { handleSESSION_ACCESSED(msg,sender); break; } case SessionMessage.EVT_SESSION_DELTA: { handleSESSION_DELTA(msg,sender); break; } default: { //we didn't recognize the message type, do nothing break; } } //switch } catch (Exception x) { log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x); } } // -------------------------------------------------------- message receiver handler /** * handle receive session state is complete transfered * @param msg * @param sender */ protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) { counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort()))); stateTransferCreateSendTime = msg.getTimestamp() ; stateTransfered = true ; } /** * handle receive session delta * @param msg * @param sender * @throws IOException * @throws ClassNotFoundException */ protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException { counterReceive_EVT_SESSION_DELTA++; byte[] delta = msg.getSession(); DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID())); DeltaRequest dreq = deserializeDeltaRequest(session, delta); dreq.execute(session, notifyListenersOnReplication); session.setPrimarySession(false); } } /** * handle receive session is access at other node ( primary session is now false) * @param msg * @param sender * @throws IOException */ protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException { counterReceive_EVT_SESSION_ACCESSED++; DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID())); session.access(); session.setPrimarySession(false); session.endAccess(); } } /** * handle receive session is expire at other node ( expire session also here) * @param msg * @param sender * @throws IOException */ protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException { counterReceive_EVT_SESSION_EXPIRED++; DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID())); session.expire(notifySessionListenersOnReplication, false); } } /** * handle receive new session is created at other node (create backup - primary false) * @param msg * @param sender */ protected void handleSESSION_CREATED(SessionMessage msg,Member sender) { counterReceive_EVT_SESSION_CREATED++; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID())); DeltaSession session = (DeltaSession) createEmptySession(); session.setManager(this); session.setValid(true); session.setPrimarySession(false); session.setCreationTime(msg.getTimestamp()); session.access(); if(notifySessionListenersOnReplication) session.setId(msg.getSessionID()); else session.setIdInternal(msg.getSessionID()); session.resetDeltaRequest(); session.endAccess(); } /** * handle receive sessions from other not ( restart ) * @param msg * @param sender * @throws ClassNotFoundException * @throws IOException */ protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException { counterReceive_EVT_ALL_SESSION_DATA++; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName())); byte[] data = msg.getSession(); deserializeSessions(data); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName())); //stateTransferred = true; } /** * handle receive that other node want all sessions ( restart ) * a) send all sessions with one message * b) send session at blocks * After sending send state is complete transfered * @param msg * @param sender * @throws IOException */ protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException { counterReceive_EVT_GET_ALL_SESSIONS++; //get a list of all the session from this manager if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName())); // Write the number of active sessions, followed by the details // get all sessions and serialize without sync Session[] currentSessions = findSessions(); long findSessionTimestamp = System.currentTimeMillis() ; if (isSendAllSessions()) { sendSessions(sender, currentSessions, findSessionTimestamp); } else { // send session at blocks int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize(); Session[] sendSessions = new Session[len]; for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize(); System.arraycopy(currentSessions, i, sendSessions, 0, len); sendSessions(sender, sendSessions,findSessionTimestamp); if (getSendAllSessionsWaitTime() > 0) { try { Thread.sleep(getSendAllSessionsWaitTime()); } catch (Exception sleep) { } }//end if }//for }//end if SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName()); newmsg.setTimestamp(findSessionTimestamp); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName())); counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++; cluster.send(newmsg, sender); } /** * send a block of session to sender * @param sender * @param currentSessions * @param sendTimestamp * @throws IOException */ protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException { byte[] data = serializeSessions(currentSessions); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName())); SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName()); newmsg.setTimestamp(sendTimestamp); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName())); counterSend_EVT_ALL_SESSION_DATA++; cluster.send(newmsg, sender); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -