⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 orderinterceptor.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. *  * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *  *      http://www.apache.org/licenses/LICENSE-2.0 *  * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and */package org.apache.catalina.tribes.group.interceptors;import java.util.HashMap;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelMessage;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.group.ChannelInterceptorBase;import org.apache.catalina.tribes.group.InterceptorPayload;import org.apache.catalina.tribes.io.XByteBuffer;/** * * The order interceptor guarantees that messages are received in the same order they were  * sent. * This interceptor works best with the ack=true setting. <br> * There is no point in  * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR> * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads, * this interceptor can really slow you down, as many messages will be completely out of order * and the queue might become rather large. If this is the case, then you might want to set  * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue) * <br><b>Configuration Options</b><br> * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br> * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering.  *   This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br> * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to  * do when a message has expired or the queue has grown larger than the maxQueue value. * true means that the message is sent up the stack to the receiver that will receive and out of order message * false means, forget the message and reset the message counter. <b>default=true</b> *  *  * @author Filip Hanik * @version 1.0 */public class OrderInterceptor extends ChannelInterceptorBase {    private HashMap outcounter = new HashMap();    private HashMap incounter = new HashMap();    private HashMap incoming = new HashMap();    private long expire = 3000;    private boolean forwardExpired = true;    private int maxQueue = Integer.MAX_VALUE;    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {        for ( int i=0; i<destination.length; i++ ) {            int nr = incCounter(destination[i]);            //reduce byte copy            msg.getMessage().append(nr);            try {                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);            }finally {                msg.getMessage().trim(4);            }        }    }    public void messageReceived(ChannelMessage msg) {        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);        msg.getMessage().trim(4);        MessageOrder order = new MessageOrder(msgnr,msg);        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);    }        public synchronized void processLeftOvers(Member member, boolean force) {        MessageOrder tmp = (MessageOrder)incoming.get(member);        if ( force ) {            Counter cnt = getInCounter(member);            cnt.setCounter(Integer.MAX_VALUE);        }        if ( tmp!= null ) processIncoming(tmp);    }    /**     *      * @param order MessageOrder     * @return boolean - true if a message expired and was processed     */    public synchronized boolean processIncoming(MessageOrder order) {        boolean result = false;        Member member = order.getMessage().getAddress();        Counter cnt = getInCounter(member);                MessageOrder tmp = (MessageOrder)incoming.get(member);        if ( tmp != null ) {            order = MessageOrder.add(tmp,order);        }                        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {            //we are right on target. process orders            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();            else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());            super.messageReceived(order.getMessage());            order.setMessage(null);            order = order.next;        }        MessageOrder head = order;        MessageOrder prev = null;        tmp = order;        //flag to empty out the queue when it larger than maxQueue        boolean empty = order!=null?order.getCount()>=maxQueue:false;        while ( tmp != null ) {            //process expired messages or empty out the queue            if ( tmp.isExpired(expire) || empty ) {                //reset the head                if ( tmp == head ) head = tmp.next;                cnt.setCounter(tmp.getMsgNr()+1);                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());                tmp.setMessage(null);                tmp = tmp.next;                if ( prev != null ) prev.next = tmp;                  result = true;            } else {                prev = tmp;                tmp = tmp.next;            }        }        if ( head == null ) incoming.remove(member);        else incoming.put(member, head);        return result;    }        public void memberAdded(Member member) {        //notify upwards        getInCounter(member);        getOutCounter(member);        super.memberAdded(member);    }    public void memberDisappeared(Member member) {        //notify upwards        outcounter.remove(member);        incounter.remove(member);        //clear the remaining queue        processLeftOvers(member,true);        super.memberDisappeared(member);    }        public int incCounter(Member mbr) {         Counter cnt = getOutCounter(mbr);        return cnt.inc();    }        public synchronized Counter getInCounter(Member mbr) {        Counter cnt = (Counter)incounter.get(mbr);        if ( cnt == null ) {            cnt = new Counter();            cnt.inc(); //always start at 1 for incoming            incounter.put(mbr,cnt);        }        return cnt;    }    public synchronized Counter getOutCounter(Member mbr) {        Counter cnt = (Counter)outcounter.get(mbr);        if ( cnt == null ) {            cnt = new Counter();            outcounter.put(mbr,cnt);        }        return cnt;    }    public static class Counter {        private int value = 0;                public int getCounter() {            return value;        }                public synchronized void setCounter(int counter) {            this.value = counter;        }                public synchronized int inc() {            return ++value;        }    }        public static class MessageOrder {        private long received = System.currentTimeMillis();        private MessageOrder next;        private int msgNr;        private ChannelMessage msg = null;        public MessageOrder(int msgNr,ChannelMessage msg) {            this.msgNr = msgNr;            this.msg = msg;        }                public boolean isExpired(long expireTime) {            return (System.currentTimeMillis()-received) > expireTime;        }                public ChannelMessage getMessage() {            return msg;        }                public void setMessage(ChannelMessage msg) {            this.msg = msg;        }                public void setNext(MessageOrder order) {            this.next = order;        }        public MessageOrder getNext() {            return next;        }                public int getCount() {            int counter = 1;            MessageOrder tmp = next;            while ( tmp != null ) {                counter++;                tmp = tmp.next;            }            return counter;        }                public static MessageOrder add(MessageOrder head, MessageOrder add) {            if ( head == null ) return add;            if ( add == null ) return head;            if ( head == add ) return add;            if ( head.getMsgNr() > add.getMsgNr() ) {                add.next = head;                return add;            }                        MessageOrder iter = head;            MessageOrder prev = null;            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {                prev = iter;                iter = iter.next;            }            if ( iter.getMsgNr() < add.getMsgNr() ) {                //add after                add.next = iter.next;                iter.next = add;            } else if (iter.getMsgNr() > add.getMsgNr()) {                //add before                prev.next = add;                add.next = iter;                            } else {                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");            }                        return head;        }                public int getMsgNr() {            return msgNr;        }                            }    public void setExpire(long expire) {        this.expire = expire;    }    public void setForwardExpired(boolean forwardExpired) {        this.forwardExpired = forwardExpired;    }    public void setMaxQueue(int maxQueue) {        this.maxQueue = maxQueue;    }    public long getExpire() {        return expire;    }    public boolean getForwardExpired() {        return forwardExpired;    }    public int getMaxQueue() {        return maxQueue;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -