📄 web_display.py
字号:
width: 100%; height: 100%; margin: 3px 0 0 0; border: 1px solid #999; }#page { position: relative; width: 100%;height: 100%;}"""# HTML Initial Page_html_page = ["""<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"><html><head><title>Urwid Web Display - ""","""</title><style type="text/css">""" + _css_style + """</style></head><body id="body" onload="load_web_display()"><div style="position:absolute; visibility:hidden;"><br id="br"\> <pre>The quick brown fox jumps over the lazy dog.<span id="testchar">X</span><span id="testchar2">Y</span></pre></div>Urwid Web Display - <b>""","""</b> -Status: <span id="status">Set up</span><script type="text/javascript">//<![CDATA[""" + _js_code +"""//]]></script><pre id="text"></pre></body></html>"""]class Screen: def __init__(self): self.palette = {} self.has_color = True def register_palette( self, l ): """Register a list of palette entries. l -- list of (name, foreground, background) or (name, same_as_other_name) palette entries. calls self.register_palette_entry for each item in l """ for item in l: if len(item) in (3,4): self.register_palette_entry( *item ) continue assert len(item) == 2, "Invalid register_palette usage" name, like_name = item if not self.palette.has_key(like_name): raise Exception("palette entry '%s' doesn't exist"%like_name) self.palette[name] = self.palette[like_name] def register_palette_entry( self, name, foreground, background, mono=None): """Register a single palette entry. name -- new entry/attribute name foreground -- foreground colour background -- background colour mono -- monochrome terminal attribute See curses_display.register_palette_entry for more info. """ if foreground == "default": foreground = "black" if background == "default": background = "light gray" self.palette[name] = (foreground, background, mono) def set_mouse_tracking(self): """Not yet implemented""" pass def start(self): """ This function reads the initial screen size, generates a unique id and handles cleanup when fn exits. web_display.set_preferences(..) must be called before calling this function for the preferences to take effect """ global _prefs client_init = sys.stdin.read(50) assert client_init.startswith("window resize "),client_init ignore1,ignore2,x,y = client_init.split(" ",3) x = int(x) y = int(y) self._set_screen_size( x, y ) self.last_screen = {} self.last_screen_width = 0 self.update_method = os.environ["HTTP_X_URWID_METHOD"] assert self.update_method in ("multipart","polling") if self.update_method == "polling" and not _prefs.allow_polling: sys.stdout.write("Status: 403 Forbidden\r\n\r\n") sys.exit(0) clients = glob.glob(os.path.join(_prefs.pipe_dir,"urwid*.in")) if len(clients) >= _prefs.max_clients: sys.stdout.write("Status: 503 Sever Busy\r\n\r\n") sys.exit(0) urwid_id = "%09d%09d"%(random.randrange(10**9), random.randrange(10**9)) self.pipe_name = os.path.join(_prefs.pipe_dir,"urwid"+urwid_id) os.mkfifo(self.pipe_name+".in",0600) signal.signal(signal.SIGTERM,self._cleanup_pipe) self.input_fd = os.open(self.pipe_name+".in", os.O_NONBLOCK | os.O_RDONLY) self.input_tail = "" self.content_head = ("Content-type: " "multipart/x-mixed-replace;boundary=ZZ\r\n" "X-Urwid-ID: "+urwid_id+"\r\n" "\r\n\r\n" "--ZZ\r\n") if self.update_method=="polling": self.content_head = ( "Content-type: text/plain\r\n" "X-Urwid-ID: "+urwid_id+"\r\n" "\r\n\r\n") signal.signal(signal.SIGALRM,self._handle_alarm) signal.alarm( ALARM_DELAY ) def stop(self): """ Restore settings and clean up. """ try: self._close_connection() except: pass signal.signal(signal.SIGTERM,signal.SIG_DFL) self._cleanup_pipe() def run_wrapper(self,fn): """ Run the application main loop, calling start() first and stop() on exit. """ try: self.start() return fn() finally: self.stop() def _close_connection(self): if self.update_method == "polling child": self.server_socket.settimeout(0) socket, addr = self.server_socket.accept() socket.sendall("Z") socket.close() if self.update_method == "multipart": sys.stdout.write("\r\nZ" "\r\n--ZZ--\r\n") sys.stdout.flush() def _cleanup_pipe(self, *args): if not self.pipe_name: return try: os.remove(self.pipe_name+".in") os.remove(self.pipe_name+".update") except: pass def _set_screen_size(self, cols, rows ): """Set the screen size (within max size).""" if cols > MAX_COLS: cols = MAX_COLS if rows > MAX_ROWS: rows = MAX_ROWS self.screen_size = cols, rows def draw_screen(self, (cols, rows), r ): """Send a screen update to the client.""" if cols != self.last_screen_width: self.last_screen = {} sendq = [self.content_head] if self.update_method == "polling": send = sendq.append elif self.update_method == "polling child": signal.alarm( 0 ) try: s, addr = self.server_socket.accept() except socket.timeout, e: sys.exit(0) send = s.sendall else: signal.alarm( 0 ) send = sendq.append send("\r\n") self.content_head = "" assert r.rows() == rows if r.cursor is not None: cx, cy = r.cursor else: cx = cy = None new_screen = {} y = -1 for row in r.content(): y += 1 row = list(row) l = [] sig = tuple(row) if y == cy: sig = sig + (cx,) new_screen[sig] = new_screen.get(sig,[]) + [y] old_line_numbers = self.last_screen.get(sig, None) if old_line_numbers is not None: if y in old_line_numbers: old_line = y else: old_line = old_line_numbers[0] send( "<%d\n"%old_line ) continue col = 0 for (a, cs, run) in row: run = run.translate(_trans_table) if a is None: fg,bg,mono = "black", "light gray", None else: fg,bg,mono = self.palette[a] if y == cy and col <= cx: run_width = util.calc_width(run, 0, len(run)) if col+run_width > cx: l.append(code_span(run, fg, bg, cx-col)) else: l.append(code_span(run, fg, bg)) col += run_width else: l.append(code_span(run, fg, bg)) send("".join(l)+"\n") self.last_screen = new_screen self.last_screen_width = cols if self.update_method == "polling": sys.stdout.write("".join(sendq)) sys.stdout.flush() sys.stdout.close() self._fork_child() elif self.update_method == "polling child": s.close() else: # update_method == "multipart" send("\r\n--ZZ\r\n") sys.stdout.write("".join(sendq)) sys.stdout.flush() signal.alarm( ALARM_DELAY ) def clear(self): """ Force the screen to be completely repainted on the next call to draw_screen(). (does nothing for web_display) """ pass def _fork_child(self): """ Fork a child to run CGI disconnected for polling update method. Force parent process to exit. """ daemonize( self.pipe_name +".err" ) self.input_fd = os.open(self.pipe_name+".in", os.O_NONBLOCK | os.O_RDONLY) self.update_method = "polling child" s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.bind( self.pipe_name+".update" ) s.listen(1) s.settimeout(POLL_CONNECT) self.server_socket = s def _handle_alarm(self, sig, frame): assert self.update_method in ("multipart","polling child") if self.update_method == "polling child": # send empty update try: s, addr = self.server_socket.accept() s.close() except socket.timeout, e: sys.exit(0) else: # send empty update sys.stdout.write("\r\n\r\n--ZZ\r\n") sys.stdout.flush() signal.alarm( ALARM_DELAY ) def get_cols_rows(self): """Return the screen size.""" return self.screen_size def get_input(self, raw_keys=False): """Return pending input as a list.""" l = [] resized = False try: iready,oready,eready = select.select( [self.input_fd],[],[],0.5) except select.error, e: # return on interruptions if e.args[0] == 4: if raw_keys: return [],[] return [] raise if not iready: if raw_keys: return [],[] return [] keydata = os.read(self.input_fd, MAX_READ) os.close(self.input_fd) self.input_fd = os.open(self.pipe_name+".in", os.O_NONBLOCK | os.O_RDONLY) #sys.stderr.write( `keydata,self.input_tail`+"\n" ) keys = keydata.split("\n") keys[0] = self.input_tail + keys[0] self.input_tail = keys[-1] for k in keys[:-1]: if k.startswith("window resize "): ign1,ign2,x,y = k.split(" ",3) x = int(x) y = int(y) self._set_screen_size(x, y) resized = True else: l.append(k) if resized: l.append("window resize") if raw_keys: return l, [] return l def code_span( s, fg, bg, cursor = -1): code_fg = _code_colours[ fg ] code_bg = _code_colours[ bg ] if cursor >= 0: c_off, _ign = util.calc_text_pos(s, 0, len(s), cursor) c2_off = util.move_next_char(s, c_off, len(s)) return ( code_fg + code_bg + s[:c_off] + "\n" + code_bg + code_fg + s[c_off:c2_off] + "\n" + code_fg + code_bg + s[c2_off:] + "\n") else: return code_fg + code_bg + s + "\n"def html_escape(text): """Escape text so that it will be displayed safely within HTML""" text = text.replace('&','&') text = text.replace('<','<') text = text.replace('>','>') return text def is_web_request(): """ Return True if this is a CGI web request. """ return os.environ.has_key('REQUEST_METHOD')def handle_short_request(): """ Handle short requests such as passing keystrokes to the application or sending the initial html page. If returns True, then this function recognised and handled a short request, and the calling script should immediately exit. web_display.set_preferences(..) should be called before calling this function for the preferences to take effect """ global _prefs if not is_web_request(): return False if os.environ['REQUEST_METHOD'] == "GET": # Initial request, send the HTML and javascript. sys.stdout.write("Content-type: text/html\r\n\r\n" + html_escape(_prefs.app_name).join(_html_page)) return True if os.environ['REQUEST_METHOD'] != "POST": # Don't know what to do with head requests etc. return False if not os.environ.has_key('HTTP_X_URWID_ID'): # If no urwid id, then the application should be started. return False urwid_id = os.environ['HTTP_X_URWID_ID'] if len(urwid_id)>20: #invalid. handle by ignoring #assert 0, "urwid id too long!" sys.stdout.write("Status: 414 URI Too Long\r\n\r\n") return True for c in urwid_id: if c not in "0123456789": # invald. handle by ignoring #assert 0, "invalid chars in id!" sys.stdout.write("Status: 403 Forbidden\r\n\r\n") return True if os.environ.get('HTTP_X_URWID_METHOD',None) == "polling": # this is a screen update request s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: s.connect( os.path.join(_prefs.pipe_dir, "urwid"+urwid_id+".update") ) data = "Content-type: text/plain\r\n\r\n"+s.recv(BUF_SZ) while data: sys.stdout.write(data) data = s.recv(BUF_SZ) return True except socket.error,e: sys.stdout.write("Status: 404 Not Found\r\n\r\n") return True # this is a keyboard input request try: fd = os.open((os.path.join(_prefs.pipe_dir, "urwid"+urwid_id+".in")), os.O_WRONLY) except OSError,e: sys.stdout.write("Status: 404 Not Found\r\n\r\n") return True keydata = sys.stdin.read(MAX_READ) os.write(fd,keydata) os.close(fd) sys.stdout.write("Content-type: text/plain\r\n\r\n") return Trueclass _Preferences: app_name = "Unnamed Application" pipe_dir = "/tmp" allow_polling = True max_clients = 20_prefs = _Preferences()def set_preferences( app_name, pipe_dir="/tmp", allow_polling=True, max_clients=20 ): """ Set web_display preferences. app_name -- application name to appear in html interface pipe_dir -- directory for input pipes, daemon update sockets and daemon error logs allow_polling -- allow creation of daemon processes for browsers without multipart support max_clients -- maximum concurrent client connections. This pool is shared by all urwid applications using the same pipe_dir """ global _prefs _prefs.app_name = app_name _prefs.pipe_dir = pipe_dir _prefs.allow_polling = allow_polling _prefs.max_clients = max_clientsclass ErrorLog: def __init__(self, errfile ): self.errfile = errfile def write(self, err): open(self.errfile,"a").write(err)def daemonize( errfile ): """ Detach process and become a daemon. """ pid = os.fork() if pid: os._exit(0) os.setsid() signal.signal(signal.SIGHUP, signal.SIG_IGN) os.umask(0) pid = os.fork() if pid: os._exit(0) os.chdir("/") for fd in range(0,20): try: os.close(fd) except OSError: pass sys.stdin = open("/dev/null","r") sys.stdout = open("/dev/null","w") sys.stderr = ErrorLog( errfile )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -