⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pbcast.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    if(missing_msgs != null) {                        if(log.isInfoEnabled())                            log.info("asking " + gossip.sender + " for retransmission of " +                                    sender + ", missing messages: " + missing_msgs + "\nwin for " + sender + ":\n" + win + '\n');                        if(ht == null) ht=new Hashtable();                        ht.put(sender, missing_msgs);                    }                }            }        }        /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as           an XMIT_RSP unicast message (the messages are in its buffer, as a List) */        if(ht == null || ht.size() == 0) {        }        else {            hdr=new PbcastHeader(PbcastHeader.XMIT_REQ);            hdr.xmit_reqs=ht;                if(log.isInfoEnabled()) log.info("sending XMIT_REQ to " + gossip.sender);            msg=new Message(gossip.sender, null, null);            msg.putHeader(getName(), hdr);            passDown(new Event(Event.MSG, msg));        }        /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages           smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */        gossip.removeFromNotSeenList(local_addr);        if(gossip.sizeOfNotSeenList() == 0) {            garbageCollect(gossip.digest);            return;        }        /* 8. If we make it to this point, re-send to subset of remaining members in 'not_seen' list */        new_dests=Util.pickSubset(gossip.getNotSeenList(), subset);            if(log.isInfoEnabled()) log.info("(from " + local_addr +                                                ") forwarding gossip " + gossip.shortForm() + " to " + new_dests);        gossip.addToSeenList(local_addr);        for(int i=0; i < new_dests.size(); i++) {            dest=(Address) new_dests.elementAt(i);            msg=new Message(dest, null, null);            hdr=new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);            msg.putHeader(getName(), hdr);            passDown(new Event(Event.MSG, msg));        }    }    /**     * Find the messages indicated in <code>xmit_reqs</code> and re-send them to     * <code>requester</code>     */    void handleXmitRequest(Address requester, Hashtable xmit_reqs) {        NakReceiverWindow win;        Address sender;        List msgs, missing_msgs, xmit_msgs;        Message msg;        if(requester == null) {            if(log.isErrorEnabled()) log.error("requester is null");            return;        }        if(log.isInfoEnabled()) log.info("retransmission requests are " + printXmitReqs(xmit_reqs));        for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {            sender=(Address) e.nextElement();            win=(NakReceiverWindow) digest.get(sender);            if(win == null) {                if(warn) log.warn("sender " + sender +                                                         " not found in my digest; skipping retransmit request !");                continue;            }            missing_msgs=(List) xmit_reqs.get(sender);            msgs=win.getMessagesInList(missing_msgs);  // msgs to be sent back to requester            // re-send the messages to requester. don't add a header since they already have headers            // (when added to the NakReceiverWindow, the headers were not removed)            xmit_msgs=new List();            for(Enumeration en=msgs.elements(); en.hasMoreElements();) {                msg=((Message) en.nextElement()).copy();                xmit_msgs.add(msg);            }            // create a msg with the List of xmit_msgs as contents, add header            msg=new Message(requester, null, xmit_msgs);            msg.putHeader(getName(), new PbcastHeader(PbcastHeader.XMIT_RSP));            passDown(new Event(Event.MSG, msg));        }    }    void handleXmitRsp(List xmit_msgs) {        Message m;        PbcastHeader hdr;        for(Enumeration e=xmit_msgs.elements(); e.hasMoreElements();) {            m=(Message) e.nextElement();            hdr=(PbcastHeader) m.removeHeader(getName());            if(hdr == null) {                log.warn("header is null, ignoring message");            }            else {                if(log.isInfoEnabled()) log.info("received #" + hdr.seqno + ", type=" +                        PbcastHeader.type2String(hdr.type) + ", msg=" + m);                handleUpMessage(m, hdr);            }        }    }    String printXmitReqs(Hashtable xmit_reqs) {        StringBuffer sb=new StringBuffer();        Address key;        boolean first=true;        if(xmit_reqs == null)            return "<null>";        for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {            key=(Address) e.nextElement();            if(!first) {                sb.append(", ");            }            else                first=false;            sb.append(key + ": " + xmit_reqs.get(key));        }        return sb.toString();    }    void garbageCollect(Digest d) {        Address sender;        long tmp_seqno;        NakReceiverWindow win;        Map.Entry entry;        org.jgroups.protocols.pbcast.Digest.Entry val;        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sender=(Address)entry.getKey();            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();            win=(NakReceiverWindow)digest.get(sender);            if(win == null) {                if(log.isDebugEnabled()) log.debug("sender " + sender +                                                   " not found in our message digest, skipping");                continue;            }            tmp_seqno=val.high_seqno;            tmp_seqno=Math.max(tmp_seqno - gc_lag, 0);            if(tmp_seqno <= 0) {                continue;            }            if(trace) log.trace("(from " + local_addr +                                               ") GC: deleting messages < " + tmp_seqno + " from " + sender);            win.stable(tmp_seqno);        }    }    /**     * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received).     * This will cause that member to leave the group and possibly re-join.     */    void shunInvalidGossiper(Address invalid_gossiper) {        int num_pings=0;        Message shun_msg;        if(invalid_gossipers.containsKey(invalid_gossiper)) {            num_pings=((Integer) invalid_gossipers.get(invalid_gossiper)).intValue();            if(num_pings >= max_invalid_gossips) {                    if(log.isInfoEnabled()) log.info("sender " + invalid_gossiper +                                                               " is not member of " + members + " ! Telling it to leave group");                shun_msg=new Message(invalid_gossiper, null, null);                shun_msg.putHeader(getName(), new PbcastHeader(PbcastHeader.NOT_MEMBER));                passDown(new Event(Event.MSG, shun_msg));                invalid_gossipers.remove(invalid_gossiper);            }            else {                num_pings++;                invalid_gossipers.put(invalid_gossiper, new Integer(num_pings));            }        }        else {            num_pings++;            invalid_gossipers.put(invalid_gossiper, new Integer(num_pings));        }    }    /** Computes the gossip_interval. See DESIGN for details */    long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {        return getRandom((long) (num_mbrs * desired_avg_gossip * 2));    }    long getRandom(long range) {        return (long) ((Math.random() * range) % range);    }    /* ------------------------------- End of Private Methods ---------------------------------------- */    private static class GossipEntry {        PbcastHeader hdr=null;        Address sender=null;        byte[] data=null;        GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {            this.hdr=hdr;            this.sender=sender;            this.data=data;        }        public String toString() {            return "hdr=" + hdr + ", sender=" + sender + ", data=" + data;        }    }    /**     Handles gossip and retransmission requests. Removes requests from a (bounded) queue.     */    private class GossipHandler implements Runnable {        Thread t=null;        final Queue queue;        GossipHandler(Queue q) {            queue=q;        }        void start() {            if(t == null) {                t=new Thread(this, "PBCAST.GossipHandlerThread");                t.setDaemon(true);                t.start();            }        }        void stop() {            Thread tmp;            if(t != null && t.isAlive()) {                tmp=t;                t=null;                if(queue != null)                    queue.close(false); // don't flush elements                tmp.interrupt();            }            t=null;        }        public void run() {            GossipEntry entry;            PbcastHeader hdr;            List xmit_msgs;            byte[] data;            while(t != null && queue != null) {                try {                    entry=(GossipEntry) queue.remove();                    hdr=entry.hdr;                    if(hdr == null) {                        if(log.isErrorEnabled()) log.error("gossip entry has no PbcastHeader");                        continue;                    }                    switch(hdr.type) {                        case PbcastHeader.GOSSIP:                            handleGossip(hdr.gossip);                            break;                        case PbcastHeader.XMIT_REQ:                            if(hdr.xmit_reqs == null) {                                if(warn) log.warn("request is null !");                                break;                            }                            handleXmitRequest(entry.sender, hdr.xmit_reqs);                            break;                        case PbcastHeader.XMIT_RSP:                            data=entry.data;                            if(data == null) {                                if(warn) log.warn("buffer is null (no xmitted msgs)");                                break;                            }                            try {                                xmit_msgs=(List) Util.objectFromByteBuffer(data);                            }                            catch(Exception ex) {                                if(log.isErrorEnabled()) log.error("failed creating retransmitted messages from buffer", ex);                                break;                            }                            handleXmitRsp(xmit_msgs);                            break;                        case PbcastHeader.NOT_MEMBER:  // we are shunned                            if(shun) {                                if(log.isInfoEnabled()) log.info("I am being shunned. Will leave and re-join");                                passUp(new Event(Event.EXIT));                            }                            break;                        default:                            if(log.isErrorEnabled()) log.error("type (" + hdr.type +                                                                         ") of PbcastHeader not known !");                            return;                    }                }                catch(QueueClosedException closed) {                    break;                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -