piggyback.java

来自「JGroups源码」· Java 代码 · 共 253 行

JAVA
253
字号
// $Id: PIGGYBACK.java,v 1.10 2005/08/11 12:43:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.Properties;import java.util.Vector;/** * Combines multiple messages into a single large one. As many messages as possible are combined into * one, after a max timeout or when the msg size becomes too big the message is sent. On the receiving * side, the large message is spliced into the smaller ones and delivered. */public class PIGGYBACK extends Protocol {    long max_wait_time=20; // milliseconds: max. wait between consecutive msgs    long max_size=8192;    // don't piggyback if created msg would exceed this size (in bytes)    final Queue msg_queue=new Queue();    Packer packer=null;    boolean packing=false;    Address local_addr=null;    class Packer implements Runnable {        Thread t=null;        public void start() {            if(t == null) {                t=new Thread(this, "Packer thread");                t.setDaemon(true);                t.start();            }        }        public void stop() {            t=null;        }        public void run() {            long current_size=0;            long start_time, time_to_wait=max_wait_time;            Message m, new_msg;            Vector msgs;            while(packer != null) {                try {                    m=(Message)msg_queue.remove();                    m.setSrc(local_addr);                    start_time=System.currentTimeMillis();                    current_size=0;                    new_msg=new Message();                    msgs=new Vector();                    msgs.addElement(m);                    current_size+=m.size();                    while(System.currentTimeMillis() - start_time <= max_wait_time &&                            
current_size <= max_size) {                        time_to_wait=max_wait_time - (System.currentTimeMillis() - start_time);                        if(time_to_wait <= 0)                            break;                        try {                            m=(Message)msg_queue.peek(time_to_wait);                            m.setSrc(local_addr);                        }                        catch(TimeoutException timeout) {                            break;                        }                        if(m == null || m.size() + current_size > max_size)                            break;                        m=(Message)msg_queue.remove();                        current_size+=m.size();                        msgs.addElement(m);                    }                    try {                        new_msg.putHeader(getName(), new PiggybackHeader());                        new_msg.setBuffer(Util.objectToByteBuffer(msgs));                        passDown(new Event(Event.MSG, new_msg));                            if(log.isInfoEnabled()) log.info("combined " + msgs.size() +                                    " messages of a total size of " + current_size + " bytes");                    }                    catch(Exception e) {                        if(warn) log.warn("exception is " + e);                    }                }                catch(QueueClosedException closed) {                     if(log.isInfoEnabled()) log.info("packer stopped as queue is closed");                    break;                }            }        }    }    /**     * All protocol names have to be unique !     
*/    public String getName() {        return "PIGGYBACK";    }    public boolean setProperties(Properties props) {super.setProperties(props);        String str;        str=props.getProperty("max_wait_time");        if(str != null) {            max_wait_time=Long.parseLong(str);            props.remove("max_wait_time");        }        str=props.getProperty("max_size");        if(str != null) {            max_size=Long.parseLong(str);            props.remove("max_size");        }        if(props.size() > 0) {            log.error("PIGGYBACK.setProperties(): these properties are not recognized: " + props);            return false;        }        return true;    }    public void start() throws Exception {        startPacker();    }    public void stop() {        packing=false;        msg_queue.close(true);  // flush pending messages, this should also stop the packer ...        stopPacker();           // ... but for safety reasons, we stop it here again    }    public void up(Event evt) {        Message msg;        Object obj;        Vector messages;        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.MSG:                msg=(Message)evt.getArg();                obj=msg.getHeader(getName());                if(obj == null || !(obj instanceof PiggybackHeader))                    break;                msg.removeHeader(getName());                try {                    messages=(Vector)msg.getObject();                     if(log.isInfoEnabled()) log.info("unpacking " + messages.size() + " messages");                    for(int i=0; i < messages.size(); i++)                        passUp(new Event(Event.MSG, messages.elementAt(i)));                }                catch(Exception e) {                    if(warn) log.warn("piggyback message does not contain a vector of " +                            "piggybacked messages, discarding message ! 
Exception is " + e);                    return;                }                return;             // don't pass up !        }        passUp(evt);            // Pass up to the layer above us    }    public void down(Event evt) {        Message msg;        switch(evt.getType()) {            case Event.MSG:                msg=(Message)evt.getArg();                if(msg.getDest() != null && !msg.getDest().isMulticastAddress())                    break;  // unicast message, handle as usual                if(!packing)                    break;  // pass down as usual; we haven't started yet                try {                    msg_queue.add(msg);                }                catch(QueueClosedException closed) {                    break;  // pass down regularly                }                return;        }        passDown(evt);          // Pass on to the layer below us    }    void startPacker() {        if(packer == null) {            packing=true;            packer=new Packer();            packer.start();        }    }    void stopPacker() {        if(packer != null) {            packer.stop();            packing=false;            msg_queue.close(false);            packer=null;        }    }    public static class PiggybackHeader extends Header {        public PiggybackHeader() {        }        public String toString() {            return "[PIGGYBACK: <variables> ]";        }        public void writeExternal(ObjectOutput out) throws IOException {        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?