⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 deltamanager.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    /**     * Get the lifecycle listeners associated with this lifecycle. If this     * Lifecycle has no listeners registered, a zero-length array is returned.     */    public LifecycleListener[] findLifecycleListeners() {        return lifecycle.findLifecycleListeners();    }    /**     * Remove a lifecycle event listener from this component.     *      * @param listener     *            The listener to remove     */    public void removeLifecycleListener(LifecycleListener listener) {        lifecycle.removeLifecycleListener(listener);    }    /**     * Prepare for the beginning of active use of the public methods of this     * component. This method should be called after <code>configure()</code>,     * and before any of the public methods of the component are utilized.     *      * @exception LifecycleException     *                if this component detects a fatal error that prevents this     *                component from being used     */    public void start() throws LifecycleException {        if (!initialized) init();        // Validate and update our current component state        if (started) {            return;        }        started = true;        lifecycle.fireLifecycleEvent(START_EVENT, null);        // Force initialization of the random number generator        generateSessionId();        // Load unloaded sessions, if any        try {            //the channel is already running            Cluster cluster = getCluster() ;            // stop remove cluster binding            //wow, how many nested levels of if statements can we have ;)            if(cluster == null) {                Container context = getContainer() ;                if(context != null && context instanceof Context) {                     Container host = context.getParent() ;                     if(host != null && host instanceof Host) {                         cluster = host.getCluster();                         if(cluster != null && cluster instanceof CatalinaCluster) {                             setCluster((CatalinaCluster) cluster) ;                         } else {                             Container engine = host.getParent() ;                             if(engine != null && engine instanceof Engine) {                                 cluster = engine.getCluster();                                 if(cluster != null && cluster instanceof CatalinaCluster) {                                     setCluster((CatalinaCluster) cluster) ;                                 }                             } else {                                     cluster = null ;                             }                         }                     }                }            }            if (cluster == null) {                log.error(sm.getString("deltaManager.noCluster", getName()));                return;            } else {                if (log.isInfoEnabled()) {                    String type = "unknown" ;                    if( cluster.getContainer() instanceof Host){                        type = "Host" ;                    } else if( cluster.getContainer() instanceof Engine){                        type = "Engine" ;                    }                    log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));                }            }            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));            //to survice context reloads, as only a stop/start is called, not            // createManager            ((CatalinaCluster)cluster).addManager(getName(), this);            getAllClusterSessions();        } catch (Throwable t) {            log.error(sm.getString("deltaManager.managerLoad"), t);        }    }    /**     * get from first session master the backup from all clustered sessions     * @see #findSessionMasterMember()     */    public synchronized void getAllClusterSessions() {        if (cluster != null && cluster.getMembers().length > 0) {            long beforeSendTime = System.currentTimeMillis();            Member mbr = findSessionMasterMember();            if(mbr == null) { // No domain member found                 return;            }            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());            // set reference time            stateTransferCreateSendTime = beforeSendTime ;            // request session state            counterSend_EVT_GET_ALL_SESSIONS++;            stateTransfered = false ;            // FIXME This send call block the deploy thread, when sender waitForAck is enabled            try {                synchronized(receivedMessageQueue) {                     receiverQueue = true ;                }                cluster.send(msg, mbr);                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));                // FIXME At sender ack mode this method check only the state transfer and resend is a problem!                waitForSendAllSessions(beforeSendTime);            } finally {                synchronized(receivedMessageQueue) {                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {                        SessionMessage smsg = (SessionMessage) iter.next();                        if (!stateTimestampDrop) {                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);                        } else {                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {                                // FIXME handle EVT_GET_ALL_SESSIONS later                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);                            } else {                                if (log.isWarnEnabled()) {                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));                                }                            }                        }                    }                            receivedMessageQueue.clear();                    receiverQueue = false ;                }           }        } else {            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));        }    }    /**     * Register cross context session at replication valve thread local     * @param session cross context session     */    protected void registerSessionAtReplicationValve(DeltaSession session) {        if(replicationValve == null) {            if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {                Cluster cluster = getCluster() ;                if(cluster != null && cluster instanceof CatalinaCluster) {                    Valve[] valves = ((CatalinaCluster)cluster).getValves();                    if(valves != null && valves.length > 0) {                        for(int i=0; replicationValve == null && i < valves.length ; i++ ){                            if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;                        }//for                        if(replicationValve == null && log.isDebugEnabled()) {                            log.debug("no ReplicationValve found for CrossContext Support");                        }//endif                     }//end if                }//endif            }//end if        }//end if        if(replicationValve != null) {            replicationValve.registerReplicationSession(session);        }    }        /**     * Find the master of the session state     * @return master member of sessions      */    protected Member findSessionMasterMember() {        Member mbr = null;        Member mbrs[] = cluster.getMembers();        String localMemberDomain = cluster.getLocalMember().getDomain();        if(isSendClusterDomainOnly()) {            for (int i = 0; mbr == null && i < mbrs.length; i++) {                Member member = mbrs[i];                if(localMemberDomain.equals(member.getDomain())) mbr = member ;            }        } else {            if(mbrs.length != 0 ) mbr = mbrs[0];        }        if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), localMemberDomain));        if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));        return mbr;    }    /**     * Wait that cluster session state is transfer or timeout after 60 Sec     * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)     */    protected void waitForSendAllSessions(long beforeSendTime) {        long reqStart = System.currentTimeMillis();        long reqNow = reqStart ;        boolean isTimeout = false;        if(getStateTransferTimeout() > 0) {            // wait that state is transfered with timeout check            do {                try {                    Thread.sleep(100);                } catch (Exception sleep) {                    //                }                reqNow = System.currentTimeMillis();                isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));            } while ((!getStateTransfered()) && (!isTimeout));        } else {            if(getStateTransferTimeout() == -1) {                // wait that state is transfered                do {                    try {                        Thread.sleep(100);                    } catch (Exception sleep) {                    }                } while ((!getStateTransfered()));                reqNow = System.currentTimeMillis();            }        }        if (isTimeout || (!getStateTransfered())) {            counterNoStateTransfered++ ;            log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));        } else {            if (log.isInfoEnabled())                log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));        }    }    /**     * Gracefully terminate the active use of the public methods of this     * component. This method should be the last one called on a given instance     * of this component.     *      * @exception LifecycleException     *                if this component detects a fatal error that needs to be     *                reported     */    public void stop() throws LifecycleException {        if (log.isDebugEnabled())            log.debug(sm.getString("deltaManager.stopped", getName()));        // Validate and update our current component state        if (!started)            throw new LifecycleException(sm.getString("deltaManager.notStarted"));        lifecycle.fireLifecycleEvent(STOP_EVENT, null);        started = false;        // Expire all active sessions        if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));        Session sessions[] = findSessions();        for (int i = 0; i < sessions.length; i++) {            DeltaSession session = (DeltaSession) sessions[i];            if (!session.isValid())                continue;            try {                session.expire(true, isExpireSessionsOnShutdown());            } catch (Throwable ignore) {                ;            }         }        // Require a new random number generator if we are restarted        this.random = null;        getCluster().removeManager(getName(),this);        replicationValve = null;        if (initialized) {            destroy();        }    }    // ----------------------------------------- PropertyChangeListener Methods    /**     * Process property change events from our associated Context.     *      * @param event     *            The property change event that has occurred     */    public void propertyChange(PropertyChangeEvent event) {        // Validate the source of this event        if (!(event.getSource() instanceof Context))            return;        // Process a relevant property change        if (event.getPropertyName().equals("sessionTimeout")) {            try {                setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);            } catch (NumberFormatException e) {                log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));            }        }    }    // -------------------------------------------------------- Replication    // Methods    /**     * A message was received from another node, this is the callback method to     * implement if you are interested in receiving replication messages.     *      * @param cmsg -     *            the message received.     */    public void messageDataReceived(ClusterMessage cmsg) {        if (cmsg != null && cmsg instanceof SessionMessage) {            SessionMessage msg = (SessionMessage) cmsg;            switch (msg.getEventType()) {                case SessionMessage.EVT_GET_ALL_SESSIONS:                case SessionMessage.EVT_SESSION_CREATED:                 case SessionMessage.EVT_SESSION_EXPIRED:                 case SessionMessage.EVT_SESSION_ACCESSED:                case SessionMessage.EVT_SESSION_DELTA: {                    synchronized(receivedMessageQueue) {                        if(receiverQueue) {                            receivedMessageQueue.add(msg);                            return ;                        }                    }                   break;                }                default: {                    //we didn't queue, do nothing                    break;                }            } //switch                        messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);        }    }    /**     * When the request has been completed, the replication valve will notify     * the manager, and the manager will decide whether any replication is     * needed or not. If there is a need for replication, the manager will     * create a session message and that will be replicated. The cluster     * determines where it gets sent.     *      * @param sessionId -     *            the sessionId that just completed.     * @return a SessionMessage to be sent,     */    public ClusterMessage requestCompleted(String sessionId) {        try {            DeltaSession session = (DeltaSession) findSession(sessionId);            DeltaRequest deltaRequest = session.getDeltaRequest();            SessionMessage msg = null;            boolean isDeltaRequest = false ;            synchronized(deltaRequest) {                isDeltaRequest = deltaRequest.getSize() > 0 ;                if (isDeltaRequest) {                        counterSend_EVT_SESSION_DELTA++;                    byte[] data = serializeDeltaRequest(deltaRequest);                    msg = new SessionMessageImpl(getName(),                                                 SessionMessage.EVT_SESSION_DELTA,                                                  data,                                                  sessionId,                                                 sessionId + "-" + System.currentTimeMillis());                    session.resetDeltaRequest();                }              }            if(!isDeltaRequest) {                if(!session.isPrimarySession()) {                                   counterSend_EVT_SESSION_ACCESSED++;                    msg = new SessionMessageImpl(getName(),                                                 SessionMessage.EVT_SESSION_ACCESSED,                                                  null, 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -