⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 commandmessagehandler.java

📁 mysql集群
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * 	This program is free software; you can redistribute it and/or modify it under the terms of 
 * the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, 
 * or (at your option) any later version. 
 * 
 * 	This program is distributed in the hope that it will be useful, 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
 * See the GNU General Public License for more details. 
 * 	You should have received a copy of the GNU General Public License along with this program; 
 * if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
package com.meidusa.amoeba.mysql.handler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.meidusa.amoeba.mysql.net.CommandInfo;
import com.meidusa.amoeba.mysql.net.CommandListener;
import com.meidusa.amoeba.mysql.net.MysqlClientConnection;
import com.meidusa.amoeba.mysql.net.MysqlConnection;
import com.meidusa.amoeba.mysql.net.MysqlServerConnection;
import com.meidusa.amoeba.mysql.net.packet.EOFPacket;
import com.meidusa.amoeba.mysql.net.packet.ErrorPacket;
import com.meidusa.amoeba.mysql.net.packet.MysqlPacketBuffer;
import com.meidusa.amoeba.mysql.net.packet.OkPacket;
import com.meidusa.amoeba.mysql.net.packet.QueryCommandPacket;
import com.meidusa.amoeba.net.Connection;
import com.meidusa.amoeba.net.MessageHandler;
import com.meidusa.amoeba.net.Sessionable;
import com.meidusa.amoeba.net.packet.AbstractPacketBuffer;
import com.meidusa.amoeba.net.packet.Packet;
import com.meidusa.amoeba.net.packet.PacketBuffer;
import com.meidusa.amoeba.net.poolable.ObjectPool;
import com.meidusa.amoeba.net.poolable.PoolableObject;
import com.meidusa.amoeba.util.Reporter;
import com.meidusa.amoeba.util.StringUtil;

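/**
 * Message handler that installs itself on a MySQL client connection, queues the commands
 * to be sent, and tracks the response status of each server connection involved.
 *
 * @author <a href=mailto:piratebase@sina.com>Struct chen</a>
 */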
public abstract class CommandMessageHandler implements MessageHandler,Sessionable,Reporter.SubReporter {
	private static Logger logger = Logger.getLogger(CommandMessageHandler.class); 
	
	/**
	 * Bit flags describing the session state implied by the packets a server has returned.
	 * The flags are OR-ed together into {@code ConnectionStatuts.statusCode}.
	 *
	 * @author <a href=mailto:piratebase@sina.com>Struct chen</a>
	 */
	static class SessionStatus{
		public static final int QUERY       = 1 << 0;
		public static final int RESULT_HEAD = 1 << 1;
		public static final int EOF_FIELDS  = 1 << 2;
		public static final int EOF_ROWS    = 1 << 3;
		/** Set when an OK packet ends the command. */
		public static final int OK          = 1 << 4;
		/** Set when an error packet ends the command. */
		public static final int ERROR       = 1 << 5;
		/** Set together with {@link #OK} or {@link #ERROR} once the command has finished. */
		public static final int COMPLETED   = 1 << 6;
	}
	
	/**
	 * Result of feeding one server packet to {@code CommandQueue.checkResponseCompleted}.
	 */
	static enum CommandStatus{
		/** The connection that sent the packet has not finished the current command yet. */
		ConnectionNotComplete,
		/** That connection has finished, but not every connection in the status map has. */
		ConnectionCompleted,
		/** Every connection in the status map has finished the current command. */
		AllCompleted
	}
	
	/**
	 * 描述服务端连接的状态。包括当前命令的状态,当前连接的数据包
	 * @author <a href=mailto:piratebase@sina.com>Struct chen</a>
	 *
	 */
	static abstract class ConnectionStatuts{
		protected Connection conn;
		public ConnectionStatuts(Connection conn){
			this.conn = conn;
		}
		int statusCode;
		int packetIndex;
		List<byte[]> buffers;
		protected  byte commandType;
		
		public void clearBuffer(){
			if(buffers != null){
				buffers.clear();
			}
		}
		
		public void setCommandType(byte commandType){
			this.commandType = commandType;
			statusCode = 0;
			packetIndex = 0; 
		}
		/**
		 * 判断从服务器端返回得数据包是否表示当前请求的结束。
		 * @param buffer
		 * @return
		 */
		public boolean isCompleted(byte[] buffer) {
			if(this.commandType == QueryCommandPacket.COM_INIT_DB){
				boolean isCompleted = false; 
				if(MysqlPacketBuffer.isErrorPacket(buffer)){
					statusCode |= SessionStatus.ERROR;
					statusCode |= SessionStatus.COMPLETED;
					isCompleted = true;
				}else if(MysqlPacketBuffer.isOkPacket(buffer)){
					statusCode |= SessionStatus.OK;
					statusCode |= SessionStatus.COMPLETED;
					isCompleted = true;
				}
				return isCompleted;
			}else{
				return false;
			}
		}
	}
	
	/**
	 * Holds the commands queued for one client connection together with the status of
	 * every server connection taking part in the current command.
	 */
	protected static class CommandQueue{
		protected List<CommandInfo> sessionInitQueryQueue; // queue of all commands sent by the client
		protected CommandInfo currentCommand; // the command currently being processed
		private final Lock lock = new ReentrantLock(false);
		protected Map<MysqlServerConnection,ConnectionStatuts> connStatusMap = new HashMap<MysqlServerConnection,ConnectionStatuts>();
		private boolean mainCommandExecuted;
		private MysqlClientConnection source;

		public CommandQueue(MysqlClientConnection source){
			this.source = source;
		}

		/**
		 * @return {@code true} if more than one server connection is registered
		 */
		public boolean isMultiple(){
			return connStatusMap.size()>1;
		}

		/**
		 * Clears the buffered packets of every registered server connection.
		 */
		public void clearAllBuffer(){
			for(ConnectionStatuts status : connStatusMap.values()){
				status.clearBuffer();
			}
		}

		/**
		 * Makes the head of the queue the current command. The head is not removed here.
		 *
		 * @return {@code false} if the queue does not exist or is empty, {@code true} otherwise
		 */
		private boolean tryNextCommandTuple(){
			if(sessionInitQueryQueue == null || sessionInitQueryQueue.size() == 0){
				return false;
			}
			currentCommand = sessionInitQueryQueue.get(0);
			if(logger.isDebugEnabled()){
				QueryCommandPacket command = new QueryCommandPacket();
				command.init(currentCommand.getBuffer(),source);
				logger.debug(command);
			}
			return true;
		}

		/**
		 * Records a packet received from a server connection and reports whether it ends
		 * the current command. The command is completely finished only once every
		 * registered connection has returned its final packet.
		 *
		 * @param conn server connection the packet came from
		 * @param buffer raw packet bytes
		 * @return the completion state after this packet
		 * @throws IllegalStateException if {@code conn} has no entry in {@code connStatusMap}
		 */
		protected  CommandStatus checkResponseCompleted(Connection conn,byte[] buffer){
			ConnectionStatuts connStatus = connStatusMap.get(conn);
			if(connStatus == null){
				logger.error("connection Status not Found, byffer="+StringUtil.dumpAsHex(buffer, buffer.length));
				// Fail explicitly instead of falling through to an accidental NullPointerException.
				throw new IllegalStateException("no connection status registered for connection " + conn);
			}
			boolean isCompleted = connStatus.isCompleted(buffer);
			connStatus.packetIndex ++;
			/*
			 * Every packet is buffered per connection. With several connections the data has to
			 * be kept until the command has fully completed, then assembled and sent to the
			 * client (the original note refers to CommandMessageHandler.mergeMessageToClient).
			 */
			if(connStatus.buffers == null){
				connStatus.buffers = new ArrayList<byte[]>();
			}
			connStatus.buffers.add(buffer);
			if(!isCompleted){
				return CommandStatus.ConnectionNotComplete;
			}
			lock.lock();
			try{
				if(currentCommand.getCompletedCount().incrementAndGet() != connStatusMap.size()){
					return CommandStatus.ConnectionCompleted;
				}
				if(logger.isDebugEnabled()){
					Packet packet = null;
					if(MysqlPacketBuffer.isErrorPacket(buffer)){
						packet = new ErrorPacket();
					}else if(MysqlPacketBuffer.isEofPacket(buffer)){
						packet = new EOFPacket();
					}else if(MysqlPacketBuffer.isOkPacket(buffer)){
						packet = new OkPacket();
					}
					if(packet != null){
						packet.init(buffer,conn);
						logger.debug("returned Packet:"+packet);
					}else{
						// Unknown packet kind: debug logging must never break the session.
						logger.debug("returned Packet:\n"+StringUtil.dumpAsHex(buffer, buffer.length));
					}
				}
				return CommandStatus.AllCompleted;
			}finally{
				lock.unlock();
			}
		}

		/**
		 * Appends a command to the queue. Without {@code force} the command is only appended
		 * when other commands are already queued, so that ordering is preserved. If nothing
		 * is queued the command is not appended and {@code false} is returned.
		 * A command already contained in the queue is not added twice.
		 *
		 * @param commandInfo command to queue
		 * @param force append unconditionally, creating the queue if necessary; always returns {@code true}
		 * @return {@code true} if the command is in the queue after the call, {@code false} if it was not queued
		 */
		public synchronized  boolean appendCommand(CommandInfo commandInfo,boolean force){
			if(force){
				if(sessionInitQueryQueue == null){
					sessionInitQueryQueue = Collections.synchronizedList(new ArrayList<CommandInfo>());
				}
			}else if(sessionInitQueryQueue == null || sessionInitQueryQueue.size() == 0){
				return false;
			}
			if(!sessionInitQueryQueue.contains(commandInfo)){
				sessionInitQueryQueue.add(commandInfo);
			}
			return true;
		}
	}
	
	// Client connection this handler was created for (assigned in the constructor).
	protected MysqlClientConnection source;
	// Value returned by isCompleted(); never assigned in the visible part of the file.
	private boolean completed;
	// System.currentTimeMillis() taken when the handler was constructed.
	private long createTime;
	// Timeout passed to the constructor; presumably milliseconds - TODO confirm.
	private long timeout;
	// Not used in the visible part of the file.
	private long endTime;
	// Not used in the visible part of the file (only referenced by commented-out code in handleMessage).
	private boolean ended = false;
	// Command queue and per-server-connection status, created in the constructor.
	protected CommandQueue commandQueue;
	// Object pools passed to the constructor.
	private ObjectPool[] pools;
	// Main command: the constructor stores the initial query bytes here and marks it as main.
	private CommandInfo info = new CommandInfo();
	protected byte commandType;
	// Handlers keyed by connection; the constructor stores the client connection's previous handler here.
	protected Map<Connection,MessageHandler> handlerMap = new HashMap<Connection,MessageHandler>();
	private final Lock lock = new ReentrantLock(false);
	private PacketBuffer buffer = new AbstractPacketBuffer(1400);
	/**
	 * Creates the handler and installs it on the client connection in place of the
	 * connection's current handler, which is remembered in {@code handlerMap}.
	 *
	 * @param source client connection that issued the command
	 * @param query raw bytes of the main command packet
	 * @param pools object pools made available to this handler
	 * @param timeout timeout value stored for this handler
	 */
	public CommandMessageHandler(final MysqlClientConnection source,byte[] query,ObjectPool[] pools,long timeout){
		// Remember the handler currently installed on the client connection, then take its place.
		// NOTE(review): "this" is published here before the remaining fields are assigned.
		MessageHandler previousHandler = source.getMessageHandler();
		handlerMap.put(source, previousHandler);
		source.setMessageHandler(this);

		this.commandQueue = new CommandQueue(source);

		// The query packet is parsed, but the parsed object is not kept.
		QueryCommandPacket parsedQuery = new QueryCommandPacket();
		parsedQuery.init(query,source);

		this.pools = pools;
		this.info.setBuffer(query);
		this.info.setMain(true);

		this.source = source;
		this.createTime = System.currentTimeMillis();
		this.timeout = timeout;
	}
	
	/**
	 * Tells whether message transfer has finished for all handled connections.
	 *
	 * @return the current value of the {@code completed} flag
	 */
	public boolean isCompleted(){
		return this.completed;
	}
	
	/**
	 * Queues the commands that have to run before the main command so that the server
	 * connections end up with the same environment as the client connection
	 * (current schema, charset, autocommit mode).
	 *
	 * For example, when the client connection and a server connection are on different
	 * databases, a COM_INIT_DB command is queued. To keep things simple, as soon as one
	 * connection needs a command, all connections send that same command once. Therefore
	 * at most one command of each kind is queued, no matter how many server connections differ.
	 *
	 * When a queued command's runnable is run, it copies the client connection's value
	 * onto every server connection in the status map.
	 */
	//TODO needs optimization
	protected void appendPreMainCommand(){
		final MysqlConnection sourceMysql =(MysqlConnection) source;
		boolean schemaQueued = false;
		boolean charsetQueued = false;
		boolean autoCommitQueued = false;

		for(Connection destConn : commandQueue.connStatusMap.keySet()){
			MysqlConnection destMysqlConn = (MysqlConnection)destConn;

			if(!schemaQueued
					&& sourceMysql.getSchema() != null
					&& !StringUtil.equalsIgnoreCase(sourceMysql.getSchema(), destMysqlConn.getSchema())){
				QueryCommandPacket selectDBCommand = new QueryCommandPacket();
				selectDBCommand.arg = sourceMysql.getSchema();
				selectDBCommand.command = QueryCommandPacket.COM_INIT_DB;

				// NOTE(review): this packet is encoded against the server connection while the
				// two packets below are encoded against the client connection - confirm intended.
				byte[] packetBytes = selectDBCommand.toByteBuffer(destMysqlConn).array();
				CommandInfo preCommand = new CommandInfo();
				preCommand.setBuffer(packetBytes);
				preCommand.setMain(false);
				preCommand.setRunnable(new Runnable(){
					public void run() {
						for(Connection conn : commandQueue.connStatusMap.keySet()){
							((MysqlConnection)conn).setSchema(sourceMysql.getSchema());
						}
					}
				});
				commandQueue.appendCommand(preCommand,true);
				schemaQueued = true;
			}

			if(!charsetQueued
					&& sourceMysql.getCharset()!= null
					&& !StringUtil.equalsIgnoreCase(sourceMysql.getCharset(),destMysqlConn.getCharset())){
				QueryCommandPacket charsetCommand = new QueryCommandPacket();
				charsetCommand.arg = "set names " + sourceMysql.getCharset();
				charsetCommand.command = QueryCommandPacket.COM_QUERY;

				byte[] packetBytes = charsetCommand.toByteBuffer(sourceMysql).array();
				CommandInfo preCommand = new CommandInfo();
				preCommand.setBuffer(packetBytes);
				preCommand.setMain(false);
				preCommand.setRunnable(new Runnable(){
					public void run() {
						for(Connection conn : commandQueue.connStatusMap.keySet()){
							((MysqlConnection)conn).setCharset(sourceMysql.getCharset());
						}
					}
				});
				commandQueue.appendCommand(preCommand,true);
				charsetQueued = true;
			}

			if(!autoCommitQueued
					&& sourceMysql.isAutoCommit() != destMysqlConn.isAutoCommit()){
				QueryCommandPacket autoCommitCommand = new QueryCommandPacket();
				autoCommitCommand.arg = "set autocommit = " + (sourceMysql.isAutoCommit()?1:0);
				autoCommitCommand.command = QueryCommandPacket.COM_QUERY;

				byte[] packetBytes = autoCommitCommand.toByteBuffer(sourceMysql).array();
				CommandInfo preCommand = new CommandInfo();
				preCommand.setBuffer(packetBytes);
				preCommand.setMain(false);
				preCommand.setRunnable(new Runnable(){
					public void run() {
						for(Connection conn : commandQueue.connStatusMap.keySet()){
							((MysqlConnection)conn).setAutoCommit(sourceMysql.isAutoCommit());
						}
					}
				});
				commandQueue.appendCommand(preCommand,true);
				autoCommitQueued = true;
			}

			if(schemaQueued && charsetQueued && autoCommitQueued){
				// Every kind of pre-command is already queued; nothing left to check.
				break;
			}
		}
	}
	/**
	 * Intentionally empty in this class. Presumably the counterpart of
	 * {@code appendPreMainCommand()} for commands that follow the main command,
	 * meant to be overridden by subclasses - TODO confirm.
	 */
	protected void appendAfterMainCommand(){
		
	}

	/**
	 * Called when a connection starts a command. Connections that implement
	 * {@link CommandListener} are notified; all others are ignored.
	 *
	 * @param conn connection that is starting the command
	 * @param currentCommand not read by this implementation; the listener receives
	 *        {@code commandQueue.currentCommand} instead
	 */
	protected void startConnectionCommand(Connection conn,CommandInfo currentCommand){
		if(!(conn instanceof CommandListener)){
			return;
		}
		((CommandListener)conn).startCommand(commandQueue.currentCommand);
	}
	
	/**
	 * Called when a single connection has finished a command; this concerns that
	 * connection only, not all connections. Connections that implement
	 * {@link CommandListener} are notified; all others are ignored.
	 *
	 * @param conn connection that finished the command
	 * @param currentCommand not read by this implementation; the listener receives
	 *        {@code commandQueue.currentCommand} instead
	 */
	protected void finishedConnectionCommand(Connection conn,CommandInfo currentCommand){
		if(!(conn instanceof CommandListener)){
			return;
		}
		((CommandListener) conn).finishedCommand(this.commandQueue.currentCommand);
	}
	
	public void handleMessage(Connection fromConn, byte[] message) {
		/*if(ended){
			logger.error("ended session handler handle message:\n"+StringUtil.dumpAsHex(message, message.length));
			return;
		}*/

		if(fromConn == source){
			CommandInfo info = new CommandInfo();
			info.setBuffer(message);
			info.setMain(true);
			
			if(!commandQueue.appendCommand(info,false)){
				dispatchMessageFrom(source,message);
			}
		}else{
			
			if(logger.isDebugEnabled()){
				if(MysqlPacketBuffer.isErrorPacket(message)){
					logger.error("connection="+fromConn.hashCode()+",error packet:\n"+StringUtil.dumpAsHex(message, message.length));
				}
			}
			//判断命令是否完成了
			CommandStatus commStatus = commandQueue.checkResponseCompleted(fromConn, message);
			
			if(CommandStatus.AllCompleted == commStatus || CommandStatus.ConnectionCompleted == commStatus){
				finishedConnectionCommand(fromConn,commandQueue.currentCommand);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -