📄 streaming_state_transfer.java
字号:
{ try { runner.join(3000); } catch (InterruptedException ignored) { } } if (log.isDebugEnabled()) log.debug("Shutting the thread pool down... "); pool.shutdownNow(); try { //TODO use some system wide timeout eventually pool.awaitTerminationAfterShutdown(5000); } catch (InterruptedException ignored) { } } if (log.isDebugEnabled()) log.debug("Thread pool is shutdown. All pool threads are cleaned up."); } } private class StateProviderHandler { public void process(Socket socket) { StreamingOutputStreamWrapper wrapper = null; ObjectInputStream ois = null; try { int bufferSize = socket.getSendBufferSize(); socket.setSendBufferSize(socket_buffer_size); if (log.isDebugEnabled()) log.debug("Running on " + Thread.currentThread() + ". Accepted request for state transfer from " + socket.getInetAddress() + ":" + socket.getPort() + ", original buffer size was " + bufferSize + " and was reset to " + socket.getSendBufferSize() + ", passing outputstream up... "); //read out state requesters state_id and address and clear this request ois = new ObjectInputStream(socket.getInputStream()); String state_id = (String) ois.readObject(); Address stateRequester = (Address) ois.readObject(); removeFromStateRequesters(stateRequester, state_id); wrapper = new StreamingOutputStreamWrapper(socket); StateTransferInfo sti = new StateTransferInfo(stateRequester, wrapper, state_id); passUp(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti)); } catch (IOException e) { if (warn) { log.warn("State writer socket thread spawned abnormaly", e); } } catch (ClassNotFoundException e) { //thrown by ois.readObject() //should never happen since String/Address are core classes } finally { if (socket != null && !socket.isConnected()) { if (warn) log.warn("Accepted request for state transfer but socket " + socket + " not connected properly. Closing it..."); try { if (wrapper != null) { wrapper.close(); } else { socket.close(); } } catch (IOException e) { } } } } } private class StreamingInputStreamWrapper extends InputStream { private Socket inputStreamOwner; private InputStream delegate; private Channel channelOwner; public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException { super(); this.inputStreamOwner = inputStreamOwner; this.delegate = new BufferedInputStream(inputStreamOwner.getInputStream()); this.channelOwner = stack.getChannel(); } public int available() throws IOException { return delegate.available(); } public void close() throws IOException { if (log.isDebugEnabled()) { log.debug("State reader " + inputStreamOwner + " is closing the socket "); } if (channelOwner != null && channelOwner.isConnected()) { channelOwner.down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); } inputStreamOwner.close(); } public synchronized void mark(int readlimit) { delegate.mark(readlimit); } public boolean markSupported() { return delegate.markSupported(); } public int read() throws IOException { return delegate.read(); } public int read(byte[] b, int off, int len) throws IOException { return delegate.read(b, off, len); } public int read(byte[] b) throws IOException { return delegate.read(b); } public synchronized void reset() throws IOException { delegate.reset(); } public long skip(long n) throws IOException { return delegate.skip(n); } } private class StreamingOutputStreamWrapper extends OutputStream { private Socket outputStreamOwner; private OutputStream delegate; private long bytesWrittenCounter = 0; private Channel channelOwner; public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException { super(); this.outputStreamOwner = outputStreamOwner; this.delegate = new BufferedOutputStream(outputStreamOwner.getOutputStream()); this.channelOwner = stack.getChannel(); } public void close() throws IOException { if (log.isDebugEnabled()) { log.debug("State writer " + outputStreamOwner + " is closing the socket "); } try { if (channelOwner != null && channelOwner.isConnected()) { channelOwner.down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); } outputStreamOwner.close(); } catch (IOException e) { throw e; } finally { if (stats) { synchronized (state_requesters) { num_bytes_sent += bytesWrittenCounter; avg_state_size = num_bytes_sent / (double) num_state_reqs; } } } } public void flush() throws IOException { delegate.flush(); } public void write(byte[] b, int off, int len) throws IOException { delegate.write(b, off, len); bytesWrittenCounter += len; } public void write(byte[] b) throws IOException { delegate.write(b); if (b != null) { bytesWrittenCounter += b.length; } } public void write(int b) throws IOException { delegate.write(b); bytesWrittenCounter += 1; } } public static class StateHeader extends Header implements Streamable { public static final byte STATE_REQ = 1; public static final byte STATE_RSP = 2; public static final byte STATE_REMOVE_REQUESTER = 3; long id = 0; // state transfer ID (to separate multiple state transfers at the same time) byte type = 0; Address sender; // sender of state STATE_REQ or STATE_RSP Digest my_digest = null; // digest of sender (if type is STATE_RSP) IpAddress bind_addr = null; String state_id = null; // for partial state transfer public StateHeader() { // for externalization } public StateHeader(byte type, Address sender, String state_id) { this.type = type; this.sender = sender; this.state_id = state_id; } public StateHeader(byte type, Address sender, long id, Digest digest) { this.type = type; this.sender = sender; this.id = id; this.my_digest = digest; } public StateHeader(byte type, Address sender, IpAddress bind_addr, Digest digest, String state_id) { this.type = type; this.sender = sender; this.my_digest = digest; this.bind_addr = bind_addr; this.state_id = state_id; } public int getType() { return type; } public Digest getDigest() { return my_digest; } public String getStateId() { return state_id; } public boolean equals(Object o) { StateHeader other; if (sender != null && o != null) { if (!(o instanceof StateHeader)) return false; other = (StateHeader) o; return sender.equals(other.sender) && id == other.id; } return false; } public int hashCode() { if (sender != null) return sender.hashCode() + (int) id; else return (int) id; } public String toString() { StringBuffer sb = new StringBuffer(); sb.append("type=").append(type2Str(type)); if (sender != null) sb.append(", sender=").append(sender).append(" id=").append(id); if (my_digest != null) sb.append(", digest=").append(my_digest); return sb.toString(); } static String type2Str(int t) { switch (t) { case STATE_REQ : return "STATE_REQ"; case STATE_RSP : return "STATE_RSP"; case STATE_REMOVE_REQUESTER : return "STATE_REMOVE_REQUESTER"; default : return "<unknown>"; } } public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(sender); out.writeLong(id); out.writeByte(type); out.writeObject(my_digest); out.writeObject(bind_addr); out.writeUTF(state_id); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { sender = (Address) in.readObject(); id = in.readLong(); type = in.readByte(); my_digest = (Digest) in.readObject(); bind_addr = (IpAddress) in.readObject(); state_id = in.readUTF(); } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); out.writeLong(id); Util.writeAddress(sender, out); Util.writeStreamable(my_digest, out); Util.writeStreamable(bind_addr, out); Util.writeString(state_id, out); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type = in.readByte(); id = in.readLong(); sender = Util.readAddress(in); my_digest = (Digest) Util.readStreamable(Digest.class, in); bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, in); state_id = Util.readString(in); } public long size() { long retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type retval += Util.size(sender); retval += Global.BYTE_SIZE; // presence byte for my_digest if (my_digest != null) retval += my_digest.serializedSize(); retval += Global.BYTE_SIZE; // presence byte for state_id if (state_id != null) retval += state_id.length() + 2; return retval; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -