⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fc.java.txt

📁 JGRoups源码
💻 TXT
📖 第 1 页 / 共 2 页
字号:
// $Id: FC.java.txt,v 1.1 2005/08/16 12:58:58 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.CondVar;import org.jgroups.util.Streamable;import java.io.*;import java.util.*;/** * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of * how many credits it has received from a sender. When credits for a sender fall below a threshold, * the receiver sends more credits to the sender. Works for both unicast and multicast messages. * <p> * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down(). * @author Bela Ban * @version $Revision: 1.1 $ */public class FC extends Protocol {    /** My own address */    Address local_addr=null;    /** HashMap<Address,Long>: keys are members, values are credits left. For each send, the     * number of credits is decremented by the message size */    final Map sent=new HashMap(11);    // final Map sent=new ConcurrentHashMap(11);    /** HashMap<Address,Long>: keys are members, values are credits left (in bytes).     * For each receive, the credits for the sender are decremented by the size of the received message.     * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT     * is received after reaching <tt>min_credits</tt> credits. */    final Map received=new ConcurrentReaderHashMap(11);    // final Map received=new ConcurrentHashMap(11);    /** We cache the membership */    final Vector members=new Vector(11);    /** List of members from whom we expect credits */    final Vector creditors=new Vector(11);    /** Max number of bytes to send per receiver until an ack must     * be received before continuing sending */    private long max_credits=50000;    /** Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send     * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to     * wait forever.     */    private long max_block_time=5000;    /** If credits fall below this limit, we send more credits to the sender. (We also send when     * credits are exhausted (0 credits left)) */    double min_threshold=0.25;    /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will     * override the above computation */    private long min_credits=0;    /** Current blocking. True if blocking, else false */    private final CondVar blocking=new CondVar("blocking", Boolean.FALSE);    static final String name="FC";    private long start_blocking=0, stop_blocking=0;    private int num_blockings=0, num_replenishments=0, num_credit_requests=0;    private long total_time_blocking=0;    final BoundedList last_blockings=new BoundedList(50);    final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH);    final static FcHeader CREDIT_REQUEST_HDR=new FcHeader(FcHeader.CREDIT_REQUEST);    public String getName() {        return name;    }    public void resetStats() {        super.resetStats();        num_blockings=num_replenishments=num_credit_requests=0;        total_time_blocking=0;        last_blockings.removeAll();    }    public long getMaxCredits() {        return max_credits;    }    public void setMaxCredits(long max_credits) {        this.max_credits=max_credits;    }    public double getMinThreshold() {        return min_threshold;    }    public void setMinThreshold(double min_threshold) {        this.min_threshold=min_threshold;    }    public long getMinCredits() {        return min_credits;    }    public void setMinCredits(long min_credits) {        this.min_credits=min_credits;    }    public boolean isBlocked() {        Object obj=blocking.get();        return obj != null && obj instanceof Boolean && ((Boolean)obj).booleanValue();    }    public int getNumberOfBlockings() {        return num_blockings;    }    public long getTotalTimeBlocked() {        return total_time_blocking;    }    public double getAverageTimeBlocked() {        return num_blockings == 0? num_blockings : total_time_blocking / num_blockings;    }    public int getNumberOfReplenishmentsReceived() {        return num_replenishments;    }    public int getNumberOfCreditRequests() {        return num_credit_requests;    }    public String printSenderCredits() {        return printMap(sent);    }    public String printReceiverCredits() {        return printMap(received);    }    public String printCredits() {        StringBuffer sb=new StringBuffer();        sb.append("senders:\n").append(printMap(sent)).append("\n\nreceivers:\n").append(printMap(received));        return sb.toString();    }    public Map dumpStats() {        Map retval=super.dumpStats();        if(retval == null)            retval=new HashMap();        retval.put("senders", printMap(sent));        retval.put("receivers", printMap(received));        retval.put("num_blockings", new Integer(this.num_blockings));        retval.put("avg_time_blocked", new Double(getAverageTimeBlocked()));        retval.put("num_replenishments", new Integer(this.num_replenishments));        return retval;    }    public String showLastBlockingTimes() {        return last_blockings.toString();    }    public void unblock() {        unblockSender();    }    public boolean setProperties(Properties props) {        String  str;        boolean min_credits_set=false;        super.setProperties(props);        str=props.getProperty("max_credits");        if(str != null) {            max_credits=Long.parseLong(str);            props.remove("max_credits");        }        str=props.getProperty("min_threshold");        if(str != null) {            min_threshold=Double.parseDouble(str);            props.remove("min_threshold");        }        str=props.getProperty("min_credits");        if(str != null) {            min_credits=Long.parseLong(str);            props.remove("min_credits");            min_credits_set=true;        }        if(!min_credits_set)            min_credits=(long)((double)max_credits * min_threshold);        str=props.getProperty("max_block_time");        if(str != null) {            max_block_time=Long.parseLong(str);            props.remove("max_block_time");        }        if(props.size() > 0) {            log.error("FC.setProperties(): the following properties are not recognized: " + props);            return false;        }        return true;    }    public void stop() {        super.stop();        unblock();    }    /**     * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g.     * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock !     * @param evt     */    protected void receiveDownEvent(Event evt) {        if(evt.getType() == Event.VIEW_CHANGE) {            View v=(View)evt.getArg();            Vector mbrs=v.getMembers();            handleViewChange(mbrs);        }        super.receiveDownEvent(evt);    }    public void down(Event evt) {        switch(evt.getType()) {        case Event.MSG:            handleDownMessage(evt);            return;        }        passDown(evt); // this could potentially use the lower protocol's thread which may block    }    private synchronized void handleDownMessage(Event evt) {        if(Boolean.TRUE.equals(blocking.get())) { // blocked            waitUntilEnoughCreditsAvailable();        }        else {            // not blocked            boolean rc;            synchronized(sent) { // 'sent' is the same lock as blocking.getLock()...                rc=decrMessage((Message)evt.getArg());                if(rc == false) {                    if(trace)                        log.trace("blocking due to insufficient credits");                    blocking.set(Boolean.TRUE);                    start_blocking=System.currentTimeMillis();                    num_blockings++;                }            }            if(rc == false) {                waitUntilEnoughCreditsAvailable();            }        }        passDown(evt);    }    public void up(Event evt) {        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.VIEW_CHANGE:                handleViewChange(((View)evt.getArg()).getMembers());                break;            case Event.MSG:                Message msg=(Message)evt.getArg();                FcHeader hdr=(FcHeader)msg.removeHeader(name);                if(hdr != null) {                    switch(hdr.type) {                    case FcHeader.REPLENISH:                        num_replenishments++;                        handleCredit(msg.getSrc());                        break;                    case FcHeader.CREDIT_REQUEST:                        num_credit_requests++;                        Address sender=msg.getSrc();                        if(trace)                            log.trace("received credit request from " + sender + ": sending credits");                        received.put(sender, new Long(max_credits));                        sendCredit(sender);                        break;                    default:                        log.error("header type " + hdr.type + " not known");                        break;                    }                    return; // don't pass message up                }                else {                    adjustCredit(msg);                }                break;        }        passUp(evt);    }    private void handleCredit(Address sender) {        if(sender == null) return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -