⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flush.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   {      boolean successfulBlock = false;      blockok_promise.reset();            new Thread(Util.getGlobalThreadGroup(), new Runnable()      {         public void run()         {            passUp(new Event(Event.BLOCK));         }      }, "FLUSH block").start();            try      {         blockok_promise.getResultWithTimeout(btimeout);         successfulBlock = true;      }      catch (TimeoutException e)      {         log.warn("Blocking of channel using BLOCK event timed out after " + btimeout + " msec.");      }      return successfulBlock;   }   private boolean isCurrentFlushMessage(FlushHeader fh)   {      return fh.viewID == currentViewId();   }   private long currentViewId()   {      long viewId = -1;      synchronized (sharedLock)      {         ViewId view = currentView.getVid();         if (view != null)         {            viewId = view.getId();         }      }      return viewId;   }   private boolean onViewChange(View view)   {      boolean amINewCoordinator = false;      if (receivedFirstView)      {         receivedMoreThanOneView = true;      }      if (!receivedFirstView)      {         receivedFirstView = true;      }           synchronized (sharedLock)      {         suspected.retainAll(view.getMembers());         currentView = view;                  amINewCoordinator = flushCoordinator != null && !view.getMembers().contains(flushCoordinator)               && localAddress.equals(view.getMembers().get(0));      }            //If coordinator leaves, its STOP FLUSH message will be discarded by      //other members at NAKACK layer. Remaining members will be hung, waiting      //for STOP_FLUSH message. If I am new coordinator I will complete the      //FLUSH and send STOP_FLUSH on flush callers behalf.      if (amINewCoordinator)      {         if (log.isDebugEnabled())            log.debug("Coordinator left, " + localAddress + " will complete flush");         onResume();      }            if (log.isDebugEnabled())         log.debug("Installing view at  " + localAddress + " view is " + view);            return receivedFirstView && !receivedMoreThanOneView;   }   private void onStopFlush()   {      if (stats)      {         long stopFlushTime = System.currentTimeMillis();         totalTimeInFlush += (stopFlushTime - startFlushTime);         if (numberOfFlushes > 0)         {            averageFlushDuration = totalTimeInFlush / (double)numberOfFlushes;         }      }            if (!shouldReturnLastFromFlush)      {                  if (log.isDebugEnabled())            log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up");                           synchronized (blockMutex)         {            isBlockState = false;            blockMutex.notifyAll();         }         passUp(new Event(Event.UNBLOCK));      }                 //ack this STOP_FLUSH      Message msg = new Message(null, localAddress, null);      msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH_OK,currentViewId()));            passDown(new Event(Event.MSG, msg));            if (log.isDebugEnabled())         log.debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from " + localAddress);    }   private void onSuspend(View view)   {      Message msg = null;      Collection participantsInFlush = null;      synchronized (sharedLock)      {         //start FLUSH only on group members that we need to flush         if (view != null)         {            participantsInFlush = new ArrayList(view.getMembers());            participantsInFlush.retainAll(currentView.getMembers());         }         else         {            participantsInFlush = new ArrayList(currentView.getMembers());         }         msg = new Message(null, localAddress, null);         msg.putHeader(getName(), new FlushHeader(FlushHeader.START_FLUSH, currentViewId(), participantsInFlush));      }      if (participantsInFlush.isEmpty())      {         passUp(new Event(Event.SUSPEND_OK));         passDown(new Event(Event.SUSPEND_OK));      }      else      {         passDown(new Event(Event.MSG, msg));         if (log.isDebugEnabled())            log.debug("Received SUSPEND at " + localAddress + ", sent START_FLUSH to " + participantsInFlush);      }   }   private void onResume()   {	  long viewID = currentViewId();	      Message msg = new Message(null, localAddress, null);      msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH,viewID));      passDown(new Event(Event.MSG, msg));      if (log.isDebugEnabled())         log.debug("Received RESUME at " + localAddress + ", sent STOP_FLUSH to all");   }   private void onStartFlush(Address flushStarter, FlushHeader fh)   {            if (stats)      {         startFlushTime = System.currentTimeMillis();         numberOfFlushes += 1;      }                  synchronized (sharedLock)      {                  flushCoordinator = flushStarter;         flushMembers.clear();         if(fh.flushParticipants!=null)         {            flushMembers.addAll(fh.flushParticipants);         }                 flushMembers.removeAll(suspected);      }      Message msg = new Message(null, localAddress, null);      msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK, fh.viewID));      passDown(new Event(Event.MSG, msg));      if (log.isDebugEnabled())         log.debug("Received START_FLUSH at " + localAddress + " responded with FLUSH_OK");   }   private void onFlushOk(Address address, long viewID)   {      boolean flushOkCompleted = false;      Message m = null;      synchronized (sharedLock)      {         flushOkSet.add(address);         flushOkCompleted = flushOkSet.containsAll(flushMembers);         if (flushOkCompleted)         {            m = new Message(flushCoordinator, localAddress, null);         }      }      if (log.isDebugEnabled())         log.debug("At " + localAddress + " FLUSH_OK from " + address + ",completed "                + flushOkCompleted + ",  flushOkSet " + flushOkSet.toString());      if (flushOkCompleted)      {         isBlockState = true;         m.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID));         passDown(new Event(Event.MSG, m));         if (log.isDebugEnabled())            log.debug(localAddress + " is blocking FLUSH.down(). Sent FLUSH_COMPLETED message to " + flushCoordinator);      }   }      private void onStopFlushOk(Address address, long viewID)   {      boolean stopFlushOkCompleted = false;      synchronized (sharedLock)      {         stopFlushOkSet.add(address);         TreeSet membersCopy = new TreeSet(currentView.getMembers());         membersCopy.removeAll(suspected);         stopFlushOkCompleted = stopFlushOkSet.containsAll(membersCopy);      }      if (log.isDebugEnabled())         log.debug("At " + localAddress + " STOP_FLUSH_OK from " + address + ",completed " + stopFlushOkCompleted               + ",  stopFlushOkSet " + stopFlushOkSet.toString());      if (stopFlushOkCompleted)      {         if (shouldReturnLastFromFlush)         {                        if (log.isDebugEnabled())               log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up");                                   synchronized (blockMutex)            {               isBlockState = false;               blockMutex.notifyAll();            }                        passUp(new Event(Event.UNBLOCK));         }                  synchronized (sharedLock)         {            flushCompletedSet.clear();            flushOkSet.clear();               stopFlushOkSet.clear();            flushMembers.clear();            suspected.clear();            flushCoordinator = null;         }         shouldReturnLastFromFlush = false;      }        }   private void onFlushCompleted(Address address)   {      boolean flushCompleted = false;      synchronized (sharedLock)      {         flushCompletedSet.add(address);         flushCompleted = flushCompletedSet.containsAll(flushMembers);      }      if (log.isDebugEnabled())         log.debug("At " + localAddress + " FLUSH_COMPLETED from " + address                + ",completed " + flushCompleted + ",flushCompleted "               + flushCompletedSet.toString());      if (flushCompleted)      {         //needed for jmx operation startFlush(timeout);         flush_promise.setResult(Boolean.TRUE);         passUp(new Event(Event.SUSPEND_OK));         passDown(new Event(Event.SUSPEND_OK));         if (log.isDebugEnabled())            log.debug("All FLUSH_COMPLETED received at " + localAddress + " sent SUSPEND_OK down/up");      }   }   private void onSuspect(Address address)   {      boolean flushOkCompleted = false;      Message m = null;      long viewID = 0;      synchronized (sharedLock)      {         suspected.add(address);         flushMembers.removeAll(suspected);         viewID = currentViewId();         flushOkCompleted = !flushOkSet.isEmpty() && flushOkSet.containsAll(flushMembers);         if (flushOkCompleted)         {            m = new Message(flushCoordinator, localAddress, null);         }      }      if (flushOkCompleted)      {         m.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID));         passDown(new Event(Event.MSG, m));         if (log.isDebugEnabled())            log.debug(localAddress + " sent FLUSH_COMPLETED message to " + flushCoordinator);      }      if (log.isDebugEnabled())         log.debug("Suspect is " + address + ",completed " + flushOkCompleted + ",  flushOkSet " + flushOkSet               + " flushMembers " + flushMembers);   }   private boolean isFlushRunning()   {      return isBlockState;   }   public static class FlushHeader extends Header implements Streamable   {      public static final byte START_FLUSH = 0;      public static final byte FLUSH_OK = 1;      public static final byte STOP_FLUSH = 2;      public static final byte FLUSH_COMPLETED = 3;            public static final byte STOP_FLUSH_OK = 4;      byte type;      long viewID;      Collection flushParticipants;      public FlushHeader()      {         this(START_FLUSH,0);      } // used for externalization           public FlushHeader(byte type, long viewID)      {         this(type, viewID, null);      }      public FlushHeader(byte type, long viewID, Collection flushView)      {         this.type = type;         this.viewID = viewID;         this.flushParticipants = flushView;      }      public String toString()      {         switch (type)         {            case START_FLUSH :               return "FLUSH[type=START_FLUSH,viewId=" + viewID + ",members=" + flushParticipants + "]";            case FLUSH_OK :               return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]";            case STOP_FLUSH :               return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]";            case STOP_FLUSH_OK :               return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID + "]";                              case FLUSH_COMPLETED :               return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID + "]";            default :               return "[FLUSH: unknown type (" + type + ")]";         }      }      public void writeExternal(ObjectOutput out) throws IOException      {         out.writeByte(type);         out.writeLong(viewID);         out.writeObject(flushParticipants);      }      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException      {         type = in.readByte();         viewID = in.readLong();         flushParticipants = (Collection) in.readObject();      }      public void writeTo(DataOutputStream out) throws IOException      {         out.writeByte(type);         out.writeLong(viewID);         if (flushParticipants != null && !flushParticipants.isEmpty())         {            out.writeShort(flushParticipants.size());            for (Iterator iter = flushParticipants.iterator(); iter.hasNext();)            {               Address address = (Address) iter.next();               Util.writeAddress(address, out);            }         }         else         {            out.writeShort(0);         }      }      public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException      {         type = in.readByte();         viewID = in.readLong();         int flushParticipantsSize = in.readShort();         if (flushParticipantsSize > 0)         {            flushParticipants = new ArrayList(flushParticipantsSize);            for (int i = 0; i < flushParticipantsSize; i++)            {               flushParticipants.add(Util.readAddress(in));            }         }      }   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -