
📄 pbcast.java

    /* --------------------------------- Private Methods --------------------------------------------- */

    /**
     Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding
     to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow.
     As many messages as possible are then removed from the table and passed up.
     */
    void handleUpMessage(Message m, PbcastHeader hdr) {
        Address sender=m.getSrc();
        NakReceiverWindow win=null;
        Message tmpmsg;
        long tmp_seqno=hdr.seqno;

        if(sender == null) {
            if(log.isErrorEnabled()) log.error("sender is null");
            return;
        }

        synchronized(digest) {
            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                if(warn) log.warn("NakReceiverWindow for sender " + sender +
                                  " not found. Creating new NakReceiverWindow starting at seqno=" + tmp_seqno);
                win=new NakReceiverWindow(sender, tmp_seqno);
                digest.put(sender, win);
            }

            // *************************************
            // The header was removed before, so we add it again for the NakReceiverWindow. When there is a
            // retransmission request, the header will already be attached to the message (both message and
            // header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow).
            // *************************************
            m.putHeader(getName(), hdr);
            win.add(tmp_seqno, m);

            if(log.isInfoEnabled()) log.info("receiver window for " + sender + " is " + win);

            // Try to remove as many messages as possible and send them up the stack
            while((tmpmsg=win.remove()) != null) {
                tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused
                passUp(new Event(Event.MSG, tmpmsg));
            }

            // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering
            // garbage collection)
            if(members.size() == 1) {
                tmp_seqno=Math.max(tmp_seqno - gc_lag, 0);
                if(tmp_seqno <= 0) {
                }
                else {
                    if(trace) log.trace("deleting messages < " + tmp_seqno + " from " + sender);
                    win.stable(tmp_seqno);
                }
            }
        }
    }

    /**
     * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the
     * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received,
     * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot because
     * 5 is missing. If there are no messages, the highest seen seqno is -1.
     */
    Digest getDigest() {
        Digest ret=new Digest(digest.size());
        long highest_seqno, lowest_seqno;
        Address key;
        NakReceiverWindow win;

        for(Enumeration e=digest.keys(); e.hasMoreElements();) {
            key=(Address) e.nextElement();
            win=(NakReceiverWindow) digest.get(key);
            lowest_seqno=win.getLowestSeen();
            highest_seqno=win.getHighestSeen();
            ret.add(key, lowest_seqno, highest_seqno);
        }

        if(log.isInfoEnabled()) log.info("digest is " + ret);

        return ret;
    }

    /**
     * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the
     * NakReceiverWindows reset.
     */
    void setDigest(Digest d) {
        NakReceiverWindow win;
        long tmp_seqno=1;

        synchronized(digest) {
            for(Enumeration e=digest.elements(); e.hasMoreElements();) {
                win=(NakReceiverWindow) e.nextElement();
                win.reset();
            }
            digest.clear();

            Map.Entry entry;
            Address sender;
            org.jgroups.protocols.pbcast.Digest.Entry val;
            for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                sender=(Address)entry.getKey();
                if(sender == null) {
                    if(log.isErrorEnabled()) log.error("cannot set item because sender is null");
                    continue;
                }
                val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
                tmp_seqno=val.high_seqno;
                digest.put(sender, new NakReceiverWindow(sender, tmp_seqno + 1)); // next to expect, digest had *last* seen !
            }
        }
    }

    String printDigest() {
        long highest_seqno;
        Address key;
        NakReceiverWindow win;
        StringBuffer sb=new StringBuffer();

        for(Enumeration e=digest.keys(); e.hasMoreElements();) {
            key=(Address) e.nextElement();
            win=(NakReceiverWindow) digest.get(key);
            highest_seqno=win.getHighestSeen();
            sb.append(key + ": " + highest_seqno + '\n');
        }
        return sb.toString();
    }

    String printIncomingMessageQueue() {
        StringBuffer sb=new StringBuffer();
        NakReceiverWindow win;

        win=(NakReceiverWindow) digest.get(local_addr);
        sb.append(win);
        return sb.toString();
    }

    void startGossipThread() {
        if(gossip_thread == null) {
            gossip_thread=new Thread(this);
            gossip_thread.setDaemon(true);
            gossip_thread.start();
        }
    }

    void stopGossipThread() {
        Thread tmp;

        if(gossip_thread != null) {
            if(gossip_thread.isAlive()) {
                tmp=gossip_thread;
                gossip_thread=null;
                tmp.interrupt();
                tmp=null;
            }
        }
        gossip_thread=null;
    }

    void startGossipHandler() {
        if(gossip_handler == null) {
            gossip_handler=new GossipHandler(gossip_queue);
            gossip_handler.start();
        }
    }

    void stopGossipHandler() {
        if(gossip_handler != null) {
            gossip_handler.stop();
            gossip_handler=null;
        }
    }

    /**
     * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset
     * of the current membership. Exclude self (I receive all mcasts sent by myself).
     */
    void sendGossip() {
        Vector current_mbrs=(Vector) members.clone();
        Vector subset_mbrs=null;
        Gossip gossip=null;
        Message msg;
        Address dest;
        PbcastHeader hdr;

        if(local_addr != null)
            current_mbrs.remove(local_addr); // don't pick myself

        if(mcast_gossip) {  // send gossip to all members using a multicast
            gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), null); // not_seen list is null, prevents forwarding
            for(int i=0; i < current_mbrs.size(); i++)  // all members have seen this gossip. Used for garbage collection
                gossip.addToSeenList((Address) current_mbrs.elementAt(i));
            hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
            msg=new Message(); // null dest == multicast to all members
            msg.putHeader(getName(), hdr);

            if(log.isInfoEnabled()) log.info("(from " + local_addr +
                                             ") multicasting gossip " + gossip.shortForm() + " to all members");

            passDown(new Event(Event.MSG, msg));
        }
        else {
            subset_mbrs=Util.pickSubset(current_mbrs, subset);

            for(int i=0; i < subset_mbrs.size(); i++) {
                gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), (Vector) current_mbrs.clone());
                gossip.addToSeenList(local_addr);
                hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
                dest=(Address) subset_mbrs.elementAt(i);
                msg=new Message(dest);
                msg.putHeader(getName(), hdr);

                if(log.isInfoEnabled()) log.info("(from " + local_addr +
                                                 ") sending gossip " + gossip.shortForm() + " to " + subset_mbrs);

                passDown(new Event(Event.MSG, msg));
            }
        }

        gossip_round++;
    }

    /**
     * MOST IMPORTANT METHOD IN THIS CLASS !!
     * This guy really decides how a gossip reaches all members,
     * or whether it will flood the network !<p>
     * Scrutinize the gossip received and request retransmission of messages that we haven't received yet.
     * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check
     * this range against our own digest and request retransmission of missing messages if needed.<br>
     * <em>See DESIGN for a description of this method</em>
     */
    void handleGossip(Gossip gossip) {
        long my_low=0, my_high=0, their_low, their_high;
        Hashtable ht=null;
        Digest their_digest;
        NakReceiverWindow win;
        Message msg;
        Address dest;
        Vector new_dests;
        PbcastHeader hdr;
        List missing_msgs; // list of missing messages (for retransmission) (List of Longs)

        // Check for null first: the trace statement below dereferences the gossip
        if(gossip == null || gossip.digest == null) {
            if(warn) log.warn("gossip is null or digest is null");
            return;
        }

        if(trace)
            log.trace("(from " + local_addr + ") received gossip " + gossip.shortForm() + " from " + gossip.sender);

        /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */
        if(gossip.sender == null) {
            if(log.isErrorEnabled()) log.error("sender of gossip is null; " +
                                               "don't know where to send XMIT_REQ to. Discarding gossip");
            return;
        }

        /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly
           joined member, discard it as well (we can't tell the difference). When the new member will be
           added to the membership, then its gossips will be processed */
        if(!members.contains(gossip.sender)) {
            if(warn) log.warn("sender " + gossip.sender +
                              " is not a member. Gossip will not be processed");
            if(shun)
                shunInvalidGossiper(gossip.sender);
            return;
        }

        /* 3. If this gossip was received before, just discard it and return (don't process the
           same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */
        while(gossip_list.size() >= max_gossip_cache) // first delete oldest gossips
            gossip_list.removeFromHead();

        if(gossip_list.contains(gossip))         // already received, don't re-broadcast
            return;
        else
            gossip_list.add(gossip.copy());      // add to list of received gossips

        /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer.
           This ensures that we don't suspect them */
        seen_list=gossip.getSeenList();
        if(seen_list.size() > 0)
            passDown(new Event(Event.HEARD_FROM, seen_list.clone()));

        /* 5. Compare their digest against ours. Find out if some messages in their digest are
           not in our digest. If yes, put them in the 'ht' hashtable for retransmission */
        their_digest=gossip.digest;

        Map.Entry entry;
        Address sender;
        org.jgroups.protocols.pbcast.Digest.Entry val;
        for(Iterator it=their_digest.senders.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            sender=(Address)entry.getKey();
            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
            their_low=val.low_seqno;
            their_high=val.high_seqno;
            if(their_low == 0 && their_high == 0)
                continue; // won't have any messages for this sender, don't even re-send

            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                // this specific sender in this digest is probably not a member anymore, new digests
                // won't contain it. for now, just ignore it. if it is a new member, it will be in the next
                // gossips
                if(warn) log.warn("sender " + sender + " not found, skipping...");
                continue;
            }

            my_low=win.getLowestSeen();
            my_high=win.getHighestSeen();
            if(my_high < their_high) {
                // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !)
                if(my_low + 1 < their_low) {
                }
                else {
                    missing_msgs=win.getMissingMessages(my_high, their_high);
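
The getDigest() comment in the listing above defines 'highest seen' as the highest seqno that can be reached without a gap. The standalone sketch below illustrates only that rule. The class `HighestSeenSketch` and its method `highestSeen` are hypothetical names introduced for illustration; this is not how NakReceiverWindow computes the value internally, and it assumes the received seqnos are simply handed over as an array.

```java
import java.util.TreeSet;

// Hypothetical illustration of the 'highest seen' rule; not part of JGroups.
public class HighestSeenSketch {

    /**
     * Returns the highest seqno reachable from the lowest received seqno
     * without crossing a gap, or -1 if no seqno was received.
     */
    public static long highestSeen(long[] received) {
        if (received.length == 0)
            return -1;

        // Sort and de-duplicate the received seqnos
        TreeSet<Long> sorted = new TreeSet<>();
        for (long seqno : received)
            sorted.add(seqno);

        // Walk upwards from the lowest seqno until the next one is missing
        long highest = sorted.first();
        while (sorted.contains(highest + 1))
            highest++;
        return highest;
    }

    public static void main(String[] args) {
        // Messages 2 3 4 6 7 were received: 5 is missing, so 6 and 7 do not count
        System.out.println(highestSeen(new long[]{2, 3, 4, 6, 7}));
        // Nothing received yet
        System.out.println(highestSeen(new long[]{}));
    }
}
```

For the example from the comment (2 3 4 6 7) the sketch yields 4, and for an empty input it yields -1, matching the behaviour the comment describes.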
