📄 mcasttransport.java
字号:
// get the TransportAdv if (tcpChilds.hasMoreElements()) { param = tcpChilds.nextElement(); Attribute typeAttr = param.getAttribute("type"); if (!TCPAdv.getAdvertisementType().equals(typeAttr.getValue())) { throw new IllegalArgumentException("transport adv is not a " + TCPAdv.getAdvertisementType()); } if (tcpChilds.hasMoreElements()) { throw new IllegalArgumentException("Multiple transport advs detected for " + assignedID); } } else { throw new IllegalArgumentException(TransportAdvertisement.getAdvertisementType() + " could not be located."); } Advertisement paramsAdv = null; try { paramsAdv = AdvertisementFactory.newAdvertisement(param); } catch (NoSuchElementException notThere) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not find parameter document", notThere); } } if (!(paramsAdv instanceof TCPAdv)) { throw new IllegalArgumentException("Provided Advertisement was not a " + TCPAdv.getAdvertisementType()); } TCPAdv adv = (TCPAdv) paramsAdv; // Check if we are disabled. If so, don't bother with the rest of config. if (!adv.getMulticastState()) { disabled = true; return; } // Determine the local interface to use. If the user specifies one, use // that. Otherwise, use the all the available interfaces. interfaceAddressStr = adv.getInterfaceAddress(); if (interfaceAddressStr != null) { try { usingInterface = InetAddress.getByName(interfaceAddressStr); } catch (UnknownHostException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Invalid address for local interface address, using default"); } usingInterface = IPUtils.ANYADDRESS; } } else { usingInterface = IPUtils.ANYADDRESS; } // Start the servers // Only the outgoing interface matters. // Verify that ANY interface does not in fact mean LOOPBACK only. // If that's the case, we want to make that explicit, so that // consistency checks regarding the allowed use of that interface work // properly. if (usingInterface.equals(IPUtils.ANYADDRESS)) { boolean localOnly = true; Iterator<InetAddress> eachLocal = IPUtils.getAllLocalAddresses(); while (eachLocal.hasNext()) { InetAddress anAddress = eachLocal.next(); if (!anAddress.isLoopbackAddress()) { localOnly = false; break; } } if (localOnly) { usingInterface = IPUtils.LOOPBACK; } } ourSrcAddr = new EndpointAddress(group.getPeerID(), null, null); msgSrcAddrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, ourSrcAddr.toString(), null); // Get the multicast configuration. multicastAddress = adv.getMulticastAddr(); multicastPort = adv.getMulticastPort(); // XXX 20070711 bondolo We resolve the address only once. Perhaps we should do this dynamically? try { multicastInetAddress = InetAddress.getByName(multicastAddress); } catch (UnknownHostException notValid) { IllegalArgumentException failed = new IllegalArgumentException("Invalid or unknown host name :" + multicastAddress); failed.initCause(notValid); throw failed; } assert multicastInetAddress.isMulticastAddress(); publicAddress = new EndpointAddress(protocolName, multicastAddress + ":" + Integer.toString(multicastPort), null, null); multicastPacketSize = adv.getMulticastSize(); // Create the multicast input socket try { multicastSocket = new MulticastSocket(new InetSocketAddress(usingInterface, multicastPort)); } catch (IOException failed) { throw new PeerGroupException("Could not open multicast socket", failed); } try { // Surprisingly, "true" means disable.... multicastSocket.setLoopbackMode(false); } catch (SocketException ignored) { // We may not be able to set loopback mode. It is inconsistent // whether an error will occur if the set fails. } // Tell tell the world about our configuration. if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring IP Multicast Message Transport : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params:"); configInfo.append("\n\t\tGroup : ").append(group); configInfo.append("\n\t\tPeer ID: ").append(group.getPeerID()); configInfo.append("\n\tConfiguration:"); configInfo.append("\n\t\tProtocol: ").append(protocolName); configInfo.append("\n\t\tInterface address: ").append(interfaceAddressStr == null ? "(unspecified)" : interfaceAddressStr); configInfo.append("\n\t\tMulticast Addr: ").append(multicastAddress); configInfo.append("\n\t\tMulticast Port: ").append(multicastPort); configInfo.append("\n\t\tMulticast Packet Size: ").append(multicastPacketSize); configInfo.append("\n\tBound To :"); configInfo.append("\n\t\tUsing Interface: ").append(usingInterface.getHostAddress()); configInfo.append("\n\t\tMulticast Server Bind Addr: ").append(multicastSocket.getLocalSocketAddress()); configInfo.append("\n\t\tPublic Address: ").append(publicAddress); LOG.config(configInfo.toString()); } } /** * {@inheritDoc} */ public synchronized int startApp(String[] arg) { if (disabled) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("IP Multicast Message Transport disabled."); } return Module.START_DISABLED; } endpoint = group.getEndpointService(); if (null == endpoint) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Stalled until there is an endpoint service"); } return Module.START_AGAIN_STALLED; } isClosed = false; if (TransportMeterBuildSettings.TRANSPORT_METERING) { TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.transportServiceMonitorClassID); if (transportServiceMonitor != null) { multicastTransportMeter = transportServiceMonitor.createTransportMeter("Multicast", publicAddress); multicastTransportBindingMeter = getMulticastTransportBindingMeter(publicAddress); // Since multicast is connectionless, force it to appear outbound connected multicastTransportBindingMeter.connectionEstablished(true, 0); // Since multicast is connectionless, force it to appear inbound connected multicastTransportBindingMeter.connectionEstablished(false, 0); } } // We're fully ready to function. MessengerEventListener messengerEventListener = endpoint.addMessageTransport(this); if (messengerEventListener == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Transport registration refused"); } return -1; } // Cannot start before registration multicastProcessor = new DatagramProcessor(((StdPeerGroup) group).getExecutor()); multicastThread = new Thread(group.getHomeThreadGroup(), this, "IP Multicast Listener for " + publicAddress); multicastThread.setDaemon(true); multicastThread.start(); try { multicastSocket.joinGroup(multicastInetAddress); } catch (IOException soe) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Could not join multicast group, setting Multicast off"); } return -1; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("IP Multicast Message Transport started."); } return Module.START_OK; } /** * {@inheritDoc} */ public synchronized void stopApp() { if (isClosed || disabled) { return; } isClosed = true; if (multicastSocket != null) { multicastSocket.close(); multicastSocket = null; } if (null != multicastProcessor) { multicastProcessor.stop(); multicastProcessor = null; } endpoint.removeMessageTransport(this); if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { // Since multicast is connectionless, force it to appear outbound disconnected multicastTransportBindingMeter.connectionClosed(true, 0); // Since multicast is connectionless, force it to appear inbound disconnected multicastTransportBindingMeter.connectionClosed(false, 0); } } /** * {@inheritDoc} */ public String getProtocolName() { return protocolName; } /** * {@inheritDoc} */ public EndpointAddress getPublicAddress() { return publicAddress; } /** * {@inheritDoc} */ public EndpointService getEndpointService() { return (EndpointService) endpoint.getInterface(); } /** * {@inheritDoc} */ public Object transportControl(Object operation, Object Value) { return null; } /** * {@inheritDoc} * <p/> * Handles incoming multicasts and enqueues them with the datagram processor. */ public void run() { try { while (!isClosed) { byte[] buffer = new byte[multicastPacketSize]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { multicastSocket.receive(packet); if (isClosed) { return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("multicast message received from :" + packet.getAddress().getHostAddress()); } // This operation is blocking and may take a long time to // return. As a result we may lose datagram packets because // we are not calling // {@link MulticastSocket#receive(DatagramPacket)} often
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -