📄 nonblockingoutputpipe.java
字号:
* state.</td> * </tr> * <p/> * <tr> * <th>PENDINGVERIFY</th> * <td>Issues query messages to verify that the destination peer is * still listening on the pipe. Queries are issued every * <code>QUERYINTERVAL</code> milliseconds. If a positive response * is received, go to state <b>ACQUIREMESSENGER</b>. If no response * is received within <b>QUERYTIMEOUT</b> milliseconds or a * negative response is received then go to state * <b>STARTMIGRATE</b>.</td> * </tr> * <p/> * <tr> * <th>PENDINGMIGRATE</th> * <td>Issues query messages to find a new destination peer. * Queries are issued every <code>QUERYINTERVAL</code> milliseconds. * If a positive response is received, go to state * <b>ACQUIREMESSENGER</b>. If no positive response from an * eligible peer is received within <b>QUERYTIMEOUT</b> * milliseconds go to state <b>CLOSED</b>.</td> * </tr> * <p/> * <tr> * <th>CLOSED</th> * <td>Exit the worker thread.</td> * </tr> * </tbody> * </table> */ public void run() { long absoluteTimeoutAt = -1; long nextQueryAt = -1; try { // state loop while (WorkerState.CLOSED != workerstate) { synchronized (this) { LOG.fine("NON-BLOCKING WORKER AT STATE : " + workerstate + ((WorkerState.SENDMESSAGES == workerstate) ? "\n\t" + TimeUtils.toRelativeTimeMillis(nextVerifyAt, TimeUtils.timeNow()) + " until verify." : "")); // switch() emulation if ((WorkerState.STARTVERIFY == workerstate) || (WorkerState.STARTMIGRATE == workerstate)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (null == destPeer) { LOG.fine("Starting re-resolve for \'" + getPipeID()); } else { LOG.fine("Starting verify for \'" + getPipeID() + "\' to : " + destPeer); } } queryID = PipeResolver.getNextQueryID(); pipeResolver.addListener(getPipeID(), this, queryID); absoluteTimeoutAt = TimeUtils.toAbsoluteTimeMillis( Math.max(QUERYTIMEOUTMIN, (PipeServiceImpl.VERIFYINTERVAL / 20))); nextQueryAt = TimeUtils.timeNow(); if (WorkerState.STARTVERIFY == workerstate) { workerstate = WorkerState.PENDINGVERIFY; } else if (WorkerState.STARTMIGRATE == workerstate) { workerstate = WorkerState.PENDINGMIGRATE; } // move on to the next state. } else if ((WorkerState.PENDINGVERIFY == workerstate) || (WorkerState.PENDINGMIGRATE == workerstate)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Pipe " + ((WorkerState.PENDINGVERIFY == workerstate) ? "verify" : "migrate") + "in progress. Continues for " + TimeUtils.toRelativeTimeMillis(absoluteTimeoutAt, TimeUtils.timeNow()) + "ms. Next query in " + TimeUtils.toRelativeTimeMillis(nextQueryAt, TimeUtils.timeNow()) + "ms."); } // check to see if we are completely done. if (TimeUtils.toRelativeTimeMillis(absoluteTimeoutAt, TimeUtils.timeNow()) <= 0) { pipeResolver.removeListener(getPipeID(), queryID); if (WorkerState.PENDINGVERIFY == workerstate) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Pipe \'" + getPipeID() + "\' has migrated from " + destPeer); } workerstate = WorkerState.STARTMIGRATE; // move on to the next state. continue; } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Pipe \'" + getPipeID() + "\' cannot be migrated and is being closed"); } workerstate = WorkerState.CLOSED; close(); // move on to the next state. continue; } } // check if its time ot send another copy of the query. if (TimeUtils.toRelativeTimeMillis(nextQueryAt, TimeUtils.timeNow()) <= 0) { if (null != destPeer) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Sending out verify query (" + queryID + ") for \'" + getPipeID() + "\' to : " + destPeer); } pipeResolver.sendPipeQuery(pAdv, Collections.singleton(destPeer), queryID); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending out resolve query (" + queryID + ") for " + getPipeID()); } pipeResolver.sendPipeQuery(pAdv, resolvablePeers, queryID); } nextQueryAt = TimeUtils.toAbsoluteTimeMillis( Math.max(QUERYINTERVALMIN, (PipeServiceImpl.VERIFYINTERVAL / 50))); } long sleep = TimeUtils.toRelativeTimeMillis(Math.min(nextQueryAt, absoluteTimeoutAt), TimeUtils.timeNow()); if (sleep >= 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting " + sleep + "ms for response for (" + queryID + ") for " + getPipeID()); } try { wait(sleep); } catch (InterruptedException woken) { Thread.interrupted(); } } // move on to the next state. } else if (WorkerState.ACQUIREMESSENGER == workerstate) { if ((null == destMessenger) || destMessenger.isClosed()) { destMessenger = null; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Getting messenger to \'" + destPeer + "\' for pipe " + getPipeID()); } destAddress = mkAddress(destPeer, getPipeID()); // todo 20031011 bondolo@jxta.org This should not be done under sync destMessenger = endpoint.getMessenger(destAddress); if (destMessenger == null) { // We could not get a messenger to the peer, forget it and try again. if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Could not get messenger to : " + destPeer + ". "); } if (migrated) { // we can't migrate again, we never finished. // the last migrate! workerstate = WorkerState.CLOSED; close(); } else { workerstate = WorkerState.STARTMIGRATE; } pipeResolver.removeListener((PipeID) getPipeID(), queryID); queryID = -1; destPeer = null; destAddress = null; // move on to the next state. continue; } else { // migration completed. migrated = false; } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Using existing messenger to : " + destPeer); } } workerstate = WorkerState.SENDMESSAGES; nextVerifyAt = TimeUtils.toAbsoluteTimeMillis(PipeServiceImpl.VERIFYINTERVAL); // move on to the next state. continue; // can't just fall through because we would start sending messages immediately. } else if (WorkerState.SENDMESSAGES == workerstate) { // is it time to do verification again? if (TimeUtils.toRelativeTimeMillis(nextVerifyAt, TimeUtils.timeNow()) <= 0) { workerstate = WorkerState.STARTVERIFY; pipeResolver.removeListener(getPipeID(), queryID); queryID = -1; } // move on to the next state. } else if (WorkerState.CLOSED == workerstate) { queue.clear(); // they aren't going to be sent if (null != destMessenger) { destMessenger.close(); destMessenger = null; } serviceThread = null; break; } else { LOG.warning("Unrecognized state in worker thread : " + workerstate); } } // now actually send messages. We don't do this under the global sync. if (WorkerState.SENDMESSAGES == workerstate) { Message msg = null; try { msg = (Message) queue.pop(IDLEWORKERLINGER); } catch (InterruptedException woken) { Thread.interrupted(); continue; } if (null == msg) { synchronized (this) { // before deciding to die, we need to make sure that // nobody snuck something into the queue. If there // is, then we have to be the one to service the // queue. if (null == queue.peek()) { if (closed) { workerstate = WorkerState.CLOSED; continue; } else { serviceThread = null; break; } } else { continue; } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " on " + getPipeID()); } if (!destMessenger.isClosed()) { try { destMessenger.sendMessageB(msg, null, null); } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure sending " + msg + " on " + getPipeID(), failed); } } } // May be now closed due to failing to send. if (destMessenger.isClosed()) { synchronized (this) { workerstate = WorkerState.ACQUIREMESSENGER; destMessenger = null; } } } } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } // give another thread the chance to start unless one already has. // If the exception was caused by damaged state on this object then // starting a new Thread may just cause the same exception again. // Unfortunate tradeoff. synchronized (this) { if (serviceThread == Thread.currentThread()) { serviceThread = null; } } } finally { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info( "Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + workerstate + "\tqueue closed : " + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue() + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued()); } } } /** * Starts the worker thread if it is not already running. */ private synchronized void startServiceThread() { // if there is no service thread, start one. if ((null == serviceThread) && !closed) { serviceThread = new Thread(peerGroup.getHomeThreadGroup(), this , "Worker Thread for NonBlockingOutputPipe : " + getPipeID()); serviceThread.setDaemon(true); serviceThread.start(); if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info( "Thread start : " + serviceThread.getName() + "\n\tworker state : " + workerstate + "\tqueue closed : " + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue() + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued()); } } } /** * Convenience method for constructing a peer endpoint address from its * peer id * * @param destPeer the desitnation peer * @param pipeID the pipe to put in the param field. * @return the pipe endpoint address. */ protected EndpointAddress mkAddress(ID destPeer, ID pipeID) { return new EndpointAddress("jxta", destPeer.getUniqueValue().toString(), "PipeService", pipeID.toString()); } /** * {@inheritDoc} */ public synchronized boolean pipeNAKEvent(PipeResolver.Event event) { if (((workerstate == WorkerState.PENDINGVERIFY) || (workerstate == WorkerState.ACQUIREMESSENGER) || (workerstate == WorkerState.SENDMESSAGES)) && (event.getPeerID().equals(destPeer) && (event.getQueryID() == queryID))) { // we have been told that the destination peer no longer wants // to talk with us. We will try to migrate to another peer. if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Pipe \'" + getPipeID() + "\' is closed at " + event.getPeerID()); } workerstate = WorkerState.STARTMIGRATE; pipeResolver.removeListener(getPipeID(), queryID); queryID = -1; destPeer = null; destAddress = null; if (null != destMessenger) { destMessenger.close(); destMessenger = null; } notify(); return true; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ignoring NAK from " + event.getPeerID()); } // didn't refer to us or we don't care. return false; } /** * {@inheritDoc} */ public synchronized boolean pipeResolveEvent(PipeResolver.Event event) { if (((workerstate == WorkerState.PENDINGVERIFY) || (workerstate == WorkerState.PENDINGMIGRATE)) && (event.getQueryID() == queryID)) { if ((workerstate == WorkerState.PENDINGVERIFY) && !event.getPeerID().equals(destPeer)) { // not from the right peer so ignore it. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ignoring response from " + event.getPeerID()); } return false; } else { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Pipe \'" + getPipeID() + "\' is verified for " + destPeer); } } workerstate = WorkerState.ACQUIREMESSENGER; migrated = true; destPeer = event.getPeerID(); if ((workerstate == WorkerState.PENDINGMIGRATE) && Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Pipe \'" + getPipeID() + "\' has migrated to " + destPeer); } notify(); return true; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ignoring resolve from " + event.getPeerID()); } // didn't refer to us or we don't care. return false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -