⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 deltamanager.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                                                 sessionId,                                                 sessionId + "-" + System.currentTimeMillis());                    if (log.isDebugEnabled()) {                        log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));                    }                }                } else { // log only outside synch block!                if (log.isDebugEnabled()) {                    log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));                }            }            session.setPrimarySession(true);            //check to see if we need to send out an access message            if ((msg == null)) {                long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();                if (replDelta > (getMaxInactiveInterval() * 1000)) {                    counterSend_EVT_SESSION_ACCESSED++;                    msg = new SessionMessageImpl(getName(),                                                 SessionMessage.EVT_SESSION_ACCESSED,                                                  null,                                                 sessionId,                                                  sessionId + "-" + System.currentTimeMillis());                    if (log.isDebugEnabled()) {                        log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));                    }                }            }            //update last replicated time            if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());            return msg;        } catch (IOException x) {            log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);            return null;        }    }    /**     * Reset manager statistics     */    public synchronized void resetStatistics() {        processingTime = 0 ;        expiredSessions = 0 ;        rejectedSessions = 0 ;        sessionReplaceCounter = 0 ;        counterNoStateTransfered = 0 ;        maxActive = getActiveSessions() ;        sessionCounter = getActiveSessions() ;        counterReceive_EVT_ALL_SESSION_DATA = 0;        counterReceive_EVT_GET_ALL_SESSIONS = 0;        counterReceive_EVT_SESSION_ACCESSED = 0 ;        counterReceive_EVT_SESSION_CREATED = 0 ;        counterReceive_EVT_SESSION_DELTA = 0 ;        counterReceive_EVT_SESSION_EXPIRED = 0 ;        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;        counterSend_EVT_ALL_SESSION_DATA = 0;        counterSend_EVT_GET_ALL_SESSIONS = 0;        counterSend_EVT_SESSION_ACCESSED = 0 ;        counterSend_EVT_SESSION_CREATED = 0 ;        counterSend_EVT_SESSION_DELTA = 0 ;        counterSend_EVT_SESSION_EXPIRED = 0 ;        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;            }       //  -------------------------------------------------------- persistence handler    public void load() {    }    public void unload() {    }    //  -------------------------------------------------------- expire    /**     * send session expired to other cluster nodes     *      * @param id     *            session id     */    protected void sessionExpired(String id) {        counterSend_EVT_SESSION_EXPIRED++ ;        SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));        send(msg);    }    /**     * Exipre all find sessions.     */    public void expireAllLocalSessions()    {        long timeNow = System.currentTimeMillis();        Session sessions[] = findSessions();        int expireDirect  = 0 ;        int expireIndirect = 0 ;                if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);        for (int i = 0; i < sessions.length; i++) {            if (sessions[i] instanceof DeltaSession) {                DeltaSession session = (DeltaSession) sessions[i];                if (session.isPrimarySession()) {                    if (session.isValid()) {                        session.expire();                        expireDirect++;                    } else {                        expireIndirect++;                    }//end if                }//end if            }//end if        }//for        long timeEnd = System.currentTimeMillis();        if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);          }        /**     * When the manager expires session not tied to a request. The cluster will     * periodically ask for a list of sessions that should expire and that     * should be sent across the wire.     *      * @return The invalidated sessions array     */    public String[] getInvalidatedSessions() {        return new String[0];    }    //  -------------------------------------------------------- message receive    /**     * Test that sender and local domain is the same     */    protected boolean checkSenderDomain(SessionMessage msg,Member sender) {        String localMemberDomain = cluster.getLocalMember().getDomain();        boolean sameDomain= localMemberDomain.equals(sender.getDomain());        if (!sameDomain && log.isWarnEnabled()) {                log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",                         new Object[] {getName(),                          msg.getEventTypeString(),                          sender,                         sender.getDomain(),                         localMemberDomain }));        }        return sameDomain ;    }    /**     * This method is called by the received thread when a SessionMessage has     * been received from one of the other nodes in the cluster.     *      * @param msg -     *            the message received     * @param sender -     *            the sender of the message, this is used if we receive a     *            EVT_GET_ALL_SESSION message, so that we only reply to the     *            requesting node     */    protected void messageReceived(SessionMessage msg, Member sender) {        if(isSendClusterDomainOnly() && !checkSenderDomain(msg,sender)) {            return;        }        try {            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));             switch (msg.getEventType()) {                case SessionMessage.EVT_GET_ALL_SESSIONS: {                    handleGET_ALL_SESSIONS(msg,sender);                    break;                }                case SessionMessage.EVT_ALL_SESSION_DATA: {                    handleALL_SESSION_DATA(msg,sender);                    break;                }                case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {                    handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);                    break;                }                case SessionMessage.EVT_SESSION_CREATED: {                    handleSESSION_CREATED(msg,sender);                    break;                }                case SessionMessage.EVT_SESSION_EXPIRED: {                    handleSESSION_EXPIRED(msg,sender);                    break;                }                case SessionMessage.EVT_SESSION_ACCESSED: {                    handleSESSION_ACCESSED(msg,sender);                    break;                }                case SessionMessage.EVT_SESSION_DELTA: {                   handleSESSION_DELTA(msg,sender);                   break;                }                default: {                    //we didn't recognize the message type, do nothing                    break;                }            } //switch        } catch (Exception x) {            log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);        }    }    // -------------------------------------------------------- message receiver handler    /**     * handle receive session state is complete transfered     * @param msg     * @param sender     */    protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));        stateTransferCreateSendTime = msg.getTimestamp() ;        stateTransfered = true ;    }    /**     * handle receive session delta     * @param msg     * @param sender     * @throws IOException     * @throws ClassNotFoundException     */    protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {        counterReceive_EVT_SESSION_DELTA++;        byte[] delta = msg.getSession();        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());        if (session != null) {            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));            DeltaRequest dreq = deserializeDeltaRequest(session, delta);            dreq.execute(session, notifyListenersOnReplication);            session.setPrimarySession(false);        }    }    /**     * handle receive session is access at other node ( primary session is now false)     * @param msg     * @param sender     * @throws IOException     */    protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {        counterReceive_EVT_SESSION_ACCESSED++;        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());        if (session != null) {            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));            session.access();            session.setPrimarySession(false);            session.endAccess();        }    }    /**     * handle receive session is expire at other node ( expire session also here)     * @param msg     * @param sender     * @throws IOException     */    protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {        counterReceive_EVT_SESSION_EXPIRED++;        DeltaSession session = (DeltaSession) findSession(msg.getSessionID());        if (session != null) {            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));            session.expire(notifySessionListenersOnReplication, false);        }    }    /**     * handle receive new session is created at other node (create backup - primary false)     * @param msg     * @param sender     */    protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {        counterReceive_EVT_SESSION_CREATED++;        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));        DeltaSession session = (DeltaSession) createEmptySession();        session.setManager(this);        session.setValid(true);        session.setPrimarySession(false);        session.setCreationTime(msg.getTimestamp());        session.access();        if(notifySessionListenersOnReplication)            session.setId(msg.getSessionID());        else            session.setIdInternal(msg.getSessionID());        session.resetDeltaRequest();        session.endAccess();    }    /**     * handle receive sessions from other not ( restart )     * @param msg     * @param sender     * @throws ClassNotFoundException     * @throws IOException     */    protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {        counterReceive_EVT_ALL_SESSION_DATA++;        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));        byte[] data = msg.getSession();        deserializeSessions(data);        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));        //stateTransferred = true;    }    /**     * handle receive that other node want all sessions ( restart )     * a) send all sessions with one message     * b) send session at blocks     * After sending send state is complete transfered     * @param msg     * @param sender     * @throws IOException     */    protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {        counterReceive_EVT_GET_ALL_SESSIONS++;        //get a list of all the session from this manager        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));        // Write the number of active sessions, followed by the details        // get all sessions and serialize without sync        Session[] currentSessions = findSessions();        long findSessionTimestamp = System.currentTimeMillis() ;        if (isSendAllSessions()) {            sendSessions(sender, currentSessions, findSessionTimestamp);        } else {            // send session at blocks            int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();            Session[] sendSessions = new Session[len];            for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {                len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();                System.arraycopy(currentSessions, i, sendSessions, 0, len);                sendSessions(sender, sendSessions,findSessionTimestamp);                if (getSendAllSessionsWaitTime() > 0) {                    try {                        Thread.sleep(getSendAllSessionsWaitTime());                    } catch (Exception sleep) {                    }                }//end if            }//for        }//end if                SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());        newmsg.setTimestamp(findSessionTimestamp);        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;        cluster.send(newmsg, sender);    }    /**     * send a block of session to sender     * @param sender     * @param currentSessions     * @param sendTimestamp     * @throws IOException     */    protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {        byte[] data = serializeSessions(currentSessions);        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());        newmsg.setTimestamp(sendTimestamp);        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));        counterSend_EVT_ALL_SESSION_DATA++;        cluster.send(newmsg, sender);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -