📄 commandmessagehandler.java
字号:
/*
* This program is free software; you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation; either version 3 of the License,
* or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package com.meidusa.amoeba.mysql.handler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.meidusa.amoeba.mysql.net.CommandInfo;
import com.meidusa.amoeba.mysql.net.CommandListener;
import com.meidusa.amoeba.mysql.net.MysqlClientConnection;
import com.meidusa.amoeba.mysql.net.MysqlConnection;
import com.meidusa.amoeba.mysql.net.MysqlServerConnection;
import com.meidusa.amoeba.mysql.net.packet.EOFPacket;
import com.meidusa.amoeba.mysql.net.packet.ErrorPacket;
import com.meidusa.amoeba.mysql.net.packet.MysqlPacketBuffer;
import com.meidusa.amoeba.mysql.net.packet.OkPacket;
import com.meidusa.amoeba.mysql.net.packet.QueryCommandPacket;
import com.meidusa.amoeba.net.Connection;
import com.meidusa.amoeba.net.MessageHandler;
import com.meidusa.amoeba.net.Sessionable;
import com.meidusa.amoeba.net.packet.AbstractPacketBuffer;
import com.meidusa.amoeba.net.packet.Packet;
import com.meidusa.amoeba.net.packet.PacketBuffer;
import com.meidusa.amoeba.net.poolable.ObjectPool;
import com.meidusa.amoeba.net.poolable.PoolableObject;
import com.meidusa.amoeba.util.Reporter;
import com.meidusa.amoeba.util.StringUtil;
/**
*
* @author <a href=mailto:piratebase@sina.com>Struct chen</a>
*
*/
public abstract class CommandMessageHandler implements MessageHandler,Sessionable,Reporter.SubReporter {
private static Logger logger = Logger.getLogger(CommandMessageHandler.class);
/**
* 表示服务器返回的数据包所表示当前会话状态
* @author <a href=mailto:piratebase@sina.com>Struct chen</a>
*
*/
static class SessionStatus{
public static final int QUERY = 1;
public static final int RESULT_HEAD = 2;
public static final int EOF_FIELDS = 4;
public static final int EOF_ROWS = 8;
public static final int OK = 16;
public static final int ERROR = 32;
public static final int COMPLETED = 64;
}
static enum CommandStatus{
ConnectionNotComplete,ConnectionCompleted,AllCompleted
}
/**
* 描述服务端连接的状态。包括当前命令的状态,当前连接的数据包
* @author <a href=mailto:piratebase@sina.com>Struct chen</a>
*
*/
static abstract class ConnectionStatuts{
protected Connection conn;
public ConnectionStatuts(Connection conn){
this.conn = conn;
}
int statusCode;
int packetIndex;
List<byte[]> buffers;
protected byte commandType;
public void clearBuffer(){
if(buffers != null){
buffers.clear();
}
}
public void setCommandType(byte commandType){
this.commandType = commandType;
statusCode = 0;
packetIndex = 0;
}
/**
* 判断从服务器端返回得数据包是否表示当前请求的结束。
* @param buffer
* @return
*/
public boolean isCompleted(byte[] buffer) {
if(this.commandType == QueryCommandPacket.COM_INIT_DB){
boolean isCompleted = false;
if(MysqlPacketBuffer.isErrorPacket(buffer)){
statusCode |= SessionStatus.ERROR;
statusCode |= SessionStatus.COMPLETED;
isCompleted = true;
}else if(MysqlPacketBuffer.isOkPacket(buffer)){
statusCode |= SessionStatus.OK;
statusCode |= SessionStatus.COMPLETED;
isCompleted = true;
}
return isCompleted;
}else{
return false;
}
}
}
protected static class CommandQueue{
protected List<CommandInfo> sessionInitQueryQueue; //所有的从客户端发送过来的 command 队列
protected CommandInfo currentCommand;//当前的query
private final Lock lock = new ReentrantLock(false);
protected Map<MysqlServerConnection,ConnectionStatuts> connStatusMap = new HashMap<MysqlServerConnection,ConnectionStatuts>();
private boolean mainCommandExecuted;
private MysqlClientConnection source;
public CommandQueue(MysqlClientConnection source){
this.source = source;
}
public boolean isMultiple(){
return connStatusMap.size()>1;
}
public void clearAllBuffer(){
Collection<ConnectionStatuts> collection = connStatusMap.values();
for(ConnectionStatuts status : collection){
status.clearBuffer();
}
}
/**
* 尝试下一个命令,如果返回false,表示队列中没有命令了。
*
* @return
*/
private boolean tryNextCommandTuple(){
if(sessionInitQueryQueue == null){
return false;
}else{
if(sessionInitQueryQueue.size()>0){
currentCommand = sessionInitQueryQueue.get(0);
if(logger.isDebugEnabled()){
QueryCommandPacket command = new QueryCommandPacket();
command.init(currentCommand.getBuffer(),source);
logger.debug(command);
}
return true;
}
return false;
}
}
/**
* 判断返回的数据是否是当前命令的结束包。
* 当前全部连接都全部返回以后则表示当前命令完全结束。
* @param conn
* @param buffer
* @return
*/
protected CommandStatus checkResponseCompleted(Connection conn,byte[] buffer){
boolean isCompleted = false;
ConnectionStatuts connStatus = connStatusMap.get(conn);
if(connStatus == null){
logger.error("connection Status not Found, byffer="+StringUtil.dumpAsHex(buffer, buffer.length));
}
isCompleted = connStatus.isCompleted(buffer);
connStatus.packetIndex ++;
/**
* 如果是多个连接的,需要将数据缓存起来,等待命令全部完成以后,将数据进行组装,然后发送到客户端
* {@link #CommandMessageHandler.mergeMessageToClient}
*/
if(connStatus.buffers == null){
connStatus.buffers = new ArrayList<byte[]>();
}
connStatus.buffers.add(buffer);
if(isCompleted){
lock.lock();
try{
if(currentCommand.getCompletedCount().incrementAndGet() == connStatusMap.size()){
if(logger.isDebugEnabled()){
Packet packet = null;
if(MysqlPacketBuffer.isErrorPacket(buffer)){
packet = new ErrorPacket();
}else if(MysqlPacketBuffer.isEofPacket(buffer)){
packet = new EOFPacket();
}else if(MysqlPacketBuffer.isOkPacket(buffer)){
packet = new OkPacket();
}
packet.init(buffer,conn);
logger.debug("returned Packet:"+packet);
}
return CommandStatus.AllCompleted;
}else{
return CommandStatus.ConnectionCompleted;
}
}finally{
lock.unlock();
}
}else{
return CommandStatus.ConnectionNotComplete;
}
}
/**
* 是否append 成功,如果成功则表示以前曾经堆积过,需要队列来保证发送命令的循序。
* 如果队列中没有堆积的命令,则返回false.
* 否则返回true, 则表示可直接发送命令
* @param commandInfo
* @param force 强制append 命令,返回为true
* @return
*/
public synchronized boolean appendCommand(CommandInfo commandInfo,boolean force){
if(force){
if(sessionInitQueryQueue == null){
sessionInitQueryQueue = Collections.synchronizedList(new ArrayList<CommandInfo>());
}
if(!sessionInitQueryQueue.contains(commandInfo)){
sessionInitQueryQueue.add(commandInfo);
}
return true;
}else{
if(sessionInitQueryQueue == null){
return false;
}else{
if(sessionInitQueryQueue.size() ==0){
return false;
}
if(!sessionInitQueryQueue.contains(commandInfo)){
sessionInitQueryQueue.add(commandInfo);
}
return true;
}
}
}
}
protected MysqlClientConnection source;
private boolean completed;
private long createTime;
private long timeout;
private long endTime;
private boolean ended = false;
protected CommandQueue commandQueue;
private ObjectPool[] pools;
private CommandInfo info = new CommandInfo();
protected byte commandType;
protected Map<Connection,MessageHandler> handlerMap = new HashMap<Connection,MessageHandler>();
private final Lock lock = new ReentrantLock(false);
private PacketBuffer buffer = new AbstractPacketBuffer(1400);
public CommandMessageHandler(final MysqlClientConnection source,byte[] query,ObjectPool[] pools,long timeout){
handlerMap.put(source, source.getMessageHandler());
source.setMessageHandler(this);
commandQueue = new CommandQueue(source);
QueryCommandPacket command = new QueryCommandPacket();
command.init(query,source);
this.pools = pools;
info.setBuffer(query);
info.setMain(true);
this.source = source;
this.createTime = System.currentTimeMillis();
this.timeout = timeout;
}
/**
* 判断被handled的Connection 消息传送是否都完成
* @return
*/
public boolean isCompleted(){
return completed;
}
/**
* 主要是为了服务端连接 与 客户端连接的环境一致(比如,当前的schema 、charset等)
*
* 在发送主命令之前,预先需要发送一些额外的命令,比如sourceConnection、destConnection 当前的database不一致,需要发送init_db Command
* 为了减少复杂度,只要一个Connection需要发送命令,那么所有连接都必须发送一次相同的命令。
*
* @param sourceMysql
* @param destMysqlConn
*/
//TODO 需要进行优化
protected void appendPreMainCommand(){
Set<MysqlServerConnection> connSet = commandQueue.connStatusMap.keySet();
final MysqlConnection sourceMysql =(MysqlConnection) source;
for(Connection destConn : connSet){
MysqlConnection destMysqlConn = (MysqlConnection)destConn;
if(!StringUtil.equalsIgnoreCase(sourceMysql.getSchema(), destMysqlConn.getSchema())){
if(sourceMysql.getSchema() != null){
QueryCommandPacket selectDBCommand = new QueryCommandPacket();
selectDBCommand.arg = sourceMysql.getSchema();
selectDBCommand.command = QueryCommandPacket.COM_INIT_DB;
byte[] buffer = selectDBCommand.toByteBuffer(destMysqlConn).array();
CommandInfo info = new CommandInfo();
info.setBuffer(buffer);
info.setMain(false);
info.setRunnable(new Runnable(){
public void run() {
Set<MysqlServerConnection> connSet = commandQueue.connStatusMap.keySet();
for(Connection conn : connSet){
((MysqlConnection)conn).setSchema(sourceMysql.getSchema());
}
}
});
commandQueue.appendCommand(info,true);
}
}
if(sourceMysql.getCharset()!= null &&
!StringUtil.equalsIgnoreCase(sourceMysql.getCharset(),destMysqlConn.getCharset())){
QueryCommandPacket charsetCommand = new QueryCommandPacket();
charsetCommand.arg = "set names " + sourceMysql.getCharset();
charsetCommand.command = QueryCommandPacket.COM_QUERY;
byte[] buffer = charsetCommand.toByteBuffer(sourceMysql).array();
CommandInfo info = new CommandInfo();
info.setBuffer(buffer);
info.setMain(false);
info.setRunnable(new Runnable(){
public void run() {
Set<MysqlServerConnection> connSet = commandQueue.connStatusMap.keySet();
for(Connection conn : connSet){
((MysqlConnection)conn).setCharset(sourceMysql.getCharset());
}
}
});
commandQueue.appendCommand(info,true);
}
if(sourceMysql.isAutoCommit() != destMysqlConn.isAutoCommit()){
QueryCommandPacket charsetCommand = new QueryCommandPacket();
charsetCommand.arg = "set autocommit = " + (sourceMysql.isAutoCommit()?1:0);
charsetCommand.command = QueryCommandPacket.COM_QUERY;
byte[] buffer = charsetCommand.toByteBuffer(sourceMysql).array();
CommandInfo info = new CommandInfo();
info.setBuffer(buffer);
info.setMain(false);
info.setRunnable(new Runnable(){
public void run() {
Set<MysqlServerConnection> connSet = commandQueue.connStatusMap.keySet();
for(Connection conn : connSet){
((MysqlConnection)conn).setAutoCommit(sourceMysql.isAutoCommit());
}
}
});
commandQueue.appendCommand(info,true);
}
}
}
protected void appendAfterMainCommand(){
}
/**
* 当连接开始一个命令的时候
* @param conn
*/
protected void startConnectionCommand(Connection conn,CommandInfo currentCommand){
if(conn instanceof CommandListener){
CommandListener listener = (CommandListener)conn;
listener.startCommand(commandQueue.currentCommand);
}
}
/**
* 当连接完成一个命令的时候执行,只是针对连接自己,而不是所有连接。
* @param conn
*/
protected void finishedConnectionCommand(Connection conn,CommandInfo currentCommand){
if(conn instanceof CommandListener){
CommandListener listener = (CommandListener) conn;
listener.finishedCommand(this.commandQueue.currentCommand);
}
}
public void handleMessage(Connection fromConn, byte[] message) {
/*if(ended){
logger.error("ended session handler handle message:\n"+StringUtil.dumpAsHex(message, message.length));
return;
}*/
if(fromConn == source){
CommandInfo info = new CommandInfo();
info.setBuffer(message);
info.setMain(true);
if(!commandQueue.appendCommand(info,false)){
dispatchMessageFrom(source,message);
}
}else{
if(logger.isDebugEnabled()){
if(MysqlPacketBuffer.isErrorPacket(message)){
logger.error("connection="+fromConn.hashCode()+",error packet:\n"+StringUtil.dumpAsHex(message, message.length));
}
}
//判断命令是否完成了
CommandStatus commStatus = commandQueue.checkResponseCompleted(fromConn, message);
if(CommandStatus.AllCompleted == commStatus || CommandStatus.ConnectionCompleted == commStatus){
finishedConnectionCommand(fromConn,commandQueue.currentCommand);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -