⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fc.java.new

📁 JGRoups源码
💻 NEW
📖 第 1 页 / 共 2 页
字号:
// $Id: FC.java,v 1.51 2006/01/14 14:00:38 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import EDU.oswego.cs.dl.util.concurrent.Sync;import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;import EDU.oswego.cs.dl.util.concurrent.CondVar;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of * how many credits it has received from a sender. When credits for a sender fall below a threshold, * the receiver sends more credits to the sender. Works for both unicast and multicast messages. * <p> * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down(). * <br/>This is the second simplified implementation of the same model. The algorithm is sketched out in * doc/FlowControl.txt * @author Bela Ban * @version $Revision: 1.51 $ */public class FC extends Protocol {    /** HashMap<Address,Long>: keys are members, values are credits left. For each send, the     * number of credits is decremented by the message size */    final Map sent=new HashMap(11);    // final Map sent=new ConcurrentHashMap(11);    /** HashMap<Address,Long>: keys are members, values are credits left (in bytes).     * For each receive, the credits for the sender are decremented by the size of the received message.     * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT     * is received after reaching <tt>min_credits</tt> credits. */    final Map received=new ConcurrentReaderHashMap(11);    // final Map received=new ConcurrentHashMap(11);    /** List of members from whom we expect credits */    final List creditors=new ArrayList(11);    /** Max number of bytes to send per receiver until an ack must     * be received before continuing sending */    private long max_credits=50000;    private Long max_credits_constant;    /** Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send     * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to     * wait forever.     */    private long max_block_time=5000;    /** If credits fall below this limit, we send more credits to the sender. (We also send when     * credits are exhausted (0 credits left)) */    private double min_threshold=0.25;    /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will     * override the above computation */    private long min_credits=0;    /** Whether FC is still running, this is set to false when the protocol terminates (on stop()) */    private boolean running=true;    /** Determines whether or not to block on down(). Set when not enough credit is available to send a message     * to all or a single member */    private boolean insufficient_credit=false;    /** the lowest credits of any destination (sent_msgs) */    private long lowest_credit=max_credits;    /** Lock to be used with the Condvar below */    final Sync lock=new ReentrantLock();    /** Mutex to block on down() */    final CondVar mutex=new CondVar(lock);    static final String name="FC";    private long start_blocking=0, stop_blocking=0;    private int num_blockings=0;    private int num_credit_requests_received=0, num_credit_requests_sent=0;    private int num_credit_responses_sent=0, num_credit_responses_received=0;    private long total_time_blocking=0;    final BoundedList last_blockings=new BoundedList(50);    final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH);    final static FcHeader CREDIT_REQUEST_HDR=new FcHeader(FcHeader.CREDIT_REQUEST);    public final String getName() {        return name;    }    public void resetStats() {        super.resetStats();        num_blockings=0;        num_credit_responses_sent=num_credit_responses_received=num_credit_requests_received=num_credit_requests_sent=0;        total_time_blocking=0;        last_blockings.removeAll();    }    public long getMaxCredits() {        return max_credits;    }    public void setMaxCredits(long max_credits) {        this.max_credits=max_credits;        max_credits_constant=new Long(this.max_credits);    }    public double getMinThreshold() {        return min_threshold;    }    public void setMinThreshold(double min_threshold) {        this.min_threshold=min_threshold;    }    public long getMinCredits() {        return min_credits;    }    public void setMinCredits(long min_credits) {        this.min_credits=min_credits;    }    public boolean isBlocked() {        return insufficient_credit;    }    public int getNumberOfBlockings() {        return num_blockings;    }    public long getMaxBlockTime() {        return max_block_time;    }    public void setMaxBlockTime(long t) {        max_block_time=t;    }    public long getTotalTimeBlocked() {        return total_time_blocking;    }    public double getAverageTimeBlocked() {        return num_blockings == 0? 0.0 : total_time_blocking / (double)num_blockings;    }    public int getNumberOfCreditRequestsReceived() {        return num_credit_requests_received;    }    public int getNumberOfCreditRequestsSent() {        return num_credit_requests_sent;    }    public int getNumberOfCreditResponsesReceived() {        return num_credit_responses_received;    }    public int getNumberOfCreditResponsesSent() {        return num_credit_responses_sent;    }    public String printSenderCredits() {        return printMap(sent);    }    public String printReceiverCredits() {        return printMap(received);    }    public String printCredits() {        StringBuffer sb=new StringBuffer();        sb.append("senders:\n").append(printMap(sent)).append("\n\nreceivers:\n").append(printMap(received));        return sb.toString();    }    public Map dumpStats() {        Map retval=super.dumpStats();        if(retval == null)            retval=new HashMap();        retval.put("senders", printMap(sent));        retval.put("receivers", printMap(received));        retval.put("num_blockings", new Integer(this.num_blockings));        retval.put("avg_time_blocked", new Double(getAverageTimeBlocked()));        retval.put("num_replenishments", new Integer(this.num_credit_responses_received));        retval.put("total_time_blocked", new Long(total_time_blocking));        return retval;    }    public String showLastBlockingTimes() {        return last_blockings.toString();    }    /** Allows to unblock a blocked sender from an external program, e.g. JMX */    public void unblock() {        if(Util.acquire(lock)) {            try {                if(trace)                    log.trace("unblocking the sender and replenishing all members, creditors are " + creditors);                Map.Entry entry;                for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {                    entry=(Map.Entry)it.next();                    entry.setValue(max_credits_constant);                }                lowest_credit=computeLowestCredit(sent);                creditors.clear();                insufficient_credit=false;                mutex.broadcast();            }            finally {                Util.release(lock);            }        }//        synchronized(mutex) {//            if(trace)//                log.trace("unblocking the sender and replenishing all members, creditors are " + creditors);////            Map.Entry entry;//            for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {//                entry=(Map.Entry)it.next();//                entry.setValue(max_credits_constant);//            }////            lowest_credit=computeLowestCredit(sent);//            creditors.clear();//            insufficient_credit=false;//            mutex.notifyAll();//        }    }    public boolean setProperties(Properties props) {        String  str;        boolean min_credits_set=false;        super.setProperties(props);        str=props.getProperty("max_credits");        if(str != null) {            max_credits=Long.parseLong(str);            props.remove("max_credits");        }        str=props.getProperty("min_threshold");        if(str != null) {            min_threshold=Double.parseDouble(str);            props.remove("min_threshold");        }        str=props.getProperty("min_credits");        if(str != null) {            min_credits=Long.parseLong(str);            props.remove("min_credits");            min_credits_set=true;        }        if(!min_credits_set)            min_credits=(long)((double)max_credits * min_threshold);        str=props.getProperty("max_block_time");        if(str != null) {            max_block_time=Long.parseLong(str);            props.remove("max_block_time");        }        if(props.size() > 0) {            log.error("FC.setProperties(): the following properties are not recognized: " + props);            return false;        }        max_credits_constant=new Long(max_credits);        return true;    }    public void start() throws Exception {        super.start();//        synchronized(mutex) {//            running=true;//            insufficient_credit=false;//            lowest_credit=max_credits;//        }        lock.acquire();        try {            running=true;            insufficient_credit=false;            lowest_credit=max_credits;        }        finally {            lock.release();        }    }    public void stop() {        super.stop();//        synchronized(mutex) {//            running=false;//            mutex.notifyAll();//        }        if(Util.acquire(lock)) {            try {                running=false;                mutex.broadcast(); // notify all threads waiting on the mutex that we are done            }            finally {                Util.release(lock);            }        }    }    /**     * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g.     * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock !     * @param evt     */    protected void receiveDownEvent(Event evt) {        if(evt.getType() == Event.VIEW_CHANGE) {            View v=(View)evt.getArg();            Vector mbrs=v.getMembers();            handleViewChange(mbrs);        }        super.receiveDownEvent(evt);    }    public void down(Event evt) {        switch(evt.getType()) {        case Event.MSG:            handleDownMessage(evt);            return;        }        passDown(evt); // this could potentially use the lower protocol's thread which may block    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -