📄 jxtabidipipe.java
字号:
} catch (SocketTimeoutException io) { if (msgr instanceof TcpMessenger) { ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true); return true; } else { return msgr.sendMessage(msg, null, null); } } catch (IOException io) { closePipe(true); IOException exp = new IOException("IO error occured during sendMessage()"); exp.initCause(io); throw exp; } } } private void dequeue() { if (!dequeued && (null != msgListener)) { while (queue != null && !queue.isEmpty()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("dequeing messages onto message listener"); } try { msgListener.pipeMsgEvent(queue.take()); } catch (InterruptedException e) { //ignored } } dequeued = false; } } /** * {@inheritDoc} */ public void outputPipeEvent(OutputPipeEvent event) { OutputPipe op = event.getOutputPipe(); if (op.getAdvertisement() == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("The output pipe has no internal pipe advertisement. Continueing anyway."); } } if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) { synchronized (acceptLock) { // modify op within lock to prevent a race with the if. if (connectOutpipe == null) { connectOutpipe = op; // set to null to avoid closure op = null; } acceptLock.notifyAll(); } // Ooops one too many, we were too fast re-trying. if (op != null) { op.close(); } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Unexpected OutputPipe :" + op); } } } /** * A lightweight direct messenger output pipe constructor, note the return type * Since all the info needed is available, there's no need for to * use the pipe service to resolve the pipe we have all we need * to construct a messenger. * * @param group group context * @param pipeAdv Remote Pipe Advertisement * @param peer Remote Peer advertisement * @return Messenger */ protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) { // Get an endpoint messenger to that address if (pipeAdv.getType().equals(PipeService.PropagateType)) { throw new IllegalArgumentException("Invalid pipe type " + pipeAdv.getType()); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Creating a Direct Messenger"); } if (pipeAdv.getType().equals(PipeService.UnicastType)) { EndpointService endpoint = group.getEndpointService(); EndpointAddress pipeEndpoint = new EndpointAddress("jxta", (peer.getPeerID().getUniqueValue()).toString(), "PipeService", pipeAdv.getPipeID().toString()); return endpoint.getDirectMessenger(pipeEndpoint, peer, true); } return null; } /** * A lightweight output pipe constructor, note the return type * Since all the info needed is available, there's no need for to * use the pipe service to resolve the pipe we have all we need * to construct a messenger. * * @param group group context * @param pipeAdv Remote Pipe Advertisement * @param peer Remote Peer advertisement * @return Messenger */ protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) { EndpointService endpoint = group.getEndpointService(); ID opId = pipeAdv.getPipeID(); String destPeer = (peer.getPeerID().getUniqueValue()).toString(); // Get an endpoint messenger to that address EndpointAddress addr; if (pipeAdv.getType().equals(PipeService.UnicastType)) { addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString()); } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) { addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString()); } else { // not a supported type return null; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Creating a lightweightOutputPipe()"); } return endpoint.getMessenger(addr); } /** * Not implemented yet * * @param cred the credential document * @return always returns true */ protected boolean checkCred(StructuredDocument cred) { // FIXME need to check credentials return true; } /** * Send a close message to the remote side */ private void sendClose() { if (!direct && isReliable && ros.isClosed()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("ReliableOutputStream is already closed. Skipping close message"); } return; } Message msg = new Message(); msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.closeTag, "close", null)); try { sendMessage(msg); // ros will not take any new message, now. if (!direct && ros != null) { ros.close(); } } catch (IOException ie) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.SEVERE, "failed during close", ie); } } } /** * Returns the message listener for this pipe * * @return PipeMsgListener * @deprecated use getMessageListener instead */ @Deprecated public PipeMsgListener getListener() { return getMessageListener(); } /** * Returns the message listener for this pipe * * @return PipeMsgListener */ public PipeMsgListener getMessageListener() { return msgListener; } /** * Sets message listener for a pipe spawned by the JxtaServerPipe. * There is a window where a message could arrive prior to listener being * registered therefore a message queue is created to queue messages, once * a listener is registered these messages will be dequeued by calling the * listener until the queue is empty * * @param msgListener New value of property listener. * @deprecated use setMessageListener instead */ @Deprecated public void setListener(PipeMsgListener msgListener) { setMessageListener(msgListener); } /** * Sets message listener for a pipe spawned by the JxtaServerPipe. * There is a window where a message could arrive prior to listener being * registered therefore a message queue is created to queue messages, once * a listener is registered these messages will be dequeued by calling the * listener until the queue is empty. * <p/> * Sending messages vis {@link #sendMessage(Message)} from within a * {@code PipeMsgListener} may result in a deadlock due to contention * between the sending and receiving portions of BiDi pipes. * * @param msgListener New value of property listener. */ public void setMessageListener(PipeMsgListener msgListener) { this.msgListener = msgListener; // if there are messages enqueued then dequeue them onto the msgListener dequeue(); } /** * Sets a Pipe event listener, set listener to null to unset the listener * * @param eventListener New value of property listener. * @deprecated use setPipeEventListener instead */ @Deprecated public void setListener(PipeEventListener eventListener) { setPipeEventListener(eventListener); } /** * Sets a Pipe event listener, set listener to null to unset the listener * * @param eventListener New value of property listener. */ public void setPipeEventListener(PipeEventListener eventListener) { this.eventListener = eventListener; } /** * Returns the Pipe event listener for this pipe * * @return PipeMsgListener */ public PipeEventListener getPipeEventListener() { return eventListener; } /** * Sets a Pipe state listener, set listener to null to unset the listener * * @param stateListener New value of property listener. */ public void setPipeStateListener(PipeStateListener stateListener) { this.stateListener = stateListener; } /** * Returns the Pipe state listener for this pipe * * @return PipeMsgListener */ public PipeStateListener getPipeStateListener() { return stateListener; } /** * Gets a message from the queue. If no Object is immediately available, * then wait the specified amount of time for a message to be inserted. * * @param timeout Amount of time to wait in milliseconds for an object to * be available. Per Java convention, a timeout of zero (0) means wait an * infinite amount of time. Negative values mean do not wait at all. * @return The next message in the queue. if a listener is registered calls * to this method will return null * @throws InterruptedException if the operation is interrupted before * the timeout interval is completed. */ public Message getMessage(int timeout) throws InterruptedException { if (queue == null || msgListener != null) { return null; } else { PipeMsgEvent ev = queue.poll(timeout, TimeUnit.MILLISECONDS); if (ev != null) { return ev.getMessage(); } else { return null; } } } /** * Returns the Assigned PipeAdvertisement * * @return the Assigned PipeAdvertisement */ public PipeAdvertisement getPipeAdvertisement() { return pipeAdv; } /** * {@inheritDoc} * <p/> * Closes the JxtaBiDiPipe. */ @Override protected synchronized void finalize() throws Throwable { if (!closed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug."); } close(); } super.finalize(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -