📄 nonblockingoutputpipe.java
字号:
} else if( workerState.ACQUIREMESSENGER == workerstate ) { if ( (null == destMessenger) || destMessenger.isClosed() ) { destMessenger = null; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Getting messenger to '" + destPeer + "' for pipe " + (PipeID) getPipeID() ); } destAddress = mkAddress( destPeer, getPipeID() ); // FIXME 20031011 bondolo@jxta.org This should not be done under sync destMessenger = endpoint.getMessenger(destAddress); if (destMessenger == null) { // We could not get a messenger to the peer, forget it and // try again. if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "Could not get messenger to : " + destPeer + ". " ); } if( migrated ) { // we can't migrate again, we never finished. // the last migrate! workerstate = workerState.CLOSED; close(); } else { workerstate = workerState.STARTMIGRATE; } myPipeResolver.removeListener( (PipeID) getPipeID(), queryID ); queryID = -1; destPeer = null; destAddress = null; // move on to the next state. continue; } else { // migration completed. migrated = false; } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Using existing messenger to : " + destPeer ); } } workerstate = workerState.SENDMESSAGES; nextVerifyAt = TimeUtils.toAbsoluteTimeMillis( PipeServiceImpl.VERIFYINTERVAL ); // move on to the next state. continue; // can't just fall through because we would start sending messages immediately. } else if( workerState.SENDMESSAGES == workerstate ) { // is it time to do verification again? if ( TimeUtils.toRelativeTimeMillis( nextVerifyAt, TimeUtils.timeNow() ) <= 0 ) { workerstate = workerState.STARTVERIFY; myPipeResolver.removeListener( (PipeID) getPipeID(), queryID ); queryID = -1; } // move on to the next state. } else if( workerState.CLOSED == workerstate ) { queue.clear(); // they aren't going to be sent if( null != destMessenger ) { destMessenger.close(); destMessenger = null; } serviceThread = null; break; } else { LOG.warn( "Unrecognized state in worker thread : " + workerstate ); } } // now actually send messages. We don't do this under the global sync. if( workerState.SENDMESSAGES == workerstate ) { Message msg = null; try { msg = (Message) queue.pop( IDLEWORKERLINGER ); } catch ( InterruptedException woken ) { Thread.interrupted(); continue; } if( null == msg ) { synchronized( this ) { // before deciding to die, we need to make sure that // nobody snuck something into the queue. If there // is, then we have to be the one to service the // queue. if( null == queue.peek() ) { if( closed ) { workerstate = workerState.CLOSED; continue; } else { serviceThread = null; break; } } else { continue; } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Sending " + msg + " on " + getPipeID() ); } if (!destMessenger.isClosed() ) { try { destMessenger.sendMessageB(msg, null, null); } catch( IOException failed ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "Failure sending " + msg + " on " + getPipeID(), failed ); } } } // May be now closed due to failing to send. if ( destMessenger.isClosed() ) { synchronized( this ) { workerstate = workerState.ACQUIREMESSENGER; destMessenger = null; } } } } } catch ( Throwable all ) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all ); } // give another thread the chance to start unless one already has. // If the exception was caused by damaged state on this object then // starting a new Thread may just cause the same exception again. // Unfortunate tradeoff. synchronized( this ) { if( serviceThread == Thread.currentThread() ) { serviceThread = null; } } } finally { if (LOG.isEnabledFor(Level.INFO)) { LOG.info( "Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + workerstate + "\tqueue closed : " + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue() + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued() ); } } } /** * Starts the worker thread if it is not already running. **/ private synchronized void startServiceThread() { // if there is no service thread, start one. if ( (null == serviceThread) && !closed ) { serviceThread = new Thread( myGroup.getHomeThreadGroup(), this, "Worker Thread for NonBlockingOutputPipe : " + getPipeID() ); serviceThread.setDaemon( true ); serviceThread.start(); if ( LOG.isEnabledFor(Level.INFO) ) { LOG.info( "Thread start : " + serviceThread.getName() + "\n\tworker state : " + workerstate + "\tqueue closed : " + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue() + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued() ); } } } /** * Convenience method for constructing a peer endpoint address from its * peer id * * @param destPeer the desitnation peer * @param pipeID the pipe to put in the param field. * @return the pipe endpoint address. */ protected EndpointAddress mkAddress( ID destPeer, ID pipeID ) { EndpointAddress addr = new EndpointAddress( "jxta", destPeer.getUniqueValue().toString(), "PipeService", pipeID.toString() ); return addr; } /** * {@inheritDoc} **/ public synchronized boolean pipeNAKEvent( PipeResolver.Event event) { if( ((workerstate == workerState.PENDINGVERIFY) || (workerstate == workerState.ACQUIREMESSENGER) || (workerstate == workerState.SENDMESSAGES)) && (event.getPeerID().equals(destPeer) && (event.getQueryID() == queryID)) ) { // we have been told that the destination peer no longer wants // to talk with us. We will try to migrate to another peer. if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "Pipe '" + getPipeID() + "' is closed at " + event.getPeerID() ); } workerstate = workerState.STARTMIGRATE; myPipeResolver.removeListener( (PipeID) getPipeID(), queryID ); queryID = -1; destPeer = null; destAddress = null; if( null != destMessenger ) { destMessenger.close(); destMessenger = null; } notify(); return true; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Ignoring NAK from " + event.getPeerID() ); } // didn't refer to us or we don't care. return false; } /** * {@inheritDoc} **/ public synchronized boolean pipeResolveEvent( PipeResolver.Event event) { if( ((workerstate == workerState.PENDINGVERIFY) || (workerstate == workerState.PENDINGMIGRATE)) && (event.getQueryID() == queryID) ) { if( (workerstate == workerState.PENDINGVERIFY) && !event.getPeerID().equals(destPeer) ) { // not from the right peer so ignore it. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Ignoring response from " + event.getPeerID() ); } return false; } else { if (LOG.isEnabledFor(Level.INFO)) { LOG.info( "Pipe '" + getPipeID() + "' is verified for " + destPeer ); } } workerstate = workerState.ACQUIREMESSENGER; migrated = true; destPeer = event.getPeerID(); if( (workerstate == workerState.PENDINGMIGRATE) && LOG.isEnabledFor(Level.INFO)) { LOG.info( "Pipe '" + getPipeID() + "' has migrated to " + destPeer ); } notify(); return true; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Ignoring resolve from " + event.getPeerID() ); } // didn't refer to us or we don't care. return false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -