# File: resizeagent.pyc_dis (decompiled bytecode listing)
# (code-viewer control: font size)
#! /usr/bin/env python
# emacs-mode: -*- python-*-
import logging
import time
import twisted.internet.interfaces
import twisted.web.client
from zope.interface import implements
from twisted.internet import reactor
import config
import const
# Logger shared by this module, looked up under the name held in
# const.CONST_APP_LOGGER.
log = logging.getLogger(const.CONST_APP_LOGGER)
class clsMyHttpPageGetter(twisted.web.client.HTTPPageGetter):
    """Protocol for one keep-alive HTTP connection to the resize service.

    Once connected, an instance registers itself with its factory. The
    factory attaches a consumer (the request object) with cnAddConsumer()
    and triggers cnProcess(), which posts the consumer's image. When the
    response is complete, handleResponse() forwards it to the consumer and
    hands the connection back to the factory.
    """

    def __init__(self):
        # Consumer currently being served; None while the connection is idle.
        self.coRequest = None
        # Cleared by the non-200 status handlers so that handleResponse()
        # does not forward the body to the consumer.
        self.cbSuccessGet = True
        self.csURL = config.resize_get_url
        self.csResizeHost = "%s:%d" % (config.resize_host, config.resize_port)
        # ID of the most recently attached consumer.
        self.ciReqID = None
        # time.time() of the last cnProcess() call; None until one is sent.
        self.ciLastReqSendTime = None
        # Seconds a request may stay outstanding before handleIfTimeout()
        # drops the connection.
        self.ciTimeout = 15

    def sendCommand(self, command, path):
        """Write the HTTP request line, always as HTTP/1.1."""
        self.transport.write("%s %s HTTP/1.1\r\n" % (command, path))

    def connectionMade(self):
        """Add this connection to the factory's free list."""
        self.factory.cnAddFreeResizeClient(self)

    def cnAddConsumer(self, aoConsumer):
        """Attach the object that issued the request and will receive the result.

        A consumer that is still attached is overwritten; this is only logged.
        """
        if self.coRequest is not None:
            log.error("duplication consumer,why?")
        self.coRequest = aoConsumer
        self.ciReqID = self.coRequest.ciID

    def cnProcess(self):
        """Send the attached consumer's image to resize as a multipart POST.

        Does nothing when no consumer is attached.
        """
        if self.coRequest is None:
            return
        self.ciLastReqSendTime = time.time()
        loImage = self.coRequest.coImage
        ldArgs = self.coRequest.cdArgs

        # The decompiled listing wrapped the POST setup in `if True:`, which
        # made the preceding GET defaults dead stores; only POST is ever sent.
        llPostField = [("size", loImage.csRSize), ("color", loImage.csColor)]
        lsDestType = ldArgs["type"]
        llPostField.append(("type", lsDestType))
        if lsDestType == "jpg":
            llPostField.append(("quality", ldArgs["quality"]))

        # encode_multipart_formdata() iterates a sequence of
        # (name, filename, value) triples, so the single upload must be
        # wrapped in a list. A bare 3-tuple would be iterated element by
        # element and fail to unpack.
        llPostFile = [("file", "inimage", loImage.csContent)]
        lsContentType, lsPostData = self.encode_multipart_formdata(
            llPostField, llPostFile)

        self.sendCommand("POST", config.resize_post_url)
        self.sendHeader("User-Agent", "precache")
        self.sendHeader("host", self.csResizeHost)
        self.sendHeader("connection", "keep-alive")
        if lsPostData:
            self.sendHeader("content-type", lsContentType)
            self.sendHeader("content-length", str(len(lsPostData)))
        self.endHeaders()
        # Start from an empty header map for the coming response.
        self.headers = {}
        if lsPostData:
            self.transport.write(lsPostData)

    def encode_multipart_formdata(self, fields, files):
        """Build a multipart/form-data request body.

        fields is a sequence of (name, value) pairs for regular form fields.
        files is a sequence of (name, filename, value) triples for uploads.
        Every value is joined as-is, so it must already be a string.

        Returns a (content_type, body) tuple.
        """
        BOUNDARY = "----------ThIs_Is_tHe_bouNdaRY_$"
        CRLF = "\r\n"
        llPart = []
        for (key, value) in fields:
            llPart.append("--" + BOUNDARY)
            llPart.append('Content-Disposition: form-data; name="%s"' % key)
            llPart.append("")
            llPart.append(value)
        for (key, filename, value) in files:
            llPart.append("--" + BOUNDARY)
            llPart.append(
                'Content-Disposition: form-data; name="%s"; filename="%s"'
                % (key, filename))
            llPart.append("Content-Type: image/yes")
            llPart.append("")
            llPart.append(value)
        llPart.append("--" + BOUNDARY + "--")
        llPart.append("")
        body = CRLF.join(llPart)
        content_type = "multipart/form-data; boundary=%s" % BOUNDARY
        return (content_type, body)

    def connectionLost(self, reason):
        """Flush any pending response and unregister from the factory."""
        log.warn("resize connection is lost")
        self.handleResponseEnd()
        self.factory.cnRemoveResizeClient(self, self.ciReqID)
        self.transport.loseConnection()

    def handleResponse(self, response):
        """Callback invoked once the whole response has been received.

        Forwards the headers and body to the consumer when the status was
        accepted, then resets the parser state and returns this connection
        to the factory's free list.
        """
        if self.coRequest is None:
            return
        # NOTE(review): indentation was lost in the decompiled listing; the
        # header hand-off, debug log and body write are all assumed to be
        # conditional on cbSuccessGet - confirm against the original.
        if self.cbSuccessGet:
            self.coRequest.cnSetResizeRspHeader(self.headers)
            log.debug("task(%d) %d data read"
                      % (self.coRequest.ciID, len(response)))
            self.coRequest.write(response)
        self.coRequest = None
        # Reset the per-response state so the connection can be reused for
        # the next request.
        self.firstLine = 1
        self._HTTPClient__buffer = ""
        self.headers = {}
        self.factory.cnReturnFreeResizeClient(self, self.ciReqID)

    def setRawMode(self):
        """Switch to body mode, finishing immediately for an empty body.

        The base HTTPClient implementation assumes content-length is not 0,
        so an empty body returned by resize would otherwise never complete
        (the request would hang).
        """
        if self.length == 0:
            self.handleResponseEnd()
        else:
            twisted.web.client.HTTPPageGetter.setRawMode(self)

    def handleStatusDefault(self):
        """Mark the response as failed for any status without its own handler."""
        log.warn("unhandle status %s" % self.status)
        self.cbSuccessGet = False

    def handleStatus_404(self):
        """Mark the response as failed on 404."""
        # Guard like handleResponse() does: without a consumer there is no
        # URL to report, but the failure flag must still be set.
        if self.coRequest is not None:
            log.warn("404 for %s" % self.coRequest.csGetFullURL())
        self.cbSuccessGet = False

    def handleStatus_200(self):
        """Mark the response as successful on 200.

        The connection is closed when the response declares an empty body.
        """
        try:
            log.debug("200 for task(%d),size(%s)" % (
                self.coRequest.ciID,
                self.headers.get("content-length", ("not specified",))[0]))
        except Exception:
            # Logging must never break response handling (e.g. no consumer).
            log.debug("exception")
        if self.headers.get("content-length", ("1",))[0] == "0":
            self.transport.loseConnection()
        self.cbSuccessGet = True

    def handleIfTimeout(self):
        """Drop the connection when the outstanding request has timed out."""
        # Nothing can have timed out if no request was sent or no consumer
        # is attached; this also avoids subtracting None from a timestamp.
        if self.ciLastReqSendTime is None or self.coRequest is None:
            return
        if (time.time() - self.ciLastReqSendTime) > self.ciTimeout:
            log.warn("resize timeout for %s" % self.coRequest.csGetFullURL())
            log.warn("losing connection for task id(%d)" % self.coRequest.ciID)
            self.transport.loseConnection()
class clsResizeAgent(twisted.web.client.HTTPClientFactory):
    """Factory that pools connections to resize and dispatches queued consumers.

    Consumers wait in clConsumer. Whenever a connection becomes available
    or is removed, resumeProducing() picks the next consumer and hands it
    to a free clsMyHttpPageGetter. New connections are opened on demand up
    to ciMaxResizeClient. After a failed connect, dispatching is paused for
    ciDelayTime seconds.
    """

    implements(twisted.internet.interfaces.IProducer)
    protocol = clsMyHttpPageGetter

    def __init__(self):
        # NOTE(review): the base HTTPClientFactory.__init__ is not called
        # here - presumably deliberate; confirm.
        # Consumers waiting to be dispatched.
        self.clConsumer = []
        # Connected protocols that are currently idle.
        self.clFreeResizeClient = []
        self.ciMaxResizeClient = config.resize_client_max
        # Connections open or being opened (incremented before connect).
        self.ciCurrentClientCount = 0
        # Consumer ID -> protocol currently serving it.
        self.cdRunningProcess = {}
        # time.time() of the last connect failure; 0 when not paused.
        self.ciDelayBegin = 0
        # Pause length in seconds after a connect failure.
        self.ciDelayTime = 10
        # Number of retries scheduled during the current pause.
        self.ciDelayReqCount = 0

    def cnAddConsumer(self, aoConsumer):
        """Append a consumer to the waiting queue."""
        self.clConsumer.append(aoConsumer)

    def coGetFreeResizeClient(self):
        """Return an idle resize client, or None when none is available.

        When the free list is empty and the connection limit has not been
        reached, a new connection is started. It is not returned here: it
        joins the free list through cnAddFreeResizeClient() once connected.
        """
        if self.clFreeResizeClient:
            log.debug("one resize connection is borrow, %d in free list"
                      % (len(self.clFreeResizeClient) - 1))
            return self.clFreeResizeClient.pop(0)
        if self.ciCurrentClientCount < self.ciMaxResizeClient:
            log.debug(
                "current connection(%d)<maxconnection(%d),create one more connection to resize"
                % (self.ciCurrentClientCount, self.ciMaxResizeClient))
            reactor.connectTCP(config.resize_host, config.resize_port, self)
            self.ciCurrentClientCount += 1
        return None

    def cnAddFreeResizeClient(self, aoClient):
        """Called by the protocol when its connection has been established."""
        self.clFreeResizeClient.append(aoClient)
        log.debug("one resize connection is add to pool, current pool(%d/%d)"
                  % (len(self.clFreeResizeClient), self.ciCurrentClientCount))
        self.resumeProducing()

    def cnReturnFreeResizeClient(self, aoClient, aiReqID=None):
        """Called by the protocol when its request has been fully handled."""
        self.clFreeResizeClient.append(aoClient)
        log.debug("connection return(%d/%d)"
                  % (len(self.clFreeResizeClient), self.ciCurrentClientCount))
        # The request may already have been dropped from the running map.
        self.cdRunningProcess.pop(aiReqID, None)
        self.resumeProducing()

    def cnRemoveResizeClient(self, aoClient=None, aiReqID=None):
        """Called when a connection is lost or could not be established."""
        self.ciCurrentClientCount -= 1
        if aoClient is not None:
            try:
                self.clFreeResizeClient.remove(aoClient)
            except ValueError:
                # The client was busy, so it was not in the free list.
                pass
        self.cdRunningProcess.pop(aiReqID, None)
        log.debug("one client connection is removed, current pool(%d/%d)"
                  % (len(self.clFreeResizeClient), self.ciCurrentClientCount))
        self.resumeProducing()

    def resumeProducing(self):
        """Dispatch at most one waiting consumer to a free resize client."""
        liConsumerIdx = self.ciGetConsumer()
        if liConsumerIdx == -1:
            return
        if self.ciDelayBegin > 0:
            if (time.time() - self.ciDelayBegin) > self.ciDelayTime:
                # The pause after the last connect failure has elapsed.
                self.ciDelayBegin = 0
                self.ciDelayReqCount = 0
            else:
                log.warn("temporary paused,request will be handle in 1 second later")
                # Cap the number of retries scheduled during one pause.
                if self.ciDelayReqCount >= self.ciMaxResizeClient:
                    log.warn("too much request paused, skip one")
                    return
                self.ciDelayReqCount += 1
                reactor.callLater(self.ciDelayTime, self.resumeProducing)
                return
        loResizeClient = self.coGetFreeResizeClient()
        if loResizeClient is None:
            return
        loConsumer = self.clConsumer.pop(liConsumerIdx)
        loResizeClient.cnAddConsumer(loConsumer)
        self.cdRunningProcess[loConsumer.ciID] = loResizeClient
        loResizeClient.cnProcess()

    def ciGetConsumer(self):
        """Return the queue index of the next consumer, or -1 when empty.

        Priority:
        1. not timed out and the source image is already cached (resize has
           probably converted it before, so the answer comes back fastest);
        2. not timed out, no cached source image (first such consumer);
        3. otherwise the head of the queue.
        """
        if not self.clConsumer:
            return -1
        liFirstNoCache = -1
        # Scan the whole queue. The decompiled listing stopped one short of
        # the end, so the last consumer was never considered for priority.
        for (liIdx, loConsumer) in enumerate(self.clConsumer):
            if loConsumer.cbTimeout:
                continue
            if loConsumer.coImage.csCacheLoc:
                return liIdx
            if liFirstNoCache == -1:
                liFirstNoCache = liIdx
        if liFirstNoCache != -1:
            return liFirstNoCache
        return 0

    def clientConnectionFailed(self, connector, reason):
        """Start the dispatch pause and release the reserved connection slot."""
        log.warn("unable to connect to server")
        self.ciDelayBegin = time.time()
        self.cnRemoveResizeClient()

    def buildProtocol(self, addr):
        """Create a protocol instance bound to this factory."""
        loProtocol = self.protocol()
        loProtocol.factory = self
        return loProtocol

    def stopProducing(self, aiReqID):
        """Stop one request.

        A request that is still queued is simply removed. One that is
        already running has its connection closed.
        """
        for loConsumer in self.clConsumer:
            if loConsumer.ciID == aiReqID:
                # Safe despite the iteration: we return right after removing.
                self.clConsumer.remove(loConsumer)
                return
        loClient = self.cdRunningProcess.get(aiReqID)
        if loClient is None:
            # Neither queued nor running: nothing to stop.
            return
        try:
            loClient.transport.loseConnection()
        except Exception:
            # Stay best-effort, but leave a trace instead of hiding the error.
            log.exception("unable to close connection for task id(%s)" % aiReqID)

    def checkTimeout(self):
        """Check every running resize request for a timeout."""
        # Iterate over a snapshot so the map may change during the loop.
        for loProcess in list(self.cdRunningProcess.values()):
            loProcess.handleIfTimeout()
# Agent instance created when this module is imported.
ResizeAgent = clsResizeAgent()
# local variables:
# tab-width: 4
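# Code-viewer keyboard shortcuts (page chrome, not part of the program):
#   Copy code:           Ctrl + C
#   Search code:         Ctrl + F
#   Full-screen mode:    F11
#   Toggle theme:        Ctrl + Shift + D
#   Show shortcuts:      ?
#   Increase font size:  Ctrl + =
#   Decrease font size:  Ctrl + -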