📄 simpletcpreplicationmanager.java
字号:
boolean hasPrincipal = session.getPrincipal() != null;
session_out.writeBoolean(hasPrincipal);
if ( hasPrincipal )
{
session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
}//end if
((ReplicatedSession)session).writeObjectData(session_out);
return session_data.toByteArray();
}
catch ( Exception x )
{
log.error("Failed to serialize the session!",x);
}
return null;
}
/**
* Open Stream and use correct ClassLoader (Container) Switch
* ThreadClassLoader
*
* @param data
* @return The object input stream
* @throws IOException
*/
public ReplicationStream getReplicationStream(byte[] data) throws IOException {
return getReplicationStream(data,0,data.length);
}
public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException {
ByteArrayInputStream fis =null;
ReplicationStream ois = null;
Loader loader = null;
ClassLoader classLoader = null;
//fix to be able to run the DeltaManager
//stand alone without a container.
//use the Threads context class loader
if (container != null)
loader = container.getLoader();
if (loader != null)
classLoader = loader.getClassLoader();
else
classLoader = Thread.currentThread().getContextClassLoader();
//end fix
fis = new ByteArrayInputStream(data, offset, length);
if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
} else {
ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()});
}
return ois;
}
/**
* Reinstantiates a serialized session from the data passed in.
* This will first call createSession() so that we get a fresh instance with all
* the managers set and all the transient fields validated.
* Then it calls Session.readObjectData(byte[]) to deserialize the object
* @param data - a byte array containing session data
* @return a valid Session object, null if an error occurs
*
*/
protected Session readSession( byte[] data, String sessionId )
{
try
{
ReplicationStream session_in = getReplicationStream(data);
Session session = sessionId!=null?this.findSession(sessionId):null;
boolean isNew = (session==null);
//clear the old values from the existing session
if ( session!=null ) {
ReplicatedSession rs = (ReplicatedSession)session;
rs.expire(false); //cleans up the previous values, since we are not doing removes
session = null;
}//end if
if (session==null) {
session = createSession(null,false, false);
sessions.remove(session.getIdInternal());
}
boolean hasPrincipal = session_in.readBoolean();
SerializablePrincipal p = null;
if ( hasPrincipal )
p = (SerializablePrincipal)session_in.readObject();
((ReplicatedSession)session).readObjectData(session_in);
if ( hasPrincipal )
session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
((ReplicatedSession)session).setId(sessionId,isNew);
ReplicatedSession rsession = (ReplicatedSession)session;
rsession.setAccessCount(1);
session.setManager(this);
session.setValid(true);
rsession.setLastAccessedTime(System.currentTimeMillis());
rsession.setThisAccessedTime(System.currentTimeMillis());
((ReplicatedSession)session).setAccessCount(0);
session.setNew(false);
if(log.isTraceEnabled())
log.trace("Session loaded id="+sessionId +
" actualId="+session.getId()+
" exists="+this.sessions.containsKey(sessionId)+
" valid="+rsession.isValid());
return session;
}
catch ( Exception x )
{
log.error("Failed to deserialize the session!",x);
}
return null;
}
public String getName() {
return this.name;
}
/**
* Prepare for the beginning of active use of the public methods of this
* component. This method should be called after <code>configure()</code>,
* and before any of the public methods of the component are utilized.<BR>
* Starts the cluster communication channel, this will connect with the other nodes
* in the cluster, and request the current session state to be transferred to this node.
* @exception IllegalStateException if this component has already been
* started
* @exception LifecycleException if this component detects a fatal error
* that prevents this component from being used
*/
public void start() throws LifecycleException {
mManagerRunning = true;
super.start();
try {
//the channel is already running
if ( mChannelStarted ) return;
if(log.isInfoEnabled())
log.info("Starting clustering manager...:"+getName());
if ( cluster == null ) {
log.error("Starting... no cluster associated with this context:"+getName());
return;
}
cluster.registerManager(this);
if (cluster.getMembers().length > 0) {
Member mbr = cluster.getMembers()[0];
SessionMessage msg =
new SessionMessageImpl(this.getName(),
SessionMessage.EVT_GET_ALL_SESSIONS,
null,
"GET-ALL",
"GET-ALL-"+this.getName());
cluster.send(msg, mbr);
if(log.isWarnEnabled())
log.warn("Manager["+getName()+"], requesting session state from "+mbr+
". This operation will timeout if no session state has been received within "+
"60 seconds");
long reqStart = System.currentTimeMillis();
long reqNow = 0;
boolean isTimeout=false;
do {
try {
Thread.sleep(100);
}catch ( Exception sleep) {}
reqNow = System.currentTimeMillis();
isTimeout=((reqNow-reqStart)>(1000*60));
} while ( (!isStateTransferred()) && (!isTimeout));
if ( isTimeout || (!isStateTransferred()) ) {
log.error("Manager["+getName()+"], No session state received, timing out.");
}else {
if(log.isInfoEnabled())
log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
}
} else {
if(log.isInfoEnabled())
log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
}//end if
mChannelStarted = true;
} catch ( Exception x ) {
log.error("Unable to start SimpleTcpReplicationManager",x);
}
}
/**
* Gracefully terminate the active use of the public methods of this
* component. This method should be the last one called on a given
* instance of this component.<BR>
* This will disconnect the cluster communication channel and stop the listener thread.
* @exception IllegalStateException if this component has not been started
* @exception LifecycleException if this component detects a fatal error
* that needs to be reported
*/
public void stop() throws LifecycleException
{
mManagerRunning = false;
mChannelStarted = false;
super.stop();
try
{
this.sessions.clear();
cluster.removeManager(this);
}
catch ( Exception x )
{
log.error("Unable to stop SimpleTcpReplicationManager",x);
}
}
public void setDistributable(boolean dist) {
this.distributable = dist;
}
public boolean getDistributable() {
return distributable;
}
/**
* This method is called by the received thread when a SessionMessage has
* been received from one of the other nodes in the cluster.
* @param msg - the message received
* @param sender - the sender of the message, this is used if we receive a
* EVT_GET_ALL_SESSION message, so that we only reply to
* the requesting node
*/
protected void messageReceived( SessionMessage msg, Member sender ) {
try {
if(log.isInfoEnabled()) {
log.debug("Received SessionMessage of type="+msg.getEventTypeString());
log.debug("Received SessionMessage sender="+sender);
}
switch ( msg.getEventType() ) {
case SessionMessage.EVT_GET_ALL_SESSIONS: {
//get a list of all the session from this manager
Object[] sessions = findSessions();
java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
oout.writeInt(sessions.length);
for (int i=0; i<sessions.length; i++){
ReplicatedSession ses = (ReplicatedSession)sessions[i];
oout.writeUTF(ses.getIdInternal());
byte[] data = writeSession(ses);
oout.writeObject(data);
}//for
//don't send a message if we don't have to
oout.flush();
oout.close();
byte[] data = bout.toByteArray();
SessionMessage newmsg = new SessionMessageImpl(name,
SessionMessage.EVT_ALL_SESSION_DATA,
data, "SESSION-STATE","SESSION-STATE-"+getName());
cluster.send(newmsg, sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_DATA: {
java.io.ByteArrayInputStream bin =
new java.io.ByteArrayInputStream(msg.getSession());
java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
int size = oin.readInt();
for ( int i=0; i<size; i++) {
String id = oin.readUTF();
byte[] data = (byte[])oin.readObject();
Session session = readSession(data,id);
}//for
stateTransferred=true;
break;
}
case SessionMessage.EVT_SESSION_CREATED: {
Session session = this.readSession(msg.getSession(),msg.getSessionID());
if ( log.isDebugEnabled() ) {
log.debug("Received replicated session=" + session +
" isValid=" + session.isValid());
}
break;
}
case SessionMessage.EVT_SESSION_EXPIRED: {
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.expire();
this.remove(session);
}//end if
break;
}
case SessionMessage.EVT_SESSION_ACCESSED :{
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.access();
session.endAccess();
}
break;
}
default: {
//we didn't recognize the message type, do nothing
break;
}
}//switch
}
catch ( Exception x )
{
log.error("Unable to receive message through TCP channel",x);
}
}
public void messageDataReceived(ClusterMessage cmsg) {
try {
if ( cmsg instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)cmsg;
messageReceived(msg,
msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
} catch(Throwable ex){
log.error("InMemoryReplicationManager.messageDataReceived()", ex);
}//catch
}
public boolean isStateTransferred() {
return stateTransferred;
}
public void setName(String name) {
this.name = name;
}
public boolean isNotifyListenersOnReplication() {
return notifyListenersOnReplication;
}
public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
this.notifyListenersOnReplication = notifyListenersOnReplication;
}
/*
* @see org.apache.catalina.ha.ClusterManager#getCluster()
*/
public CatalinaCluster getCluster() {
return cluster;
}
public ClusterManager cloneFromTemplate() {
throw new UnsupportedOperationException();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -