📄 multiplexer.java
字号:
case Event.GET_APPLSTATE:
case Event.STATE_TRANSFER_OUTPUTSTREAM:
handleStateRequest(evt);
break;
case Event.GET_STATE_OK:
case Event.STATE_TRANSFER_INPUTSTREAM:
handleStateResponse(evt);
break;
case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
passToAllMuxChannels(evt);
break;
case Event.BLOCK:
blocked=true;
int num_services=services.size();
if(num_services == 0) {
channel.blockOk();
return;
}
block_ok_collector.reset();
passToAllMuxChannels(evt);
block_ok_collector.waitUntil(num_services);
channel.blockOk();
return;
case Event.UNBLOCK: // process queued-up MergeViews
if(!blocked) {
passToAllMuxChannels(evt);
return;
}
else
blocked=false;
if(temp_merge_view != null) {
if(log.isTraceEnabled())
log.trace("calling handleMergeView() from UNBLOCK (flush_present=" + flush_present + ")");
try {
Thread merge_handler=new Thread() {
public void run() {
try {
handleMergeView(temp_merge_view);
}
catch(Exception e) {
if(log.isErrorEnabled())
log.error("problems handling merge view", e);
}
}
};
merge_handler.setName("merge handler (unblock)");
merge_handler.setDaemon(false);
merge_handler.start();
}
catch(Exception e) {
if(log.isErrorEnabled())
log.error("failed handling merge view", e);
}
}
passToAllMuxChannels(evt);
break;
default:
passToAllMuxChannels(evt);
break;
}
}
public Channel createMuxChannel(JChannelFactory f, String id, String stack_name) throws Exception {
MuxChannel ch;
synchronized(services) {
if(services.containsKey(id))
throw new Exception("service ID \"" + id + "\" is already registered, cannot register duplicate ID");
ch=new MuxChannel(f, channel, id, stack_name, this);
services.put(id, ch);
}
return ch;
}
private void passToAllMuxChannels(Event evt) {
for(Iterator it=services.values().iterator(); it.hasNext();) {
MuxChannel ch=(MuxChannel)it.next();
ch.up(evt);
}
}
public MuxChannel remove(String id) {
synchronized(services) {
return (MuxChannel)services.remove(id);
}
}
/** Closes the underlying JChannel if all MuxChannels have been disconnected */
public void disconnect() {
MuxChannel mux_ch;
boolean all_disconnected=true;
synchronized(services) {
for(Iterator it=services.values().iterator(); it.hasNext();) {
mux_ch=(MuxChannel)it.next();
if(mux_ch.isConnected()) {
all_disconnected=false;
break;
}
}
if(all_disconnected) {
if(log.isTraceEnabled()) {
log.trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
}
channel.disconnect();
}
}
}
public void unregister(String appl_id) {
synchronized(services) {
services.remove(appl_id);
}
}
public boolean close() {
MuxChannel mux_ch;
boolean all_closed=true;
synchronized(services) {
for(Iterator it=services.values().iterator(); it.hasNext();) {
mux_ch=(MuxChannel)it.next();
if(mux_ch.isOpen()) {
all_closed=false;
break;
}
}
if(all_closed) {
if(log.isTraceEnabled()) {
log.trace("closing underlying JChannel as all MuxChannels are closed");
}
channel.close();
services.clear();
}
return all_closed;
}
}
public void closeAll() {
synchronized(services) {
MuxChannel mux_ch;
for(Iterator it=services.values().iterator(); it.hasNext();) {
mux_ch=(MuxChannel)it.next();
mux_ch.setConnected(false);
mux_ch.setClosed(true);
mux_ch.closeMessageQueue(true);
}
}
}
public boolean shutdown() {
MuxChannel mux_ch;
boolean all_closed=true;
synchronized(services) {
for(Iterator it=services.values().iterator(); it.hasNext();) {
mux_ch=(MuxChannel)it.next();
if(mux_ch.isOpen()) {
all_closed=false;
break;
}
}
if(all_closed) {
if(log.isTraceEnabled()) {
log.trace("shutting down underlying JChannel as all MuxChannels are closed");
}
channel.shutdown();
services.clear();
}
return all_closed;
}
}
private boolean isFlushPresent() {
return channel.getProtocolStack().findProtocol("FLUSH") != null;
}
private void sendServiceState() throws Exception {
Object[] my_services=services.keySet().toArray();
byte[] data=Util.objectToByteBuffer(my_services);
ServiceInfo sinfo=new ServiceInfo(ServiceInfo.LIST_SERVICES_RSP, null, channel.getLocalAddress(), data);
Message rsp=new Message(); // send to everyone
MuxHeader hdr=new MuxHeader(sinfo);
rsp.putHeader(NAME, hdr);
channel.send(rsp);
}
private Address getLocalAddress() {
if(local_addr != null)
return local_addr;
if(channel != null)
local_addr=channel.getLocalAddress();
return local_addr;
}
private Address getCoordinator() {
if(channel != null) {
View v=channel.getView();
if(v != null) {
Vector members=v.getMembers();
if(members != null && members.size() > 0) {
return (Address)members.firstElement();
}
}
}
return null;
}
public Address getServiceCoordinator(String service_id) {
List hosts=(List)service_state.get(service_id);
if(hosts == null || hosts.size() == 0)
return null;
return (Address)hosts.get(0);
}
private void sendServiceMessage(byte type, String service, Address host) throws Exception {
if(host == null)
host=getLocalAddress();
if(host == null) {
if(log.isWarnEnabled()) {
log.warn("local_addr is null, cannot send ServiceInfo." + ServiceInfo.typeToString(type) + " message");
}
return;
}
ServiceInfo si=new ServiceInfo(type, service, host, null);
MuxHeader hdr=new MuxHeader(si);
Message service_msg=new Message();
service_msg.putHeader(NAME, hdr);
channel.send(service_msg);
}
private void handleStateRequest(Event evt) {
StateTransferInfo info=(StateTransferInfo)evt.getArg();
String id=info.state_id;
String original_id=id;
MuxChannel mux_ch=null;
try {
int index=id.indexOf(SEPARATOR);
if(index > -1) {
info.state_id=id.substring(index + SEPARATOR_LEN);
id=id.substring(0, index); // similar reuse as above...
}
else {
info.state_id=null;
}
mux_ch=(MuxChannel)services.get(id);
if(mux_ch == null)
throw new IllegalArgumentException("didn't find service with ID=" + id + " to fetch state from");
// evt.setArg(info);
mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
}
catch(Throwable ex) {
if(log.isErrorEnabled())
log.error("failed returning the application state, will return null", ex);
channel.returnState(null, original_id); // we cannot use mux_ch because it might be null due to the lookup above
}
}
private void handleStateResponse(Event evt) {
StateTransferInfo info=(StateTransferInfo)evt.getArg();
MuxChannel mux_ch;
String appl_id, substate_id, tmp;
tmp=info.state_id;
if(tmp == null) {
if(log.isTraceEnabled())
log.trace("state is null, not passing up: " + info);
return;
}
int index=tmp.indexOf(SEPARATOR);
if(index > -1) {
appl_id=tmp.substring(0, index);
substate_id=tmp.substring(index+SEPARATOR_LEN);
}
else {
appl_id=tmp;
substate_id=null;
}
mux_ch=(MuxChannel)services.get(appl_id);
if(mux_ch == null) {
log.error("didn't find service with ID=" + appl_id + " to fetch state from");
}
else {
StateTransferInfo tmp_info=info.copy();
tmp_info.state_id=substate_id;
evt.setArg(tmp_info);
mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
}
}
private void handleServiceStateRequest(ServiceInfo info, Address sender) throws Exception {
switch(info.type) {
case ServiceInfo.STATE_REQ:
byte[] state;
synchronized(service_state) {
state=Util.objectToByteBuffer(service_state);
}
ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_RSP, null, null, state);
MuxHeader hdr=new MuxHeader(si);
Message state_rsp=new Message(sender);
state_rsp.putHeader(NAME, hdr);
channel.send(state_rsp);
break;
case ServiceInfo.STATE_RSP:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -