📄 tcp.java.txt
字号:
return; } } passUp(evt); } // ConnectionTable.ConnectionListener interface// public void connectionOpened(Address peer_addr) {// if(log.isTraceEnabled()) log.trace("opened connection to " + peer_addr);// }//// public void connectionClosed(Address peer_addr) {// if(peer_addr != null)// if(log.isTraceEnabled()) log.trace("closed connection to " + peer_addr);// } /** Setup the Protocol instance acording to the configuration string */ public boolean setProperties(Properties props) { String str, tmp=null; super.setProperties(props); str=props.getProperty("start_port"); if(str != null) { start_port=Integer.parseInt(str); props.remove("start_port"); } str=props.getProperty("end_port"); if(str != null) { end_port=Integer.parseInt(str); props.remove("end_port"); } // PropertyPermission not granted if running in an untrusted environment with JNLP. try { tmp=System.getProperty("bind.address"); // set by JBoss if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { tmp=null; } } catch (SecurityException ex){ } if(tmp != null) str=tmp; else str=props.getProperty("bind_addr"); if(str != null) { try { bind_addr=InetAddress.getByName(str); } catch(UnknownHostException unknown) { if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); return false; } props.remove("bind_addr"); } str=props.getProperty("external_addr"); if(str != null) { try { external_addr=InetAddress.getByName(str); } catch(UnknownHostException unknown) { if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known"); return false; } props.remove("external_addr"); } str=props.getProperty("reaper_interval"); if(str != null) { reaper_interval=Long.parseLong(str); props.remove("reaper_interval"); } str=props.getProperty("conn_expire_time"); if(str != null) { conn_expire_time=Long.parseLong(str); props.remove("conn_expire_time"); } str=props.getProperty("sock_conn_timeout"); if(str != null) { sock_conn_timeout=Integer.parseInt(str); props.remove("sock_conn_timeout"); } str=props.getProperty("recv_buf_size"); if(str != null) { recv_buf_size=Integer.parseInt(str); props.remove("recv_buf_size"); } str=props.getProperty("send_buf_size"); if(str != null) { send_buf_size=Integer.parseInt(str); props.remove("send_buf_size"); } str=props.getProperty("loopback"); if(str != null) { loopback=Boolean.valueOf(str).booleanValue(); props.remove("loopback"); } str=props.getProperty("skip_suspected_members"); if(str != null) { skip_suspected_members=Boolean.valueOf(str).booleanValue(); props.remove("skip_suspected_members"); } if(props.size() > 0) { log.error("TCP.setProperties(): the following properties are not recognized: " + props); return false; } return true; } /** If the sender is null, set our own address. We cannot just go ahead and set the address anyway, as we might be sending a message on behalf of someone else ! E.g. in case of retransmission, when the original sender has crashed, or in a FLUSH protocol when we have to return all unstable messages with the FLUSH_OK response. */ private void setSourceAddress(Message msg) { if(msg.getSrc() == null) msg.setSrc(local_addr); } /** Send a message to the address specified in msg.dest */ private void sendUnicastMessage(Message msg) { IpAddress dest; Message copy; Object hdr; Event evt; dest=(IpAddress)msg.getDest(); // guaranteed not to be null if(!(dest instanceof IpAddress)) { if(log.isErrorEnabled()) log.error("destination address is not of type IpAddress !"); return; } setSourceAddress(msg); /* Don't send if destination is local address. Instead, switch dst and src and put in up_queue */ if(loopback && local_addr != null && dest != null && dest.equals(local_addr)) { copy=msg.copy(); hdr=copy.getHeader(name); if(hdr != null && hdr instanceof TcpHeader) copy.removeHeader(name); copy.setSrc(local_addr); copy.setDest(local_addr); evt=new Event(Event.MSG, copy); /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer. This allows e.g. PerfObserver to get the time of reception of a message */ if(observer != null) observer.up(evt, up_queue.size()); passUp(evt); return; } if(log.isTraceEnabled()) log.trace("dest=" + msg.getDest() + ", hdrs:\n" + msg.printObjectHeaders()); try { if(skip_suspected_members) { if(suspected_mbrs.contains(dest)) { if(log.isTraceEnabled()) log.trace("will not send unicast message to " + dest + " as it is currently suspected"); return; } } ct.send(msg); } catch(SocketException e) { if(members.contains(dest)) { if(!suspected_mbrs.contains(dest)) { suspected_mbrs.add(dest); passUp(new Event(Event.SUSPECT, dest)); } } } } protected void sendMulticastMessage(Message msg) { Address dest; Vector mbrs=(Vector)members.clone(); for(int i=0; i < mbrs.size(); i++) { dest=(Address)mbrs.elementAt(i); msg.setDest(dest); sendUnicastMessage(msg); } } protected void handleDownEvent(Event evt) { switch(evt.getType()) { case Event.TMP_VIEW: case Event.VIEW_CHANGE: suspected_mbrs.removeAll(); synchronized(members) { members.clear(); members.addAll(((View)evt.getArg()).getMembers()); } break; case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local) passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); break; case Event.CONNECT: group_addr=(String)evt.getArg(); // removed March 18 2003 (bela), not needed (handled by GMS) // Can't remove it; otherwise TCPGOSSIP breaks (bela May 8 2003) ! passUp(new Event(Event.CONNECT_OK)); break; case Event.DISCONNECT: passUp(new Event(Event.DISCONNECT_OK)); break; case Event.CONFIG: if(log.isTraceEnabled()) log.trace("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); break; } } void handleConfigEvent(HashMap map) { if(map == null) return; if(map.containsKey("additional_data")) additional_data=(byte[])map.get("additional_data"); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -