📄 jxtamulticastsocket.java
字号:
return null; } /** * Returns the binding state of the MutlicastSocket. * * @return true if the MutlicastSocket successfully bound to an address */ @Override public boolean isBound() { return bound; } /** * Closes this MutlicastSocket. */ @Override public synchronized void close() { if (closed) { return; } bound = false; closed = true; in.close(); outputPipe.close(); queue.close(); in = null; } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message message = event.getMessage(); if (message == null) { return; } MessageElement element = null; // does the message contain any data element = message.getMessageElement(NAMESPACE, DATATAG); if (element == null) { return; } try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pushing a message onto queue"); } queue.push(message, -1); } catch (InterruptedException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Interrupted", e); } } } /** * Gets the Timeout attribute of the JxtaMulticastSocket * * @return The soTimeout value */ @Override public synchronized int getSoTimeout() { return timeout; } /** * Sets the Timeout attribute of the JxtaMulticastSocket * a timeout of 0 blocks forever, by default this Socket's * timeout is set to 0 * * @param timeout The new soTimeout value */ @Override public synchronized void setSoTimeout(int timeout) throws SocketException { checkState(); this.timeout = timeout; } /** * Returns the closed state of the JxtaMulticastSocket. * * @return true if the socket has been closed */ @Override public synchronized boolean isClosed() { return closed; } /** * Throws a SocketException if closed or not bound * * @throws SocketException if closed */ private void checkState() throws SocketException { if (isClosed()) { throw new SocketException("MulticastSocket is closed"); } else if (!isBound()) { throw new SocketException("MulticastSocket not bound"); } } /** * {@inheritDoc} */ @Override public void send(DatagramPacket packet) throws IOException { checkState(); byte[] data = new byte[packet.getLength()]; System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); Message msg = new Message(); msg.addMessageElement(NAMESPACE, srcElement); msg.addMessageElement(NAMESPACE, new ByteArrayMessageElement(DATATAG, MimeMediaType.AOS, data, null)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a data packet"); } InetAddress address = packet.getAddress(); PeerID pid = null; if (address != null) { String pidStr = address.getHostName(); try { pid = (PeerID) IDFactory.fromURI(new URI(pidStr)); } catch (Exception ex) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Invalid source PeerID multicasting instead"); } } } if (pid != null) { // Unicast datagram // create a op pipe to the destination peer OutputPipe op = pipeSvc.createOutputPipe(pipeAdv, Collections.singleton(pid), 1000); op.send(msg); op.close(); } else { // multicast outputPipe.send(msg); } } /** * {@inheritDoc} */ @Override public void receive(DatagramPacket packet) throws IOException { checkState(); Message msg = null; // data MessageElement del = null; // src MessageElement sel = null; try { msg = (Message) queue.pop(timeout); if (msg == null) { if (timeout > 0) { throw new SocketTimeoutException("Socket timeout reached"); } else { return; } } del = msg.getMessageElement(NAMESPACE, DATATAG); sel = msg.getMessageElement(NAMESPACE, SRCIDTAG); if (del == null || sel == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Message contains no data element, returning"); } return; } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Popped a message off the queue"); } } } catch (InterruptedException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Exception occured", e); } throw new IOException(e.toString()); } if (del.getByteLength() > packet.getLength()) { throw new IOException("Datagram can not accomodate message of size :" + del.getByteLength()); } String addrStr = new String(sel.getBytes(false), 0, (int) sel.getByteLength(), "UTF8"); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Src Address :" + addrStr); } InetAddress address = InetAddress.getByAddress(addrStr, fauxip); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Setting Data, and Src Address :" + address); } packet.setAddress(address); packet.setData(del.getBytes(false)); } /** * {@inheritDoc} */ @Override public InetAddress getLocalAddress() { if (isClosed()) { return null; } return localAddress; } /** * {@inheritDoc} */ @Override public SocketAddress getLocalSocketAddress() { if (isClosed()) { return null; } return socketAddress; } /** * {@inheritDoc} */ @Override public void bind(SocketAddress addr) throws SocketException { if (isBound()) { throw new SocketException("Already bound"); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -