⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingoutputpipe.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
     *  {@inheritDoc}     **/    protected void finalize() {        close();    }        /**     *  {@inheritDoc}     **/    public synchronized void close() {                // Close the queue so that no more messages are accepted        if( !closed ) {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("Closing queue for " + getPipeID() );            }                        queue.close();        }                closed = true;    }        /**     *  {@inheritDoc}     **/    public boolean isClosed() {        return closed;    }        /**     *  {@inheritDoc}     **/    public final String getType() {        return pAdv.getType();    }            /**     *  {@inheritDoc}     **/    public final ID getPipeID() {        return pAdv.getPipeID();    }            /**     *  {@inheritDoc}     **/    public final String getName() {        return pAdv.getName();    }        /**     *  {@inheritDoc}     **/    public final PipeAdvertisement getAdvertisement() {        return pAdv;    }        /**     *  {@inheritDoc}     **/    public boolean send(Message msg) throws IOException {        if ( LOG.isEnabledFor(Level.DEBUG) ) {            LOG.debug( "Queuing " + msg + " for pipe " + getPipeID() );        }                boolean pushed = false;                while( !queue.isClosed() ) {            try {                pushed = queue.push( msg, 250 * TimeUtils.AMILLISECOND );                break;            } catch ( InterruptedException woken ) {                Thread.interrupted();            }        }                if( !pushed && queue.isClosed() ) {            IOException failed = new IOException( "Could not enqueue " + msg + " for sending. Pipe is closed." );                        if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error( failed, failed );            }                        throw failed;        }                startServiceThread();                return pushed;    }        /**     *  {@inheritDoc}     *     *  <p/>Sends the messages.     *     *  <p>This method does a lot of things. It has several distinct states:     *     *  <p/><table border="1">     *      <thead>     *      <tr>     *          <th>STATE</th>     *          <th>Activity</th>     *      <tr>     *      </thead>     *     *      <tbody>     *      <tr>     *          <th>ACQUIREMESSENGER</th     *          <td>Acquire a messenger to the specified destination peer. If a     *          messenger is acquired, then go to <b>SENDMESSAGES</b> state     *          otherwise go to <b>STARTMIGRATE</b>.</td>     *      </tr>     *     *      <tr>     *          <th>SENDMESSAGES</th>     *          <td>Send messages until queue is closed and all messages have     *          been sent. Go to state <b>CLOSED</b> when done. If the messenger     *          becomes closed then go to <b>ACQUIREMESSENGER</b>. <emphasis>If     *          there are no messages to send for <code>IDLEWORKERLINGER</code>     *          millisecondsthen the worker thread will exit. It will only be     *          restarted if another message is eventually enqueued.</emphasis>     *          </td>     *      </tr>     *     *      <tr>     *          <th>STARTVERIFY</th>     *          <td>Starts a verification query(s) to the destination peer. This     *          state is activated after     *          <code>PipeServiceImpl.VERIFYINTERVAL</code> milliseconds of     *          sending messages. The query responses will be tracked in the     *          <b>PENDINGVERIFY</b> state.</td>     *      </tr>     *     *      <tr>     *          <th>STARTMIGRATE</th>     *          <td>Starts a query(s) for peers listening on this pipe. The     *          query responses will be tracked in the <b>PENDINGMIGRATE</b>     *          state.</td>     *      </tr>     *     *      <tr>     *          <th>PENDINGVERIFY</th>     *          <td>Issues query messages to verify that the destination peer is     *          still listening on the pipe. Queries are issued every     *          <code>QUERYINTERVAL</code> milliseconds. If a positive response     *          is received, go to state <b>ACQUIREMESSENGER</b>. If no response     *          is received within <b>QUERYTIMEOUT</b> milliseconds or a     *          negative response is received then go to state     *          <b>STARTMIGRATE</b>.</td>     *      </tr>     *     *      <tr>     *          <th>PENDINGMIGRATE</th>     *          <td>Issues query messages to find a new destination peer.     *          Queries are issued every <code>QUERYINTERVAL</code> milliseconds.     *          If a positive response is received, go to state     *          <b>ACQUIREMESSENGER</b>. If no positive response from an     *          eligible peer is received within <b>QUERYTIMEOUT</b>     *          milliseconds go to state <b>CLOSED</b>.</td>     *      </tr>     *     *      <tr>     *          <th>CLOSED</th>     *          <td>Exit the worker thread.</td>     *      </tr>     *      </tbody>     *  </table>     **/    public void run() {        long absoluteTimeoutAt = -1;        long nextQueryAt = -1;                try {            // state loop            while( workerState.CLOSED != workerstate ) {                synchronized( this ) {                    LOG.debug( "NON-BLOCKING WORKER AT STATE : " + workerstate + ((workerState.SENDMESSAGES == workerstate) ?                    "\n\t" + TimeUtils.toRelativeTimeMillis( nextVerifyAt, TimeUtils.timeNow() ) + " until verify." : "" ) );                                        // switch() emulation                                        if ( (workerState.STARTVERIFY == workerstate) || (workerState.STARTMIGRATE == workerstate) ) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            if( null == destPeer ) {                                LOG.debug( "Starting re-resolve for '" + getPipeID() );                            } else {                                LOG.debug( "Starting verify for '" + getPipeID() + "' to : " + destPeer );                            }                        }                                                queryID = myPipeResolver.getNextQueryID();                        myPipeResolver.addListener( (PipeID) getPipeID(), this, queryID );                        absoluteTimeoutAt = TimeUtils.toAbsoluteTimeMillis( Math.max( QUERYTIMEOUTMIN, (PipeServiceImpl.VERIFYINTERVAL / 20) ) );                        nextQueryAt = TimeUtils.timeNow();                                                if( workerState.STARTVERIFY == workerstate ){                            workerstate = workerState.PENDINGVERIFY;                        } else if ( workerState.STARTMIGRATE == workerstate ) {                            workerstate = workerState.PENDINGMIGRATE;                        }                                                // move on to the next state.                    } else if ( (workerState.PENDINGVERIFY == workerstate) || (workerState.PENDINGMIGRATE == workerstate) ) {                                                if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug( "Pipe " + ((workerState.PENDINGVERIFY == workerstate) ? "verify" : "migrate") +                            "in progress. Continues for " + TimeUtils.toRelativeTimeMillis( absoluteTimeoutAt, TimeUtils.timeNow() ) + "ms. Next query in " +                            TimeUtils.toRelativeTimeMillis( nextQueryAt, TimeUtils.timeNow() ) + "ms." );                        }                                                // check to see if we are completely done.                        if ( TimeUtils.toRelativeTimeMillis( absoluteTimeoutAt, TimeUtils.timeNow() ) <= 0 ) {                                                        myPipeResolver.removeListener( (PipeID) getPipeID(), queryID );                                                        if( workerState.PENDINGVERIFY == workerstate ) {                                if (LOG.isEnabledFor(Level.INFO)) {                                    LOG.info( "Pipe '" + getPipeID() + "' has migrated from " + destPeer );                                }                                workerstate = workerState.STARTMIGRATE;                                                                // move on to the next state.                                continue;                            } else {                                if (LOG.isEnabledFor(Level.WARN)) {                                    LOG.warn( "Pipe '" + getPipeID() + "' cannot be migrated and is being closed" );                                }                                                                workerstate = workerState.CLOSED;                                close();                                                                // move on to the next state.                                continue;                            }                        }                                                // check if its time ot send another copy of the query.                        if( TimeUtils.toRelativeTimeMillis( nextQueryAt, TimeUtils.timeNow() ) <= 0 ) {                            if( null != destPeer ) {                                if (LOG.isEnabledFor(Level.DEBUG)) {                                    LOG.debug( "Sending out verify query (" + queryID + ") for '" + getPipeID() + "' to : " + destPeer );                                }                                myPipeResolver.sendPipeQuery( pAdv, Collections.singleton( destPeer ), queryID );                            } else {                                if (LOG.isEnabledFor(Level.DEBUG)) {                                    LOG.debug( "Sending out resolve query (" + queryID + ") for " + getPipeID() );                                }                                myPipeResolver.sendPipeQuery( pAdv, resolvablePeers, queryID );                            }                                                        nextQueryAt = TimeUtils.toAbsoluteTimeMillis( Math.max( QUERYINTERVALMIN, (PipeServiceImpl.VERIFYINTERVAL / 50) ) );                        }                                                long sleep = TimeUtils.toRelativeTimeMillis( Math.min( nextQueryAt, absoluteTimeoutAt ), TimeUtils.timeNow() );                                                if ( sleep >= 0 ) {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug( "Waiting " + sleep + "ms for response for (" + queryID + ") for " + getPipeID() );                            }                                                        try {                                wait( sleep );                            } catch( InterruptedException woken ) {                                Thread.interrupted();                            }                        }                                                // move on to the next state.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -