📄 total.java
字号:
} } reqMsg=new Message(sequencerAddr, addr, new byte[0]); reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID)); passDown(new Event(Event.MSG, reqMsg)); } /** * Receive a unicast message: Remove the <code>UCAST</code> header * * @param msg the received unicast message */ private void _recvUcast(Message msg) { msg.removeHeader(getName()); } /** * Receive a broadcast message: Put it in the pending up queue and then * try to deliver above as many messages as possible * * @param msg the received broadcast message */ private void _recvBcast(Message msg) { Header header=(Header)msg.getHeader(getName()); // i. Put the message in the up pending queue only if it's not // already there, as it seems that the event may be received // multiple times before a view change when all members are // negotiating a common set of stable msgs // // ii. Deliver as many messages as possible synchronized(upTbl) { if(header.sequenceID <= seqID) return; upTbl.put(new Long(header.sequenceID), msg); } _deliverBcast(); } /** * Received a bcast request - Ignore if not the sequencer, else send a * bcast reply * * @param msg the broadcast request message */ private void _recvBcastRequest(Message msg) { Header header; Message repMsg; // i. If blocked, discard the bcast request // ii. Assign a seqID to the message and send it back to the requestor if(!addr.equals(sequencerAddr)) { if(log.isErrorEnabled()) log.error("Received bcast request " + "but not a sequencer"); return; } if(state == BLOCK) { if(log.isInfoEnabled()) log.info("Blocked, discard bcast req"); return; } header=(Header)msg.getHeader(getName()); ++sequencerSeqID; repMsg=new Message(msg.getSrc(), addr, new byte[0]); repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID, sequencerSeqID)); passDown(new Event(Event.MSG, repMsg)); } /** * Received a bcast reply - Match with the pending bcast request and move * the message in the list of messages to be delivered above * * @param header the header of the bcast reply */ private void _recvBcastReply(Header header) { Message msg; long id; // i. If blocked, discard the bcast reply // // ii. Assign the received seqID to the message and broadcast it // // iii. // - Acknowledge the message to the retransmitter // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps // - If localID == NULL_ID, it's a null BCAST, else normal BCAST // - Set the seq ID of the message to the one sent by the sequencer if(state == BLOCK) { if(log.isInfoEnabled()) log.info("Blocked, discard bcast rep"); return; } synchronized(reqTbl) { msg=(Message)reqTbl.remove(new Long(header.localSequenceID)); } if(msg != null) { retransmitter.ack(header.localSequenceID); id=header.localSequenceID; } else { if(log.isInfoEnabled()) log.info("Bcast reply to " + "non-existent BCAST_REQ[" + header.localSequenceID + "], Sending NULL bcast"); id=NULL_ID; msg=new Message(null, addr, new byte[0]); } msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID)); passDown(new Event(Event.MSG, msg)); } /** * Resend the bcast request with the given localSeqID * * @param seqID the local sequence id of the */ private void _retransmitBcastRequest(long seqID) { // *** Get a shared lock try { stateLock.readLock().acquire(); try { if(log.isInfoEnabled()) log.info("Retransmit BCAST_REQ[" + seqID + ']'); _transmitBcastRequest(seqID); } finally { stateLock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring a read lock", e); } } /* Up event handlers * If the return value is true the event travels further up the stack * else it won't be forwarded */ /** * Prepare for a VIEW_CHANGE: switch to flushing state * * @return true if the event is to be forwarded further up */ private boolean _upBlock() { // *** Get an exclusive lock try { stateLock.writeLock().acquire(); try { state=FLUSH; // *** Revoke the exclusive lock } finally { stateLock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring the write lock", e); } return (true); } /** * Handle an up MSG event * * @param event the MSG event * @return true if the event is to be forwarded further up */ private boolean _upMsg(Event event) { Message msg; Object obj; Header header; // *** Get a shared lock try { stateLock.readLock().acquire(); try { // If NULL_STATE, shouldn't receive any msg on the up queue! if(state == NULL_STATE) { if(log.isErrorEnabled()) log.error("Up msg in NULL_STATE"); return (false); } // Peek the header: // // (UCAST) A unicast message - Send up the stack // (BCAST) A broadcast message - Handle specially // (REQ) A broadcast request - Handle specially // (REP) A broadcast reply from the sequencer - Handle specially msg=(Message)event.getArg(); if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) { if(log.isErrorEnabled()) log.error("No TOTAL.Header found"); return (false); } header=(Header)obj; switch(header.type) { case Header.UCAST: _recvUcast(msg); return (true); case Header.BCAST: _recvBcast(msg); return (false); case Header.REQ: _recvBcastRequest(msg); return (false); case Header.REP: _recvBcastReply(header); return (false); default: if(log.isErrorEnabled()) log.error("Unknown header type"); return (false); } // ** Revoke the shared lock } finally { stateLock.readLock().release(); } } catch(InterruptedException e) { if(log.isErrorEnabled()) log.error(e.getMessage()); } return (true); } /** * Set the address of this group member * * @param event the SET_LOCAL_ADDRESS event * @return true if event should be forwarded further up */ private boolean _upSetLocalAddress(Event event) { // *** Get an exclusive lock try { stateLock.writeLock().acquire(); try { addr=(Address)event.getArg(); } finally { stateLock.writeLock().release(); } } catch(InterruptedException e) { log.error(e.getMessage()); } return (true); } /** * Handle view changes * <p/> * param event the VIEW_CHANGE event * * @return true if the event should be forwarded to the layer above */ private boolean _upViewChange(Event event) { Object oldSequencerAddr; // *** Get an exclusive lock try { stateLock.writeLock().acquire(); try { state=RUN; // i. See if this member is the sequencer // ii. If this is the sequencer, reset the sequencer's sequence ID // iii. Reset the last received sequence ID // // iv. Replay undelivered bcasts: Put all the undelivered bcasts // sent by us back to the req queue and discard the rest oldSequencerAddr=sequencerAddr; sequencerAddr= (Address)((View)event.getArg()).getMembers().elementAt(0); if(addr.equals(sequencerAddr)) { sequencerSeqID=NULL_ID; if((oldSequencerAddr == null) || (!addr.equals(oldSequencerAddr))) if(log.isInfoEnabled()) log.info("I'm the new sequencer"); } seqID=NULL_ID; _replayBcast(); // *** Revoke the exclusive lock } finally { stateLock.writeLock().release(); } } catch(InterruptedException e) { log.error(e.getMessage()); } return (true); } /* * Down event handlers * If the return value is true the event travels further down the stack * else it won't be forwarded */ /** * Blocking confirmed - No messages should come from above until a * VIEW_CHANGE event is received. Switch to blocking state. * * @return true if event should travel further down */ private boolean downBlockOk() { // *** Get an exclusive lock try { stateLock.writeLock().acquire(); try { state=BLOCK; } finally { stateLock.writeLock().release(); } } catch(InterruptedException e) { log.error(e.getMessage()); } return (true); } /** * A MSG event travelling down the stack. Forward unicast messages, treat * specially the broadcast messages.<br> * <p/> * If in <code>BLOCK</code> state, i.e. it has replied to a * <code>BLOCk_OK</code> and hasn't yet received a * <code>VIEW_CHANGE</code> event, messages are discarded<br> * <p/> * If in <code>FLUSH</code> state, forward unicast but queue broadcasts * * @param event the MSG event * @return true if event should travel further down */ private boolean _downMsg(Event event) { Message msg; // *** Get a shared lock try { stateLock.readLock().acquire(); try { // i. Discard all msgs, if in NULL_STATE // ii. Discard all msgs, if blocked if(state == NULL_STATE) { if(log.isErrorEnabled()) log.error("Discard msg in NULL_STATE"); return (false); } if(state == BLOCK) { if(log.isErrorEnabled()) log.error("Blocked, discard msg"); return (false); } msg=(Message)event.getArg(); if(msg.getDest() == null) { _sendBcastRequest(msg); return (false); } else { msg=_sendUcast(msg); event.setArg(msg); } // ** Revoke the shared lock } finally { stateLock.readLock().release(); } } catch(InterruptedException e) { log.error(e.getMessage()); } return (true); } /** * Prepare this layer to receive messages from above */ public void start() throws Exception { TimeScheduler timer; timer=stack != null ? stack.timer : null; if(timer == null) throw new Exception("TOTAL.start(): timer is null"); reqTbl=new TreeMap(); upTbl=new TreeMap(); retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL); } /** * Handle the stop() method travelling down the stack. * <p/> * The local addr is set to null, since after a Start->Stop->Start * sequence this member's addr is not guaranteed to be the same */ public void stop() { try { stateLock.writeLock().acquire(); try { state=NULL_STATE; retransmitter.reset(); reqTbl.clear(); upTbl.clear(); addr=null; } finally { stateLock.writeLock().release(); } } catch(InterruptedException e) { log.error(e.getMessage()); } } /** * Process an event coming from the layer below * * @param event the event to process */ public void up(Event event) { switch(event.getType()) { case Event.BLOCK: if(!_upBlock()) return; break; case Event.MSG: if(!_upMsg(event)) return; break; case Event.SET_LOCAL_ADDRESS: if(!_upSetLocalAddress(event)) return; break; case Event.VIEW_CHANGE: if(!_upViewChange(event)) return; break; default: break; } passUp(event); } /** * Process an event coming from the layer above * * @param event the event to process */ public void down(Event event) { switch(event.getType()) { case Event.BLOCK_OK: if(!downBlockOk()) return; break; case Event.MSG: if(!_downMsg(event)) return; break; default: break; } passDown(event); } /** * Create the TOTAL layer */ public TOTAL() { }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -