⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationvalve.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        }        if (primaryIndicator) {            createPrimaryIndicator(request) ;        }        Context context = request.getContext();        boolean isCrossContext = context != null                && context instanceof StandardContext                && ((StandardContext) context).getCrossContext();        try {            if(isCrossContext) {                if(log.isDebugEnabled())                    log.debug(sm.getString("ReplicationValve.crossContext.add"));                //FIXME add Pool of Arraylists                crossContextSessions.set(new ArrayList());            }            getNext().invoke(request, response);            Manager manager = request.getContext().getManager();            if (manager != null && manager instanceof ClusterManager) {                ClusterManager clusterManager = (ClusterManager) manager;                CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster();                if (containerCluster == null) {                    if (log.isWarnEnabled())                        log.warn(sm.getString("ReplicationValve.nocluster"));                    return;                }                // valve cluster can access manager - other cluster handle replication                 // at host level - hopefully!                if(containerCluster.getManager(clusterManager.getName()) == null)                    return ;                if(containerCluster.hasMembers()) {                    sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);                } else {                    resetReplicationRequest(request,isCrossContext);                }                    }        } finally {            // Array must be remove: Current master request send endAccess at recycle.             // Don't register this request session again!            if(isCrossContext) {                if(log.isDebugEnabled())                    log.debug(sm.getString("ReplicationValve.crossContext.remove"));                // crossContextSessions.remove() only exist at Java 5                // register ArrayList at a pool                crossContextSessions.set(null);            }        }    }        /**     * reset the active statitics      */    public void resetStatistics() {        totalRequestTime = 0 ;        totalSendTime = 0 ;        lastSendTime = 0 ;        nrOfFilterRequests = 0 ;        nrOfRequests = 0 ;        nrOfSendRequests = 0;        nrOfCrossContextSendRequests = 0;    }        /**     * Return a String rendering of this object.     */    public String toString() {        StringBuffer sb = new StringBuffer("ReplicationValve[");        if (container != null)            sb.append(container.getName());        sb.append("]");        return (sb.toString());    }    // --------------------------------------------------------- Protected Methods    /**     * @param request     * @param totalstart     * @param isCrossContext     * @param clusterManager     * @param containerCluster     */    protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {        //this happens after the request        long start = 0;        if(isDoProcessingStats()) {            start = System.currentTimeMillis();        }        try {            // send invalid sessions            // DeltaManager returns String[0]            if (!(clusterManager instanceof DeltaManager))                sendInvalidSessions(clusterManager, containerCluster);            // send replication            sendSessionReplicationMessage(request, clusterManager, containerCluster);            if(isCrossContext)                sendCrossContextSession(containerCluster);        } catch (Exception x) {            // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!            log.error(sm.getString("ReplicationValve.send.failure"), x);        } finally {            // FIXME this stats update are not cheap!!            if(isDoProcessingStats()) {                updateStats(totalstart,start);            }        }    }    /**     * Send all changed cross context sessions to backups     * @param containerCluster     */    protected void sendCrossContextSession(CatalinaCluster containerCluster) {        Object sessions = crossContextSessions.get();        if(sessions != null && sessions instanceof List                && ((List)sessions).size() >0) {            for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {                          Session session = (Session)iter.next();                if(log.isDebugEnabled())                    log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",                              session.getManager().getContainer().getName() ));                sendMessage(session,(ClusterManager)session.getManager(),containerCluster);                if(isDoProcessingStats()) {                    nrOfCrossContextSendRequests++;                }            }        }    }      /**     * Fix memory leak for long sessions with many changes, when no backup member exists!     * @param request current request after responce is generated     * @param isCrossContext check crosscontext threadlocal     */    protected void resetReplicationRequest(Request request, boolean isCrossContext) {        Session contextSession = request.getSessionInternal(false);        if(contextSession != null & contextSession instanceof DeltaSession){            resetDeltaRequest(contextSession);            ((DeltaSession)contextSession).setPrimarySession(true);        }        if(isCrossContext) {            Object sessions = crossContextSessions.get();            if(sessions != null && sessions instanceof List               && ((List)sessions).size() >0) {                Iterator iter = ((List)sessions).iterator();                for(; iter.hasNext() ;) {                              Session session = (Session)iter.next();                    resetDeltaRequest(session);                    if(session instanceof DeltaSession)                        ((DeltaSession)contextSession).setPrimarySession(true);                }            }        }                         }    /**     * Reset DeltaRequest from session     * @param session HttpSession from current request or cross context session     */    protected void resetDeltaRequest(Session session) {        if(log.isDebugEnabled()) {            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" ,                 session.getManager().getContainer().getName() ));        }        ((DeltaSession)session).resetDeltaRequest();    }    /**     * Send Cluster Replication Request     * @param request current request     * @param manager session manager     * @param cluster replication cluster     */    protected void sendSessionReplicationMessage(Request request,            ClusterManager manager, CatalinaCluster cluster) {        Session session = request.getSessionInternal(false);        if (session != null) {            String uri = request.getDecodedRequestURI();            // request without session change            if (!isRequestWithoutSessionChange(uri)) {                if (log.isDebugEnabled())                    log.debug(sm.getString("ReplicationValve.invoke.uri", uri));                sendMessage(session,manager,cluster);            } else                if(isDoProcessingStats())                    nrOfFilterRequests++;        }    }   /**    * Send message delta message from request session     * @param request current request    * @param manager session manager    * @param cluster replication cluster    */    protected void sendMessage(Session session,             ClusterManager manager, CatalinaCluster cluster) {        String id = session.getIdInternal();        if (id != null) {            send(manager, cluster, id);        }    }    /**     * send manager requestCompleted message to cluster     * @param manager SessionManager     * @param cluster replication cluster     * @param sessionId sessionid from the manager     * @see DeltaManager#requestCompleted(String)     * @see SimpleTcpCluster#send(ClusterMessage)     */    protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {        ClusterMessage msg = manager.requestCompleted(sessionId);        if (msg != null) {            if(manager.isSendClusterDomainOnly()) {                cluster.sendClusterDomain(msg);            } else {                cluster.send(msg);            }            if(isDoProcessingStats())                nrOfSendRequests++;        }    }        /**     * check for session invalidations     * @param manager     * @param cluster     */    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {        String[] invalidIds=manager.getInvalidatedSessions();        if ( invalidIds.length > 0 ) {            for ( int i=0;i<invalidIds.length; i++ ) {                try {                    send(manager,cluster,invalidIds[i]);                } catch ( Exception x ) {                    log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);                }            }        }    }        /**     * is request without possible session change     * @param uri The request uri     * @return True if no session change     */    protected boolean isRequestWithoutSessionChange(String uri) {        boolean filterfound = false;        for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {            java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);            filterfound = matcher.matches();        }        return filterfound;    }    /**     * protocol cluster replications stats     * @param requestTime     * @param clusterTime     */    protected  void updateStats(long requestTime, long clusterTime) {        synchronized(this) {            lastSendTime=System.currentTimeMillis();            totalSendTime+=lastSendTime - clusterTime;            totalRequestTime+=lastSendTime - requestTime;            nrOfRequests++;        }        if(log.isInfoEnabled()) {            if ( (nrOfRequests % 100) == 0 ) {                 log.info(sm.getString("ReplicationValve.stats",                     new Object[]{                         new Long(totalRequestTime/nrOfRequests),                         new Long(totalSendTime/nrOfRequests),                         new Long(nrOfRequests),                         new Long(nrOfSendRequests),                         new Long(nrOfCrossContextSendRequests),                         new Long(nrOfFilterRequests),                         new Long(totalRequestTime),                         new Long(totalSendTime)}));             }        }    }    /**     * Mark Request that processed at primary node with attribute     * primaryIndicatorName     *      * @param request     * @throws IOException     */    protected void createPrimaryIndicator(Request request) throws IOException {        String id = request.getRequestedSessionId();        if ((id != null) && (id.length() > 0)) {            Manager manager = request.getContext().getManager();            Session session = manager.findSession(id);            if (session instanceof ClusterSession) {                ClusterSession cses = (ClusterSession) session;                if (cses != null) {                    Boolean isPrimary = new Boolean(cses.isPrimarySession());                    if (log.isDebugEnabled())                        log.debug(sm.getString(                                "ReplicationValve.session.indicator", request.getContext().getName(),id,                                primaryIndicatorName, isPrimary));                    request.setAttribute(primaryIndicatorName, isPrimary);                }            } else {                if (log.isDebugEnabled()) {                    if (session != null) {                        log.debug(sm.getString(                                "ReplicationValve.session.found", request.getContext().getName(),id));                    } else {                        log.debug(sm.getString(                                "ReplicationValve.session.invalid", request.getContext().getName(),id));                    }                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -