abstractqueryrouter.java

来自「mysql集群」· Java 代码 · 共 963 行 · 第 1/3 页

JAVA
963
字号
						}
						if(logger.isDebugEnabled()){
							logger.debug("Sql:["+sql +"] no Column rule, using table:"+tableRule.table +" default rules:"+Arrays.toString(tableRule.defaultPools));
						}
						continue;
					}
					List<String> groupMatched = new ArrayList(); 
					for(Rule rule:tableRule.ruleList){
						if(rule.group != null){
							if(groupMatched.contains(rule.group)){
								continue;
							}
						}
						
						//如果参数比必须的参数小,则继续下一条规则
						if(columnMap.size()<rule.parameterMap.size()){
							continue;
						}else{
							
							boolean matched = true;

							//如果查询语句中包含了该规则不需要的参数,则该规则将被忽略
							for(Column exclude : rule.excludes){
								
								Comparable condition = columnMap.get(exclude);
								if(condition != null){
									matched = false;
									break;
								}
							}
							
							//如果不匹配将继续下一条规则
							if(!matched) continue;
							
							Comparable[] comparables= new Comparable[rule.parameterMap.size()];
							//规则中的参数必须在dmlstatement中存在,否则这个规则将不启作用
							for(Map.Entry<Column,Integer> parameter : rule.cloumnMap.entrySet()){
								
								Comparative condition = columnMap.get(parameter.getKey());
								if(condition != null){
									
									//如果规则忽略 数组的 参数,并且参数有array 参数,则忽略该规则
									if(rule.ignoreArray && condition instanceof ComparativeBaseList){
										matched = false;
										break;
									}
									
									comparables[parameter.getValue()] = (Comparative)condition.clone();
								}else{
									matched = false;
									break;
								}
							}
							
							//如果不匹配将继续下一条规则
							if(!matched) continue;
							
							try {
								Comparable<?> result = rule.rowJep.getValue(comparables);
								if(result instanceof Comparative){
									matched = (Boolean)((Comparative)result).getValue();
								}else{
									matched = (Boolean)result;
								}
								
								if(matched){
									if(rule.group != null){
										groupMatched.add(rule.group);
									}
									String[] pools = dmlStatment.isReadStatment()?rule.readPools:rule.writePools;
									if(pools == null){
										pools = rule.defaultPools;
									}
									if(pools != null){
										for(String poolName : pools){
											if(!poolNames.contains(poolName)){
												poolNames.add(poolName);
											}
										}
									}else{
										logger.warn("rule:"+rule.name+" matched, but pools is null");
									}
									
									if(logger.isDebugEnabled()){
										logger.debug("Sql:["+sql +"] matched rule:"+rule.name);
									}
								}
							} catch (com.meidusa.amoeba.sqljep.ParseException e) {
								//logger.error("parse rule error:"+rule.expression,e);
							}
						}
					}
					
					//如果所有规则都无法匹配,则默认采用TableRule中的pool设置。 
					if(poolNames.size() == 0){
						if(logger.isDebugEnabled()){
							logger.debug("no rule matched, using table default rules:"+Arrays.toString(tableRule.defaultPools));
						}
						String[] pools = dmlStatment.isReadStatment()?tableRule.readPools:tableRule.writePools;
						if(pools == null){
							pools = tableRule.defaultPools;
						}
						
						for(String poolName : pools){
							if(!poolNames.contains(poolName)){
								poolNames.add(poolName);
							}
						}
					}
				}
			}
		}else{
			
			//如果sql语句中没有包含table,则采用TableRule中没有name的配置,一般情况下只有一条该规则,而且只有defaultPool启作用
			TableRule tableRule =  this.tableRuleMap.get(null);
			if(tableRule != null && tableRule.defaultPools != null && tableRule.defaultPools.length >0){
				for(String poolName : tableRule.defaultPools){
					if(!poolNames.contains(poolName)){
						poolNames.add(poolName);
					}
				}
			}
		}
		
		ObjectPool[] pools = new ObjectPool[poolNames.size()];
		int i=0;
		for(String name :poolNames){
			pools[i++] = ProxyRuntimeContext.getInstance().getPoolMap().get(name);
		}

		if(logger.isDebugEnabled()){
			logger.debug("Sql:["+sql +"] route to pools:"+poolNames);
		}
		
		if(pools == null || pools.length == 0){
			if(dmlStatment != null){
				pools = dmlStatment.isReadStatment()? this.readPools: this.writePools;
			}
			if(pools == null || pools.length == 0 || pools[0] == null) pools = this.defaultPools;
		}
		
		return pools;
	}
	
	/**
	 * 根据 PropertyStatment 设置相关连接的属性 
	 * @param conn 当前请求的连接
	 * @param statment 当前请求的Statment
	 * @param parameters 
	 */
	protected abstract void setProperty(DatabaseConnection conn,PropertyStatment statment,Object[] parameters);
	
	public void init() throws InitialisationException {
		defaultPools = new ObjectPool[]{ProxyRuntimeContext.getInstance().getPoolMap().get(defaultPool)};
		
		if(defaultPools == null || defaultPools[0] == null){
			throw new InitialisationException("default pool required!");
		}
		if(this.readPool != null && !StringUtil.isEmpty(readPool)){
			readPools = new ObjectPool[]{ProxyRuntimeContext.getInstance().getPoolMap().get(this.readPool)};
		}
		if(this.writePool != null && !StringUtil.isEmpty(writePool)){
			writePools = new ObjectPool[]{ProxyRuntimeContext.getInstance().getPoolMap().get(writePool)};
		}
		map = new LRUMap(this.LRUMapSize);
		
		class ConfigCheckTread extends Thread{
			long lastRuleModified;
			long lastFunFileModified;
			long lastRuleFunctionFileModified;
			File ruleFile;
			File funFile;
			File ruleFunctionFile;
			private ConfigCheckTread(){
			
				this.setDaemon(true);
				this.setName("ruleConfigCheckThread");
				ruleFile = new File(AbstractQueryRouter.this.ruleConfig);
				funFile = new File(AbstractQueryRouter.this.functionConfig);
				lastRuleModified = ruleFile.lastModified();
				lastFunFileModified = funFile.lastModified();
				if(AbstractQueryRouter.this.ruleFunctionConfig != null){
					ruleFunctionFile = new File(AbstractQueryRouter.this.ruleFunctionConfig);
					lastRuleFunctionFileModified = ruleFunctionFile.lastModified();
				}
			}
			public void run(){
				while(true){
					try {
						Thread.sleep(5000l);
						Map<String,Function> funMap = null;
						Map<String,PostfixCommand> ruleFunMap = null;
						Map<Table,TableRule> tableRuleMap = null;
						try{
							if(AbstractQueryRouter.this.functionConfig != null){
								if(funFile.lastModified() != lastFunFileModified){
									try{
										funMap = loadFunctionMap(AbstractQueryRouter.this.functionConfig);
									}catch(ConfigurationException exception){
									}
									
								}
							}
							if(AbstractQueryRouter.this.ruleFunctionConfig != null){
								if(ruleFunctionFile.lastModified() != lastRuleFunctionFileModified){
									ruleFunMap = loadRuleFunctionMap(AbstractQueryRouter.this.ruleFunctionConfig);
								}
							}
							
							if(AbstractQueryRouter.this.ruleConfig != null){
								if(ruleFile.lastModified() != lastRuleModified || 
										(AbstractQueryRouter.this.ruleFunctionConfig != null && ruleFunctionFile.lastModified() != lastRuleFunctionFileModified)){
									tableRuleMap = loadConfig(AbstractQueryRouter.this.ruleConfig);
								}
							}
							
							if(funMap != null){
								AbstractQueryRouter.this.functionMap = funMap;
							}
							
							if(ruleFunMap != null){
								AbstractQueryRouter.this.ruleFunctionMap = ruleFunMap;
							}
							
							if(tableRuleMap != null){
								AbstractQueryRouter.this.tableRuleMap = tableRuleMap;
							}
							
						}catch(ConfigurationException e){
							
						}finally{
							if(funFile != null && funFile.exists()){
								lastFunFileModified = funFile.lastModified();
							}
							if(ruleFunctionFile != null && ruleFunctionFile.exists()){
								lastRuleFunctionFileModified = ruleFunctionFile.lastModified();
							}
							if(ruleFile != null && ruleFile.exists()){
								lastRuleModified = ruleFile.lastModified();
							}
						}
					} catch (InterruptedException e) {
					}
				}
			}
		}
		
		if(needParse){
			boolean configNeedCheck = false;
			
			
			if(AbstractQueryRouter.this.functionConfig != null){
				this.functionMap = loadFunctionMap(AbstractQueryRouter.this.functionConfig);
				configNeedCheck = true;
			}else{
				needEvaluate = false;
			}
			
			if(AbstractQueryRouter.this.ruleFunctionConfig != null){
				AbstractQueryRouter.this.ruleFunctionMap = loadRuleFunctionMap(AbstractQueryRouter.this.ruleFunctionConfig);
				configNeedCheck =true;
			}
			
			if(AbstractQueryRouter.this.ruleConfig != null){
				this.tableRuleMap = loadConfig(this.ruleConfig);
				configNeedCheck =true;
			}else{
				needEvaluate = false;
			}
			
			if(configNeedCheck){
				new ConfigCheckTread().start();
			}
		}
	}
	
	public static Map<String,Function> loadFunctionMap(String configFileName){
		FunctionLoader<String,Function> loader = new FunctionLoader<String,Function>(){

			@Override
			public void initBeanObject(BeanObjectEntityConfig config,
					Function bean) {
				bean.setName(config.getName());
			}

			@Override
			public void putToMap(Map<String, Function> map, Function value) {
				map.put(value.getName(), value);
			}
			
		};
		
		loader.setDTD("/com/meidusa/amoeba/xml/function.dtd");
		loader.setDTDSystemID("function.dtd");
		return loader.loadFunctionMap(configFileName);
	}
	
	public static Map<String,PostfixCommand> loadRuleFunctionMap(String configFileName){
		FunctionLoader<String,PostfixCommand> loader = new FunctionLoader<String,PostfixCommand>(){
			@Override
			public void initBeanObject(BeanObjectEntityConfig config,
					PostfixCommand bean) {
				bean.setName(config.getName());
			}

			@Override
			public void putToMap(Map<String, PostfixCommand> map, PostfixCommand value) {
				map.put(value.getName(), value);
			}
			
		};
		
		loader.setDTD("/com/meidusa/amoeba/xml/function.dtd");
		loader.setDTDSystemID("function.dtd");
		
		Map<String,PostfixCommand> tempRuleFunMap = new HashMap<String,PostfixCommand>();
		Map<String,PostfixCommand> defindMap = loader.loadFunctionMap(configFileName);
		tempRuleFunMap.putAll(ruleFunTab);
		tempRuleFunMap.putAll(defindMap);
		return tempRuleFunMap;
	}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?