jchannel.java
来自「JGRoups源码」· Java 代码 · 共 1,619 行 · 第 1/5 页
JAVA
1,619 行
} public void enableStats(boolean stats) { this.stats=stats; } public void resetStats() { sent_msgs=received_msgs=sent_bytes=received_bytes=0; } public long getSentMessages() {return sent_msgs;} public long getSentBytes() {return sent_bytes;} public long getReceivedMessages() {return received_msgs;} public long getReceivedBytes() {return received_bytes;} public int getNumberOfTasksInTimer() {return prot_stack != null ? prot_stack.timer.size() : -1;} public String dumpTimerQueue() { return prot_stack != null? prot_stack.dumpTimerQueue() : "<n/a"; } /** * Returns a pretty-printed form of all the protocols. If include_properties is set, * the properties for each protocol will also be printed. */ public String printProtocolSpec(boolean include_properties) { return prot_stack != null ? prot_stack.printProtocolSpec(include_properties) : null; } /** * Connects the channel to a group. * If the channel is already connected, an error message will be printed to the error log. * If the channel is closed a ChannelClosed exception will be thrown. * This method starts the protocol stack by calling ProtocolStack.start, * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event. * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified * and the channel is considered connected. * * @param cluster_name A <code>String</code> denoting the group name. Cannot be null. * @exception ChannelException The protocol stack cannot be started * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer. * A new channel has to be created first. */ public synchronized void connect(String cluster_name) throws ChannelException, ChannelClosedException { /*make sure the channel is not closed*/ checkClosed(); /*if we already are connected, then ignore this*/ if(connected) { if(log.isTraceEnabled()) log.trace("already connected to " + cluster_name); return; } /*make sure we have a valid channel name*/ if(cluster_name == null) { if(log.isInfoEnabled()) log.info("cluster_name is null, assuming unicast channel"); } else this.cluster_name=cluster_name; try { prot_stack.startStack(); // calls start() in all protocols, from top to bottom } catch(Throwable e) { throw new ChannelException("failed to start protocol stack", e); } String tmp=Util.getProperty(new String[]{Global.CHANNEL_LOCAL_ADDR_TIMEOUT, "local_addr.timeout"}, null, null, false, "30000"); LOCAL_ADDR_TIMEOUT=Long.parseLong(tmp); /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */ local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT); if(local_addr == null) { log.fatal("local_addr is null; cannot connect"); throw new ChannelException("local_addr is null"); } /*create a temporary view, assume this channel is the only member and *is the coordinator*/ Vector t=new Vector(1); t.addElement(local_addr); my_view=new View(local_addr, 0, t); // create a dummy view // only connect if we are not a unicast channel if(cluster_name != null) { connect_promise.reset(); if(flush_supported) flush_unblock_promise.reset(); Event connect_event=new Event(Event.CONNECT, cluster_name); down(connect_event); Object res=connect_promise.getResult(); // waits forever until connected (or channel is closed) if(res != null && res instanceof Exception) { // the JOIN was rejected by the coordinator throw new ChannelException("connect() failed", (Throwable)res); } //if FLUSH is used do not return from connect() until UNBLOCK event is received boolean singletonMember = my_view != null && my_view.size() == 1; boolean shouldWaitForUnblock = flush_supported && receive_blocks && !singletonMember && !flush_unblock_promise.hasResult(); if(shouldWaitForUnblock){ try{ flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT); } catch (TimeoutException te){ if(log.isWarnEnabled()) log.warn("waiting on UNBLOCK after connect timed out"); } } } connected=true; notifyChannelConnected(this); } public synchronized boolean connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException { throw new UnsupportedOperationException("not yet implemented"); } /** * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR> * Otherwise the following actions happen in the listed order<BR> * <ol> * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR> * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR> * <li> Sends a STOP_QUEING event down the stack<BR> * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR> * <li> Notifies the listener, if the listener is available<BR> * </ol> */ public synchronized void disconnect() { if(closed) return; if(connected) { if(cluster_name != null) { /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a * DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a * DISCONNECT_OK has been received, or until timeout has elapsed. */ Event disconnect_event=new Event(Event.DISCONNECT, local_addr); disconnect_promise.reset(); down(disconnect_event); // DISCONNECT is handled by each layer disconnect_promise.getResult(); // wait for DISCONNECT_OK } // Just in case we use the QUEUE protocol and it is still blocked... down(new Event(Event.STOP_QUEUEING)); connected=false; try { prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom } catch(Exception e) { if(log.isErrorEnabled()) log.error("exception: " + e); } notifyChannelDisconnected(this); init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining } } /** * Destroys the channel. * After this method has been called, the channel us unusable.<BR> * This operation will disconnect the channel and close the channel receive queue immediately<BR> */ public synchronized void close() { _close(true, true); // by default disconnect before closing channel and close mq } /** Shuts down the channel without disconnecting */ public synchronized void shutdown() { _close(false, true); // by default disconnect before closing channel and close mq } /** * Opens the channel. * This does the following actions: * <ol> * <li> Resets the receiver queue by calling Queue.reset * <li> Sets up the protocol stack by calling ProtocolStack.setup * <li> Sets the closed flag to false * </ol> */ public synchronized void open() throws ChannelException { if(!closed) throw new ChannelException("channel is already open"); try { mq.reset(); // new stack is created on open() - bela June 12 2003 prot_stack=new ProtocolStack(this, props); prot_stack.setup(); closed=false; } catch(Exception e) { throw new ChannelException("failed to open channel" , e); } } /** * returns true if the Open operation has been called successfully */ public boolean isOpen() { return !closed; } /** * returns true if the Connect operation has been called successfully */ public boolean isConnected() { return connected; } public int getNumMessages() { return mq != null? mq.size() : -1; } public String dumpQueue() { return Util.dumpQueue(mq); } /** * Returns a map of statistics of the various protocols and of the channel itself. * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is * used for the channel itself") and the values are property maps. */ public Map dumpStats() { Map retval=prot_stack.dumpStats(); if(retval != null) { Map tmp=dumpChannelStats(); if(tmp != null) retval.put("channel", tmp); } return retval; } private Map dumpChannelStats() { Map retval=new HashMap(); retval.put("sent_msgs", new Long(sent_msgs)); retval.put("sent_bytes", new Long(sent_bytes)); retval.put("received_msgs", new Long(received_msgs)); retval.put("received_bytes", new Long(received_bytes)); return retval; } /** * Sends a message through the protocol stack. * Implements the Transport interface. * * @param msg the message to be sent through the protocol stack, * the destination of the message is specified inside the message itself * @exception ChannelNotConnectedException * @exception ChannelClosedException */ public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException { checkClosed(); checkNotConnected(); if(stats) { sent_msgs++; sent_bytes+=msg.getLength(); } if(msg == null) throw new NullPointerException("msg is null"); down(new Event(Event.MSG, msg)); } /** * creates a new message with the destination address, and the source address * and the object as the message value * @param dst - the destination address of the message, null for all members * @param src - the source address of the message * @param obj - the value of the message * @exception ChannelNotConnectedException * @exception ChannelClosedException * @see JChannel#send */ public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException { send(new Message(dst, src, obj)); } /** * Blocking receive method. * This method returns the object that was first received by this JChannel and that has not been * received before. After the object is received, it is removed from the receive queue.<BR> * If you only want to inspect the object received without removing it from the queue call * JChannel.peek<BR> * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR> * By specifying a timeout of 0, the operation blocks forever, or until a message has been received. * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever * @exception TimeoutException if a timeout occured prior to a new message was received * @exception ChannelNotConnectedException * @exception ChannelClosedException * @see JChannel#peek */ public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException { checkClosed(); checkNotConnected(); try { Event evt=(timeout <= 0)? (Event)mq.remove() : (Event)mq.remove(timeout); Object retval=getEvent(evt); evt=null; if(stats) { if(retval != null && retval instanceof Message) { received_msgs++; received_bytes+=((Message)retval).getLength(); } } return retval;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?