⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streaming_state_transfer.java

📁 JGroups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
         if (state_requesters.isEmpty())         {            if (warn)               log.warn("Should be responding to state requester, but there are no requesters !");            return;         }         if (digest == null && isDigestNeeded())         {            if (warn)               log.warn("Should be responding to state requester, but there is no digest !");            else               digest = digest.copy();         }         if (log.isDebugEnabled())            log.debug("Iterating state requesters " + state_requesters);         for (Iterator it = state_requesters.keySet().iterator(); it.hasNext();)         {            String tmp_state_id = (String) it.next();            Set requesters = (Set) state_requesters.get(tmp_state_id);            for (Iterator iter = requesters.iterator(); iter.hasNext();)            {               Address requester = (Address) iter.next();               Message state_rsp = new Message(requester);               StateHeader hdr = new StateHeader(StateHeader.STATE_RSP, local_addr, spawner.getServerSocketAddress(),                     digest, tmp_state_id);               state_rsp.putHeader(NAME, hdr);               if (log.isDebugEnabled())                  log.debug("Responding to state requester " + requester + " with address "                        + spawner.getServerSocketAddress() + " and digest " + digest);               passDown(new Event(Event.MSG, state_rsp));               if (stats)               {                  num_state_reqs++;               }            }         }      }   }   private boolean startFlush(long timeout)   {      boolean successfulFlush = false;      passUp(new Event(Event.SUSPEND));      try      {         flush_promise.reset();         flush_promise.getResultWithTimeout(timeout);         successfulFlush = true;      }      catch (TimeoutException e)      {         log.warn("Initiator of flush and state requesting member " + local_addr               + " timed out waiting for flush responses 
after "                + timeout + " msec");      }      return successfulFlush;   }   private void stopFlush()   {      passUp(new Event(Event.RESUME));   }   private PooledExecutor setupThreadPool()   {      PooledExecutor threadPool = new PooledExecutor(max_pool);      threadPool.waitWhenBlocked();      threadPool.setMinimumPoolSize(1);      threadPool.setKeepAliveTime(pool_thread_keep_alive);      threadPool.setThreadFactory(new ThreadFactory()      {         public Thread newThread(final Runnable command)         {            synchronized (poolLock)            {               threadCounter++;            }            return new Thread(Util.getGlobalThreadGroup(), "STREAMING_STATE_TRANSFER.poolid=" + threadCounter)            {               public void run()               {                  if (log.isDebugEnabled())                  {                     log.debug(Thread.currentThread() + " started.");                  }                  command.run();                  if (log.isDebugEnabled())                  {                     log.debug(Thread.currentThread() + " stopped.");                  }               }            };         }      });      return threadPool;   }   private Address determineCoordinator()   {      Address ret = null;      synchronized (members)      {         if (members != null && !members.isEmpty())         {            for (int i = 0; i < members.size(); i++)               if (!local_addr.equals(members.elementAt(i)))                  return (Address) members.elementAt(i);         }      }      return ret;   }   private void handleViewChange(View v)   {      Address old_coord;      Vector new_members = v.getMembers();      boolean send_up_null_state_rsp = false;      synchronized (members)      {         old_coord = (Address) (members.size() > 0 ? 
members.firstElement() : null);         members.clear();         members.addAll(new_members);         // this handles the case where a coord dies during a state transfer; prevents clients from hanging forever         // Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not         // null is not handled ! (Usually we get the state from the coordinator)         // http://jira.jboss.com/jira/browse/JGRP-148         if (waiting_for_state_response && old_coord != null && !members.contains(old_coord))         {            send_up_null_state_rsp = true;         }      }      if (send_up_null_state_rsp)      {         log.warn("discovered that the state provider (" + old_coord               + ") crashed; will return null state to application");      }   }   private void handleStateReq(StateHeader hdr)   {      Object sender = hdr.sender;      if (sender == null)      {         if (log.isErrorEnabled())            log.error("sender is null !");         return;      }      String id = hdr.state_id;      synchronized (state_requesters)      {         boolean empty = state_requesters.isEmpty();         Set requesters = (Set) state_requesters.get(id);         if (requesters == null)         {            requesters = new HashSet();         }         requesters.add(sender);         state_requesters.put(id, requesters);         if (!isDigestNeeded())         {            respondToStateRequester();         }         else if (empty)         {            digest = null;            if (log.isDebugEnabled())               log.debug("passing down GET_DIGEST_STATE");            passDown(new Event(Event.GET_DIGEST_STATE));         }      }   }   void handleStateRsp(StateHeader hdr)   {      Digest tmp_digest = hdr.my_digest;      waiting_for_state_response = false;      if (isDigestNeeded())      {         if (tmp_digest == null)         {            if (warn)               log.warn("digest received from " + hdr.sender + " is null, 
skipping setting digest !");         }         else         {            // set the digest (e.g.in NAKACK)            passDown(new Event(Event.SET_DIGEST, tmp_digest));         }      }            connectToStateProvider(hdr);   }   void removeFromStateRequesters(Address address, String state_id)   {      synchronized (state_requesters)      {         Set requesters = (Set) state_requesters.get(state_id);         if (requesters != null && !requesters.isEmpty())         {            boolean removed = requesters.remove(address);            if (log.isDebugEnabled())            {               log.debug("Attempted to clear " + address + " from requesters, successful=" + removed);            }            if (requesters.isEmpty())            {               state_requesters.remove(state_id);               if (log.isDebugEnabled())               {                  log.debug("Cleared all requesters for state " + state_id + ",state_requesters=" + state_requesters);               }            }         }              }   }   private void connectToStateProvider(StateHeader hdr)   {            IpAddress address = hdr.bind_addr;      String tmp_state_id = hdr.getStateId();      StreamingInputStreamWrapper wrapper = null;      StateTransferInfo sti = null;      Socket socket = new Socket();      try      {         socket.bind(new InetSocketAddress(bind_addr, 0));         int bufferSize = socket.getReceiveBufferSize();         socket.setReceiveBufferSize(socket_buffer_size);         if (log.isDebugEnabled())            log.debug("Connecting to state provider " + address.getIpAddress() + ":" + address.getPort()                  + ", original buffer size was " + bufferSize + " and was reset to " + socket.getReceiveBufferSize());         socket.connect(new InetSocketAddress(address.getIpAddress(), address.getPort()));         if (log.isDebugEnabled())            log.debug("Connected to state provider, my end of the socket is " + socket.getLocalAddress() + ":"                  + 
socket.getLocalPort() + " passing inputstream up...");         //write out our state_id and address so state provider can clear this request         ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());         out.writeObject(tmp_state_id);         out.writeObject(local_addr);         wrapper = new StreamingInputStreamWrapper(socket);         sti = new StateTransferInfo(hdr.sender, wrapper, tmp_state_id);                        }      catch (IOException e)      {         if (warn)         {            log.warn("State reader socket thread spawned abnormaly", e);         }                  //pass null stream up so that JChannel.getState() returns false          InputStream is = null;         sti = new StateTransferInfo(hdr.sender, is, tmp_state_id);               }      finally      {         if (!socket.isConnected())         {            if (warn)               log.warn("Could not connect to state provider. Closing socket...");            try            {               if (wrapper != null)               {                  wrapper.close();               }               else               {                  socket.close();               }            }            catch (IOException e)            {            }            //since socket did not connect properly we have to            //clear our entry in state providers hashmap "manually"            Message m = new Message(hdr.sender);            StateHeader mhdr = new StateHeader(StateHeader.STATE_REMOVE_REQUESTER, local_addr, tmp_state_id);            m.putHeader(NAME, mhdr);            passDown(new Event(Event.MSG, m));         }         passStreamUp(sti);      }   }   private void passStreamUp(final StateTransferInfo sti)   {      Runnable readingThread = new Runnable()      {         public void run()         {            passUp(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti));         }      };      if (use_reading_thread)      {         new Thread(Util.getGlobalThreadGroup(), readingThread, 
"STREAMING_STATE_TRANSFER.reader").start();      }      else      {         readingThread.run();      }   }   /*    * ------------------------ End of Private Methods    * ------------------------------    */   private class StateProviderThreadSpawner implements Runnable   {      PooledExecutor pool;      ServerSocket serverSocket;      IpAddress address;            Thread runner;      volatile boolean running = true;      public StateProviderThreadSpawner(PooledExecutor pool, ServerSocket stateServingSocket)      {         super();         this.pool = pool;         this.serverSocket = stateServingSocket;         this.address = new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr, serverSocket.getLocalPort());      }      public void run()      {         runner = Thread.currentThread();         for (; running;)         {            try            {               if (log.isDebugEnabled())                  log.debug("StateProviderThreadSpawner listening at " + getServerSocketAddress() + "...");               if (log.isDebugEnabled())                  log.debug("Pool has " + pool.getPoolSize() + " active threads");               final Socket socket = serverSocket.accept();               pool.execute(new Runnable()               {                  public void run()                  {                     if (log.isDebugEnabled())                        log.debug("Accepted request for state transfer from " + socket.getInetAddress() + ":"                              + socket.getPort() + " handing of to PooledExecutor thread");                     new StateProviderHandler().process(socket);                  }               });            }            catch (IOException e)            {               if (warn)               {                  //we get this exception when we close server socket                  //exclude that case                  if (serverSocket != null && !serverSocket.isClosed())                  {                     log.warn("Spawning socket from 
server socket finished abnormaly", e);                  }               }            }            catch (InterruptedException e)            {               // should not happen            }         }      }      public IpAddress getServerSocketAddress()      {         return address;      }      public void stop()      {         running = false;         try         {            if (serverSocket != null && !serverSocket.isClosed())            {               serverSocket.close();            }         }         catch (IOException e)         {         }         finally         {            if (log.isDebugEnabled())               log.debug("Waiting for StateProviderThreadSpawner to die ... ");            if (runner != null)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -