⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 total.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            }        }        reqMsg=new Message(sequencerAddr, addr, new byte[0]);        reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID));        passDown(new Event(Event.MSG, reqMsg));    }    /**     * Receive a unicast message: Remove the <code>UCAST</code> header     *     * @param msg the received unicast message     */    private void _recvUcast(Message msg) {        msg.removeHeader(getName());    }    /**     * Receive a broadcast message: Put it in the pending up queue and then     * try to deliver above as many messages as possible     *     * @param msg the received broadcast message     */    private void _recvBcast(Message msg) {        Header header=(Header)msg.getHeader(getName());        // i. Put the message in the up pending queue only if it's not        // already there, as it seems that the event may be received        // multiple times before a view change when all members are        // negotiating a common set of stable msgs        //        // ii. Deliver as many messages as possible        synchronized(upTbl) {            if(header.sequenceID <= seqID)                return;            upTbl.put(new Long(header.sequenceID), msg);        }        _deliverBcast();    }    /**     * Received a bcast request - Ignore if not the sequencer, else send a     * bcast reply     *     * @param msg the broadcast request message     */    private void _recvBcastRequest(Message msg) {        Header header;        Message repMsg;        // i. If blocked, discard the bcast request        // ii. Assign a seqID to the message and send it back to the requestor        if(!addr.equals(sequencerAddr)) {            if(log.isErrorEnabled())                log.error("Received bcast request " +                          "but not a sequencer");            return;        }        if(state == BLOCK) {            if(log.isInfoEnabled()) log.info("Blocked, discard bcast req");            return;        }        header=(Header)msg.getHeader(getName());        ++sequencerSeqID;        repMsg=new Message(msg.getSrc(), addr, new byte[0]);        repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID,                                               sequencerSeqID));        passDown(new Event(Event.MSG, repMsg));    }    /**     * Received a bcast reply - Match with the pending bcast request and move     * the message in the list of messages to be delivered above     *     * @param header the header of the bcast reply     */    private void _recvBcastReply(Header header) {        Message msg;        long id;        // i. If blocked, discard the bcast reply        //        // ii. Assign the received seqID to the message and broadcast it        //        // iii.        // - Acknowledge the message to the retransmitter        // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps        // - If localID == NULL_ID, it's a null BCAST, else normal BCAST        // - Set the seq ID of the message to the one sent by the sequencer        if(state == BLOCK) {            if(log.isInfoEnabled()) log.info("Blocked, discard bcast rep");            return;        }        synchronized(reqTbl) {            msg=(Message)reqTbl.remove(new Long(header.localSequenceID));        }        if(msg != null) {            retransmitter.ack(header.localSequenceID);            id=header.localSequenceID;        }        else {            if(log.isInfoEnabled())                log.info("Bcast reply to " +                         "non-existent BCAST_REQ[" + header.localSequenceID +                         "], Sending NULL bcast");            id=NULL_ID;            msg=new Message(null, addr, new byte[0]);        }        msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID));        passDown(new Event(Event.MSG, msg));    }    /**     * Resend the bcast request with the given localSeqID     *     * @param seqID the local sequence id of the     */    private void _retransmitBcastRequest(long seqID) {        // *** Get a shared lock        try {            stateLock.readLock().acquire();            try {                if(log.isInfoEnabled()) log.info("Retransmit BCAST_REQ[" + seqID + ']');                _transmitBcastRequest(seqID);            }            finally {                stateLock.readLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring a read lock", e);        }    }    /* Up event handlers     * If the return value is true the event travels further up the stack     * else it won't be forwarded     */    /**     * Prepare for a VIEW_CHANGE: switch to flushing state     *     * @return true if the event is to be forwarded further up     */    private boolean _upBlock() {        // *** Get an exclusive lock        try {            stateLock.writeLock().acquire();            try {                state=FLUSH;                // *** Revoke the exclusive lock            }            finally {                stateLock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring the write lock", e);        }        return (true);    }    /**     * Handle an up MSG event     *     * @param event the MSG event     * @return true if the event is to be forwarded further up     */    private boolean _upMsg(Event event) {        Message msg;        Object obj;        Header header;        // *** Get a shared lock        try {            stateLock.readLock().acquire();            try {                // If NULL_STATE, shouldn't receive any msg on the up queue!                if(state == NULL_STATE) {                    if(log.isErrorEnabled()) log.error("Up msg in NULL_STATE");                    return (false);                }                // Peek the header:                //                // (UCAST) A unicast message - Send up the stack                // (BCAST) A broadcast message - Handle specially                // (REQ) A broadcast request - Handle specially                // (REP) A broadcast reply from the sequencer - Handle specially                msg=(Message)event.getArg();                if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) {                    if(log.isErrorEnabled()) log.error("No TOTAL.Header found");                    return (false);                }                header=(Header)obj;                switch(header.type) {                case Header.UCAST:                    _recvUcast(msg);                    return (true);                case Header.BCAST:                    _recvBcast(msg);                    return (false);                case Header.REQ:                    _recvBcastRequest(msg);                    return (false);                case Header.REP:                    _recvBcastReply(header);                    return (false);                default:                    if(log.isErrorEnabled()) log.error("Unknown header type");                    return (false);                }                // ** Revoke the shared lock            }            finally {                stateLock.readLock().release();            }        }        catch(InterruptedException e) {            if(log.isErrorEnabled()) log.error(e.getMessage());        }        return (true);    }    /**     * Set the address of this group member     *     * @param event the SET_LOCAL_ADDRESS event     * @return true if event should be forwarded further up     */    private boolean _upSetLocalAddress(Event event) {        // *** Get an exclusive lock        try {            stateLock.writeLock().acquire();            try {                addr=(Address)event.getArg();            }            finally {                stateLock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error(e.getMessage());        }        return (true);    }    /**     * Handle view changes     * <p/>     * param event the VIEW_CHANGE event     *     * @return true if the event should be forwarded to the layer above     */    private boolean _upViewChange(Event event) {        Object oldSequencerAddr;        // *** Get an exclusive lock        try {            stateLock.writeLock().acquire();            try {                state=RUN;                // i. See if this member is the sequencer                // ii. If this is the sequencer, reset the sequencer's sequence ID                // iii. Reset the last received sequence ID                //                // iv. Replay undelivered bcasts: Put all the undelivered bcasts                // sent by us back to the req queue and discard the rest                oldSequencerAddr=sequencerAddr;                sequencerAddr=                        (Address)((View)event.getArg()).getMembers().elementAt(0);                if(addr.equals(sequencerAddr)) {                    sequencerSeqID=NULL_ID;                    if((oldSequencerAddr == null) ||                            (!addr.equals(oldSequencerAddr)))                        if(log.isInfoEnabled()) log.info("I'm the new sequencer");                }                seqID=NULL_ID;                _replayBcast();                // *** Revoke the exclusive lock            }            finally {                stateLock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error(e.getMessage());        }        return (true);    }    /*     * Down event handlers     * If the return value is true the event travels further down the stack     * else it won't be forwarded     */    /**     * Blocking confirmed - No messages should come from above until a     * VIEW_CHANGE event is received. Switch to blocking state.     *     * @return true if event should travel further down     */    private boolean downBlockOk() {        // *** Get an exclusive lock        try {            stateLock.writeLock().acquire();            try {                state=BLOCK;            }            finally {                stateLock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error(e.getMessage());        }        return (true);    }    /**     * A MSG event travelling down the stack. Forward unicast messages, treat     * specially the broadcast messages.<br>     * <p/>     * If in <code>BLOCK</code> state, i.e. it has replied to a     * <code>BLOCk_OK</code> and hasn't yet received a     * <code>VIEW_CHANGE</code> event, messages are discarded<br>     * <p/>     * If in <code>FLUSH</code> state, forward unicast but queue broadcasts     *     * @param event the MSG event     * @return true if event should travel further down     */    private boolean _downMsg(Event event) {        Message msg;        // *** Get a shared lock        try {            stateLock.readLock().acquire();            try {                // i. Discard all msgs, if in NULL_STATE                // ii. Discard all msgs, if blocked                if(state == NULL_STATE) {                    if(log.isErrorEnabled()) log.error("Discard msg in NULL_STATE");                    return (false);                }                if(state == BLOCK) {                    if(log.isErrorEnabled()) log.error("Blocked, discard msg");                    return (false);                }                msg=(Message)event.getArg();                if(msg.getDest() == null) {                    _sendBcastRequest(msg);                    return (false);                }                else {                    msg=_sendUcast(msg);                    event.setArg(msg);                }                // ** Revoke the shared lock            }            finally {                stateLock.readLock().release();            }        }        catch(InterruptedException e) {            log.error(e.getMessage());        }        return (true);    }    /**     * Prepare this layer to receive messages from above     */    public void start() throws Exception {        TimeScheduler timer;        timer=stack != null ? stack.timer : null;        if(timer == null)            throw new Exception("TOTAL.start(): timer is null");        reqTbl=new TreeMap();        upTbl=new TreeMap();        retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL);    }    /**     * Handle the stop() method travelling down the stack.     * <p/>     * The local addr is set to null, since after a Start->Stop->Start     * sequence this member's addr is not guaranteed to be the same     */    public void stop() {        try {            stateLock.writeLock().acquire();            try {                state=NULL_STATE;                retransmitter.reset();                reqTbl.clear();                upTbl.clear();                addr=null;            }            finally {                stateLock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error(e.getMessage());        }    }    /**     * Process an event coming from the layer below     *     * @param event the event to process     */    public void up(Event event) {        switch(event.getType()) {        case Event.BLOCK:            if(!_upBlock()) return;            break;        case Event.MSG:            if(!_upMsg(event)) return;            break;        case Event.SET_LOCAL_ADDRESS:            if(!_upSetLocalAddress(event)) return;            break;        case Event.VIEW_CHANGE:            if(!_upViewChange(event)) return;            break;        default:            break;        }        passUp(event);    }    /**     * Process an event coming from the layer above     *     * @param event the event to process     */    public void down(Event event) {        switch(event.getType()) {        case Event.BLOCK_OK:            if(!downBlockOk()) return;            break;        case Event.MSG:            if(!_downMsg(event)) return;            break;        default:            break;        }        passDown(event);    }    /**     * Create the TOTAL layer     */    public TOTAL() {    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -