📄 simpletcpreplicationmanager.java
字号:
boolean hasPrincipal = session.getPrincipal() != null;
            // NOTE(review): the statements from the line above down to the next method-closing
            // brace are the tail of a method whose beginning is outside this excerpt
            // (presumably writeSession, which messageReceived calls). They are kept unchanged.
            session_out.writeBoolean(hasPrincipal);
            if ( hasPrincipal ) {
                session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
            }//end if
            ((ReplicatedSession)session).writeObjectData(session_out);
            return session_data.toByteArray();
        } catch ( Exception x ) {
            log.error("Failed to serialize the session!",x);
        }
        return null;
    }

    /**
     * Open Stream and use correct ClassLoader (Container) Switch
     * ThreadClassLoader.
     * Covers the whole array: delegates to
     * {@link #getReplicationStream(byte[], int, int)} with offset 0 and
     * <code>data.length</code>.
     *
     * @param data the serialized bytes to read from
     * @return The object input stream
     * @throws IOException declared by the delegate
     */
    public ReplicationStream getReplicationStream(byte[] data) throws IOException {
        return getReplicationStream(data,0,data.length);
    }

    /**
     * Opens a replication stream over <code>length</code> bytes of
     * <code>data</code> starting at <code>offset</code>.
     * The stream is given the container loader's class loader when a
     * container with a loader is present, otherwise the thread context class
     * loader. When the two differ, both are handed to the stream (container
     * loader first).
     *
     * @param data the serialized bytes to read from
     * @param offset index of the first byte to read
     * @param length number of bytes to read
     * @return The object input stream
     * @throws IOException declared by the ReplicationStream constructor
     */
    public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        //fix to be able to run the DeltaManager
        //stand alone without a container.
        //use the Threads context class loader
        Loader loader = (container != null) ? container.getLoader() : null;
        ClassLoader classLoader = (loader != null) ? loader.getClassLoader() : contextLoader;
        //end fix
        ByteArrayInputStream fis = new ByteArrayInputStream(data, offset, length);
        if ( classLoader == contextLoader ) {
            return new ReplicationStream(fis, new ClassLoader[] {classLoader});
        }
        return new ReplicationStream(fis, new ClassLoader[] {classLoader, contextLoader});
    }

    /**
     * Reinstantiates a serialized session from the data passed in.
     * This will first call createSession() so that we get a fresh instance with all
     * the managers set and all the transient fields validated.
     * Then it calls ReplicatedSession.readObjectData(stream) to deserialize the object.
     * If a session with the given id already exists it is expired first
     * (<code>expire(false)</code>) and replaced by the fresh instance.
     *
     * @param data - a byte array containing session data
     * @param sessionId - the id to assign to the deserialized session; when
     *                    null no lookup for an existing session is done
     * @return a valid Session object, null if an error occurs
     */
    protected Session readSession( byte[] data, String sessionId ) {
        try {
            ReplicationStream session_in = getReplicationStream(data);

            Session existing = sessionId!=null?this.findSession(sessionId):null;
            boolean isNew = (existing==null);
            //clear the old values from the existing session
            if ( existing!=null ) {
                //cleans up the previous values, since we are not doing removes
                ((ReplicatedSession)existing).expire(false);
            }

            // Always deserialize into a fresh instance; it is taken out of the
            // session map under its generated id before the real id is set below.
            Session session = createSession(null,false, false);
            sessions.remove(session.getIdInternal());
            ReplicatedSession rsession = (ReplicatedSession)session;

            // Wire order: principal flag, optional principal, then the session data.
            boolean hasPrincipal = session_in.readBoolean();
            SerializablePrincipal p = null;
            if ( hasPrincipal ) {
                p = (SerializablePrincipal)session_in.readObject();
            }
            rsession.readObjectData(session_in);
            if ( hasPrincipal ) {
                session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
            }

            rsession.setId(sessionId,isNew);
            rsession.setAccessCount(1);
            session.setManager(this);
            session.setValid(true);
            rsession.setLastAccessedTime(System.currentTimeMillis());
            rsession.setThisAccessedTime(System.currentTimeMillis());
            rsession.setAccessCount(0);
            session.setNew(false);

            if(log.isTraceEnabled()) {
                log.trace("Session loaded id="+sessionId +
                          " actualId="+session.getId()+
                          " exists="+this.sessions.containsKey(sessionId)+
                          " valid="+rsession.isValid());
            }
            return session;
        } catch ( Exception x ) {
            log.error("Failed to deserialize the session!",x);
        }
        return null;
    }

    /**
     * @return the name of this manager
     */
    public String getName() {
        return this.name;
    }

    /**
     * Prepare for the beginning of active use of the public methods of this
     * component. This method should be called after <code>configure()</code>,
     * and before any of the public methods of the component are utilized.<BR>
     * Starts the cluster communication channel, this will connect with the other nodes
     * in the cluster, and request the current session state to be transferred to this node.
     * The request goes to the first cluster member; this method then polls
     * every 100 ms until the state has been received, 60 seconds have passed,
     * or the waiting thread is interrupted.
     *
     * @exception IllegalStateException if this component has already been
     *  started
     * @exception LifecycleException if this component detects a fatal error
     *  that prevents this component from being used
     */
    public void start() throws LifecycleException {
        mManagerRunning = true;
        super.start();
        //start the javagroups channel
        try {
            //the channel is already running
            if ( mChannelStarted ) return;
            if(log.isInfoEnabled()) log.info("Starting clustering manager...:"+getName());
            if ( cluster == null ) {
                log.error("Starting... no cluster associated with this context:"+getName());
                return;
            }
            cluster.addManager(getName(),this);

            // Read the member list once so the length check and the element
            // access are guaranteed to see the same array.
            Member[] members = cluster.getMembers();
            if (members.length > 0) {
                Member mbr = members[0];
                SessionMessage msg =
                    new SessionMessageImpl(this.getName(),
                                           SessionMessage.EVT_GET_ALL_SESSIONS,
                                           null,
                                           "GET-ALL",
                                           "GET-ALL-"+this.getName());
                cluster.send(msg, mbr);
                if(log.isWarnEnabled())
                    log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                             ". This operation will timeout if no session state has been received within "+
                             "60 seconds");

                final long timeoutMs = 1000*60;
                long reqStart = System.currentTimeMillis();
                long reqNow = reqStart;
                boolean isTimeout = false;
                boolean interrupted = false;
                do {
                    try {
                        Thread.sleep(100);
                    } catch ( InterruptedException sleep ) {
                        // Restore the flag for the caller and stop waiting; looping on
                        // would make every further sleep() fail immediately.
                        Thread.currentThread().interrupt();
                        interrupted = true;
                    }
                    reqNow = System.currentTimeMillis();
                    isTimeout = ((reqNow-reqStart) > timeoutMs);
                } while ( (!interrupted) && (!isStateTransferred()) && (!isTimeout) );

                if ( interrupted && (!isStateTransferred()) ) {
                    log.error("Manager["+getName()+"], interrupted while waiting for session state.");
                } else if ( isTimeout || (!isStateTransferred()) ) {
                    log.error("Manager["+getName()+"], No session state received, timing out.");
                } else {
                    if(log.isInfoEnabled())
                        log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
                }
            } else {
                if(log.isInfoEnabled())
                    log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
            }//end if
            mChannelStarted = true;
        } catch ( Exception x ) {
            log.error("Unable to start SimpleTcpReplicationManager",x);
        }
    }

    /**
     * Gracefully terminate the active use of the public methods of this
     * component. This method should be the last one called on a given
     * instance of this component.<BR>
     * Clears the local session map and unregisters this manager from the
     * cluster (when one is associated).
     *
     * @exception IllegalStateException if this component has not been started
     * @exception LifecycleException if this component detects a fatal error
     *  that needs to be reported
     */
    public void stop() throws LifecycleException {
        mManagerRunning = false;
        mChannelStarted = false;
        super.stop();
        //stop the javagroup channel
        try {
            this.sessions.clear();
            // start() tolerates a missing cluster, so stop() must as well.
            if ( cluster != null ) {
                cluster.removeManager(getName(),this);
            }
        } catch ( Exception x ) {
            log.error("Unable to stop SimpleTcpReplicationManager",x);
        }
    }

    /**
     * @param dist the new value of the distributable flag
     */
    public void setDistributable(boolean dist) {
        this.distributable = dist;
    }

    /**
     * @return the current value of the distributable flag
     */
    public boolean getDistributable() {
        return distributable;
    }

    /**
     * This method is called by the received thread when a SessionMessage has
     * been received from one of the other nodes in the cluster.
     * Any exception raised while handling the message is logged and not
     * propagated.
     *
     * @param msg - the message received
     * @param sender - the sender of the message, this is used if we receive a
     *                 EVT_GET_ALL_SESSION message, so that we only reply to
     *                 the requesting node
     */
    protected void messageReceived( SessionMessage msg, Member sender ) {
        try {
            // The guard matches the level that is actually logged.
            if(log.isDebugEnabled()) {
                log.debug("Received SessionMessage of type="+msg.getEventTypeString());
                log.debug("Received SessionMessage sender="+sender);
            }
            switch ( msg.getEventType() ) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    //get a list of all the session from this manager
                    Object[] allSessions = findSessions();
                    java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
                    java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
                    // Payload layout: count, then (id, serialized bytes) per session.
                    oout.writeInt(allSessions.length);
                    for (int i=0; i<allSessions.length; i++){
                        ReplicatedSession ses = (ReplicatedSession)allSessions[i];
                        oout.writeUTF(ses.getIdInternal());
                        byte[] data = writeSession(ses);
                        oout.writeObject(data);
                    }//for
                    //don't send a message if we don't have to
                    oout.flush();
                    oout.close();
                    byte[] data = bout.toByteArray();
                    SessionMessage newmsg = new SessionMessageImpl(name,
                        SessionMessage.EVT_ALL_SESSION_DATA,
                        data, "SESSION-STATE","SESSION-STATE-"+getName());
                    cluster.send(newmsg, sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    java.io.ByteArrayInputStream bin =
                        new java.io.ByteArrayInputStream(msg.getSession());
                    java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
                    int size = oin.readInt();
                    for ( int i=0; i<size; i++) {
                        String id = oin.readUTF();
                        byte[] data = (byte[])oin.readObject();
                        // Return value not needed here; readSession is called for its effects.
                        readSession(data,id);
                    }//for
                    stateTransferred=true;
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    Session session = this.readSession(msg.getSession(),msg.getSessionID());
                    if ( log.isDebugEnabled() ) {
                        // readSession returns null on failure; do not dereference it blindly.
                        log.debug("Received replicated session=" + session +
                            " isValid=" + (session != null && session.isValid()));
                    }
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.expire();
                        this.remove(session);
                    }//end if
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED :{
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.access();
                        session.endAccess();
                    }
                    break;
                }
                default: {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            }//switch
        } catch ( Exception x ) {
            log.error("Unable to receive message through TCP channel",x);
        }
    }

    /**
     * Entry point for messages delivered by the cluster. Messages that are
     * SessionMessage instances are forwarded to
     * {@link #messageReceived(SessionMessage, Member)} with the message's
     * address as sender; all other messages are ignored. Any Throwable is
     * logged and not propagated.
     *
     * @param cmsg the cluster message received
     */
    public void messageDataReceived(ClusterMessage cmsg) {
        try {
            if ( cmsg instanceof SessionMessage ) {
                SessionMessage msg = (SessionMessage)cmsg;
                // Casting a null address yields null, so no explicit null check is needed.
                messageReceived(msg, (Member) msg.getAddress());
            }
        } catch(Throwable ex){
            log.error("SimpleTcpReplicationManager.messageDataReceived()", ex);
        }//catch
    }

    /**
     * @return true once an EVT_ALL_SESSION_DATA message has been processed
     */
    public boolean isStateTransferred() {
        return stateTransferred;
    }

    /**
     * @param name the new name of this manager
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the current value of the notifyListenersOnReplication flag
     */
    public boolean isNotifyListenersOnReplication() {
        return notifyListenersOnReplication;
    }

    /**
     * @param notifyListenersOnReplication the new value of the flag
     */
    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
        this.notifyListenersOnReplication = notifyListenersOnReplication;
    }

    /*
     * @see org.apache.catalina.ha.ClusterManager#getCluster()
     */
    public CatalinaCluster getCluster() {
        return cluster;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -