📄 nattraversal.py
字号:
'ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n' + 'Man:"ssdp:discover"\r\n' + 'MX:3\r\n' + '\r\n') # if you think for one second that I'm going to implement SOAP in any fashion, you're crazy get_mapping_template = ('<?xml version="1.0"?>' + '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"' + 's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">' + '<s:Body>' + '<u:GetGenericPortMappingEntry xmlns:u=' + '"urn:schemas-upnp-org:service:WANIPConnection:1">' + '<NewPortMappingIndex>%d</NewPortMappingIndex>' + '</u:GetGenericPortMappingEntry>' + '</s:Body>' + '</s:Envelope>') add_mapping_template = ('<?xml version="1.0"?>' + '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle=' + '"http://schemas.xmlsoap.org/soap/encoding/">' + '<s:Body>' + '<u:AddPortMapping xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1">' + '<NewEnabled>1</NewEnabled>' + '<NewRemoteHost>%s</NewRemoteHost>' + '<NewLeaseDuration>0</NewLeaseDuration>' + '<NewInternalPort>%d</NewInternalPort>' + '<NewExternalPort>%d</NewExternalPort>' + '<NewProtocol>%s</NewProtocol>' + '<NewInternalClient>%s</NewInternalClient>' + '<NewPortMappingDescription>%s</NewPortMappingDescription>' + '</u:AddPortMapping>' + '</s:Body>' + '</s:Envelope>') delete_mapping_template = ('<?xml version="1.0"?>' + '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle=' + '"http://schemas.xmlsoap.org/soap/encoding/">' + '<s:Body>' + '<u:DeletePortMapping xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1">' + '<NewRemoteHost></NewRemoteHost>' + '<NewExternalPort>%d</NewExternalPort>' + '<NewProtocol>%s</NewProtocol>' + '</u:DeletePortMapping>' + '</s:Body>' + '</s:Envelope>') def _pretify(self, body): # I actually found a router that needed one tag per line body = body.replace('><', '>\r\n<') # don't add newlines in the middle of empty tags (like NewRemoteHost) body = body.replace('>\r\n</', '></') body = body.encode('utf-8') return body def _build_get_mapping_request(self, pmi): body = (self.get_mapping_template % (pmi)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'GetGenericPortMappingEntry"'} return Request(self.controlURL, body, headers) def _build_add_mapping_request(self, mapping): body = (self.add_mapping_template % (mapping.remote_host, mapping.internal_port, mapping.external_port, mapping.protocol, mapping.host, mapping.service_name)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'AddPortMapping"'} return Request(self.controlURL, body, headers) def _build_delete_mapping_request(self, external_port, protocol): body = (self.delete_mapping_template % (external_port, protocol)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'DeletePortMapping"'} return Request(self.controlURL, body, headers) def __init__(self, traverser): NATBase.__init__(self) self.controlURL = None self.transport = None self.traverser = traverser self.rawserver = traverser.rawserver # this service can only be provided if rawserver supports multicast if not hasattr(self.rawserver, "create_multicastsocket"): raise AttributeError, "RawServer does not support create_multicastsocket!" self.rawserver.external_add_task(0, launch_coroutine, _wrap_task(self.rawserver.add_task), self.begin_discovery) def begin_discovery(self): # bind to an available port, and join the multicast group # HEREDAVE! Trying 5000 is excessive especially if there is a # reason for the failure. For example, what if the network interface # is down? The appropriate behavior should be to report the # failure to the user rather than sucking up resources. --Dave #for p in xrange(self.upnp_addr[1], self.upnp_addr[1]+5000): df = get_deferred_host_ip() yield df hostip = df.getResult() for p in xrange(self.upnp_addr[1], self.upnp_addr[1]+50): try: # Original RawServer cannot do this! s = self.rawserver.create_multicastsocket(p, hostip) self.transport = s self.rawserver.start_listening_multicast(s, self) df = s.listening_port.joinGroup(self.upnp_addr[0], socket.INADDR_ANY) yield df # hm, failures. result = df.result # blargh if twisted.copyright.version >= '2.4.0': success = None # ACKKKK..K. Prevents "Unhandled error in Deferred" if df._debugInfo is not None: df._debugInfo.failResult = None else: success = 1 if result is success: break elif isinstance(result, twisted.python.failure.Failure): # HACK. If the failure contains a 'No such device' error # then we abort the discovery because this error denotes # that the peer is not connected to the network. if hasattr( result.value, "__getitem__" ) and \ result.value[2] == 19: yield 0 # abort discovery. else: # I suppose keep trying on different ports, but why would # joinGroup fail? self.transport = None x = s.listening_port.stopListening() if isinstance(x, internet.defer.Deferred): yield df except socket.error, e: pass if not self.transport: # resume init services, because we couldn't bind to a port self.traverser.resume_init_services() else: self.transport.sendto(self.search_string, 0, self.upnp_addr) self.transport.sendto(self.search_string, 0, self.upnp_addr) self.rawserver.add_task(6, self._discovery_timedout) def _discovery_timedout(self): if self.transport: nat_logger.warning("Discovery timed out") self.rawserver.stop_listening_multicast(self.transport) self.transport = None # resume init services, because we know we've failed self.traverser.resume_init_services() def register_port(self, mapping): request = self._build_add_mapping_request(mapping) try: response = urlopen_custom(request, self.rawserver) response = VerifySOAPResponse(request, response) mapping.d.callback(mapping.external_port) nat_logger.info("registered: " + str(mapping)) except Exception, e: #HTTPError, URLError, BadStatusLine, you name it. error = SOAPErrorToString(e) mapping.d.errback(error) def unregister_port(self, external_port, protocol): request = self._build_delete_mapping_request(external_port, protocol) try: response = urlopen_custom(request, self.rawserver) response = VerifySOAPResponse(request, response) nat_logger.info("unregisterd: %s, %s" % (external_port, protocol)) except Exception, e: #HTTPError, URLError, BadStatusLine, you name it. error = SOAPErrorToString(e) nat_logger.error(error) def data_came_in(self, addr, datagram): if self.transport is None: return try: statusline, response = datagram.split('\r\n', 1) except ValueError, e: nat_logger.error(unicode(e.args[0]) + ": " + str(datagram)) # resume init services, because the data is unknown self.traverser.resume_init_services() return httpversion, statuscode, reasonline = statusline.split(None, 2) if (not httpversion.startswith('HTTP')) or (statuscode != '200'): return headers = response.split('\r\n') location = None for header in headers: prefix = 'location:' if header.lower().startswith(prefix): location = header[len(prefix):] location = location.strip() if location: self.rawserver.stop_listening_multicast(self.transport) self.transport = None self.traverser.add_task(self._got_location, location) def _got_location(self, location): if self.controlURL is not None: return URLBase = location for i in xrange(5): # retry try: data = urlopen_custom(location, self.rawserver).read() except IOError: nat_logger.warning("urlopen_custom timeout") except: nat_logger.warning("urlopen_custom error", exc_info=sys.exc_info()) else: break else: nat_logger.warning("urlopen_custom error. giving up.") return try: bs = BeautifulSupe(data) except: # xml.parsers.expat.ExpatError, maybe others #open("wtf.xml", 'wb').write(data) nat_logger.warning("XML parse error", exc_info=sys.exc_info()) return URLBase_tag = bs.first('URLBase') if URLBase_tag and URLBase_tag.contents: URLBase = str(URLBase_tag.contents[0]) wanservices = bs.fetch('service', dict(serviceType= 'urn:schemas-upnp-org:service:WANIPConnection:')) wanservices += bs.fetch('service', dict(serviceType= 'urn:schemas-upnp-org:service:WANPPPConnection:')) for service in wanservices: controlURL = service.get('controlURL') if controlURL: self.controlURL = urlparse.urljoin(URLBase, controlURL) break if self.controlURL is None: # resume init services, because we know we've failed self.traverser.resume_init_services() return # attach service, so the queue gets flushed self.traverser.attach_service(self) def _list_ports(self): mappings = [] _mappings_dict = {} if self.controlURL is None: raise UPnPException("ManualUPnP is not prepared") while True: request = self._build_get_mapping_request(len(mappings)) try: response = urlopen_custom(request, self.rawserver) soap_response = VerifySOAPResponse(request, response) results = SOAPResponseToDict(soap_response) mapping = UPnPPortMapping(results['NewExternalPort'], results['NewInternalPort'], results['NewProtocol'], results['NewInternalClient'], results['NewPortMappingDescription']) ports = (results['NewExternalPort'], results['NewInternalPort']) if ports in _mappings_dict: # duplicate response, stop searching (because the router is clearly insane) break mappings.append(mapping) _mappings_dict[ports] = 1 except URLError, e: # SpecifiedArrayIndexInvalid, for example break except (HTTPError, BadStatusLine, socket.error): nat_logger.error("list_ports failed with:", exc_info=sys.exc_info()) break return mappingsclass WindowsUPnPException(UPnPException): def __init__(self, msg, *args): msg += " (%s)" % os_version a = [msg] + list(args) UPnPException.__init__(self, *a)class WindowsUPnP(NATBase): def __init__(self, traverser): NATBase.__init__(self) self.upnpnat = None self.port_collection = None self.traverser = traverser win32com.client.pythoncom.CoInitialize() try: self.upnpnat = win32com.client.Dispatch("HNetCfg.NATUPnP") except pywintypes.com_error, e: if (e[2][5] == -2147221005): raise WindowsUPnPException("invalid class string") else: raise try: self.port_collection = self.upnpnat.StaticPortMappingCollection if self.port_collection is None: raise WindowsUPnPException("none port_collection") except pywintypes.com_error, e: #if e[1].lower() == "exception occurred.": if (e[2][5] == -2147221164): # I think this is Class Not Registered. # Happens on Windows 98 after the XP ICS wizard has been run raise WindowsUPnPException("exception occurred, class not registered") else: raise # attach service, so the queue gets flushed self.traverser.attach_service(self) def register_port(self, mapping): try: self.port_collection.Add(mapping.external_port, mapping.protocol, mapping.internal_port, mapping.host, True, mapping.service_name) nat_logger.info("registered: " + str(mapping)) mapping.d.callback(mapping.external_port) except pywintypes.com_error, e: # host == 'fake' or address already bound #if (e[2][5] == -2147024726): # host == '', or I haven't a clue #e.args[0] == -2147024894 #mapping.d.errback(e) # detach self so the queue isn't flushed self.traverser.detach_service(self) if hasattr(mapping, 'original_external_port'): mapping.external_port = mapping.original_external_port del mapping.original_external_port # push this mapping back on the queue self.traverser.register_requests.append(mapping) # resume init services, because we know we've failed self.traverser.resume_init_services() def unregister_port(self, external_port, protocol): try: self.port_collection.Remove(external_port, protocol) nat_logger.info("unregisterd: %s, %s" % (external_port, protocol)) except pywintypes.com_error, e: if (e[2][5] == -2147352567): UPNPError("Port %d:%s not bound" % (external_port, protocol)) elif (e[2][5] == -2147221008): UPNPError("Port %d:%s is bound and is not ours to remove" % (external_port, protocol)) elif (e[2][5] == -2147024894): UPNPError("Port %d:%s not bound (2)" % (external_port, protocol)) else: raise def _list_ports(self): mappings = [] try: for mp in self.port_collection: mapping = UPnPPortMapping(mp.ExternalPort, mp.InternalPort, mp.Protocol, mp.InternalClient, mp.Description) mappings.append(mapping) except pywintypes.com_error, e: # it's the "for mp in self.port_collection" iter that can throw # an exception. # com_error: (-2147220976, 'The owner of the PerUser subscription is # not logged on to the system specified', # None, None) pass return mappings
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -