clusterstream.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 519 行
JAVA
519 行
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */package com.caucho.server.cluster;import com.caucho.hessian.io.*;import com.caucho.server.hmux.*;import com.caucho.util.*;import com.caucho.vfs.*;import java.io.*;import java.util.logging.*;/** * Defines a connection to the client. */public class ClusterStream { private static final L10N L = new L10N(ClusterStream.class); private static final Logger log = Logger.getLogger(ClusterStream.class.getName()); private ServerPool _pool; private ReadStream _is; private WriteStream _os; private Hessian2StreamingInput _in; private Hessian2StreamingOutput _out; private long _freeTime; private String _debugId; ClusterStream(ServerPool pool, int count, ReadStream is, WriteStream os) { _pool = pool; _is = is; _os = os; _debugId = "[" + pool.getDebugId() + ":" + count + "]"; } /** * Returns the owning pool */ public ServerPool getPool() { return _pool; } /** * Returns the input stream. */ public ReadStream getReadStream() { _freeTime = 0; return _is; } /** * Returns the write stream. */ public WriteStream getWriteStream() { _freeTime = 0; return _os; } /** * Returns the hessian input stream */ public Hessian2StreamingInput getHessianInputStream() { if (_in == null) _in = new Hessian2StreamingInput(_is); return _in; } /** * Returns the hessian output stream */ public Hessian2StreamingOutput getHessianOutputStream() { if (_out == null) _out = new Hessian2StreamingOutput(_os); return _out; } /** * Returns the free time, i.e. the time the connection was last idle. */ public long getFreeTime() { return _freeTime; } /** * Sets the free time. */ public void setFreeTime(long freeTime) { _freeTime = freeTime; } /** * Returns true if nearing end of free time. */ public boolean isLongIdle() { long now = Alarm.getCurrentTime(); return (_pool.getLoadBalanceIdleTime() < now - _freeTime + 2000L); } public boolean message(String to, String from, Serializable query) throws IOException { WriteStream out = getWriteStream(); out.write(HmuxRequest.HMTP_MESSAGE); out.write(0); out.write(0); writeString(to); writeString(from); Hessian2StreamingOutput hOut = getHessianOutputStream(); hOut.writeObject(query); out.write(HmuxRequest.HMUX_QUIT); out.flush(); return true; } public boolean queryGet(long id, String to, String from, Serializable query) throws IOException { WriteStream out = getWriteStream(); out.write(HmuxRequest.HMTP_QUERY_GET); out.write(0); out.write(8); writeLong(id); writeString(to); writeString(from); Hessian2StreamingOutput hOut = getHessianOutputStream(); hOut.writeObject(query); out.write(HmuxRequest.HMUX_QUIT); out.flush(); return true; } public boolean querySet(long id, String to, String from, Serializable query) throws IOException { WriteStream out = getWriteStream(); out.write(HmuxRequest.HMTP_QUERY_SET); out.write(0); out.write(8); writeLong(id); writeString(to); writeString(from); Hessian2StreamingOutput hOut = getHessianOutputStream(); hOut.writeObject(query); out.write(HmuxRequest.HMUX_QUIT); out.flush(); return true; } public boolean queryResult(long id, String to, String from, Serializable query) throws IOException { WriteStream out = getWriteStream(); out.write(HmuxRequest.HMTP_QUERY_RESULT); out.write(0); out.write(8); writeLong(id); writeString(to); writeString(from); Hessian2StreamingOutput hOut = getHessianOutputStream(); hOut.writeObject(query); out.write(HmuxRequest.HMUX_QUIT); out.flush(); return true; } public Serializable readQueryResult(long id) throws IOException { ReadStream in = getReadStream(); int code = in.read(); if (code != HmuxRequest.HMTP_QUERY_RESULT) throw new IOException(L.l("expected query result at '" + (char) code + "' " + code)); int len = (in.read() << 8) + in.read(); long resultId = readLongValue(); code = in.read(); if (code != HmuxRequest.HMUX_STRING) throw new IOException(L.l("expected string at '" + (char) code + "' " + code)); String to = readStringValue(); code = in.read(); if (code != HmuxRequest.HMUX_STRING) throw new IOException(L.l("expected string at '" + (char) code + "' " + code)); String from = readStringValue(); Hessian2StreamingInput hIn = getHessianInputStream(); Serializable result = (Serializable) hIn.readObject(); return result; } /** * Returns the debug id. */ public String getDebugId() { return _debugId; } /** * Clears the recycled connections. */ public void clearRecycle() { _pool.clearRecycle(); } /** * Recycles. */ public void free() { // #2369 - the load balancer might set its own view of the free // time if (_is != null && _freeTime <= 0) _freeTime = _is.getReadTime(); _pool.free(this); } public void close() { if (_is != null) _pool.close(this); closeImpl(); } /** * closes the stream. */ void closeImpl() { ReadStream is = _is; _is = null; WriteStream os = _os; _os = null; try { if (is != null) is.close(); } catch (Throwable e) { log.log(Level.FINER, e.toString(), e); } try { if (os != null) os.close(); } catch (Throwable e) { log.log(Level.FINER, e.toString(), e); } } /** * Writes a hmux exit to the target. */ public void writeExit() throws IOException { _os.write(HmuxRequest.HMUX_EXIT); _os.flush(); } /** * Writes a hmux quit to the target. */ public void writeQuit() throws IOException { _os.write(HmuxRequest.HMUX_QUIT); _os.flush(); } /** * Writes a hmux yield to the target, used for unidirectional messages */ public void writeYield() throws IOException { _os.write(HmuxRequest.HMUX_YIELD); _os.flush(); } /** * Writes a hmux int to the target. */ public void writeInt(int code, int value) throws IOException { WriteStream os = _os; os.write(code); os.write(0); os.write(4); os.write(value >> 24); os.write(value >> 16); os.write(value >> 8); os.write(value); } public void writeLong(long id) throws IOException { WriteStream os = _os; os.write((int) (id >> 56)); os.write((int) (id >> 48)); os.write((int) (id >> 40)); os.write((int) (id >> 32)); os.write((int) (id >> 24)); os.write((int) (id >> 16)); os.write((int) (id >> 8)); os.write((int) id); } public void writeLong(int code, long id) throws IOException { WriteStream os = _os; os.write(code); os.write(0); os.write(8); os.write((int) (id >> 56)); os.write((int) (id >> 48)); os.write((int) (id >> 40)); os.write((int) (id >> 32)); os.write((int) (id >> 24)); os.write((int) (id >> 16)); os.write((int) (id >> 8)); os.write((int) id); } public void writeString(String s) throws IOException { int len = s.length(); WriteStream os = _os; os.write(HmuxRequest.HMUX_STRING); os.write(len >> 8); os.write(len); os.print(s); } public void writeString(int code, String s) throws IOException { int len = s.length(); WriteStream os = _os; os.write(code); os.write(len >> 8); os.write(len); os.print(s); } /** * Writes a hmux string to the target. */ public void writeBinary(int code, byte []value) throws IOException { WriteStream os = _os; if (value == null) { os.write(code); os.write(0); os.write(0); return; } int len = value.length; os.write(code); os.write(len >> 8); os.write(len); os.write(value, 0, len); } /** * Writes a hmux string to the target. */ public void writeBinary(int code, byte []value, int offset, int len) throws IOException { WriteStream os = _os; os.write(code); os.write(len >> 8); os.write(len); os.write(value, offset, len); } public long readLongValue() throws IOException { ReadStream is = _is; return (((long) is.read() << 56) + ((long) is.read() << 48) + ((long) is.read() << 40) + ((long) is.read() << 32) + ((long) is.read() << 24) + ((long) is.read() << 16) + ((long) is.read() << 8) + ((long) is.read() << 0)); } public String readStringValue() throws IOException { ReadStream is = _is; int len = (is.read() << 8) + is.read(); char []data = new char[len]; for (int i = 0; i < len; i++) data[i] = (char) is.read(); return new String(data); } @Override public String toString() { return getClass().getSimpleName() + "[" + _debugId + "]"; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?