⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streaming_state_transfer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            {               try               {                  runner.join(3000);               }               catch (InterruptedException ignored)               {               }            }            if (log.isDebugEnabled())               log.debug("Shutting the thread pool down... ");            pool.shutdownNow();            try            {               //TODO use some system wide timeout eventually               pool.awaitTerminationAfterShutdown(5000);            }            catch (InterruptedException ignored)            {            }         }         if (log.isDebugEnabled())            log.debug("Thread pool is shutdown. All pool threads are cleaned up.");      }   }   private class StateProviderHandler   {      public void process(Socket socket)      {         StreamingOutputStreamWrapper wrapper = null;         ObjectInputStream ois = null;         try         {            int bufferSize = socket.getSendBufferSize();            socket.setSendBufferSize(socket_buffer_size);            if (log.isDebugEnabled())               log.debug("Running on " + Thread.currentThread() + ". Accepted request for state transfer from "                     + socket.getInetAddress() + ":" + socket.getPort() + ", original buffer size was " + bufferSize                     + " and was reset to " + socket.getSendBufferSize() + ", passing outputstream up... ");            //read out state requesters state_id and address and clear this request            ois = new ObjectInputStream(socket.getInputStream());            String state_id = (String) ois.readObject();            Address stateRequester = (Address) ois.readObject();            removeFromStateRequesters(stateRequester, state_id);            wrapper = new StreamingOutputStreamWrapper(socket);            StateTransferInfo sti = new StateTransferInfo(stateRequester, wrapper, state_id);            passUp(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti));         }         catch (IOException e)         {            if (warn)            {               log.warn("State writer socket thread spawned abnormaly", e);            }         }         catch (ClassNotFoundException e)         {            //thrown by ois.readObject()            //should never happen since String/Address are core classes         }         finally         {            if (socket != null && !socket.isConnected())            {               if (warn)                  log.warn("Accepted request for state transfer but socket " + socket                        + " not connected properly. Closing it...");               try               {                  if (wrapper != null)                  {                     wrapper.close();                  }                  else                  {                     socket.close();                  }               }               catch (IOException e)               {               }            }         }      }   }   private class StreamingInputStreamWrapper extends InputStream   {      private Socket inputStreamOwner;      private InputStream delegate;      private Channel channelOwner;      public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException      {         super();         this.inputStreamOwner = inputStreamOwner;         this.delegate = new BufferedInputStream(inputStreamOwner.getInputStream());         this.channelOwner = stack.getChannel();      }      public int available() throws IOException      {         return delegate.available();      }      public void close() throws IOException      {         if (log.isDebugEnabled())         {            log.debug("State reader " + inputStreamOwner + " is closing the socket ");         }         if (channelOwner != null && channelOwner.isConnected())         {            channelOwner.down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED));         }         inputStreamOwner.close();      }      public synchronized void mark(int readlimit)      {         delegate.mark(readlimit);      }      public boolean markSupported()      {         return delegate.markSupported();      }      public int read() throws IOException      {         return delegate.read();      }      public int read(byte[] b, int off, int len) throws IOException      {         return delegate.read(b, off, len);      }      public int read(byte[] b) throws IOException      {         return delegate.read(b);      }      public synchronized void reset() throws IOException      {         delegate.reset();      }      public long skip(long n) throws IOException      {         return delegate.skip(n);      }   }   private class StreamingOutputStreamWrapper extends OutputStream   {      private Socket outputStreamOwner;      private OutputStream delegate;      private long bytesWrittenCounter = 0;            private Channel channelOwner;      public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException      {         super();         this.outputStreamOwner = outputStreamOwner;         this.delegate = new BufferedOutputStream(outputStreamOwner.getOutputStream());         this.channelOwner = stack.getChannel();      }      public void close() throws IOException      {         if (log.isDebugEnabled())         {            log.debug("State writer " + outputStreamOwner + " is closing the socket ");         }         try         {            if (channelOwner != null && channelOwner.isConnected())            {               channelOwner.down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED));            }                       outputStreamOwner.close();         }         catch (IOException e)         {            throw e;         }         finally         {            if (stats)            {               synchronized (state_requesters)               {                  num_bytes_sent += bytesWrittenCounter;                  avg_state_size = num_bytes_sent / (double) num_state_reqs;               }            }         }      }      public void flush() throws IOException      {         delegate.flush();      }      public void write(byte[] b, int off, int len) throws IOException      {         delegate.write(b, off, len);         bytesWrittenCounter += len;      }      public void write(byte[] b) throws IOException      {         delegate.write(b);         if (b != null)         {            bytesWrittenCounter += b.length;         }      }      public void write(int b) throws IOException      {         delegate.write(b);         bytesWrittenCounter += 1;      }   }   public static class StateHeader extends Header implements Streamable   {      public static final byte STATE_REQ = 1;      public static final byte STATE_RSP = 2;      public static final byte STATE_REMOVE_REQUESTER = 3;      long id = 0; // state transfer ID (to separate multiple state transfers at the same time)      byte type = 0;      Address sender; // sender of state STATE_REQ or STATE_RSP      Digest my_digest = null; // digest of sender (if type is STATE_RSP)      IpAddress bind_addr = null;      String state_id = null; // for partial state transfer      public StateHeader()      { // for externalization      }      public StateHeader(byte type, Address sender, String state_id)      {         this.type = type;         this.sender = sender;         this.state_id = state_id;      }      public StateHeader(byte type, Address sender, long id, Digest digest)      {         this.type = type;         this.sender = sender;         this.id = id;         this.my_digest = digest;      }      public StateHeader(byte type, Address sender, IpAddress bind_addr, Digest digest, String state_id)      {         this.type = type;         this.sender = sender;         this.my_digest = digest;         this.bind_addr = bind_addr;         this.state_id = state_id;      }      public int getType()      {         return type;      }      public Digest getDigest()      {         return my_digest;      }      public String getStateId()      {         return state_id;      }      public boolean equals(Object o)      {         StateHeader other;         if (sender != null && o != null)         {            if (!(o instanceof StateHeader))               return false;            other = (StateHeader) o;            return sender.equals(other.sender) && id == other.id;         }         return false;      }      public int hashCode()      {         if (sender != null)            return sender.hashCode() + (int) id;         else            return (int) id;      }      public String toString()      {         StringBuffer sb = new StringBuffer();         sb.append("type=").append(type2Str(type));         if (sender != null)            sb.append(", sender=").append(sender).append(" id=").append(id);         if (my_digest != null)            sb.append(", digest=").append(my_digest);         return sb.toString();      }      static String type2Str(int t)      {         switch (t)         {            case STATE_REQ :               return "STATE_REQ";            case STATE_RSP :               return "STATE_RSP";            case STATE_REMOVE_REQUESTER :               return "STATE_REMOVE_REQUESTER";            default :               return "<unknown>";         }      }      public void writeExternal(ObjectOutput out) throws IOException      {         out.writeObject(sender);         out.writeLong(id);         out.writeByte(type);         out.writeObject(my_digest);         out.writeObject(bind_addr);         out.writeUTF(state_id);      }      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException      {         sender = (Address) in.readObject();         id = in.readLong();         type = in.readByte();         my_digest = (Digest) in.readObject();         bind_addr = (IpAddress) in.readObject();         state_id = in.readUTF();      }      public void writeTo(DataOutputStream out) throws IOException      {         out.writeByte(type);         out.writeLong(id);         Util.writeAddress(sender, out);         Util.writeStreamable(my_digest, out);         Util.writeStreamable(bind_addr, out);         Util.writeString(state_id, out);      }      public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException      {         type = in.readByte();         id = in.readLong();         sender = Util.readAddress(in);         my_digest = (Digest) Util.readStreamable(Digest.class, in);         bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, in);         state_id = Util.readString(in);      }      public long size()      {         long retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type         retval += Util.size(sender);         retval += Global.BYTE_SIZE; // presence byte for my_digest         if (my_digest != null)            retval += my_digest.serializedSize();         retval += Global.BYTE_SIZE; // presence byte for state_id         if (state_id != null)            retval += state_id.length() + 2;         return retval;      }   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -