📄 jxtaserversocket.java
字号:
/* * $Id: JxtaServerSocket.java,v 1.41 2006/05/25 00:41:34 hamada Exp $ * * Copyright (c) 2001-2006 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * */package net.jxta.socket;import java.io.IOException;import java.io.InputStream;import java.net.ServerSocket;import java.net.Socket;import java.net.SocketAddress;import java.net.SocketException;import java.net.SocketTimeoutException;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import net.jxta.document.AdvertisementFactory;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.XMLDocument;import net.jxta.endpoint.InputStreamMessageElement;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.TextDocumentMessageElement;import net.jxta.id.IDFactory;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.pipe.InputPipe;import net.jxta.pipe.PipeMsgEvent;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import 
org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * JxtaServerSocket is a bi-directional Pipe that behaves very much like
 * ServerSocket. It creates an input pipe and listens for pipe connection requests.
 * JxtaServerSocket also defines its own protocol. Requests arrive as a JXTA Message
 * with the following elements:
 *
 * <p>
 * &lt;Cred&gt; Credentials which can be used to determine trust &lt;/Cred&gt;
 * <p>
 * &lt;reqPipe&gt; requestor's pipe advertisement &lt;/reqPipe&gt;
 * <p>
 * &lt;remPipe&gt; Remote pipe advertisement &lt;/remPipe&gt;
 * <p>
 * &lt;remPeer&gt; Remote peer advertisement &lt;/remPeer&gt;
 * <p>
 * &lt;stream&gt; determine whether the connection is reliable, or not &lt;/stream&gt;
 * <p>
 * &lt;close&gt; close request &lt;/close&gt;
 * <p>
 * &lt;data&gt; Data &lt;/data&gt;
 * <p>
 * JxtaServerSocket then creates a new private pipe, listens for messages on that pipe,
 * resolves the requestor's pipe, and sends a &lt;remPipe&gt; private pipe created &lt;/remPipe&gt;
 * advertisement back, where the remote side is resolved.
 */
public class JxtaServerSocket extends ServerSocket implements PipeMsgListener {

    /** log4j logger named after this class. */
    private static final Logger LOG = Logger.getLogger(JxtaServerSocket.class.getName());

    /**
     * NOTE(review): presumably the namespace under which the protocol's message
     * elements are stored -- confirm against the message-handling code.
     */
    public static final String nameSpace = "JXTASOC";

    /** Name of the credentials element. */
    public static final String credTag = "Cred";

    /** Name of the element carrying the requestor's pipe advertisement. */
    public static final String reqPipeTag = "reqPipe";

    /** Name of the element carrying the remote peer advertisement. */
    public static final String remPeerTag = "remPeer";

    /** Name of the element carrying the remote pipe advertisement. */
    public static final String remPipeTag = "remPipe";

    /** Name of the data element. */
    public static final String dataTag = "data";

    /** Name of the close-request element. */
    public static final String closeTag = "close";

    /** Name of the element that says whether the connection is reliable or not. */
    public static final String streamTag = "stream";

    /** Peer group supplied to the constructor or to bind(). */
    protected PeerGroup group;

    /** Input pipe created from {@link #pipeadv} when the socket is bound. */
    protected InputPipe serverPipe;

    /** Pipe advertisement on which pipe requests are accepted. */
    protected PipeAdvertisement pipeadv;

    /** Maximum length of the connection queue; 50 unless a backlog is supplied. */
    protected int backlog = 50;

    /**
     * Timeout in milliseconds; 60 seconds unless one is supplied. The constructors
     * taking a timeout store Long.MAX_VALUE when given 0.
     */
    protected long timeout = 60 * 1000;

    /**
     * NOTE(review): presumably the monitor guarding close(); its use is not visible
     * in this part of the file. Built with {@code new String(...)}, so it is a
     * distinct object rather than the shared interned literal.
     */
    protected String closeLock = new String("closeLock");

    /**
     * Queue sized by the backlog. It stays null until a constructor or bind()
     * overload that takes a backlog allocates it.
     */
    protected BlockingQueue queue = null;

    // State flags, all initially false.
    // NOTE(review): the code that updates them lies outside this part of the file.
    protected boolean created = false;
    protected boolean bound = false;
    protected boolean closed = false;

    /**
     * NOTE(review): presumably the local credential document sent in the
     * credentials element; not referenced in this part of the file -- confirm.
     */
    protected StructuredDocument myCredentialDoc = null;

    /**
     * Default Constructor
     * <p>
     * backlog default of 50
     * <p>
     * timeout
defaults to 60 seconds (the initial value of the timeout field).
     * <p>
     * This constructor does no JXTA set-up of its own: a call to bind() is
     * needed to finish initializing this object.
     *
     * @throws IOException declared for callers; nothing in this constructor's own body throws it
     */
    public JxtaServerSocket() throws IOException {}

    /**
     * Constructs and binds a JxtaServerSocket using a JxtaSocketAddress as
     * the address. Delegates to the (address, backlog) constructor with a
     * backlog of 50, which in turn applies the default timeout of 60 seconds.
     *
     * @param address an instance of JxtaSocketAddress
     * @throws IOException if an I/O error occurs while binding
     * @see net.jxta.socket.JxtaSocketAddress
     */
    public JxtaServerSocket(SocketAddress address) throws IOException {
        this(address, 50);
    }

    /**
     * Constructor for the JxtaServerSocket.
     * <p>
     * Delegates to the (group, pipeadv, backlog) constructor with a backlog
     * of 50, which in turn applies the default timeout of 60 seconds.
     *
     * @param group JXTA PeerGroup
     * @param pipeadv PipeAdvertisement on which pipe requests are accepted
     * @exception IOException if an I/O error occurs
     */
    public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeadv) throws IOException {
        this(group, pipeadv, 50);
    }

    /**
     * Constructs and binds a JxtaServerSocket using a JxtaSocketAddress as
     * the address. Delegates to the (address, backlog, timeout) constructor
     * with a timeout of 60000 milliseconds.
     *
     * @param address an instance of JxtaSocketAddress
     * @param backlog the size of the backlog queue
     * @throws IOException if an I/O error occurs while binding
     * @see net.jxta.socket.JxtaSocketAddress
     */
    public JxtaServerSocket(SocketAddress address, int backlog) throws IOException {
        this(address, backlog, 60000);
    }

    /**
     * Constructor for the JxtaServerSocket object. Delegates to the
     * (group, pipeadv, backlog, timeout) constructor with a timeout of
     * 60000 milliseconds.
     *
     * @param group JXTA PeerGroup
     * @param pipeadv PipeAdvertisement on which pipe requests are accepted
     * @param backlog the maximum length of the queue.
     * @exception IOException if an I/O error occurs
     */
    public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeadv, int backlog) throws IOException {
        this(group, pipeadv, backlog, 60000);
    }

    /**
     * Constructs and binds a JxtaServerSocket using a JxtaSocketAddress as
     * the address.
* * @param address an instance of JxtaSocketAddress * @param backlog the size of the backlog queue * @param timeout connection timeout in milliseconds * @throws IOException * @see net.jxta.socket.JxtaSocketAddress */ public JxtaServerSocket(SocketAddress address, int backlog, int timeout) throws IOException { if (backlog <= 0) { throw new IllegalArgumentException("backlog must be > 0"); } if (timeout < 0) { throw new IllegalArgumentException("timeout must be >= 0"); } if (0 == timeout) { this.timeout = Long.MAX_VALUE; } else { this.timeout = (long) timeout; } bind(address, backlog); } /** * Constructor for the JxtaServerSocket object. * * @param group JXTA PeerGroup * @param pipeadv PipeAdvertisement on which pipe requests are accepted * @param backlog the maximum length of the queue. * @param timeout the specified timeout, in milliseconds * @exception IOException if an I/O error occurs */ public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeadv, int backlog, int timeout) throws IOException { if (pipeadv.getType() != null && pipeadv.getType().equals(PipeService.PropagateType)) { throw new IOException("Propagate pipe advertisements are not supported"); } this.group = group; this.pipeadv = pipeadv; if (backlog <= 0) { throw new IllegalArgumentException("backlog must be > 0"); } if (timeout < 0) { throw new IllegalArgumentException("timeout must be >= 0"); } if (0 == timeout) { this.timeout = Long.MAX_VALUE; } else { this.timeout = (long) timeout; } queue = new ArrayBlockingQueue(backlog); PipeService pipeSvc = group.getPipeService(); serverPipe = pipeSvc.createInputPipe(pipeadv, this); setBound(); } /** * Binds the <code>JxtaServerSocket</code> to a specific pipe advertisement * * @param group JXTA PeerGroup * @param pipeadv PipeAdvertisement on which pipe requests are accepted * @exception IOException if an I/O error occurs */ public void bind(PeerGroup group, PipeAdvertisement pipeadv) throws IOException { if (isBound()) { throw new 
SocketException("Already bound"); } if (pipeadv.getType() != null && pipeadv.getType().equals(PipeService.PropagateType)) { throw new IOException("Propagate pipe advertisements are not supported"); } this.group = group; this.pipeadv = pipeadv; PipeService pipeSvc = group.getPipeService(); serverPipe = pipeSvc.createInputPipe(pipeadv, this); setBound(); } /** * Binds the <code>JxtaServerSocket</code> to a specific pipe advertisement * * @param group JXTA PeerGroup * @param pipeadv PipeAdvertisement on which pipe requests are accepted * @param backlog the maximum length of the queue. * @exception IOException if an I/O error occurs */ public void bind(PeerGroup group, PipeAdvertisement pipeadv, int backlog) throws IOException { this.backlog = backlog; queue = new ArrayBlockingQueue(backlog); bind(group, pipeadv); } /** * {@inheritDoc} * * <p/>Used to bind a JxtaServerSocket created with the no-arg constructor. */ public void bind(SocketAddress endpoint) throws IOException { bind(endpoint, backlog); } /** * {@inheritDoc} * * <p/>Used to bind a JxtaServerSocket created with the no-arg constructor. */ public void bind(SocketAddress endpoint, int backlog) throws IOException { if (endpoint instanceof JxtaSocketAddress) { JxtaSocketAddress socketAddress = (JxtaSocketAddress) endpoint; PeerGroup pg = PeerGroup.globalRegistry.lookupInstance(socketAddress.getPeerGroupId()); if (pg == null) { throw new IOException( "Can't connect socket in PeerGroup with id " + socketAddress.getPeerGroupId().toString() + ". No running instance of the group is registered."); } bind(pg.getWeakInterface(), socketAddress.getPipeAdv(), backlog); pg.unref(); } else { throw new IllegalArgumentException("Unsupported subclass of SocketAddress; " + "use JxtaSocketAddress instead."); } } /** * {@inheritDoc}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -