📄 simpletcpreplicationmanager.java
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.ha.session;import java.io.IOException;import org.apache.catalina.LifecycleException;import org.apache.catalina.Session;import org.apache.catalina.ha.CatalinaCluster;import org.apache.catalina.ha.ClusterManager;import org.apache.catalina.ha.ClusterMessage;import org.apache.catalina.tribes.Member;import org.apache.catalina.realm.GenericPrincipal;import org.apache.catalina.session.StandardManager;import org.apache.catalina.tribes.io.ReplicationStream;import java.io.ByteArrayInputStream;import org.apache.catalina.Loader;/** * Title: Tomcat Session Replication for Tomcat 4.0 <BR> * Description: A very simple straight forward implementation of * session replication of servers in a cluster.<BR> * This session replication is implemented "live". 
By live
 * I mean, when a session attribute is added into a session on Node A
 * a message is broadcast to the other nodes and setAttribute is called on the
 * replicated sessions.<BR>
 * A full description of this implementation can be found under
 * <a href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
 *
 * Copyright: See apache license
 * Company: www.filip.net
 * @author <a href="mailto:mail@filip.net">Filip Hanik</a>
 * @author Bela Ban (modifications for synchronous replication)
 * @version 1.0 for TC 4.0
 * Description: The InMemoryReplicationManager is a session manager that replicates
 * session information in memory. It uses <a href="www.javagroups.com">JavaGroups</a> as
 * a communication protocol to ensure guaranteed and ordered message delivery.
 * JavaGroups also provides a very flexible protocol stack to ensure that the replication
 * can be used in any environment.
 * <BR><BR>
 * The InMemoryReplicationManager extends the StandardManager hence it allows for us
 * to inherit all the basic session management features like expiration, session listeners etc
 * <BR><BR>
 * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different types of multicast messages
 * all defined in the SessionMessage class.<BR>
 * When a session is replicated (not an attribute added/removed) the session is serialized into
 * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
*/public class SimpleTcpReplicationManager extends StandardManager implements ClusterManager{ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class ); //the channel configuration protected String mChannelConfig = null; //the group name protected String mGroupName = "TomcatReplication"; //somehow start() gets called more than once protected boolean mChannelStarted = false; //log to screen protected boolean mPrintToScreen = true; protected boolean defaultMode = false; protected boolean mManagerRunning = false; /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc) * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for * all responses. */ protected boolean synchronousReplication=true; /** Set to true if we don't want the sessions to expire on shutdown */ protected boolean mExpireSessionsOnShutdown = true; protected boolean useDirtyFlag = false; protected String name; protected boolean distributable = true; protected CatalinaCluster cluster; protected java.util.HashMap invalidatedSessions = new java.util.HashMap(); /** * Flag to keep track if the state has been transferred or not * Assumes false. */ protected boolean stateTransferred = false; private boolean notifyListenersOnReplication; private boolean sendClusterDomainOnly = true ; /** * Constructor, just calls super() * */ public SimpleTcpReplicationManager() { super(); } public boolean isSendClusterDomainOnly() { return sendClusterDomainOnly; } /** * @param sendClusterDomainOnly The sendClusterDomainOnly to set. */ public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) { this.sendClusterDomainOnly = sendClusterDomainOnly; } /** * @return Returns the defaultMode. */ public boolean isDefaultMode() { return defaultMode; } /** * @param defaultMode The defaultMode to set. 
*/
    public void setDefaultMode(boolean defaultMode) {
        this.defaultMode = defaultMode;
    }

    /**
     * @return the value of the manager-running flag.
     */
    public boolean isManagerRunning() {
        return mManagerRunning;
    }

    /**
     * @param usedirtyflag new value for the useDirtyFlag setting.
     */
    public void setUseDirtyFlag(boolean usedirtyflag) {
        this.useDirtyFlag = usedirtyflag;
    }

    /**
     * @param expireSessionsOnShutdown new value for the expire-on-shutdown setting.
     */
    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
        mExpireSessionsOnShutdown = expireSessionsOnShutdown;
    }

    /**
     * Associates this manager with a cluster, emitting a debug message first
     * when debug logging is enabled.
     *
     * @param cluster the cluster to store on this manager.
     */
    public void setCluster(CatalinaCluster cluster) {
        if (log.isDebugEnabled()) {
            log.debug("Cluster associated with SimpleTcpReplicationManager");
        }
        this.cluster = cluster;
    }

    /**
     * @return the expire-on-shutdown setting.
     */
    public boolean getExpireSessionsOnShutdown() {
        return mExpireSessionsOnShutdown;
    }

    /**
     * Stores the print-to-screen setting, reporting the new value at debug
     * level when debug logging is enabled.
     *
     * @param printtoscreen new value for the print-to-screen setting.
     */
    public void setPrintToScreen(boolean printtoscreen) {
        if (log.isDebugEnabled()) {
            log.debug("Setting screen debug to:" + printtoscreen);
        }
        mPrintToScreen = printtoscreen;
    }

    /**
     * @param flag new value for the synchronousReplication setting.
     */
    public void setSynchronousReplication(boolean flag) {
        synchronousReplication = flag;
    }

    /**
     * Persistence is overridden because it does not go hand in hand with
     * replication for now: the inherited unload() runs only when this manager
     * is not distributable, otherwise the call does nothing.
     *
     * @exception IOException propagated from the inherited unload().
     */
    public void unload() throws IOException {
        if (getDistributable()) {
            return;
        }
        super.unload();
    }

    /**
     * Creates a HTTP session.
     * Most of the code in here is copied from the StandardManager.
     * This is not pretty, yeah I know, but it was necessary since the
     * StandardManager had hard coded the session instantiation to the a
     * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
     * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
     * nodes in the cluster that this session has been created.
     * @param notify - if set to true the other nodes in the cluster will be notified.
* This flag is needed so that we can create a session before we deserialize * a replicated one * * @see ReplicatedSession */ protected Session createSession(String sessionId, boolean notify, boolean setId) { //inherited from the basic manager if ((getMaxActiveSessions() >= 0) && (sessions.size() >= getMaxActiveSessions())) throw new IllegalStateException(sm.getString("standardManager.createSession.ise")); Session session = new ReplicatedSession(this); // Initialize the properties of the new session and return it session.setNew(true); session.setValid(true); session.setCreationTime(System.currentTimeMillis()); session.setMaxInactiveInterval(this.maxInactiveInterval); if(sessionId == null) sessionId = generateSessionId(); if ( setId ) session.setId(sessionId); if ( notify && (cluster!=null) ) { ((ReplicatedSession)session).setIsDirty(true); } return (session); }//createSession //========================================================================= // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION //========================================================================= /** * Construct and return a new session object, based on the default * settings specified by this Manager's properties. The session * id will be assigned by this method, and available via the getId() * method of the returned session. If a new session cannot be created * for any reason, return <code>null</code>. 
* * @exception IllegalStateException if a new session cannot be * instantiated for any reason */ public Session createSession(String sessionId) { //create a session and notify the other nodes in the cluster Session session = createSession(sessionId,getDistributable(),true); add(session); return session; } public void sessionInvalidated(String sessionId) { synchronized ( invalidatedSessions ) { invalidatedSessions.put(sessionId, sessionId); } } public String[] getInvalidatedSessions() { synchronized ( invalidatedSessions ) { String[] result = new String[invalidatedSessions.size()]; invalidatedSessions.values().toArray(result); return result; } } public ClusterMessage requestCompleted(String sessionId) { if ( !getDistributable() ) { log.warn("Received requestCompleted message, although this context["+ getName()+"] is not distributable. Ignoring message"); return null; } //notify javagroups try { if ( invalidatedSessions.get(sessionId) != null ) { synchronized ( invalidatedSessions ) { invalidatedSessions.remove(sessionId); SessionMessage msg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_EXPIRED, null, sessionId, sessionId); return msg; } } else { ReplicatedSession session = (ReplicatedSession) findSession( sessionId); if (session != null) { //return immediately if the session is not dirty if (useDirtyFlag && (!session.isDirty())) { //but before we return doing nothing, //see if we should send //an updated last access message so that //sessions across cluster dont expire long interval = session.getMaxInactiveInterval(); long lastaccdist = System.currentTimeMillis() - session.getLastAccessWasDistributed(); if ( ((interval*1000) / lastaccdist)< 3 ) { SessionMessage accmsg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, sessionId); session.setLastAccessWasDistributed(System.currentTimeMillis()); return accmsg; } return null; } session.setIsDirty(false); if (log.isDebugEnabled()) { try { log.debug("Sending session to 
cluster=" + session); } catch (Exception ignore) {} } SessionMessage msg = new SessionMessageImpl(name, SessionMessage.EVT_SESSION_CREATED, writeSession(session), session.getIdInternal(), session.getIdInternal()); return msg; } //end if }//end if } catch (Exception x ) { log.error("Unable to replicate session",x); } return null; } /** * Serialize a session into a byte array<BR> * This method simple calls the writeObjectData method on the session * and returns the byte data from that call * @param session - the session to be serialized * @return a byte array containing the session data, null if the serialization failed */ protected byte[] writeSession( Session session ) { try { java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream(); java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data); session_out.flush();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -