📄 deltamanager.java
字号:
/** * Get the lifecycle listeners associated with this lifecycle. If this * Lifecycle has no listeners registered, a zero-length array is returned. */ public LifecycleListener[] findLifecycleListeners() { return lifecycle.findLifecycleListeners(); } /** * Remove a lifecycle event listener from this component. * * @param listener * The listener to remove */ public void removeLifecycleListener(LifecycleListener listener) { lifecycle.removeLifecycleListener(listener); } /** * Prepare for the beginning of active use of the public methods of this * component. This method should be called after <code>configure()</code>, * and before any of the public methods of the component are utilized. * * @exception LifecycleException * if this component detects a fatal error that prevents this * component from being used */ public void start() throws LifecycleException { if (!initialized) init(); // Validate and update our current component state if (started) { return; } started = true; lifecycle.fireLifecycleEvent(START_EVENT, null); // Force initialization of the random number generator generateSessionId(); // Load unloaded sessions, if any try { //the channel is already running Cluster cluster = getCluster() ; // stop remove cluster binding //wow, how many nested levels of if statements can we have ;) if(cluster == null) { Container context = getContainer() ; if(context != null && context instanceof Context) { Container host = context.getParent() ; if(host != null && host instanceof Host) { cluster = host.getCluster(); if(cluster != null && cluster instanceof CatalinaCluster) { setCluster((CatalinaCluster) cluster) ; } else { Container engine = host.getParent() ; if(engine != null && engine instanceof Engine) { cluster = engine.getCluster(); if(cluster != null && cluster instanceof CatalinaCluster) { setCluster((CatalinaCluster) cluster) ; } } else { cluster = null ; } } } } } if (cluster == null) { log.error(sm.getString("deltaManager.noCluster", getName())); return; } else { if (log.isInfoEnabled()) { String type = "unknown" ; if( cluster.getContainer() instanceof Host){ type = "Host" ; } else if( cluster.getContainer() instanceof Engine){ type = "Engine" ; } log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName())); } } if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName())); //to survice context reloads, as only a stop/start is called, not // createManager ((CatalinaCluster)cluster).addManager(getName(), this); getAllClusterSessions(); } catch (Throwable t) { log.error(sm.getString("deltaManager.managerLoad"), t); } } /** * get from first session master the backup from all clustered sessions * @see #findSessionMasterMember() */ public synchronized void getAllClusterSessions() { if (cluster != null && cluster.getMembers().length > 0) { long beforeSendTime = System.currentTimeMillis(); Member mbr = findSessionMasterMember(); if(mbr == null) { // No domain member found return; } SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName()); // set reference time stateTransferCreateSendTime = beforeSendTime ; // request session state counterSend_EVT_GET_ALL_SESSIONS++; stateTransfered = false ; // FIXME This send call block the deploy thread, when sender waitForAck is enabled try { synchronized(receivedMessageQueue) { receiverQueue = true ; } cluster.send(msg, mbr); if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr)); // FIXME At sender ack mode this method check only the state transfer and resend is a problem! waitForSendAllSessions(beforeSendTime); } finally { synchronized(receivedMessageQueue) { for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) { SessionMessage smsg = (SessionMessage) iter.next(); if (!stateTimestampDrop) { messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null); } else { if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) { // FIXME handle EVT_GET_ALL_SESSIONS later messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null); } else { if (log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp()))); } } } } receivedMessageQueue.clear(); receiverQueue = false ; } } } else { if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName())); } } /** * Register cross context session at replication valve thread local * @param session cross context session */ protected void registerSessionAtReplicationValve(DeltaSession session) { if(replicationValve == null) { if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) { Cluster cluster = getCluster() ; if(cluster != null && cluster instanceof CatalinaCluster) { Valve[] valves = ((CatalinaCluster)cluster).getValves(); if(valves != null && valves.length > 0) { for(int i=0; replicationValve == null && i < valves.length ; i++ ){ if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ; }//for if(replicationValve == null && log.isDebugEnabled()) { log.debug("no ReplicationValve found for CrossContext Support"); }//endif }//end if }//endif }//end if }//end if if(replicationValve != null) { replicationValve.registerReplicationSession(session); } } /** * Find the master of the session state * @return master member of sessions */ protected Member findSessionMasterMember() { Member mbr = null; Member mbrs[] = cluster.getMembers(); String localMemberDomain = cluster.getLocalMember().getDomain(); if(isSendClusterDomainOnly()) { for (int i = 0; mbr == null && i < mbrs.length; i++) { Member member = mbrs[i]; if(localMemberDomain.equals(member.getDomain())) mbr = member ; } } else { if(mbrs.length != 0 ) mbr = mbrs[0]; } if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), localMemberDomain)); if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr)); return mbr; } /** * Wait that cluster session state is transfer or timeout after 60 Sec * With stateTransferTimeout == -1 wait that backup is transfered (forever mode) */ protected void waitForSendAllSessions(long beforeSendTime) { long reqStart = System.currentTimeMillis(); long reqNow = reqStart ; boolean isTimeout = false; if(getStateTransferTimeout() > 0) { // wait that state is transfered with timeout check do { try { Thread.sleep(100); } catch (Exception sleep) { // } reqNow = System.currentTimeMillis(); isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout())); } while ((!getStateTransfered()) && (!isTimeout)); } else { if(getStateTransferTimeout() == -1) { // wait that state is transfered do { try { Thread.sleep(100); } catch (Exception sleep) { } } while ((!getStateTransfered())); reqNow = System.currentTimeMillis(); } } if (isTimeout || (!getStateTransfered())) { counterNoStateTransfered++ ; log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime))); } else { if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime))); } } /** * Gracefully terminate the active use of the public methods of this * component. This method should be the last one called on a given instance * of this component. * * @exception LifecycleException * if this component detects a fatal error that needs to be * reported */ public void stop() throws LifecycleException { if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.stopped", getName())); // Validate and update our current component state if (!started) throw new LifecycleException(sm.getString("deltaManager.notStarted")); lifecycle.fireLifecycleEvent(STOP_EVENT, null); started = false; // Expire all active sessions if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName())); Session sessions[] = findSessions(); for (int i = 0; i < sessions.length; i++) { DeltaSession session = (DeltaSession) sessions[i]; if (!session.isValid()) continue; try { session.expire(true, isExpireSessionsOnShutdown()); } catch (Throwable ignore) { ; } } // Require a new random number generator if we are restarted this.random = null; getCluster().removeManager(getName(),this); replicationValve = null; if (initialized) { destroy(); } } // ----------------------------------------- PropertyChangeListener Methods /** * Process property change events from our associated Context. * * @param event * The property change event that has occurred */ public void propertyChange(PropertyChangeEvent event) { // Validate the source of this event if (!(event.getSource() instanceof Context)) return; // Process a relevant property change if (event.getPropertyName().equals("sessionTimeout")) { try { setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60); } catch (NumberFormatException e) { log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue())); } } } // -------------------------------------------------------- Replication // Methods /** * A message was received from another node, this is the callback method to * implement if you are interested in receiving replication messages. * * @param cmsg - * the message received. */ public void messageDataReceived(ClusterMessage cmsg) { if (cmsg != null && cmsg instanceof SessionMessage) { SessionMessage msg = (SessionMessage) cmsg; switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: case SessionMessage.EVT_SESSION_CREATED: case SessionMessage.EVT_SESSION_EXPIRED: case SessionMessage.EVT_SESSION_ACCESSED: case SessionMessage.EVT_SESSION_DELTA: { synchronized(receivedMessageQueue) { if(receiverQueue) { receivedMessageQueue.add(msg); return ; } } break; } default: { //we didn't queue, do nothing break; } } //switch messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null); } } /** * When the request has been completed, the replication valve will notify * the manager, and the manager will decide whether any replication is * needed or not. If there is a need for replication, the manager will * create a session message and that will be replicated. The cluster * determines where it gets sent. * * @param sessionId - * the sessionId that just completed. * @return a SessionMessage to be sent, */ public ClusterMessage requestCompleted(String sessionId) { try { DeltaSession session = (DeltaSession) findSession(sessionId); DeltaRequest deltaRequest = session.getDeltaRequest(); SessionMessage msg = null; boolean isDeltaRequest = false ; synchronized(deltaRequest) { isDeltaRequest = deltaRequest.getSize() > 0 ; if (isDeltaRequest) { counterSend_EVT_SESSION_DELTA++; byte[] data = serializeDeltaRequest(deltaRequest); msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_DELTA, data, sessionId, sessionId + "-" + System.currentTimeMillis()); session.resetDeltaRequest(); } } if(!isDeltaRequest) { if(!session.isPrimarySession()) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -