📄 debuggerharness.py
字号:
#----------------------------------------------------------------------------# Name: DebuggerHarness.py# Purpose: ## Author: Matt Fryer## Created: 7/28/04# CVS-ID: $Id: DebuggerHarness.py,v 1.6 2005/12/30 23:01:35 RD Exp $# Copyright: (c) 2005 ActiveGrid, Inc.# License: wxWindows License#----------------------------------------------------------------------------import bdbimport sysimport SimpleXMLRPCServerimport threadingimport xmlrpclibimport osimport typesimport Queueimport tracebackimport inspectfrom xml.dom.minidom import getDOMImplementationimport atexitimport pickleimport cStringIOimport bz2if sys.platform.startswith("win"): import win32api _WINDOWS = Trueelse: _WINDOWS = False _VERBOSE = False_DEBUG_DEBUGGER = Falseclass Adb(bdb.Bdb): def __init__(self, harness, queue): bdb.Bdb.__init__(self) self._harness = harness self._userBreak = False self._queue = queue self._knownCantExpandFiles = {} self._knownExpandedFiles = {} def getLongName(self, filename): if not _WINDOWS: return filename if self._knownCantExpandFiles.get(filename): return filename if self._knownExpandedFiles.get(filename): return self._knownExpandedFiles.get(filename) try: newname = win32api.GetLongPathName(filename) self._knownExpandedFiles[filename] = newname return newname except: self._knownCantExpandFiles[filename] = filename return filename def canonic(self, orig_filename): if orig_filename == "<" + orig_filename[1:-1] + ">": return orig_filename filename = self.getLongName(orig_filename) canonic = self.fncache.get(filename) if not canonic: canonic = os.path.abspath(filename) canonic = os.path.normcase(canonic) self.fncache[filename] = canonic return canonic # Overriding this so that we continue to trace even if no breakpoints are set. def set_continue(self): self.stopframe = self.botframe self.returnframe = None self.quitting = 0 def do_clear(self, arg): bdb.Breakpoint.bpbynumber[int(arg)].deleteMe() def user_line(self, frame): if self.in_debugger_code(frame): self.set_step() return message = self.__frame2message(frame) self._harness.interaction(message, frame, "") def user_call(self, frame, argument_list): if self.in_debugger_code(frame): self.set_step() return if self.stop_here(frame): message = self.__frame2message(frame) self._harness.interaction(message, frame, "") def user_return(self, frame, return_value): if self.in_debugger_code(frame): self.set_step() return message = self.__frame2message(frame) self._harness.interaction(message, frame, "") def user_exception(self, frame, (exc_type, exc_value, exc_traceback)): frame.f_locals['__exception__'] = exc_type, exc_value if type(exc_type) == type(''): exc_type_name = exc_type else: exc_type_name = exc_type.__name__ message = "Exception occured: " + repr(exc_type_name) + " See locals.__exception__ for details." traceback.print_exception(exc_type, exc_value, exc_traceback) self._harness.interaction(message, frame, message) def in_debugger_code(self, frame): if _DEBUG_DEBUGGER: return False message = self.__frame2message(frame) return message.count('DebuggerHarness') > 0 def frame2message(self, frame): return self.__frame2message(frame) def __frame2message(self, frame): code = frame.f_code filename = code.co_filename lineno = frame.f_lineno basename = os.path.basename(filename) message = "%s:%s" % (basename, lineno) if code.co_name != "?": message = "%s: %s()" % (message, code.co_name) return message def runFile(self, fileName): self.reset() #global_dict = {} #global_dict['__name__'] = '__main__' try: fileToRun = open(fileName, mode='r') if _VERBOSE: print "Running file ", fileName sys.settrace(self.trace_dispatch) import __main__ exec fileToRun in __main__.__dict__,__main__.__dict__ except SystemExit: pass except: tp, val, tb = sys.exc_info() traceback.print_exception(tp, val, tb) sys.settrace(None) self.quitting = 1 #global_dict.clear() def trace_dispatch(self, frame, event, arg): if self.quitting: return # None # Check for ui events self.readQueue() if event == 'line': return self.dispatch_line(frame) if event == 'call': return self.dispatch_call(frame, arg) if event == 'return': return self.dispatch_return(frame, arg) if event == 'exception': return self.dispatch_exception(frame, arg) print 'Adb.dispatch: unknown debugging event:', `event` return self.trace_dispatch def readQueue(self): while self._queue.qsize(): try: item = self._queue.get_nowait() if item.kill(): self._harness.do_exit(kill=True) elif item.breakHere(): self._userBreak = True elif item.hasBreakpoints(): self.set_all_breakpoints(item.getBreakpoints()) except Queue.Empty: pass def set_all_breakpoints(self, dict): self.clear_all_breaks() for fileName in dict.keys(): lineList = dict[fileName] for lineNumber in lineList: if _VERBOSE: print "Setting break at line ", str(lineNumber), " in file ", self.canonic(fileName) self.set_break(fileName, int(lineNumber)) return "" def stop_here(self, frame): if self._userBreak: return True # (CT) stopframe may now also be None, see dispatch_call. # (CT) the former test for None is therefore removed from here. if frame is self.stopframe: return True while frame is not None and frame is not self.stopframe: if frame is self.botframe: return True frame = frame.f_back return Falseclass BreakNotify(object): def __init__(self, bps=None, break_here=False, kill=False): self._bps = bps self._break_here = break_here self._kill = kill def breakHere(self): return self._break_here def kill(self): return self._kill def getBreakpoints(self): return self._bps def hasBreakpoints(self): return (self._bps != None)class AGXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer): def __init__(self, address, logRequests=0): SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, address, logRequests=logRequests) class BreakListenerThread(threading.Thread): def __init__(self, host, port, queue): threading.Thread.__init__(self) self._host = host self._port = int(port) self._keepGoing = True self._queue = queue self._server = AGXMLRPCServer((self._host, self._port), logRequests=0) self._server.register_function(self.update_breakpoints) self._server.register_function(self.break_requested) self._server.register_function(self.die) def break_requested(self): bn = BreakNotify(break_here=True) self._queue.put(bn) return "" def update_breakpoints(self, pickled_Binary_bpts): dict = pickle.loads(pickled_Binary_bpts.data) bn = BreakNotify(bps=dict) self._queue.put(bn) return "" def die(self): bn = BreakNotify(kill=True) self._queue.put(bn) return "" def run(self): while self._keepGoing: try: self._server.handle_request() except: if _VERBOSE: tp, val, tb = sys.exc_info() print "Exception in BreakListenerThread.run():", str(tp), str(val) self._keepGoing = False def AskToStop(self): self._keepGoing = False if type(self._server) is not types.NoneType: if _VERBOSE: print "Before calling server close on breakpoint server" self._server.server_close() if _VERBOSE: print "Calling server close on breakpoint server" self._server = None class DebuggerHarness(object): def __init__(self): # Host and port for debugger-side RPC server self._hostname = sys.argv[1] self._portNumber = int(sys.argv[2]) # Name the gui proxy object is registered under self._breakPortNumber = int(sys.argv[3]) # Host and port where the gui proxy can be found. self._guiHost = sys.argv[4] self._guiPort = int(sys.argv[5]) # Command to debug. self._command = sys.argv[6] # Strip out the harness' arguments so that the process we run will see argv as if # it was called directly. sys.argv = sys.argv[6:] self._currentFrame = None self._wait = False # Connect to the gui-side RPC server. self._guiServerUrl = 'http://' + self._guiHost + ':' + str(self._guiPort) + '/' if _VERBOSE: print "Connecting to gui server at ", self._guiServerUrl self._guiServer = xmlrpclib.ServerProxy(self._guiServerUrl,allow_none=1) # Start the break listener self._breakQueue = Queue.Queue(50) self._breakListener = BreakListenerThread(self._hostname, self._breakPortNumber, self._breakQueue) self._breakListener.start() # Create the debugger. self._adb = Adb(self, self._breakQueue) # Create the debugger-side RPC Server and register functions for remote calls. self._server = AGXMLRPCServer((self._hostname, self._portNumber), logRequests=0) self._server.register_function(self.set_step) self._server.register_function(self.set_continue) self._server.register_function(self.set_next) self._server.register_function(self.set_return) self._server.register_function(self.set_breakpoint) self._server.register_function(self.clear_breakpoint) self._server.register_function(self.set_all_breakpoints) self._server.register_function(self.attempt_introspection) self._server.register_function(self.execute_in_frame) self._server.register_function(self.add_watch) self._server.register_function(self.request_frame_document) self.frame_stack = [] self.message_frame_dict = {} self.introspection_list = [] atexit.register(self.do_exit) def run(self): self._adb.runFile(self._command) self.do_exit(kill=True) def do_exit(self, kill=False): self._adb.set_quit() self._breakListener.AskToStop() self._server.server_close() try: self._guiServer.quit() except: pass if kill: try: sys.exit() except: pass def set_breakpoint(self, fileName, lineNo): self._adb.set_break(fileName, lineNo) return "" def set_all_breakpoints(self, dict): self._adb.clear_all_breaks() for fileName in dict.keys(): lineList = dict[fileName] for lineNumber in lineList: self._adb.set_break(fileName, int(lineNumber)) if _VERBOSE: print "Setting break at ", str(lineNumber), " in file ", fileName return "" def clear_breakpoint(self, fileName, lineNo): self._adb.clear_break(fileName, lineNo) return "" def add_watch(self, name, text, frame_message, run_once): if len(frame_message) > 0: frame = self.message_frame_dict[frame_message] try: item = eval(text, frame.f_globals, frame.f_locals) return self.get_watch_document(item, name) except: tp, val, tb = sys.exc_info() return self.get_exception_document(tp, val, tb) return "" def execute_in_frame(self, frame_message, command): frame = self.message_frame_dict[frame_message] output = cStringIO.StringIO() out = sys.stdout err = sys.stderr sys.stdout = output sys.stderr = output try: code = compile(command, '<string>', 'single') exec code in frame.f_globals, frame.f_locals return output.getvalue() sys.stdout = out sys.stderr = err except: sys.stdout = out sys.stderr = err
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -