⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 commandmessagehandler.java

📁 mysql集群
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
				lock.lock();
				try{
					if(this.ended){
						releaseConnection(fromConn);
					}
				}finally{
					lock.unlock();
				}
			}
			
			if(CommandStatus.AllCompleted == commStatus){
				try{
					if(commandQueue.currentCommand.isMain()){
						commandQueue.mainCommandExecuted = true;
						releaseConnection(source);
					}

					/**
					 * 如果是客户端请求的命令则:
					 * 1、请求是多台server的,需要进行合并数据
					 * 2、单台server直接写出到客户端
					 */
					
					if(commandQueue.currentCommand.isMain()){
						if(commandQueue.isMultiple()){
							List<byte[]> list = this.mergeMessages();
							if(list != null){
								for(byte[] buffer : list){
									dispatchMessageFrom(fromConn,buffer);
								}
							}
						}else{
							dispatchMessageFrom(fromConn,message);
						}
					}else{
						//非主命令发送以后返回出错信息,则结束当前的session
						Collection<ConnectionStatuts> connectionStatutsSet = commandQueue.connStatusMap.values();
						for(ConnectionStatuts connStatus : connectionStatutsSet){
							//看是否每个服务器返回的数据包都没有异常信息。
							if((connStatus.statusCode & SessionStatus.ERROR) >0){
								this.commandQueue.currentCommand.setStatusCode(connStatus.statusCode);
								byte[] errorBuffer = connStatus.buffers.get(connStatus.buffers.size()-1);
								if(!commandQueue.mainCommandExecuted){
									dispatchMessageFrom(connStatus.conn,errorBuffer);
									if(source.isAutoCommit()){
										this.endSession();
									}
								}else{
									if(logger.isDebugEnabled()){
										byte[] commandBuffer = commandQueue.currentCommand.getBuffer();
										StringBuffer buffer = new StringBuffer();
										buffer.append("Current Command Execute Error:\n");
										buffer.append(StringUtil.dumpAsHex(commandBuffer,commandBuffer.length));
										buffer.append("\n error Packet:\n");
										buffer.append(StringUtil.dumpAsHex(errorBuffer,errorBuffer.length));
										logger.debug(buffer.toString());
									}
								}
								return;
							}
						}
					}
				}finally{
					afterCommandCompleted(commandQueue.currentCommand);
				}
			}else{
				if(commandQueue.currentCommand.isMain()){
					if(!commandQueue.isMultiple()){
						dispatchMessageFrom(fromConn,message);
					}
				}
			}
		}
	}
	
	/**
	 * Wraps up a command once it has completed: runs the command's completion
	 * callback when one is attached, clears all cached packets, removes the head
	 * entry of the command queue and, unless the session has already ended,
	 * tries to start the next command.
	 *
	 * @param oldCommand the command that has just completed
	 */
	protected void afterCommandCompleted(CommandInfo oldCommand){
		Runnable callback = oldCommand.getRunnable();
		if(callback != null){
			callback.run();
		}

		// Cached packets belong to the finished command and are no longer needed.
		commandQueue.clearAllBuffer();

		// The last packet of the command has arrived, so drop the command from the head of the queue.
		commandQueue.sessionInitQueryQueue.remove(0);

		if(ended){
			return;
		}
		startNextCommand();
	}
	
	/**
	 * Decides whether another command has to be sent and, if so, sends it.
	 *
	 * If the current command carries the error flag, or the session has ended,
	 * or the queue has no further command, nothing is sent; in those cases the
	 * session is ended when the client connection is in auto-commit mode.
	 */
	protected synchronized void startNextCommand(){
		boolean currentFailed = commandQueue.currentCommand != null
				&& (commandQueue.currentCommand.getStatusCode() & SessionStatus.ERROR) >0;
		if(currentFailed){
			if(source.isAutoCommit()){
				this.endSession();
			}
			return;
		}

		// The queue is only advanced while the session is still alive.
		if(this.ended || !commandQueue.tryNextCommandTuple()){
			if(source.isAutoCommit()){
				this.endSession();
			}
			return;
		}

		// Byte 4 of the command packet is used as the command type.
		commandType = commandQueue.currentCommand.getBuffer()[4];

		// The command counts as completed when every tracked connection is already counted as done.
		boolean alreadyCompleted =
				commandQueue.currentCommand.getCompletedCount().get() == commandQueue.connStatusMap.size();

		if(!alreadyCompleted){
			for(ConnectionStatuts status : commandQueue.connStatusMap.values()){
				status.setCommandType(commandType);
				startConnectionCommand(status.conn,commandQueue.currentCommand);
			}
		}

		dispatchMessageFrom(source,commandQueue.currentCommand.getBuffer());

		if(alreadyCompleted){
			afterCommandCompleted(commandQueue.currentCommand);
		}
	}
	
	/**
	 * Routes a packet according to where it came from. Every packet the handler
	 * needs to forward goes through this method.
	 *
	 * A packet coming from the client connection ({@code source}) is sent to
	 * every server connection tracked by the command queue; a packet coming from
	 * any other connection is sent to the client.
	 *
	 * Two kinds of packets travel this way: commands sent by the client, and
	 * response packets of the main command (the one issued by the client rather
	 * than generated inside the proxy). Responses to internally generated
	 * commands are kept in ConnectionStatus.buffers, which are emptied by
	 * commandQueue.clearAllBuffer().
	 *
	 * @param fromConn the connection the packet originated from
	 * @param message the packet content
	 */
	protected void dispatchMessageFrom(Connection fromConn,byte[] message){
		if(fromConn == source){
			// Client -> servers: fan the packet out to every server connection.
			for(Connection target : commandQueue.connStatusMap.keySet()){
				dispatchMessageTo(target,message);
			}
			return;
		}
		// Server -> client.
		dispatchMessageTo(source,message);
	}
	
	/**
	 * Sends a packet to the given connection. CommandMessageHandler and its
	 * subclasses must send packets through this method.
	 *
	 * Packets addressed to the client connection ({@code source}) go through the
	 * write buffer so that small packets do not each trigger a separate write;
	 * a {@code null} message asks for the buffered data to be written out.
	 * Packets addressed to any other connection are posted directly.
	 *
	 * @param toConn the destination connection
	 * @param message the packet to send, or {@code null} to flush the client buffer
	 */
	protected void dispatchMessageTo(Connection toConn,byte[] message){
		if(toConn != source){
			toConn.postMessage(message);
			return;
		}
		// Immediate write is requested only for the null (flush) message.
		boolean flushNow = (message == null);
		appendBufferToWrite(message,buffer,toConn,flushNow);
	}
	
	/**
	 * Appends a packet to the write buffer, or writes buffered data out to the
	 * connection.
	 *
	 * <ul>
	 * <li>{@code byts == null}: posts the buffer content (if any) and resets the buffer.</li>
	 * <li>{@code writeNow} is set, or the buffer has less room left than the packet
	 *     needs: the packet is added to any pending data and everything is posted;
	 *     with an empty buffer the packet is posted on its own.</li>
	 * <li>otherwise the packet is only appended to the buffer.</li>
	 * </ul>
	 *
	 * @param byts the packet to append, or {@code null} to flush
	 * @param buffer the buffer collecting pending output
	 * @param conn the connection the data is posted to
	 * @param writeNow whether the data must be posted immediately
	 * @return always {@code true}
	 */
	private  boolean appendBufferToWrite(byte[] byts,PacketBuffer buffer,Connection conn,boolean writeNow){
		if(byts == null){
			// Flush request: post whatever has been collected so far.
			if(buffer.getPosition()>0){
				conn.postMessage(buffer.toByteBuffer());
				buffer.reset();
			}
			return true;
		}

		boolean mustWrite = writeNow || buffer.remaining() < byts.length;
		if(!mustWrite){
			// Enough room and no hurry: just collect the packet.
			buffer.writeBytes(byts);
			return true;
		}

		if(buffer.getPosition()>0){
			// Keep packet order: pending data first, then this packet, in one post.
			buffer.writeBytes(byts);
			conn.postMessage(buffer.toByteBuffer());
			buffer.reset();
		}else{
			// Nothing pending, so the packet can bypass the buffer.
			conn.postMessage(byts);
		}
		return true;
	}
	
	/**
	 * Detaches a single connection from this handler.
	 *
	 * The message handler that was saved for the connection (if any) is removed
	 * from {@code handlerMap} and restored on the connection. A
	 * {@code MysqlServerConnection} that belongs to an object pool is then
	 * returned to that pool. The whole operation runs under {@code lock}.
	 *
	 * A failure while returning the connection to its pool is logged and
	 * otherwise ignored, so that releasing stays best-effort.
	 *
	 * @param conn the connection to release
	 */
	protected void releaseConnection(Connection conn){
		lock.lock();
		try{
			MessageHandler handler = handlerMap.remove(conn);
			if(handler != null){
				conn.setMessageHandler(handler);
			}
			
			if(conn instanceof MysqlServerConnection){
				PoolableObject pooledObject = (PoolableObject)conn;
				if(pooledObject.getObjectPool() != null){
					try {
						pooledObject.getObjectPool().returnObject(conn);
						if(logger.isDebugEnabled()){
							logger.debug("connection:"+conn+" return to pool");
						}
					} catch (Exception e) {
						// Best-effort: keep going, but do not hide the failure.
						logger.warn("connection:"+conn+" could not be returned to pool",e);
					}
				}
			}
		}finally{
			lock.unlock();	
		}
	}
	
	/**
	 * Restores the original message handler on the connections handled by this
	 * handler and returns server connections to their pools.
	 *
	 * A connection is released when there is no current command, or when its
	 * status carries the {@code SessionStatus.COMPLETED} flag. For each released
	 * connection the saved handler is set back; if the connection is still open,
	 * is a {@code MysqlServerConnection} and belongs to an object pool, it is
	 * returned to that pool. Entries are not removed from {@code handlerMap}.
	 * The whole operation runs under {@code lock}.
	 *
	 * A failure while returning a connection to its pool is logged and the
	 * remaining connections are still processed.
	 */
	protected void releaseAllCompletedConnection(){
		lock.lock();
		try{
			Set<Map.Entry<Connection,MessageHandler>> handlerSet = handlerMap.entrySet();
			for(Map.Entry<Connection,MessageHandler> entry:handlerSet){
				MessageHandler handler = entry.getValue();
				Connection connection = entry.getKey();
				ConnectionStatuts status = this.commandQueue.connStatusMap.get(connection);

				// && binds tighter than ||; the parentheses only make that explicit.
				boolean releasable = this.commandQueue.currentCommand == null
						|| (status != null && (status.statusCode & SessionStatus.COMPLETED)>0);
				if(!releasable){
					continue;
				}

				connection.setMessageHandler(handler);
				if(connection.isClosed() || !(connection instanceof MysqlServerConnection)){
					continue;
				}

				PoolableObject pooledObject = (PoolableObject)connection;
				if(pooledObject.getObjectPool() != null){
					try {
						pooledObject.getObjectPool().returnObject(connection);
						if(logger.isDebugEnabled()){
							logger.debug("connection:"+connection+" return to pool");
						}
					} catch (Exception e) {
						// Best-effort: keep releasing the others, but do not hide the failure.
						logger.warn("connection:"+connection+" could not be returned to pool",e);
					}
				}
			}
		}finally{
			lock.unlock();
		}
	}
	
	/**
	 * Merges the packets received from several servers into the packet list that
	 * is to be sent to the client.
	 * Merging is only needed when several connections are involved; the merged packets are
	 * then sent one by one through {@link #dispatchMessageFrom(Connection, byte[])}.
	 * A one-to-one connection sends its packets directly through
	 * {@link #dispatchMessageFrom(Connection, byte[])} and needs no merge.
	 *
	 * Three outcomes are possible:
	 * <ul>
	 * <li>a connection carries the error flag: that connection's own packet list is returned as is;</li>
	 * <li>the last packet of every connection is an EOF packet (treated as a select query):
	 *     the leading packets up to the first EOF, the rows of all connections and one
	 *     trailing EOF packet are returned, renumbered consecutively;</li>
	 * <li>otherwise: the last packet of every connection is read as an OK packet and a single
	 *     combined OK packet is returned.</li>
	 * </ul>
	 * Note that packet ids are rewritten in place (byte 3 of the cached packets).
	 *
	 * @return the packets to send to the client
	 */
	protected List<byte[]> mergeMessages(){
		Collection<ConnectionStatuts> connectionStatutsSet = commandQueue.connStatusMap.values();
		boolean isSelectQuery = true;
		List<byte[]> buffers = null;
		List<byte[]> returnList = new ArrayList<byte[]>();
		for(ConnectionStatuts connStatus : connectionStatutsSet){
			// Check whether any server returned an error; also inspect the last packet of each connection.
			byte[] buffer = connStatus.buffers.get(connStatus.buffers.size()-1);
			buffers = connStatus.buffers;
			if((connStatus.statusCode & SessionStatus.ERROR) >0){
				// Error: hand back this connection's packets unmerged.
				return buffers;
			}
			if(isSelectQuery){
				// Stays true only while every connection's last packet is an EOF packet.
				isSelectQuery =isSelectQuery && MysqlPacketBuffer.isEofPacket(buffer);
			}
		}
		
		if(isSelectQuery){
			// The current packet id.
			byte paketId = 0;
			
			// Field information: taken from the last connection visited above,
			// up to and including its first EOF packet.
			for(byte[] buffer : buffers){
				if(MysqlPacketBuffer.isEofPacket(buffer)){
					returnList.add(buffer);
					paketId = buffer[3];
					break;
				}else{
					returnList.add(buffer);
					paketId = buffer[3];
				}
			}
			paketId += 1;
			// Row packets: for every connection, skip everything up to its first EOF packet,
			// then append each following non-EOF packet with a fresh consecutive id.
			for(ConnectionStatuts connStatus : connectionStatutsSet){
				boolean rowStart = false;;
				for(byte[] buffer : connStatus.buffers){
					if(!rowStart){
						if(MysqlPacketBuffer.isEofPacket(buffer)){
							rowStart = true;
						}else{
							continue;
						}
					}else{
						if(!MysqlPacketBuffer.isEofPacket(buffer)){
							buffer[3] = paketId;
							paketId += 1;
							returnList.add(buffer);
						}
					}
				}
			}
			
			// Close the merged result with the last packet of the last connection visited,
			// renumbered to follow the rows.
			byte[] eofBuffer = buffers.get(buffers.size()-1);
			eofBuffer[3] = paketId;
			returnList.add(eofBuffer);
		}else{
			// Combine the per-connection OK packets: affected rows and warning counts are summed,
			// messages are concatenated, and the insert id of the last connection visited wins.
			OkPacket ok = new OkPacket();
			StringBuffer strbuffer = new StringBuffer();
			for(ConnectionStatuts connStatus : connectionStatutsSet){
				byte[] buffer = connStatus.buffers.get(connStatus.buffers.size()-1);
				OkPacket connOK = new OkPacket();
				connOK.init(buffer,connStatus.conn);
				ok.affectedRows +=connOK.affectedRows;
				ok.insertId =connOK.insertId;
				ok.packetId = 1;
				strbuffer.append(connOK.message);
				ok.warningCount +=connOK.warningCount;
			}
			ok.message = strbuffer.toString();
			returnList.add(ok.toByteBuffer(source).array());
		}
		return returnList;
	}

	/**
	 * Creates the status object for a server connection. It is called by
	 * {@code startSession()} once for every connection borrowed from a pool,
	 * and the result is stored in {@code commandQueue.connStatusMap} under that
	 * connection.
	 *
	 * @param conn the connection the status object is created for
	 * @return the status object for {@code conn}
	 */
	protected abstract ConnectionStatuts newConnectionStatuts(Connection conn);

	/**
	 * Starts the session: borrows one server connection from every pool, takes
	 * over message handling on it, queues the commands and sends the first one.
	 *
	 * For each borrowed connection the handler it had before is remembered in
	 * {@code handlerMap}, this handler is installed instead, and a fresh status
	 * object is registered in the command queue.
	 *
	 * @throws Exception if borrowing a connection from a pool or a later step fails
	 */
	public synchronized void startSession() throws Exception {
		if(logger.isInfoEnabled()){
			logger.info(this+" session start");
		}

		for(ObjectPool pool:pools){
			MysqlServerConnection serverConn = (MysqlServerConnection)pool.borrowObject();
			// Remember the previous handler before replacing it with this one.
			MessageHandler previousHandler = serverConn.getMessageHandler();
			handlerMap.put(serverConn, previousHandler);
			serverConn.setMessageHandler(this);
			ConnectionStatuts connStatus = newConnectionStatuts(serverConn);
			commandQueue.connStatusMap.put(serverConn, connStatus);
		}

		// Queue order: commands before the main one, the main command, commands after it.
		appendPreMainCommand();
		this.commandQueue.appendCommand(info, true);
		appendAfterMainCommand();

		startNextCommand();
	}
	
	/**
	 * Tells whether this session is to be considered idle.
	 *
	 * With a positive {@code timeout}, the session is idle once more than
	 * {@code timeout} has passed since {@code createTime}. Without one, only an
	 * ended session can be idle, and only after more than 15000 ms have passed
	 * since {@code endTime}.
	 *
	 * @param now the current time, in the same unit as {@code createTime} and {@code endTime}
	 * @return {@code true} if the session is idle
	 */
	public boolean checkIdle(long now) {
		if(timeout >0){
			return (now - createTime)>timeout;
		}
		/*
		 * The session has ended but a server connection may still be waiting for
		 * data. After more than 15s treat the session as idle, so that a server
		 * that never sends its data, or a session that is already over, cannot
		 * keep the ServerConnection from going back to the pool.
		 */
		return ended && (now - endTime)>15000;
	}

	/**
	 * Ends the session. Only the first call has an effect; the work runs under
	 * {@code lock}.
	 *
	 * The end time is recorded, the session is marked as ended and the completed
	 * connections are released. If the main command was never executed, an error
	 * packet ("session was killed!!", errno 10000) is sent to the client, a
	 * warning with a stack trace is logged and a close is posted on the client
	 * connection. Finally the client write buffer is flushed.
	 */
	public void endSession() {
		lock.lock();
		try{
			if(ended){
				// Already ended by an earlier call.
				return;
			}
			endTime = System.currentTimeMillis();
			ended = true;
			this.releaseAllCompletedConnection();

			if(this.commandQueue.mainCommandExecuted){
				if(logger.isInfoEnabled()){
					logger.info(this+" session ended.");
				}
			}else{
				// The client never got an answer to its command: tell it the session was killed.
				ErrorPacket killed = new ErrorPacket();
				killed.errno = 10000;
				killed.packetId = 2;
				killed.serverErrorMessage = "session was killed!!";
				this.dispatchMessageTo(source, killed.toByteBuffer(source).array());
				// The exception is created only to get the current stack trace into the log.
				logger.warn("session was killed!!",new Exception());
				source.postClose(null);
			}

			// A null message flushes whatever is still buffered for the client.
			this.dispatchMessageTo(this.source, null);
		}finally{
			lock.unlock();
		}
	}
	

	/**
	 * Reports whether the session has ended. The flag is read while holding
	 * {@code lock}.
	 *
	 * @return {@code true} if the session has ended
	 */
	public boolean isEnded() {
		final boolean endedNow;
		lock.lock();
		try{
			endedNow = this.ended;
		}finally{
			lock.unlock();
		}
		return endedNow;
	}
	
	/**
	 * Appends a one-line status summary of this handler to a report: the number
	 * of tracked connections and, when a command is in progress, its completed
	 * count. The line always ends with a newline.
	 *
	 * @param buffer the report being built
	 * @param now not used by this implementation
	 * @param sinceLast not used by this implementation
	 * @param reset not used by this implementation
	 * @param level not used by this implementation
	 */
	public void appendReport(StringBuilder buffer, long now, long sinceLast,boolean reset,Level level) {
		buffer.append("    -- MessageHandler:");
		buffer.append("multiple Size:");
		buffer.append(commandQueue.connStatusMap.size());
		if(commandQueue.currentCommand != null){
			buffer.append(",currentCommand completedCount:");
			buffer.append(commandQueue.currentCommand.getCompletedCount());
		}
		buffer.append("\n");
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -