// Source file: BeepNonBlockingMessenger.java
/*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*
* $Id: BeepNonBlockingMessenger.java,v 1.4 2001/11/07 23:20:51 jice Exp $
*/
package net.jxta.impl.endpoint.beep;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import org.apache.log4j.Category; import org.apache.log4j.Priority;
import org.beepcore.beep.core.ByteDataStream;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.DataStream;
import org.beepcore.beep.lib.Reply;
import org.beepcore.beep.core.BEEPError;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.EndpointMessenger;
import net.jxta.endpoint.Message;
import net.jxta.impl.endpoint.MessageWireFormatFactory;
/**
 * An {@link EndpointMessenger} that sends each {@link Message} as a single
 * MSG on a channel obtained from its {@link BeepSession}, then waits for the
 * peer's reply before returning.
 *
 * @since JXTA 1.0
 */
public class BeepNonBlockingMessenger implements EndpointMessenger {

    /**
     * Our Log4J Category
     **/
    private static final Category LOG = Category.getInstance(BeepNonBlockingMessenger.class.getName());

    /**
     * The session associated with this messenger
     **/
    private final BeepSession session;

    /**
     * The channel we are using for this messenger
     **/
    private final Channel channel;

    /**
     * Create a new messenger for the specified session and acquire a new
     * channel from it.
     *
     * @param session The session for which the messenger is being created.
     * @throws IOException Thrown when the messenger cannot be created.
     */
    public BeepNonBlockingMessenger( BeepSession session ) throws IOException {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Constructor");
        }
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" dest addr = " + session.getDestEndpoint() );
        }

        this.session = session;
        this.channel = session.getNewChannel();
    }

    /**
     * Send a message on this messenger's channel and wait for the reply.
     *
     * <p>The message's source and destination addresses are overwritten with
     * the session's endpoints before it is serialized. On success the
     * session's last-used time is updated.
     *
     * @param message Message to be sent.
     * @throws IOException Thrown when the session is no longer active, or when
     *         serializing, sending, or reading the reply fails. The underlying
     *         exception, if any, is attached as the cause.
     */
    public void sendMessage(Message message) throws IOException {

        /* FIXME 20010723 bondolo@jxta.org
         *
         * At this point, the session and/or channel may have been closed due
         * to idle timeouts. We don't currently reopen the session or get
         * a new channel. This makes this messenger dead in the water.
         */
        if( !session.isActive() ) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( "Session was closed with " + session.getDestEndpoint() );
            }
            throw new IOException( "Session was closed with " + session.getDestEndpoint() );
        }

        try {
            // Stamp the message with this session's source and destination addresses.
            message.setDestinationAddress( session.getDestEndpoint() );
            message.setSourceAddress( session.getSrcEndpoint() );

            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("sendMessage");
            }
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" from = " + session.getSrcEndpoint() );
            }
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" to = " + session.getDestEndpoint() );
            }

            // XXX 20010725 bondolo@jxta.org This buffering is due to not
            // knowing the size of the message
            ByteArrayOutputStream messy = new ByteArrayOutputStream();
            MessageWireFormatFactory.newMessageWireFormat(
                    new MimeMediaType( "application/x-jxta-msg" ) )
                    .writeMessage(messy, message);

            Reply reply = new Reply();
            DataStream msg = new ByteDataStream( messy.toByteArray() );
            channel.sendMSG( msg, reply);

            if (LOG.isEnabledFor(Priority.INFO)) {
                LOG.info( "message sent, waiting for reply.");
            }

            // Get the reply to the request
            DataStream ds = reply.getNextReply().getDataStream();
            InputStream is = ds.getInputStream();
            ByteArrayOutputStream rpy = new ByteArrayOutputStream();

            // Read the data in the reply.
            // NOTE(review): a -1 from read() is skipped rather than ending the
            // loop, so termination relies on isComplete() eventually becoming
            // true -- confirm against the DataStream implementation in use.
            while (ds.isComplete() == false || is.available() > 0) {
                int res = is.read();
                if( -1 != res ) {
                    rpy.write( ((byte) res) );
                }
            }

            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( "message sent, got reply.");
            }

            // Decode with an explicit charset so the comparison does not depend
            // on the platform default encoding.
            // NOTE(review): a reply other than "OK" is only logged, not treated
            // as a failure; this preserves the existing behaviour.
            if( !rpy.toString( "US-ASCII" ).equals( "OK" ) ) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug( "Bad reply" );
                }
            }
        } catch (BEEPError e) {
            String reason = "Error sending request (" + e.getCode() +
                    ": " + e.getMessage() + ")";
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( reason );
            }
            throw sendFailure( reason, e );
        } catch (IOException e) {
            // Already the type callers expect; log and propagate unchanged.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( "Error sending request (" +
                        e.getMessage() + ")");
            }
            throw e;
        } catch (Exception e) {
            String reason = "Error sending request (" +
                    e.getMessage() + ")";
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( reason );
            }
            throw sendFailure( reason, e );
        }

        session.updateLastUsed( );

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "message sent OK");
        }
    }

    /**
     * Close this Messenger by releasing its channel back to the session.
     */
    public void close() {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "close for " + session.getDestEndpoint() );
        }
        session.releaseChannel( channel );
    }

    /**
     * Build the IOException reported to callers for a failed send, keeping
     * the original exception as its cause.
     */
    private static IOException sendFailure( String reason, Throwable cause ) {
        IOException failure = new IOException( reason );
        failure.initCause( cause );
        return failure;
    }
}
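/*
 * Code-viewer keyboard shortcuts (residue from the web page this file was
 * copied from; not part of the Java source):
 *   Copy code:          Ctrl + C
 *   Search code:        Ctrl + F
 *   Full-screen mode:   F11
 *   Toggle theme:       Ctrl + Shift + D
 *   Show shortcuts:     ?
 *   Increase font size: Ctrl + =
 *   Decrease font size: Ctrl + -
 */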