⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 deltamanager.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                                                 sessionId + "-" + System.currentTimeMillis());
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
                    }
                }

            }

            //update last replicated time
            if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
            return msg;
        } catch (IOException x) {
            log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
            return null;
        }

    }
    /**
     * Resets the statistics kept by this manager.
     * <p>
     * The timing, expiry, rejection, replacement and "no state transferred"
     * counters as well as every per-event send/receive counter are set back
     * to zero. <code>maxActive</code> and <code>sessionCounter</code> are
     * re-seeded from <code>getActiveSessions()</code>.
     */
    public synchronized void resetStatistics() {
        // General manager counters.
        processingTime = 0;
        expiredSessions = 0;
        rejectedSessions = 0;
        sessionReplaceCounter = 0;
        counterNoStateTransfered = 0;

        // Session gauges restart from the current number of active sessions.
        maxActive = getActiveSessions();
        sessionCounter = getActiveSessions();

        // Per-event counters for messages received from other nodes.
        counterReceive_EVT_GET_ALL_SESSIONS = 0;
        counterReceive_EVT_ALL_SESSION_DATA = 0;
        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        counterReceive_EVT_SESSION_CREATED = 0;
        counterReceive_EVT_SESSION_ACCESSED = 0;
        counterReceive_EVT_SESSION_DELTA = 0;
        counterReceive_EVT_SESSION_EXPIRED = 0;

        // Per-event counters for messages sent to other nodes.
        counterSend_EVT_GET_ALL_SESSIONS = 0;
        counterSend_EVT_ALL_SESSION_DATA = 0;
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        counterSend_EVT_SESSION_CREATED = 0;
        counterSend_EVT_SESSION_ACCESSED = 0;
        counterSend_EVT_SESSION_DELTA = 0;
        counterSend_EVT_SESSION_EXPIRED = 0;
    }
   
    //  -------------------------------------------------------- persistence handler

    /**
     * Persistence hook for loading sessions. The body is empty, so calling
     * this method has no effect in this manager.
     */
    public void load() {

    }

    /**
     * Persistence hook for unloading sessions. The body is empty, so calling
     * this method has no effect in this manager.
     */
    public void unload() {

    }

    //  -------------------------------------------------------- expire

    /**
     * Announces an expired session to the other cluster nodes.
     * <p>
     * Increments the "session expired" send counter, builds an
     * <code>EVT_SESSION_EXPIRED</code> message without payload and hands it
     * to <code>send</code>.
     *
     * @param id
     *            id of the session that expired
     */
    protected void sessionExpired(String id) {
        counterSend_EVT_SESSION_EXPIRED++;
        SessionMessage expiredMsg = new SessionMessageImpl(
                getName(),
                SessionMessage.EVT_SESSION_EXPIRED,
                null,
                id,
                id + "-EXPIRED-MSG");
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.expire", getName(), id));
        }
        send(expiredMsg);
    }

    /**
     * Expires all sessions returned by <code>findSessions()</code> that are
     * primary sessions of this node.
     * <p>
     * Only {@link DeltaSession} instances flagged as primary are considered.
     * A primary session that is still valid is expired here and counted as
     * "direct"; a primary session that is no longer valid when checked is
     * counted as "indirect". Both counts and the elapsed time are written to
     * the debug log.
     */
    public void expireAllLocalSessions()
    {
        long timeNow = System.currentTimeMillis();
        Session sessions[] = findSessions();
        int expireDirect = 0;
        int expireIndirect = 0;

        if (log.isDebugEnabled()) {
            log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
        }
        for (int i = 0; i < sessions.length; i++) {
            if (!(sessions[i] instanceof DeltaSession)) {
                continue;
            }
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isPrimarySession()) {
                // Backup copies are left alone; only primary sessions are expired here.
                continue;
            }
            if (session.isValid()) {
                session.expire();
                expireDirect++;
            } else {
                expireIndirect++;
            }
        }
        long timeEnd = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            // The second counter used to be labelled "direct" as well, which made
            // the two numbers indistinguishable in the log.
            log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired indirect sessions: " + expireIndirect);
        }

    }
    
    /**
     * Returns the ids of sessions that this manager expired outside of a
     * request and that should be announced to the cluster.
     * <p>
     * This implementation always returns an empty array.
     *
     * @return an empty array of session ids
     */
    public String[] getInvalidatedSessions() {
        String[] noSessions = {};
        return noSessions;
    }

    //  -------------------------------------------------------- message receive

    /**
     * Checks whether the sender of a message belongs to the local domain.
     * <p>
     * The result is currently hard-coded to <code>true</code>, so the warning
     * branch below is never taken and every sender is accepted.
     *
     * @param msg
     *            the received message (only used for the warning text)
     * @param sender
     *            the member that sent the message
     * @return always <code>true</code> in this implementation
     */
    protected boolean checkSenderDomain(SessionMessage msg, Member sender) {
        boolean domainMatches = true;
        if (!domainMatches) {
            if (log.isWarnEnabled()) {
                Object[] details = new Object[] {
                        getName(),
                        msg.getEventTypeString(),
                        sender,
                        "",
                        "" };
                log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain", details));
            }
        }
        return domainMatches;
    }

    /**
     * Dispatches a {@link SessionMessage} received from another cluster node
     * to the handler matching its event type.
     * <p>
     * When domain replication is enabled and the sender fails the domain
     * check, the message is dropped. While a handler runs, the thread's
     * context class loader is switched to the first loader returned by
     * <code>getClassLoaders()</code> (if any) and restored afterwards. Any
     * exception raised by a handler is logged and not propagated.
     *
     * @param msg
     *            the message received
     * @param sender
     *            the member that sent the message; passed on to the handlers
     *            so that replies go only to the requesting node
     */
    protected void messageReceived(SessionMessage msg, Member sender) {
        if (doDomainReplication() && !checkSenderDomain(msg, sender)) {
            return;
        }
        final Thread receiverThread = Thread.currentThread();
        final ClassLoader previousLoader = receiverThread.getContextClassLoader();
        try {
            ClassLoader[] loaders = getClassLoaders();
            if (loaders != null && loaders.length > 0) {
                receiverThread.setContextClassLoader(loaders[0]);
            }
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("deltaManager.receiveMessage.eventType", getName(), msg.getEventTypeString(), sender));
            }

            switch (msg.getEventType()) {
                case SessionMessage.EVT_GET_ALL_SESSIONS:
                    handleGET_ALL_SESSIONS(msg, sender);
                    break;
                case SessionMessage.EVT_ALL_SESSION_DATA:
                    handleALL_SESSION_DATA(msg, sender);
                    break;
                case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE:
                    handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
                    break;
                case SessionMessage.EVT_SESSION_CREATED:
                    handleSESSION_CREATED(msg, sender);
                    break;
                case SessionMessage.EVT_SESSION_EXPIRED:
                    handleSESSION_EXPIRED(msg, sender);
                    break;
                case SessionMessage.EVT_SESSION_ACCESSED:
                    handleSESSION_ACCESSED(msg, sender);
                    break;
                case SessionMessage.EVT_SESSION_DELTA:
                    handleSESSION_DELTA(msg, sender);
                    break;
                default:
                    // Unrecognized event type: nothing to do.
                    break;
            }
        } catch (Exception x) {
            log.error(sm.getString("deltaManager.receiveMessage.error", getName()), x);
        } finally {
            // Always put back the loader that was active when we were called.
            receiverThread.setContextClassLoader(previousLoader);
        }
    }

    // -------------------------------------------------------- message receiver handler


    /**
     * Handles the notification that another node has finished transferring
     * its session state.
     * <p>
     * Records the timestamp carried by the message in
     * <code>stateTransferCreateSendTime</code> and sets
     * <code>stateTransfered</code> to <code>true</code>.
     *
     * @param msg
     *            the transfer-complete message
     * @param sender
     *            the member that sent the message
     */
    protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        if (log.isDebugEnabled()) {
            log.debug(sm.getString(
                    "deltaManager.receiveMessage.transfercomplete",
                    getName(),
                    sender.getHost(),
                    new Integer(sender.getPort())));
        }
        stateTransferCreateSendTime = msg.getTimestamp();
        stateTransfered = true;
    }

    /**
     * Applies a session delta received from another node.
     * <p>
     * If the session named in the message is known locally, the delta request
     * is deserialized and executed against it, and the local copy is marked
     * as non-primary. Messages for unknown sessions are only counted.
     *
     * @param msg
     *            the delta message; its payload holds the serialized delta
     * @param sender
     *            the member that sent the message
     * @throws IOException
     *             if the session lookup or the deserialization fails
     * @throws ClassNotFoundException
     *             if a class in the serialized delta cannot be resolved
     */
    protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
        counterReceive_EVT_SESSION_DELTA++;
        byte[] deltaBytes = msg.getSession();
        DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
        if (target == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.delta", getName(), msg.getSessionID()));
        }
        DeltaRequest request = deserializeDeltaRequest(target, deltaBytes);
        request.execute(target, notifyListenersOnReplication);
        target.setPrimarySession(false);
    }

    /**
     * Handles the notification that a session was accessed on another node.
     * <p>
     * If the session is known locally, it is accessed, marked as non-primary
     * and the access is ended again. Messages for unknown sessions are only
     * counted.
     *
     * @param msg
     *            the access message
     * @param sender
     *            the member that sent the message
     * @throws IOException
     *             if the session lookup fails
     */
    protected void handleSESSION_ACCESSED(SessionMessage msg, Member sender) throws IOException {
        counterReceive_EVT_SESSION_ACCESSED++;
        DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
        if (target == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.accessed", getName(), msg.getSessionID()));
        }
        target.access();
        target.setPrimarySession(false);
        target.endAccess();
    }

    /**
     * Handles the notification that a session expired on another node by
     * expiring the local copy as well.
     * <p>
     * Messages for sessions that are not known locally are only counted.
     *
     * @param msg
     *            the expiry message
     * @param sender
     *            the member that sent the message
     * @throws IOException
     *             if the session lookup fails
     */
    protected void handleSESSION_EXPIRED(SessionMessage msg, Member sender) throws IOException {
        counterReceive_EVT_SESSION_EXPIRED++;
        DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
        if (target == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.expired", getName(), msg.getSessionID()));
        }
        // NOTE(review): the second argument is passed as false - presumably so
        // the expiry is not announced back to the cluster; confirm against
        // DeltaSession.expire(boolean, boolean).
        target.expire(notifySessionListenersOnReplication, false);
    }

    /**
     * Handles the notification that a session was created on another node by
     * creating a local, non-primary copy of it.
     * <p>
     * The copy takes its creation time from the message timestamp. Its id is
     * assigned through <code>setId</code> when
     * <code>notifySessionListenersOnReplication</code> is set, and through
     * <code>setIdInternal</code> otherwise. The delta request of the new
     * session is reset afterwards.
     *
     * @param msg
     *            the creation message
     * @param sender
     *            the member that sent the message
     */
    protected void handleSESSION_CREATED(SessionMessage msg, Member sender) {
        counterReceive_EVT_SESSION_CREATED++;
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.createNewSession", getName(), msg.getSessionID()));
        }
        DeltaSession replica = (DeltaSession) createEmptySession();
        replica.setManager(this);
        replica.setValid(true);
        replica.setPrimarySession(false);
        replica.setCreationTime(msg.getTimestamp());
        replica.access();
        if (notifySessionListenersOnReplication) {
            replica.setId(msg.getSessionID());
        } else {
            replica.setIdInternal(msg.getSessionID());
        }
        replica.resetDeltaRequest();
        replica.endAccess();
    }

    /**
     * Handles a block of session data sent by another node (for example after
     * a restart of this node) by deserializing the sessions it contains.
     * <p>
     * The state-transferred flag is not set here.
     *
     * @param msg
     *            the message whose payload holds the serialized sessions
     * @param sender
     *            the member that sent the message
     * @throws ClassNotFoundException
     *             if a class in the serialized data cannot be resolved
     * @throws IOException
     *             if the deserialization fails
     */
    protected void handleALL_SESSION_DATA(SessionMessage msg, Member sender) throws ClassNotFoundException, IOException {
        counterReceive_EVT_ALL_SESSION_DATA++;
        final boolean debug = log.isDebugEnabled();
        if (debug) {
            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName()));
        }
        byte[] serialized = msg.getSession();
        deserializeSessions(serialized);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName()));
        }
    }

    /**
     * Handles the request of another node (for example after its restart) to
     * receive all sessions of this manager.
     * <p>
     * The sessions are either sent
     * <ul>
     * <li>a) in a single message, when <code>isSendAllSessions()</code> is
     * true, or</li>
     * <li>b) in blocks of at most <code>getSendAllSessionsSize()</code>
     * sessions, optionally pausing <code>getSendAllSessionsWaitTime()</code>
     * milliseconds after each block.</li>
     * </ul>
     * Afterwards an <code>EVT_ALL_SESSION_TRANSFERCOMPLETE</code> message
     * carrying the time at which the sessions were collected is sent to the
     * requesting node.
     * <p>
     * Each block is copied into an array of exactly the block's length, so a
     * shorter final block no longer carries left-over sessions from the
     * previous block. A non-positive block size cannot make progress and is
     * treated like case a).
     *
     * @param msg
     *            the request message
     * @param sender
     *            the requesting member; all replies are sent to it only
     * @throws IOException
     *             if serializing the sessions fails
     */
    protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
        counterReceive_EVT_GET_ALL_SESSIONS++;
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
        // Take a snapshot of the sessions without synchronization and remember
        // when it was taken; that time is stamped on every message sent below.
        Session[] currentSessions = findSessions();
        long findSessionTimestamp = System.currentTimeMillis();
        boolean sendAll = isSendAllSessions();
        // Read the block size once so it stays stable for the whole transfer.
        int blockSize = getSendAllSessionsSize();
        if (sendAll || blockSize <= 0) {
            sendSessions(sender, currentSessions, findSessionTimestamp);
        } else {
            // Send the sessions in blocks.
            for (int offset = 0; offset < currentSessions.length; offset += blockSize) {
                int len = Math.min(blockSize, currentSessions.length - offset);
                // A fresh, exactly sized array per block: reusing one buffer would
                // resend stale entries whenever the last block is shorter.
                Session[] block = new Session[len];
                System.arraycopy(currentSessions, offset, block, 0, len);
                sendSessions(sender, block, findSessionTimestamp);
                if (getSendAllSessionsWaitTime() > 0) {
                    try {
                        Thread.sleep(getSendAllSessionsWaitTime());
                    } catch (Exception ignored) {
                        // The pause between blocks is best effort only; a failed
                        // sleep does not abort the transfer.
                    }
                }
            }
        }

        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
        newmsg.setTimestamp(findSessionTimestamp);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        cluster.send(newmsg, sender);
    }


    /**
     * Serializes a block of sessions and sends it to one cluster member as an
     * <code>EVT_ALL_SESSION_DATA</code> message.
     *
     * @param sender
     *            the member that receives the sessions
     * @param currentSessions
     *            the sessions to serialize and send
     * @param sendTimestamp
     *            the timestamp to set on the outgoing message
     * @throws IOException
     *             if serializing the sessions fails
     */
    protected void sendSessions(Member sender, Session[] currentSessions, long sendTimestamp) throws IOException {
        byte[] payload = serializeSessions(currentSessions);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter", getName()));
        }
        SessionMessage dataMsg = new SessionMessageImpl(
                name,
                SessionMessage.EVT_ALL_SESSION_DATA,
                payload,
                "SESSION-STATE",
                "SESSION-STATE-" + getName());
        dataMsg.setTimestamp(sendTimestamp);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.allSessionData", getName()));
        }
        counterSend_EVT_ALL_SESSION_DATA++;
        cluster.send(dataMsg, sender);
    }

    /**
     * Creates a new <code>DeltaManager</code> that copies the configuration
     * of this one.
     * <p>
     * The copy is named <code>"Clone-from-" + name</code> and refers to the
     * same cluster, replication valve and receiver queue objects as this
     * manager.
     *
     * @return the newly created manager
     */
    public ClusterManager cloneFromTemplate() {
        DeltaManager copy = new DeltaManager();

        // Identity and shared collaborators.
        copy.name = "Clone-from-" + name;
        copy.cluster = cluster;
        copy.replicationValve = replicationValve;
        copy.receiverQueue = receiverQueue;

        // Session handling and listener notification settings.
        copy.maxActiveSessions = maxActiveSessions;
        copy.expireSessionsOnShutdown = expireSessionsOnShutdown;
        copy.notifyListenersOnReplication = notifyListenersOnReplication;
        copy.notifySessionListenersOnReplication = notifySessionListenersOnReplication;

        // State transfer and send settings.
        copy.stateTransferTimeout = stateTransferTimeout;
        copy.stateTimestampDrop = stateTimestampDrop;
        copy.stateTransferCreateSendTime = stateTransferCreateSendTime;
        copy.sendAllSessions = sendAllSessions;
        copy.sendAllSessionsSize = sendAllSessionsSize;
        copy.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
        copy.sendClusterDomainOnly = sendClusterDomainOnly;

        return copy;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -