📄 debuggerharness.py
字号:
#----------------------------------------------------------------------------
# Name: DebuggerHarness.py
# Purpose:
#
# Author: Matt Fryer
#
# Created: 7/28/04
# CVS-ID: $Id: DebuggerHarness.py,v 1.6 2005/12/30 23:01:35 RD Exp $
# Copyright: (c) 2005 ActiveGrid, Inc.
# License: wxWindows License
#----------------------------------------------------------------------------
import bdb
import sys
import SimpleXMLRPCServer
import threading
import xmlrpclib
import os
import types
import Queue
import traceback
import inspect
from xml.dom.minidom import getDOMImplementation
import atexit
import pickle
import cStringIO
import bz2
# True when running on a Windows platform; Adb.getLongName only calls
# win32api.GetLongPathName when this flag is set.
_WINDOWS = sys.platform.startswith("win")
if _WINDOWS:
    import win32api

# When True, the harness prints diagnostic messages to stdout.
_VERBOSE = False

# When True, Adb.in_debugger_code() always reports False, so frames that
# belong to the harness itself are no longer skipped.
_DEBUG_DEBUGGER = False
class Adb(bdb.Bdb):
def __init__(self, harness, queue):
bdb.Bdb.__init__(self)
self._harness = harness
self._userBreak = False
self._queue = queue
self._knownCantExpandFiles = {}
self._knownExpandedFiles = {}
def getLongName(self, filename):
if not _WINDOWS:
return filename
if self._knownCantExpandFiles.get(filename):
return filename
if self._knownExpandedFiles.get(filename):
return self._knownExpandedFiles.get(filename)
try:
newname = win32api.GetLongPathName(filename)
self._knownExpandedFiles[filename] = newname
return newname
except:
self._knownCantExpandFiles[filename] = filename
return filename
def canonic(self, orig_filename):
if orig_filename == "<" + orig_filename[1:-1] + ">":
return orig_filename
filename = self.getLongName(orig_filename)
canonic = self.fncache.get(filename)
if not canonic:
canonic = os.path.abspath(filename)
canonic = os.path.normcase(canonic)
self.fncache[filename] = canonic
return canonic
# Overriding this so that we continue to trace even if no breakpoints are set.
def set_continue(self):
self.stopframe = self.botframe
self.returnframe = None
self.quitting = 0
def do_clear(self, arg):
bdb.Breakpoint.bpbynumber[int(arg)].deleteMe()
def user_line(self, frame):
if self.in_debugger_code(frame):
self.set_step()
return
message = self.__frame2message(frame)
self._harness.interaction(message, frame, "")
def user_call(self, frame, argument_list):
if self.in_debugger_code(frame):
self.set_step()
return
if self.stop_here(frame):
message = self.__frame2message(frame)
self._harness.interaction(message, frame, "")
def user_return(self, frame, return_value):
if self.in_debugger_code(frame):
self.set_step()
return
message = self.__frame2message(frame)
self._harness.interaction(message, frame, "")
def user_exception(self, frame, (exc_type, exc_value, exc_traceback)):
frame.f_locals['__exception__'] = exc_type, exc_value
if type(exc_type) == type(''):
exc_type_name = exc_type
else:
exc_type_name = exc_type.__name__
message = "Exception occured: " + repr(exc_type_name) + " See locals.__exception__ for details."
traceback.print_exception(exc_type, exc_value, exc_traceback)
self._harness.interaction(message, frame, message)
def in_debugger_code(self, frame):
if _DEBUG_DEBUGGER: return False
message = self.__frame2message(frame)
return message.count('DebuggerHarness') > 0
def frame2message(self, frame):
return self.__frame2message(frame)
def __frame2message(self, frame):
code = frame.f_code
filename = code.co_filename
lineno = frame.f_lineno
basename = os.path.basename(filename)
message = "%s:%s" % (basename, lineno)
if code.co_name != "?":
message = "%s: %s()" % (message, code.co_name)
return message
def runFile(self, fileName):
self.reset()
#global_dict = {}
#global_dict['__name__'] = '__main__'
try:
fileToRun = open(fileName, mode='r')
if _VERBOSE: print "Running file ", fileName
sys.settrace(self.trace_dispatch)
import __main__
exec fileToRun in __main__.__dict__,__main__.__dict__
except SystemExit:
pass
except:
tp, val, tb = sys.exc_info()
traceback.print_exception(tp, val, tb)
sys.settrace(None)
self.quitting = 1
#global_dict.clear()
def trace_dispatch(self, frame, event, arg):
if self.quitting:
return # None
# Check for ui events
self.readQueue()
if event == 'line':
return self.dispatch_line(frame)
if event == 'call':
return self.dispatch_call(frame, arg)
if event == 'return':
return self.dispatch_return(frame, arg)
if event == 'exception':
return self.dispatch_exception(frame, arg)
print 'Adb.dispatch: unknown debugging event:', `event`
return self.trace_dispatch
def readQueue(self):
while self._queue.qsize():
try:
item = self._queue.get_nowait()
if item.kill():
self._harness.do_exit(kill=True)
elif item.breakHere():
self._userBreak = True
elif item.hasBreakpoints():
self.set_all_breakpoints(item.getBreakpoints())
except Queue.Empty:
pass
def set_all_breakpoints(self, dict):
self.clear_all_breaks()
for fileName in dict.keys():
lineList = dict[fileName]
for lineNumber in lineList:
if _VERBOSE: print "Setting break at line ", str(lineNumber), " in file ", self.canonic(fileName)
self.set_break(fileName, int(lineNumber))
return ""
def stop_here(self, frame):
if self._userBreak:
return True
# (CT) stopframe may now also be None, see dispatch_call.
# (CT) the former test for None is therefore removed from here.
if frame is self.stopframe:
return True
while frame is not None and frame is not self.stopframe:
if frame is self.botframe:
return True
frame = frame.f_back
return False
class BreakNotify(object):
    """Request object placed on the break queue for the debugger to read.

    A notification carries at most one of: a replacement set of breakpoints
    (bps), a request to break at the next opportunity (break_here), or a
    request to shut down (kill).
    """

    def __init__(self, bps=None, break_here=False, kill=False):
        """Store the payload.

        bps        -- breakpoint data, or None when no update is carried.
        break_here -- True to request a break.
        kill       -- True to request termination.
        """
        self._bps = bps
        self._break_here = break_here
        self._kill = kill

    def breakHere(self):
        """Return the break_here flag given at construction."""
        return self._break_here

    def kill(self):
        """Return the kill flag given at construction."""
        return self._kill

    def getBreakpoints(self):
        """Return the breakpoint data given at construction (may be None)."""
        return self._bps

    def hasBreakpoints(self):
        """Return True when breakpoint data was supplied.

        An empty container still counts as supplied. The check is an identity
        test so that it never depends on the payload's own comparison methods.
        """
        return self._bps is not None
class AGXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """SimpleXMLRPCServer whose request logging is off by default."""

    def __init__(self, address, logRequests=0):
        """Create the server on address, forwarding logRequests to the base."""
        # The base initialiser is called explicitly on the class.
        base = SimpleXMLRPCServer.SimpleXMLRPCServer
        base.__init__(self, address, logRequests=logRequests)
class BreakListenerThread(threading.Thread):
def __init__(self, host, port, queue):
threading.Thread.__init__(self)
self._host = host
self._port = int(port)
self._keepGoing = True
self._queue = queue
self._server = AGXMLRPCServer((self._host, self._port), logRequests=0)
self._server.register_function(self.update_breakpoints)
self._server.register_function(self.break_requested)
self._server.register_function(self.die)
def break_requested(self):
bn = BreakNotify(break_here=True)
self._queue.put(bn)
return ""
def update_breakpoints(self, pickled_Binary_bpts):
dict = pickle.loads(pickled_Binary_bpts.data)
bn = BreakNotify(bps=dict)
self._queue.put(bn)
return ""
def die(self):
bn = BreakNotify(kill=True)
self._queue.put(bn)
return ""
def run(self):
while self._keepGoing:
try:
self._server.handle_request()
except:
if _VERBOSE:
tp, val, tb = sys.exc_info()
print "Exception in BreakListenerThread.run():", str(tp), str(val)
self._keepGoing = False
def AskToStop(self):
self._keepGoing = False
if type(self._server) is not types.NoneType:
if _VERBOSE: print "Before calling server close on breakpoint server"
self._server.server_close()
if _VERBOSE: print "Calling server close on breakpoint server"
self._server = None
class DebuggerHarness(object):
def __init__(self):
# Host and port for debugger-side RPC server
self._hostname = sys.argv[1]
self._portNumber = int(sys.argv[2])
# Name the gui proxy object is registered under
self._breakPortNumber = int(sys.argv[3])
# Host and port where the gui proxy can be found.
self._guiHost = sys.argv[4]
self._guiPort = int(sys.argv[5])
# Command to debug.
self._command = sys.argv[6]
# Strip out the harness' arguments so that the process we run will see argv as if
# it was called directly.
sys.argv = sys.argv[6:]
self._currentFrame = None
self._wait = False
# Connect to the gui-side RPC server.
self._guiServerUrl = 'http://' + self._guiHost + ':' + str(self._guiPort) + '/'
if _VERBOSE: print "Connecting to gui server at ", self._guiServerUrl
self._guiServer = xmlrpclib.ServerProxy(self._guiServerUrl,allow_none=1)
# Start the break listener
self._breakQueue = Queue.Queue(50)
self._breakListener = BreakListenerThread(self._hostname, self._breakPortNumber, self._breakQueue)
self._breakListener.start()
# Create the debugger.
self._adb = Adb(self, self._breakQueue)
# Create the debugger-side RPC Server and register functions for remote calls.
self._server = AGXMLRPCServer((self._hostname, self._portNumber), logRequests=0)
self._server.register_function(self.set_step)
self._server.register_function(self.set_continue)
self._server.register_function(self.set_next)
self._server.register_function(self.set_return)
self._server.register_function(self.set_breakpoint)
self._server.register_function(self.clear_breakpoint)
self._server.register_function(self.set_all_breakpoints)
self._server.register_function(self.attempt_introspection)
self._server.register_function(self.execute_in_frame)
self._server.register_function(self.add_watch)
self._server.register_function(self.request_frame_document)
self.frame_stack = []
self.message_frame_dict = {}
self.introspection_list = []
atexit.register(self.do_exit)
def run(self):
self._adb.runFile(self._command)
self.do_exit(kill=True)
def do_exit(self, kill=False):
self._adb.set_quit()
self._breakListener.AskToStop()
self._server.server_close()
try:
self._guiServer.quit()
except:
pass
if kill:
try:
sys.exit()
except:
pass
def set_breakpoint(self, fileName, lineNo):
self._adb.set_break(fileName, lineNo)
return ""
def set_all_breakpoints(self, dict):
self._adb.clear_all_breaks()
for fileName in dict.keys():
lineList = dict[fileName]
for lineNumber in lineList:
self._adb.set_break(fileName, int(lineNumber))
if _VERBOSE: print "Setting break at ", str(lineNumber), " in file ", fileName
return ""
def clear_breakpoint(self, fileName, lineNo):
self._adb.clear_break(fileName, lineNo)
return ""
def add_watch(self, name, text, frame_message, run_once):
if len(frame_message) > 0:
frame = self.message_frame_dict[frame_message]
try:
item = eval(text, frame.f_globals, frame.f_locals)
return self.get_watch_document(item, name)
except:
tp, val, tb = sys.exc_info()
return self.get_exception_document(tp, val, tb)
return ""
def execute_in_frame(self, frame_message, command):
frame = self.message_frame_dict[frame_message]
output = cStringIO.StringIO()
out = sys.stdout
err = sys.stderr
sys.stdout = output
sys.stderr = output
try:
code = compile(command, '<string>', 'single')
exec code in frame.f_globals, frame.f_locals
return output.getvalue()
sys.stdout = out
sys.stderr = err
except:
sys.stdout = out
sys.stderr = err
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -