⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 deltamanager.java

📁 This temp directory is used by the JVM for temporary file storage. The JVM is configured to use thi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                
                //request session state
                cluster.send(msg, mbr);
                log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                         ". This operation will timeout if no session state has been received within "+
                         "60 seconds");
                long reqStart = System.currentTimeMillis();
                long reqNow = 0;
                boolean isTimeout=false;
                do {
                    try {
                        Thread.currentThread().sleep(100);
                    }catch ( Exception sleep) {}
                    reqNow = System.currentTimeMillis();
                    isTimeout=((reqNow-reqStart)>(1000*60));
                } while ( (!getStateTransferred()) && (!isTimeout));
                if ( isTimeout || (!getStateTransferred()) ) {
                    log.error("Manager["+getName()+"], No session state received, timing out.");
                }else {
                    log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
                }
            } else {
                log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
            }//end if

        } catch (Throwable t) {
            log.error(sm.getString("standardManager.managerLoad"), t);
        }

    }


    /**
     * Gracefully terminate the active use of the public methods of this
     * component.  This method should be the last one called on a given
     * instance of this component.
     *
     * @exception LifecycleException if this component detects a fatal error
     *  that needs to be reported
     */
    public void stop() throws LifecycleException {

        if (log.isDebugEnabled())
            log.debug("Stopping");

        // Validate and update our current component state
        if (!started)
            throw new LifecycleException
                (sm.getString("standardManager.notStarted"));
        lifecycle.fireLifecycleEvent(STOP_EVENT, null);
        started = false;

        // Expire all active sessions
        if ( this.getExpireSessionsOnShutdown() ) {
            log.info("Expiring sessions upon shutdown");
            Session sessions[] = findSessions();
            for (int i = 0; i < sessions.length; i++) {
                DeltaSession session = (DeltaSession) sessions[i];
                if (!session.isValid())
                    continue;
                try {
                    session.expire();
                }
                catch (Throwable t) {
                    ;
                } //catch
            } //for
        }//end if

        // Require a new random number generator if we are restarted
        this.random = null;

        if( initialized ) {
            destroy();
        }
    }


    // ----------------------------------------- PropertyChangeListener Methods


    /**
     * Process property change events from our associated Context.
     *
     * @param event The property change event that has occurred
     */
    public void propertyChange(PropertyChangeEvent event) {

        // Validate the source of this event
        if (!(event.getSource() instanceof Context))
            return;
        Context context = (Context) event.getSource();

        // Process a relevant property change
        if (event.getPropertyName().equals("sessionTimeout")) {
            try {
                setMaxInactiveInterval
                    ( ((Integer) event.getNewValue()).intValue()*60 );
            } catch (NumberFormatException e) {
                log.error(sm.getString("standardManager.sessionTimeout",
                                 event.getNewValue().toString()));
            }
        }

    }

    // -------------------------------------------------------- Replication Methods

    /**
        * A message was received from another node, this
        * is the callback method to implement if you are interested in
        * receiving replication messages.
        * @param msg - the message received.
        */
       public void messageDataReceived(SessionMessage msg) {
           messageReceived(msg, msg.getAddress()!=null?(Member)msg.getAddress():null);
       }

       /**
        * When the request has been completed, the replication valve
        * will notify the manager, and the manager will decide whether
        * any replication is needed or not.
        * If there is a need for replication, the manager will
        * create a session message and that will be replicated.
        * The cluster determines where it gets sent.
        * @param sessionId - the sessionId that just completed.
        * @return a SessionMessage to be sent,
        */
       public SessionMessage requestCompleted(String sessionId) {
           try {
               DeltaSession session = (DeltaSession) findSession(sessionId);
               DeltaRequest deltaRequest = session.getDeltaRequest();
               SessionMessage msg = null;
               if (deltaRequest.getSize() > 0) {
   
                   byte[] data = unloadDeltaRequest(deltaRequest);
                   msg = new SessionMessage(name, SessionMessage.EVT_SESSION_DELTA,
                                            data, sessionId);
                   session.resetDeltaRequest();
               } else if ( !session.isPrimarySession() ) {
                   msg = new SessionMessage(getName(),
                                         SessionMessage.EVT_SESSION_ACCESSED,
                                         null,
                                         sessionId);
               }
               session.setPrimarySession(true);
               //check to see if we need to send out an access message
               if ( (msg == null) ) {
                   long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
                   if ( replDelta > (getMaxInactiveInterval()*1000) ) {
                       msg = new SessionMessage(getName(),
                                             SessionMessage.EVT_SESSION_ACCESSED,
                                             null,
                                             sessionId);
                   }
                   
               }
               
               //update last replicated time
               if ( msg != null ) session.setLastTimeReplicated(System.currentTimeMillis());
               return msg;
           }
           catch (IOException x) {
               log.error("Unable to serialize delta request", x);
               return null;
           }
   
       }
       
       protected void sessionExpired(String id) {
           SessionMessage msg = new SessionMessage(getName(), 
                                                   SessionMessage.EVT_SESSION_EXPIRED,
                                                   null,
                                                   id);
           cluster.send(msg);
       }
   
       /**
        * When the manager expires session not tied to a request.
        * The cluster will periodically ask for a list of sessions
        * that should expire and that should be sent across the wire.
        * @return
        */
       public String[] getInvalidatedSessions() {
           return new String[0];
       }


       /**
        * This method is called by the received thread when a SessionMessage has
        * been received from one of the other nodes in the cluster.
        * @param msg - the message received
        * @param sender - the sender of the message, this is used if we receive a
        *                 EVT_GET_ALL_SESSION message, so that we only reply to
        *                 the requesting node
        */
       protected void messageReceived(SessionMessage msg, Member sender) {
           try {
               log.debug("Manager ("+name+") Received SessionMessage of type=" + msg.getEventTypeString()+" from "+sender);
               switch (msg.getEventType()) {
                   case SessionMessage.EVT_GET_ALL_SESSIONS: {
                       //get a list of all the session from this manager
                       byte[] data = doUnload();
                       SessionMessage newmsg = new SessionMessage(name,
                           SessionMessage.EVT_ALL_SESSION_DATA,
                           data, "");
                       cluster.send(newmsg, sender);
                       break;
                   }
                   case SessionMessage.EVT_ALL_SESSION_DATA: {
                       byte[] data = msg.getSession();
                       doLoad(data);
                       stateTransferred = true;
                       break;
                   }
                   case SessionMessage.EVT_SESSION_CREATED: {
                       DeltaSession session = (DeltaSession)createSession(false);
                       session.setId(msg.getSessionID());
                       session.setNew(false);
                       session.setPrimarySession(false);
                       session.resetDeltaRequest();
                       break;
                   }
                   case SessionMessage.EVT_SESSION_EXPIRED: {
                       DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                       if (session != null) {
                           session.expire(true,false);
                       } //end if
                       break;
                   }
                   case SessionMessage.EVT_SESSION_ACCESSED: {
                       DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                       if (session != null) {
                           session.access();
                           session.setPrimarySession(false);
                       }
                       break;
                   }
                   case SessionMessage.EVT_SESSION_DELTA : {
                       byte[] delta = msg.getSession();
                       DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
                       if (session != null) {
                           DeltaRequest dreq = loadDeltaRequest(session, delta);
                           dreq.execute(session);
                           session.setPrimarySession(false);
                           session.access();
                       }
                       
                       break;
                   }
                   default: {
                       //we didn't recognize the message type, do nothing
                       break;
                   }
               } //switch
           }
           catch (Exception x) {
               log.error("Unable to receive message through TCP channel", x);
           }
       }



    // -------------------------------------------------------- Private Methods

    public void backgroundProcess() {
        log.debug("DeltaManager.backgroundProcess invoked at "+System.currentTimeMillis());
        processExpires();
    }
    /**
     * Invalidate all sessions that have expired.
     */
    public void processExpires() {
        long timeNow = System.currentTimeMillis();
        Session sessions[] = findSessions();

        for (int i = 0; i < sessions.length; i++) {
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isValid()) {
                try {
                    expiredSessions++;
                } catch (Throwable t) {
                    log.error(sm.getString
                              ("standardManager.expireException"), t);
                }
            }
        }
        long timeEnd = System.currentTimeMillis();
        processingTime += ( timeEnd - timeNow );
    }

    public boolean getStateTransferred() {
        return stateTransferred;
    }

    public void setStateTransferred(boolean stateTransferred) {
        this.stateTransferred = stateTransferred;
    }

    public CatalinaCluster getCluster() {
        return cluster;
    }

    public void setCluster(CatalinaCluster cluster) {
        this.cluster = cluster;
    }

    public void load() {

    }

    public void unload() {

    }
    public boolean getUseDirtyFlag() {
        return useDirtyFlag;
    }
    public void setUseDirtyFlag(boolean useDirtyFlag) {
        this.useDirtyFlag = useDirtyFlag;
    }
    public boolean getExpireSessionsOnShutdown() {
        return expireSessionsOnShutdown;
    }
    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
        this.expireSessionsOnShutdown = expireSessionsOnShutdown;
    }
    public boolean getPrintToScreen() {
        return printToScreen;
    }
    public void setPrintToScreen(boolean printToScreen) {
        this.printToScreen = printToScreen;
    }
    public void setName(String name) {
        this.name = name;
    }


}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -