⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 slpcore.java

📁 Introduction to Sybase IQ.pdf
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
				}				synchronized (dAs) {					dAs.notifyAll();				}				// if there is a daemon instance, inform it about the discovered				// DA				if (daemon != null) {					daemon.newDaDiscovered(advert);				}			}			platform.logDebug("NEW DA LIST: " + dAs);			return null;			// reply messages		case SLPMessage.ATTRRPLY:		case SLPMessage.SRVRPLY:		case SLPMessage.SRVTYPERPLY:			synchronized (replyListeners) {				List queue = (List) replyListeners						.get(new Integer(message.xid));				if (queue != null) {					synchronized (queue) {						queue.add(message);						queue.notifyAll();					}					return null;				} else {					platform.logTraceReg("SRVTYPEREPLY recieved ("							+ message.address + ":" + message.port + ") "							+ message.toString()							+ " but not replyListeners present anymore");				}			}			return null;			// request messages		case SLPMessage.SRVRQST:		case SLPMessage.ATTRRQST:		case SLPMessage.SRVTYPERQST:			// silently drop messages where this peer is in the previous			// responder list			for (int i = 0; i < SLPCore.myIPs.length; i++) {				if (((RequestMessage) message).prevRespList						.contains(SLPCore.myIPs[i])) {					platform.logTraceDrop("DROPPED (" + message.address + ":"								+ message.port + ") " + message.toString()								+ "(udp multicast)");					return null;				}			}			// if we have a daemon instance, delegate the			// message to the daemon.			if (daemon != null) {				return daemon.handleMessage(message);			} else {				platform.logDebug("SRVTYPERQST recieved ("						+ message.address + ":" + message.port + ") "						+ message.toString()						+ " but no SLPDaemon to handle the message present");				return null;			}		default:			// if we have a daemon instance, delegate all other			// messages to the daemon.			
if (daemon != null) {				return daemon.handleMessage(message);			} else {				platform.logDebug("A message recieved ("						+ message.address + ":" + message.port + ") "						+ message.toString()						+ " but no SLPDaemon to handle the message present");				return null;			}		}	}	/**	 * get the next XID.	 * 	 * @return the next XID.	 */	static short nextXid() {		if (nextXid == 0) {			nextXid = 1;		}		return nextXid++;	}	/**	 * find DAs for the scopes by sending a multicast service request for	 * service <i>service:directory-agent</i>.	 * 	 * @param scopes	 *            a <code>List</code> of scopes.	 * @throws ServiceLocationException	 *             in case of network errors.	 */	static void daLookup(final List scopes) throws ServiceLocationException {		int i = 0;		try {			// change by TomoTherapy Inc			// added loop for each IP for each interface			// used 1.4 SocketAddress			// altered by Jan to be backwards compatible with Java 2			for (; i < myIPs.length; i++) {				// create a socket bound to the next ip address				final InetAddress addr = InetAddress.getByName(myIPs[i]);				DatagramSocket socket = new DatagramSocket(0, addr);				ServiceRequest sreq = new ServiceRequest(new ServiceType(						SLP_DA_TYPE), scopes, null, SLPCore.DEFAULT_LOCALE);				sreq.xid = SLPCore.nextXid();				sreq.scopeList = scopes;				sreq.address = MCAST_ADDRESS;				sreq.multicast = true;				byte[] bytes = sreq.getBytes();				DatagramPacket d = new DatagramPacket(bytes, bytes.length,						MCAST_ADDRESS, SLP_PORT);				platform.logTraceMessage("SENT " + sreq + "(udp multicast)");				setupReceiverThread(socket, CONFIG.getWaitTime(), sreq);				try {					socket.send(d);				} catch (SocketException se) {					// blacklist address					final List remaining = new ArrayList(java.util.Arrays							.asList(myIPs));					final String faulty = myIPs[i];					remaining.remove(faulty);					myIPs = (String[]) remaining.toArray(new String[remaining							.size()]);					platform.logDebug("Blacklisting IP " + 
faulty);				}			}		} catch (IllegalArgumentException ise) {			platform.logDebug("May never happen, no filter set", ise);		} catch (UnknownHostException uhe) {			platform.logWarning("Unknown net.slp.interfaces address: " + myIPs[i], uhe);			throw new ServiceLocationException(					ServiceLocationException.NETWORK_ERROR, uhe.getMessage());		} catch (IOException e) {			platform.logWarning("Error connecting to: " + myIPs[i], e);			throw new ServiceLocationException(					ServiceLocationException.NETWORK_ERROR, e.getMessage());		}	}	/**	 * send a unicast message over TCP.	 * 	 * @param msg	 *            the message.	 * @return the reply.	 * @throws ServiceLocationException	 *             in case of network errors.	 */	static ReplyMessage sendMessageTCP(final SLPMessage msg)			throws ServiceLocationException {		try {			if (msg.xid == 0) {				msg.xid = nextXid();			}			Socket socket = new Socket(msg.address, msg.port);			socket.setSoTimeout(CONFIG.getTCPTimeout());			DataOutputStream out = new DataOutputStream(socket					.getOutputStream());			DataInputStream in = new DataInputStream(socket.getInputStream());			msg.writeTo(out);			final ReplyMessage reply = (ReplyMessage) SLPMessage.parse(					msg.address, msg.port, in, true);			socket.close();			return reply;		} catch (Exception e) {			throw new ServiceLocationException(					ServiceLocationException.NETWORK_ERROR, e.getMessage());		}	}	/**	 * send a unicast message over UDP.	 * 	 * @param msg	 *            the message to be sent.	 * @param expectReply	 *            waits for a reply if set to true.	 * @return the reply.	 * @throws ServiceLocationException	 *             in case of network errors etc.	 
*/	static ReplyMessage sendMessage(final SLPMessage msg,			final boolean expectReply) throws ServiceLocationException {		if (msg.xid == 0) {			msg.xid = nextXid();		}		if (msg.getSize() > CONFIG.getMTU() || TCP_ONLY) {			return sendMessageTCP(msg);		}		try {			DatagramSocket dsocket = new DatagramSocket();			dsocket.setSoTimeout(CONFIG.getDatagramMaxWait());			byte[] bytes = msg.getBytes();			DatagramPacket packet = new DatagramPacket(bytes, bytes.length,					msg.address, msg.port);			byte[] receivedBytes = new byte[CONFIG.getMTU()];			DatagramPacket received = new DatagramPacket(receivedBytes,					receivedBytes.length);			dsocket.send(packet);			platform.logTraceMessage("SENT (" + msg.address + ":" + msg.port + ") "						+ msg + " (via udp port " + dsocket.getLocalPort()						+ ")");			// if no reply is expected, return			if (!expectReply) {				return null;			}			dsocket.receive(received);			dsocket.close();			final DataInputStream in = new DataInputStream(					new ByteArrayInputStream(received.getData()));			ReplyMessage reply = (ReplyMessage) SLPMessage.parse(received					.getAddress(), received.getPort(), in, false);			return reply;		} catch (SocketException se) {			throw new ServiceLocationException(					ServiceLocationException.NETWORK_INIT_FAILED, se							.getMessage());		} catch (ProtocolException pe) {			// Overflow, retry with TCP			return sendMessageTCP(msg);		} catch (IOException ioe) {			platform.logError("Exception during sending of " + msg);			platform.logError("to " + msg.address + ":" + msg.port);			platform.logError("Exception:", ioe);			throw new ServiceLocationException(					ServiceLocationException.NETWORK_ERROR, ioe.getMessage());		} catch (Throwable t) {			platform.logDebug(t.getMessage(), t);			throw new ServiceLocationException((short) 1, t.getMessage());		}	}	/**	 * send a request via multicast convergence algorithm.	 * 	 * @param msg	 *            the message.	 * @return the collected reply messages.	 
* @throws ServiceLocationException	 *             in case of network errors.	 */	static List multicastConvergence(final RequestMessage msg)			throws ServiceLocationException {		try {			long start = System.currentTimeMillis();			List replyQueue = new ArrayList();			List responders = new ArrayList();			List responses = new ArrayList();			if (msg.xid == 0) {				msg.xid = SLPCore.nextXid();			}			// register the reply queue as listener			Integer queryXID = new Integer(msg.xid);			synchronized (replyListeners) {				replyListeners.put(queryXID, replyQueue);			}			msg.port = SLPCore.SLP_PORT;			msg.prevRespList = new ArrayList();			msg.multicast = true;			// send to localhost, in case the OS does not support multicast over			// loopback which can fail if no SA is running locally			msg.address = LOCALHOST;			try {				replyQueue.add(sendMessageTCP(msg));			} catch (ServiceLocationException e) {				if(e.getErrorCode() != ServiceLocationException.NETWORK_ERROR) {					throw e;				}			}			msg.address = MCAST_ADDRESS;			ReplyMessage reply;			for (int i = 0; i < myIPs.length; i++) {				// create a socket bound to the next ip address				final InetAddress addr = InetAddress.getByName(myIPs[i]);				final MulticastSocket socket = new MulticastSocket();				socket.setInterface(addr);				socket.setTimeToLive(CONFIG.getMcastTTL());				setupReceiverThread(socket, CONFIG.getMcastMaxWait(), msg);				// the multicast convergence algorithm				long totalTimeout = System.currentTimeMillis()						+ CONFIG.getMcastMaxWait();				int[] transmissionSchedule = SLPCore.CONFIG.getMcastTimeouts();				int retryCounter = 0;				long nextTimeout;				int failCounter = 0;				boolean seenNew = false;				boolean seenLocalResponse = false;				nextTimeout = System.currentTimeMillis()						+ transmissionSchedule[retryCounter];				while (!Thread.currentThread().isInterrupted()						&& totalTimeout > System.currentTimeMillis()						&& nextTimeout > System.currentTimeMillis()						&& retryCounter < 
transmissionSchedule.length						&& failCounter < CONFIG.getConvergenceFailerCount()) {					msg.prevRespList = responders;					byte[] message = msg.getBytes();					// finish convergence in case of message size exeeds MTU					if (message.length > CONFIG.getMTU()) {						break;					}					// send the message					DatagramPacket p = new DatagramPacket(message,							message.length, InetAddress									.getByName(SLP_MCAST_ADDRESS), SLP_PORT);					try {						socket.send(p);					} catch (IOException ioe) {						break;					}					platform.logTraceMessage("SENT " + msg);					/**					 * @fix: bug #1518729. Changed processing of the replyQueue.					 *       Thanks to Richard Reid for figuring out the problem					 *       with multicast replies and proposing the fix					 */					try {						Thread.sleep(transmissionSchedule[retryCounter]);					} catch (InterruptedException dontcare) {						// Restore the interrupted status						Thread.currentThread().interrupt();					}					synchronized (replyQueue) {						// did something else wake us up ?						
if (replyQueue.isEmpty()) {							failCounter++;							nextTimeout = System.currentTimeMillis()									+ transmissionSchedule[retryCounter++];							continue;						}						while (!replyQueue.isEmpty()) {							reply = (ReplyMessage) replyQueue.remove(0);							// silently drop duplicate responses, process only							// new							// results							if (!responders.contains(reply.address									.getHostAddress())) {								if (isLocalResponder(reply.address)) {									if (seenLocalResponse) {										continue;									} else {										seenLocalResponse = true;									}								}								seenNew = true;								responders.add(reply.address.getHostAddress());								responses.addAll(reply.getResult());							}						}						if (!seenNew) {							failCounter++;						} else {							seenNew = false;						}					}					nextTimeout = System.currentTimeMillis()							+ transmissionSchedule[retryCounter++];				}			}			// we are done, remove the listener queue			synchronized (replyListeners) {				replyListeners.remove(queryXID);			}			platform.logDebug("convergence for xid=" + msg.xid						+ " finished after "						+ (System.currentTimeMillis() - start)						+ " ms, result: " + responses);			return responses;		} catch (IOException ioe) {			platform.logDebug(ioe.getMessage(), ioe);			throw new ServiceLocationException(					ServiceLocationException.NETWORK_ERROR, ioe.getMessage());		}	}	private static boolean isLocalResponder(InetAddress addr) {		for (int i = 0; i < SLPCore.myIPs.length; i++) {			if (addr.getHostAddress().equals(SLPCore.myIPs[i])) {				return true;			}		}		return false;	}	/**	 * setup a new receiver thread for a socket.	 * 	 * @param socket	 *            the <code>DatagramSocket</code> for which the receiver	 *            thread is set up.	 * @param minLifetime	 *            the minimum lifetime of the receiver thread.	 
*/
	private static void setupReceiverThread(final DatagramSocket socket,
			final long minLifetime, final SLPMessage msg) {
		new Thread() {
			public void run() {
				try {
					// buffer for the received datagrams
					final byte[] bytes = new byte[SLPCore.CONFIG.getMTU()];

					// calculate the end of lifetime (one extra second on
					// top of the requested minimum)
					final long timeout = System.currentTimeMillis()
							+ minLifetime + 1000;

					// while lifetime is not expired
					while (System.currentTimeMillis() < timeout) {
						// set socket timeout to the remaining lifetime
						try {
							final long l = timeout
									- System.currentTimeMillis();
							// a timeout of 0 means "block forever", so the
							// value has to be at least 1 ms
							final int soTimeout = (int) (l <= 0 ? 1 : l);
							socket.setSoTimeout(soTimeout);
						} catch (SocketException e1) {
							platform.logError(
									"Exception in mcast receiver thread", e1);
							return;
						}

						final DatagramPacket packet = new DatagramPacket(
								bytes, bytes.length);
						try {
							// try to receive a datagram packet
							socket.receive(packet);
						} catch (InterruptedIOException iioe) {
							// receive timed out, re-check the lifetime
							continue;
						} catch (IOException e) {
							platform.logDebug(e.getMessage(), e);
							return;
						}

						final DataInputStream in = new DataInputStream(
								new ByteArrayInputStream(packet.getData()));
						try {
							// and delegate it to the SLPCore
							handleMessage(SLPMessage.parse(
									packet.getAddress(), packet.getPort(),
									in, false));
						} catch (ProtocolException pe) {
							// Overflow, try to use TCP
							try {
								msg.address = packet.getAddress();
								msg.port = packet.getPort();
								msg.multicast = false;
								handleMessage(sendMessageTCP(msg));
							} catch (ServiceLocationException e) {
								// best effort only, but leave a trace
								platform.logDebug(e.getMessage(), e);
							}
						} catch (ServiceLocationException e) {
							platform.logDebug(e.getMessage(), e);
						}
					}
				} finally {
					// close the socket, also on the early exits above and
					// on unexpected runtime exceptions
					socket.close();
				}
			}
		}.start();
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -