📄 scheduler.scm
字号:
(##sys#setislot t 4 #f) (##sys#add-to-ready-queue t) )(define ##sys#default-exception-handler (let ([print-error-message print-error-message] [display display] [print-call-chain print-call-chain] [open-output-string open-output-string] [get-output-string get-output-string] ) (lambda (arg) (let ([ct ##sys#current-thread]) (dbg "exception: " ct " -> " (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg)) (cond [(foreign-value "C_abort_on_thread_exceptions" bool) (let* ([pt ##sys#primordial-thread] [ptx (##sys#slot pt 1)] ) (##sys#setslot pt 1 (lambda () (##sys#signal arg) (ptx) ) ) (##sys#thread-unblock! pt) ) ] [##sys#warnings-enabled (let ([o (open-output-string)]) (display "Warning (" o) (display ct o) (display "): " o) (print-error-message arg ##sys#standard-error (get-output-string o)) (print-call-chain ##sys#standard-error 0 ct) ) ] ) (##sys#setslot ct 7 arg) (##sys#thread-kill! ct 'terminated) (##sys#schedule) ) ) ) );;; `select()'-based blocking:(define ##sys#fd-list '())(define ##sys#fdset-select-timeout (foreign-lambda* int ([bool to] [unsigned-long tm]) "struct timeval timeout;" "timeout.tv_sec = tm / 1000;" "timeout.tv_usec = (tm % 1000) * 1000;" "C_fdset_input_2 = C_fdset_input;" "C_fdset_output_2 = C_fdset_output;" "return(select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL));") )(define ##sys#fdset-restore (foreign-lambda* void () "C_fdset_input = C_fdset_input_2;" "C_fdset_output = C_fdset_output_2;") )((foreign-lambda* void () "FD_ZERO(&C_fdset_input);" "FD_ZERO(&C_fdset_output);") )(define ##sys#fdset-input-set (foreign-lambda* void ([int fd]) "FD_SET(fd, &C_fdset_input);" ) )(define ##sys#fdset-output-set (foreign-lambda* void ([int fd]) "FD_SET(fd, &C_fdset_output);" ) )(define ##sys#fdset-clear (foreign-lambda* void ([int fd]) "FD_CLR(fd, &C_fdset_input_2);" "FD_CLR(fd, &C_fdset_output_2);") )(define (##sys#thread-block-for-i/o! t fd i/o) (dbg t " blocks for I/O " fd) (let loop ([lst ##sys#fd-list]) (if (null? lst) (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list)) (let ([a (car lst)]) (if (fx= fd (car a)) (##sys#setslot a 1 (cons t (cdr a))) (loop (cdr lst)) ) ) ) ) (case i/o ((#t #:input) (##sys#fdset-input-set fd)) ((#f #:output) (##sys#fdset-output-set fd)) ((#:all) (##sys#fdset-input-set fd) (##sys#fdset-output-set fd) ) ) (##sys#setslot t 3 'blocked) (##sys#setislot t 13 #f) (##sys#setslot t 11 (cons fd i/o)) )(define (##sys#unblock-threads-for-i/o) (dbg "fd-list: " ##sys#fd-list) (let* ([to? (pair? ##sys#timeout-list)] [rq? (pair? ##sys#ready-queue-head)] [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd (or rq? to?) (if (and to? (not rq?)) ; no thread was unblocked by timeout, so wait (let* ([tmo1 (caar ##sys#timeout-list)] [now (##sys#fudge 16)]) (fxmax 0 (- tmo1 now)) ) 0) ) ] ) ; otherwise immediate timeout. (dbg n " fds ready") (cond [(eq? -1 n) (##sys#force-primordial)] [(fx> n 0) (set! ##sys#fd-list (let loop ([n n] [lst ##sys#fd-list]) (if (or (zero? n) (null? lst)) lst (let* ([a (car lst)] [fd (car a)] [inf (##core#inline "C_fd_test_input" fd)] [outf (##core#inline "C_fd_test_output" fd)] ) (dbg "fd " fd " ready: input=" inf ", output=" outf) (if (or inf outf) (let loop2 ([threads (cdr a)]) (if (null? threads) (begin (##sys#fdset-clear fd) (loop (sub1 n) (cdr lst)) ) (let* ([t (car threads)] [p (##sys#slot t 11)] ) (when (and (pair? p) (eq? fd (car p)) (not (##sys#slot t 13) ) ) ; not unblocked by timeout (##sys#thread-basic-unblock! t) ) (loop2 (cdr threads)) ) ) ) (cons a (loop n (cdr lst))) ) ) ) ) ) ] ) (##sys#fdset-restore) ) );;; Clear I/O state for unblocked thread(define (##sys#clear-i/o-state-for-thread! t) (when (pair? (##sys#slot t 11)) (let ((fd (##sys#slot (##sys#slot t 11) 0))) (set! ##sys#fd-list (let loop ([lst ##sys#fd-list]) (if (null? lst) '() (let* ([a (##sys#slot lst 0)] [fd2 (##sys#slot a 0)] ) (if (eq? fd fd2) (let ((ts (##sys#delq t (##sys#slot a 1)))) ; remove from fd-list entry (cond ((null? ts) ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error) (##sys#fdset-clear fd) ; no more threads waiting for this fd (##sys#fdset-restore) (##sys#slot lst 1) ) (else (##sys#setslot a 1 ts) ; fd-list entry is list with t removed lst) ) ) (cons a (loop (##sys#slot lst 1)))))))))));;; Get list of all threads that are ready or waiting for timeout or waiting for I/O:(define (##sys#all-threads) (append ##sys#ready-queue-head (apply append (map cdr ##sys#fd-list)) (map cdr ##sys#timeout-list)));;; Remove all waiting threads from the relevant queues with the exception of the current thread:(define (##sys#fetch-and-clear-threads) (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)]) (set! ##sys#ready-queue-head '()) (set! ##sys#ready-queue-tail '()) (set! ##sys#fd-list '()) (set! ##sys#timeout-list '()) all) );;; Restore list of waiting threads:(define (##sys#restore-threads vec) (set! ##sys#ready-queue-head (##sys#slot vec 0)) (set! ##sys#ready-queue-tail (##sys#slot vec 1)) (set! ##sys#fd-list (##sys#slot vec 2)) (set! ##sys#timeout-list (##sys#slot vec 3)) );;; Unblock thread cleanly:(define (##sys#thread-unblock! t) (when (eq? 'blocked (##sys#slot t 3)) (set! ##sys#timeout-list (let loop ((l ##sys#timeout-list)) (if (null? l) l (let ((h (##sys#slot l 0))) (if (eq? (##sys#slot h 1) t) (##sys#slot l 1) (cons h (loop (##sys#slot l 1)))))))) (set! ##sys#fd-list (let loop ([fdl ##sys#fd-list]) (if (null? fdl) '() (let ([a (##sys#slot fdl 0)]) (cons (cons (##sys#slot a 0) (##sys#delq t (##sys#slot a 1)) ) (loop (##sys#slot fdl 1)) ) ) ) ) ) (##sys#setislot t 12 '()) (##sys#thread-basic-unblock! t) ) );;; Multithreaded breakpoints(define (##sys#break-entry name args) (when (or (not ##sys#break-in-thread) (eq? ##sys#break-in-thread ##sys#current-thread)) (##sys#call-with-current-continuation (lambda (k) (let* ((pk (if (eq? ##sys#current-thread ##sys#primordial-thread) '() (list '(exn . thread) ##sys#current-thread '(exn . primordial-continuation) (lambda _ ((##sys#slot ##sys#primordial-thread 1)))))) (exn (##sys#make-structure 'condition '(exn breakpoint) (append (list '(exn . message) "*** breakpoint ***" '(exn . arguments) (cons name args) '(exn . location) name '(exn . continuation) k) pk) ) ) ) (set! ##sys#last-breakpoint exn) (cond ((eq? ##sys#current-thread ##sys#primordial-thread) (##sys#signal exn) ) (else (##sys#setslot ##sys#current-thread 3 'suspended) (##sys#setslot ##sys#current-thread 1 (lambda () (k (##core#undefined)))) (let ([old (##sys#slot ##sys#primordial-thread 1)]) (##sys#setslot ##sys#primordial-thread 1 (lambda () (##sys#signal exn) (old) ) ) (##sys#thread-unblock! ##sys#primordial-thread) (##sys#schedule) ) ) ) ) ) ) ) ) (define (##sys#break-resume exn) ;; assumes current-thread is primordial (let* ((props (##sys#slot exn 2)) (a (member '(exn . continuation) props)) (t (member '(exn . thread) props)) (pk (or (member '(exn . primordial-continuation) props) a))) (when t (let ((t (cadr t))) (if a (##sys#setslot t 1 (lambda () ((cadr a) (##core#undefined)))) (##sys#signal-hook #:type-error "condition has no continuation" exn) ) (##sys#add-to-ready-queue t) ) ) (if pk ((cadr pk) (##core#undefined)) (##sys#signal-hook #:type-error "condition has no continuation" exn) ) ) )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -