⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simpletcpreplicationmanager.java

📁 This temp directory is used by the JVM for temporary file storage. The JVM is configured to use thi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                    //return immediately if the session is not dirty
                    if (useDirtyFlag && (!session.isDirty())) {
                        //but before we return doing nothing,
                        //see if we should send
                        //an updated last access message so that
                        //sessions across cluster dont expire
                        long interval = session.getMaxInactiveInterval();
                        long lastaccdist = System.currentTimeMillis() -
                            session.getLastAccessWasDistributed();
                        if ( ((interval*1000) / lastaccdist)< 3 ) {
                            SessionMessage accmsg = new SessionMessage(name,
                                SessionMessage.EVT_SESSION_ACCESSED,
                                null,
                                sessionId);
                            session.setLastAccessWasDistributed(System.currentTimeMillis());
                            return accmsg;
                        }
                        return null;
                    }

                    session.setIsDirty(false);
                    if (getDebug() > 5) {
                        try {
                            log.debug("Sending session to cluster=" + session);
                        }
                        catch (Exception ignore) {}
                    }
                    SessionMessage msg = new SessionMessage(name,
                        SessionMessage.EVT_SESSION_CREATED,
                        writeSession(session),
                        session.getId());
                    return msg;
                } //end if
            }//end if
        }
        catch (Exception x )
        {
            log("Unable to replicate session",x);
        }
        return null;
    }

    /**
     * Serializes a session into a byte array.<BR>
     * The byte image starts with a boolean telling whether a principal follows;
     * when it does, the principal is written as the object returned by
     * <code>SerializablePrincipal.createPrincipal</code>. After that the session
     * appends its own state through <code>ReplicatedSession.writeObjectData</code>.
     * Any exception (including a failed cast of the session or its principal)
     * is reported through <code>log</code> and turned into a null result.
     * @param session - the session to be serialized
     * @return a byte array containing the session data, null if the serialization failed
     */
    protected byte[] writeSession( Session session )
    {
        try
        {
            java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
            java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
            session_out.flush();
            boolean hasPrincipal = session.getPrincipal() != null;
            session_out.writeBoolean(hasPrincipal);
            if ( hasPrincipal )
            {
                session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
            }//end if
            ((ReplicatedSession)session).writeObjectData(session_out);
            //ObjectOutputStream keeps written data in an internal buffer; push it
            //into the byte array stream before taking the snapshot, otherwise the
            //tail of the session data can be missing from the returned array
            session_out.flush();
            return session_data.toByteArray();

        }
        catch ( Exception x )
        {
            log("Failed to serialize the session!",x,1);
        }
        return null;
    }

    /**
     * Rebuilds a session from its serialized form.
     * If <code>sessionId</code> is given and a session is already registered
     * under that id, that session is first expired through
     * <code>expire(false)</code>. The data is then always read into a new
     * session obtained from <code>createSession(false,false)</code>: a leading
     * boolean says whether a <code>SerializablePrincipal</code> follows, and the
     * rest is consumed by <code>ReplicatedSession.readObjectData</code>.
     * @param data - a byte array containing session data
     * @param sessionId - id of a session to expire before reading, may be null
     * @return the newly populated Session object, null if an error occurs
     *
     */
    protected Session readSession( byte[] data, String sessionId )
    {
        try
        {
            java.io.ByteArrayInputStream rawBytes = new java.io.ByteArrayInputStream(data);
            ReplicationStream objectIn = new ReplicationStream(rawBytes,container.getLoader().getClassLoader());

            //get rid of the values held by a session already known under this id
            if ( sessionId != null ) {
                Session stale = this.findSession(sessionId);
                if ( stale != null ) {
                    ((ReplicatedSession)stale).expire(false);
                }
            }

            Session fresh = createSession(false,false);
            SerializablePrincipal principal = null;
            boolean principalPresent = objectIn.readBoolean();
            if ( principalPresent ) {
                principal = (SerializablePrincipal)objectIn.readObject();
            }
            ((ReplicatedSession)fresh).readObjectData(objectIn);
            if ( principalPresent ) {
                //the principal is attached only after the session state has been read
                fresh.setPrincipal(principal.getPrincipal(getContainer().getRealm()));
            }
            return fresh;
        }
        catch ( Exception x )
        {
            log("Failed to deserialize the session!",x,1);
            return null;
        }
    }

    /**
     * Returns the name currently stored in this manager's <code>name</code> field.
     * @return the manager name
     */
    public String getName()
    {
        return name;
    }
    /**
     * Prepare for the beginning of active use of the public methods of this
     * component.<BR>
     * Marks the manager as running and starts the superclass. Unless the channel
     * was already started or no cluster is associated, it then asks the first
     * member reported by the cluster for all sessions
     * (<code>EVT_GET_ALL_SESSIONS</code>) and polls <code>isStateTransferred()</code>
     * every 100 ms for at most 60 seconds. If the waiting thread is interrupted
     * the wait ends early and the interrupt status is restored for the caller.
     * Failures in the cluster part are logged and not propagated.
     * @exception LifecycleException declared by this method; presumably
     *  propagated from <code>super.start()</code>
     */
    public void start() throws LifecycleException {
        mManagerRunning = true;
        super.start();
        //start the javagroups channel
        try {
            //the channel is already running
            if ( mChannelStarted ) return;
            log("Starting clustering manager...:"+getName(),1);
            if ( cluster == null ) {
                log("Starting... no cluster associated with this context:"+getName(),1);
                return;
            }

            //take one snapshot of the membership so that the emptiness check and
            //the element access below cannot disagree
            Member[] members = cluster.getMembers();
            if (members.length > 0) {
                Member mbr = members[0];
                SessionMessage msg =
                    new SessionMessage(this.getName(),
                                       SessionMessage.EVT_GET_ALL_SESSIONS,
                                       null,
                                       "GET-ALL");
                cluster.send(msg, mbr);
                log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                         ". This operation will timeout if no session state has been received within "+
                         "60 seconds");
                final long pollIntervalMs = 100;
                final long timeoutMs = 1000*60;
                long reqStart = System.currentTimeMillis();
                long reqNow = reqStart;
                boolean isTimeout = false;
                boolean interrupted = false;
                do {
                    try {
                        Thread.sleep(pollIntervalMs);
                    } catch ( InterruptedException ie ) {
                        //stop waiting; the flag is restored once the loop is left
                        interrupted = true;
                    }
                    reqNow = System.currentTimeMillis();
                    isTimeout = ((reqNow-reqStart) > timeoutMs);
                } while ( (!interrupted) && (!isStateTransferred()) && (!isTimeout) );
                if ( interrupted ) {
                    Thread.currentThread().interrupt();
                }
                //judge the outcome by the state flag alone, so a transfer that
                //completed right at the deadline is not reported as a failure
                if ( isStateTransferred() ) {
                    log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
                } else if ( interrupted ) {
                    log.error("Manager["+getName()+"], interrupted while waiting for session state.");
                } else {
                    log.error("Manager["+getName()+"], No session state received, timing out.");
                }
            } else {
                log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
            }//end if
            mChannelStarted = true;
        }  catch ( Exception x ) {
            log("Unable to start SimpleTcpReplicationManager",x,1);
        }
    }

    /**
     * Gracefully terminate the active use of the public methods of this
     * component.<BR>
     * Clears the running and channel-started flags and then stops the
     * superclass. No cluster channel teardown is performed by this method.
     * @exception LifecycleException declared by this method; presumably
     *  propagated from <code>super.stop()</code>
     */
    public void stop() throws LifecycleException
    {
        mManagerRunning = false;
        mChannelStarted = false;
        super.stop();
        //NOTE(review): the replication listener, the replication transmitter and
        //the service are not stopped here; the calls that did so had been
        //commented out, which left an empty try block with an unreachable catch.
        //Confirm that their shutdown is handled elsewhere.
    }

    /**
     * Stores the given value in this manager's <code>distributable</code> field.
     * @param dist - the new value of the flag
     */
    public void setDistributable(boolean dist)
    {
        distributable = dist;
    }

    /**
     * Returns the current value of this manager's <code>distributable</code> field.
     * @return the distributable flag
     */
    public boolean getDistributable()
    {
        return this.distributable;
    }

    /**
     * Handles a SessionMessage received from one of the other nodes in the
     * cluster and dispatches on its event type:
     * <ul>
     * <li>EVT_GET_ALL_SESSIONS - serializes every local session and sends the
     *     result back to the sender as an EVT_ALL_SESSION_DATA message</li>
     * <li>EVT_ALL_SESSION_DATA - reads each transferred session, adds it to this
     *     manager and marks the state as transferred</li>
     * <li>EVT_SESSION_CREATED - reads the session, adds it, marks it valid and
     *     accessed</li>
     * <li>EVT_SESSION_EXPIRED - expires and removes the session if it is known</li>
     * <li>EVT_SESSION_ACCESSED - records an access on the session if it is known</li>
     * </ul>
     * Sessions that cannot be deserialized are skipped instead of aborting the
     * handling of the message. Any exception is logged and not propagated.
     * @param msg - the message received
     * @param sender - the sender of the message, this is used if we receive a
     *                 EVT_GET_ALL_SESSION message, so that we only reply to
     *                 the requesting node
     */
    protected void messageReceived( SessionMessage msg, Member sender ) {
        try  {
            log("Received SessionMessage of type="+msg.getEventTypeString(),3);
            log("Received SessionMessage sender="+sender,3);
            switch ( msg.getEventType() ) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    //get a list of all the session from this manager
                    Object[] sessions = findSessions();
                    java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
                    java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
                    oout.writeInt(sessions.length);
                    for (int i=0; i<sessions.length; i++){
                        ReplicatedSession ses = (ReplicatedSession)sessions[i];
                        oout.writeUTF(ses.getId());
                        //writeSession returns null on failure; the null is still
                        //written so that the entry count announced above stays right
                        byte[] data = writeSession(ses);
                        oout.writeObject(data);
                    }//for
                    //the reply is sent even when there are no sessions, the
                    //requester needs it to know the transfer is complete
                    oout.flush();
                    oout.close();
                    byte[] data = bout.toByteArray();
                    SessionMessage newmsg = new SessionMessage(name,
                        SessionMessage.EVT_ALL_SESSION_DATA,
                        data, "");
                    cluster.send(newmsg, sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    java.io.ByteArrayInputStream bin =
                        new java.io.ByteArrayInputStream(msg.getSession());
                    java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
                    int size = oin.readInt();
                    for ( int i=0; i<size; i++) {
                        String id = oin.readUTF();
                        byte[] data = (byte[])oin.readObject();
                        Session session = readSession(data,id);
                        if ( session == null ) {
                            //readSession reports the failure through log(); one
                            //unreadable session must not abort the whole transfer
                            continue;
                        }
                        session.setManager(this);
                        add(session);
                    }//for
                    stateTransferred=true;
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    Session session = this.readSession(msg.getSession(),msg.getSessionID());
                    if ( session == null ) {
                        //readSession reports the failure through log()
                        break;
                    }
                    session.setManager(this);
                    add(session);
                    session.setValid(true);
                    session.access();
                    if ( getDebug()  > 5 ) log("Received replicated session="+session);
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.expire();
                        this.remove(session);
                    }//end if
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED :{
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.access();
                    }
                    break;
                }
                default:  {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            }//switch
        }
        catch ( Exception x )
        {
            log("Unable to receive message through TCP channel",x,1);
        }
    }

    /**
     * Entry point for an incoming SessionMessage: forwards the message to
     * <code>messageReceived</code> together with the message's address cast to
     * <code>Member</code> (null when the message carries no address).
     * Nothing is propagated to the caller; any Throwable is logged.
     * @param msg - the message received
     */
    public void messageDataReceived(SessionMessage msg) {
        try {
            //casting a null address yields null, so a single read is enough
            Member sender = (Member)msg.getAddress();
            messageReceived(msg, sender);
        } catch(Throwable ex){
            log("SimpleTcpReplicationManager.messageDataReceived()", ex);
        }//catch
    }

    /**
     * Tells whether an EVT_ALL_SESSION_DATA message has been processed, i.e.
     * returns the current value of the <code>stateTransferred</code> field.
     * NOTE(review): start() polls this method while messageReceived() sets the
     * field, apparently from another thread - confirm the field is volatile.
     * @return the stateTransferred flag
     */
    public boolean isStateTransferred()
    {
        return this.stateTransferred;
    }

    /**
     * Logs a message at debug level 3 by delegating to
     * <code>log(String,int)</code>.
     * @param msg - the message to log
     */
    public void log(String msg)
    {
        final int defaultLevel = 3;
        log(msg,defaultLevel);
    }

    /**
     * Logs a message and its cause at debug level 3 by delegating to
     * <code>log(String,Throwable,int)</code>.
     * @param msg - the message to log
     * @param x - the throwable to log with the message
     */
    public void log(String msg, Throwable x)
    {
        final int defaultLevel = 3;
        log(msg,x,defaultLevel);
    }
    /**
     * Logs a message at info level, but only when the value returned by
     * <code>getDebug()</code> is at least <code>level</code>. When
     * <code>mPrintToScreen</code> is set the message is also printed to
     * standard out.
     * @param msg - the message to log
     * @param level - the minimum debug level required for the message to appear
     */
    public void log(String msg, int level)
    {
        if ( getDebug() < level ) {
            return;
        }
        if ( mPrintToScreen ) {
            System.out.println(msg);
        }
        log.info(msg);
    }

    /**
     * Logs a message and a throwable at error level, but only when the value
     * returned by <code>getDebug()</code> is at least <code>level</code>. When
     * <code>mPrintToScreen</code> is set the message is also printed to
     * standard out and the stack trace of the throwable is printed.
     * @param msg - the message to log
     * @param x - the throwable to log with the message
     * @param level - the minimum debug level required for the message to appear
     */
    public void log(String msg, Throwable x, int level)
    {
        if ( getDebug() < level ) {
            return;
        }
        if ( mPrintToScreen ) {
            System.out.println(msg);
            x.printStackTrace();
        }
        log.error(msg,x);
    }
    /**
     * Stores the given value in this manager's <code>name</code> field; the same
     * field is returned by <code>getName()</code> and passed as the first
     * constructor argument of the SessionMessage objects this class creates.
     * @param name - the new manager name
     */
    public void setName(String name) {
        this.name = name;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -