⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 debuggerharness.py

📁 wxPython的基本示例程序
💻 PY
📖 第 1 页 / 共 2 页
字号:
#----------------------------------------------------------------------------# Name:         DebuggerHarness.py# Purpose:      ## Author:       Matt Fryer## Created:      7/28/04# CVS-ID:       $Id: DebuggerHarness.py,v 1.6 2005/12/30 23:01:35 RD Exp $# Copyright:    (c) 2005 ActiveGrid, Inc.# License:      wxWindows License#----------------------------------------------------------------------------import bdbimport sysimport SimpleXMLRPCServerimport threadingimport xmlrpclibimport osimport typesimport Queueimport tracebackimport inspectfrom xml.dom.minidom import getDOMImplementationimport atexitimport pickleimport cStringIOimport bz2if sys.platform.startswith("win"):    import win32api    _WINDOWS = Trueelse:    _WINDOWS = False    _VERBOSE = False_DEBUG_DEBUGGER = Falseclass Adb(bdb.Bdb):    def __init__(self, harness, queue):        bdb.Bdb.__init__(self)        self._harness = harness        self._userBreak = False        self._queue = queue        self._knownCantExpandFiles = {}         self._knownExpandedFiles = {}         def getLongName(self, filename):        if not _WINDOWS:            return filename        if self._knownCantExpandFiles.get(filename):            return filename        if self._knownExpandedFiles.get(filename):            return self._knownExpandedFiles.get(filename)        try:            newname = win32api.GetLongPathName(filename)            self._knownExpandedFiles[filename] = newname            return newname        except:            self._knownCantExpandFiles[filename] = filename            return filename                def canonic(self, orig_filename):        if orig_filename == "<" + orig_filename[1:-1] + ">":            return orig_filename        filename = self.getLongName(orig_filename)            canonic = self.fncache.get(filename)        if not canonic:            canonic = os.path.abspath(filename)            canonic = os.path.normcase(canonic)            self.fncache[filename] = canonic        return canonic             # Overriding this so that we continue to trace even if no breakpoints are set.    def set_continue(self):        self.stopframe = self.botframe        self.returnframe = None        self.quitting = 0                               def do_clear(self, arg):        bdb.Breakpoint.bpbynumber[int(arg)].deleteMe()            def user_line(self, frame):        if self.in_debugger_code(frame):            self.set_step()            return        message = self.__frame2message(frame)        self._harness.interaction(message, frame, "")            def user_call(self, frame, argument_list):        if self.in_debugger_code(frame):            self.set_step()            return        if self.stop_here(frame):            message = self.__frame2message(frame)            self._harness.interaction(message, frame, "")            def user_return(self, frame, return_value):        if self.in_debugger_code(frame):            self.set_step()            return        message = self.__frame2message(frame)        self._harness.interaction(message, frame, "")            def user_exception(self, frame, (exc_type, exc_value, exc_traceback)):        frame.f_locals['__exception__'] = exc_type, exc_value        if type(exc_type) == type(''):            exc_type_name = exc_type        else:             exc_type_name = exc_type.__name__        message = "Exception occured: " + repr(exc_type_name) + " See locals.__exception__ for details."        traceback.print_exception(exc_type, exc_value, exc_traceback)        self._harness.interaction(message, frame, message)    def in_debugger_code(self, frame):        if _DEBUG_DEBUGGER: return False        message = self.__frame2message(frame)        return message.count('DebuggerHarness') > 0            def frame2message(self, frame):        return self.__frame2message(frame)            def __frame2message(self, frame):        code = frame.f_code        filename = code.co_filename        lineno = frame.f_lineno        basename = os.path.basename(filename)        message = "%s:%s" % (basename, lineno)        if code.co_name != "?":            message = "%s: %s()" % (message, code.co_name)        return message            def runFile(self, fileName):        self.reset()        #global_dict = {}        #global_dict['__name__'] = '__main__'        try:            fileToRun = open(fileName, mode='r')            if _VERBOSE: print "Running file ", fileName            sys.settrace(self.trace_dispatch)            import __main__            exec fileToRun in __main__.__dict__,__main__.__dict__        except SystemExit:            pass        except:            tp, val, tb = sys.exc_info()            traceback.print_exception(tp, val, tb)                   sys.settrace(None)        self.quitting = 1        #global_dict.clear()     def trace_dispatch(self, frame, event, arg):        if self.quitting:            return # None        # Check for ui events        self.readQueue()        if event == 'line':            return self.dispatch_line(frame)        if event == 'call':            return self.dispatch_call(frame, arg)        if event == 'return':            return self.dispatch_return(frame, arg)        if event == 'exception':            return self.dispatch_exception(frame, arg)        print 'Adb.dispatch: unknown debugging event:', `event`        return self.trace_dispatch         def readQueue(self):        while self._queue.qsize():            try:                item = self._queue.get_nowait()                if item.kill():                    self._harness.do_exit(kill=True)                elif item.breakHere():                    self._userBreak = True                elif item.hasBreakpoints():                    self.set_all_breakpoints(item.getBreakpoints())            except Queue.Empty:                pass                                      def set_all_breakpoints(self, dict):        self.clear_all_breaks()        for fileName in dict.keys():            lineList = dict[fileName]            for lineNumber in lineList:                                if _VERBOSE: print "Setting break at line ", str(lineNumber), " in file ", self.canonic(fileName)                self.set_break(fileName, int(lineNumber))        return ""                    def stop_here(self, frame):        if self._userBreak:            return True                # (CT) stopframe may now also be None, see dispatch_call.        # (CT) the former test for None is therefore removed from here.        if frame is self.stopframe:            return True        while frame is not None and frame is not self.stopframe:            if frame is self.botframe:                return True            frame = frame.f_back        return Falseclass BreakNotify(object):    def __init__(self, bps=None, break_here=False, kill=False):        self._bps = bps        self._break_here = break_here        self._kill = kill            def breakHere(self):        return self._break_here            def kill(self):        return self._kill            def getBreakpoints(self):        return self._bps        def hasBreakpoints(self):        return (self._bps != None)class AGXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):    def __init__(self, address, logRequests=0):        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, address, logRequests=logRequests)                       class BreakListenerThread(threading.Thread):    def __init__(self, host, port, queue):        threading.Thread.__init__(self)        self._host = host        self._port = int(port)        self._keepGoing = True        self._queue = queue        self._server = AGXMLRPCServer((self._host, self._port), logRequests=0)        self._server.register_function(self.update_breakpoints)        self._server.register_function(self.break_requested)        self._server.register_function(self.die)        def break_requested(self):        bn = BreakNotify(break_here=True)        self._queue.put(bn)        return ""            def update_breakpoints(self, pickled_Binary_bpts):        dict = pickle.loads(pickled_Binary_bpts.data)        bn = BreakNotify(bps=dict)        self._queue.put(bn)        return ""            def die(self):        bn = BreakNotify(kill=True)        self._queue.put(bn)        return ""                def run(self):        while self._keepGoing:            try:                self._server.handle_request()             except:                if _VERBOSE:                    tp, val, tb = sys.exc_info()                    print "Exception in BreakListenerThread.run():", str(tp), str(val)                self._keepGoing = False           def AskToStop(self):        self._keepGoing = False        if type(self._server) is not types.NoneType:            if _VERBOSE: print "Before calling server close on breakpoint server"            self._server.server_close()            if _VERBOSE: print "Calling server close on breakpoint server"            self._server = None                                   class DebuggerHarness(object):        def __init__(self):        # Host and port for debugger-side RPC server        self._hostname = sys.argv[1]        self._portNumber = int(sys.argv[2])        # Name the gui proxy object is registered under        self._breakPortNumber = int(sys.argv[3])        # Host and port where the gui proxy can be found.        self._guiHost = sys.argv[4]        self._guiPort = int(sys.argv[5])        # Command to debug.        self._command = sys.argv[6]        # Strip out the harness' arguments so that the process we run will see argv as if        # it was called directly.        sys.argv = sys.argv[6:]        self._currentFrame = None        self._wait = False        # Connect to the gui-side RPC server.        self._guiServerUrl = 'http://' + self._guiHost + ':' + str(self._guiPort) + '/'        if _VERBOSE: print "Connecting to gui server at ", self._guiServerUrl        self._guiServer = xmlrpclib.ServerProxy(self._guiServerUrl,allow_none=1)            # Start the break listener        self._breakQueue = Queue.Queue(50)        self._breakListener = BreakListenerThread(self._hostname, self._breakPortNumber, self._breakQueue)                self._breakListener.start()        # Create the debugger.        self._adb = Adb(self, self._breakQueue)                # Create the debugger-side RPC Server and register functions for remote calls.        self._server = AGXMLRPCServer((self._hostname, self._portNumber), logRequests=0)        self._server.register_function(self.set_step)        self._server.register_function(self.set_continue)        self._server.register_function(self.set_next)        self._server.register_function(self.set_return)        self._server.register_function(self.set_breakpoint)        self._server.register_function(self.clear_breakpoint)        self._server.register_function(self.set_all_breakpoints)        self._server.register_function(self.attempt_introspection)        self._server.register_function(self.execute_in_frame)        self._server.register_function(self.add_watch)        self._server.register_function(self.request_frame_document)                self.frame_stack = []        self.message_frame_dict = {}        self.introspection_list = []        atexit.register(self.do_exit)            def run(self):        self._adb.runFile(self._command)        self.do_exit(kill=True)            def do_exit(self, kill=False):        self._adb.set_quit()        self._breakListener.AskToStop()        self._server.server_close()        try:            self._guiServer.quit()        except:            pass        if kill:            try:                sys.exit()            except:                pass            def set_breakpoint(self, fileName, lineNo):        self._adb.set_break(fileName, lineNo)        return ""            def set_all_breakpoints(self, dict):        self._adb.clear_all_breaks()        for fileName in dict.keys():            lineList = dict[fileName]            for lineNumber in lineList:                self._adb.set_break(fileName, int(lineNumber))                if _VERBOSE: print "Setting break at ", str(lineNumber), " in file ", fileName        return ""                                            def clear_breakpoint(self, fileName, lineNo):        self._adb.clear_break(fileName, lineNo)        return ""    def add_watch(self,  name,  text, frame_message, run_once):         if len(frame_message) > 0:            frame = self.message_frame_dict[frame_message]             try:                item = eval(text, frame.f_globals, frame.f_locals)                return self.get_watch_document(item, name)            except:                 tp, val, tb = sys.exc_info()                return self.get_exception_document(tp, val, tb)         return ""            def execute_in_frame(self, frame_message, command):        frame = self.message_frame_dict[frame_message]        output = cStringIO.StringIO()        out = sys.stdout        err = sys.stderr        sys.stdout = output        sys.stderr = output        try:            code = compile(command, '<string>', 'single')            exec code in frame.f_globals, frame.f_locals            return output.getvalue()            sys.stdout = out            sys.stderr = err                except:            sys.stdout = out            sys.stderr = err        

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -