⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 serialposix.py

📁 tinyos-2.x.rar
💻 PY
📖 第 1 页 / 共 2 页
字号:
            raise ValueError,'invalid stopit specification:'+str(stopbits)
        #setup parity
        self.iflag = self.iflag & ~(TERMIOS.INPCK|TERMIOS.ISTRIP)
        if parity == PARITY_NONE:
            self.cflag = self.cflag & ~(TERMIOS.PARENB|TERMIOS.PARODD)
        elif parity == PARITY_EVEN:
            self.cflag = self.cflag & ~(TERMIOS.PARODD)
            self.cflag = self.cflag |  (TERMIOS.PARENB)
        elif parity == PARITY_ODD:
            self.cflag = self.cflag |  (TERMIOS.PARENB|TERMIOS.PARODD)
        else:
            raise ValueError,'invalid parity: '+str(par)
        #setup flow control
        #xonxoff
        if hasattr(TERMIOS, 'IXANY'):
            if xonxoff:
                self.iflag = self.iflag |  (TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
            else:
                self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
        else:
            if xonxoff:
                self.iflag = self.iflag |  (TERMIOS.IXON|TERMIOS.IXOFF)
            else:
                self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF)
        #rtscts
        if hasattr(TERMIOS, 'CRTSCTS'):
            if rtscts:
                self.cflag = self.cflag |  (TERMIOS.CRTSCTS)
            else:
                self.cflag = self.cflag & ~(TERMIOS.CRTSCTS)
        elif hasattr(TERMIOS, 'CNEW_RTSCTS'):   #try it with alternate constant name
            if rtscts:
                self.cflag = self.cflag |  (TERMIOS.CNEW_RTSCTS)
            else:
                self.cflag = self.cflag & ~(TERMIOS.CNEW_RTSCTS)
        #XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
        
        #buffer
        #vmin "minimal number of characters to be read. = for non blocking"
        if vmin<0 or vmin>255:
            raise ValueError,'invalid vmin: '+str(vmin)
        self.cc[TERMIOS.VMIN] = vmin
        #vtime
        if vtime<0 or vtime>255:
            raise ValueError,'invalid vtime: '+str(vtime)
        self.cc[TERMIOS.VTIME] = vtime
        #activate settings
        self.__tcsetattr()

    def __tcsetattr(self):
        """Apply the cached attribute fields to the port.

        The seven cached fields are packed in the order expected by
        termios.tcsetattr() and applied immediately (TCSANOW).
        """
        attributes = [
            self.iflag, self.oflag, self.cflag, self.lflag,
            self.ispeed, self.ospeed, self.cc,
        ]
        termios.tcsetattr(self.fd, TERMIOS.TCSANOW, attributes)

    def __tcgetattr(self):
        """Refresh the cached attribute fields from the port.

        Stores the seven values returned by termios.tcgetattr() in
        iflag, oflag, cflag, lflag, ispeed, ospeed and cc.
        """
        attributes = termios.tcgetattr(self.fd)
        (self.iflag, self.oflag, self.cflag, self.lflag,
         self.ispeed, self.ospeed, self.cc) = attributes

    def close(self):
        """Release the port's file descriptor.

        Does nothing when self.fd is already unset; otherwise the
        descriptor is closed and self.fd is reset to None.
        """
        if not self.fd:
            return
        os.close(self.fd)
        self.fd = None

    def setBaudrate(self, baudrate):
        """Change the baud rate after the port is open.

        The current attributes are re-read from the port first, then the
        input and output speed are both set to the same value and the
        attributes are written back.

        baudrate -- rate to select; must have an entry in baudIntToEnum.

        Raises portNotOpenError if the port is not open and ValueError if
        baudIntToEnum has no entry for the rate.
        """
        if not self.fd: raise portNotOpenError
        self.__tcgetattr()  #read current settings
        #only a failed table lookup means "invalid rate"; a bare except would
        #also swallow KeyboardInterrupt and unrelated programming errors
        try:
            speed = baudIntToEnum[baudrate]
        except (LookupError, TypeError):
            #TypeError covers values that cannot be used as a key at all;
            #the 1-tuple keeps the message intact when baudrate is a tuple
            raise ValueError('invalid baud rate: %s' % (baudrate,))
        self.ispeed = self.ospeed = speed
        self.__tcsetattr()

    def inWaiting(self):
        """Return how many characters are in the input queue.

        The count is the first unsigned int of the buffer returned by the
        TIOCINQ ioctl.

        Raises portNotOpenError if the port is not open.
        """
        #same guard as the other port methods; without it a closed port
        #hands None to ioctl and fails with an unrelated error
        if not self.fd: raise portNotOpenError
        #~ s = fcntl.ioctl(self.fd, TERMIOS.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I',s)[0]

    def write(self, data):
        """Send a string out of the port.

        os.write() may accept fewer bytes than it was offered, so the
        unsent tail is offered again until nothing is left.

        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        pending = data
        remaining = len(data)
        while remaining > 0:
            sent = os.write(self.fd, pending)
            pending = pending[sent:]
            remaining -= sent

    def read(self, size=1):
        """Read up to size bytes from the port.

        The default is one byte (unlike files). Every wait for data is
        bounded by self.timeout (None waits without limit), so fewer than
        size bytes, possibly an empty string, are returned when a wait
        times out. A size of zero or less returns an empty string.

        Raises portNotOpenError if the port is not open.
        """
        if not self.fd: raise portNotOpenError
        chunks = []     #joined once at the end instead of repeated str + str
        missing = size
        while missing > 0:
            ready,_,_ = select.select([self.fd],[],[], self.timeout)
            if not ready:
                break   #timeout
            buf = os.read(self.fd, missing)
            chunks.append(buf)
            missing = missing - len(buf)
            #port reported readable but delivered nothing: give up, unless
            #no timeout is configured (None is tested explicitly rather than
            #relying on how None compares with a number)
            if not buf and self.timeout is not None and self.timeout >= 0:
                break  #early abort on timeout
        return ''.join(chunks)

    def flushInput(self):
        """Clear the input queue (tcflush with TCIFLUSH).

        Raises portNotOpenError if the port is not open.
        """
        port = self.fd
        if not port: raise portNotOpenError
        termios.tcflush(port, TERMIOS.TCIFLUSH)

    def flushOutput(self):
        """Flush the output queue (tcflush with TCOFLUSH).

        Raises portNotOpenError if the port is not open.
        """
        port = self.fd
        if not port: raise portNotOpenError
        termios.tcflush(port, TERMIOS.TCOFLUSH)

    def sendBreak(self):
        """Send a break signal, passing duration 0 to tcsendbreak().

        Raises portNotOpenError if the port is not open.
        """
        port = self.fd
        if not port: raise portNotOpenError
        termios.tcsendbreak(port, 0)

    def drainOutput(self):
        """internal - not portable!

        Calls termios.tcdrain() on the port's descriptor.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        termios.tcdrain(self.fd)

    def nonblocking(self):
        """internal - not portable!

        Switch the port's descriptor to non-blocking mode.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        #F_SETFL replaces the whole set of file status flags, so merge
        #O_NONBLOCK into the current flags instead of overwriting them
        flags = fcntl.fcntl(self.fd, FCNTL.F_GETFL)
        #the constant is taken from os, where it is always defined on POSIX
        #NOTE(review): FCNTL presumably aliases fcntl on newer Pythons, and
        #fcntl does not define O_NONBLOCK - confirm against the file header
        fcntl.fcntl(self.fd, FCNTL.F_SETFL, flags | os.O_NONBLOCK)

    def getDSR(self):
        """Read the DSR terminal status line.

        Returns the TIOCM_DSR bit of the status word fetched with the
        TIOCMGET ioctl: non-zero when the bit is set, 0 otherwise.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        raw = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        status = struct.unpack('I', raw)[0]
        return status & TIOCM_DSR

    def getCD(self):
        """Read the CD terminal status line.

        Returns the TIOCM_CD bit of the status word fetched with the
        TIOCMGET ioctl: non-zero when the bit is set, 0 otherwise.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        raw = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        status = struct.unpack('I', raw)[0]
        return status & TIOCM_CD

    def getRI(self):
        """Read the RI terminal status line.

        Returns the TIOCM_RI bit of the status word fetched with the
        TIOCMGET ioctl: non-zero when the bit is set, 0 otherwise.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        raw = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        status = struct.unpack('I', raw)[0]
        return status & TIOCM_RI

    def getCTS(self):
        """Read the CTS terminal status line.

        Returns the TIOCM_CTS bit of the status word fetched with the
        TIOCMGET ioctl: non-zero when the bit is set, 0 otherwise.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        raw = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        status = struct.unpack('I', raw)[0]
        return status & TIOCM_CTS

    def setDTR(self,on=1):
        """Set the DTR terminal status line.

        A true value for on issues the TIOCMBIS ioctl with TIOCM_DTR_str,
        a false value issues TIOCMBIC with the same argument.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        request = TIOCMBIC
        if on:
            request = TIOCMBIS
        fcntl.ioctl(self.fd, request, TIOCM_DTR_str)

    def setRTS(self,on=1):
        """Set the RTS terminal status line.

        A true value for on issues the TIOCMBIS ioctl with TIOCM_RTS_str,
        a false value issues TIOCMBIC with the same argument.
        Raises portNotOpenError if the port is not open.
        """
        if not self.fd:
            raise portNotOpenError
        request = TIOCMBIC
        if on:
            request = TIOCMBIS
        fcntl.ioctl(self.fd, request, TIOCM_RTS_str)

if __name__ == '__main__':
    #manual smoke test: opens port 0, writes 'hello' and prints what comes back
    s = Serial(0,
                 baudrate=19200,        #baudrate
                 bytesize=EIGHTBITS,    #number of databits
                 parity=PARITY_EVEN,    #even parity
                 stopbits=STOPBITS_ONE, #number of stopbits
                 timeout=3,             #set a timeout value, None for waiting forever
                 xonxoff=0,             #software flow control off
                 rtscts=0,              #RTS/CTS flow control off
               )
    try:
        s.setRTS(1)
        s.setDTR(1)
        s.flushInput()
        s.flushOutput()
        s.write('hello')
        #parenthesised form prints the same text as the print statement
        print(repr(s.read(5)))
        print(s.inWaiting())
    finally:
        #release the port explicitly, even if one of the calls above raised
        s.close()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -