📄 replicationvalve.java
字号:
} if (primaryIndicator) { createPrimaryIndicator(request) ; } Context context = request.getContext(); boolean isCrossContext = context != null && context instanceof StandardContext && ((StandardContext) context).getCrossContext(); try { if(isCrossContext) { if(log.isDebugEnabled()) log.debug(sm.getString("ReplicationValve.crossContext.add")); //FIXME add Pool of Arraylists crossContextSessions.set(new ArrayList()); } getNext().invoke(request, response); Manager manager = request.getContext().getManager(); if (manager != null && manager instanceof ClusterManager) { ClusterManager clusterManager = (ClusterManager) manager; CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster(); if (containerCluster == null) { if (log.isWarnEnabled()) log.warn(sm.getString("ReplicationValve.nocluster")); return; } // valve cluster can access manager - other cluster handle replication // at host level - hopefully! if(containerCluster.getManager(clusterManager.getName()) == null) return ; if(containerCluster.hasMembers()) { sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster); } else { resetReplicationRequest(request,isCrossContext); } } } finally { // Array must be remove: Current master request send endAccess at recycle. // Don't register this request session again! if(isCrossContext) { if(log.isDebugEnabled()) log.debug(sm.getString("ReplicationValve.crossContext.remove")); // crossContextSessions.remove() only exist at Java 5 // register ArrayList at a pool crossContextSessions.set(null); } } } /** * reset the active statitics */ public void resetStatistics() { totalRequestTime = 0 ; totalSendTime = 0 ; lastSendTime = 0 ; nrOfFilterRequests = 0 ; nrOfRequests = 0 ; nrOfSendRequests = 0; nrOfCrossContextSendRequests = 0; } /** * Return a String rendering of this object. 
*/
    public String toString() {
        StringBuffer sb = new StringBuffer("ReplicationValve[");
        if (container != null) {
            sb.append(container.getName());
        }
        sb.append("]");
        return sb.toString();
    }

    // --------------------------------------------------------- Protected Methods

    /**
     * Send the replication messages for a finished request: invalidated
     * sessions (for managers other than DeltaManager), the request's own
     * session and, for cross context requests, every session registered in
     * the cross context thread local. Send failures are logged, not rethrown.
     *
     * @param request current request, already processed by the next valve
     * @param totalstart request start time in milliseconds, used for the statistics
     * @param isCrossContext true if cross context sessions must be replicated too
     * @param clusterManager session manager of the request context
     * @param containerCluster cluster the messages are sent to
     */
    protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext,
            ClusterManager clusterManager, CatalinaCluster containerCluster) {
        // this happens after the request
        long start = 0;
        if (isDoProcessingStats()) {
            start = System.currentTimeMillis();
        }
        try {
            // send invalid sessions
            // DeltaManager returns String[0]
            if (!(clusterManager instanceof DeltaManager)) {
                sendInvalidSessions(clusterManager, containerCluster);
            }
            // send replication
            sendSessionReplicationMessage(request, clusterManager, containerCluster);
            if (isCrossContext) {
                sendCrossContextSession(containerCluster);
            }
        } catch (Exception x) {
            // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
            log.error(sm.getString("ReplicationValve.send.failure"), x);
        } finally {
            // FIXME this stats update are not cheap!!
            if (isDoProcessingStats()) {
                updateStats(totalstart, start);
            }
        }
    }

    /**
     * Send all changed cross context sessions to backups.
     *
     * @param containerCluster cluster the session messages are sent to
     */
    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
        Object sessions = crossContextSessions.get();
        // instanceof is false for null, so no separate null check is needed
        if (sessions instanceof List && ((List) sessions).size() > 0) {
            for (Iterator iter = ((List) sessions).iterator(); iter.hasNext();) {
                Session session = (Session) iter.next();
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",
                            session.getManager().getContainer().getName()));
                }
                sendMessage(session, (ClusterManager) session.getManager(), containerCluster);
                if (isDoProcessingStats()) {
                    nrOfCrossContextSendRequests++;
                }
            }
        }
    }

    /**
     * Fix memory leak for long sessions with many changes, when no backup member exists!
     * Resets the delta request of the request session and of every registered
     * cross context session, and marks each of them as primary session.
     *
     * @param request current request after response is generated
     * @param isCrossContext check crosscontext threadlocal
     */
    protected void resetReplicationRequest(Request request, boolean isCrossContext) {
        Session contextSession = request.getSessionInternal(false);
        if (contextSession instanceof DeltaSession) {
            resetDeltaRequest(contextSession);
            ((DeltaSession) contextSession).setPrimarySession(true);
        }
        if (isCrossContext) {
            Object sessions = crossContextSessions.get();
            if (sessions instanceof List && ((List) sessions).size() > 0) {
                for (Iterator iter = ((List) sessions).iterator(); iter.hasNext();) {
                    Session session = (Session) iter.next();
                    // Operate on the cross context session itself. The request
                    // session may be null or of another type, and
                    // resetDeltaRequest() casts to DeltaSession unconditionally.
                    if (session instanceof DeltaSession) {
                        resetDeltaRequest(session);
                        ((DeltaSession) session).setPrimarySession(true);
                    }
                }
            }
        }
    }

    /**
     * Reset DeltaRequest from session.
     *
     * @param session HttpSession from current request or cross context session;
     *                must be a DeltaSession, otherwise the cast below fails
     */
    protected void resetDeltaRequest(Session session) {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("ReplicationValve.resetDeltaRequest",
                    session.getManager().getContainer().getName()));
        }
        ((DeltaSession) session).resetDeltaRequest();
    }

    /**
     * Send Cluster Replication Request for the session of the current request,
     * unless the request URI matches one of the configured filters.
     *
     * @param request current request
     * @param manager session manager
     * @param cluster replication cluster
     */
    protected void sendSessionReplicationMessage(Request request, ClusterManager manager,
            CatalinaCluster cluster) {
        Session session = request.getSessionInternal(false);
        if (session == null) {
            return;
        }
        String uri = request.getDecodedRequestURI();
        // request without session change
        if (!isRequestWithoutSessionChange(uri)) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
            }
            sendMessage(session, manager, cluster);
        } else if (isDoProcessingStats()) {
            nrOfFilterRequests++;
        }
    }

    /**
     * Send the delta message of a session; nothing is sent when the session
     * has no internal id.
     *
     * @param session session whose changes are sent
     * @param manager session manager
     * @param cluster replication cluster
     */
    protected void sendMessage(Session session, ClusterManager manager, CatalinaCluster cluster) {
        String id = session.getIdInternal();
        if (id != null) {
            send(manager, cluster, id);
        }
    }

    /**
     * Send manager requestCompleted message to cluster. Nothing is sent when
     * the manager returns no message for the session.
     *
     * @param manager SessionManager
     * @param cluster replication cluster
     * @param sessionId sessionid from the manager
     * @see DeltaManager#requestCompleted(String)
     * @see SimpleTcpCluster#send(ClusterMessage)
     */
    protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
        ClusterMessage msg = manager.requestCompleted(sessionId);
        if (msg == null) {
            return;
        }
        if (manager.isSendClusterDomainOnly()) {
            cluster.sendClusterDomain(msg);
        } else {
            cluster.send(msg);
        }
        if (isDoProcessingStats()) {
            nrOfSendRequests++;
        }
    }

    /**
     * Check for session invalidations and send a message for each invalidated
     * session id. A failure for one id is logged and does not stop the others.
     *
     * @param manager session manager that reports the invalidated session ids
     * @param cluster replication cluster
     */
    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {
        String[] invalidIds = manager.getInvalidatedSessions();
        if (invalidIds == null) {
            // defensive: treat a missing array like an empty one
            return;
        }
        for (int i = 0; i < invalidIds.length; i++) {
            try {
                send(manager, cluster, invalidIds[i]);
            } catch (Exception x) {
                log.error(sm.getString("ReplicationValve.send.invalid.failure", invalidIds[i]), x);
            }
        }
    }

    /**
     * Is request without possible session change.
     *
     * @param uri The request uri
     * @return True if the uri fully matches one of the request filters
     */
    protected boolean isRequestWithoutSessionChange(String uri) {
        for (int i = 0; i < reqFilters.length; i++) {
            java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
            if (matcher.matches()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Protocol cluster replication stats and log a summary every 100 requests
     * when info logging is enabled.
     *
     * @param requestTime start time of the request in milliseconds
     * @param clusterTime start time of the replication send in milliseconds
     */
    protected void updateStats(long requestTime, long clusterTime) {
        long requests;
        long requestTimeSum;
        long sendTimeSum;
        synchronized (this) {
            lastSendTime = System.currentTimeMillis();
            totalSendTime += lastSendTime - clusterTime;
            totalRequestTime += lastSendTime - requestTime;
            nrOfRequests++;
            // Snapshot under the lock so the values logged below belong together.
            requests = nrOfRequests;
            requestTimeSum = totalRequestTime;
            sendTimeSum = totalSendTime;
        }
        // requests > 0 protects the divisions if the counters were reset concurrently
        if (log.isInfoEnabled() && requests > 0 && (requests % 100) == 0) {
            log.info(sm.getString("ReplicationValve.stats", new Object[]{
                    new Long(requestTimeSum / requests),
                    new Long(sendTimeSum / requests),
                    new Long(requests),
                    new Long(nrOfSendRequests),
                    new Long(nrOfCrossContextSendRequests),
                    new Long(nrOfFilterRequests),
                    new Long(requestTimeSum),
                    new Long(sendTimeSum)}));
        }
    }

    /**
     * Mark Request that processed at primary node with attribute
     * primaryIndicatorName. The attribute is only set when the requested
     * session exists and is a ClusterSession.
     *
     * @param request current request
     * @throws IOException declared for the session lookup through the manager
     */
    protected void createPrimaryIndicator(Request request) throws IOException {
        String id = request.getRequestedSessionId();
        if ((id == null) || (id.length() == 0)) {
            return;
        }
        Manager manager = request.getContext().getManager();
        if (manager == null) {
            // a context may have no manager; invoke() null-checks it as well
            return;
        }
        Session session = manager.findSession(id);
        if (session instanceof ClusterSession) {
            ClusterSession cses = (ClusterSession) session;
            Boolean isPrimary = Boolean.valueOf(cses.isPrimarySession());
            if (log.isDebugEnabled()) {
                log.debug(sm.getString(
                        "ReplicationValve.session.indicator", request.getContext().getName(), id,
                        primaryIndicatorName, isPrimary));
            }
            request.setAttribute(primaryIndicatorName, isPrimary);
        } else if (log.isDebugEnabled()) {
            if (session != null) {
                log.debug(sm.getString(
                        "ReplicationValve.session.found", request.getContext().getName(), id));
            } else {
                log.debug(sm.getString(
                        "ReplicationValve.session.invalid", request.getContext().getName(), id));
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -