⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 serialjava.py

📁 Contiki是一个开源
💻 PY
字号:
#!jython#module for serial IO for Jython and JavaComm#see __init__.py##(C) 2002 Chris Liechti <cliechti@gmx.net># this is distributed under a free software license, see license.txtimport sys, os, string, javax.commimport serialutilVERSION = string.split("$Revision: 1.1 $")[1]     #extract CVS versionPARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = (0,1,2,3,4)STOPBITS_ONE, STOPBITS_TWO, STOPBITS_ONE_HALVE = (1, 2, 3)FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5,6,7,8)portNotOpenError = ValueError('port not open')def device(portnumber):    enum = javax.comm.CommPortIdentifier.getPortIdentifiers()    ports = []    while enum.hasMoreElements():        el = enum.nextElement()        if el.getPortType() == javax.comm.CommPortIdentifier.PORT_SERIAL:            ports.append(el)    return ports[portnumber]class Serial(serialutil.FileLike):    def __init__(self,                 port,                  #number of device, numbering starts at                                        #zero. if everything fails, the user                                        #can specify a device string, note                                        #that this isn't portable anymore                 baudrate=9600,         #baudrate                 bytesize=EIGHTBITS,    #number of databits                 parity=PARITY_NONE,    #enable parity checking                 stopbits=STOPBITS_ONE, #number of stopbits                 timeout=None,          #set a timeout value, None for waiting forever                 xonxoff=0,             #enable software flow control                 rtscts=0,              #enable RTS/CTS flow control                 ):        if type(port) == type(''):      #strings are taken directly            portId = javax.comm.CommPortIdentifier.getPortIdentifier(port)        else:            portId = device(port)     #numbers are transformed to a comportid obj        self.portstr = portId.getName()        try:            self.sPort = portId.open("python serial module", 10)        except Exception, msg:            self.sPort = None            raise serialutil.SerialException, "could not open port: %s" % msg        self.instream = self.sPort.getInputStream()        self.outstream = self.sPort.getOutputStream()        self.sPort.enableReceiveTimeout(30)        if bytesize == FIVEBITS:            self.databits = javax.comm.SerialPort.DATABITS_5        elif bytesize == SIXBITS:            self.databits = javax.comm.SerialPort.DATABITS_6        elif bytesize == SEVENBITS:            self.databits = javax.comm.SerialPort.DATABITS_7        elif bytesize == EIGHTBITS:            self.databits = javax.comm.SerialPort.DATABITS_8        else:            raise ValueError, "unsupported bytesize"                if stopbits == STOPBITS_ONE:            self.jstopbits = javax.comm.SerialPort.STOPBITS_1        elif stopbits == STOPBITS_ONE_HALVE:            self.jstopbits = javax.comm.SerialPort.STOPBITS_1_5        elif stopbits == STOPBITS_TWO:            self.jstopbits = javax.comm.SerialPort.STOPBITS_2        else:            raise ValueError, "unsupported number of stopbits"        if parity == PARITY_NONE:            self.jparity = javax.comm.SerialPort.PARITY_NONE        elif parity == PARITY_EVEN:            self.jparity = javax.comm.SerialPort.PARITY_EVEN        elif parity == PARITY_ODD:            self.jparity = javax.comm.SerialPort.PARITY_ODD        elif parity == PARITY_MARK:            self.jparity = javax.comm.SerialPort.PARITY_MARK        elif parity == PARITY_SPACE:            self.jparity = javax.comm.SerialPort.PARITY_SPACE        else:            raise ValueError, "unsupported parity type"        jflowin = jflowout = 0        if rtscts:            jflowin = jflowin | javax.comm.SerialPort.FLOWCONTROL_RTSCTS_IN             jflowout = jflowout | javax.comm.SerialPort.FLOWCONTROL_RTSCTS_OUT        if xonxoff:            jflowin = jflowin | javax.comm.SerialPort.FLOWCONTROL_XONXOFF_IN            jflowout = jflowout | javax.comm.SerialPort.FLOWCONTROL_XONXOFF_OUT                self.sPort.setSerialPortParams(baudrate, self.databits, self.jstopbits, self.jparity)        self.sPort.setFlowControlMode(jflowin | jflowout)                self.timeout = timeout        if timeout >= 0:            self.sPort.enableReceiveTimeout(timeout*1000)        else:            self.sPort.disableReceiveTimeout()    def close(self):        if self.sPort:            self.instream.close()            self.outstream.close()            self.sPort.close()            self.sPort = None    def setBaudrate(self, baudrate):        """change baudrate after port is open"""        if not self.sPort: raise portNotOpenError        self.sPort.setSerialPortParams(baudrate, self.databits, self.jstopbits, self.jparity)    def inWaiting(self):        if not self.sPort: raise portNotOpenError        return self.instream.available()    def write(self, data):        if not self.sPort: raise portNotOpenError        self.outstream.write(data)    def read(self, size=1):        if not self.sPort: raise portNotOpenError        read = ''        if size > 0:            while len(read) < size:                x = self.instream.read()                if x == -1:                    if self.timeout >= 0:                        break                else:                    read = read + chr(x)        return read    def flushInput(self):        if not self.sPort: raise portNotOpenError        self.instream.skip(self.instream.available())    def flushOutput(self):        if not self.sPort: raise portNotOpenError        self.outstream.flush()    def sendBreak(self):        if not self.sPort: raise portNotOpenError        self.sPort.sendBreak()    def getDSR(self):        if not self.sPort: raise portNotOpenError        self.sPort.isDSR()    def getCD(self):        if not self.sPort: raise portNotOpenError        self.sPort.isCD()    def getRI(self):        if not self.sPort: raise portNotOpenError        self.sPort.isRI()    def getCTS(self):        if not self.sPort: raise portNotOpenError        self.sPort.isCTS()    def setDTR(self,on=1):        if not self.sPort: raise portNotOpenError        self.sPort.setDTR(on)    def setRTS(self,on=1):        if not self.sPort: raise portNotOpenError        self.sPort.setRTS(on)if __name__ == '__main__':    s = Serial(0,                 baudrate=19200,        #baudrate                 bytesize=EIGHTBITS,    #number of databits                 parity=PARITY_EVEN,    #enable parity checking                 stopbits=STOPBITS_ONE, #number of stopbits                 timeout=3,             #set a timeout value, None for waiting forever                 xonxoff=0,             #enable software flow control                 rtscts=0,              #enable RTS/CTS flow control               )    s.setRTS(1)    s.setDTR(1)    s.flushInput()    s.flushOutput()    s.write('hello')    print repr(s.read(5))    print s.inWaiting()    del s

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -