frag2.java
来自「JGRoups源码」· Java 代码 · 共 597 行 · 第 1/2 页
JAVA
597 行
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Range;
import org.jgroups.util.Util;

import java.util.*;

/**
 * Fragmentation layer. Fragments messages larger than frag_size into smaller packets.
 * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
 * to the messages as a header (and removed at the receiving side).<p>
 * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
 * (b) the fragmentation ID (which is unique per FRAG2 layer and monotonically increasing) and (c) the
 * fragment ID which ranges from 0 to number_of_fragments-1.<p>
 * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
 * multicast messages.<br/>
 * Compared to FRAG, this protocol does <em>not</em> need to serialize the message in order to break it into
 * smaller fragments: it looks only at the message's buffer, which is a byte[] array anyway. We assume that the
 * size addition for headers and src and dest address is minimal when the transport finally has to serialize the
 * message, so we account for it with a constant (200 bytes by default).
 * @author Bela Ban
 * @version $Id: FRAG2.java,v 1.25 2006/08/23 07:20:12 belaban Exp $
 */
public class FRAG2 extends Protocol {

    /** The max number of bytes in a message. If a message's buffer is bigger, it will be fragmented */
    int frag_size=1500;

    /** Number of bytes that we think the headers plus src and dest will take up when message is serialized by transport.
This will be subtracted from frag_size */
    int overhead=200;

    /*
     * One fragmentation table is kept per sender, which makes it easy to drop all pending
     * fragments of a sender (member) that leaves or crashes.
     */
    private final FragmentationList fragment_list=new FragmentationList();

    /** Next fragmentation ID to be handed out by {@link #getNextId()}. */
    private int curr_id=1;

    /** The members of the most recently seen view. */
    private final Vector members=new Vector(11);

    private static final String name="FRAG2";

    /* Statistics counters */
    long num_sent_msgs=0;
    long num_sent_frags=0;
    long num_received_msgs=0;
    long num_received_frags=0;


    public final String getName() {
        return name;
    }

    public int getFragSize() {
        return frag_size;
    }

    public void setFragSize(int s) {
        frag_size=s;
    }

    public int getOverhead() {
        return overhead;
    }

    public void setOverhead(int o) {
        overhead=o;
    }

    public long getNumberOfSentMessages() {
        return num_sent_msgs;
    }

    public long getNumberOfSentFragments() {
        return num_sent_frags;
    }

    public long getNumberOfReceivedMessages() {
        return num_received_msgs;
    }

    public long getNumberOfReceivedFragments() {
        return num_received_frags;
    }

    /** Returns the current fragmentation ID and advances the counter by one. */
    synchronized int getNextId() {
        final int id=curr_id;
        curr_id=id + 1;
        return id;
    }

    /**
     * Sets up the protocol instance according to the configuration properties.
     * Recognized keys are "frag_size" and "overhead"; both are removed from props once read.
     * The effective frag_size is the configured value minus overhead.
     *
     * @param props the configuration properties
     * @return false if the effective frag_size is not positive or if unrecognized
     *         properties are left over, true otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);

        final String frag_size_value=props.getProperty("frag_size");
        if(frag_size_value != null) {
            frag_size=Integer.parseInt(frag_size_value);
            props.remove("frag_size");
        }

        final String overhead_value=props.getProperty("overhead");
        if(overhead_value != null) {
            overhead=Integer.parseInt(overhead_value);
            props.remove("overhead");
        }

        // Reserve room for the headers and addresses added when the message is serialized
        final int configured_frag_size=frag_size;
        frag_size=configured_frag_size - overhead;

        final String summary="frag_size=" + configured_frag_size + ", overhead=" + overhead + ", new frag_size=" + frag_size;
        if(frag_size <= 0) {
            log.error(summary + ": new frag_size is invalid");
            return false;
        }
        if(log.isInfoEnabled()) {
            log.info(summary);
        }

        // Anything still left in props was not consumed by this protocol
        if(!props.isEmpty()) {
            log.error("FRAG2.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /** Resets the statistics of the superclass and of this protocol. */
    public void resetStats() {
        super.resetStats();
num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0; } /** * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only * add a header if framentation is needed ! */ public void down(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); long size=msg.getLength(); synchronized(this) { num_sent_msgs++; } if(size > frag_size) { if(trace) { StringBuffer sb=new StringBuffer("message's buffer size is "); sb.append(size).append(", will fragment ").append("(frag_size="); sb.append(frag_size).append(')'); log.trace(sb.toString()); } fragment(msg); // Fragment and pass down return; } break; case Event.VIEW_CHANGE: //don't do anything if this dude is sending out the view change //we are receiving a view change, //in here we check for the View view=(View)evt.getArg(); Vector new_mbrs=view.getMembers(), left_mbrs; Address mbr; left_mbrs=Util.determineLeftMembers(members, new_mbrs); members.clear(); members.addAll(new_mbrs); for(int i=0; i < left_mbrs.size(); i++) { mbr=(Address)left_mbrs.elementAt(i); //the new view doesn't contain the sender, he must have left, //hence we will clear all his fragmentation tables fragment_list.remove(mbr); if(trace) log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table"); } break; case Event.CONFIG: passDown(evt); if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); return; } passDown(evt); // Pass on to the layer below us } /** * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up * the stack. 
*/ public void up(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); Object obj=msg.getHeader(name); if(obj != null && obj instanceof FragHeader) { // needs to be defragmented unfragment(msg); // Unfragment and possibly pass up return; } else { num_received_msgs++; } break; case Event.CONFIG: passUp(evt); if(log.isInfoEnabled()) log.info("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); return; } passUp(evt); // Pass up to the layer above us by default } /** Send all fragments as separate messages (with same ID !). Example: <pre> Given the generated ID is 2344, number of fragments=3, message {dst,src,buf} would be fragmented into: [2344,3,0]{dst,src,buf1}, [2344,3,1]{dst,src,buf2} and [2344,3,2]{dst,src,buf3} </pre> */ void fragment(Message msg) { byte[] buffer; List fragments; Event evt; FragHeader hdr; Message frag_msg; Address dest=msg.getDest(); long id=getNextId(); // used as seqnos int num_frags; StringBuffer sb; Range r; try { buffer=msg.getBuffer(); fragments=Util.computeFragOffsets(buffer, frag_size); num_frags=fragments.size(); synchronized(this) { num_sent_frags+=num_frags; } if(trace) { sb=new StringBuffer("fragmenting packet to "); sb.append((dest != null ? dest.toString() : "<all members>")).append(" (size=").append(buffer.length); sb.append(") into ").append(num_frags).append(" fragment(s) [frag_size=").append(frag_size).append(']'); log.trace(sb.toString()); } for(int i=0; i < fragments.size(); i++) { r=(Range)fragments.get(i); // Copy the original msg (needed because we need to copy the headers too) frag_msg=msg.copy(false); // don't copy the buffer, only src, dest and headers frag_msg.setBuffer(buffer, (int)r.low, (int)r.high); hdr=new FragHeader(id, i, num_frags); frag_msg.putHeader(name, hdr); evt=new Event(Event.MSG, frag_msg); passDown(evt); } } catch(Exception e) { if(log.isErrorEnabled()) log.error("fragmentation failure", e); } } /** 1. Get all the fragment buffers 2. 
When all are received -> Assemble them into one big buffer
     3. Read headers and byte buffer from big buffer
     4. Set headers and buffer in msg
     5. Pass msg up the stack
     */
    void unfragment(Message msg) {
        Address sender=msg.getSrc();
        FragHeader hdr=(FragHeader)msg.removeHeader(name);

        // Look up the sender's fragmentation table, creating it on the first fragment
        FragmentationTable frag_table=fragment_list.get(sender);
        if(frag_table == null) {
            frag_table=new FragmentationTable(sender);
            try {
                fragment_list.add(sender, frag_table);
            }
            catch(IllegalArgumentException x) {
                // the entry has already been added, probably in parallel from another thread
                frag_table=fragment_list.get(sender);
            }
        }

        num_received_frags++;

        // add() returns null until the message can be assembled from its fragments
        Message assembled_msg=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg);
        if(assembled_msg != null) {
            try {
                if(trace)
                    log.trace("assembled_msg is " + assembled_msg);
                assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
                num_received_msgs++;
                passUp(new Event(Event.MSG, assembled_msg));
            }
            catch(Exception e) {
                if(log.isErrorEnabled())
                    log.error("unfragmentation failed", e);
            }
        }
    }


    /**
     * Applies the settings carried by a CONFIG event. A null map is ignored. If the map
     * contains the key "frag_size", its Integer value replaces the current frag_size.
     */
    void handleConfigEvent(HashMap map) {
        if(map == null) return;
        if(map.containsKey("frag_size")) {
            frag_size=((Integer)map.get("frag_size")).intValue();
            if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?