# File: webagent.pyc_dis (decompiled listing)
# (code-viewer page residue: "font size" control)
def cnAddConsumer(self, aoConsumer):
    """
    Route a consumer to a connection of the right kind, or queue it.

    * A consumer that is not a ``control.ResizeRequest`` is appended to
      the waiting queue ``self.clConsumer``.
    * An https resize request never goes through the queue: when
      ``cbUseProxy`` accepts its host it is tunnelled through the squid
      host/port taken from ``config``; otherwise a direct SSL connection
      to the target host is opened.
    * A plain (non-https) resize request is queued when ``cbUseProxy``
      accepts its host, and is otherwise fetched over a direct TCP
      connection.

    :param aoConsumer: the request object to serve.
    """
    # Everything except resize requests simply waits in the queue.
    if not isinstance(aoConsumer, control.ResizeRequest):
        self.clConsumer.append(aoConsumer)
        return

    lbSecure = (aoConsumer.csGetScheme() == "https")
    lbViaProxy = self.cbUseProxy(aoConsumer.csGetHost())

    if lbSecure and lbViaProxy:
        # https through squid: dedicated getter wrapped in a proxy factory.
        loGetter = clsMyHttpsGetter(aoConsumer)
        sslproxy.connectSSL(
            reactor,
            config.squid_host,
            config.squid_port,
            sslproxy.SSLProxyFactory(aoConsumer.csGetHost(), aoConsumer.ciGetPort(), loGetter),
            ssl.ClientContextFactory(),
        )
    elif lbSecure:
        # https straight to the origin host.
        loAgent = clsSSLAgent(aoConsumer)
        reactor.connectSSL(
            aoConsumer.csGetHost(),
            aoConsumer.ciGetPort(),
            loAgent,
            ssl.ClientContextFactory(),
        )
    elif lbViaProxy:
        # http through squid: wait in the queue like any other consumer.
        self.clConsumer.append(aoConsumer)
    else:
        # http straight to the origin host; clsSSLAgent is reused as the
        # factory over a plain TCP connection.
        loAgent = clsSSLAgent(aoConsumer)
        reactor.connectTCP(aoConsumer.csGetHost(), aoConsumer.ciGetPort(), loAgent)
def coGetFreeWebClient(self):
    """
    Borrow an idle web client from ``self.clFreeWebClient``.

    When the pool is empty and ``ciCurrentClientCount`` is still below
    ``ciMaxWebClient``, one more TCP connection to
    ``(self.csHost, self.ciPort)`` is started with this object as the
    factory, and the connection counter is incremented.

    :return: the first idle client, removed from the pool, or ``None``
             when the pool holds no idle client.
    """
    lbPoolEmpty = not self.clFreeWebClient
    if lbPoolEmpty and self.ciCurrentClientCount < self.ciMaxWebClient:
        log.debug(("current connection(%d)<maxconnection(%d),create one more connection toweb" % (self.ciCurrentClientCount,
         self.ciMaxWebClient)))
        reactor.connectTCP(self.csHost, self.ciPort, self)
        self.ciCurrentClientCount += 1

    # Re-check the pool after the (possible) connection attempt.
    if not self.clFreeWebClient:
        return None

    liRemaining = len(self.clFreeWebClient) - 1
    log.debug(("one connection is borrow, %d is free" % liRemaining))
    return self.clFreeWebClient.pop(0)
def cnAddFreeWebClient(self, aoClient):
    """
    Put a newly connected client into the idle pool.

    Per the original note, this is called by MyHttpPageGetter once its
    connection has been established. After the client is pooled, the
    pool size is logged and ``resumeProducing`` is called.

    :param aoClient: the client to add to ``self.clFreeWebClient``.
    """
    loPool = self.clFreeWebClient
    loPool.append(aoClient)
    lsMessage = "one web connection is add to pool(%d/%d)" % (len(loPool),
     self.ciCurrentClientCount)
    log.debug(lsMessage)
    self.resumeProducing()
def cnReturnFreeWebClient(self, aoClient, aiReqID = None):
    """
    Give a client back to the idle pool after it finished a request.

    Per the original note, this is called by MyHttpPageGetter when one
    request has been completed.

    :param aoClient: the client to append to ``self.clFreeWebClient``.
    :param aiReqID: id of the finished request; its entry in
                    ``self.cdRunningProcess`` is dropped if present.
    """
    self.clFreeWebClient.append(aoClient)
    log.debug(("connection return (%d/%d)" % (len(self.clFreeWebClient),
     self.ciCurrentClientCount)))
    # A missing id (including the default None) is not an error; pop()
    # with a default expresses that without a bare ``except`` that would
    # also hide unrelated failures.
    self.cdRunningProcess.pop(aiReqID, None)
    self.resumeProducing()
def cnRemoveWebClient(self, aoClient = None, aiReqID = None):
    """
    Forget a connection that is no longer usable.

    Per the original note, this is called by MyHttpPageGetter when an
    error occurred. The connection counter is decremented
    unconditionally, the client is dropped from the idle pool if it is
    there, and any running-request entry for ``aiReqID`` is removed.

    :param aoClient: the client to drop from ``self.clFreeWebClient``,
                     or ``None`` when there is no client object.
    :param aiReqID: id of the request that was running on the client,
                    or ``None``.
    """
    self.ciCurrentClientCount -= 1
    if (aoClient is not None):
        try:
            self.clFreeWebClient.remove(aoClient)
        except ValueError:
            # The client is not in the idle pool; nothing to drop.
            pass
    # NOTE(review): the listing lost its indentation; this cleanup is
    # assumed to run regardless of aoClient, as in cnReturnFreeWebClient.
    self.cdRunningProcess.pop(aiReqID, None)
    log.debug(("one web connection is removed, current pool(%d/%d)" % (len(self.clFreeWebClient),
     self.ciCurrentClientCount)))
    self.resumeProducing()
def resumeProducing(self):
    """
    Start the next waiting download if a client is available.

    Asks ``ciGetConsumer`` for the index of a waiting consumer and
    ``coGetFreeWebClient`` for an idle client. When both exist, the
    consumer is taken out of ``self.clConsumer``, attached to the
    client, recorded in ``self.cdRunningProcess`` under its ``ciID``,
    and the client is told to process it. Otherwise nothing happens.
    """
    liIndex = self.ciGetConsumer()
    if liIndex != -1:
        loClient = self.coGetFreeWebClient()
        if loClient is not None:
            loJob = self.clConsumer.pop(liIndex)
            loClient.cnAddConsumer(loJob)
            self.cdRunningProcess[loJob.ciID] = loClient
            loClient.cnProcess()
def buildProtocol(self, addr):
    """
    Create a protocol instance for a new connection.

    Instantiates ``self.protocol`` with no arguments and points the
    instance's ``factory`` attribute back at this object.

    :param addr: address of the connection; not used here.
    :return: the new protocol instance.
    """
    loInstance = self.protocol()
    setattr(loInstance, "factory", self)
    return loInstance
def stopProducing(self, aiReqID):
    """
    Stop handling the request with the given id.

    A request that is still waiting in ``self.clConsumer`` is simply
    removed from the queue. Otherwise, if the id is found in
    ``self.cdRunningProcess``, the connection of the client serving it
    is closed. An unknown id is ignored.

    :param aiReqID: id (``ciID``) of the request to stop.
    """
    for loQueued in self.clConsumer:
        if (loQueued.ciID == aiReqID):
            # Safe although we are iterating: we return right after.
            self.clConsumer.remove(loQueued)
            return
    loWebClient = self.cdRunningProcess.get(aiReqID)
    loTransport = getattr(loWebClient, "transport", None)
    if loTransport is None:
        # Unknown id, or the client has no transport to close.
        return
    try:
        loTransport.loseConnection()
    except Exception:
        # Closing is best-effort, as in the original; unlike a bare
        # ``except`` this no longer traps KeyboardInterrupt/SystemExit.
        pass
def clientConnectionFailed(self, connector, reason):
    """
    Handle a connection attempt that failed.

    Logs the failure reason and calls ``cnRemoveWebClient`` with no
    arguments.

    :param connector: the connector that failed; not used here.
    :param reason: the failure reason, included in the log message.
    """
    lsMessage = "connection to proxy failed,reason:%s" % reason
    log.debug(lsMessage)
    self.cnRemoveWebClient()
def ciGetConsumer(self):
    """
    Pick the waiting request that should be served next.

    :return: ``0`` (the head of ``self.clConsumer``) when at least one
             consumer is waiting, ``-1`` when the queue is empty.
    """
    lbQueueEmpty = (len(self.clConsumer) == 0)
    return -1 if lbQueueEmpty else 0
# Module-level clsWebAgent instance, created at import time from
# config.web_client_max, config.squid_host and config.squid_port.
# NOTE(review): presumably the shared agent that other modules use -- confirm.
WebAgent = clsWebAgent(config.web_client_max, config.squid_host, config.squid_port)
class clsSSLAgent(twisted.web.client.HTTPClientFactory):
    """
    Agent (client factory) used to handle https connections.

    Each instance serves exactly one consumer and builds
    ``clsMyHttpsGetter`` protocols for it.
    """

    def __init__(self, aoConsumer):
        """
        Remember the consumer this factory works for.

        The base-class ``__init__`` is not called; ``waiting`` and
        ``cookies`` are set directly instead.
        NOTE(review): presumably these are the base-class attributes the
        inherited code relies on -- confirm.

        :param aoConsumer: the request object to fetch.
        """
        self.cookies = {}
        self.waiting = 0
        self.coConsumer = aoConsumer

    def buildProtocol(self, addr):
        """
        Create a ``clsMyHttpsGetter`` for the stored consumer.

        :param addr: address of the connection; not used here.
        :return: the new protocol, with ``factory`` set to this object.
        """
        loGetter = clsMyHttpsGetter(self.coConsumer)
        setattr(loGetter, "factory", self)
        return loGetter

    def clientConnectionFailed(self, connector, reason):
        """
        Report a failed connection by writing an empty string to the
        consumer.

        :param connector: the connector that failed; not used here.
        :param reason: the failure reason; not used here.
        """
        loConsumer = self.coConsumer
        loConsumer.write("")
class clsMyHttpsGetter(twisted.web.client.HTTPPageGetter):
    """
    Page getter that serves a single request object.

    The request is sent as soon as the connection is made, the response
    body is written to the request object, and the connection is then
    closed. Response status counts are tallied in ``mdWebRspStat``.
    """

    def __init__(self, aoRequest):
        """
        :param aoRequest: the request object to fetch; its full URL is
                          cached in ``self.csURL`` for log messages.
        """
        self.coRequest = aoRequest
        self.cbSuccessGet = False
        self.followRedirect = False
        self.csURL = self.coRequest.csGetFullURL()

    def connectionMade(self):
        """
        Send the request as soon as the connection is up; the connection
        is not kept alive afterwards.
        """
        if (self.coRequest is None):
            return
        # NOTE(review): the method is always GET, even when post data is
        # written below -- confirm this is intended.
        lsMethod = "GET"
        self.sendCommand(lsMethod, self.coRequest.csGetPath())
        self.sendHeader("Host", self.coRequest.csGetHost())
        self.sendHeader("User-Agent", self.coRequest.csUserAgent)
        if self.coRequest.csReferer:
            self.sendHeader("referer", self.coRequest.csReferer)
        if self.coRequest.clCookies:
            self.sendHeader("Cookie", "; ".join(self.coRequest.clCookies))
        lsPostData = self.coRequest.csGetPostData()
        # The original tested ``len(lsPostData) >= 0``, which is always
        # true and raises TypeError for None. Testing for None keeps the
        # header for every string (including the empty one) and agrees
        # with the body check below.
        if (lsPostData is not None):
            self.sendHeader("Content-Length", str(len(lsPostData)))
        self.endHeaders()
        self.headers = {}
        if (lsPostData is not None):
            self.transport.write(lsPostData)

    def connectionLost(self, reason):
        """
        Log the lost connection and finish response handling.

        :param reason: why the connection was lost; not used here.
        """
        log.warn(("connection for (%s) is lost" % self.csURL))
        self.handleResponseEnd()

    def handleResponse(self, response):
        """
        Callback invoked once the whole response has been received.

        :param response: the response body.
        """
        log.debug(("%s %d data read" % (self.csURL,
         len(response))))
        # NOTE(review): the listing lost its indentation; only the header
        # hand-over is assumed to depend on cbSuccessGet, so the request
        # always receives the body -- confirm against the original.
        if self.cbSuccessGet:
            self.coRequest.cnSetWebRspHeader(self.headers)
        self.coRequest.write(response)
        self.transport.loseConnection()
        self.coRequest = None

    def handleStatusDefault(self):
        """Count and log any status without a dedicated handler."""
        liStatus = int(self.status)
        mdWebRspStat[liStatus] = (mdWebRspStat.get(liStatus, 0) + 1)
        log.warn(("unhandle status %s for %s" % (self.status,
         self.csURL)))
        self.cbSuccessGet = False

    def handleStatus_404(self):
        """Count and log a 404 response; the fetch is marked as failed."""
        # Same counting form as handleStatusDefault, so a counter that
        # was never initialised does not raise KeyError.
        mdWebRspStat[404] = (mdWebRspStat.get(404, 0) + 1)
        log.warn(("404 for %s" % self.csURL))
        self.cbSuccessGet = False

    def handleStatus_200(self):
        """Count and log a 200 response; the fetch is marked as successful."""
        mdWebRspStat[200] = (mdWebRspStat.get(200, 0) + 1)
        log.debug(("200 for task(%d),size(%s)" % (self.coRequest.ciID,
         self.headers.get("content-length", ('not specified',))[0])))
        self.cbSuccessGet = True

    def connectionFailed(self):
        """Report a failed connection by writing an empty string to the request."""
        self.coRequest.write("")
# local variables:
# tab-width: 4
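# Code-viewer keyboard shortcuts (page residue, not part of the program):
#   Copy code: Ctrl + C
#   Search code: Ctrl + F
#   Full-screen mode: F11
#   Toggle theme: Ctrl + Shift + D
#   Show shortcuts: ?
#   Increase font size: Ctrl + =
#   Decrease font size: Ctrl + -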