loadbalancer.php

来自「php 开发的内容管理系统」· PHP 代码 · 共 667 行 · 第 1/2 页

PHP
667
字号
<?php/** * * @package MediaWiki *//** * Depends on the database object */require_once( 'Database.php' );# Valid database indexes# Operation-based indexesdefine( 'DB_SLAVE', -1 );     # Read from the slave (or only server)define( 'DB_MASTER', -2 );    # Write to master (or only server)define( 'DB_LAST', -3 );     # Whatever database was used last# Obsolete aliasesdefine( 'DB_READ', -1 );define( 'DB_WRITE', -2 );# Scale polling time so that under overload conditions, the database server# receives a SHOW STATUS query at an average interval of this many microsecondsdefine( 'AVG_STATUS_POLL', 2000 );/** * Database load balancing object * * @todo document * @package MediaWiki */class LoadBalancer {	/* private */ var $mServers, $mConnections, $mLoads, $mGroupLoads;	/* private */ var $mFailFunction, $mErrorConnection;	/* private */ var $mForce, $mReadIndex, $mLastIndex, $mAllowLagged;	/* private */ var $mWaitForFile, $mWaitForPos, $mWaitTimeout;	/* private */ var $mLaggedSlaveMode, $mLastError = 'Unknown error';	function LoadBalancer()	{		$this->mServers = array();		$this->mConnections = array();		$this->mFailFunction = false;		$this->mReadIndex = -1;		$this->mForce = -1;		$this->mLastIndex = -1;		$this->mErrorConnection = false;		$this->mAllowLag = false;	}	function newFromParams( $servers, $failFunction = false, $waitTimeout = 10 )	{		$lb = new LoadBalancer;		$lb->initialise( $servers, $failFunction, $waitTimeout );		return $lb;	}	function initialise( $servers, $failFunction = false, $waitTimeout = 10 )	{		$this->mServers = $servers;		$this->mFailFunction = $failFunction;		$this->mReadIndex = -1;		$this->mWriteIndex = -1;		$this->mForce = -1;		$this->mConnections = array();		$this->mLastIndex = 1;		$this->mLoads = array();		$this->mWaitForFile = false;		$this->mWaitForPos = false;		$this->mWaitTimeout = $waitTimeout;		$this->mLaggedSlaveMode = false;		foreach( $servers as $i => $server ) {			$this->mLoads[$i] = $server['load'];			if ( isset( $server['groupLoads'] ) ) {				foreach ( $server['groupLoads'] as $group => $ratio ) {					if ( !isset( $this->mGroupLoads[$group] ) ) {						$this->mGroupLoads[$group] = array();					}					$this->mGroupLoads[$group][$i] = $ratio;				}			}		}	}	/**	 * Given an array of non-normalised probabilities, this function will select	 * an element and return the appropriate key	 */	function pickRandom( $weights )	{		if ( !is_array( $weights ) || count( $weights ) == 0 ) {			return false;		}		$sum = array_sum( $weights );		if ( $sum == 0 ) {			# No loads on any of them			# In previous versions, this triggered an unweighted random selection,			# but this feature has been removed as of April 2006 to allow for strict 			# separation of query groups. 			return false;		}		$max = mt_getrandmax();		$rand = mt_rand(0, $max) / $max * $sum;		$sum = 0;		foreach ( $weights as $i => $w ) {			$sum += $w;			if ( $sum >= $rand ) {				break;			}		}		return $i;	}	function getRandomNonLagged( $loads ) {		# Unset excessively lagged servers		$lags = $this->getLagTimes();		foreach ( $lags as $i => $lag ) {			if ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {				unset( $loads[$i] );			}		}		# Find out if all the slaves with non-zero load are lagged		$sum = 0;		foreach ( $loads as $load ) {			$sum += $load;		}		if ( $sum == 0 ) {			# No appropriate DB servers except maybe the master and some slaves with zero load			# Do NOT use the master			# Instead, this function will return false, triggering read-only mode,			# and a lagged slave will be used instead.			return false;		}		if ( count( $loads ) == 0 ) {			return false;		}		#wfDebugLog( 'connect', var_export( $loads, true ) );		# Return a random representative of the remainder		return $this->pickRandom( $loads );	}	/**	 * Get the index of the reader connection, which may be a slave	 * This takes into account load ratios and lag times. It should	 * always return a consistent index during a given invocation	 *	 * Side effect: opens connections to databases	 */	function getReaderIndex() {		global $wgReadOnly, $wgDBClusterTimeout;		$fname = 'LoadBalancer::getReaderIndex';		wfProfileIn( $fname );		$i = false;		if ( $this->mForce >= 0 ) {			$i = $this->mForce;		} else {			if ( $this->mReadIndex >= 0 ) {				$i = $this->mReadIndex;			} else {				# $loads is $this->mLoads except with elements knocked out if they				# don't work				$loads = $this->mLoads;				$done = false;				$totalElapsed = 0;				do {					if ( $wgReadOnly or $this->mAllowLagged ) {						$i = $this->pickRandom( $loads );					} else {						$i = $this->getRandomNonLagged( $loads );						if ( $i === false && count( $loads ) != 0 )  {							# All slaves lagged. Switch to read-only mode							$wgReadOnly = wfMsgNoDB( 'readonly_lag' );							$i = $this->pickRandom( $loads );						}					}					$serverIndex = $i;					if ( $i !== false ) {						wfDebugLog( 'connect', "$fname: Using reader #$i: {$this->mServers[$i]['host']}...\n" );						$this->openConnection( $i );						if ( !$this->isOpen( $i ) ) {							wfDebug( "$fname: Failed\n" );							unset( $loads[$i] );							$sleepTime = 0;						} else {							$status = $this->mConnections[$i]->getStatus("Thread%");							if ( isset( $this->mServers[$i]['max threads'] ) &&							  $status['Threads_running'] > $this->mServers[$i]['max threads'] )							{								# Too much load, back off and wait for a while.								# The sleep time is scaled by the number of threads connected,								# to produce a roughly constant global poll rate.								$sleepTime = AVG_STATUS_POLL * $status['Threads_connected'];								# If we reach the timeout and exit the loop, don't use it								$i = false;							} else {								$done = true;								$sleepTime = 0;							}						}					} else {						$sleepTime = 500000;					}					if ( $sleepTime ) {							$totalElapsed += $sleepTime;							$x = "{$this->mServers[$serverIndex]['host']} [$serverIndex]";							wfProfileIn( "$fname-sleep $x" );							usleep( $sleepTime );							wfProfileOut( "$fname-sleep $x" );					}				} while ( count( $loads ) && !$done && $totalElapsed / 1e6 < $wgDBClusterTimeout );				if ( $totalElapsed / 1e6 >= $wgDBClusterTimeout ) {					$this->mErrorConnection = false;					$this->mLastError = 'All servers busy';				}				if ( $i !== false && $this->isOpen( $i ) ) {					# Wait for the session master pos for a short time					if ( $this->mWaitForFile ) {						if ( !$this->doWait( $i ) ) {							$this->mServers[$i]['slave pos'] = $this->mConnections[$i]->getSlavePos();						}					}					if ( $i !== false ) {						$this->mReadIndex = $i;					}				} else {					$i = false;				}			}		}		wfProfileOut( $fname );		return $i;	}	/**	 * Get a random server to use in a query group	 */	function getGroupIndex( $group ) {		if ( isset( $this->mGroupLoads[$group] ) ) {			$i = $this->pickRandom( $this->mGroupLoads[$group] );		} else {			$i = false;		}		wfDebug( "Query group $group => $i\n" );		return $i;	}	/**	 * Set the master wait position	 * If a DB_SLAVE connection has been opened already, waits	 * Otherwise sets a variable telling it to wait if such a connection is opened	 */	function waitFor( $file, $pos ) {		$fname = 'LoadBalancer::waitFor';		wfProfileIn( $fname );		wfDebug( "User master pos: $file $pos\n" );		$this->mWaitForFile = false;		$this->mWaitForPos = false;		if ( count( $this->mServers ) > 1 ) {			$this->mWaitForFile = $file;			$this->mWaitForPos = $pos;			$i = $this->mReadIndex;			if ( $i > 0 ) {				if ( !$this->doWait( $i ) ) {					$this->mServers[$i]['slave pos'] = $this->mConnections[$i]->getSlavePos();					$this->mLaggedSlaveMode = true;				}			}		}		wfProfileOut( $fname );	}	/**	 * Wait for a given slave to catch up to the master pos stored in $this	 */	function doWait( $index ) {		global $wgMemc;		$retVal = false;		# Debugging hacks		if ( isset( $this->mServers[$index]['lagged slave'] ) ) {			return false;		} elseif ( isset( $this->mServers[$index]['fake slave'] ) ) {			return true;		}		$key = 'masterpos:' . $index;		$memcPos = $wgMemc->get( $key );		if ( $memcPos ) {			list( $file, $pos ) = explode( ' ', $memcPos );			# If the saved position is later than the requested position, return now			if ( $file == $this->mWaitForFile && $this->mWaitForPos <= $pos ) {				$retVal = true;			}		}		if ( !$retVal && $this->isOpen( $index ) ) {			$conn =& $this->mConnections[$index];			wfDebug( "Waiting for slave #$index to catch up...\n" );			$result = $conn->masterPosWait( $this->mWaitForFile, $this->mWaitForPos, $this->mWaitTimeout );			if ( $result == -1 || is_null( $result ) ) {				# Timed out waiting for slave, use master instead				wfDebug( "Timed out waiting for slave #$index pos {$this->mWaitForFile} {$this->mWaitForPos}\n" );				$retVal = false;			} else {				$retVal = true;				wfDebug( "Done\n" );			}		}		return $retVal;	}	/**	 * Get a connection by index	 */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?