📄 replicationvalve.java

📁 Source code of the latest 6.0 release of the well-known Tomcat server.
            totalstart = System.currentTimeMillis();
        }
        if (primaryIndicator) {
            createPrimaryIndicator(request) ;
        }
        Context context = request.getContext();
        boolean isCrossContext = context != null
                && context instanceof StandardContext
                && ((StandardContext) context).getCrossContext();
        try {
            if(isCrossContext) {
                if(log.isDebugEnabled())
                    log.debug(sm.getString("ReplicationValve.crossContext.add"));
                //FIXME add Pool of Arraylists
                crossContextSessions.set(new ArrayList());
            }
            getNext().invoke(request, response);
            Manager manager = request.getContext().getManager();
            if (manager != null && manager instanceof ClusterManager) {
                ClusterManager clusterManager = (ClusterManager) manager;
                CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster();
                if (containerCluster == null) {
                    if (log.isWarnEnabled())
                        log.warn(sm.getString("ReplicationValve.nocluster"));
                    return;
                }
                // Replicate only if this valve's cluster knows the manager; otherwise
                // another cluster (hopefully one at host level) handles replication.
                if(containerCluster.getManager(clusterManager.getName()) == null)
                    return ;
                if(containerCluster.hasMembers()) {
                    sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
                } else {
                    resetReplicationRequest(request,isCrossContext);
                }        
            }
        } finally {
            // The list must be removed: the current master request sends endAccess at recycle.
            // Don't register this request's session again!
            if(isCrossContext) {
                if(log.isDebugEnabled())
                    log.debug(sm.getString("ReplicationValve.crossContext.remove"));
                // ThreadLocal.remove() only exists since Java 5, so clear the value with set(null)
                // TODO return the ArrayList to a pool
                crossContextSessions.set(null);
            }
        }
    }

    
    /**
     * Reset the replication statistics.
     */
    public void resetStatistics() {
        totalRequestTime = 0 ;
        totalSendTime = 0 ;
        lastSendTime = 0 ;
        nrOfFilterRequests = 0 ;
        nrOfRequests = 0 ;
        nrOfSendRequests = 0;
        nrOfCrossContextSendRequests = 0;
    }
    
    /**
     * Return a String rendering of this object.
     */
    public String toString() {

        StringBuffer sb = new StringBuffer("ReplicationValve[");
        if (container != null)
            sb.append(container.getName());
        sb.append("]");
        return (sb.toString());

    }

    // --------------------------------------------------------- Protected Methods

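    /**
     * Send the replication messages for a request after it has been processed.
     * @param request current request
     * @param totalstart time the request started, in milliseconds (used for statistics)
     * @param isCrossContext whether cross-context sessions must be replicated as well
     * @param clusterManager session manager of the request's context
     * @param containerCluster replication cluster
     */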
    protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
        //this happens after the request
        long start = 0;
        if(doStatistics()) {
            start = System.currentTimeMillis();
        }
        try {
            // send invalid sessions
            // DeltaManager returns String[0]
            if (!(clusterManager instanceof DeltaManager))
                sendInvalidSessions(clusterManager, containerCluster);
            // send replication
            sendSessionReplicationMessage(request, clusterManager, containerCluster);
            if(isCrossContext)
                sendCrossContextSession(containerCluster);
        } catch (Exception x) {
            // FIXME there are many sends, but a problem with one node stops correct replication to the other nodes!
            log.error(sm.getString("ReplicationValve.send.failure"), x);
        } finally {
            // FIXME this statistics update is not cheap!
            if(doStatistics()) {
                updateStats(totalstart,start);
            }
        }
    }

    /**
     * Send all changed cross context sessions to backups
     * @param containerCluster
     */
    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
        Object sessions = crossContextSessions.get();
        if(sessions != null && sessions instanceof List
                && ((List)sessions).size() >0) {
            for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {          
                Session session = (Session)iter.next();
                if(log.isDebugEnabled())
                    log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
                            session.getManager().getContainer().getName() ));
                sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
                if(doStatistics()) {
                    nrOfCrossContextSendRequests++;
                }
            }
        }
    }
  
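    /**
     * Reset the pending delta requests when the cluster has no backup member.
     * This fixes a memory leak for long-lived sessions with many changes.
     * @param request current request, after the response has been generated
     * @param isCrossContext whether to also reset the sessions held in the cross-context thread local
     */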
    protected void resetReplicationRequest(Request request, boolean isCrossContext) {
        Session contextSession = request.getSessionInternal(false);
        if(contextSession != null && contextSession instanceof DeltaSession){
            resetDeltaRequest(contextSession);
            ((DeltaSession)contextSession).setPrimarySession(true);
        }
        if(isCrossContext) {
            Object sessions = crossContextSessions.get();
            if(sessions != null && sessions instanceof List
               && ((List)sessions).size() >0) {
                Iterator iter = ((List)sessions).iterator();
                for(; iter.hasNext() ;) {          
                    Session session = (Session)iter.next();
                    resetDeltaRequest(session);
                    // mark the cross-context session itself, not the request's session (which may be null)
                    if(session instanceof DeltaSession)
                        ((DeltaSession)session).setPrimarySession(true);
                }
            }
        }                     
    }

    /**
     * Reset DeltaRequest from session
     * @param session HttpSession from current request or cross context session
     */
    protected void resetDeltaRequest(Session session) {
        if(log.isDebugEnabled()) {
            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
                session.getManager().getContainer().getName() ));
        }
        ((DeltaSession)session).resetDeltaRequest();
    }

    /**
     * Send Cluster Replication Request
     * @param request current request
     * @param manager session manager
     * @param cluster replication cluster
     */
    protected void sendSessionReplicationMessage(Request request,
            ClusterManager manager, CatalinaCluster cluster) {
        Session session = request.getSessionInternal(false);
        if (session != null) {
            String uri = request.getDecodedRequestURI();
            // replicate only if the URI is not filtered out as a request without session change
            if (!isRequestWithoutSessionChange(uri)) {
                if (log.isDebugEnabled())
                    log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
                sendMessage(session,manager,cluster);
            } else
                if(doStatistics())
                    nrOfFilterRequests++;
        }

    }

   /**
    * Send the delta message for the given session.
    * @param session session whose changes are replicated
    * @param manager session manager
    * @param cluster replication cluster
    */
    protected void sendMessage(Session session,
             ClusterManager manager, CatalinaCluster cluster) {
        String id = session.getIdInternal();
        if (id != null) {
            send(manager, cluster, id);
        }
    }

    /**
     * send manager requestCompleted message to cluster
     * @param manager SessionManager
     * @param cluster replication cluster
     * @param sessionId sessionid from the manager
     * @see DeltaManager#requestCompleted(String)
     * @see SimpleTcpCluster#send(ClusterMessage)
     */
    protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
        ClusterMessage msg = manager.requestCompleted(sessionId);
        if (msg != null) {
            if(manager.doDomainReplication()) {
                cluster.sendClusterDomain(msg);
            } else {
                cluster.send(msg);
            }
            if(doStatistics())
                nrOfSendRequests++;
        }
    }
    
    /**
     * Check for invalidated sessions and send each of them to the cluster.
     * @param manager session manager
     * @param cluster replication cluster
     */
    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {
        String[] invalidIds=manager.getInvalidatedSessions();
        if ( invalidIds.length > 0 ) {
            for ( int i=0;i<invalidIds.length; i++ ) {
                try {
                    send(manager,cluster,invalidIds[i]);
                } catch ( Exception x ) {
                    log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
                }
            }
        }
    }
    
    /**
     * Check whether the request URI matches one of the request filters,
     * that is, whether the request cannot have changed the session.
     * @param uri The request URI
     * @return True if no session change is possible
     */
    protected boolean isRequestWithoutSessionChange(String uri) {

        boolean filterfound = false;

        for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
            java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
            filterfound = matcher.matches();
        }
        return filterfound;
    }

    /**
     * Record the cluster replication statistics.
     * @param requestTime time the request started, in milliseconds
     * @param clusterTime time the replication started, in milliseconds
     */
    protected  void updateStats(long requestTime, long clusterTime) {
        synchronized(this) {
            lastSendTime=System.currentTimeMillis();
            totalSendTime+=lastSendTime - clusterTime;
            totalRequestTime+=lastSendTime - requestTime;
            nrOfRequests++;
        }
        if(log.isInfoEnabled()) {
            if ( (nrOfRequests % 100) == 0 ) {
                 log.info(sm.getString("ReplicationValve.stats",
                     new Object[]{
                         new Long(totalRequestTime/nrOfRequests),
                         new Long(totalSendTime/nrOfRequests),
                         new Long(nrOfRequests),
                         new Long(nrOfSendRequests),
                         new Long(nrOfCrossContextSendRequests),
                         new Long(nrOfFilterRequests),
                         new Long(totalRequestTime),
                         new Long(totalSendTime)}));
             }
        }
    }


    /**
     * Mark the request with the attribute primaryIndicatorName, which tells
     * whether the request's session is the primary session on this node.
     * 
     * @param request current request
     * @throws IOException
     */
    protected void createPrimaryIndicator(Request request) throws IOException {
        String id = request.getRequestedSessionId();
        if ((id != null) && (id.length() > 0)) {
            Manager manager = request.getContext().getManager();
            Session session = manager.findSession(id);
            if (session instanceof ClusterSession) {
                ClusterSession cses = (ClusterSession) session;
                if (cses != null) {
                    if (log.isDebugEnabled())
                        log.debug(sm.getString(
                                "ReplicationValve.session.indicator", request.getContext().getName(),id,
                                primaryIndicatorName, cses.isPrimarySession()));
                    request.setAttribute(primaryIndicatorName, cses.isPrimarySession()?Boolean.TRUE:Boolean.FALSE);
                }
            } else {
                if (log.isDebugEnabled()) {
                    if (session != null) {
                        log.debug(sm.getString(
                                "ReplicationValve.session.found", request.getContext().getName(),id));
                    } else {
                        log.debug(sm.getString(
                                "ReplicationValve.session.invalid", request.getContext().getName(),id));
                    }
                }
            }
        }
    }

}
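
The filter check in isRequestWithoutSessionChange relies on Matcher.matches(), which succeeds only when a pattern covers the entire URI. The stand-alone sketch below illustrates that behaviour. The class RequestFilterSketch, the helper compileFilters, and the semicolon-separated filter string are assumptions made for this illustration; only the reqFilters loop mirrors the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical stand-alone sketch; not part of the Tomcat sources. */
class RequestFilterSketch {

    /** Compiles a semicolon-separated list of regular expressions (assumed format). */
    static Pattern[] compileFilters(String filter) {
        List<Pattern> patterns = new ArrayList<Pattern>();
        for (String token : filter.split(";")) {
            String expr = token.trim();
            if (expr.length() > 0) {
                patterns.add(Pattern.compile(expr));
            }
        }
        return patterns.toArray(new Pattern[0]);
    }

    /** Returns true if any filter matches the whole URI, as in the listing above. */
    static boolean isRequestWithoutSessionChange(Pattern[] reqFilters, String uri) {
        for (Pattern filter : reqFilters) {
            // matches() must consume the entire input, unlike find()
            if (filter.matcher(uri).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Pattern[] filters = compileFilters(".*\\.gif;.*\\.css");
        System.out.println(isRequestWithoutSessionChange(filters, "/app/logo.gif")); // true
        System.out.println(isRequestWithoutSessionChange(filters, "/app/cart.do"));  // false
    }
}
```

Because the match covers the whole URI, a filter written as `\.gif` alone would not skip replication for /app/logo.gif; it needs the leading `.*`.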

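The cross-context handling in the listing sets a per-thread list before the request runs and clears it in a finally block. A minimal sketch of that pattern follows, assuming Java 5 or later so that ThreadLocal.remove() can replace set(null). The class CrossContextTrackerSketch, its methods, and the use of plain session id strings are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch; not part of the Tomcat sources. */
class CrossContextTrackerSketch {

    // One list per request-processing thread; empty slot outside a request.
    private static final ThreadLocal<List<String>> SESSIONS =
            new ThreadLocal<List<String>>();

    /** Records a session id if a request is being tracked on this thread. */
    static void register(String sessionId) {
        List<String> list = SESSIONS.get();
        if (list != null && !list.contains(sessionId)) {
            list.add(sessionId);
        }
    }

    /** Runs the request work and returns the session ids it touched. */
    static List<String> handle(Runnable requestWork) {
        List<String> touched = new ArrayList<String>();
        SESSIONS.set(touched);
        try {
            requestWork.run();
            return touched;
        } finally {
            // Always clear the slot, even on failure, so pooled threads
            // do not carry sessions over into the next request.
            SESSIONS.remove();
        }
    }

    /** True while a request is being tracked on the current thread. */
    static boolean isTracking() {
        return SESSIONS.get() != null;
    }

    public static void main(String[] args) {
        List<String> touched = handle(new Runnable() {
            public void run() {
                register("A");
                register("B");
            }
        });
        System.out.println(touched.size() + " sessions touched, tracking=" + isTracking());
    }
}
```

Clearing in finally matters because servlet containers reuse worker threads; a value left behind would be visible to the next request served by the same thread.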