frag.java

来自「JGRoups源码」· Java 代码 · 共 563 行 · 第 1/2 页

JAVA
563
字号
// $Id: FRAG.java,v 1.32 2006/10/11 14:39:41 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.ExposedByteArrayOutputStream;import org.jgroups.util.Util;import java.io.ByteArrayInputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.util.*;/** * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets. * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended * to the messages as a header (and removed at the receiving side).<p> * Each fragment is identified by (a) the sender (part of the message to which the header is appended), * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the * fragement ID which ranges from 0 to number_of_fragments-1.<p> * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and * multicast messages. * @author Bela Ban * @author Filip Hanik * @version $Id: FRAG.java,v 1.32 2006/10/11 14:39:41 belaban Exp $ */public class FRAG extends Protocol {    private int frag_size=8192;  // conservative value    /*the fragmentation list contains a fragmentation table per sender     *this way it becomes easier to clean up if a sender (member) leaves or crashes     */    private final FragmentationList     fragment_list=new FragmentationList();    private int                         curr_id=1;    private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024);    private final Vector                members=new Vector(11);    private final static String         name="FRAG";    long num_sent_msgs=0;    long num_sent_frags=0;    long num_received_msgs=0;    long num_received_frags=0;    public String getName() {        return name;    }    public int getFragSize() {return frag_size;}    public void setFragSize(int s) {frag_size=s;}    public long getNumberOfSentMessages() {return num_sent_msgs;}    public long getNumberOfSentFragments() {return num_sent_frags;}    public long getNumberOfReceivedMessages() {return num_received_msgs;}    public long getNumberOfReceivedFragments() {return num_received_frags;}    /**     * Setup the Protocol instance acording to the configuration string     */    public boolean setProperties(Properties props) {        String str;                super.setProperties(props);        str=props.getProperty("frag_size");        if(str != null) {            frag_size=Integer.parseInt(str);            props.remove("frag_size");        }        if(props.size() > 0) {            log.error("FRAG.setProperties(): the following properties are not recognized: " + props);            return false;        }        return true;    }    public void resetStats() {        super.resetStats();        num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0;    }    /**     * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only     * add a header if framentation is needed !     */    public void down(Event evt) {        switch(evt.getType()) {        case Event.MSG:            Message msg=(Message)evt.getArg();            long size=msg.size();            num_sent_msgs++;            if(size > frag_size) {                if(trace) {                    StringBuffer sb=new StringBuffer("message size is ");                    sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')');                    log.trace(sb.toString());                }                fragment(msg);  // Fragment and pass down                return;            }            break;        case Event.VIEW_CHANGE:            //don't do anything if this dude is sending out the view change            //we are receiving a view change,            //in here we check for the            View view=(View)evt.getArg();            Vector new_mbrs=view.getMembers(), left_mbrs;            Address mbr;            left_mbrs=Util.determineLeftMembers(members, new_mbrs);            members.clear();            members.addAll(new_mbrs);            for(int i=0; i < left_mbrs.size(); i++) {                mbr=(Address)left_mbrs.elementAt(i);                //the new view doesn't contain the sender, he must have left,                //hence we will clear all his fragmentation tables                fragment_list.remove(mbr);                if(trace)                    log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");            }            break;        case Event.CONFIG:            passDown(evt);            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());            handleConfigEvent((HashMap)evt.getArg());            return;        }        passDown(evt);  // Pass on to the layer below us    }    /**     * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.     */    public void up(Event evt) {        switch(evt.getType()) {        case Event.MSG:            Message msg=(Message)evt.getArg();            Object obj=msg.getHeader(name);            if(obj != null && obj instanceof FragHeader) { // needs to be defragmented                unfragment(msg); // Unfragment and possibly pass up                return;            }            else {                num_received_msgs++;            }            break;        case Event.CONFIG:            passUp(evt);            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());            handleConfigEvent((HashMap)evt.getArg());            return;        }        passUp(evt); // Pass up to the layer above us by default    }    /**     * Send all fragments as separate messages (with same ID !).     * Example:     * <pre>     * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}     * would be fragmented into:     * <p/>     * [2344,3,0]{dst,src,buf1},     * [2344,3,1]{dst,src,buf2} and     * [2344,3,2]{dst,src,buf3}     * </pre>     */    private void fragment(Message msg) {        DataOutputStream   out=null;        byte[]             buffer;        byte[]             fragments[];        Event              evt;        FragHeader         hdr;        Message            frag_msg;        Address            dest=msg.getDest(), src=msg.getSrc();        long               id=curr_id++; // used as seqnos        int                num_frags;        int size;        try {            // Write message into a byte buffer and fragment it            // Synchronization around bos is needed for concurrent access (http://jira.jboss.com/jira/browse/JGRP-215)            synchronized(bos) {                bos.reset();                out=new DataOutputStream(bos);                msg.writeTo(out);                out.flush();                buffer=bos.getRawBuffer();                fragments=Util.fragmentBuffer(buffer, frag_size, bos.size());            }            num_frags=fragments.length;            num_sent_frags+=num_frags;            if(trace) {                StringBuffer sb=new StringBuffer();                sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>");                sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags);                sb.append(" fragment(s) [frag_size=").append(frag_size).append(']');                log.trace(sb.toString());            }            for(int i=0; i < num_frags; i++) {                frag_msg=new Message(dest, src, fragments[i]);                hdr=new FragHeader(id, i, num_frags);                frag_msg.putHeader(name, hdr);                evt=new Event(Event.MSG, frag_msg);                passDown(evt);            }        }        catch(Exception e) {            log.error("exception occurred trying to fragment message", e);        }        finally {            Util.close(out);        }    }    /**     * 1. Get all the fragment buffers     * 2. When all are received -> Assemble them into one big buffer     * 3. Read headers and byte buffer from big buffer     * 4. Set headers and buffer in msg     * 5. Pass msg up the stack     */    private void unfragment(Message msg) {        FragmentationTable   frag_table;        Address              sender=msg.getSrc();        Message              assembled_msg;        FragHeader           hdr=(FragHeader)msg.removeHeader(name);        byte[]               m;        ByteArrayInputStream bis;        DataInputStream      in=null;        frag_table=fragment_list.get(sender);        if(frag_table == null) {            frag_table=new FragmentationTable(sender);            try {                fragment_list.add(sender, frag_table);            }            catch(IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread                frag_table=fragment_list.get(sender);            }        }        num_received_frags++;        m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());        if(m != null) {            try {                bis=new ByteArrayInputStream(m);                in=new DataInputStream(bis);                assembled_msg=new Message(false);                assembled_msg.readFrom(in);                if(trace) log.trace("assembled_msg is " + assembled_msg);                assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!                num_received_msgs++;                passUp(new Event(Event.MSG, assembled_msg));            }            catch(Exception e) {                log.error("failed unfragmenting a message", e);            }            finally {                Util.close(in);            }        }    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?