⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fc.java.txt

📁 JGRoups源码
💻 TXT
📖 第 1 页 / 共 2 页
字号:
        StringBuffer sb=null;        boolean unblock=false;        if(trace) {            Long old_credit=(Long)sent.get(sender);            sb=new StringBuffer();            sb.append("received credit from ").append(sender).append(", old credit was ").                    append(old_credit).append(", new credits are ").append(max_credits).                    append(".\nCreditors before are: ").append(creditors);        }        synchronized(sent) {            sent.put(sender, new Long(max_credits));            if(creditors.size() > 0) {  // we are blocked because we expect credit from one or more members                removeCreditor(sender);                if(trace) {                    sb.append("\nCreditors after removal of ").append(sender).append(" are: ").append(creditors);                    log.trace(sb.toString());                }                if(creditors.size() == 0) {                    unblock=true;                }            }            else {  // no creditors, but still blocking: we need to unblock                if(Boolean.TRUE.equals(blocking.get()))                    unblock=true;            }        }        if(unblock) // moved this outside of the 'sent' synchronized block            unblockSender();    }    /**     * Check whether sender has enough credits left. If not, send him some more     * @param msg     */    private void adjustCredit(Message msg) {        Address src=msg.getSrc();        long    size=Math.max(24, msg.getLength());        if(src == null) {            if(log.isErrorEnabled()) log.error("src is null");            return;        }        if(decrementCredit(received, src, size, min_credits) == false) {            received.put(src, new Long(max_credits));            if(trace) log.trace("sending replenishment message to " + src);            sendCredit(src);        }    }    private void sendCredit(Address dest) {        Message  msg=new Message(dest, null, null);        msg.putHeader(name, REPLENISH_HDR);        passDown(new Event(Event.MSG, msg));    }    private void sendCreditRequest(final Address dest) {        Message  msg=new Message(dest, null, null);        msg.putHeader(name, CREDIT_REQUEST_HDR);        passDown(new Event(Event.MSG, msg));    }    /**     * Checks whether enough credits are available to send message. If not, blocks until enough credits     * are available     * @param evt Guaranteed to be a Message     * @return     */    private void waitUntilEnoughCreditsAvailable() {        while(true) {            try {                blocking.waitUntilWithTimeout(Boolean.FALSE, max_block_time);  // waits on 'sent'                break;            }            catch(TimeoutException e) {                List tmp=new ArrayList(creditors);                if(trace)                    log.trace("timeout occurred waiting for credits; sending credit request to " + tmp +                              ", creditors are " + creditors);                Address mbr;                for(Iterator it=tmp.iterator(); it.hasNext();) {                    mbr=(Address)it.next();                    sendCreditRequest(mbr);                }            }        }    }    /**     * Try to decrement the credits needed for this message and return true if successful, or false otherwise.     * For unicast destinations, the credits required are subtracted from the unicast destination member, for     * multicast messages the credits are subtracted from all current members in the group.     * @param msg     * @return false: will block, true: will not block     */    private boolean decrMessage(Message msg) {        Address dest;        long    size;        boolean success=true;        // ******************************************************************************************************        // this method is called by waitUntilEnoughCredits() which syncs on 'sent', so we don't need to sync here        // ******************************************************************************************************        if(msg == null) {            if(log.isErrorEnabled()) log.error("msg is null");            return true; // don't block !        }        dest=msg.getDest();        size=Math.max(24, msg.getLength());        if(dest != null && !dest.isMulticastAddress()) { // unicast destination            if(decrementCredit(sent, dest, size, 0)) {                return true;            }            else {                addCreditor(dest);                return false;            }        }        else {                 // multicast destination            for(Iterator it=members.iterator(); it.hasNext();) {                dest=(Address)it.next();                if(decrementCredit(sent, dest, size, 0) == false) {                    addCreditor(dest);                    success=false;                }            }        }        return success;    }    /** If message queueing is enabled, sends queued messages and unlocks sender (if successful) */    private void unblockSender() {        if(start_blocking > 0) {            stop_blocking=System.currentTimeMillis();            long diff=stop_blocking - start_blocking;            total_time_blocking+=diff;            last_blockings.add(new Long(diff));            stop_blocking=start_blocking=0;            if(trace)                log.trace("setting blocking=false, blocking time was " + diff + "ms");        }        if(trace)            log.trace("setting blocking=false");        blocking.set(Boolean.FALSE);    }    private void addCreditor(Address mbr) {        if(mbr != null && !creditors.contains(mbr))            creditors.add(mbr);    }    private void removeCreditor(Address mbr) {        creditors.remove(mbr);    }    /**     * Find the credits associated with <tt>dest</tt> and decrement its credits by credits_required. If the remaining     * value is less than or equal to 0, return false, else return true. Note that we will always subtract the credits.     * @param map     * @param dest     * @param credits_required Number of bytes required     * @param minimal_credits For the receiver: add minimal credits to check whether credits need to be sent     * @return Whether the required credits could successfully be subtracted from the credits left     */    private boolean decrementCredit(Map map, Address dest, long credits_required, long minimal_credits) {        long    credits_left, new_credits_left;        Long    tmp=(Long)map.get(dest);        boolean success;        if(tmp == null)            return true;        credits_left=tmp.longValue();        success=credits_left > (credits_required + minimal_credits);        new_credits_left=Math.max(0, credits_left - credits_required);        map.put(dest, new Long(new_credits_left));        if(success) {            return true;        }        else {            if(trace) {                StringBuffer sb=new StringBuffer();                sb.append("not enough credits left for ").append(dest).append(": left=").append(new_credits_left);                sb.append(", required+min_credits=").append((credits_required +min_credits)).append(", required=");                sb.append(credits_required).append(", min_credits=").append(min_credits);                log.trace(sb.toString());            }            return false;        }    }    void handleViewChange(Vector mbrs) {        Address addr;        if(mbrs == null) return;        if(trace) log.trace("new membership: " + mbrs);        members.clear();        members.addAll(mbrs);        synchronized(received) {            // add members not in membership to received hashmap (with full credits)            for(int i=0; i < mbrs.size(); i++) {                addr=(Address) mbrs.elementAt(i);                if(!received.containsKey(addr))                    received.put(addr, new Long(max_credits));            }            // remove members that left            for(Iterator it=received.keySet().iterator(); it.hasNext();) {                addr=(Address) it.next();                if(!mbrs.contains(addr))                    it.remove();            }        }        boolean unblock=false;        synchronized(sent) {            // add members not in membership to sent hashmap (with full credits)            for(int i=0; i < mbrs.size(); i++) {                addr=(Address) mbrs.elementAt(i);                if(!sent.containsKey(addr))                    sent.put(addr, new Long(max_credits));            }            // remove members that left            for(Iterator it=sent.keySet().iterator(); it.hasNext();) {                addr=(Address)it.next();                if(!mbrs.contains(addr))                    it.remove(); // modified the underlying map            }            // remove all creditors which are not in the new view            for(int i=0; i < creditors.size(); i++) {                Address creditor=(Address)creditors.elementAt(i);                if(!mbrs.contains(creditor))                    creditors.remove(creditor);            }            if(trace) log.trace("creditors are " + creditors);            if(creditors.size() == 0)                unblock=true;        }        if(unblock)            unblockSender();    }    private static String printMap(Map m) {        Map.Entry entry;        StringBuffer sb=new StringBuffer();        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");        }        return sb.toString();    }    public static class FcHeader extends Header implements Streamable {        public static final byte REPLENISH      = 1;        public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester        byte  type = REPLENISH;        public FcHeader() {        }        public FcHeader(byte type) {            this.type=type;        }        public long size() {            return Global.BYTE_SIZE;        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeByte(type);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            type=in.readByte();        }        public void writeTo(DataOutputStream out) throws IOException {            out.writeByte(type);        }        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {            type=in.readByte();        }        public String toString() {            switch(type) {            case REPLENISH: return "REPLENISH";            case CREDIT_REQUEST: return "CREDIT_REQUEST";            default: return "<invalid type>";            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -