⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingoutputpipe.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    } else if( workerState.ACQUIREMESSENGER == workerstate ) {                                                if ( (null == destMessenger) || destMessenger.isClosed() ) {                                                        destMessenger = null;                                                        if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug( "Getting messenger to '" + destPeer + "' for pipe " + (PipeID) getPipeID() );                            }                                                        destAddress = mkAddress( destPeer, getPipeID() );                                                        // FIXME 20031011 bondolo@jxta.org This should not be done under sync                            destMessenger = endpoint.getMessenger(destAddress);                                                        if (destMessenger == null) {                                // We could not get a messenger to the peer, forget it and                                // try again.                                                                if (LOG.isEnabledFor(Level.WARN)) {                                    LOG.warn( "Could not get messenger to : " + destPeer + ". " );                                }                                                                if( migrated ) {                                    // we can't migrate again, we never finished.                                    // the last migrate!                                    workerstate = workerState.CLOSED;                                    close();                                } else {                                    workerstate = workerState.STARTMIGRATE;                                }                                myPipeResolver.removeListener( (PipeID) getPipeID(), queryID );                                queryID = -1;                                destPeer = null;                                destAddress = null;                                                                // move on to the next state.                                continue;                            } else {                                // migration completed.                                migrated = false;                            }                        } else {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug( "Using existing messenger to : " + destPeer );                            }                        }                                                workerstate = workerState.SENDMESSAGES;                                                nextVerifyAt = TimeUtils.toAbsoluteTimeMillis( PipeServiceImpl.VERIFYINTERVAL );                                                // move on to the next state.                        continue; // can't just fall through because we would start sending messages immediately.                    } else if( workerState.SENDMESSAGES == workerstate ) {                        // is it time to do verification again?                        if ( TimeUtils.toRelativeTimeMillis( nextVerifyAt, TimeUtils.timeNow() ) <= 0 ) {                            workerstate = workerState.STARTVERIFY;                            myPipeResolver.removeListener( (PipeID) getPipeID(), queryID );                            queryID = -1;                        }                        // move on to the next state.                    } else if( workerState.CLOSED == workerstate ) {                        queue.clear(); // they aren't going to be sent                                                if( null != destMessenger ) {                            destMessenger.close();                            destMessenger = null;                        }                                                serviceThread = null;                                                break;                    } else {                        LOG.warn( "Unrecognized state in worker thread : " + workerstate );                    }                }                                // now actually send messages. We don't do this under the global sync.                if( workerState.SENDMESSAGES == workerstate ) {                    Message msg = null;                                        try {                        msg = (Message) queue.pop( IDLEWORKERLINGER );                    } catch ( InterruptedException woken ) {                        Thread.interrupted();                        continue;                    }                                        if( null == msg ) {                        synchronized( this ) {                            // before deciding to die, we need to make sure that                            // nobody snuck something into the queue. If there                            // is, then we have to be the one to service the                            // queue.                            if( null == queue.peek() ) {                                if( closed ) {                                    workerstate = workerState.CLOSED;                                    continue;                                } else {                                    serviceThread = null;                                    break;                                }                            } else {                                continue;                            }                        }                    }                                        if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug( "Sending " + msg + " on " + getPipeID() );                    }                                        if (!destMessenger.isClosed() ) {                        try {                            destMessenger.sendMessageB(msg, null, null);                        } catch( IOException failed ) {                            if (LOG.isEnabledFor(Level.WARN)) {                                LOG.warn( "Failure sending " + msg + " on " + getPipeID(), failed );                            }                        }                    }                                        // May be now closed due to failing to send.                    if ( destMessenger.isClosed() ) {                        synchronized( this ) {                            workerstate = workerState.ACQUIREMESSENGER;                            destMessenger = null;                        }                    }                }            }        } catch ( Throwable all ) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );            }                        // give another thread the chance to start unless one already has.            // If the exception was caused by damaged state on this object then            // starting a new Thread may just cause the same exception again.            // Unfortunate tradeoff.            synchronized( this ) {                if( serviceThread == Thread.currentThread() ) {                    serviceThread = null;                }            }        } finally {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info( "Thread exit : " + Thread.currentThread().getName() +                "\n\tworker state : " + workerstate +                "\tqueue closed : " + queue.isClosed() +                "\tnumber in queue : " + queue.getCurrentInQueue() +                "\tnumber queued : " + queue.getNumEnqueued() +                "\tnumber dequeued : " + queue.getNumDequeued() );            }        }    }        /**     *  Starts the worker thread if it is not already running.     **/    private synchronized void startServiceThread() {        // if there is no service thread, start one.        if ( (null == serviceThread) && !closed ) {            serviceThread = new Thread( myGroup.getHomeThreadGroup(), this, "Worker Thread for NonBlockingOutputPipe : " + getPipeID() );            serviceThread.setDaemon( true );            serviceThread.start();            if ( LOG.isEnabledFor(Level.INFO) ) {                LOG.info( "Thread start : " + serviceThread.getName() +                "\n\tworker state : " + workerstate +                "\tqueue closed : " + queue.isClosed() +                "\tnumber in queue : " + queue.getCurrentInQueue() +                "\tnumber queued : " + queue.getNumEnqueued() +                "\tnumber dequeued : " + queue.getNumDequeued() );            }        }    }        /**     *  Convenience method for constructing a peer endpoint address from its     *  peer id     *     * @param destPeer the desitnation peer     * @param pipeID the pipe to put in the param field.     * @return the pipe endpoint address.     */    protected EndpointAddress mkAddress( ID destPeer, ID pipeID ) {                EndpointAddress addr = new EndpointAddress( "jxta", destPeer.getUniqueValue().toString(), "PipeService", pipeID.toString() );                return addr;    }        /**     *  {@inheritDoc}     **/    public synchronized boolean pipeNAKEvent( PipeResolver.Event event) {                if( ((workerstate == workerState.PENDINGVERIFY) ||        (workerstate == workerState.ACQUIREMESSENGER) ||        (workerstate == workerState.SENDMESSAGES)) &&        (event.getPeerID().equals(destPeer) && (event.getQueryID() == queryID)) ) {            // we have been told that the destination peer no longer wants            // to talk with us. We will try to migrate to another peer.            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn( "Pipe '" + getPipeID() + "' is closed at " + event.getPeerID() );            }                        workerstate = workerState.STARTMIGRATE;            myPipeResolver.removeListener( (PipeID) getPipeID(), queryID );            queryID = -1;            destPeer = null;            destAddress = null;            if( null != destMessenger ) {                destMessenger.close();                destMessenger = null;            }            notify();            return true;        }                if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug( "Ignoring NAK from " + event.getPeerID() );        }                // didn't refer to us or we don't care.        return false;    }        /**     *  {@inheritDoc}     **/    public synchronized boolean pipeResolveEvent( PipeResolver.Event event) {                if( ((workerstate == workerState.PENDINGVERIFY) ||        (workerstate == workerState.PENDINGMIGRATE)) &&        (event.getQueryID() == queryID) ) {            if( (workerstate == workerState.PENDINGVERIFY) && !event.getPeerID().equals(destPeer) ) {                // not from the right peer so ignore it.                                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug( "Ignoring response from " + event.getPeerID() );                }                return false;            } else {                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info( "Pipe '" + getPipeID() + "' is verified for " + destPeer );                }            }                        workerstate = workerState.ACQUIREMESSENGER;            migrated = true;            destPeer = event.getPeerID();                        if( (workerstate == workerState.PENDINGMIGRATE) &&  LOG.isEnabledFor(Level.INFO)) {                LOG.info( "Pipe '" + getPipeID() + "' has migrated to " + destPeer );            }                        notify();            return true;        }                if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug( "Ignoring resolve from " + event.getPeerID() );        }                // didn't refer to us or we don't care.        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -