📄 jxtabidipipe.java
字号:
closePipe(); } catch (IOException ie) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("failed during close", ie); } } return true; } return false; } private void receiveMessage(Message message) { Iterator i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK); if (i.hasNext()) { if (ros != null) { ros.recv(message); } return; } i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK); if (i.hasNext()) { // It can happen that we receive messages for the input stream // while we have not finished creating it. try { synchronized (finalLock) { while (waiting) { finalLock.wait(timeout); } } } catch (InterruptedException ie) {} if (ris != null) { ris.recv(message); } } } /** * Gets the Maximum Retry Timeout of the reliability layer * * @return The maximum retry Timeout value */ public synchronized int getMaxRetryTimeout() { return maxRetryTimeout; } /** * Gets the Maximum Retry Timeout of the reliability layer * * @param maxRetryTimeout The new maximum retry timeout value * @exception IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout */ public synchronized void setMaxRetryTimeout(int maxRetryTimeout) { if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) { throw new IllegalArgumentException("Invalid Maximum retry timeout :"+maxRetryTimeout+" Exceed Global maximum retry timeout :"+MAXRETRYTIMEOUT); } this.maxRetryTimeout = maxRetryTimeout; } /** * Gets the Retry Timeout of the reliability layer * * @return The retry Timeout value */ public synchronized int getRetryTimeout() { return retryTimeout; } /** * Sets the Retry Timeout of the underlying reliability layer * . * In reliable mode it is possible for this call to block * trying to obtain a lock on reliable input stream * * @param retryTimeout The new retry timeout value * @exception IOException if an I/O error occurs */ public synchronized void setRetryTimeout(int retryTimeout) throws IOException { if (timeout <= 0) { throw new IllegalArgumentException("Invalid Socket timeout :"+retryTimeout); } this.retryTimeout = retryTimeout; if (outgoing != null) { outgoing.setTimeout(retryTimeout); } if (ris != null) { ris.setTimeout(retryTimeout); } } /** * When in reliable mode, gets the Reliable library window size * * @return The windowSize value * @exception IOException if an I/O error occurs */ public synchronized int getWindowSize() { return windowSize; } /** * When in reliable mode, sets the Reliable library window size * * @param windowSize The new window size value * @exception IOException if an I/O error occurs */ public synchronized void setWindowSize(int windowSize) throws IOException { if (isBound()) { throw new IOException("Socket bound. Can not change the window size"); } this.windowSize = windowSize; queue.setMaxQueueSize(windowSize); } /** * This method is invoked by the Reliablity library for each incoming data message * * @param message Incoming message */ public void processIncomingMessage(Message message) { if (!hasClose(message)) { PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) in.getPipeID()); push(event); } } private void push(PipeMsgEvent event) { if (msgListener == null) { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("push message onto queue"); } queue.push(event, -1); } catch (InterruptedException ie) {} } else { dequeue(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("calling message listener"); } msgListener.pipeMsgEvent(event); } } /** * Send a message * In non reliable mode the message is sent directly over a * <code>Messenger</code> * * *@param msg Message to send to the remote side *@return true if message was succussfully enqueued * @see net.jxta.endpoint.Message */ public boolean sendMessage(Message msg) throws IOException { if (isReliable) { int seqn = ros.send(msg); return (seqn > 0); } else { return msgr.sendMessage(msg, null, null); } } private void dequeue() { while (queue != null && queue.getCurrentInQueue() > 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("dequeing messages onto message listener"); } msgListener.pipeMsgEvent((PipeMsgEvent) queue.pop()); } } /** * {@inheritDoc} */ public void outputPipeEvent(OutputPipeEvent event) { OutputPipe op = event.getOutputPipe(); if (op.getAdvertisement() == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("The output pipe has no internal pipe advertisement. Continueing anyway."); } } if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) { synchronized (acceptLock) { // modify op within lock to prevent a race with the if. if (connectOutpipe == null) { connectOutpipe = op; // set to null to avoid closure op = null; } acceptLock.notifyAll(); } // Ooops one too many, we were too fast re-trying. if (op != null) { op.close(); } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unexpected OutputPipe :"+op); } } } /** * A lightweight output pipe constructor, note the return type * Since all the info needed is available, there's no need for to * use the pipe service to resolve the pipe we have all we need * to construct a messenger. * *@param group group context *@param pipeAdv Remote Pipe Advertisement *@param peer Remote Peer advertisement *@return Messenger */ protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) { EndpointService endpoint = group.getEndpointService(); ID opId = pipeAdv.getPipeID(); String destPeer = (peer.getPeerID().getUniqueValue()).toString(); // Get an endpoint messenger to that address EndpointAddress addr; if (pipeAdv.getType().equals(PipeService.UnicastType)) { addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString()); } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) { addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString()); } else { // not a supported type return null; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Creating a lightweightOutputPipe()"); } return endpoint.getMessenger(addr); } /** * Not implemented yet */ protected boolean checkCred(StructuredDocument cred) { //FIXME need to check credentials return true; } /** * Send a close message to the remote side */ private void sendClose() { Message msg = new Message(); msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.closeTag, "close", null)); try { sendMessage(msg); } catch (IOException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.error("failed during close", ie); } } } /** * Returns the message listener for this pipe * @return PipeMsgListener * @deprecated use getMessageListener instead */ public PipeMsgListener getListener() { return getMessageListener(); } /** * Returns the message listener for this pipe * @return PipeMsgListener * */ public PipeMsgListener getMessageListener() { return msgListener; } /** * Sets message listener for a pipe spawned by the JxtaServerPipe. * There is a window where a message could arrive prior to listener being * registered therefore a message queue is created to queue messages, once * a listener is registered these messages will be dequeued by calling the * listener until the queue is empty * * @param msgListener New value of property listener. * @deprecated use setMessageListener instead */ public void setListener(PipeMsgListener msgListener) { setMessageListener(msgListener); } /** * Sets message listener for a pipe spawned by the JxtaServerPipe. * There is a window where a message could arrive prior to listener being * registered therefore a message queue is created to queue messages, once * a listener is registered these messages will be dequeued by calling the * listener until the queue is empty * * @param msgListener New value of property listener. * */ public void setMessageListener(PipeMsgListener msgListener) { this.msgListener = msgListener; // if there are messages enqueued then dequeue them onto the msgListener dequeue(); } /** * Sets a Pipe event listener, set listener to null to unset the listener * * @param eventListener New value of property listener. * @deprecated use setPipeEventListener instead */ public void setListener(PipeEventListener eventListener) { setPipeEventListener(eventListener); } /** * Sets a Pipe event listener, set listener to null to unset the listener * * @param eventListener New value of property listener. * */ public void setPipeEventListener(PipeEventListener eventListener) { this.eventListener = eventListener; } /** * Returns the Pipe event listener for this pipe * @return PipeMsgListener * */ public PipeEventListener getPipeEventListener() { return eventListener; } /** * Gets a message from the queue. If no Object is immediately available, * then wait the specified amount of time for a message to be inserted. * * @param timeout Amount of time to wait in milliseconds for an object to * be available. Per Java convention, a timeout of zero (0) means wait an * infinite amount of time. Negative values mean do not wait at all. * @return The next message in the queue., if a listener is registered calls * to this method will return null * @throws InterruptedException if the operation is interrupted before * the timeout interval is completed. */ public Message getMessage(int timeout) throws InterruptedException { if (queue == null || msgListener != null) { return null; } else { PipeMsgEvent ev = (PipeMsgEvent) queue.pop(timeout); if (ev != null) { return ev.getMessage(); } else { return null; } } } /** * Returns the Assigned PipeAdvertisement * @return the Assigned PipeAdvertisement */ public PipeAdvertisement getPipeAdvertisement() { return pipeAdv; } /** * {@inheritDoc} * * <p/>Closes the JxtaBiDiPipe. */ protected synchronized void finalize() throws Throwable { if(!closed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug."); } } close(); super.finalize(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -