frag.java

来自「JGRoups源码」· Java 代码 · 共 563 行 · 第 1/2 页

JAVA
563
字号
    void handleConfigEvent(HashMap map) {        if(map == null) return;        if(map.containsKey("frag_size")) {            frag_size=((Integer)map.get("frag_size")).intValue();            if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);        }    }    /**     * A fragmentation list keeps a list of fragmentation tables     * sorted by an Address ( the sender ).     * This way, if the sender disappears or leaves the group half way     * sending the content, we can simply remove this members fragmentation     * table and clean up the memory of the receiver.     * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table     */    static class FragmentationList {        /* initialize the hashtable to hold all the fragmentation tables         * 11 is the best growth capacity to start with<br/>         * HashMap<Address,FragmentationTable>         */        private final HashMap frag_tables=new HashMap(11);        /**         * Adds a fragmentation table for this particular sender         * If this sender already has a fragmentation table, an IllegalArgumentException         * will be thrown.         * @param sender - the address of the sender, cannot be null         * @param table  - the fragmentation table of this sender, cannot be null         * @throws IllegalArgumentException if an entry for this sender already exist         */        public void add(Address sender, FragmentationTable table) throws IllegalArgumentException {            FragmentationTable healthCheck;            synchronized(frag_tables) {                healthCheck=(FragmentationTable)frag_tables.get(sender);                if(healthCheck == null) {                    frag_tables.put(sender, table);                }                else {                    throw new IllegalArgumentException("Sender <" + sender + "> already exists in the fragementation list");                }            }        }        /**         * returns a fragmentation table for this sender         * returns null if the sender doesn't have a fragmentation table         * @return the fragmentation table for this sender, or null if no table exist         */        public FragmentationTable get(Address sender) {            synchronized(frag_tables) {                return (FragmentationTable)frag_tables.get(sender);            }        }        /**         * returns true if this sender already holds a         * fragmentation for this sender, false otherwise         * @param sender - the sender, cannot be null         * @return true if this sender already has a fragmentation table         */        public boolean containsSender(Address sender) {            synchronized(frag_tables) {                return frag_tables.containsKey(sender);            }        }        /**         * removes the fragmentation table from the list.         * after this operation, the fragementation list will no longer         * hold a reference to this sender's fragmentation table         * @param sender - the sender who's fragmentation table you wish to remove, cannot be null         * @return true if the table was removed, false if the sender doesn't have an entry         */        public boolean remove(Address sender) {            synchronized(frag_tables) {                boolean result=containsSender(sender);                frag_tables.remove(sender);                return result;            }        }        /**         * returns a list of all the senders that have fragmentation tables opened.         * @return an array of all the senders in the fragmentation list         */        public Address[] getSenders() {            Address[] result;            int index=0;            synchronized(frag_tables) {                result=new Address[frag_tables.size()];                for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) {                    result[index++]=(Address)it.next();                }            }            return result;        }        public String toString() {            Map.Entry entry;            StringBuffer buf=new StringBuffer("Fragmentation list contains ");            synchronized(frag_tables) {                buf.append(frag_tables.size()).append(" tables\n");                for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) {                    entry=(Map.Entry)it.next();                    buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n");                }            }            return buf.toString();        }    }    /**     * Keeps track of the fragments that are received.     * Reassembles fragements into entire messages when all fragments have been received.     * The fragmentation holds a an array of byte arrays for a unique sender     * The first dimension of the array is the order of the fragmentation, in case the arrive out of order     */    static class FragmentationTable {        private final Address sender;        /* the hashtable that holds the fragmentation entries for this sender*/        private final Hashtable h=new Hashtable(11);  // keys: frag_ids, vals: Entrys        FragmentationTable(Address sender) {            this.sender=sender;        }        /**         * inner class represents an entry for a message         * each entry holds an array of byte arrays sorted         * once all the byte buffer entries have been filled         * the fragmentation is considered complete.         */        static class Entry {            //the total number of fragment in this message            int tot_frags=0;            // each fragment is a byte buffer            byte[] fragments[]=null;            //the number of fragments we have received            int number_of_frags_recvd=0;            // the message ID            long msg_id=-1;            /**             * Creates a new entry             *             * @param tot_frags the number of fragments to expect for this message             */            Entry(long msg_id, int tot_frags) {                this.msg_id=msg_id;                this.tot_frags=tot_frags;                fragments=new byte[tot_frags][];                for(int i=0; i < tot_frags; i++) {                    fragments[i]=null;                }            }            /**             * adds on fragmentation buffer to the message             *             * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)             * @param frag    the byte buffer containing the data for this fragmentation, should not be null             */            public void set(int frag_id, byte[] frag) {                fragments[frag_id]=frag;                number_of_frags_recvd++;            }            /**             * returns true if this fragmentation is complete             * ie, all fragmentations have been received for this buffer             */            public boolean isComplete() {                /*first make the simple check*/                if(number_of_frags_recvd < tot_frags) {                    return false;                }                /*then double check just in case*/                for(int i=0; i < fragments.length; i++) {                    if(fragments[i] == null)                        return false;                }                /*all fragmentations have been received*/                return true;            }            /**             * Assembles all the fragmentations into one buffer             * this method does not check if the fragmentation is complete             *             * @return the complete message in one buffer             */            public byte[] assembleBuffer() {                return Util.defragmentBuffer(fragments);            }            /**             * debug only             */            public String toString() {                StringBuffer ret=new StringBuffer();                ret.append("[tot_frags=").append(tot_frags).append(", number_of_frags_recvd=").append(number_of_frags_recvd).append(']');                return ret.toString();            }            public int hashCode() {                return super.hashCode();            }        }        /**         * Creates a new entry if not yet present. Adds the fragment.         * If all fragements for a given message have been received,         * an entire message is reassembled and returned.         * Otherwise null is returned.         *         * @param id        - the message ID, unique for a sender         * @param frag_id   the index of this fragmentation (0..tot_frags-1)         * @param tot_frags the total number of fragmentations expected         * @param fragment  - the byte buffer for this fragment         */        public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) {            /*initialize the return value to default not complete */            byte[] retval=null;            Entry e=(Entry)h.get(new Long(id));            if(e == null) {   // Create new entry if not yet present                e=new Entry(id, tot_frags);                h.put(new Long(id), e);            }            e.set(frag_id, fragment);            if(e.isComplete()) {                retval=e.assembleBuffer();                h.remove(new Long(id));            }            return retval;        }        public void reset() {        }        public String toString() {            StringBuffer buf=new StringBuffer("Fragmentation Table Sender:").append(sender).append("\n\t");            java.util.Enumeration e=this.h.elements();            while(e.hasMoreElements()) {                Entry entry=(Entry)e.nextElement();                int count=0;                for(int i=0; i < entry.fragments.length; i++) {                    if(entry.fragments[i] != null) {                        count++;                    }                }                buf.append("Message ID:").append(entry.msg_id).append("\n\t");                buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");                buf.append("Frags Received:").append(count).append("\n\n");            }            return buf.toString();        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?