⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 console.py

📁 xen虚拟机源代码安装包
💻 PY
字号:
#!/usr/bin/python""" XmConsole.py - Interact with a xen console, getting return codes and                output from commands executed there. Copyright (C) International Business Machines Corp., 2005 Author: Dan Smith <danms@us.ibm.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; under version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA NB: This requires the domU's prompt to be set to     a _very_ specific value, set in the PROMPT     variable of this script"""import sysimport osimport ptyimport ttyimport termiosimport fcntlimport selectimport archfrom Test import *TIMEDOUT = 1RUNAWAY  = 2class ConsoleError(Exception):    def __init__(self, msg, reason=TIMEDOUT):        self.errMsg = msg        self.reason = reason    def __str__(self):        return str(self.errMsg)class XmConsole:    def __init__(self, domain, historyLimit=256, historySaveAll=True, historySaveCmds=False, cLimit=131072):        """        Parameters:          historyLimit:     specifies how many lines of history are maintained          historySaveAll:   determines whether or not extra messages in                            between commands are saved          historySaveCmds : determines whether or not the command echos                            are included in the history buffer        """        self.TIMEOUT          = 30        self.PROMPT           = "@%@%> "        self.domain           = domain        self.historyBuffer    = []        self.historyLines     = 0        self.historyLimit     = historyLimit        self.historySaveAll   = historySaveAll        self.historySaveCmds  = historySaveCmds        self.debugMe          = False        self.limit            = cLimit        consoleCmd = ["/usr/sbin/xm", "xm", "console", domain]        if verbose:            print "Console executing: %s" % str(consoleCmd)        pid, fd = pty.fork()        if pid == 0:            os.execvp("/usr/sbin/xm", consoleCmd[1:])        self.consolePid = pid        self.consoleFd  = fd        tty.setraw(self.consoleFd, termios.TCSANOW)    def __addToHistory(self, line):        self.historyBuffer.append(line)        self.historyLines += 1        if self.historyLines > self.historyLimit:            self.historyBuffer = self.historyBuffer[1:]            self.historyLines -= 1    def clearHistory(self):        """Clear the history buffer"""        self.historyBuffer = []        self.historyLines = 0    def getHistory(self):        """Returns a string containing the entire history buffer"""        output = ""        for line in self.historyBuffer:            output += line + "\n"        return output    def setTimeout(self, timeout):        """Sets the timeout used to determine if a remote command        has blocked"""        self.TIMEOUT = timeout    def setPrompt(self, prompt):        """Sets the string key used to delimit the end of command        output"""        self.PROMPT = prompt    def __getprompt(self, fd):        timeout = 0        bytes = 0        buffer = ""        while timeout < 180:            # eat anything while total bytes less than limit else raise RUNAWAY            while (not self.limit) or (bytes < self.limit):                i, o, e = select.select([fd], [], [], 1)                if fd in i:                    try:                        foo = os.read(fd, 1)                        if self.debugMe:                            sys.stdout.write(foo)                        bytes += 1                        buffer += foo                    except Exception, exn:                        raise ConsoleError(str(exn))                else:                    break            else:                raise ConsoleError("Console run-away (exceeded %i bytes)"                                   % self.limit, RUNAWAY)            # Check to see if the buffer contains anything interetsing            arch.checkBuffer(buffer)            # press enter            os.write(self.consoleFd, "\n")            # look for prompt            for prompt_char in "\r\n" + self.PROMPT:                i, o, e = select.select([fd], [], [], 1)                if fd in i:                    try:                        foo = os.read(fd, 1)                        if self.debugMe:                            sys.stdout.write(foo)                        if foo != prompt_char:                            break                    except Exception, exn:                        raise ConsoleError(str(exn))                else:                    timeout += 1                    break            else:                break        else:            raise ConsoleError("Timed out waiting for console prompt")    def __runCmd(self, command, saveHistory=True):        output = ""        line   = ""        lines  = 0        bytes  = 0        self.__getprompt(self.consoleFd)        if verbose:            print "[%s] Sending `%s'" % (self.domain, command)        os.write(self.consoleFd, "%s\n" % command)        while True:            i, o, e = select.select([self.consoleFd], [], [], self.TIMEOUT)            if self.consoleFd in i:                try:                    str = os.read(self.consoleFd, 1)                    if self.debugMe:                        sys.stdout.write(str)                    bytes += 1                except Exception, exn:                    raise ConsoleError(                        "Failed to read from console (fd=%i): %s" %                        (self.consoleFd, exn))            else:                raise ConsoleError("Timed out waiting for console command")            if self.limit and bytes >= self.limit:                raise ConsoleError("Console run-away (exceeded %i bytes)"                                   % self.limit, RUNAWAY)            if str == "\n":                if lines > 0:                    output += line + "\n"                    if saveHistory:                        self.__addToHistory(line)                elif self.historySaveCmds and saveHistory:                    self.__addToHistory("*" + line)                lines += 1                line = ""            elif str == "\r":                pass # ignore \r's            else:                line += str            if line == self.PROMPT:                break        return output    def runCmd(self, command):        """Runs a command on the remote terminal and returns the output        as well as the return code.  For example:                ret = c.runCmd("ls")        print ret["output"]        sys.exit(run["return"])                """        # Allow exceptions to bubble up        realOutput = self.__runCmd(command)        retOutput  = self.__runCmd("echo $?", saveHistory=False)        try:            retCode =  int(retOutput)        except:            retCode = 255        return {            "output": realOutput,            "return": retCode,            }    def sendInput(self, input):        """Sends input to the remote terminal, but doesn't check        for a return code"""        realOutput = self.__runCmd(input)        return {            "output": realOutput,            "return": 0,            }    def __closeConsole(self):        """Closes the console connection and ensures that the console        process is killed. This should only be called by the domain.        Tests should call domain.closeConsole()"""        if self.consolePid != 0:            os.close(self.consoleFd)            os.kill(self.consolePid, 2)            self.consolePid = 0    def setLimit(self, limit):        """Sets a limit on the number of bytes that can be        read in an attempt to run a command.  We need this when        running something that can run away"""        try:            self.limit = int(limit)        except Exception, e:            self.limit = None     def setHistorySaveCmds(self, value):        # True or False        self.historySaveCmds = value                               if __name__ == "__main__":    """This is both an example of using the XmConsole class, as    well as a utility for command-line execution of single commands    on a domU console.  Prints output to stdout.  Exits with the same    code as the domU command.    """    verbose = True        try:        t = XmConsole(sys.argv[1])    except ConsoleError, e:        print "Failed to attach to console (%s)" % str(e)        sys.exit(255)    try:        run = t.runCmd(sys.argv[2])    except ConsoleError, e:        print "Console failed (%)" % str(e)        sys.exit(255)            t._XmConsole__closeConsole()        print run["output"],    sys.exit(run["return"])            

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -