📄 test_debugger.py
字号:
'''
The idea is that we record the commands sent to the debugger and reproduce them from this script
(so, this works as the client, which spaws the debugger as a separate process and communicates
to it as if it was run from the outside)
'''
import unittest
port = 13333
import os
def NormFile(filename):
try:
rPath = os.path.realpath #@UndefinedVariable
except:
# jython does not support os.path.realpath
# realpath is a no-op on systems without islink support
rPath = os.path.abspath
return os.path.normcase(rPath(filename))
PYDEVD_FILE = NormFile('../pydevd.py')
SHOW_WRITES_AND_READS = False
SHOW_RESULT_STR = False
import subprocess
import sys
import socket
import threading
import time
#=======================================================================================================================
# ReaderThread
#=======================================================================================================================
class ReaderThread(threading.Thread):
def __init__(self, sock):
threading.Thread.__init__(self)
self.setDaemon(True)
self.sock = sock
self.lastReceived = None
def run(self):
try:
buf = ''
while True:
l = self.sock.recv(1024)
buf += l
if '\n' in buf:
self.lastReceived = buf
buf = ''
if SHOW_WRITES_AND_READS:
print 'Test Reader Thread Received %s' % self.lastReceived.strip()
except:
pass #ok, finished it
def DoKill(self):
self.sock.close()
#=======================================================================================================================
# AbstractWriterThread
#=======================================================================================================================
class AbstractWriterThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
self.finishedOk = False
def DoKill(self):
self.readerThread.DoKill()
self.sock.close()
def Write(self, s):
last = self.readerThread.lastReceived
if SHOW_WRITES_AND_READS:
print 'Test Writer Thread Written %s' % (s,)
self.sock.send(s+'\n')
time.sleep(0.2)
i = 0
while last == self.readerThread.lastReceived and i < 10:
i += 1
time.sleep(0.1)
def StartSocket(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', port))
s.listen(1)
newSock, addr = s.accept()
if SHOW_WRITES_AND_READS:
print 'Test Writer Thread Socket:', newSock, addr
readerThread = self.readerThread = ReaderThread(newSock)
readerThread.start()
self.sock = newSock
self._sequence = -1
#initial command is always the version
self.WriteVersion()
def NextSeq(self):
self._sequence += 2
return self._sequence
def WaitForNewThread(self):
i = 0
#wait for hit breakpoint
while not '<xml><thread name="' in self.readerThread.lastReceived or '<xml><thread name="pydevd.' in self.readerThread.lastReceived:
i += 1
time.sleep(1)
if i >= 15:
raise AssertionError('After %s seconds, a thread was not created.' % i)
#we have something like <xml><thread name="MainThread" id="12103472" /></xml>
splitted = self.readerThread.lastReceived.split('"')
threadId = splitted[3]
return threadId
def WaitForBreakpointHit(self):
i = 0
#wait for hit breakpoint
while not 'stop_reason="111"' in self.readerThread.lastReceived:
i += 1
time.sleep(1)
if i >= 10:
raise AssertionError('After %s seconds, a break was not hit.' % i)
#we have something like <xml><thread id="12152656" stop_reason="111"><frame id="12453120" ...
splitted = self.readerThread.lastReceived.split('"')
threadId = splitted[1]
frameId = splitted[5]
return threadId, frameId
def WriteMakeInitialRun(self):
self.Write("101\t%s\t" % self.NextSeq())
def WriteVersion(self):
self.Write("501\t%s\t1.0" % self.NextSeq())
def WriteAddBreakpoint(self, line, func):
if func is not None:
self.Write("111\t%s\t%s\t%s\t**FUNC**%s\tNone" % (self.NextSeq(), self.TEST_FILE, line, func))
else:
self.Write("111\t%s\t%s\t%s\tNone" % (self.NextSeq(), self.TEST_FILE, line))
def WriteRemoveBreakpoint(self, line):
self.Write("112\t%s\t%s\t%s" % (self.NextSeq(), self.TEST_FILE, line))
def WriteGetFrame(self, threadId, frameId):
self.Write("114\t%s\t%s\t%s\tFRAME" % (self.NextSeq(), threadId, frameId))
def WriteStepOver(self, threadId):
self.Write("108\t%s\t%s" % (self.NextSeq(), threadId,))
def WriteSuspendThread(self, threadId):
self.Write("105\t%s\t%s" % (self.NextSeq(), threadId,))
def WriteRunThread(self, threadId):
self.Write("106\t%s\t%s" % (self.NextSeq(), threadId,))
def WriteKillThread(self, threadId):
self.Write("104\t%s\t%s" % (self.NextSeq(), threadId,))
#=======================================================================================================================
# WriterThreadCase4
#=======================================================================================================================
class WriterThreadCase4(AbstractWriterThread):
TEST_FILE = NormFile('_debugger_case4.py')
def run(self):
self.StartSocket()
self.WriteMakeInitialRun()
threadId = self.WaitForNewThread()
self.WriteSuspendThread(threadId)
time.sleep(4) #wait for time enough for the test to finish if it wasn't suspended
self.WriteRunThread(threadId)
self.finishedOk = True
#=======================================================================================================================
# WriterThreadCase3
#=======================================================================================================================
class WriterThreadCase3(AbstractWriterThread):
TEST_FILE = NormFile('_debugger_case3.py')
def run(self):
self.StartSocket()
self.WriteMakeInitialRun()
time.sleep(1)
self.WriteAddBreakpoint(4, None)
threadId, frameId = self.WaitForBreakpointHit()
self.WriteGetFrame(threadId, frameId)
self.WriteRunThread(threadId)
threadId, frameId = self.WaitForBreakpointHit()
self.WriteGetFrame(threadId, frameId)
self.WriteRemoveBreakpoint(4)
self.WriteRunThread(threadId)
assert 15 == self._sequence, 'Expected 15. Had: %s' % self._sequence
self.finishedOk = True
#=======================================================================================================================
# WriterThreadCase2
#=======================================================================================================================
class WriterThreadCase2(AbstractWriterThread):
TEST_FILE = NormFile('_debugger_case2.py')
def run(self):
self.StartSocket()
self.WriteAddBreakpoint(3, 'Call4') #seq = 3
self.WriteMakeInitialRun()
threadId, frameId = self.WaitForBreakpointHit()
self.WriteGetFrame(threadId, frameId)
self.WriteAddBreakpoint(14, 'Call2')
self.WriteRunThread(threadId)
threadId, frameId = self.WaitForBreakpointHit()
self.WriteGetFrame(threadId, frameId)
self.WriteRunThread(threadId)
assert 15 == self._sequence, 'Expected 15. Had: %s' % self._sequence
self.finishedOk = True
#=======================================================================================================================
# WriterThreadCase1
#=======================================================================================================================
class WriterThreadCase1(AbstractWriterThread):
TEST_FILE = NormFile('_debugger_case1.py')
def run(self):
self.StartSocket()
self.WriteAddBreakpoint(6, 'SetUp')
self.WriteMakeInitialRun()
threadId, frameId = self.WaitForBreakpointHit()
self.WriteGetFrame(threadId, frameId)
self.WriteStepOver(threadId)
self.WriteGetFrame(threadId, frameId)
self.WriteRunThread(threadId)
assert 13 == self._sequence, 'Expected 13. Had: %s' % self._sequence
self.finishedOk = True
#=======================================================================================================================
# Test
#=======================================================================================================================
class Test(unittest.TestCase):
def CheckCase(self, writerThreadClass):
writerThread = writerThreadClass()
writerThread.start()
args = [
'python',
PYDEVD_FILE,
'--RECORD_SOCKET_READS',
'--client',
'localhost',
'--port',
str(port),
'--file',
writerThread.TEST_FILE,
]
process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
class ProcessReadThread(threading.Thread):
def run(self):
self.resultStr = None
self.resultStr = process.stdout.read()
process.stdout.close()
def DoKill(self):
process.stdout.close()
processReadThread = ProcessReadThread()
processReadThread.setDaemon(True)
processReadThread.start()
while writerThread.isAlive():
time.sleep(.2)
for i in range(10):
if processReadThread.resultStr is None:
time.sleep(.5)
else:
break
else:
writerThread.DoKill()
if SHOW_RESULT_STR:
print processReadThread.resultStr
if processReadThread.resultStr is None:
self.fail("The other process may still be running -- and didn't give any output")
if 'TEST SUCEEDED' not in processReadThread.resultStr:
self.fail(processReadThread.resultStr)
if not writerThread.finishedOk:
self.fail("The thread that was doing the tests didn't finish succesfully. Output: %s" % processReadThread.resultStr)
def testCase1(self):
self.CheckCase(WriterThreadCase1)
def testCase2(self):
self.CheckCase(WriterThreadCase2)
def testCase3(self):
self.CheckCase(WriterThreadCase3)
def testCase4(self):
self.CheckCase(WriterThreadCase4)
#=======================================================================================================================
# Main
#=======================================================================================================================
if __name__ == '__main__':
suite = unittest.makeSuite(Test)
# suite = unittest.TestSuite()
# suite.addTest(Test('testCase1'))
unittest.TextTestRunner().run(suite)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -