frag2.java

来自「JGroups源码」· Java 代码 · 共 597 行 · 第 1/2 页

JAVA
597
字号
package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.Range;import org.jgroups.util.Util;import java.util.*;/** * Fragmentation layer. Fragments messages larger than frag_size into smaller packets. * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended * to the messages as a header (and removed at the receiving side).<p> * Each fragment is identified by (a) the sender (part of the message to which the header is appended), * (b) the fragmentation ID (which is unique per FRAG2 layer (monotonically increasing) and (c) the * fragement ID which ranges from 0 to number_of_fragments-1.<p> * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and * multicast messages.<br/> * Compared to FRAG, this protocol does <em>not</em> need to serialize the message in order to break it into * smaller fragments: it looks only at the message's buffer, which is a byte[] array anyway. We assume that the * size addition for headers and src and dest address is minimal when the transport finally has to serialize the * message, so we add a constant (200 bytes). * @author Bela Ban * @version $Id: FRAG2.java,v 1.25 2006/08/23 07:20:12 belaban Exp $ */public class FRAG2 extends Protocol {    /** The max number of bytes in a message. If a message's buffer is bigger, it will be fragmented */    int frag_size=1500;    /** Number of bytes that we think the headers plus src and dest will take up when        message is serialized by transport. 
This will be subtracted from frag_size */    int overhead=200;    /*the fragmentation list contains a fragmentation table per sender     *this way it becomes easier to clean up if a sender (member) leaves or crashes     */    private final FragmentationList    fragment_list=new FragmentationList();    private int                        curr_id=1;    private final Vector               members=new Vector(11);    private static final String        name="FRAG2";    long num_sent_msgs=0;    long num_sent_frags=0;    long num_received_msgs=0;    long num_received_frags=0;    public final String getName() {        return name;    }    public int getFragSize() {return frag_size;}    public void setFragSize(int s) {frag_size=s;}    public int getOverhead() {return overhead;}    public void setOverhead(int o) {overhead=o;}    public long getNumberOfSentMessages() {return num_sent_msgs;}    public long getNumberOfSentFragments() {return num_sent_frags;}    public long getNumberOfReceivedMessages() {return num_received_msgs;}    public long getNumberOfReceivedFragments() {return num_received_frags;}    synchronized int getNextId() {        return curr_id++;    }    /** Setup the Protocol instance acording to the configuration string */    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("frag_size");        if(str != null) {            frag_size=Integer.parseInt(str);            props.remove("frag_size");        }        str=props.getProperty("overhead");        if(str != null) {            overhead=Integer.parseInt(str);            props.remove("overhead");        }        int old_frag_size=frag_size;        frag_size-=overhead;        if(frag_size <=0) {            log.error("frag_size=" + old_frag_size + ", overhead=" + overhead +                      ", new frag_size=" + frag_size + ": new frag_size is invalid");            return false;        }        if(log.isInfoEnabled())            
log.info("frag_size=" + old_frag_size + ", overhead=" + overhead + ", new frag_size=" + frag_size);        if(props.size() > 0) {            log.error("FRAG2.setProperties(): the following properties are not recognized: " + props);            return false;        }        return true;    }    public void resetStats() {        super.resetStats();        num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0;    }    /**     * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only     * add a header if framentation is needed !     */    public void down(Event evt) {        switch(evt.getType()) {        case Event.MSG:            Message msg=(Message)evt.getArg();            long size=msg.getLength();            synchronized(this) {                num_sent_msgs++;            }            if(size > frag_size) {                if(trace) {                    StringBuffer sb=new StringBuffer("message's buffer size is ");                    sb.append(size).append(", will fragment ").append("(frag_size=");                    sb.append(frag_size).append(')');                    log.trace(sb.toString());                }                fragment(msg);  // Fragment and pass down                return;            }            break;        case Event.VIEW_CHANGE:            //don't do anything if this dude is sending out the view change            //we are receiving a view change,            //in here we check for the            View view=(View)evt.getArg();            Vector new_mbrs=view.getMembers(), left_mbrs;            Address mbr;            left_mbrs=Util.determineLeftMembers(members, new_mbrs);            members.clear();            members.addAll(new_mbrs);            for(int i=0; i < left_mbrs.size(); i++) {                mbr=(Address)left_mbrs.elementAt(i);                //the new view doesn't contain the sender, he must have left,                //hence we will clear all his fragmentation tables                
fragment_list.remove(mbr);                if(trace) log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");            }            break;        case Event.CONFIG:            passDown(evt);            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());            handleConfigEvent((HashMap)evt.getArg());            return;        }        passDown(evt);  // Pass on to the layer below us    }    /**     * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up     * the stack.     */    public void up(Event evt) {        switch(evt.getType()) {        case Event.MSG:            Message msg=(Message)evt.getArg();            Object obj=msg.getHeader(name);            if(obj != null && obj instanceof FragHeader) { // needs to be defragmented                unfragment(msg); // Unfragment and possibly pass up                return;            }            else {                num_received_msgs++;            }            break;        case Event.CONFIG:            passUp(evt);            if(log.isInfoEnabled()) log.info("received CONFIG event: " + evt.getArg());            handleConfigEvent((HashMap)evt.getArg());            return;        }        passUp(evt); // Pass up to the layer above us by default    }    /** Send all fragments as separate messages (with same ID !).     
Example:     <pre>     Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}     would be fragmented into:     [2344,3,0]{dst,src,buf1},     [2344,3,1]{dst,src,buf2} and     [2344,3,2]{dst,src,buf3}     </pre>     */    void fragment(Message msg) {        byte[]             buffer;        List               fragments;        Event              evt;        FragHeader         hdr;        Message            frag_msg;        Address            dest=msg.getDest();        long               id=getNextId(); // used as seqnos        int                num_frags;        StringBuffer       sb;        Range              r;        try {            buffer=msg.getBuffer();            fragments=Util.computeFragOffsets(buffer, frag_size);            num_frags=fragments.size();            synchronized(this) {                num_sent_frags+=num_frags;            }            if(trace) {                sb=new StringBuffer("fragmenting packet to ");                sb.append((dest != null ? dest.toString() : "<all members>")).append(" (size=").append(buffer.length);                sb.append(") into ").append(num_frags).append(" fragment(s) [frag_size=").append(frag_size).append(']');                log.trace(sb.toString());            }            for(int i=0; i < fragments.size(); i++) {                r=(Range)fragments.get(i);                // Copy the original msg (needed because we need to copy the headers too)                frag_msg=msg.copy(false); // don't copy the buffer, only src, dest and headers                frag_msg.setBuffer(buffer, (int)r.low, (int)r.high);                hdr=new FragHeader(id, i, num_frags);                frag_msg.putHeader(name, hdr);                evt=new Event(Event.MSG, frag_msg);                passDown(evt);            }        }        catch(Exception e) {            if(log.isErrorEnabled()) log.error("fragmentation failure", e);        }    }    /**     1. Get all the fragment buffers     2. 
When all are received -> Assemble them into one big buffer     3. Read headers and byte buffer from big buffer     4. Set headers and buffer in msg     5. Pass msg up the stack     */    void unfragment(Message msg) {        FragmentationTable frag_table;        Address            sender=msg.getSrc();        Message            assembled_msg;        FragHeader         hdr=(FragHeader)msg.removeHeader(name);        frag_table=fragment_list.get(sender);        if(frag_table == null) {            frag_table=new FragmentationTable(sender);            try {                fragment_list.add(sender, frag_table);            }            catch(IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread                frag_table=fragment_list.get(sender);            }        }        num_received_frags++;        assembled_msg=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg);        if(assembled_msg != null) {            try {                if(trace) log.trace("assembled_msg is " + assembled_msg);                assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!                num_received_msgs++;                passUp(new Event(Event.MSG, assembled_msg));            }            catch(Exception e) {                if(log.isErrorEnabled()) log.error("unfragmentation failed", e);            }        }    }    void handleConfigEvent(HashMap map) {        if(map == null) return;        if(map.containsKey("frag_size")) {            frag_size=((Integer)map.get("frag_size")).intValue();            if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?