⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 resizeagent.pyc_dis

📁 Cache服务器模块,可以完成图片的缓存处理功能. 很好用的.
💻 PYC_DIS
字号:
#! /usr/bin/env python
# emacs-mode: -*- python-*-

import logging
import time
import twisted.internet.interfaces
import twisted.web.client
from zope.interface import implements
from twisted.internet import reactor
import config
import const
log = logging.getLogger(const.CONST_APP_LOGGER)
class clsMyHttpPageGetter(twisted.web.client.HTTPPageGetter):

    def __init__(self):
        self.coRequest = None
        self.cbSuccessGet = True
        self.csURL = config.resize_get_url
        self.csResizeHost = ("%s:%d" % (config.resize_host,
         config.resize_port))
        self.ciReqID = None
        self.ciLastReqSendTime = None
        self.ciTimeout = 15



    def sendCommand(self, command, path):
        self.transport.write(("%s %s HTTP/1.1\r\n" % (command,
         path)))



    def connectionMade(self):
        """
    把自己增加到factory的FreeList中
    """
        self.factory.cnAddFreeResizeClient(self)



    def cnAddConsumer(self, aoConsumer):
        """
    添加请求的发起及处理实体
    """
        if (self.coRequest is not None):
            log.error("duplication consumer,why?")
        self.coRequest = aoConsumer
        self.ciReqID = self.coRequest.ciID



    def cnProcess(self):
        """
    发送一个请求到resize
    """
        if (self.coRequest is None):
            return
        self.ciLastReqSendTime = time.time()
        loImage = self.coRequest.coImage
        lsMethod = "GET"
        lsURL = (config.resize_get_url + self.coRequest.csResizeQuery)
        lsPostData = ""
        lsContentType = ""
        if True:
            lsMethod = "POST"
            lsURL = config.resize_post_url
            llPostField = [("size",
              loImage.csRSize),
             ("color",
              loImage.csColor)]
            lsDestType = self.coRequest.cdArgs["type"]
            llPostField.append(("type",
             lsDestType))
            if (lsDestType == "jpg"):
                llPostField.append(("quality",
                 self.coRequest.cdArgs["quality"]))
            llPostFile = (("file",
              "inimage",
              loImage.csContent))
            (lsContentType, lsPostData,) = self.encode_multipart_formdata(llPostField, llPostFile)
        self.sendCommand(lsMethod, lsURL)
        self.sendHeader("User-Agent", "precache")
        self.sendHeader("host", self.csResizeHost)
        self.sendHeader("connection", "keep-alive")
        if (len(lsPostData) > 0):
            self.sendHeader("content-type", lsContentType)
            self.sendHeader("content-length", str(len(lsPostData)))
        self.endHeaders()
        self.headers = {}
        if (len(lsPostData) > 0):
            self.transport.write(lsPostData)



    def encode_multipart_formdata(self, fields, files):
        """
    fields is a sequence of (name, value) elements for regular form fields.
    files is a sequence of (name, filename, value) elements for data to be uploaded as files
    Return (content_type, body) ready for httplib.HTTP instance
    """
        BOUNDARY = "----------ThIs_Is_tHe_bouNdaRY_$"
        CRLF = "\r\n"
        L = []
        for (key, value,) in fields:
            L.append(("--" + BOUNDARY))
            L.append(('Content-Disposition: form-data; name="%s"' % key))
            L.append("")
            L.append(value)

        for (key, filename, value,) in files:
            L.append(("--" + BOUNDARY))
            L.append(('Content-Disposition: form-data; name="%s"; filename="%s"' % (key,
             filename)))
            L.append("Content-Type: image/yes")
            L.append("")
            L.append(value)

        L.append((("--" + BOUNDARY) + "--"))
        L.append("")
        body = CRLF.join(L)
        content_type = ("multipart/form-data; boundary=%s" % BOUNDARY)
        return (content_type,
         body)



    def connectionLost(self, reason):
        log.warn("resize connection is lost")
        self.handleResponseEnd()
        self.factory.cnRemoveResizeClient(self, self.ciReqID)
        self.transport.loseConnection()



    def handleResponse(self, response):
        """
    整个应答完成的回调函数
    """
        if (self.coRequest is None):
            return
        if self.cbSuccessGet:
            self.coRequest.cnSetResizeRspHeader(self.headers)
        log.debug(("task(%d) %d data read" % (self.coRequest.ciID,
         len(response))))
        self.coRequest.write(response)
        self.coRequest = None
        self.firstLine = 1
        self._HTTPClient__buffer = ""
        self.headers = {}
        self.factory.cnReturnFreeResizeClient(self, self.ciReqID)



    def setRawMode(self):
        """
    HttpClient的实现中假设content-length不为0 ,这样对于resize
    返回的空的body无法处理(挂起)
    """
        if ((self.length is not None) and (self.length == 0)):
            self.handleResponseEnd()
        else:
            twisted.web.client.HTTPPageGetter.setRawMode(self)



    def handleStatusDefault(self):
        log.warn(("unhandle status %s" % self.status))
        self.cbSuccessGet = False



    def handleStatus_404(self):
        log.warn(("404 for %s" % self.coRequest.csGetFullURL()))
        self.cbSuccessGet = False



    def handleStatus_200(self):
        try:
            log.debug(("200 for task(%d),size(%s)" % (self.coRequest.ciID,
             self.headers.get("content-length", ('not specified',))[0])))
        except:
            log.debug("exception")
        if (self.headers.get("content-length", ('1',))[0] == "0"):
            self.transport.loseConnection()
        self.cbSuccessGet = True



    def handleIfTimeout(self):
        if ((time.time() - self.ciLastReqSendTime) > self.ciTimeout):
            log.warn(("resize timeout for %s" % self.coRequest.csGetFullURL()))
            log.warn(("losing connection for task id(%d)" % self.coRequest.ciID))
            self.transport.loseConnection()



class clsResizeAgent(twisted.web.client.HTTPClientFactory):
    implements(twisted.internet.interfaces.IProducer)
    protocol = clsMyHttpPageGetter

    def __init__(self):
        self.clConsumer = []
        self.clFreeResizeClient = []
        self.ciMaxResizeClient = config.resize_client_max
        self.ciCurrentClientCount = 0
        self.cdRunningProcess = {}
        self.ciDelayBegin = 0
        self.ciDelayTime = 10
        self.ciDelayReqCount = 0



    def cnAddConsumer(self, aoConsumer):
        """
    添加一个consumer到等待队列中
    """
        self.clConsumer.append(aoConsumer)



    def coGetFreeResizeClient(self):
        """
    返回一个可用的空闲的resize client,如果没有则尝试创建一个新的,如果到达最高限制,则返回空
    """
        if ((len(self.clFreeResizeClient) == 0) and (self.ciCurrentClientCount < self.ciMaxResizeClient)):
            log.debug(("current connection(%d)<maxconnection(%d),create one more connection to resize" % (self.ciCurrentClientCount,
             self.ciMaxResizeClient)))
            reactor.connectTCP(config.resize_host, config.resize_port, self)
            self.ciCurrentClientCount += 1
            return
        if (len(self.clFreeResizeClient) > 0):
            log.debug(("one resize connection is borrow, %d in free list" % (len(self.clFreeResizeClient) - 1)))
            return self.clFreeResizeClient.pop(0)



    def cnAddFreeResizeClient(self, aoClient):
        """
    由MyHttpPageGetter调用,当连接成功时
    """
        self.clFreeResizeClient.append(aoClient)
        log.debug(("one resize connection is add to pool, current pool(%d/%d)" % (len(self.clFreeResizeClient),
         self.ciCurrentClientCount)))
        self.resumeProducing()



    def cnReturnFreeResizeClient(self, aoClient, aiReqID = None):
        """
    由MyHttpPageGetter调用,当请求处理完毕后
    """
        self.clFreeResizeClient.append(aoClient)
        log.debug(("connection return(%d/%d)" % (len(self.clFreeResizeClient),
         self.ciCurrentClientCount)))
        try:
            del self.cdRunningProcess[aiReqID]
        except:
            pass
        self.resumeProducing()



    def cnRemoveResizeClient(self, aoClient = None, aiReqID = None):
        """
    由MyHttpPageGetter调用,当发生错误时
    """
        self.ciCurrentClientCount -= 1
        if (aoClient is not None):
            try:
                liIdx = self.clFreeResizeClient.index(aoClient)
                self.clFreeResizeClient.pop(liIdx)
            except:
                pass
            try:
                del self.cdRunningProcess[aiReqID]
            except:
                pass
        log.debug(("one client connection is removed, current pool(%d/%d)" % (len(self.clFreeResizeClient),
         self.ciCurrentClientCount)))
        self.resumeProducing()



    def resumeProducing(self):
        """
    处理下载任务
    """
        liConsumerIdx = self.ciGetConsumer()
        if (liConsumerIdx == -1):
            return
        if (self.ciDelayBegin > 0):
            if ((time.time() - self.ciDelayBegin) > self.ciDelayTime):
                self.ciDelayBegin = 0
                self.ciDelayReqCount = 0
            else:
                log.warn("temporary paused,request will be handle in 1 second later")
                if (self.ciDelayReqCount >= self.ciMaxResizeClient):
                    log.warn("too much request paused, skip one")
                    return
                self.ciDelayReqCount += 1
                reactor.callLater(self.ciDelayTime, self.resumeProducing)
                return
        loResizeClient = self.coGetFreeResizeClient()
        if (loResizeClient is None):
            return
        loConsumer = self.clConsumer.pop(liConsumerIdx)
        loResizeClient.cnAddConsumer(loConsumer)
        self.cdRunningProcess[loConsumer.ciID] = loResizeClient
        loResizeClient.cnProcess()



    def ciGetConsumer(self):
        """
    读取新的任务进行处理
    获取任务的优先级别是
    1.源图已经有cache的,说明很可能resize曾经做过转换,返回将最快
    2.无源图cache
    3.超时的
    """
        liRet = -1
        liFirstNoCache = -1
        llLoc = []
        if (len(self.clConsumer) != 0):
            liRet = 0
            for i in xrange(0, (len(self.clConsumer) - 1)):
                if (not self.clConsumer[i].cbTimeout):
                    if self.clConsumer[i].coImage.csCacheLoc:
                        return i
                    elif (liFirstNoCache == -1):
                        liFirstNoCache = i
                        liRet = i

        return liRet



    def clientConnectionFailed(self, connector, reason):
        log.warn("unable to connect to server")
        self.ciDelayBegin = time.time()
        self.cnRemoveResizeClient()



    def buildProtocol(self, addr):
        loProtocol = self.protocol()
        loProtocol.factory = self
        return loProtocol



    def stopProducing(self, aiReqID):
        """
    停止一个请求,并断开连接
    """
        try:
            for consumer in self.clConsumer:
                if (consumer.ciID == aiReqID):
                    self.clConsumer.remove(consumer)
                    return

            self.cdRunningProcess[aiReqID].transport.loseConnection()
        except:
            pass



    def checkTimeout(self):
        """
    对正在运行的resize请求检查是否超时
    """
        for process in self.cdRunningProcess.values():
            process.handleIfTimeout()




ResizeAgent = clsResizeAgent()

# local variables:
# tab-width: 4

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -