📄 wirepipe.java
字号:
/**
 * Creates an OutputPipe restricted to an Enumeration of PeerId(s). Only the
 * peers in that Enumeration will receive messages sent through the returned
 * pipe.
 *
 * @param peers an enumeration of the PeerId of the peers that will receive
 * the messages; it is handed as-is to the WirePrivateOutputPipe constructor.
 * @param timeout not used by this implementation.
 *
 * @return a new WirePrivateOutputPipe built from this wire, its wireId and
 * the given peers.
 * @throws IOException if this wire is not valid.
 */
public OutputPipe createOutputPipe( Enumeration peers, long timeout)
throws IOException {
    if (!valid) {
        // Give the caller a reason rather than a message-less IOException.
        throw new IOException("This WIRE is not valid: cannot create an OutputPipe");
    }
    return new WirePrivateOutputPipe(this, wireId, peers);
}
/**
 * Sends a message on this wire: first to the local receivers through
 * localDemux(), then to remote peers through propagate().
 *
 * @param msg the message to send.
 * @param header the wire header that propagate() attaches to the message.
 * @throws IOException declared by the signature; no statement visible in
 * this method throws it directly.
 */
public void sendMessage(Message msg, WireHeader header)
throws IOException {
    // Local delivery comes first; localDemux() hands back the message to forward.
    Message outgoing = localDemux(msg);

    // Remote delivery.
    if (LOG.isEnabledFor(Priority.INFO)) {
        LOG.info("propagating message");
    }
    propagate(outgoing, header);
}
/**
 * Handles a message received from the network: extracts its JXTA-WIRE
 * header, discards the message when the header is missing, when the message
 * is a loopback or when it is no longer alive, then delivers it locally and
 * re-propagates it if isToBePropagated() allows it.
 *
 * @param msg the incoming message.
 * @param srcAddr source address of the message (not read by this method).
 * @param dstAddr destination address of the message (not read by this method).
 */
public void processIncomingMessage(Message msg,
EndpointAddress srcAddr,
EndpointAddress dstAddr) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processIncominMessage starts");
    }

    // Check if there is a JXTA-WIRE header
    WireHeader header = null;
    try {
        MessageElement elem = msg.getElement(WireTagName);
        InputStream ip = null;
        if (elem != null) {
            ip = elem.getStream();
        }
        if (ip != null) {
            try {
                header = new WireHeader(ip);
            } finally {
                // Release the stream even when the header cannot be parsed.
                ip.close();
            }
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("No JxtaWireHeader tag. Message is discarded");
        }
        return;
    }

    if (header == null) {
        // No element or no stream: without a header the checks below would
        // fail with a NullPointerException, so discard the message here.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("No JxtaWireHeader tag. Message is discarded");
        }
        return;
    }

    if (isLoopback(header)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Loopback detected - discard");
        }
        return;
    }

    if (!isAlive(header)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Dead on arrival - discard");
        }
        return;
    }

    // Message is valid
    // Queue the message to local InputPipes
    msg = localDemux(msg);

    if (!isToBePropagated(header)) {
        // This message ran out of fuel.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Max TTL reached - discard");
        }
        return;
    }

    if (LOG.isEnabledFor(Priority.INFO)) {
        LOG.info("repropagating message");
    }
    propagate(msg, header);
}
/**
 * Tells whether the "peers" list carried by the header contains the local
 * peer. A header whose list is null or empty is also reported as a
 * loopback, so that the caller discards the message.
 *
 * @param header the wire header of the message being examined.
 * @return true if the local peer is listed or if no peer is listed at all,
 * false otherwise.
 */
private boolean isLoopback(WireHeader header) {
    Vector visited = header.getPeers();

    if (visited == null || visited.isEmpty()) {
        // A header without any peer is suspicious: pretend it is a
        // loopback (messages are loopback "by default") so it gets dropped.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("isLoopback: No peers in header");
        }
        return true;
    }

    for (int idx = 0; idx < visited.size(); idx++) {
        try {
            String candidate = (String) visited.elementAt(idx);
            if (candidate.equals(localPeerId)) {
                // The local peer has already seen this message.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("isLoopback: local peer is in peers");
                }
                return true;
            }
        } catch (Exception e) {
            // An entry that cannot be examined is reported and skipped.
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("isLoopback failed with " + e);
            }
        }
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("isLoopback: this message is not a loopback");
    }
    return false;
}
/**
 * Liveness check for an incoming message. Not implemented yet
 * (XXX lomax@jxta.org): every message is currently reported as alive and
 * the header is not inspected.
 *
 * @param header the wire header of the message (currently ignored).
 * @return always true.
 */
private boolean isAlive(WireHeader header) {
    return true;
}
/**
 * Decides whether a message may be propagated further. For the time being
 * this just decrements the TTL stored in the header and checks that it is
 * still strictly positive.
 *
 * @param header the wire header; its TTL is decremented as a side effect.
 * @return true if the TTL read back after the decrement is at least 1.
 */
private boolean isToBePropagated(WireHeader header) {
    header.setTTL(header.getTTL() - 1);
    // Read the TTL back from the header rather than reusing the local value.
    return header.getTTL() >= 1;
}
/**
 * Delivers a message to every local receiver of this wire. Each non-null
 * queue in "inputs" and each non-null listener in "pipemsglisteners" gets
 * its own clone of the message. A failure on one receiver is logged and
 * does not prevent delivery to the others. The method is synchronized on
 * this object.
 *
 * @param msg the message to deliver locally.
 * @return the same message instance that was passed in.
 */
private synchronized Message localDemux(Message msg) {
    // Push one copy of the message onto each receive queue.
    for (int q = 0; q < inputs.size(); q++) {
        try {
            EndpointReceiveQueue target = (EndpointReceiveQueue) inputs.elementAt(q);
            if (target != null) {
                Message copy = (Message) msg.clone();
                target.push(copy);
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("localDemux failed with " + e);
            }
        }
    }

    // Fire a message event, carrying its own copy, on each listener.
    for (int l = 0; l < pipemsglisteners.size(); l++) {
        try {
            PipeMsgListener target = (PipeMsgListener) pipemsglisteners.elementAt(l);
            if (target != null) {
                Message copy = (Message) msg.clone();
                target.pipeMsgEvent(new PipeMsgEvent(this, copy));
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("localDemux failed with " + e);
            }
        }
    }

    return msg;
}
/**
 * Makes sure the local peer id appears in the "peers" list of the header.
 * If it is already listed the header is left untouched; otherwise the id
 * is appended and the list is stored back into the header. A null list is
 * replaced by a new empty Vector first.
 *
 * @param header the wire header to update.
 */
private void addLocalPeer(WireHeader header) {
    Vector known = header.getPeers();
    if (known == null) {
        known = new Vector();
    }

    // Stop scanning as soon as the local peer id is found.
    boolean present = false;
    for (int idx = 0; !present && idx < known.size(); idx++) {
        try {
            present = ((String) known.elementAt(idx)).equals(localPeerId);
        } catch (Exception e) {
            // An entry that cannot be compared is reported and skipped.
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("addLocalPeer failed with " + e);
            }
        }
    }
    if (present) {
        return;
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("Adding local peer id into the header");
    }
    known.addElement(localPeerId);
    header.setPeers(known);
}
/**
 * Propagates a message to the group through the rendezvous service. The
 * local peer is first recorded in the header (if not already there), the
 * header is attached to the message under WireTagName, and the message is
 * handed to rendezvous.propagateInGroup(). Any failure is logged as a
 * warning and not rethrown.
 *
 * XXX lomax@jxta.org: the message is given directly to the rendezvous,
 * which can be a "semi-blocking" (read: slow) operation. A better
 * implementation would queue the message and let a background thread do
 * the actual sending. To do.
 *
 * @param msg the message to propagate; the header element is added to it.
 * @param header the wire header to attach to the message.
 */
private void propagate(Message msg, WireHeader header) {
    if (LOG.isEnabledFor(Priority.INFO)) {
        LOG.info("propagate starts");
    }

    // Record the local peer in the peers list unless it is already there.
    addLocalPeer(header);

    try {
        // Push the header onto the message
        msg.addElement(
            msg.newMessageElement(WireTagName, null, header.getInputStream()));

        // NOTE(review): the literal 7 is presumably the propagation TTL - confirm.
        rendezvous.propagateInGroup(msg, WireName, wireId, 7, null);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("propagate failed with " + e);
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -