📄 pipe.c
字号:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/fs/pipe.c
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
24300 /* This file deals with the suspension and revival of processes. A process can
24301 * be suspended because it wants to read or write from a pipe and can't, or
24302 * because it wants to read or write from a special file and can't. When a
24303 * process can't continue it is suspended, and revived later when it is able
24304 * to continue.
24305 *
24306 * The entry points into this file are
24307 * do_pipe: perform the PIPE system call
24308 * pipe_check: check to see that a read or write on a pipe is feasible now
24309 * suspend: suspend a process that cannot do a requested read or write
24310 * release: check to see if a suspended process can be released and do it
24311 * revive: mark a suspended process as able to run again
24312 * do_unpause: a signal has been sent to a process; see if it suspended
24313 */
24314
24315 #include "fs.h"
24316 #include <fcntl.h>
24317 #include <signal.h>
24318 #include <minix/boot.h>
24319 #include <minix/callnr.h>
24320 #include <minix/com.h>
24321 #include "dev.h"
24322 #include "file.h"
24323 #include "fproc.h"
24324 #include "inode.h"
24325 #include "param.h"
24326
24327 PRIVATE message mess;
24328
24329 /*===========================================================================*
24330 * do_pipe *
24331 *===========================================================================*/
24332 PUBLIC int do_pipe()
24333 {
24334 /* Perform the pipe(fil_des) system call. */
24335
24336 register struct fproc *rfp;
24337 register struct inode *rip;
24338 int r;
24339 struct filp *fil_ptr0, *fil_ptr1;
24340 int fil_des[2]; /* reply goes here */
24341
24342 /* Acquire two file descriptors. */
24343 rfp = fp;
24344 if ( (r = get_fd(0, R_BIT, &fil_des[0], &fil_ptr0)) != OK) return(r);
24345 rfp->fp_filp[fil_des[0]] = fil_ptr0;
24346 fil_ptr0->filp_count = 1;
24347 if ( (r = get_fd(0, W_BIT, &fil_des[1], &fil_ptr1)) != OK) {
24348 rfp->fp_filp[fil_des[0]] = NIL_FILP;
24349 fil_ptr0->filp_count = 0;
24350 return(r);
24351 }
24352 rfp->fp_filp[fil_des[1]] = fil_ptr1;
24353 fil_ptr1->filp_count = 1;
24354
24355 /* Make the inode on the pipe device. */
24356 if ( (rip = alloc_inode(PIPE_DEV, I_REGULAR) ) == NIL_INODE) {
24357 rfp->fp_filp[fil_des[0]] = NIL_FILP;
24358 fil_ptr0->filp_count = 0;
24359 rfp->fp_filp[fil_des[1]] = NIL_FILP;
24360 fil_ptr1->filp_count = 0;
24361 return(err_code);
24362 }
24363
24364 if (read_only(rip) != OK) panic("pipe device is read only", NO_NUM);
24365
24366 rip->i_pipe = I_PIPE;
24367 rip->i_mode &= ~I_REGULAR;
24368 rip->i_mode |= I_NAMED_PIPE; /* pipes and FIFOs have this bit set */
24369 fil_ptr0->filp_ino = rip;
24370 fil_ptr0->filp_flags = O_RDONLY;
24371 dup_inode(rip); /* for double usage */
24372 fil_ptr1->filp_ino = rip;
24373 fil_ptr1->filp_flags = O_WRONLY;
24374 rw_inode(rip, WRITING); /* mark inode as allocated */
24375 reply_i1 = fil_des[0];
24376 reply_i2 = fil_des[1];
24377 rip->i_update = ATIME | CTIME | MTIME;
24378 return(OK);
24379 }
24382 /*===========================================================================*
24383 * pipe_check *
24384 *===========================================================================*/
24385 PUBLIC int pipe_check(rip, rw_flag, oflags, bytes, position, canwrite)
24386 register struct inode *rip; /* the inode of the pipe */
24387 int rw_flag; /* READING or WRITING */
24388 int oflags; /* flags set by open or fcntl */
24389 register int bytes; /* bytes to be read or written (all chunks) */
24390 register off_t position; /* current file position */
24391 int *canwrite; /* return: number of bytes we can write */
24392 {
24393 /* Pipes are a little different. If a process reads from an empty pipe for
24394 * which a writer still exists, suspend the reader. If the pipe is empty
24395 * and there is no writer, return 0 bytes. If a process is writing to a
24396 * pipe and no one is reading from it, give a broken pipe error.
24397 */
24398
24399 int r = 0;
24400
24401 /* If reading, check for empty pipe. */
24402 if (rw_flag == READING) {
24403 if (position >= rip->i_size) {
24404 /* Process is reading from an empty pipe. */
24405 if (find_filp(rip, W_BIT) != NIL_FILP) {
24406 /* Writer exists */
24407 if (oflags & O_NONBLOCK)
24408 r = EAGAIN;
24409 else
24410 suspend(XPIPE); /* block reader */
24411
24412 /* If need be, activate sleeping writers. */
24413 if (susp_count > 0) release(rip, WRITE, susp_count);
24414 }
24415 return(r);
24416 }
24417 } else {
24418 /* Process is writing to a pipe. */
24419 /* if (bytes > PIPE_SIZE) return(EFBIG); */
24420 if (find_filp(rip, R_BIT) == NIL_FILP) {
24421 /* Tell kernel to generate a SIGPIPE signal. */
24422 sys_kill((int)(fp - fproc), SIGPIPE);
24423 return(EPIPE);
24424 }
24425
24426 if (position + bytes > PIPE_SIZE) {
24427 if ((oflags & O_NONBLOCK) && bytes < PIPE_SIZE)
24428 return(EAGAIN);
24429 else if ((oflags & O_NONBLOCK) && bytes > PIPE_SIZE) {
24430 if ( (*canwrite = (PIPE_SIZE - position)) > 0) {
24431 /* Do a partial write. Need to wakeup reader */
24432 release(rip, READ, susp_count);
24433 return(1);
24434 } else {
24435 return(EAGAIN);
24436 }
24437 }
24438 if (bytes > PIPE_SIZE) {
24439 if ((*canwrite = PIPE_SIZE - position) > 0) {
24440 /* Do a partial write. Need to wakeup reader
24441 * since we'll suspend ourself in read_write()
24442 */
24443 release(rip, READ, susp_count);
24444 return(1);
24445 }
24446 }
24447 suspend(XPIPE); /* stop writer -- pipe full */
24448 return(0);
24449 }
24450
24451 /* Writing to an empty pipe. Search for suspended reader. */
24452 if (position == 0) release(rip, READ, susp_count);
24453 }
24454
24455 *canwrite = 0;
24456 return(1);
24457 }
24460 /*===========================================================================*
24461 * suspend *
24462 *===========================================================================*/
24463 PUBLIC void suspend(task)
24464 int task; /* who is proc waiting for? (PIPE = pipe) */
24465 {
24466 /* Take measures to suspend the processing of the present system call.
24467 * Store the parameters to be used upon resuming in the process table.
24468 * (Actually they are not used when a process is waiting for an I/O device,
24469 * but they are needed for pipes, and it is not worth making the distinction.)
24470 */
24471
24472 if (task == XPIPE || task == XPOPEN) susp_count++;/* #procs susp'ed on pipe*/
24473 fp->fp_suspended = SUSPENDED;
24474 fp->fp_fd = fd << 8 | fs_call;
24475 fp->fp_task = -task;
24476 if (task == XLOCK) {
24477 fp->fp_buffer = (char *) name1; /* third arg to fcntl() */
24478 fp->fp_nbytes =request; /* second arg to fcntl() */
24479 } else {
24480 fp->fp_buffer = buffer; /* for reads and writes */
24481 fp->fp_nbytes = nbytes;
24482 }
24483 dont_reply = TRUE; /* do not send caller a reply message now */
24484 }
24487 /*===========================================================================*
24488 * release *
24489 *===========================================================================*/
24490 PUBLIC void release(ip, call_nr, count)
24491 register struct inode *ip; /* inode of pipe */
24492 int call_nr; /* READ, WRITE, OPEN or CREAT */
24493 int count; /* max number of processes to release */
24494 {
24495 /* Check to see if any process is hanging on the pipe whose inode is in 'ip'.
24496 * If one is, and it was trying to perform the call indicated by 'call_nr',
24497 * release it.
24498 */
24499
24500 register struct fproc *rp;
24501
24502 /* Search the proc table. */
24503 for (rp = &fproc[0]; rp < &fproc[NR_PROCS]; rp++) {
24504 if (rp->fp_suspended == SUSPENDED &&
24505 rp->fp_revived == NOT_REVIVING &&
24506 (rp->fp_fd & BYTE) == call_nr &&
24507 rp->fp_filp[rp->fp_fd>>8]->filp_ino == ip) {
24508 revive((int)(rp - fproc), 0);
24509 susp_count--; /* keep track of who is suspended */
24510 if (--count == 0) return;
24511 }
24512 }
24513 }
24516 /*===========================================================================*
24517 * revive *
24518 *===========================================================================*/
24519 PUBLIC void revive(proc_nr, bytes)
24520 int proc_nr; /* process to revive */
24521 int bytes; /* if hanging on task, how many bytes read */
24522 {
24523 /* Revive a previously blocked process. When a process hangs on tty, this
24524 * is the way it is eventually released.
24525 */
24526
24527 register struct fproc *rfp;
24528 register int task;
24529
24530 if (proc_nr < 0 || proc_nr >= NR_PROCS) panic("revive err", proc_nr);
24531 rfp = &fproc[proc_nr];
24532 if (rfp->fp_suspended == NOT_SUSPENDED || rfp->fp_revived == REVIVING)return;
24533
24534 /* The 'reviving' flag only applies to pipes. Processes waiting for TTY get
24535 * a message right away. The revival process is different for TTY and pipes.
24536 * For TTY revival, the work is already done, for pipes it is not: the proc
24537 * must be restarted so it can try again.
24538 */
24539 task = -rfp->fp_task;
24540 if (task == XPIPE || task == XLOCK) {
24541 /* Revive a process suspended on a pipe or lock. */
24542 rfp->fp_revived = REVIVING;
24543 reviving++; /* process was waiting on pipe or lock */
24544 } else {
24545 rfp->fp_suspended = NOT_SUSPENDED;
24546 if (task == XPOPEN) /* process blocked in open or create */
24547 reply(proc_nr, rfp->fp_fd>>8);
24548 else {
24549 /* Revive a process suspended on TTY or other device. */
24550 rfp->fp_nbytes = bytes; /*pretend it wants only what there is*/
24551 reply(proc_nr, bytes); /* unblock the process */
24552 }
24553 }
24554 }
24557 /*===========================================================================*
24558 * do_unpause *
24559 *===========================================================================*/
24560 PUBLIC int do_unpause()
24561 {
24562 /* A signal has been sent to a user who is paused on the file system.
24563 * Abort the system call with the EINTR error message.
24564 */
24565
24566 register struct fproc *rfp;
24567 int proc_nr, task, fild;
24568 struct filp *f;
24569 dev_t dev;
24570
24571 if (who > MM_PROC_NR) return(EPERM);
24572 proc_nr = pro;
24573 if (proc_nr < 0 || proc_nr >= NR_PROCS) panic("unpause err 1", proc_nr);
24574 rfp = &fproc[proc_nr];
24575 if (rfp->fp_suspended == NOT_SUSPENDED) return(OK);
24576 task = -rfp->fp_task;
24577
24578 switch(task) {
24579 case XPIPE: /* process trying to read or write a pipe */
24580 break;
24581
24582 case XOPEN: /* process trying to open a special file */
24583 panic ("fs/do_unpause called with XOPEN\n", NO_NUM);
24584
24585 case XLOCK: /* process trying to set a lock with FCNTL */
24586 break;
24587
24588 case XPOPEN: /* process trying to open a fifo */
24589 break;
24590
24591 default: /* process trying to do device I/O (e.g. tty)*/
24592 fild = (rfp->fp_fd >> 8) & BYTE;/* extract file descriptor */
24593 if (fild < 0 || fild >= OPEN_MAX)panic("unpause err 2",NO_NUM);
24594 f = rfp->fp_filp[fild];
24595 dev = (dev_t) f->filp_ino->i_zone[0]; /* device hung on */
24596 mess.TTY_LINE = (dev >> MINOR) & BYTE;
24597 mess.PROC_NR = proc_nr;
24598
24599 /* Tell kernel R or W. Mode is from current call, not open. */
24600 mess.COUNT = (rfp->fp_fd & BYTE) == READ ? R_BIT : W_BIT;
24601 mess.m_type = CANCEL;
24602 fp = rfp; /* hack - call_ctty uses fp */
24603 (*dmap[(dev >> MAJOR) & BYTE].dmap_rw)(task, &mess);
24604 }
24605
24606 rfp->fp_suspended = NOT_SUSPENDED;
24607 reply(proc_nr, EINTR); /* signal interrupted call */
24608 return(OK);
24609 }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -