⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingoutputpipe.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * state.</td>     * </tr>     * <p/>     * <tr>     * <th>PENDINGVERIFY</th>     * <td>Issues query messages to verify that the destination peer is     * still listening on the pipe. Queries are issued every     * <code>QUERYINTERVAL</code> milliseconds. If a positive response     * is received, go to state <b>ACQUIREMESSENGER</b>. If no response     * is received within <b>QUERYTIMEOUT</b> milliseconds or a     * negative response is received then go to state     * <b>STARTMIGRATE</b>.</td>     * </tr>     * <p/>     * <tr>     * <th>PENDINGMIGRATE</th>     * <td>Issues query messages to find a new destination peer.     * Queries are issued every <code>QUERYINTERVAL</code> milliseconds.     * If a positive response is received, go to state     * <b>ACQUIREMESSENGER</b>. If no positive response from an     * eligible peer is received within <b>QUERYTIMEOUT</b>     * milliseconds go to state <b>CLOSED</b>.</td>     * </tr>     * <p/>     * <tr>     * <th>CLOSED</th>     * <td>Exit the worker thread.</td>     * </tr>     * </tbody>     * </table>     */    public void run() {        long absoluteTimeoutAt = -1;        long nextQueryAt = -1;        try {            // state loop            while (WorkerState.CLOSED != workerstate) {                synchronized (this) {                    LOG.fine("NON-BLOCKING WORKER AT STATE : " + workerstate                            + ((WorkerState.SENDMESSAGES == workerstate)                                    ? "\n\t" + TimeUtils.toRelativeTimeMillis(nextVerifyAt, TimeUtils.timeNow())                                    + " until verify."                                    : ""));                    // switch() emulation                    if ((WorkerState.STARTVERIFY == workerstate) || (WorkerState.STARTMIGRATE == workerstate)) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            if (null == destPeer) {                                LOG.fine("Starting re-resolve for \'" + getPipeID());                            } else {                                LOG.fine("Starting verify for \'" + getPipeID() + "\' to : " + destPeer);                            }                        }                        queryID = PipeResolver.getNextQueryID();                        pipeResolver.addListener(getPipeID(), this, queryID);                        absoluteTimeoutAt = TimeUtils.toAbsoluteTimeMillis(                                Math.max(QUERYTIMEOUTMIN, (PipeServiceImpl.VERIFYINTERVAL / 20)));                        nextQueryAt = TimeUtils.timeNow();                        if (WorkerState.STARTVERIFY == workerstate) {                            workerstate = WorkerState.PENDINGVERIFY;                        } else if (WorkerState.STARTMIGRATE == workerstate) {                            workerstate = WorkerState.PENDINGMIGRATE;                        }                        // move on to the next state.                    } else if ((WorkerState.PENDINGVERIFY == workerstate) || (WorkerState.PENDINGMIGRATE == workerstate)) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine(                                    "Pipe " + ((WorkerState.PENDINGVERIFY == workerstate) ? "verify" : "migrate")                                    + "in progress. Continues for "                                    + TimeUtils.toRelativeTimeMillis(absoluteTimeoutAt, TimeUtils.timeNow())                                    + "ms. Next query in " + TimeUtils.toRelativeTimeMillis(nextQueryAt, TimeUtils.timeNow())                                    + "ms.");                        }                        // check to see if we are completely done.                        if (TimeUtils.toRelativeTimeMillis(absoluteTimeoutAt, TimeUtils.timeNow()) <= 0) {                            pipeResolver.removeListener(getPipeID(), queryID);                            if (WorkerState.PENDINGVERIFY == workerstate) {                                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                                    LOG.info("Pipe \'" + getPipeID() + "\' has migrated from " + destPeer);                                }                                workerstate = WorkerState.STARTMIGRATE;                                // move on to the next state.                                continue;                            } else {                                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                    LOG.warning("Pipe \'" + getPipeID() + "\' cannot be migrated and is being closed");                                }                                workerstate = WorkerState.CLOSED;                                close();                                // move on to the next state.                                continue;                            }                        }                        // check if its time ot send another copy of the query.                        if (TimeUtils.toRelativeTimeMillis(nextQueryAt, TimeUtils.timeNow()) <= 0) {                            if (null != destPeer) {                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                    LOG.fine(                                            "Sending out verify query (" + queryID + ") for \'" + getPipeID() + "\' to : "                                            + destPeer);                                }                                pipeResolver.sendPipeQuery(pAdv, Collections.singleton(destPeer), queryID);                            } else {                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                    LOG.fine("Sending out resolve query (" + queryID + ") for " + getPipeID());                                }                                pipeResolver.sendPipeQuery(pAdv, resolvablePeers, queryID);                            }                            nextQueryAt = TimeUtils.toAbsoluteTimeMillis(                                    Math.max(QUERYINTERVALMIN, (PipeServiceImpl.VERIFYINTERVAL / 50)));                        }                        long sleep = TimeUtils.toRelativeTimeMillis(Math.min(nextQueryAt, absoluteTimeoutAt), TimeUtils.timeNow());                        if (sleep >= 0) {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("Waiting " + sleep + "ms for response for (" + queryID + ") for " + getPipeID());                            }                            try {                                wait(sleep);                            } catch (InterruptedException woken) {                                Thread.interrupted();                            }                        }                        // move on to the next state.                    } else if (WorkerState.ACQUIREMESSENGER == workerstate) {                        if ((null == destMessenger) || destMessenger.isClosed()) {                            destMessenger = null;                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("Getting messenger to \'" + destPeer + "\' for pipe " + getPipeID());                            }                            destAddress = mkAddress(destPeer, getPipeID());                            // todo 20031011 bondolo@jxta.org This should not be done under sync                            destMessenger = endpoint.getMessenger(destAddress);                            if (destMessenger == null) {                                // We could not get a messenger to the peer, forget it and try again.                                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                    LOG.warning("Could not get messenger to : " + destPeer + ". ");                                }                                if (migrated) {                                    // we can't migrate again, we never finished.                                    // the last migrate!                                    workerstate = WorkerState.CLOSED;                                    close();                                } else {                                    workerstate = WorkerState.STARTMIGRATE;                                }                                pipeResolver.removeListener((PipeID) getPipeID(), queryID);                                queryID = -1;                                destPeer = null;                                destAddress = null;                                // move on to the next state.                                continue;                            } else {                                // migration completed.                                migrated = false;                            }                        } else {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("Using existing messenger to : " + destPeer);                            }                        }                        workerstate = WorkerState.SENDMESSAGES;                        nextVerifyAt = TimeUtils.toAbsoluteTimeMillis(PipeServiceImpl.VERIFYINTERVAL);                        // move on to the next state.                        continue; // can't just fall through because we would start sending messages immediately.                    } else if (WorkerState.SENDMESSAGES == workerstate) {                        // is it time to do verification again?                        if (TimeUtils.toRelativeTimeMillis(nextVerifyAt, TimeUtils.timeNow()) <= 0) {                            workerstate = WorkerState.STARTVERIFY;                            pipeResolver.removeListener(getPipeID(), queryID);                            queryID = -1;                        }                        // move on to the next state.                    } else if (WorkerState.CLOSED == workerstate) {                        queue.clear(); // they aren't going to be sent                        if (null != destMessenger) {                            destMessenger.close();                            destMessenger = null;                        }                        serviceThread = null;                        break;                    } else {                        LOG.warning("Unrecognized state in worker thread : " + workerstate);                    }                }                // now actually send messages. We don't do this under the global sync.                if (WorkerState.SENDMESSAGES == workerstate) {                    Message msg = null;                    try {                        msg = (Message) queue.pop(IDLEWORKERLINGER);                    } catch (InterruptedException woken) {                        Thread.interrupted();                        continue;                    }                    if (null == msg) {                        synchronized (this) {                            // before deciding to die, we need to make sure that                            // nobody snuck something into the queue. If there                            // is, then we have to be the one to service the                            // queue.                            if (null == queue.peek()) {                                if (closed) {                                    workerstate = WorkerState.CLOSED;                                    continue;                                } else {                                    serviceThread = null;                                    break;                                }                            } else {                                continue;                            }                        }                    }                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Sending " + msg + " on " + getPipeID());                    }                    if (!destMessenger.isClosed()) {                        try {                            destMessenger.sendMessageB(msg, null, null);                        } catch (IOException failed) {                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                LOG.log(Level.WARNING, "Failure sending " + msg + " on " + getPipeID(), failed);                            }                        }                    }                    // May be now closed due to failing to send.                    if (destMessenger.isClosed()) {                        synchronized (this) {                            workerstate = WorkerState.ACQUIREMESSENGER;                            destMessenger = null;                        }                    }                }            }        } catch (Throwable all) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);            }            // give another thread the chance to start unless one already has.            // If the exception was caused by damaged state on this object then            // starting a new Thread may just cause the same exception again.            // Unfortunate tradeoff.            synchronized (this) {                if (serviceThread == Thread.currentThread()) {                    serviceThread = null;                }            }        } finally {            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info(                        "Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + workerstate                        + "\tqueue closed : " + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue()                        + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued());            }        }    }    /**     * Starts the worker thread if it is not already running.     */    private synchronized void startServiceThread() {        // if there is no service thread, start one.        if ((null == serviceThread) && !closed) {            serviceThread = new Thread(peerGroup.getHomeThreadGroup(), this                    ,                    "Worker Thread for NonBlockingOutputPipe : " + getPipeID());            serviceThread.setDaemon(true);            serviceThread.start();            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info(                        "Thread start : " + serviceThread.getName() + "\n\tworker state : " + workerstate + "\tqueue closed : "                        + queue.isClosed() + "\tnumber in queue : " + queue.getCurrentInQueue() + "\tnumber queued : "                        + queue.getNumEnqueued() + "\tnumber dequeued : " + queue.getNumDequeued());            }        }    }    /**     * Convenience method for constructing a peer endpoint address from its     * peer id     *     * @param destPeer the desitnation peer     * @param pipeID   the pipe to put in the param field.     * @return the pipe endpoint address.     */    protected EndpointAddress mkAddress(ID destPeer, ID pipeID) {        return new EndpointAddress("jxta", destPeer.getUniqueValue().toString(), "PipeService", pipeID.toString());    }    /**     * {@inheritDoc}     */    public synchronized boolean pipeNAKEvent(PipeResolver.Event event) {        if (((workerstate == WorkerState.PENDINGVERIFY) || (workerstate == WorkerState.ACQUIREMESSENGER)                || (workerstate == WorkerState.SENDMESSAGES))                && (event.getPeerID().equals(destPeer) && (event.getQueryID() == queryID))) {            // we have been told that the destination peer no longer wants            // to talk with us. We will try to migrate to another peer.            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Pipe \'" + getPipeID() + "\' is closed at " + event.getPeerID());            }            workerstate = WorkerState.STARTMIGRATE;            pipeResolver.removeListener(getPipeID(), queryID);            queryID = -1;            destPeer = null;            destAddress = null;            if (null != destMessenger) {                destMessenger.close();                destMessenger = null;            }            notify();            return true;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Ignoring NAK from " + event.getPeerID());        }        // didn't refer to us or we don't care.        return false;    }    /**     * {@inheritDoc}     */    public synchronized boolean pipeResolveEvent(PipeResolver.Event event) {        if (((workerstate == WorkerState.PENDINGVERIFY) || (workerstate == WorkerState.PENDINGMIGRATE))                && (event.getQueryID() == queryID)) {            if ((workerstate == WorkerState.PENDINGVERIFY) && !event.getPeerID().equals(destPeer)) {                // not from the right peer so ignore it.                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Ignoring response from " + event.getPeerID());                }                return false;            } else {                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("Pipe \'" + getPipeID() + "\' is verified for " + destPeer);                }            }            workerstate = WorkerState.ACQUIREMESSENGER;            migrated = true;            destPeer = event.getPeerID();            if ((workerstate == WorkerState.PENDINGMIGRATE) && Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("Pipe \'" + getPipeID() + "\' has migrated to " + destPeer);            }            notify();            return true;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Ignoring resolve from " + event.getPeerID());        }        // didn't refer to us or we don't care.        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -