📄 nonblockingoutputpipe.java
字号:
* {@inheritDoc} **/ protected void finalize() { close(); } /** * {@inheritDoc} **/ public synchronized void close() { // Close the queue so that no more messages are accepted if( !closed ) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Closing queue for " + getPipeID() ); } queue.close(); } closed = true; } /** * {@inheritDoc} **/ public boolean isClosed() { return closed; } /** * {@inheritDoc} **/ public final String getType() { return pAdv.getType(); } /** * {@inheritDoc} **/ public final ID getPipeID() { return pAdv.getPipeID(); } /** * {@inheritDoc} **/ public final String getName() { return pAdv.getName(); } /** * {@inheritDoc} **/ public final PipeAdvertisement getAdvertisement() { return pAdv; } /** * {@inheritDoc} **/ public boolean send(Message msg) throws IOException { if ( LOG.isEnabledFor(Level.DEBUG) ) { LOG.debug( "Queuing " + msg + " for pipe " + getPipeID() ); } boolean pushed = false; while( !queue.isClosed() ) { try { pushed = queue.push( msg, 250 * TimeUtils.AMILLISECOND ); break; } catch ( InterruptedException woken ) { Thread.interrupted(); } } if( !pushed && queue.isClosed() ) { IOException failed = new IOException( "Could not enqueue " + msg + " for sending. Pipe is closed." ); if (LOG.isEnabledFor(Level.ERROR)) { LOG.error( failed, failed ); } throw failed; } startServiceThread(); return pushed; } /** * {@inheritDoc} * * <p/>Sends the messages. * * <p>This method does a lot of things. It has several distinct states: * * <p/><table border="1"> * <thead> * <tr> * <th>STATE</th> * <th>Activity</th> * <tr> * </thead> * * <tbody> * <tr> * <th>ACQUIREMESSENGER</th * <td>Acquire a messenger to the specified destination peer. If a * messenger is acquired, then go to <b>SENDMESSAGES</b> state * otherwise go to <b>STARTMIGRATE</b>.</td> * </tr> * * <tr> * <th>SENDMESSAGES</th> * <td>Send messages until queue is closed and all messages have * been sent. Go to state <b>CLOSED</b> when done. If the messenger * becomes closed then go to <b>ACQUIREMESSENGER</b>. <emphasis>If * there are no messages to send for <code>IDLEWORKERLINGER</code> * millisecondsthen the worker thread will exit. It will only be * restarted if another message is eventually enqueued.</emphasis> * </td> * </tr> * * <tr> * <th>STARTVERIFY</th> * <td>Starts a verification query(s) to the destination peer. This * state is activated after * <code>PipeServiceImpl.VERIFYINTERVAL</code> milliseconds of * sending messages. The query responses will be tracked in the * <b>PENDINGVERIFY</b> state.</td> * </tr> * * <tr> * <th>STARTMIGRATE</th> * <td>Starts a query(s) for peers listening on this pipe. The * query responses will be tracked in the <b>PENDINGMIGRATE</b> * state.</td> * </tr> * * <tr> * <th>PENDINGVERIFY</th> * <td>Issues query messages to verify that the destination peer is * still listening on the pipe. Queries are issued every * <code>QUERYINTERVAL</code> milliseconds. If a positive response * is received, go to state <b>ACQUIREMESSENGER</b>. If no response * is received within <b>QUERYTIMEOUT</b> milliseconds or a * negative response is received then go to state * <b>STARTMIGRATE</b>.</td> * </tr> * * <tr> * <th>PENDINGMIGRATE</th> * <td>Issues query messages to find a new destination peer. * Queries are issued every <code>QUERYINTERVAL</code> milliseconds. * If a positive response is received, go to state * <b>ACQUIREMESSENGER</b>. If no positive response from an * eligible peer is received within <b>QUERYTIMEOUT</b> * milliseconds go to state <b>CLOSED</b>.</td> * </tr> * * <tr> * <th>CLOSED</th> * <td>Exit the worker thread.</td> * </tr> * </tbody> * </table> **/ public void run() { long absoluteTimeoutAt = -1; long nextQueryAt = -1; try { // state loop while( workerState.CLOSED != workerstate ) { synchronized( this ) { LOG.debug( "NON-BLOCKING WORKER AT STATE : " + workerstate + ((workerState.SENDMESSAGES == workerstate) ? "\n\t" + TimeUtils.toRelativeTimeMillis( nextVerifyAt, TimeUtils.timeNow() ) + " until verify." : "" ) ); // switch() emulation if ( (workerState.STARTVERIFY == workerstate) || (workerState.STARTMIGRATE == workerstate) ) { if (LOG.isEnabledFor(Level.DEBUG)) { if( null == destPeer ) { LOG.debug( "Starting re-resolve for '" + getPipeID() ); } else { LOG.debug( "Starting verify for '" + getPipeID() + "' to : " + destPeer ); } } queryID = myPipeResolver.getNextQueryID(); myPipeResolver.addListener( (PipeID) getPipeID(), this, queryID ); absoluteTimeoutAt = TimeUtils.toAbsoluteTimeMillis( Math.max( QUERYTIMEOUTMIN, (PipeServiceImpl.VERIFYINTERVAL / 20) ) ); nextQueryAt = TimeUtils.timeNow(); if( workerState.STARTVERIFY == workerstate ){ workerstate = workerState.PENDINGVERIFY; } else if ( workerState.STARTMIGRATE == workerstate ) { workerstate = workerState.PENDINGMIGRATE; } // move on to the next state. } else if ( (workerState.PENDINGVERIFY == workerstate) || (workerState.PENDINGMIGRATE == workerstate) ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Pipe " + ((workerState.PENDINGVERIFY == workerstate) ? "verify" : "migrate") + "in progress. Continues for " + TimeUtils.toRelativeTimeMillis( absoluteTimeoutAt, TimeUtils.timeNow() ) + "ms. Next query in " + TimeUtils.toRelativeTimeMillis( nextQueryAt, TimeUtils.timeNow() ) + "ms." ); } // check to see if we are completely done. if ( TimeUtils.toRelativeTimeMillis( absoluteTimeoutAt, TimeUtils.timeNow() ) <= 0 ) { myPipeResolver.removeListener( (PipeID) getPipeID(), queryID ); if( workerState.PENDINGVERIFY == workerstate ) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info( "Pipe '" + getPipeID() + "' has migrated from " + destPeer ); } workerstate = workerState.STARTMIGRATE; // move on to the next state. continue; } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "Pipe '" + getPipeID() + "' cannot be migrated and is being closed" ); } workerstate = workerState.CLOSED; close(); // move on to the next state. continue; } } // check if its time ot send another copy of the query. if( TimeUtils.toRelativeTimeMillis( nextQueryAt, TimeUtils.timeNow() ) <= 0 ) { if( null != destPeer ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Sending out verify query (" + queryID + ") for '" + getPipeID() + "' to : " + destPeer ); } myPipeResolver.sendPipeQuery( pAdv, Collections.singleton( destPeer ), queryID ); } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Sending out resolve query (" + queryID + ") for " + getPipeID() ); } myPipeResolver.sendPipeQuery( pAdv, resolvablePeers, queryID ); } nextQueryAt = TimeUtils.toAbsoluteTimeMillis( Math.max( QUERYINTERVALMIN, (PipeServiceImpl.VERIFYINTERVAL / 50) ) ); } long sleep = TimeUtils.toRelativeTimeMillis( Math.min( nextQueryAt, absoluteTimeoutAt ), TimeUtils.timeNow() ); if ( sleep >= 0 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Waiting " + sleep + "ms for response for (" + queryID + ") for " + getPipeID() ); } try { wait( sleep ); } catch( InterruptedException woken ) { Thread.interrupted(); } } // move on to the next state.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -