⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 serial.py

📁 Python.Tkinter编程实例代码多多学习
💻 PY
📖 第 1 页 / 共 2 页
字号:
       close() -- Close the connection.

       write(str, cnt=None) -- Write cnt (len(str), if cnt=None) 
       bytes of str to port.

       writeVerifyEcho(str, cnt=None) -- sSame as write, but verify 
       each chr was echoed.

       read(cnt=None, timed=FALSE) -- Try to return cnt bytes read from port
       (cnt==None, read whatever is there, cnt!=None & timed==TRUE, then read 
       must complete in configured timeOutMs time).

       readTerminated() -- Read from port until terminator string read.
       Must complete within timeOutMs, return string minus the terminator.

       cmd(str='') -- Write or writeVerifyEcho str, then read till rspType
       satisfied, or times out.

"""

    def __init__(self):
        """Instance created, but not attached to any port."""
        self.debug = FALSE
        self.port = PORT_CLOSED
        self.cfg = {}
        self._rsp = ''
        self._bufSize = 0
        self._flushed = ''

    def _trace(self, tag, msg=''):
        """If debugging enabled, print a message"""
        if self.debug:
            print 'Port.%s: %s' % (tag, msg)

    def _chkSioExec(self, func, args):
        """Execute an sio function, raise SioError on negative return value."""
        status = apply(func, args)
        self._trace('_chkSioExec', '%s = %s%s' \
                    % (`status`, func.__name__, `args`))
        if status < 0:
            raise SioError, func.__name__ + ': ' + SioErrorText(status)
        return status

    def open(self, cfg):
        """Attach to the port specified in the cfg arg, then:
        
        -Open the port, set baud, parity, stopBits, and dataBits as per cfg.
        -Clear xmit/rec buffers, enable DTR, RTS, disable flow ctl.

        """
        self.debug = cfg['debug']
        self._trace('open')
        if self.port >= 0:
            raise SioError, 'Port is already open'
        port = cfg['port']
        self.cfg = cfg
        self._bufSize = cfg['rxBufSize']
        self._chkSioExec(SioReset, (port, self._bufSize, cfg['txBufSize']))
        self._chkSioExec(SioBaud, (port, cfg['baud']))
        self._chkSioExec(SioParms, (port, cfg['parity'], cfg['stopBits'], 
                                    cfg['dataBits']))
        self._chkSioExec(SioTxClear, (port,))
        self._chkSioExec(SioRxClear, (port,))
        self._chkSioExec(SioDTR, (port, cfg['dtrSignal'][:1]))
        self._chkSioExec(SioRTS, (port, cfg['rtsSignal'][:1]))
        self._chkSioExec(SioFlow, (port, 'N'))
        self.port = port
        self._rsp = ''
        self._flushed = ''

    def flush(self):
        """Save any pending input in _flushed; clear xmit/rec bufs."""
        self._trace('flush')
        self._flushed = ''
        self._flushed = SioGets(self.port, self._bufSize)
        self._chkSioExec(SioTxClear, (self.port,))
        self._chkSioExec(SioRxClear, (self.port,))
        self._trace('flushed', '|%s|' % self._flushed)

    def getLastFlush(self):
        """Return the last contents of the flushed buffer."""
        return self._flushed[:]

    def getLastRsp(self):
        """Return the last contents of the response buffer."""
        return self._rsp[:]
   
    def close(self):
        """Close the port."""
        self._trace('close')
        self._chkSioExec(SioDone, (self.port,))
        self.port = PORT_CLOSED

    def write(self, str, cnt=None):
        """Write cnt bytes of str out port, cnt defaults to len(str)."""
        self._trace('write')
        if cnt is None:
            cnt = len(str)
        elif cnt > len(str):
            raise SioError, 'write: request to send more bytes than in the str'
        self.flush()
        self._chkSioExec(SioPuts, (self.port, str, cnt))

    def writeVerifyEcho(self, str, cnt=None):
        """Same as write , but verify each char was echoed by the hdw. """
        self._trace('writeVerifyEcho')
        if cnt is None:
            cnt = len(str)
        elif cnt > len(str):
            raise SioError, \
                  'writeVerifyEcho: request to send more bytes than in the str'
        i = 0
        self.flush()
        elapsed = MsTimer()
        timeOut = self.cfg['timeOutMs']
        msecs = 0
        while i < cnt:
            sent = str[i]
            self._chkSioExec(SioPutc, (self.port, sent))
            elapsed.reset()
            while 1:
                got = SioGetc(self.port)
                gotChr = '%c' % got
                msecs = elapsed()
                if got == WSC_NO_DATA:
                    if msecs >= timeOut:
                        raise SioError, \
                              'Timed out waiting for chr %d of cmd %s to echo'\
                              % (i, `str`)
                elif got < 0:
                    raise SioError, SioGetc.__name__ + ': ' + SioErrorText(got)
                elif gotChr <> sent:
                    raise SioError, \
                          'writeVerifyEcho: Bad echo of chr %i in cmd %s:'\
                          'sent %s, got %s' % (i, `str`, `sent`, `gotChr`)
                else:
                    self._trace('writeVerifyEcho', 'waited %d mSecs' % msecs)
                    self._trace('writeVerifyEcho', '%s = SioGetc(%d)' \
                                % (`gotChr`, self.port))
                    break
            i = i + 1

    def read(self, cnt=None, timed=FALSE):
        """Attempt to read cnt bytes, if timed TRUE, must complete in cfg time.
        """
        self._trace('read')
        if cnt is None:
            cnt = self._bufSize
        if cnt < 0: cnt = 0
        if not timed:
            self._rsp = SioGets(self.port, cnt)
        else:
            timeOut = self.cfg['timeOutMs']
            buf = ''
            got = 0
            elapsed = MsTimer()
            while 1:
                thisCnt = cnt - got
                thisBuf = SioGets(self.port, thisCnt)
                thisGot = len(thisBuf)
                buf = buf + thisBuf
                got = got + thisGot
                if got < cnt:
                    msecs = elapsed()
                    if msecs >= timeOut:
                        self._rsp = buf
                        raise SioError, \
                              'Timed out waiting for input chr %d of %d, '\
                              'read %s' % (got, cnt, `buf`)
                else:
                    self._rsp = buf
                    break
        return self._rsp[:]

    def readTerminated(self):
        """Read from port until terminator read, timed out, or buf overflow.
        Terminator is stripped off of result."""
        self._trace('readTerminated')
        timeOut = self.cfg['timeOutMs']
        patLen = len(self.cfg['rspTerm'])
        pat = self.cfg['rspTermPat']
        if patLen <= 0 or pat is None:
            raise SioError, 'Config rspTerm is empty, can not readTerminated'
        i = 0
        buf = ''
        elapsed = MsTimer()
        msecs = 0
        while 1:
            got = SioGetc(self.port)
            gotChr = '%c' % got
            msecs = elapsed()
            if got == WSC_NO_DATA:
                if msecs >= timeOut:
                    self._rsp = buf
                    raise SioError, \
                          'Timed out waiting for chr %d of input, read %s' \
                          % (i, `buf`)
            elif got < 0:
                self._rsp = buf
                raise SioError, SioGetc.__name__ + ': ' + SioErrorText(got)
            elif i == self._bufSize:
                self._rsp = buf
                raise SioError, \
                      'Buffer overrun getting terminated input, read %s' \
                      % `buf`
            else:
                self._trace('readTerminated', 'waited %d mSecs' % msecs)
                self._trace('readTerminated', '%s = SioGetc(%d)' \
                            % (`gotChr`, self.port))
                buf = buf + gotChr
                i = i + 1
                if pat.search(buf) >= 0:
                    buf = buf[:-patLen]
                    break
        self._rsp = buf
        return self._rsp[:]

    def cmd(self, str=''):
        """Send a str, get a rsp according to cfg."""
        self._trace('cmd')
        rspType = self.cfg['rspType']

        if rspType == RSP_NONE:
            pass
        elif rspType == RSP_TERMINATED:
            if len(self.cfg['rspTerm']) <= 0:
                raise SioError, \
                      'Config param rspTerm is empty, can not readTerminated'
        elif rspType == RSP_FIXEDLEN:
            fixLen = self.cfg['rspFixedLen']
            if fixLen <= 0:
                raise SioError, \
                      'Config param rspFixedLen is <= 0, can not read '\
                      'fixed length reply'
        elif rspType == RSP_BEST_EFFORT:
            pass

        cmdStr = str + self.cfg['cmdTerm']
        rsp = ''

        if self.cfg['cmdsEchoed']:
            self.writeVerifyEcho(cmdStr)
        else:
            self.write(cmdStr)

        if rspType == RSP_NONE:
            pass
        elif rspType == RSP_TERMINATED:
            rsp = self.readTerminated()
        elif rspType == RSP_FIXEDLEN:
            rsp = self.read(cnt=fixLen)
        elif rspType == RSP_BEST_EFFORT:
            rsp = self.read()

        self._rsp = rsp
        return self._rsp[:]

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -