⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fc.java.new

📁 JGRoups源码
💻 NEW
📖 第 1 页 / 共 2 页
字号:
    public void up(Event evt) {        switch(evt.getType()) {            case Event.MSG:                Message msg=(Message)evt.getArg();                FcHeader hdr=(FcHeader)msg.removeHeader(name);                if(hdr != null) {                    switch(hdr.type) {                    case FcHeader.REPLENISH:                        num_credit_responses_received++;                        handleCredit(msg.getSrc());                        break;                    case FcHeader.CREDIT_REQUEST:                        num_credit_requests_received++;                        Address sender=msg.getSrc();                        if(trace)                            log.trace("received credit request from " + sender + ": sending credits");                        received.put(sender, max_credits_constant);                        sendCredit(sender);                        break;                    default:                        log.error("header type " + hdr.type + " not known");                        break;                    }                    return; // don't pass message up                }                else {                    adjustCredit(msg);                }                break;        case Event.VIEW_CHANGE:            handleViewChange(((View)evt.getArg()).getMembers());            break;        }        passUp(evt);    }    private void handleDownMessage(Event evt) {        Message msg=(Message)evt.getArg();        int     length=msg.getLength();        Address dest=msg.getDest();        if(Util.acquire(lock)) {            try {                if(lowest_credit <= length) {                    determineCreditors(dest, length);                    insufficient_credit=true;                    num_blockings++;                    start_blocking=System.currentTimeMillis();                    while(insufficient_credit && running) {                        try {mutex.timedwait(max_block_time);} catch(InterruptedException e) {}                        if(insufficient_credit && running) {                            Util.release(lock);                            try {                                if(trace)                                    log.trace("timeout occurred waiting for credits; sending credit request to " + creditors);                                for(int i=0; i < creditors.size(); i++) {                                    sendCreditRequest((Address)creditors.get(i));                                }                            }                            finally {                                Util.acquire(lock);                            }                        }                    }                    stop_blocking=System.currentTimeMillis();                    long block_time=stop_blocking - start_blocking;                    if(trace)                        log.trace("total time blocked: " + block_time + " ms");                    total_time_blocking+=block_time;                    last_blockings.add(new Long(block_time));                }                else {                    long tmp=decrementCredit(sent, dest, length);                    if(tmp != -1)                        lowest_credit=Math.min(tmp, lowest_credit);                }            }            finally {                Util.release(lock);            }        }        // send message - either after regular processing, or after blocking (when enough credits available again)        passDown(evt);    }    /**     * Checks whether one member (unicast msg) or all members (multicast msg) have enough credits. Add those     * that don't to the creditors list     * @param dest     * @param length     */    private void determineCreditors(Address dest, int length) {        boolean multicast=dest == null || dest.isMulticastAddress();        Address mbr;        Long    credits;        if(multicast) {            Map.Entry entry;            for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {                entry=(Map.Entry)it.next();                mbr=(Address)entry.getKey();                credits=(Long)entry.getValue();                if(credits.longValue() <= length) {                    if(!creditors.contains(mbr))                        creditors.add(mbr);                }            }        }        else {            credits=(Long)sent.get(dest);            if(credits != null && credits.longValue() <= length) {                if(!creditors.contains(dest))                    creditors.add(dest);            }        }    }    /**     * Decrements credits from a single member, or all members in sent_msgs, depending on whether it is a multicast     * or unicast message. No need to acquire mutex (must already be held when this method is called)     * @param dest     * @param credits     * @return The lowest number of credits left, or -1 if a unicast member was not found     */    private long decrementCredit(Map m, Address dest, long credits) {        boolean multicast=dest == null || dest.isMulticastAddress();        long    lowest=max_credits, tmp;        Long    val;        if(multicast) {            if(m.size() == 0)                return -1;            Map.Entry entry;            for(Iterator it=m.entrySet().iterator(); it.hasNext();) {                entry=(Map.Entry)it.next();                val=(Long)entry.getValue();                tmp=val.longValue();                tmp-=credits;                entry.setValue(new Long(tmp));                lowest=Math.min(tmp, lowest);            }            return lowest;        }        else {            val=(Long)m.get(dest);            if(val != null) {                lowest=val.longValue();                lowest-=credits;                m.put(dest, new Long(lowest));                return lowest;            }        }        return -1;    }    private void handleCredit(Address sender) {        if(sender == null) return;        StringBuffer sb=null;        if(Util.acquire(lock)) {            try {                if(trace) {                    Long old_credit=(Long)sent.get(sender);                    sb=new StringBuffer();                    sb.append("received credit from ").append(sender).append(", old credit was ").                            append(old_credit).append(", new credits are ").append(max_credits).                            append(".\nCreditors before are: ").append(creditors);                }                sent.put(sender, max_credits_constant);                lowest_credit=computeLowestCredit(sent);                if(creditors.size() > 0) {  // we are blocked because we expect credit from one or more members                    creditors.remove(sender);                    if(trace) {                        sb.append("\nCreditors after removal of ").append(sender).append(" are: ").append(creditors);                        log.trace(sb.toString());                    }                }                if(insufficient_credit && lowest_credit > 0 && creditors.size() == 0) {                    insufficient_credit=false;                    mutex.broadcast();                }            }            finally {                Util.release(lock);            }        }    }    private static long computeLowestCredit(Map m) {        Collection credits=m.values(); // List of Longs (credits)        Long retval=(Long)Collections.min(credits);        return retval.longValue();    }    /**     * Check whether sender has enough credits left. If not, send him some more     * @param msg     */    private void adjustCredit(Message msg) {        Address src=msg.getSrc();        long    length=msg.getLength(); // we don't care about headers for the purpose of flow control        if(src == null) {            if(log.isErrorEnabled()) log.error("src is null");            return;        }        if(length == 0)            return; // no effect        if(decrementCredit(received, src, length) <= min_credits) {            received.put(src, max_credits_constant);            if(trace) log.trace("sending replenishment message to " + src);            sendCredit(src);        }    }    private void sendCredit(Address dest) {        Message  msg=new Message(dest, null, null);        msg.putHeader(name, REPLENISH_HDR);        passDown(new Event(Event.MSG, msg));        num_credit_responses_sent++;    }    private void sendCreditRequest(final Address dest) {        Message  msg=new Message(dest, null, null);        msg.putHeader(name, CREDIT_REQUEST_HDR);        passDown(new Event(Event.MSG, msg));        num_credit_requests_sent++;    }    private void handleViewChange(Vector mbrs) {        Address addr;        if(mbrs == null) return;        if(trace) log.trace("new membership: " + mbrs);        if(Util.acquire(lock)) {            try {                // add members not in membership to received and sent hashmap (with full credits)                for(int i=0; i < mbrs.size(); i++) {                    addr=(Address) mbrs.elementAt(i);                    if(!received.containsKey(addr))                        received.put(addr, max_credits_constant);                    if(!sent.containsKey(addr))                        sent.put(addr, max_credits_constant);                }                // remove members that left                for(Iterator it=received.keySet().iterator(); it.hasNext();) {                    addr=(Address) it.next();                    if(!mbrs.contains(addr))                        it.remove();                }                // remove members that left                for(Iterator it=sent.keySet().iterator(); it.hasNext();) {                    addr=(Address)it.next();                    if(!mbrs.contains(addr))                        it.remove(); // modified the underlying map                }                // remove all creditors which are not in the new view                for(int i=0; i < creditors.size(); i++) {                    Address creditor=(Address)creditors.get(i);                    if(!mbrs.contains(creditor))                        creditors.remove(creditor);                }                if(trace) log.trace("creditors are " + creditors);                if(insufficient_credit && creditors.size() == 0) {                    lowest_credit=computeLowestCredit(sent);                    insufficient_credit=false;                    mutex.broadcast();                }            }            finally {                Util.release(lock);            }        }    }    private static String printMap(Map m) {        Map.Entry entry;        StringBuffer sb=new StringBuffer();        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");        }        return sb.toString();    }    public static class FcHeader extends Header implements Streamable {        public static final byte REPLENISH      = 1;        public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester        byte  type = REPLENISH;        public FcHeader() {        }        public FcHeader(byte type) {            this.type=type;        }        public long size() {            return Global.BYTE_SIZE;        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeByte(type);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            type=in.readByte();        }        public void writeTo(DataOutputStream out) throws IOException {            out.writeByte(type);        }        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {            type=in.readByte();        }        public String toString() {            switch(type) {            case REPLENISH: return "REPLENISH";            case CREDIT_REQUEST: return "CREDIT_REQUEST";            default: return "<invalid type>";            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -