tp.java
来自「JGRoups源码」· Java 代码 · 共 1,778 行 · 第 1/5 页
JAVA
1,778 行
package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.IpAddress;import org.jgroups.util.*;import org.jgroups.util.List;import org.jgroups.util.Queue;import java.io.DataInputStream;import java.io.IOException;import java.net.*;import java.text.NumberFormat;import java.util.*;/** * Generic transport - specific implementations should extend this abstract class. * Features which are provided to the subclasses include * <ul> * <li>version checking * <li>marshalling and unmarshalling * <li>message bundling (handling single messages, and message lists) * <li>incoming packet handler * <li>loopback * </ul> * A subclass has to override * <ul> * <li>{@link #sendToAllMembers(byte[], int, int)} * <li>{@link #sendToSingleMember(org.jgroups.Address, byte[], int, int)} * <li>{@link #init()} * <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves * (e.g., created their sockets). * <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves * <li>{@link #destroy()} * </ul> * The create() or start() method has to create a local address.<br> * The {@link #receive(Address, Address, byte[], int, int)} method must * be called by subclasses when a unicast or multicast message has been received. * @author Bela Ban * @version $Id: TP.java,v 1.77 2006/10/24 13:20:25 belaban Exp $ */public abstract class TP extends Protocol { /** The address (host and port) of this member */ protected Address local_addr=null; /** The name of the group to which this member is connected */ protected String channel_name=null; /** The interface (NIC) which should be used by this transport */ protected InetAddress bind_addr=null; /** Overrides bind_addr, -Djgroups.bind_addr and -Dbind.address: let's the OS return the local host address */ boolean use_local_host=false; /** If true, the transport should use all available interfaces to receive multicast messages * @deprecated Use {@link receive_on_all_interfaces} instead */ boolean bind_to_all_interfaces=false; /** If true, the transport should use all available interfaces to receive multicast messages */ boolean receive_on_all_interfaces=false; /** List<NetworkInterface> of interfaces to receive multicasts on. The multicast receive socket will listen * on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g. * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once. * If this property is set, it override receive_on_all_interfaces. */ java.util.List receive_interfaces=null; /** If true, the transport should use all available interfaces to send multicast messages. This means * the same multicast message is sent N times, so use with care */ boolean send_on_all_interfaces=false; /** List<NetworkInterface> of interfaces to send multicasts on. The multicast send socket will send the * same multicast message on all of these interfaces. This is a comma-separated list of IP addresses or * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded. * If this property is set, it override send_on_all_interfaces. */ java.util.List send_interfaces=null; /** The port to which the transport binds. 0 means to bind to any (ephemeral) port */ int bind_port=0; int port_range=1; // 27-6-2003 bgooren, Only try one port by default /** The members of this group (updated when a member joins or leaves) */ final protected Vector members=new Vector(11); protected View view=null; /** Pre-allocated byte stream. Used for marshalling messages. Will grow as needed */ final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024); final ExposedBufferedOutputStream buf_out_stream=new ExposedBufferedOutputStream(out_stream, 1024); final ExposedDataOutputStream dos=new ExposedDataOutputStream(buf_out_stream); final ExposedByteArrayInputStream in_stream=new ExposedByteArrayInputStream(new byte[]{'0'}); final ExposedBufferedInputStream buf_in_stream=new ExposedBufferedInputStream(in_stream); final DataInputStream dis=new DataInputStream(buf_in_stream); /** If true, messages sent to self are treated specially: unicast messages are * looped back immediately, multicast messages get a local copy first and - * when the real copy arrives - it will be discarded. Useful for Window * media (non)sense */ boolean loopback=false; /** Discard packets with a different version. Usually minor version differences are okay. Setting this property * to true means that we expect the exact same version on all incoming packets */ boolean discard_incompatible_packets=false; /** Sometimes receivers are overloaded (they have to handle de-serialization etc). * Packet handler is a separate thread taking care of de-serialization, receiver * thread(s) simply put packet in queue and return immediately. Setting this to * true adds one more thread */ boolean use_incoming_packet_handler=true; /** Used by packet handler to store incoming DatagramPackets */ Queue incoming_packet_queue=null; /** Dequeues DatagramPackets from packet_queue, unmarshalls them and * calls <tt>handleIncomingUdpPacket()</tt> */ IncomingPacketHandler incoming_packet_handler=null; /** Used by packet handler to store incoming Messages */ Queue incoming_msg_queue=null; IncomingMessageHandler incoming_msg_handler; /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this * value uses an additional thread */ boolean use_outgoing_packet_handler=false; /** Used by packet handler to store outgoing DatagramPackets */ BoundedLinkedQueue outgoing_queue=null; /** max number of elements in the bounded outgoing_queue */ int outgoing_queue_max_size=2000; OutgoingPacketHandler outgoing_packet_handler=null; /** If set it will be added to <tt>local_addr</tt>. Used to implement * for example transport independent addresses */ byte[] additional_data=null; /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller than the largest datagram packet size in case of UDP */ int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic(); /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or * max_bundle_timeout has been exceeded (whichever occurs faster) */ long max_bundle_timeout=20; /** Enabled bundling of smaller messages into bigger ones */ boolean enable_bundling=false; private Bundler bundler=null; protected TimeScheduler timer=null; private DiagnosticsHandler diag_handler=null; boolean enable_diagnostics=true; String diagnostics_addr="224.0.0.75"; int diagnostics_port=7500; /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds * an entry with key=S and value= sender's IP address and port. */ HashMap addr_translation_table=new HashMap(); boolean use_addr_translation=false; TpHeader header; final String name=getName(); static final byte LIST = 1; // we have a list of messages rather than a single message when set static final byte MULTICAST = 2; // message is a multicast (versus a unicast) message when set long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0; static NumberFormat f; static { f=NumberFormat.getNumberInstance(); f.setGroupingUsed(false); f.setMaximumFractionDigits(2); } /** * Creates the TP protocol, and initializes the * state variables, does however not start any sockets or threads. */ protected TP() { } /** * debug only */ public String toString() { return name + "(local address: " + local_addr + ')'; } public void resetStats() { num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0; } public long getNumMessagesSent() {return num_msgs_sent;} public long getNumMessagesReceived() {return num_msgs_received;} public long getNumBytesSent() {return num_bytes_sent;} public long getNumBytesReceived() {return num_bytes_received;} public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";} public void setBindAddress(String bind_addr) throws UnknownHostException { this.bind_addr=InetAddress.getByName(bind_addr); } /** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */ public boolean getBindToAllInterfaces() {return receive_on_all_interfaces;} public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;} public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;} public java.util.List getReceiveInterfaces() {return receive_interfaces;} public boolean isSendOnAllInterfaces() {return send_on_all_interfaces;} public java.util.List getSendInterfaces() {return send_interfaces;} public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;} public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;} public boolean isEnableBundling() {return enable_bundling;} public void setEnableBundling(boolean flag) {enable_bundling=flag;} public int getMaxBundleSize() {return max_bundle_size;} public void setMaxBundleSize(int size) {max_bundle_size=size;} public long getMaxBundleTimeout() {return max_bundle_timeout;} public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;} public int getOutgoingQueueSize() {return outgoing_queue != null? outgoing_queue.size() : 0;} public int getIncomingQueueSize() {return incoming_packet_queue != null? incoming_packet_queue.size() : 0;} public Address getLocalAddress() {return local_addr;} public String getChannelName() {return channel_name;} public boolean isLoopback() {return loopback;} public void setLoopback(boolean b) {loopback=b;} public boolean isUseIncomingPacketHandler() {return use_incoming_packet_handler;} public boolean isUseOutgoingPacketHandler() {return use_outgoing_packet_handler;} public int getOutgoingQueueMaxSize() {return outgoing_queue != null? outgoing_queue_max_size : 0;} public void setOutgoingQueueMaxSize(int new_size) { if(outgoing_queue != null) { outgoing_queue.setCapacity(new_size); outgoing_queue_max_size=new_size; } } public Map dumpStats() { Map retval=super.dumpStats(); if(retval == null) retval=new HashMap(); retval.put("num_msgs_sent", new Long(num_msgs_sent)); retval.put("num_msgs_received", new Long(num_msgs_received)); retval.put("num_bytes_sent", new Long(num_bytes_sent)); retval.put("num_bytes_received", new Long(num_bytes_received)); return retval; } /** * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N * messages, one for each member * @param data The data to be sent. This is not a copy, so don't modify it * @param offset * @param length * @throws Exception */ public abstract void sendToAllMembers(byte[] data, int offset, int length) throws Exception; /** * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N * messages, one for each member * @param dest Must be a non-null unicast address * @param data The data to be sent. This is not a copy, so don't modify it * @param offset * @param length * @throws Exception */ public abstract void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception; public abstract String getInfo(); public abstract void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast); public abstract void postUnmarshallingList(Message msg, Address dest, boolean multicast); private StringBuffer _getInfo() { StringBuffer sb=new StringBuffer(); sb.append(local_addr).append(" (").append(channel_name).append(") ").append("\n"); sb.append("local_addr=").append(local_addr).append("\n"); sb.append("group_name=").append(channel_name).append("\n"); sb.append("version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n"); sb.append("view: ").append(view).append('\n'); sb.append(getInfo()); return sb; } private void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) { try { StringTokenizer tok=new StringTokenizer(request); String req=tok.nextToken(); StringBuffer info=new StringBuffer("n/a"); if(req.trim().toLowerCase().startsWith("query")) { ArrayList l=new ArrayList(tok.countTokens()); while(tok.hasMoreTokens()) l.add(tok.nextToken().trim().toLowerCase()); info=_getInfo(); if(l.contains("jmx")) { Channel ch=stack.getChannel(); if(ch != null) { Map m=ch.dumpStats(); StringBuffer sb=new StringBuffer(); sb.append("stats:\n"); for(Iterator it=m.entrySet().iterator(); it.hasNext();) { sb.append(it.next()).append("\n"); } info.append(sb); } } if(l.contains("props")) { String p=stack.printProtocolSpecAsXML(); info.append("\nprops:\n").append(p); } } byte[] diag_rsp=info.toString().getBytes(); if(log.isDebugEnabled()) log.debug("sending diag response to " + sender); sendResponse(sock, sender, diag_rsp); } catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender, t); } } private static void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException { DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender); sock.send(p); }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?