📄 streaming_state_transfer.java
字号:
if (state_requesters.isEmpty()) { if (warn) log.warn("Should be responding to state requester, but there are no requesters !"); return; } if (digest == null && isDigestNeeded()) { if (warn) log.warn("Should be responding to state requester, but there is no digest !"); else digest = digest.copy(); } if (log.isDebugEnabled()) log.debug("Iterating state requesters " + state_requesters); for (Iterator it = state_requesters.keySet().iterator(); it.hasNext();) { String tmp_state_id = (String) it.next(); Set requesters = (Set) state_requesters.get(tmp_state_id); for (Iterator iter = requesters.iterator(); iter.hasNext();) { Address requester = (Address) iter.next(); Message state_rsp = new Message(requester); StateHeader hdr = new StateHeader(StateHeader.STATE_RSP, local_addr, spawner.getServerSocketAddress(), digest, tmp_state_id); state_rsp.putHeader(NAME, hdr); if (log.isDebugEnabled()) log.debug("Responding to state requester " + requester + " with address " + spawner.getServerSocketAddress() + " and digest " + digest); passDown(new Event(Event.MSG, state_rsp)); if (stats) { num_state_reqs++; } } } } } private boolean startFlush(long timeout) { boolean successfulFlush = false; passUp(new Event(Event.SUSPEND)); try { flush_promise.reset(); flush_promise.getResultWithTimeout(timeout); successfulFlush = true; } catch (TimeoutException e) { log.warn("Initiator of flush and state requesting member " + local_addr + " timed out waiting for flush responses after " + timeout + " msec"); } return successfulFlush; } private void stopFlush() { passUp(new Event(Event.RESUME)); } private PooledExecutor setupThreadPool() { PooledExecutor threadPool = new PooledExecutor(max_pool); threadPool.waitWhenBlocked(); threadPool.setMinimumPoolSize(1); threadPool.setKeepAliveTime(pool_thread_keep_alive); threadPool.setThreadFactory(new ThreadFactory() { public Thread newThread(final Runnable command) { synchronized (poolLock) { threadCounter++; } return new Thread(Util.getGlobalThreadGroup(), "STREAMING_STATE_TRANSFER.poolid=" + threadCounter) { public void run() { if (log.isDebugEnabled()) { log.debug(Thread.currentThread() + " started."); } command.run(); if (log.isDebugEnabled()) { log.debug(Thread.currentThread() + " stopped."); } } }; } }); return threadPool; } private Address determineCoordinator() { Address ret = null; synchronized (members) { if (members != null && !members.isEmpty()) { for (int i = 0; i < members.size(); i++) if (!local_addr.equals(members.elementAt(i))) return (Address) members.elementAt(i); } } return ret; } private void handleViewChange(View v) { Address old_coord; Vector new_members = v.getMembers(); boolean send_up_null_state_rsp = false; synchronized (members) { old_coord = (Address) (members.size() > 0 ? members.firstElement() : null); members.clear(); members.addAll(new_members); // this handles the case where a coord dies during a state transfer; prevents clients from hanging forever // Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not // null is not handled ! (Usually we get the state from the coordinator) // http://jira.jboss.com/jira/browse/JGRP-148 if (waiting_for_state_response && old_coord != null && !members.contains(old_coord)) { send_up_null_state_rsp = true; } } if (send_up_null_state_rsp) { log.warn("discovered that the state provider (" + old_coord + ") crashed; will return null state to application"); } } private void handleStateReq(StateHeader hdr) { Object sender = hdr.sender; if (sender == null) { if (log.isErrorEnabled()) log.error("sender is null !"); return; } String id = hdr.state_id; synchronized (state_requesters) { boolean empty = state_requesters.isEmpty(); Set requesters = (Set) state_requesters.get(id); if (requesters == null) { requesters = new HashSet(); } requesters.add(sender); state_requesters.put(id, requesters); if (!isDigestNeeded()) { respondToStateRequester(); } else if (empty) { digest = null; if (log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE"); passDown(new Event(Event.GET_DIGEST_STATE)); } } } void handleStateRsp(StateHeader hdr) { Digest tmp_digest = hdr.my_digest; waiting_for_state_response = false; if (isDigestNeeded()) { if (tmp_digest == null) { if (warn) log.warn("digest received from " + hdr.sender + " is null, skipping setting digest !"); } else { // set the digest (e.g.in NAKACK) passDown(new Event(Event.SET_DIGEST, tmp_digest)); } } connectToStateProvider(hdr); } void removeFromStateRequesters(Address address, String state_id) { synchronized (state_requesters) { Set requesters = (Set) state_requesters.get(state_id); if (requesters != null && !requesters.isEmpty()) { boolean removed = requesters.remove(address); if (log.isDebugEnabled()) { log.debug("Attempted to clear " + address + " from requesters, successful=" + removed); } if (requesters.isEmpty()) { state_requesters.remove(state_id); if (log.isDebugEnabled()) { log.debug("Cleared all requesters for state " + state_id + ",state_requesters=" + state_requesters); } } } } } private void connectToStateProvider(StateHeader hdr) { IpAddress address = hdr.bind_addr; String tmp_state_id = hdr.getStateId(); StreamingInputStreamWrapper wrapper = null; StateTransferInfo sti = null; Socket socket = new Socket(); try { socket.bind(new InetSocketAddress(bind_addr, 0)); int bufferSize = socket.getReceiveBufferSize(); socket.setReceiveBufferSize(socket_buffer_size); if (log.isDebugEnabled()) log.debug("Connecting to state provider " + address.getIpAddress() + ":" + address.getPort() + ", original buffer size was " + bufferSize + " and was reset to " + socket.getReceiveBufferSize()); socket.connect(new InetSocketAddress(address.getIpAddress(), address.getPort())); if (log.isDebugEnabled()) log.debug("Connected to state provider, my end of the socket is " + socket.getLocalAddress() + ":" + socket.getLocalPort() + " passing inputstream up..."); //write out our state_id and address so state provider can clear this request ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); out.writeObject(tmp_state_id); out.writeObject(local_addr); wrapper = new StreamingInputStreamWrapper(socket); sti = new StateTransferInfo(hdr.sender, wrapper, tmp_state_id); } catch (IOException e) { if (warn) { log.warn("State reader socket thread spawned abnormaly", e); } //pass null stream up so that JChannel.getState() returns false InputStream is = null; sti = new StateTransferInfo(hdr.sender, is, tmp_state_id); } finally { if (!socket.isConnected()) { if (warn) log.warn("Could not connect to state provider. Closing socket..."); try { if (wrapper != null) { wrapper.close(); } else { socket.close(); } } catch (IOException e) { } //since socket did not connect properly we have to //clear our entry in state providers hashmap "manually" Message m = new Message(hdr.sender); StateHeader mhdr = new StateHeader(StateHeader.STATE_REMOVE_REQUESTER, local_addr, tmp_state_id); m.putHeader(NAME, mhdr); passDown(new Event(Event.MSG, m)); } passStreamUp(sti); } } private void passStreamUp(final StateTransferInfo sti) { Runnable readingThread = new Runnable() { public void run() { passUp(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); } }; if (use_reading_thread) { new Thread(Util.getGlobalThreadGroup(), readingThread, "STREAMING_STATE_TRANSFER.reader").start(); } else { readingThread.run(); } } /* * ------------------------ End of Private Methods * ------------------------------ */ private class StateProviderThreadSpawner implements Runnable { PooledExecutor pool; ServerSocket serverSocket; IpAddress address; Thread runner; volatile boolean running = true; public StateProviderThreadSpawner(PooledExecutor pool, ServerSocket stateServingSocket) { super(); this.pool = pool; this.serverSocket = stateServingSocket; this.address = new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr, serverSocket.getLocalPort()); } public void run() { runner = Thread.currentThread(); for (; running;) { try { if (log.isDebugEnabled()) log.debug("StateProviderThreadSpawner listening at " + getServerSocketAddress() + "..."); if (log.isDebugEnabled()) log.debug("Pool has " + pool.getPoolSize() + " active threads"); final Socket socket = serverSocket.accept(); pool.execute(new Runnable() { public void run() { if (log.isDebugEnabled()) log.debug("Accepted request for state transfer from " + socket.getInetAddress() + ":" + socket.getPort() + " handing of to PooledExecutor thread"); new StateProviderHandler().process(socket); } }); } catch (IOException e) { if (warn) { //we get this exception when we close server socket //exclude that case if (serverSocket != null && !serverSocket.isClosed()) { log.warn("Spawning socket from server socket finished abnormaly", e); } } } catch (InterruptedException e) { // should not happen } } } public IpAddress getServerSocketAddress() { return address; } public void stop() { running = false; try { if (serverSocket != null && !serverSocket.isClosed()) { serverSocket.close(); } } catch (IOException e) { } finally { if (log.isDebugEnabled()) log.debug("Waiting for StateProviderThreadSpawner to die ... "); if (runner != null)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -