📄 serial.py
字号:
close() -- Close the connection.
write(str, cnt=None) -- Write cnt (len(str), if cnt=None)
bytes of str to port.
writeVerifyEcho(str, cnt=None) -- Same as write, but verify
each chr was echoed.
read(cnt=None, timed=FALSE) -- Try to return cnt bytes read from port
(cnt==None, read whatever is there, cnt!=None & timed==TRUE, then read
must complete in configured timeOutMs time).
readTerminated() -- Read from port until terminator string read.
Must complete within timeOutMs, return string minus the terminator.
cmd(str='') -- Write or writeVerifyEcho str, then read till rspType
satisfied, or times out.
"""
def __init__(self):
    """Build an instance that is not yet attached to any port."""
    # Connection state: closed port marker, empty configuration, tracing off.
    self.port = PORT_CLOSED
    self.cfg = {}
    self.debug = FALSE
    # Receive-side bookkeeping; everything starts out empty.
    self._bufSize = 0
    self._rsp = ''
    self._flushed = ''
def _trace(self, tag, msg=''):
    """Print 'Port.<tag>: <msg>' when self.debug is true; otherwise do nothing."""
    if not self.debug:
        return
    # Single parenthesised expression: prints the same text as the bare
    # print statement form.
    print('Port.%s: %s' % (tag, msg))
def _chkSioExec(self, func, args):
"""Execute an sio function, raise SioError on negative return value."""
status = apply(func, args)
self._trace('_chkSioExec', '%s = %s%s' \
% (`status`, func.__name__, `args`))
if status < 0:
raise SioError, func.__name__ + ': ' + SioErrorText(status)
return status
def open(self, cfg):
"""Attach to the port specified in the cfg arg, then:
-Open the port, set baud, parity, stopBits, and dataBits as per cfg.
-Clear xmit/rec buffers, enable DTR, RTS, disable flow ctl.
"""
self.debug = cfg['debug']
self._trace('open')
if self.port >= 0:
raise SioError, 'Port is already open'
port = cfg['port']
self.cfg = cfg
self._bufSize = cfg['rxBufSize']
self._chkSioExec(SioReset, (port, self._bufSize, cfg['txBufSize']))
self._chkSioExec(SioBaud, (port, cfg['baud']))
self._chkSioExec(SioParms, (port, cfg['parity'], cfg['stopBits'],
cfg['dataBits']))
self._chkSioExec(SioTxClear, (port,))
self._chkSioExec(SioRxClear, (port,))
self._chkSioExec(SioDTR, (port, cfg['dtrSignal'][:1]))
self._chkSioExec(SioRTS, (port, cfg['rtsSignal'][:1]))
self._chkSioExec(SioFlow, (port, 'N'))
self.port = port
self._rsp = ''
self._flushed = ''
def flush(self):
    """Stash any pending input in self._flushed, then clear both buffers."""
    self._trace('flush')
    # Reset first so a failing SioGets leaves an empty string behind.
    self._flushed = ''
    portId = self.port
    self._flushed = SioGets(portId, self._bufSize)
    # Transmit buffer first, then receive buffer.
    for clearFunc in (SioTxClear, SioRxClear):
        self._chkSioExec(clearFunc, (portId,))
    self._trace('flushed', '|%s|' % self._flushed)
def getLastFlush(self):
    """Return a copy of what is currently held in the flushed buffer."""
    flushed = self._flushed
    return flushed[:]
def getLastRsp(self):
    """Return a copy of what is currently held in the response buffer."""
    response = self._rsp
    return response[:]
def close(self):
    """Shut the port down via SioDone and mark this instance as closed.

    Raises SioError if SioDone reports a negative status; in that case
    self.port is left unchanged.
    """
    self._trace('close')
    portId = self.port
    self._chkSioExec(SioDone, (portId,))
    self.port = PORT_CLOSED
def write(self, str, cnt=None):
"""Write cnt bytes of str out port, cnt defaults to len(str)."""
self._trace('write')
if cnt is None:
cnt = len(str)
elif cnt > len(str):
raise SioError, 'write: request to send more bytes than in the str'
self.flush()
self._chkSioExec(SioPuts, (self.port, str, cnt))
def writeVerifyEcho(self, str, cnt=None):
"""Same as write , but verify each char was echoed by the hdw. """
self._trace('writeVerifyEcho')
if cnt is None:
cnt = len(str)
elif cnt > len(str):
raise SioError, \
'writeVerifyEcho: request to send more bytes than in the str'
i = 0
self.flush()
elapsed = MsTimer()
timeOut = self.cfg['timeOutMs']
msecs = 0
while i < cnt:
sent = str[i]
self._chkSioExec(SioPutc, (self.port, sent))
elapsed.reset()
while 1:
got = SioGetc(self.port)
gotChr = '%c' % got
msecs = elapsed()
if got == WSC_NO_DATA:
if msecs >= timeOut:
raise SioError, \
'Timed out waiting for chr %d of cmd %s to echo'\
% (i, `str`)
elif got < 0:
raise SioError, SioGetc.__name__ + ': ' + SioErrorText(got)
elif gotChr <> sent:
raise SioError, \
'writeVerifyEcho: Bad echo of chr %i in cmd %s:'\
'sent %s, got %s' % (i, `str`, `sent`, `gotChr`)
else:
self._trace('writeVerifyEcho', 'waited %d mSecs' % msecs)
self._trace('writeVerifyEcho', '%s = SioGetc(%d)' \
% (`gotChr`, self.port))
break
i = i + 1
def read(self, cnt=None, timed=FALSE):
"""Attempt to read cnt bytes, if timed TRUE, must complete in cfg time.
"""
self._trace('read')
if cnt is None:
cnt = self._bufSize
if cnt < 0: cnt = 0
if not timed:
self._rsp = SioGets(self.port, cnt)
else:
timeOut = self.cfg['timeOutMs']
buf = ''
got = 0
elapsed = MsTimer()
while 1:
thisCnt = cnt - got
thisBuf = SioGets(self.port, thisCnt)
thisGot = len(thisBuf)
buf = buf + thisBuf
got = got + thisGot
if got < cnt:
msecs = elapsed()
if msecs >= timeOut:
self._rsp = buf
raise SioError, \
'Timed out waiting for input chr %d of %d, '\
'read %s' % (got, cnt, `buf`)
else:
self._rsp = buf
break
return self._rsp[:]
def readTerminated(self):
"""Read from port until terminator read, timed out, or buf overflow.
Terminator is stripped off of result."""
self._trace('readTerminated')
timeOut = self.cfg['timeOutMs']
patLen = len(self.cfg['rspTerm'])
pat = self.cfg['rspTermPat']
if patLen <= 0 or pat is None:
raise SioError, 'Config rspTerm is empty, can not readTerminated'
i = 0
buf = ''
elapsed = MsTimer()
msecs = 0
while 1:
got = SioGetc(self.port)
gotChr = '%c' % got
msecs = elapsed()
if got == WSC_NO_DATA:
if msecs >= timeOut:
self._rsp = buf
raise SioError, \
'Timed out waiting for chr %d of input, read %s' \
% (i, `buf`)
elif got < 0:
self._rsp = buf
raise SioError, SioGetc.__name__ + ': ' + SioErrorText(got)
elif i == self._bufSize:
self._rsp = buf
raise SioError, \
'Buffer overrun getting terminated input, read %s' \
% `buf`
else:
self._trace('readTerminated', 'waited %d mSecs' % msecs)
self._trace('readTerminated', '%s = SioGetc(%d)' \
% (`gotChr`, self.port))
buf = buf + gotChr
i = i + 1
if pat.search(buf) >= 0:
buf = buf[:-patLen]
break
self._rsp = buf
return self._rsp[:]
def cmd(self, str=''):
"""Send a str, get a rsp according to cfg."""
self._trace('cmd')
rspType = self.cfg['rspType']
if rspType == RSP_NONE:
pass
elif rspType == RSP_TERMINATED:
if len(self.cfg['rspTerm']) <= 0:
raise SioError, \
'Config param rspTerm is empty, can not readTerminated'
elif rspType == RSP_FIXEDLEN:
fixLen = self.cfg['rspFixedLen']
if fixLen <= 0:
raise SioError, \
'Config param rspFixedLen is <= 0, can not read '\
'fixed length reply'
elif rspType == RSP_BEST_EFFORT:
pass
cmdStr = str + self.cfg['cmdTerm']
rsp = ''
if self.cfg['cmdsEchoed']:
self.writeVerifyEcho(cmdStr)
else:
self.write(cmdStr)
if rspType == RSP_NONE:
pass
elif rspType == RSP_TERMINATED:
rsp = self.readTerminated()
elif rspType == RSP_FIXEDLEN:
rsp = self.read(cnt=fixLen)
elif rspType == RSP_BEST_EFFORT:
rsp = self.read()
self._rsp = rsp
return self._rsp[:]
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -