⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simpletcpreplicationmanager.java

📁 精通Tomcat书籍源代码,希望大家共同学习
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            boolean hasPrincipal = session.getPrincipal() != null;
            session_out.writeBoolean(hasPrincipal);
            if ( hasPrincipal )
            {
                session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
            }//end if
            ((ReplicatedSession)session).writeObjectData(session_out);
            return session_data.toByteArray();
        }
        catch ( Exception x )
        {
            log.error("Failed to serialize the session!",x);
        }
        return null;
    }

    /**
     * Opens a replication stream over the complete byte array.
     * Delegates to {@link #getReplicationStream(byte[], int, int)} with
     * offset 0 and the full array length.
     *
     * @param data the serialized bytes to read from
     * @return The object input stream
     * @throws IOException if the stream cannot be created
     */
    public ReplicationStream getReplicationStream(byte[] data) throws IOException {
        return getReplicationStream(data,0,data.length);
    }

    /**
     * Opens a replication stream over a slice of the byte array, handing it
     * the class loader(s) to resolve classes with: the container's loader
     * when one is available, otherwise the thread context class loader.
     *
     * @param data the serialized bytes to read from
     * @param offset index of the first byte to read
     * @param length number of bytes to read
     * @return The object input stream
     * @throws IOException if the stream cannot be created
     */
    public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();

        // Fix to be able to run the DeltaManager stand alone without a
        // container: fall back to the thread's context class loader.
        Loader loader = null;
        if (container != null) {
            loader = container.getLoader();
        }
        ClassLoader classLoader = (loader != null) ? loader.getClassLoader() : contextLoader;

        ByteArrayInputStream fis = new ByteArrayInputStream(data, offset, length);
        if (classLoader == contextLoader) {
            // Both loaders are the same object; pass it only once.
            return new ReplicationStream(fis, new ClassLoader[] {classLoader});
        }
        return new ReplicationStream(fis, new ClassLoader[] {classLoader, contextLoader});
    }

    /**
     * Reinstantiates a serialized session from the data passed in.
     * Any existing session with the same id is expired first (without
     * notification), then a fresh instance is obtained from createSession()
     * and populated via ReplicatedSession.readObjectData().
     *
     * @param data - a byte array containing session data
     * @param sessionId - the id to assign to the restored session
     * @return a valid Session object, null if an error occurs
     */
    protected Session readSession( byte[] data, String sessionId )
    {
        try
        {
            ReplicationStream session_in = getReplicationStream(data);
            Session existing = sessionId!=null?this.findSession(sessionId):null;
            boolean isNew = (existing==null);
            //clear the old values from the existing session
            if ( existing!=null ) {
                //cleans up the previous values, since we are not doing removes
                ((ReplicatedSession)existing).expire(false);
            }
            // Always deserialize into a fresh instance; it is removed from the
            // session map under its generated id before the real id is set below.
            Session session = createSession(null,false, false);
            sessions.remove(session.getIdInternal());

            boolean hasPrincipal = session_in.readBoolean();
            SerializablePrincipal p = null;
            if ( hasPrincipal ) {
                p = (SerializablePrincipal)session_in.readObject();
            }
            ReplicatedSession rsession = (ReplicatedSession)session;
            rsession.readObjectData(session_in);
            if ( hasPrincipal ) {
                // NOTE(review): getContainer() is dereferenced without a null
                // check here, unlike getReplicationStream() - confirm a container
                // is always present when principals are replicated.
                session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
            }
            rsession.setId(sessionId,isNew);
            // NOTE(review): the access count is set to 1 here and reset to 0
            // further down; the reason is not visible in this part of the file.
            rsession.setAccessCount(1);
            session.setManager(this);
            session.setValid(true);
            rsession.setLastAccessedTime(System.currentTimeMillis());
            rsession.setThisAccessedTime(System.currentTimeMillis());
            rsession.setAccessCount(0);
            session.setNew(false);
            if(log.isTraceEnabled()) {
                 log.trace("Session loaded id="+sessionId +
                               " actualId="+session.getId()+
                               " exists="+this.sessions.containsKey(sessionId)+
                               " valid="+rsession.isValid());
            }
            return session;
        }
        catch ( Exception x )
        {
            log.error("Failed to deserialize the session!",x);
        }
        return null;
    }

    /**
     * @return the name of this manager
     */
    public String getName() {
        return this.name;
    }

    /**
     * Prepare for the beginning of active use of the public methods of this
     * component.  This method should be called after <code>configure()</code>,
     * and before any of the public methods of the component are utilized.<BR>
     * Registers this manager with the cluster and, if other members are
     * present, requests the current session state from the first member and
     * waits up to 60 seconds for it to arrive.
     *
     * @exception IllegalStateException if this component has already been
     *  started
     * @exception LifecycleException if this component detects a fatal error
     *  that prevents this component from being used
     */
    public void start() throws LifecycleException {
        mManagerRunning = true;
        super.start();
        try {
            //the channel is already running
            if ( mChannelStarted ) {
                return;
            }
            if(log.isInfoEnabled()) {
                log.info("Starting clustering manager...:"+getName());
            }
            if ( cluster == null ) {
                log.error("Starting... no cluster associated with this context:"+getName());
                return;
            }
            cluster.addManager(getName(),this);

            // Take a single snapshot of the membership so the length check and
            // the element access below cannot observe different arrays.
            Member[] members = cluster.getMembers();
            if (members.length > 0) {
                Member mbr = members[0];
                SessionMessage msg =
                    new SessionMessageImpl(this.getName(),
                                       SessionMessage.EVT_GET_ALL_SESSIONS,
                                       null,
                                       "GET-ALL",
                                       "GET-ALL-"+this.getName());
                cluster.send(msg, mbr);
                if(log.isWarnEnabled()) {
                     log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                         ". This operation will timeout if no session state has been received within "+
                         "60 seconds");
                }
                long reqStart = System.currentTimeMillis();
                long reqNow = 0;
                boolean isTimeout=false;
                boolean interrupted = false;
                // Poll every 100 ms until the state arrives, 60 s elapse, or
                // this thread is interrupted.
                do {
                    try {
                        Thread.sleep(100);
                    } catch ( InterruptedException ie ) {
                        interrupted = true;
                    }
                    reqNow = System.currentTimeMillis();
                    isTimeout=((reqNow-reqStart)>(1000*60));
                } while ( !interrupted && (!isStateTransferred()) && (!isTimeout));
                if ( interrupted ) {
                    // Preserve the interrupt for whoever started this component.
                    Thread.currentThread().interrupt();
                }
                if ( isTimeout || (!isStateTransferred()) ) {
                    log.error("Manager["+getName()+"], No session state received, timing out.");
                }else {
                    if(log.isInfoEnabled()) {
                        log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
                    }
                }
            } else {
                if(log.isInfoEnabled()) {
                    log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
                }
            }//end if
            mChannelStarted = true;
        }  catch ( Exception x ) {
            log.error("Unable to start SimpleTcpReplicationManager",x);
        }
    }

    /**
     * Gracefully terminate the active use of the public methods of this
     * component.  This method should be the last one called on a given
     * instance of this component.<BR>
     * Clears the local session map and unregisters this manager from the
     * cluster, if one is associated.
     *
     * @exception IllegalStateException if this component has not been started
     * @exception LifecycleException if this component detects a fatal error
     *  that needs to be reported
     */
    public void stop() throws LifecycleException
    {
        mManagerRunning = false;
        mChannelStarted = false;
        super.stop();
        try
        {
            this.sessions.clear();
            // start() tolerates a missing cluster, so stop() must as well.
            if ( cluster != null ) {
                cluster.removeManager(getName(),this);
            }
        }
        catch ( Exception x )
        {
            log.error("Unable to stop SimpleTcpReplicationManager",x);
        }
    }

    /**
     * @param dist the new value of the distributable flag
     */
    public void setDistributable(boolean dist) {
        this.distributable = dist;
    }

    /**
     * @return the current value of the distributable flag
     */
    public boolean getDistributable() {
        return distributable;
    }

    /**
     * This method is called by the received thread when a SessionMessage has
     * been received from one of the other nodes in the cluster.
     *
     * @param msg - the message received
     * @param sender - the sender of the message, this is used if we receive a
     *                 EVT_GET_ALL_SESSION message, so that we only reply to
     *                 the requesting node
     */
    protected void messageReceived( SessionMessage msg, Member sender ) {
        try  {
            if(log.isDebugEnabled()) {
                log.debug("Received SessionMessage of type="+msg.getEventTypeString());
                log.debug("Received SessionMessage sender="+sender);
            }
            switch ( msg.getEventType() ) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    //get a list of all the session from this manager
                    Object[] allSessions = findSessions();
                    java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
                    java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
                    // Wire format: count, then (id, serialized bytes) per session.
                    oout.writeInt(allSessions.length);
                    for (int i=0; i<allSessions.length; i++){
                        ReplicatedSession ses = (ReplicatedSession)allSessions[i];
                        oout.writeUTF(ses.getIdInternal());
                        byte[] sessionData = writeSession(ses);
                        oout.writeObject(sessionData);
                    }//for
                    oout.flush();
                    oout.close();
                    byte[] data = bout.toByteArray();
                    SessionMessage newmsg = new SessionMessageImpl(name,
                        SessionMessage.EVT_ALL_SESSION_DATA,
                        data, "SESSION-STATE","SESSION-STATE-"+getName());
                    // Reply only to the node that asked for the state.
                    cluster.send(newmsg, sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    java.io.ByteArrayInputStream bin =
                        new java.io.ByteArrayInputStream(msg.getSession());
                    java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
                    try {
                        int size = oin.readInt();
                        for ( int i=0; i<size; i++) {
                            String id = oin.readUTF();
                            byte[] data = (byte[])oin.readObject();
                            // readSession logs its own failures and returns null.
                            readSession(data,id);
                        }//for
                    } finally {
                        oin.close();
                    }
                    stateTransferred=true;
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    Session session = this.readSession(msg.getSession(),msg.getSessionID());
                    if ( log.isDebugEnabled() ) {
                        // readSession returns null on failure; do not dereference it blindly.
                        log.debug("Received replicated session=" + session +
                            " isValid=" + (session != null && session.isValid()));
                    }
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.expire();
                        this.remove(session);
                    }//end if
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED :{
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.access();
                        session.endAccess();
                    }
                    break;
                }
                default:  {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            }//switch
        }
        catch ( Exception x )
        {
            log.error("Unable to receive message through TCP channel",x);
        }
    }

    /**
     * Entry point for cluster messages. Messages that are SessionMessage
     * instances are forwarded to
     * {@link #messageReceived(SessionMessage, Member)}; anything else is ignored.
     *
     * @param cmsg the cluster message that was received
     */
    public void messageDataReceived(ClusterMessage cmsg) {
        try {
            if ( cmsg instanceof SessionMessage ) {
                SessionMessage msg = (SessionMessage)cmsg;
                // Casting a null address yields null, so no explicit check is needed.
                messageReceived(msg, (Member) msg.getAddress());
            }
        } catch(Throwable ex){
            // NOTE(review): the message text names a different class; left as is
            // in case log consumers match on it.
            log.error("InMemoryReplicationManager.messageDataReceived()", ex);
        }//catch
    }

    /**
     * @return true once an EVT_ALL_SESSION_DATA message has been processed
     */
    public boolean isStateTransferred() {
        return stateTransferred;
    }

    /**
     * @param name the name to assign to this manager
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the current value of the notifyListenersOnReplication flag
     */
    public boolean isNotifyListenersOnReplication() {
        return notifyListenersOnReplication;
    }

    /**
     * @param notifyListenersOnReplication the new value of the flag
     */
    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
        this.notifyListenersOnReplication = notifyListenersOnReplication;
    }

    /*
     * @see org.apache.catalina.ha.ClusterManager#getCluster()
     */
    public CatalinaCluster getCluster() {
        return cluster;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -