📄 slpcore.java
字号:
} synchronized (dAs) { dAs.notifyAll(); } // if there is a daemon instance, inform it about the discovered // DA if (daemon != null) { daemon.newDaDiscovered(advert); } } platform.logDebug("NEW DA LIST: " + dAs); return null; // reply messages case SLPMessage.ATTRRPLY: case SLPMessage.SRVRPLY: case SLPMessage.SRVTYPERPLY: synchronized (replyListeners) { List queue = (List) replyListeners .get(new Integer(message.xid)); if (queue != null) { synchronized (queue) { queue.add(message); queue.notifyAll(); } return null; } else { platform.logTraceReg("SRVTYPEREPLY recieved (" + message.address + ":" + message.port + ") " + message.toString() + " but not replyListeners present anymore"); } } return null; // request messages case SLPMessage.SRVRQST: case SLPMessage.ATTRRQST: case SLPMessage.SRVTYPERQST: // silently drop messages where this peer is in the previous // responder list for (int i = 0; i < SLPCore.myIPs.length; i++) { if (((RequestMessage) message).prevRespList .contains(SLPCore.myIPs[i])) { platform.logTraceDrop("DROPPED (" + message.address + ":" + message.port + ") " + message.toString() + "(udp multicast)"); return null; } } // if we have a daemon instance, delegate the // message to the daemon. if (daemon != null) { return daemon.handleMessage(message); } else { platform.logDebug("SRVTYPERQST recieved (" + message.address + ":" + message.port + ") " + message.toString() + " but no SLPDaemon to handle the message present"); return null; } default: // if we have a daemon instance, delegate all other // messages to the daemon. if (daemon != null) { return daemon.handleMessage(message); } else { platform.logDebug("A message recieved (" + message.address + ":" + message.port + ") " + message.toString() + " but no SLPDaemon to handle the message present"); return null; } } } /** * get the next XID. * * @return the next XID. 
*/ static short nextXid() { if (nextXid == 0) { nextXid = 1; } return nextXid++; } /** * find DAs for the scopes by sending a multicast service request for * service <i>service:directory-agent</i>. * * @param scopes * a <code>List</code> of scopes. * @throws ServiceLocationException * in case of network errors. */ static void daLookup(final List scopes) throws ServiceLocationException { int i = 0; try { // change by TomoTherapy Inc // added loop for each IP for each interface // used 1.4 SocketAddress // altered by Jan to be backwards compatible with Java 2 for (; i < myIPs.length; i++) { // create a socket bound to the next ip address final InetAddress addr = InetAddress.getByName(myIPs[i]); DatagramSocket socket = new DatagramSocket(0, addr); ServiceRequest sreq = new ServiceRequest(new ServiceType( SLP_DA_TYPE), scopes, null, SLPCore.DEFAULT_LOCALE); sreq.xid = SLPCore.nextXid(); sreq.scopeList = scopes; sreq.address = MCAST_ADDRESS; sreq.multicast = true; byte[] bytes = sreq.getBytes(); DatagramPacket d = new DatagramPacket(bytes, bytes.length, MCAST_ADDRESS, SLP_PORT); platform.logTraceMessage("SENT " + sreq + "(udp multicast)"); setupReceiverThread(socket, CONFIG.getWaitTime(), sreq); try { socket.send(d); } catch (SocketException se) { // blacklist address final List remaining = new ArrayList(java.util.Arrays .asList(myIPs)); final String faulty = myIPs[i]; remaining.remove(faulty); myIPs = (String[]) remaining.toArray(new String[remaining .size()]); platform.logDebug("Blacklisting IP " + faulty); } } } catch (IllegalArgumentException ise) { platform.logDebug("May never happen, no filter set", ise); } catch (UnknownHostException uhe) { platform.logWarning("Unknown net.slp.interfaces address: " + myIPs[i], uhe); throw new ServiceLocationException( ServiceLocationException.NETWORK_ERROR, uhe.getMessage()); } catch (IOException e) { platform.logWarning("Error connecting to: " + myIPs[i], e); throw new ServiceLocationException( 
ServiceLocationException.NETWORK_ERROR, e.getMessage()); } } /** * send a unicast message over TCP. * * @param msg * the message. * @return the reply. * @throws ServiceLocationException * in case of network errors. */ static ReplyMessage sendMessageTCP(final SLPMessage msg) throws ServiceLocationException { try { if (msg.xid == 0) { msg.xid = nextXid(); } Socket socket = new Socket(msg.address, msg.port); socket.setSoTimeout(CONFIG.getTCPTimeout()); DataOutputStream out = new DataOutputStream(socket .getOutputStream()); DataInputStream in = new DataInputStream(socket.getInputStream()); msg.writeTo(out); final ReplyMessage reply = (ReplyMessage) SLPMessage.parse( msg.address, msg.port, in, true); socket.close(); return reply; } catch (Exception e) { throw new ServiceLocationException( ServiceLocationException.NETWORK_ERROR, e.getMessage()); } } /** * send a unicast message over UDP. * * @param msg * the message to be sent. * @param expectReply * waits for a reply if set to true. * @return the reply. * @throws ServiceLocationException * in case of network errors etc. 
*/ static ReplyMessage sendMessage(final SLPMessage msg, final boolean expectReply) throws ServiceLocationException { if (msg.xid == 0) { msg.xid = nextXid(); } if (msg.getSize() > CONFIG.getMTU() || TCP_ONLY) { return sendMessageTCP(msg); } try { DatagramSocket dsocket = new DatagramSocket(); dsocket.setSoTimeout(CONFIG.getDatagramMaxWait()); byte[] bytes = msg.getBytes(); DatagramPacket packet = new DatagramPacket(bytes, bytes.length, msg.address, msg.port); byte[] receivedBytes = new byte[CONFIG.getMTU()]; DatagramPacket received = new DatagramPacket(receivedBytes, receivedBytes.length); dsocket.send(packet); platform.logTraceMessage("SENT (" + msg.address + ":" + msg.port + ") " + msg + " (via udp port " + dsocket.getLocalPort() + ")"); // if no reply is expected, return if (!expectReply) { return null; } dsocket.receive(received); dsocket.close(); final DataInputStream in = new DataInputStream( new ByteArrayInputStream(received.getData())); ReplyMessage reply = (ReplyMessage) SLPMessage.parse(received .getAddress(), received.getPort(), in, false); return reply; } catch (SocketException se) { throw new ServiceLocationException( ServiceLocationException.NETWORK_INIT_FAILED, se .getMessage()); } catch (ProtocolException pe) { // Overflow, retry with TCP return sendMessageTCP(msg); } catch (IOException ioe) { platform.logError("Exception during sending of " + msg); platform.logError("to " + msg.address + ":" + msg.port); platform.logError("Exception:", ioe); throw new ServiceLocationException( ServiceLocationException.NETWORK_ERROR, ioe.getMessage()); } catch (Throwable t) { platform.logDebug(t.getMessage(), t); throw new ServiceLocationException((short) 1, t.getMessage()); } } /** * send a request via multicast convergence algorithm. * * @param msg * the message. * @return the collected reply messages. * @throws ServiceLocationException * in case of network errors. 
*/
    static List multicastConvergence(final RequestMessage msg)
            throws ServiceLocationException {
        final long start = System.currentTimeMillis();

        final List replyQueue = new ArrayList();
        final List responders = new ArrayList();
        final List responses = new ArrayList();

        if (msg.xid == 0) {
            msg.xid = SLPCore.nextXid();
        }

        // register the reply queue as listener
        final Integer queryXID = new Integer(msg.xid);
        synchronized (replyListeners) {
            replyListeners.put(queryXID, replyQueue);
        }

        try {
            msg.port = SLPCore.SLP_PORT;
            msg.prevRespList = new ArrayList();
            msg.multicast = true;

            // send to localhost, in case the OS does not support multicast
            // over loopback which can fail if no SA is running locally
            msg.address = LOCALHOST;
            try {
                replyQueue.add(sendMessageTCP(msg));
            } catch (ServiceLocationException e) {
                // a plain network error only means nobody answers locally
                if (e.getErrorCode() != ServiceLocationException.NETWORK_ERROR) {
                    throw e;
                }
            }

            msg.address = MCAST_ADDRESS;
            ReplyMessage reply;

            for (int i = 0; i < myIPs.length; i++) {
                // create a socket bound to the next ip address
                final InetAddress addr = InetAddress.getByName(myIPs[i]);
                final MulticastSocket socket = new MulticastSocket();
                try {
                    socket.setInterface(addr);
                    socket.setTimeToLive(CONFIG.getMcastTTL());
                } catch (IOException setupFailure) {
                    // the receiver thread that normally closes the socket
                    // has not been started yet
                    socket.close();
                    throw setupFailure;
                }

                // the receiver thread owns the socket from here on and
                // closes it at the end of its lifetime
                setupReceiverThread(socket, CONFIG.getMcastMaxWait(), msg);

                // the multicast convergence algorithm
                long totalTimeout = System.currentTimeMillis()
                        + CONFIG.getMcastMaxWait();
                int[] transmissionSchedule = SLPCore.CONFIG.getMcastTimeouts();
                int retryCounter = 0;
                long nextTimeout;
                int failCounter = 0;
                boolean seenNew = false;
                boolean seenLocalResponse = false;

                nextTimeout = System.currentTimeMillis()
                        + transmissionSchedule[retryCounter];

                while (!Thread.currentThread().isInterrupted()
                        && totalTimeout > System.currentTimeMillis()
                        && nextTimeout > System.currentTimeMillis()
                        && retryCounter < transmissionSchedule.length
                        && failCounter < CONFIG.getConvergenceFailerCount()) {

                    // tell known responders not to answer again
                    msg.prevRespList = responders;
                    byte[] message = msg.getBytes();

                    // finish convergence in case of message size exeeds MTU
                    if (message.length > CONFIG.getMTU()) {
                        break;
                    }

                    // send the message
                    DatagramPacket p = new DatagramPacket(message,
                            message.length, InetAddress
                                    .getByName(SLP_MCAST_ADDRESS), SLP_PORT);

                    try {
                        socket.send(p);
                    } catch (IOException ioe) {
                        // give up on this interface, try the next one
                        break;
                    }

                    platform.logTraceMessage("SENT " + msg);

                    /**
                     * @fix: bug #1518729. Changed processing of the
                     *       replyQueue. Thanks to Richard Reid for figuring
                     *       out the problem with multicast replies and
                     *       proposing the fix
                     */
                    try {
                        Thread.sleep(transmissionSchedule[retryCounter]);
                    } catch (InterruptedException dontcare) {
                        // Restore the interrupted status
                        Thread.currentThread().interrupt();
                    }

                    synchronized (replyQueue) {
                        // did something else wake us up ?
                        if (replyQueue.isEmpty()) {
                            failCounter++;
                            nextTimeout = System.currentTimeMillis()
                                    + transmissionSchedule[retryCounter++];
                            continue;
                        }
                        while (!replyQueue.isEmpty()) {
                            reply = (ReplyMessage) replyQueue.remove(0);
                            // silently drop duplicate responses, process
                            // only new results
                            if (!responders.contains(reply.address
                                    .getHostAddress())) {
                                if (isLocalResponder(reply.address)) {
                                    // accept only one reply from this host
                                    if (seenLocalResponse) {
                                        continue;
                                    } else {
                                        seenLocalResponse = true;
                                    }
                                }
                                seenNew = true;
                                responders.add(reply.address.getHostAddress());
                                responses.addAll(reply.getResult());
                            }
                        }

                        if (!seenNew) {
                            failCounter++;
                        } else {
                            seenNew = false;
                        }
                    }
                    nextTimeout = System.currentTimeMillis()
                            + transmissionSchedule[retryCounter++];
                }
            }

            platform.logDebug("convergence for xid=" + msg.xid
                    + " finished after "
                    + (System.currentTimeMillis() - start)
                    + " ms, result: " + responses);
            return responses;
        } catch (IOException ioe) {
            platform.logDebug(ioe.getMessage(), ioe);
            throw new ServiceLocationException(
                    ServiceLocationException.NETWORK_ERROR, ioe.getMessage());
        } finally {
            // we are done, remove the listener queue - also when the
            // convergence is aborted by an exception, otherwise the entry
            // would stay registered
            synchronized (replyListeners) {
                replyListeners.remove(queryXID);
            }
        }
    }

    /**
     * check whether an address is one of this peer's own addresses.
     * 
     * @param addr
     *            the address of a responder.
     * @return <code>true</code> if the host address equals one of the
     *         entries of <code>SLPCore.myIPs</code>.
     */
    private static boolean isLocalResponder(InetAddress addr) {
        for (int i = 0; i < SLPCore.myIPs.length; i++) {
            if (addr.getHostAddress().equals(SLPCore.myIPs[i])) {
                return true;
            }
        }
        return false;
    }

    /**
     * setup a new receiver thread for a socket. The thread receives
     * datagrams until its lifetime expires, passes every parsed message to
     * <code>handleMessage</code> and closes the socket when it terminates,
     * for whatever reason.
     * 
     * @param socket
     *            the <code>DatagramSocket</code> for which the receiver
     *            thread is set up. Ownership passes to the thread.
     * @param minLifetime
     *            the minimum lifetime of the receiver thread in
     *            milliseconds; one extra second is added.
     * @param msg
     *            the request that was sent over the socket. It is re-sent
     *            via TCP to a responder whose datagram reply raises a
     *            <code>ProtocolException</code> during parsing.
     */
    private static void setupReceiverThread(final DatagramSocket socket,
            final long minLifetime, final SLPMessage msg) {
        new Thread() {
            public void run() {
                try {
                    receiveLoop();
                } finally {
                    // close the socket on every exit path, including the
                    // early returns and unexpected runtime exceptions
                    socket.close();
                }
            }

            private void receiveLoop() {
                // prepare an empty datagram for receiving
                DatagramPacket packet;
                byte[] bytes = new byte[SLPCore.CONFIG.getMTU()];

                // calculate the end of lifetime
                long timeout = System.currentTimeMillis() + minLifetime + 1000;

                // while lifetime is not expired
                while (System.currentTimeMillis() < timeout) {
                    // set socket timeout
                    try {
                        long l = timeout - System.currentTimeMillis();
                        // A socket timeout of 0 means "wait forever", so a
                        // remaining lifetime of exactly 0 ms must be bumped
                        // to 1 ms as well, not only negative values.
                        int soTimeout = (int) (l <= 0 ? 1 : l);
                        socket.setSoTimeout(soTimeout);
                    } catch (SocketException e1) {
                        platform.logError(
                                "Exception in mcast receiver thread", e1);
                        return;
                    }

                    packet = new DatagramPacket(bytes, bytes.length);
                    try {
                        // try to receive a datagram packet
                        socket.receive(packet);
                    } catch (InterruptedIOException iioe) {
                        // timed out, re-check the lifetime
                        continue;
                    } catch (IOException e) {
                        platform.logDebug(e.getMessage(), e);
                        return;
                    }

                    final DataInputStream in = new DataInputStream(
                            new ByteArrayInputStream(packet.getData()));
                    try {
                        // and delegate it to the SLPCore
                        handleMessage(SLPMessage.parse(packet.getAddress(),
                                packet.getPort(), in, false));
                    } catch (ProtocolException pe) {
                        // Overflow, try to use TCP
                        // NOTE(review): this modifies the request object
                        // that the sending thread may still be using for
                        // further transmissions - confirm this is intended.
                        try {
                            msg.address = packet.getAddress();
                            msg.port = packet.getPort();
                            msg.multicast = false;
                            handleMessage(sendMessageTCP(msg));
                        } catch (ServiceLocationException e) {
                            // best effort: keep receiving, but leave a trace
                            platform.logDebug(e.getMessage(), e);
                        }
                    } catch (ServiceLocationException e) {
                        platform.logDebug(e.getMessage(), e);
                    }
                }
            }
        }.start();
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -