📄 tcptransport.java
字号:
socket = null;
}
}
public void runMulticastServer() {
if (! allowMulticast)
return;
byte[] buffer;
for (;;) {
buffer = new byte [propagateSize];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
multicastSocket.receive(packet);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.ERROR)) LOG.error("failure in runMulticastServer ", e);
break;
}
processReceivingBuffer(buffer);
}
}
public void processMulticast(byte[] buffer) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processMulticast starts");
if ((buffer == null) || (buffer.length < Header.length) ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" buffer null or truncated.");
return;
}
try {
// Get the header
Header header = new Header();
if (! header.initFromNetwork(buffer, 0)) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "header is corrupted. Msg ignored." );
throw new IOException( "header is corrupted. Msg ignored." );
}
if( header.size > buffer.length ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "size from header is larger than buffer. Msg was truncated." );
throw new IOException( "size from header is larger than buffer. Msg was truncated." );
}
ByteArrayInputStream bais = new ByteArrayInputStream(buffer, Header.length, header.size);
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "application/x-jxta-msg" ) ).readMessage(bais, msg);
// Give the message to the EndpointService Manager
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" handing message to EndpointService");
if (!allowMulticast) {
return;
}
// Notify an existing TcpConnection for the source of the message
// that the peer might be up again.
EndpointAddress destAddr = msg.getSourceAddress();
if (destAddr != null) {
checkConnection(destAddr);
}
// Demux the message for the upper layers.
endpoint.demux(msg);
// Make sure we are not hogging the cpu with input
// thus preventing it from being processed.
// It is better to slow the sending side a bit.
Thread.yield();
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" discard message - exception " + e);
// Just discard the message. Multicast are not reliable
return;
}
}
public void runReceive(Socket socket, boolean keep) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceive starts");
if (socket == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no socket");
return;
}
InputStream inputStream = null;
try {
inputStream = socket.getInputStream();
if (inputStream == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no inputStream");
socket.close();
return;
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" failed to create inputStream");
try {
socket.close();
} catch ( Exception ignored ) {
}
return;
}
while (true) {
int size = 0;
try {
// Get the header
byte[] headerBuffer = new byte [Header.length];
size = inputStream.read(headerBuffer, 0, Header.length);
if( -1 == size ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( " EOF on inputsteam. Closing socket" );
inputStream.close();
socket.close();
return;
}
// Send keepalive info as soon as the cnx is extablished.
// That'll speed-up the closing of this transaction for the
// other side.
try {
OutputStream op = socket.getOutputStream();
if (op != null) {
byte rep;
if (keep) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" keep alive");
// Keep alive ok
rep = 1;
} else {
// Keep alive not ok
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" not keep alive");
rep = 0;
}
op.write(rep);
op.flush();
}
else
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive");
} catch (Exception e) {
// Not fatal
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive ok");
}
if (size != Header.length) {
// We assume that the header is transmitted in one chunk
// Discard message
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" wrong header size= " + size +
" expected= " + Header.length);
inputStream.close();
socket.close();
return;
}
Header header = new Header();
if (! header.initFromNetwork(headerBuffer, 0)) {
// The stream is broken. Discard the message
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" header corrupted, closing inputStream.");
inputStream.close();
socket.close();
return;
}
size = 0;
int tmpSize = 0;
byte[] buffer = new byte [header.size];
while (size < header.size) {
try {
tmpSize = inputStream.read(buffer,
size,
header.size - size);
} catch (Exception e) {
// The stream is broken. Discard the message
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" inputStream is broken");
inputStream.close();
socket.close();
return;
}
if (tmpSize == -1) {
// No more bytes to receive. This is an error. Discard the
// message.
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" no more bytes to receive - incomplete message");
inputStream.close();
socket.close();
return;
}
size += tmpSize;
}
if (size != header.size) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" header size is invalid");
inputStream.close();
socket.close();
return;
}
// Give the message to the endpoint manager
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" handing incoming message from " + socket.getInetAddress().getHostAddress() + " to EndpointService");
ByteArrayInputStream bais = new ByteArrayInputStream(buffer, 0, header.size);
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "application/x-jxta-msg" ) )
.readMessage(bais, msg);
// Notify an existing TcpConnection for the source of the message
// that the peer might be up again.
EndpointAddress destAddr = msg.getSourceAddress();
if (destAddr != null) {
checkConnection(destAddr);
}
// Demux the message for the upper layers.
endpoint.demux(msg);
// Make sure we are not hogging the cpu with input
// thus preventing it from being processed.
// It is better to slow the sending side a bit.
Thread.yield();
if (!keep) {
// No keep alive. Close the socket and return
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Not keepalive. close socket");
inputStream.close();
socket.close();
return;
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError " + e);
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception ez1) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close inputStream " + ez1);
}
}
if (socket != null) {
try {
socket.close();
} catch (Exception ez1) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close socket " + ez1);
}
}
return;
}
}
}
/**
* Creates and return an EndpointMessenger
*/
public EndpointMessenger getMessenger(EndpointAddress dst)
throws IOException {
// XXX: the following is a work around in order to
// strip out peers that advertise a local IP address
// as their EndpointService (127.0.0.1)
// lomax@jxta.org
String addr = dst.getProtocolAddress();
if (addr.indexOf("127.0.0.1") >= 0) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("getMessenger is trying to get 127.0.0.1 - discard");
return null;
}
return new TcpNonBlockingMessenger(dst, this, connManager);
}
/**
* Propagates a TransportMessage on this EndpointProtocol
*
* @param message the TransportMessage to be propagated
* @param pName the name of a service
* @param pParams parameters for this message.
* @param prunePeer ????
* @exception IOException thrown if the message could not be sent for some reason.
*/
public synchronized void propagate(Message message,
String pName,
String pParams,
String prunePeer) throws IOException {
if (! allowMulticast)
return;
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("propagate");
LOG.debug("prunepeer = " + prunePeer);
}
// First build the destination and source addresses
// While mAddress is a global variable which is initialized once with
// the local EndpointAddress for TCP multicast, the following manipulation
// is thread-safe: mAddress is used only in this method, which itself is
// synchronized. Synchronizing this method is not such a bad idea anyway, since
// it might be a good thing to not allow concurrent IP multicast: that naturally
// bonds the usage of ip-multicast in a linear matter and not exponantial.
mAddress.setServiceName(pName);
mAddress.setServiceParameter(pParams);
message.setDestinationAddress(mAddress);
message.setSourceAddress(publicAddress);
// Allocate a buffer to contain the message and the header
ByteArrayOutputStream baos = new ByteArrayOutputStream();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "application/x-jxta-msg" ) )
.writeMessage(baos, message);
baos.close();
byte bytes[] = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
byte[] buffer = new byte [multicastPacketSize];
// Store the message into the buffer. We must loop because the input
// stream may not return the whole message just because we ask for it.
int msgSize = Header.length;
for(;;) {
int res = bais.read( buffer, msgSize, multicastPacketSize - msgSize );
if( -1 == res )
break;
msgSize += res;
// we have to make sure that we don't allow message size to grow
// larger than multicast size or we will loop forever reading.
if( msgSize >= multicastPacketSize ) {
try {
bais.close();
} catch( IOException ignored ) {}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : larger than multicast packet size ("
+ multicastPacketSize + ")" );
throw new IOException(
"Message discarded : larger than multicast packet size ("
+ multicastPacketSize + ")" );
}
}
try {
bais.close();
} catch( IOException ignored ) {}
if( Header.length == msgSize ) {
// Something odd happened. Discard the message
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : could not read whole message from stream" );
throw new IOException( "Message discarded : could not read whole message from stream" );
}
// First build the header
Header header = new Header();
header.cmd = Header.PROPAGATE;
// XXX: This part could probably be initialized only once. To be fixed.
header.srcAddr = usingInterface.getAddress();
header.srcPort = serverSocketPort;
header.size = msgSize - header.length;
header.buildForNetwork(buffer, 0);
DatagramPacket packet = new DatagramPacket(buffer, msgSize,
propagateInetAddress,
propagatePort);
if (multicastSocket == null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -