messagedispatcher.java
来自「JGRoups源码」· Java 代码 · 共 945 行 · 第 1/3 页
JAVA
945 行
membership_listener=l; } public final void setRequestHandler(RequestHandler rh) { req_handler=rh; } /** * Offers access to the underlying Channel. * @return a reference to the underlying Channel. */ public Channel getChannel() { return channel; } public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException { if(channel != null) { channel.send(msg); } else if(adapter != null) { try { if(id != null) { adapter.send(id, msg); } else { adapter.send(msg); } } catch(Throwable ex) { if(log.isErrorEnabled()) { log.error("exception=" + Util.print(ex)); } } } else { if(log.isErrorEnabled()) { log.error("channel == null"); } } } /** * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>. * * @param dests The members to which the message is to be sent. If it is null, then the message is sent to all * members * @param msg The message to be sent to n members * @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST: * return the first response received. <li>GET_ALL: wait for all responses (minus the ones from * suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp * size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n * responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately * (non-blocking) </ol> * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time. * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender. */ public RspList castMessage(final Vector dests, Message msg, int mode, long timeout) { GroupRequest _req=null; Vector real_dests; Channel tmp; // we need to clone because we don't want to modify the original // (we remove ourselves if LOCAL is false, see below) ! // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null); if(dests != null) { real_dests=(Vector)dests.clone(); } else { synchronized(members) { real_dests=new Vector(members); } } // if local delivery is off, then we should not wait for the message from the local member. // therefore remove it from the membership tmp=channel; if(tmp == null) { if(adapter != null && adapter.getTransport() instanceof Channel) { tmp=(Channel) adapter.getTransport(); } } if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) { if(local_addr == null) { local_addr=tmp.getLocalAddress(); } if(local_addr != null && real_dests != null) { real_dests.removeElement(local_addr); } } // don't even send the message if the destination list is empty if(log.isTraceEnabled()) log.trace("real_dests=" + real_dests); if(real_dests == null || real_dests.size() == 0) { if(log.isTraceEnabled()) log.trace("destination list is empty, won't send message"); return new RspList(); // return empty response list } _req=new GroupRequest(msg, corr, real_dests, mode, timeout, 0); _req.setCaller(this.local_addr); try { _req.execute(); } catch(Exception ex) { throw new RuntimeException("failed executing request " + _req, ex); } return _req.getResults(); } /** * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed. * * @param dests The list of members from which to receive responses. Null means all members * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with * requests * @param msg The request to be sent * @param coll The sender needs to provide this interface to collect responses. Call will return immediately if * this is null */ public void castMessage(final Vector dests, long req_id, Message msg, RspCollector coll) { Vector real_dests; Channel tmp; if(msg == null) { if(log.isErrorEnabled()) log.error("request is null"); return; } if(coll == null) { if(log.isErrorEnabled()) log.error("response collector is null (must be non-null)"); return; } // we need to clone because we don't want to modify the original // (we remove ourselves if LOCAL is false, see below) ! //real_dests=dests != null ? (Vector) dests.clone() : (Vector) members.clone(); if(dests != null) { real_dests=(Vector)dests.clone(); } else { synchronized(members) { real_dests=new Vector(members); } } // if local delivery is off, then we should not wait for the message from the local member. // therefore remove it from the membership tmp=channel; if(tmp == null) { if(adapter != null && adapter.getTransport() instanceof Channel) { tmp=(Channel) adapter.getTransport(); } } if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) { if(local_addr == null) { local_addr=tmp.getLocalAddress(); } if(local_addr != null) { real_dests.removeElement(local_addr); } } // don't even send the message if the destination list is empty if(real_dests.size() == 0) { if(log.isDebugEnabled()) log.debug("destination list is empty, won't send message"); return; } try { corr.sendRequest(req_id, real_dests, msg, coll); } catch(Exception e) { throw new RuntimeException("failure sending request " + req_id + " to " + real_dests, e); } } public void done(long req_id) { corr.done(req_id); } /** * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination * must be non-zero ! */ public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException { Vector mbrs=new Vector(); RspList rsp_list=null; Object dest=msg.getDest(); Rsp rsp; GroupRequest _req=null; if(dest == null) { if(log.isErrorEnabled()) log.error("the message's destination is null, cannot send message"); return null; } mbrs.addElement(dest); // dummy membership (of destination address) _req=new GroupRequest(msg, corr, mbrs, mode, timeout, 0); _req.setCaller(local_addr); try { _req.execute(); } catch(Exception t) { throw new RuntimeException("failed executing request " + _req, t); } if(mode == GroupRequest.GET_NONE) { return null; } rsp_list=_req.getResults(); if(rsp_list.size() == 0) { if(log.isWarnEnabled()) log.warn(" response list is empty"); return null; } if(rsp_list.size() > 1) { if(log.isWarnEnabled()) log.warn("response list contains more that 1 response; returning first response !"); } rsp=(Rsp)rsp_list.elementAt(0); if(rsp.wasSuspected()) { throw new SuspectedException(dest); } if(!rsp.wasReceived()) { throw new TimeoutException(); } return rsp.getValue(); }// public void channelConnected(Channel channel) {// if(channel != null) {// Address new_local_addr=channel.getLocalAddress();// if(new_local_addr != null) {// this.local_addr=new_local_addr;//// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);// }// }// }//// public void channelDisconnected(Channel channel) {// }//// public void channelClosed(Channel channel) {// }//// public void channelShunned() {// }//// public void channelReconnected(Address addr) {// if(channel != null) {// Address new_local_addr=channel.getLocalAddress();// if(new_local_addr != null) {// this.local_addr=new_local_addr;//// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);// }// }// } /* ------------------------ RequestHandler Interface ---------------------- */ public Object handle(Message msg) { if(req_handler != null) { return req_handler.handle(msg); } else { return null; } } /* -------------------- End of RequestHandler Interface ------------------- */ class ProtocolAdapter extends Protocol implements UpHandler { /* ------------------------- Protocol Interface --------------------------- */ public String getName() { return "MessageDispatcher"; } public void startUpHandler() { // do nothing, DON'T REMOVE !!!! } public void startDownHandler() { // do nothing, DON'T REMOVE !!!! }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?