⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mysqlclientconnection.java

📁 mysql集群
💻 JAVA
字号:
/*
 * 	This program is free software; you can redistribute it and/or modify it under the terms of 
 * the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, 
 * or (at your option) any later version. 
 * 
 * 	This program is distributed in the hope that it will be useful, 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
 * See the GNU General Public License for more details. 
 * 	You should have received a copy of the GNU General Public License along with this program; 
 * if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
package com.meidusa.amoeba.mysql.net;

import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.collections.map.LRUMap;
import org.apache.log4j.Logger;

import com.meidusa.amoeba.context.ProxyRuntimeContext;
import com.meidusa.amoeba.mysql.filter.IOFilter;
import com.meidusa.amoeba.mysql.filter.PacketFilterInvocation;
import com.meidusa.amoeba.mysql.filter.PacketIOFilter;
import com.meidusa.amoeba.mysql.handler.PreparedStatmentInfo;
import com.meidusa.amoeba.net.AuthingableConnectionManager;
import com.meidusa.amoeba.net.Connection;
import com.meidusa.amoeba.util.ThreadLocalMap;

/**
 * 负责连接到 proxy server的客户端连接对象包装
 * @author <a href=mailto:piratebase@sina.com>Struct chen</a>
 *
 */
public class MysqlClientConnection extends MysqlConnection{
	private static Logger logger = Logger.getLogger(MysqlClientConnection.class);
	protected String seed;//保存服务端发送的随机用于客户端加密的字符串
	protected byte[] authenticationMessage;//保存客户端返回的加密过的字符串
	static List<IOFilter> filterList = new ArrayList<IOFilter>();
	static {
		filterList.add(new PacketIOFilter());
	}
	
	private List<byte[]> longDataList = new ArrayList<byte[]>(); 
	
	private List<byte[]> unmodifiableLongDataList = Collections.unmodifiableList(new ArrayList<byte[]>()); 
	/**
	 * 存储sql,statmentId 对
	 */
	private final Map<String,Long> SQL_STATMENT_ID_MAP = Collections.synchronizedMap(new HashMap<String,Long>(256));
	private AtomicLong atomicLong = new AtomicLong(1);
	
	/**
	 * 采用LRU缓存这些preparedStatment信息
	 * key=statmentId
	 * value=PreparedStatmentInfo object
	 */
	@SuppressWarnings("unchecked")
	private final Map<Long,PreparedStatmentInfo> PREPARED_STATMENT_MAP = Collections.synchronizedMap(new LRUMap(256){
		private static final long serialVersionUID = 1L;
		protected boolean removeLRU(LinkEntry entry) {
			PreparedStatmentInfo info = (PreparedStatmentInfo)entry.getValue();
			SQL_STATMENT_ID_MAP.remove(info.getPreparedStatment());
		    return true;
		}
		
		public PreparedStatmentInfo remove(Object key){
			PreparedStatmentInfo info = (PreparedStatmentInfo)super.remove(key);
			SQL_STATMENT_ID_MAP.remove(info.getPreparedStatment());
			return info;
		}
		
		public Object put(Object key,Object value){
			PreparedStatmentInfo info = (PreparedStatmentInfo)value;
			SQL_STATMENT_ID_MAP.put(info.getPreparedStatment(), (Long)key);
			return super.put(key, value);
		}
		
		public void putAll(Map map){
			for(Iterator it= map.entrySet().iterator();it.hasNext();){
				Map.Entry<Long,PreparedStatmentInfo> entry = (Map.Entry<Long,PreparedStatmentInfo>)it.next();
				SQL_STATMENT_ID_MAP.put(entry.getValue().getPreparedStatment(), entry.getKey());
			}
			super.putAll(map);
		}
	});
	public MysqlClientConnection(SocketChannel channel, long createStamp){
		super(channel, createStamp);
		
	}

	public PreparedStatmentInfo getPreparedStatmentInfo(long id){
		return PREPARED_STATMENT_MAP.get(id);
	}
	
	public PreparedStatmentInfo getPreparedStatmentInfo(String preparedSql){
		Long id = SQL_STATMENT_ID_MAP.get(preparedSql);
		PreparedStatmentInfo info = null;
		if(id == null){
			info = new PreparedStatmentInfo(this,atomicLong.getAndIncrement(),preparedSql);
			PREPARED_STATMENT_MAP.put(info.getStatmentId(), info);
		}else{
			info = getPreparedStatmentInfo(id);
		}
		return info;
	}
	
	public String getSeed() {
		return seed;
	}

	public void setSeed(String seed) {
		this.seed = seed;
	}

	public byte[] getAuthenticationMessage() {
		return authenticationMessage;
	}

	public void handleMessage(Connection conn,byte[] message) {
		//在未验证通过的时候
		/** 此时接收到的应该是认证数据,保存数据为认证提供数据 */
		this.authenticationMessage = message;
		((AuthingableConnectionManager)_cmgr).getAuthenticator().authenticateConnection(this);
	}
	
	protected void messageProcess(final byte[] msg){
		final PacketFilterInvocation invocation = new PacketFilterInvocation(filterList,this,msg){
			@Override
			protected Result doProcess() {
				ProxyRuntimeContext.getInstance().getClientSideExecutor().execute(new Runnable(){

					public void run() {
						try{
							MysqlClientConnection.this.getMessageHandler().handleMessage(MysqlClientConnection.this, msg);
						}finally{
							ThreadLocalMap.reset();
						}
					}
				});
				return null;
			}
		};
		invocation.invoke();
	}
	
	public void addLongData(byte[] longData){
		longDataList.add(longData);
	}
	
	public void clearLongData(){
		longDataList.clear();
	}
	
	public List<byte[]> getLongDataList(){
		return unmodifiableLongDataList;
	}
	
	/**
	 * 正在处于验证的Connection Idle时间可以设置相应的少一点。
	 */
	public boolean checkIdle(long now) {
		if(isAuthenticated()){
			return false;
		}else{
			long idleMillis = now - _lastEvent;
			if (idleMillis < 5000) {
				return false;
			}
			if (isClosed()) {
				return true;
			}
			
			logger.warn("Disconnecting non-communicative client [conn=" + this
					+ ", idle=" + idleMillis + "ms].");
			return true;
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -