frag.java
来自「JGRoups源码」· Java 代码 · 共 563 行 · 第 1/2 页
JAVA
563 行
// $Id: FRAG.java,v 1.32 2006/10/11 14:39:41 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.ExposedByteArrayOutputStream;import org.jgroups.util.Util;import java.io.ByteArrayInputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.util.*;/** * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets. * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended * to the messages as a header (and removed at the receiving side).<p> * Each fragment is identified by (a) the sender (part of the message to which the header is appended), * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the * fragement ID which ranges from 0 to number_of_fragments-1.<p> * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and * multicast messages. * @author Bela Ban * @author Filip Hanik * @version $Id: FRAG.java,v 1.32 2006/10/11 14:39:41 belaban Exp $ */public class FRAG extends Protocol { private int frag_size=8192; // conservative value /*the fragmentation list contains a fragmentation table per sender *this way it becomes easier to clean up if a sender (member) leaves or crashes */ private final FragmentationList fragment_list=new FragmentationList(); private int curr_id=1; private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024); private final Vector members=new Vector(11); private final static String name="FRAG"; long num_sent_msgs=0; long num_sent_frags=0; long num_received_msgs=0; long num_received_frags=0; public String getName() { return name; } public int getFragSize() {return frag_size;} public void setFragSize(int s) {frag_size=s;} public long getNumberOfSentMessages() {return num_sent_msgs;} public long getNumberOfSentFragments() {return num_sent_frags;} public long getNumberOfReceivedMessages() {return num_received_msgs;} public long getNumberOfReceivedFragments() {return num_received_frags;} /** * Setup the Protocol instance acording to the configuration string */ public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("frag_size"); if(str != null) { frag_size=Integer.parseInt(str); props.remove("frag_size"); } if(props.size() > 0) { log.error("FRAG.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public void resetStats() { super.resetStats(); num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0; } /** * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only * add a header if framentation is needed ! */ public void down(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); long size=msg.size(); num_sent_msgs++; if(size > frag_size) { if(trace) { StringBuffer sb=new StringBuffer("message size is "); sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')'); log.trace(sb.toString()); } fragment(msg); // Fragment and pass down return; } break; case Event.VIEW_CHANGE: //don't do anything if this dude is sending out the view change //we are receiving a view change, //in here we check for the View view=(View)evt.getArg(); Vector new_mbrs=view.getMembers(), left_mbrs; Address mbr; left_mbrs=Util.determineLeftMembers(members, new_mbrs); members.clear(); members.addAll(new_mbrs); for(int i=0; i < left_mbrs.size(); i++) { mbr=(Address)left_mbrs.elementAt(i); //the new view doesn't contain the sender, he must have left, //hence we will clear all his fragmentation tables fragment_list.remove(mbr); if(trace) log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table"); } break; case Event.CONFIG: passDown(evt); if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); return; } passDown(evt); // Pass on to the layer below us } /** * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack. */ public void up(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); Object obj=msg.getHeader(name); if(obj != null && obj instanceof FragHeader) { // needs to be defragmented unfragment(msg); // Unfragment and possibly pass up return; } else { num_received_msgs++; } break; case Event.CONFIG: passUp(evt); if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); return; } passUp(evt); // Pass up to the layer above us by default } /** * Send all fragments as separate messages (with same ID !). * Example: * <pre> * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf} * would be fragmented into: * <p/> * [2344,3,0]{dst,src,buf1}, * [2344,3,1]{dst,src,buf2} and * [2344,3,2]{dst,src,buf3} * </pre> */ private void fragment(Message msg) { DataOutputStream out=null; byte[] buffer; byte[] fragments[]; Event evt; FragHeader hdr; Message frag_msg; Address dest=msg.getDest(), src=msg.getSrc(); long id=curr_id++; // used as seqnos int num_frags; int size; try { // Write message into a byte buffer and fragment it // Synchronization around bos is needed for concurrent access (http://jira.jboss.com/jira/browse/JGRP-215) synchronized(bos) { bos.reset(); out=new DataOutputStream(bos); msg.writeTo(out); out.flush(); buffer=bos.getRawBuffer(); fragments=Util.fragmentBuffer(buffer, frag_size, bos.size()); } num_frags=fragments.length; num_sent_frags+=num_frags; if(trace) { StringBuffer sb=new StringBuffer(); sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>"); sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags); sb.append(" fragment(s) [frag_size=").append(frag_size).append(']'); log.trace(sb.toString()); } for(int i=0; i < num_frags; i++) { frag_msg=new Message(dest, src, fragments[i]); hdr=new FragHeader(id, i, num_frags); frag_msg.putHeader(name, hdr); evt=new Event(Event.MSG, frag_msg); passDown(evt); } } catch(Exception e) { log.error("exception occurred trying to fragment message", e); } finally { Util.close(out); } } /** * 1. Get all the fragment buffers * 2. When all are received -> Assemble them into one big buffer * 3. Read headers and byte buffer from big buffer * 4. Set headers and buffer in msg * 5. Pass msg up the stack */ private void unfragment(Message msg) { FragmentationTable frag_table; Address sender=msg.getSrc(); Message assembled_msg; FragHeader hdr=(FragHeader)msg.removeHeader(name); byte[] m; ByteArrayInputStream bis; DataInputStream in=null; frag_table=fragment_list.get(sender); if(frag_table == null) { frag_table=new FragmentationTable(sender); try { fragment_list.add(sender, frag_table); } catch(IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread frag_table=fragment_list.get(sender); } } num_received_frags++; m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer()); if(m != null) { try { bis=new ByteArrayInputStream(m); in=new DataInputStream(bis); assembled_msg=new Message(false); assembled_msg.readFrom(in); if(trace) log.trace("assembled_msg is " + assembled_msg); assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !! num_received_msgs++; passUp(new Event(Event.MSG, assembled_msg)); } catch(Exception e) { log.error("failed unfragmenting a message", e); } finally { Util.close(in); } } }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?