📄 udp.java.txt
字号:
private void setSourceAddress(Message msg) { if(msg.getSrc() == null) msg.setSrc(local_addr); } /** * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because * mcast or unicast socket reads can be concurrent. * Correction (bela April 19 2005): we acces no instance variables, all vars are allocated on the stack, so * this method should be reentrant: removed 'synchronized' keyword */ void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) { ByteArrayInputStream inp_stream=null; DataInputStream inp=null; Message msg=null; List l; // used if bundling is enabled short version; boolean is_message_list; try { // skip the first n bytes (default: 4), this is the version info inp_stream=new ByteArrayInputStream(data); inp=new DataInputStream(inp_stream); version=inp.readShort(); if(Version.compareTo(version) == false) { if(log.isWarnEnabled()) { StringBuffer sb=new StringBuffer(); sb.append("packet from ").append(sender).append(':').append(port); sb.append(" has different version (").append(version); sb.append(") from ours (").append(Version.printVersion()).append("). "); if(discard_incompatible_packets) sb.append("Packet is discarded"); else sb.append("This may cause problems"); log.warn(sb.toString()); } if(discard_incompatible_packets) return; } is_message_list=inp.readBoolean(); if(is_message_list) { l=bufferToList(inp, dest); for(Enumeration en=l.elements(); en.hasMoreElements();) { msg=(Message)en.nextElement(); try { handleMessage(msg); } catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed unmarshalling message list", t); } } } else { msg=bufferToMessage(inp, dest, sender, port); handleMessage(msg); } } catch(Throwable e) { if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e); } finally { Util.closeInputStream(inp); Util.closeInputStream(inp_stream); } } void handleMessage(Message msg) { Event evt; UdpHeader hdr; Address dst=msg.getDest(); if(dst == null) dst=mcast_addr; if(stats) { num_msgs_received++; num_bytes_received+=msg.getLength(); } // discard my own multicast loopback copy if(loopback) { Address src=msg.getSrc(); if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) { if(log.isTraceEnabled()) log.trace("discarded own loopback multicast packet"); return; } } evt=new Event(Event.MSG, msg); if(log.isTraceEnabled()) { StringBuffer sb=new StringBuffer("message is "); sb.append(msg).append(", headers are ").append(msg.getHeaders()); log.trace(sb.toString()); } /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer. * This allows e.g. PerfObserver to get the time of reception of a message */ if(observer != null) observer.up(evt, up_queue.size()); hdr=(UdpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader() if(hdr != null) { /* Discard all messages destined for a channel with a different name */ String ch_name=hdr.channel_name; // Discard if message's group name is not the same as our group name unless the // message is a diagnosis message (special group name DIAG_GROUP) if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) && !ch_name.equals(Util.DIAG_GROUP)) { if(log.isWarnEnabled()) log.warn("discarded message from different group (" + ch_name + "). Sender was " + msg.getSrc()); return; } } else { if(log.isErrorEnabled()) log.error("message does not have a UDP header"); } passUp(evt); } void sendUdpMessage(Message msg) throws Exception { sendUdpMessage(msg, false); } /** Send a message to the address specified in dest */ void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception { IpAddress dest; Message copy; Event evt; dest=(IpAddress)msg.getDest(); // guaranteed to be non-null setSourceAddress(msg); if(log.isTraceEnabled()) { StringBuffer sb=new StringBuffer("sending msg to "); sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders()); log.trace(sb.toString()); } // Don't send if destination is local address. Instead, switch dst and src and put in up_queue. // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this, // we will discard our own multicast message if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) { copy=msg.copy(); // copy.removeHeader(name); // we don't remove the header copy.setSrc(local_addr); // copy.setDest(dest); evt=new Event(Event.MSG, copy); /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer. This allows e.g. PerfObserver to get the time of reception of a message */ if(observer != null) observer.up(evt, up_queue.size()); if(log.isTraceEnabled()) log.trace("looped back local message " + copy); passUp(evt); if(dest != null && !dest.isMulticastAddress()) return; } if(use_outgoing_packet_handler) { if(copyForOutgoingQueue) outgoing_queue.add(msg.copy()); else outgoing_queue.add(msg); return; } send(msg); } /** Internal method to serialize and send a message. This method is not reentrant */ void send(Message msg) throws Exception { Buffer buf; IpAddress dest=(IpAddress)msg.getDest(); // guaranteed to be non-null IpAddress src=(IpAddress)msg.getSrc(); synchronized(out_stream) { buf=messageToBuffer(msg, dest, src); doSend(buf, dest.getIpAddress(), dest.getPort()); } } void doSend(Buffer buf, InetAddress dest, int port) throws IOException { DatagramPacket packet; // packet=new DatagramPacket(data, data.length, dest, port); packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port); if(stats) { num_msgs_sent++; num_bytes_sent+=buf.getLength(); } if(dest.isMulticastAddress() && mcast_send_sock != null) { // mcast_recv_sock might be null if ip_mcast is false mcast_send_sock.send(packet); } else { if(sock != null) sock.send(packet); } } void sendMultipleUdpMessages(Message msg, Vector dests) { Address dest; for(int i=0; i < dests.size(); i++) { dest=(Address)dests.elementAt(i); msg.setDest(dest); try { sendUdpMessage(msg, true); // copy for outgoing queue if outgoing queue handler is enabled } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending multiple messages", e); } } } /** * This method needs to be synchronized on out_stream when it is called * @param msg * @param dest * @param src * @return * @throws IOException */ private Buffer messageToBuffer(Message msg, IpAddress dest, IpAddress src) throws Exception { Buffer retval; DataOutputStream out=null; try { out_stream.reset(); out=new DataOutputStream(out_stream); out.writeShort(Version.version); // write the version out.writeBoolean(false); // single message, *not* a list of messages nullAddresses(msg, dest, src); msg.writeTo(out); revertAddresses(msg, dest, src); out.flush(); retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); return retval; } finally { Util.closeOutputStream(out); } } private void nullAddresses(Message msg, IpAddress dest, IpAddress src) { msg.setDest(null); if(!dest.isMulticastAddress()) { // unicast if(src != null) { if(null_src_addresses) msg.setSrc(new IpAddress(src.getPort(), false)); // null the host part, leave the port if(src.getAdditionalData() != null) ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData()); } else { msg.setSrc(null); } } else { // multicast if(src != null) { if(null_src_addresses) msg.setSrc(new IpAddress(src.getPort(), false)); // null the host part, leave the port if(src.getAdditionalData() != null) ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData()); } } } private void revertAddresses(Message msg, IpAddress dest, IpAddress src) { msg.setDest(dest); msg.setSrc(src); } private Message bufferToMessage(DataInputStream instream, IpAddress dest, InetAddress sender, int port) throws Exception { Message msg=new Message(); msg.readFrom(instream); setAddresses(msg, dest, sender, port); return msg; } private void setAddresses(Message msg, IpAddress dest, InetAddress sender, int port) { // set the destination address if(msg.getDest() == null && dest != null) msg.setDest(dest); // set the source address if not set IpAddress src_addr=(IpAddress)msg.getSrc(); if(src_addr == null) { try {msg.setSrc(new IpAddress(sender, port));} catch(Throwable t) {} } else { byte[] tmp_additional_data=src_addr.getAdditionalData(); if(src_addr.getIpAddress() == null) { try {msg.setSrc(new IpAddress(sender, src_addr.getPort()));} catch(Throwable t) {} } if(tmp_additional_data != null) ((IpAddress)msg.getSrc()).setAdditionalData(tmp_additional_data); } } private Buffer listToBuffer(List l, IpAddress dest) throws Exception { Buffer retval=null; IpAddress src; Message msg; int len=l != null? l.size() : 0; boolean src_written=false; DataOutputStream out=null; out_stream.reset(); try { out=new DataOutputStream(out_stream); out.writeShort(Version.version); out.writeBoolean(true); out.writeInt(len); for(Enumeration en=l.elements(); en.hasMoreElements();) { msg=(Message)en.nextElement(); src=(IpAddress)msg.getSrc(); if(!src_written) { Util.writeAddress(src, out); src_written=true; } msg.setDest(null); msg.setSrc(null); msg.writeTo(out); revertAddresses(msg, dest, src); } out.flush(); retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); return retval; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -