natpunch.py

来自「这是一个嵌入式linux系统下的命令工具包」· Python 代码 · 共 255 行

PY
255
字号
# Written by John Hoffman# derived from NATPortMapping.py by Yejun Yang# and from example code by Myers Carpenter# see LICENSE.txt for license informationimport socketfrom traceback import print_excfrom subnetparse import IP_Listfrom clock import clockfrom __init__ import createPeerIDtry:    Trueexcept:    True = 1    False = 0DEBUG = FalseEXPIRE_CACHE = 30 # secondsID = "BT-"+createPeerID()[-4:]try:    import pythoncom, win32com.client    _supported = 1except ImportError:    _supported = 0class _UPnP1:   # derived from Myers Carpenter's code                # seems to use the machine's local UPnP                # system for its operation.  Runs fairly fast    def __init__(self):        self.map = None        self.last_got_map = -10e10    def _get_map(self):        if self.last_got_map + EXPIRE_CACHE < clock():            try:                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")                self.map = dispatcher.StaticPortMappingCollection                self.last_got_map = clock()            except:                self.map = None        return self.map    def test(self):        try:            assert self._get_map()     # make sure a map was found            success = True        except:            success = False        return success    def open(self, ip, p):        map = self._get_map()        try:            map.Add(p,'TCP',p,ip,True,ID)            if DEBUG:                print 'port opened: '+ip+':'+str(p)            success = True        except:            if DEBUG:                print "COULDN'T OPEN "+str(p)                print_exc()            success = False        return success    def close(self, p):        map = self._get_map()        try:            map.Remove(p,'TCP')            success = True            if DEBUG:                print 'port closed: '+str(p)        except:            if DEBUG:                print 'ERROR CLOSING '+str(p)                print_exc()            success = False        return success    def clean(self, retry = False):        if not _supported:            return        try:            map = self._get_map()            ports_in_use = []            for i in xrange(len(map)):                try:                    mapping = map[i]                    port = mapping.ExternalPort                    prot = str(mapping.Protocol).lower()                    desc = str(mapping.Description).lower()                except:                    port = None                if port and prot == 'tcp' and desc[:3] == 'bt-':                    ports_in_use.append(port)            success = True            for port in ports_in_use:                try:                    map.Remove(port,'TCP')                except:                    success = False            if not success and not retry:                self.clean(retry = True)        except:            passclass _UPnP2:   # derived from Yejun Yang's code                # apparently does a direct search for UPnP hardware                # may work in some cases where _UPnP1 won't, but is slow                # still need to implement "clean" method    def __init__(self):        self.services = None        self.last_got_services = -10e10                               def _get_services(self):        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():            self.services = []            try:                f=win32com.client.Dispatch("UPnP.UPnPDeviceFinder")                for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",                           "urn:schemas-upnp-org:service:WANPPPConnection:1" ):                    try:                        conns = f.FindByType(t,0)                        for c in xrange(len(conns)):                            try:                                svcs = conns[c].Services                                for s in xrange(len(svcs)):                                    try:                                        self.services.append(svcs[s])                                    except:                                        pass                            except:                                pass                    except:                        pass            except:                pass            self.last_got_services = clock()        return self.services    def test(self):        try:            assert self._get_services()    # make sure some services can be found            success = True        except:            success = False        return success    def open(self, ip, p):        svcs = self._get_services()        success = False        for s in svcs:            try:                s.InvokeAction('AddPortMapping',['',p,'TCP',p,ip,True,ID,0],'')                success = True            except:                pass        if DEBUG and not success:            print "COULDN'T OPEN "+str(p)            print_exc()        return success    def close(self, p):        svcs = self._get_services()        success = False        for s in svcs:            try:                s.InvokeAction('DeletePortMapping', ['',p,'TCP'], '')                success = True            except:                pass        if DEBUG and not success:            print "COULDN'T OPEN "+str(p)            print_exc()        return successclass _UPnP:    # master holding class    def __init__(self):        self.upnp1 = _UPnP1()        self.upnp2 = _UPnP2()        self.upnplist = (None, self.upnp1, self.upnp2)        self.upnp = None        self.local_ip = None        self.last_got_ip = -10e10            def get_ip(self):        if self.last_got_ip + EXPIRE_CACHE < clock():            local_ips = IP_List()            local_ips.set_intranet_addresses()            try:                for info in socket.getaddrinfo(socket.gethostname(),0,socket.AF_INET):                            # exception if socket library isn't recent                    self.local_ip = info[4][0]                    if local_ips.includes(self.local_ip):                        self.last_got_ip = clock()                        if DEBUG:                            print 'Local IP found: '+self.local_ip                        break                else:                    raise ValueError('couldn\'t find intranet IP')            except:                self.local_ip = None                if DEBUG:                    print 'Error finding local IP'                    print_exc()        return self.local_ip    def test(self, upnp_type):        if DEBUG:            print 'testing UPnP type '+str(upnp_type)        if not upnp_type or not _supported or self.get_ip() is None:            if DEBUG:                print 'not supported'            return 0        pythoncom.CoInitialize()                # leave initialized        self.upnp = self.upnplist[upnp_type]    # cache this        if self.upnp.test():            if DEBUG:                print 'ok'            return upnp_type        if DEBUG:            print 'tested bad'        return 0    def open(self, p):        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"        return self.upnp.open(self.get_ip(), p)    def close(self, p):        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"        return self.upnp.close(p)    def clean(self):        return self.upnp1.clean()_upnp_ = _UPnP()UPnP_test = _upnp_.testUPnP_open_port = _upnp_.openUPnP_close_port = _upnp_.closeUPnP_reset = _upnp_.clean

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?