jchannel.java

来自「JGRoups源码」· Java 代码 · 共 1,619 行 · 第 1/5 页

JAVA
1,619
字号
    }

    /**
     * Switches the channel's traffic counters on or off.
     *
     * @param stats the new value of the stats flag
     */
    public void enableStats(boolean stats) {
        this.stats = stats;
    }

    /** Sets the four traffic counters (messages and bytes, sent and received) back to zero. */
    public void resetStats() {
        received_bytes = 0;
        sent_bytes = 0;
        received_msgs = 0;
        sent_msgs = 0;
    }

    /** @return the current value of the sent-message counter */
    public long getSentMessages() {
        return sent_msgs;
    }

    /** @return the current value of the sent-bytes counter */
    public long getSentBytes() {
        return sent_bytes;
    }

    /** @return the current value of the received-message counter */
    public long getReceivedMessages() {
        return received_msgs;
    }

    /** @return the current value of the received-bytes counter */
    public long getReceivedBytes() {
        return received_bytes;
    }

    /**
     * @return the size of the protocol stack's timer, or -1 when there is no protocol stack
     */
    public int getNumberOfTasksInTimer() {
        if(prot_stack == null) {
            return -1;
        }
        return prot_stack.timer.size();
    }

    /**
     * @return the protocol stack's dump of its timer queue, or a placeholder string when
     *         there is no protocol stack
     */
    public String dumpTimerQueue() {
        if(prot_stack == null) {
            return "<n/a";
        }
        return prot_stack.dumpTimerQueue();
    }

    /**
     * Returns a pretty-printed form of all the protocols. If include_properties is set,
     * the properties for each protocol will also be printed.
     *
     * @param include_properties passed through to the protocol stack
     * @return the protocol stack's description, or null when there is no protocol stack
     */
    public String printProtocolSpec(boolean include_properties) {
        if(prot_stack == null) {
            return null;
        }
        return prot_stack.printProtocolSpec(include_properties);
    }

    /**
     * Connects the channel to a group.
     * If the channel is already connected, a trace-level message is logged and the call returns.
     * If the channel is closed a ChannelClosed exception will be thrown.
     * This method starts the protocol stack by calling ProtocolStack.startStack,
     * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
     * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
     * and the channel is considered connected.
     *
     * @param cluster_name A <code>String</code> denoting the group name. If null, the channel is
     *                     treated as a unicast channel and no CONNECT event is sent.
     * @exception ChannelException The protocol stack cannot be started
     * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
     *                                   A new channel has to be created first.
*/
    public synchronized void connect(String cluster_name) throws ChannelException, ChannelClosedException {
        /* make sure the channel is not closed */
        checkClosed();

        /* if we already are connected, then ignore this call (only a trace message is logged) */
        if(connected) {
            if(log.isTraceEnabled()) log.trace("already connected to " + cluster_name);
            return;
        }

        /* a null name is accepted: it is logged and the previously stored cluster_name field is left untouched */
        if(cluster_name == null) {
            if(log.isInfoEnabled()) log.info("cluster_name is null, assuming unicast channel");
        }
        else
            this.cluster_name=cluster_name;

        try {
            prot_stack.startStack(); // calls start() in all protocols, from top to bottom
        }
        catch(Throwable e) {
            throw new ChannelException("failed to start protocol stack", e);
        }

        // The timeout is looked up via Util.getProperty with "30000" passed as the last argument.
        // NOTE(review): Long.parseLong throws an unchecked NumberFormatException on a malformed
        // value, and at this point the stack has already been started -- confirm this is acceptable.
        String tmp=Util.getProperty(new String[]{Global.CHANNEL_LOCAL_ADDR_TIMEOUT, "local_addr.timeout"},
                                    null, null, false, "30000");
        LOCAL_ADDR_TIMEOUT=Long.parseLong(tmp);

        /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
        local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
        if(local_addr == null) {
            log.fatal("local_addr is null; cannot connect");
            throw new ChannelException("local_addr is null");
        }

        /* create a temporary view, assume this channel is the only member and
         * is the coordinator */
        Vector t=new Vector(1);
        t.addElement(local_addr);
        my_view=new View(local_addr, 0, t);  // create a dummy view

        // only connect if we are not a unicast channel (tests the parameter, not the field)
        if(cluster_name != null) {
            connect_promise.reset();
            if(flush_supported)
               flush_unblock_promise.reset();
            Event connect_event=new Event(Event.CONNECT, cluster_name);
            down(connect_event);
            Object res=connect_promise.getResult();  // waits forever until connected (or channel is closed)
            if(res != null && res instanceof Exception) { // the JOIN was rejected by the coordinator
                throw new ChannelException("connect() failed", (Throwable)res);
            }

            // if FLUSH is used do not return from connect() until UNBLOCK event is received;
            // the wait is skipped when the view holds a single member or a result is already present
            boolean singletonMember = my_view != null && my_view.size() == 1;
            boolean shouldWaitForUnblock = flush_supported && receive_blocks && !singletonMember && !flush_unblock_promise.hasResult();
            if(shouldWaitForUnblock){
               try{
                  flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
               }
               catch (TimeoutException te){
                  // a timeout is only logged; connect() still completes normally
                  if(log.isWarnEnabled())
                     log.warn("waiting on UNBLOCK after connect timed out");
               }
            }
        }
        connected=true;
        notifyChannelConnected(this);
    }

    /**
     * Placeholder for a connect variant that takes a target address, a state id and a timeout.
     * It is not implemented: every call throws UnsupportedOperationException.
     */
    public synchronized boolean connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException {
        throw new UnsupportedOperationException("not yet implemented");
    }

    /**
     * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
     * Otherwise the following actions happen in the listed order<BR>
     * <ol>
     * <li> The JChannel sends a DISCONNECT event down the protocol stack (only if cluster_name is not null)<BR>
     * <li> Blocks until the channel receives a DISCONNECT_OK event<BR>
     * <li> Sends a STOP_QUEUEING event down the stack<BR>
     * <li> Stops the protocol stack by calling ProtocolStack.stopStack()<BR>
     * <li> Notifies the listener, if the listener is available<BR>
     * <li> Calls init() to reset the channel's state<BR>
     * </ol>
     */
    public synchronized void disconnect() {
        if(closed) return;

        if(connected) {

            if(cluster_name != null) {

                /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
                *  DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
                *  DISCONNECT_OK has been received. NOTE(review): getResult() is called without a
                *  timeout argument here, so no timeout appears to apply -- confirm.
                */
                Event disconnect_event=new Event(Event.DISCONNECT, local_addr);
                disconnect_promise.reset();
                down(disconnect_event);   // DISCONNECT is handled by each layer
                disconnect_promise.getResult(); // wait for DISCONNECT_OK
            }

            // Just in case we use the QUEUE protocol and it is still blocked...
            down(new Event(Event.STOP_QUEUEING));

            connected=false;
            try {
                prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
            }
            catch(Exception e) {
                // a failure to stop the stack is logged (message only) and the disconnect continues
                if(log.isErrorEnabled()) log.error("exception: " + e);
            }
            notifyChannelDisconnected(this);
            init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
        }
    }

    /**
     * Destroys the channel.
     * After this method has been called, the channel is unusable.<BR>
     * This operation will disconnect the channel and close the channel receive queue immediately<BR>
     */
    public synchronized void close() {
        _close(true, true); // by default disconnect before closing channel and close mq
    }

    /** Shuts down the channel without disconnecting */
    public synchronized void shutdown() {
        _close(false, true); // first argument is false here, unlike close(): presumably skips the disconnect; mq is still closed
    }

    /**
     * Opens the channel.
* This does the following actions:     * <ol>     * <li> Resets the receiver queue by calling Queue.reset     * <li> Sets up the protocol stack by calling ProtocolStack.setup     * <li> Sets the closed flag to false     * </ol>     */    public synchronized void open() throws ChannelException {        if(!closed)            throw new ChannelException("channel is already open");        try {            mq.reset();            // new stack is created on open() - bela June 12 2003            prot_stack=new ProtocolStack(this, props);            prot_stack.setup();            closed=false;        }        catch(Exception e) {            throw new ChannelException("failed to open channel" , e);        }    }    /**     * returns true if the Open operation has been called successfully     */    public boolean isOpen() {        return !closed;    }    /**     * returns true if the Connect operation has been called successfully     */    public boolean isConnected() {        return connected;    }    public int getNumMessages() {        return mq != null? mq.size() : -1;    }    public String dumpQueue() {        return Util.dumpQueue(mq);    }    /**     * Returns a map of statistics of the various protocols and of the channel itself.     * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is     * used for the channel itself") and the values are property maps.     
*/    public Map dumpStats() {        Map retval=prot_stack.dumpStats();        if(retval != null) {            Map tmp=dumpChannelStats();            if(tmp != null)                retval.put("channel", tmp);        }        return retval;    }    private Map dumpChannelStats() {        Map retval=new HashMap();        retval.put("sent_msgs", new Long(sent_msgs));        retval.put("sent_bytes", new Long(sent_bytes));        retval.put("received_msgs", new Long(received_msgs));        retval.put("received_bytes", new Long(received_bytes));        return retval;    }    /**     * Sends a message through the protocol stack.     * Implements the Transport interface.     *     * @param msg the message to be sent through the protocol stack,     *        the destination of the message is specified inside the message itself     * @exception ChannelNotConnectedException     * @exception ChannelClosedException     */    public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {        checkClosed();        checkNotConnected();        if(stats) {            sent_msgs++;            sent_bytes+=msg.getLength();        }        if(msg == null)            throw new NullPointerException("msg is null");        down(new Event(Event.MSG, msg));    }    /**     * creates a new message with the destination address, and the source address     * and the object as the message value     * @param dst - the destination address of the message, null for all members     * @param src - the source address of the message     * @param obj - the value of the message     * @exception ChannelNotConnectedException     * @exception ChannelClosedException     * @see JChannel#send     */    public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException {        send(new Message(dst, src, obj));    }    /**     * Blocking receive method.     
* This method returns the object that was first received by this JChannel and that has not been     * received before. After the object is received, it is removed from the receive queue.<BR>     * If you only want to inspect the object received without removing it from the queue call     * JChannel.peek<BR>     * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>     * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.     * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever     * @exception TimeoutException if a timeout occured prior to a new message was received     * @exception ChannelNotConnectedException     * @exception ChannelClosedException     * @see JChannel#peek     */    public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {        checkClosed();        checkNotConnected();        try {            Event evt=(timeout <= 0)? (Event)mq.remove() : (Event)mq.remove(timeout);            Object retval=getEvent(evt);            evt=null;            if(stats) {                if(retval != null && retval instanceof Message) {                    received_msgs++;                    received_bytes+=((Message)retval).getLength();                }            }            return retval;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?