📄 nakack.java
字号:
sb.append("\nMissing messages received\n"); MissingMessage missing; for(Enumeration en=receive_history.elements(); en.hasMoreElements();) { missing=(MissingMessage)en.nextElement(); sb.append(missing).append("\n"); } return sb.toString(); } public Vector providedUpServices() { Vector retval=new Vector(5); retval.addElement(new Integer(Event.GET_DIGEST)); retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); retval.addElement(new Integer(Event.GET_DIGEST_STATE)); retval.addElement(new Integer(Event.SET_DIGEST)); retval.addElement(new Integer(Event.MERGE_DIGEST)); return retval; } public Vector providedDownServices() { Vector retval=new Vector(2); retval.addElement(new Integer(Event.GET_DIGEST)); retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); return retval; } public void start() throws Exception { timer=stack != null ? stack.timer : null; if(timer == null) throw new Exception("timer is null"); started=true; } public void stop() { started=false; reset(); // clears sent_msgs and destroys all NakReceiverWindows } /** * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this * method as the event is passed down by default by the superclass after this method returns !</b> */ public void down(Event evt) { Digest digest; Vector mbrs; switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); Address dest=msg.getDest(); if(dest != null && !dest.isMulticastAddress()) { break; // unicast address: not null and not mcast, pass down unchanged } send(evt, msg); return; // don't pass down the stack case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg stable((Digest)evt.getArg()); return; // do not pass down further (Bela Aug 7 2001) case Event.GET_DIGEST: digest=getDigest(); passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null)); return; case Event.GET_DIGEST_STABLE: digest=getDigestHighestDeliveredMsgs(); passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null)); return; case Event.GET_DIGEST_STATE: digest=getDigest(); passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null)); return; case Event.SET_DIGEST: setDigest((Digest)evt.getArg()); return; case Event.MERGE_DIGEST: mergeDigest((Digest)evt.getArg()); return; case Event.CONFIG: passDown(evt); if(log.isDebugEnabled()) { log.debug("received CONFIG event: " + evt.getArg()); } handleConfigEvent((HashMap)evt.getArg()); return; case Event.TMP_VIEW: View tmp_view=(View)evt.getArg(); mbrs=tmp_view.getMembers(); members.clear(); members.addAll(mbrs); adjustReceivers(false); break; case Event.VIEW_CHANGE: tmp_view=(View)evt.getArg(); mbrs=tmp_view.getMembers(); members.clear(); members.addAll(mbrs); adjustReceivers(true); is_server=true; // check vids from now on Set tmp=new LinkedHashSet(members); tmp.add(null); // for null destination (= mcast) sent.keySet().retainAll(tmp); received.keySet().retainAll(tmp); view=tmp_view; break; case Event.BECOME_SERVER: is_server=true; break; case Event.DISCONNECT: leaving=true; reset(); break; } passDown(evt); } /** * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this * method as the event is passed up by default by the superclass after this method returns !</b> */ public void up(Event evt) { NakAckHeader hdr; Message msg; Digest digest; switch(evt.getType()) { case Event.MSG: msg=(Message)evt.getArg(); hdr=(NakAckHeader)msg.getHeader(name); if(hdr == null) break; // pass up (e.g. unicast msg) // discard messages while not yet server (i.e., until JOIN has returned) if(!is_server) { if(trace) log.trace("message was discarded (not yet server)"); return; } // Changed by bela Jan 29 2003: we must not remove the header, otherwise // further xmit requests will fail ! //hdr=(NakAckHeader)msg.removeHeader(getName()); switch(hdr.type) { case NakAckHeader.MSG: handleMessage(msg, hdr); return; // transmitter passes message up for us ! case NakAckHeader.XMIT_REQ: if(hdr.range == null) { if(log.isErrorEnabled()) { log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc()); } return; } handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high, hdr.sender); return; case NakAckHeader.XMIT_RSP: if(trace) log.trace("received missing messages " + hdr.range); handleXmitRsp(msg); return; default: if(log.isErrorEnabled()) { log.error("NakAck header type " + hdr.type + " not known !"); } return; } case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg stable((Digest)evt.getArg()); return; // do not pass up further (Bela Aug 7 2001) case Event.GET_DIGEST: digest=getDigestHighestDeliveredMsgs(); passDown(new Event(Event.GET_DIGEST_OK, digest)); return; case Event.GET_DIGEST_STABLE: digest=getDigestHighestDeliveredMsgs(); passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest)); return; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.CONFIG: passUp(evt); if(log.isDebugEnabled()) { log.debug("received CONFIG event: " + evt.getArg()); } handleConfigEvent((HashMap)evt.getArg()); return; } passUp(evt); } /* --------------------------------- Private Methods --------------------------------------- */ /** * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details. * Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to * sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006 */ private void send(Event evt, Message msg) { if(msg == null) throw new NullPointerException("msg is null; event is " + evt); if(!started) { if(warn) log.warn("[" + local_addr + "] discarded message as start() has not been called, message: " + msg); return; } long msg_id; synchronized(sent_msgs) { try { // incrementing seqno and adding the msg to sent_msgs needs to be atomic msg_id=seqno +1; msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id)); if(Global.copy) { sent_msgs.put(new Long(msg_id), msg.copy()); } else { sent_msgs.put(new Long(msg_id), msg); } seqno=msg_id; } catch(Throwable t) { if(t instanceof Error) throw (Error)t; if(t instanceof RuntimeException) throw (RuntimeException)t; else { throw new RuntimeException("failure adding msg " + msg + " to the retransmit table", t); } } } try { // moved passDown() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300 if(trace) log.trace("sending " + local_addr + "#" + msg_id); passDown(evt); // if this fails, since msg is in sent_msgs, it can be retransmitted } catch(Throwable t) { // eat the exception, don't pass it up the stack if(warn) { log.warn("failure passing message down", t); } } } /** * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many * messages as possible from the NRW and passes them up the stack. Discards messages from non-members. */ private void handleMessage(Message msg, NakAckHeader hdr) { NakReceiverWindow win; Message msg_to_deliver; Address sender=msg.getSrc(); if(sender == null) { if(log.isErrorEnabled()) log.error("sender of message is null"); return; } if(trace) { StringBuffer sb=new StringBuffer('['); sb.append(local_addr).append(": received ").append(sender).append('#').append(hdr.seqno); log.trace(sb.toString()); } // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added ! // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs ! // msg.putHeader(getName(), hdr); synchronized(received_msgs) { win=(NakReceiverWindow)received_msgs.get(sender); } if(win == null) { // discard message if there is no entry for sender if(leaving) return; if(warn) { StringBuffer sb=new StringBuffer('['); sb.append(local_addr).append("] discarded message from non-member ") .append(sender).append(", my view is " ).append(this.view); log.warn(sb); } return; } win.add(hdr.seqno, msg); // add in order, then remove and pass up as many msgs as possible // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198); // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181), // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in // delivery of P1, Q1, Q2, P2: FIFO (implemented by NAKACK) says messages need to be delivered only in the // order in which they were sent by the sender synchronized(win) { while((msg_to_deliver=win.remove()) != null) { // Changed by bela Jan 29 2003: not needed (see above) //msg_to_deliver.removeHeader(getName()); passUp(new Event(Event.MSG, msg_to_deliver)); } } } /** * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each * requested message was retransmitted separately.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -