📄 io.c
字号:
static void IO_free(IOData_t *io){ g_return_if_fail(IO_get(io->Key) == NULL); if (io->Flags & IOFlag_FreeIOBuf) a_IO_set_buf(io, NULL, 0); g_free(io);}/* * Close an open FD, and remove io controls. * (This function can be used for Close and Abort operations) */static void IO_close_fd(IOData_t *io, gint CloseCode){ gint st; /* With HTTP, if we close the writing part, the reading one also gets * closed! (other clients may set 'IOFlag_ForceClose') */ if ((io->Flags & IOFlag_ForceClose) || (CloseCode != IO_StopWr)) do st = close(io->FD); while (st < 0 && errno == EINTR); /* Remove this IOData_t reference, from our ValidIOs list * We don't deallocate it here, just remove from the list.*/ IO_del(io); /* Stop the polling on this FD */ g_source_remove(io->watch_id);}/* * Abort an open FD. * This function is called to abort a FD connection due to an IO error * or just because the connection is not required anymore. */static gboolean IO_abort(IOData_t *io){ /* Close and finish this FD's activity */ IO_close_fd(io, IO_StopRdWr); return FALSE;}/* * Read data from a file descriptor into a specific buffer */static gboolean IO_read(IOData_t *io){ ssize_t St; gboolean ret, DataPending; DEBUG_MSG(3, " IO_read\n"); do { ret = FALSE; DataPending = FALSE; St = read(io->FD, io->Buf, io->BufSize); io->Status = St; DEBUG_MSG(3, " IO_read: %s [errno %d] [St %d]\n", g_strerror(errno), errno, St); if ( St < 0 ) { /* Error */ io->Status = -errno; if (errno == EINTR) continue; else if (errno == EAGAIN) ret = TRUE; } else if ( St == 0 ) { /* All data read (EOF) */ IO_close_fd(io, IO_StopRd); a_IO_ccc(OpEnd, 2, FWD, io->Info, io, NULL); } else if ( (size_t)St < io->BufSize ){ /* We have all the new data */ a_IO_ccc(OpSend, 2, FWD, io->Info, io, NULL); ret = TRUE; } else { /* BytesRead == io->BufSize */ /* We have new data, and maybe more... */ a_IO_ccc(OpSend, 2, FWD, io->Info, io, NULL); DataPending = TRUE; } } while (DataPending); return ret;}/* * Write data, from a specific buffer, into a file descriptor * todo: Implement IOWrites. */static gboolean IO_write(IOData_t *io){ ssize_t St; gboolean ret, DataPending; DEBUG_MSG(3, " IO_write\n"); do { ret = FALSE; DataPending = FALSE; St = write(io->FD, io->Buf, io->BufSize); io->Status = St; DEBUG_MSG(3, " IO_write: %s [errno %d] [St %d]\n", g_strerror(errno), errno, St); if ( St < 0 ) { /* Error */ io->Status = -errno; if (errno == EINTR) { continue; } else if (errno == EAGAIN) { DEBUG_MSG(4, " IO_write: EAGAIN\n"); ret = TRUE; } } else if ( (size_t)St < io->BufSize ){ /* Not all data written */ io->BufSize -= St; io->Buf = (gchar *)io->Buf + St; DataPending = TRUE; DEBUG_MSG(4, " IO_write: Changing io->Buf (%d)\n", St); } else { /* All data in buffer written */ if ( io->Op == IOWrite ) { /* Single write */ IO_close_fd(io, IO_StopWr); a_IO_ccc(OpEnd, 1, FWD, io->Info, io, NULL); } else if ( io->Op == IOWrites ) { /* Writing in small chunks */ /* clear the buffer, and wait for a new chunk */ a_IO_set_buf(io, NULL, 0); if (io->Flags & IOFlag_ForceClose) { IO_close_fd(io, IO_StopWr); a_IO_ccc(OpEnd, 1, FWD, io->Info, io, NULL); } } } } while (DataPending); return ret;}/* * Handle background IO for a given FD (reads | writes) * (This function gets called by glib when there's activity in the FD) */static gboolean IO_callback(GIOChannel *src, GIOCondition cond, gpointer data){ gboolean ret = FALSE; gint io_key = GPOINTER_TO_INT(data); IOData_t *io = IO_get(io_key); // IO_print_cond_status("IO_callback: ", cond, src, io_key); /* There should be no more glib events on already closed FDs --Jcid */ if ( io == NULL ) { g_warning("IO_callback: call on already closed io!\n"); g_assert_not_reached(); return FALSE; } if ( cond & (G_IO_IN | G_IO_HUP) ){ /* Read */ ret = IO_read(io); } else if ( cond & G_IO_OUT ){ /* Write */ ret = IO_write(io); io = IO_get(io_key); /* IO_write may have freed 'io' */ if (io && io->Status == -EAGAIN) ret = TRUE; /* wait for another G_IO_OUT event... */ } if ( cond & G_IO_ERR ){ /* Error */ /* IO_read/IO_write may free 'io' */ if ((io = IO_get(io_key))) { io->Status = -EIO; ret = IO_abort(io); } else { ret = FALSE; } } else if ( cond & (G_IO_PRI | G_IO_NVAL) ){ /* Ignore these exceptional conditions */ ret = FALSE; } return ret;}/* * Receive an IO request (IORead | IOWrite | IOWrites), * Set the GIOChannel and let it flow! */static void IO_submit(IOData_t *r_io){ /* Insert this IO in ValidIOs */ IO_ins(r_io); /* Set FD to background and to close on exec. */ fcntl(r_io->FD, F_SETFL, O_NONBLOCK | fcntl(r_io->FD, F_GETFL)); fcntl(r_io->FD, F_SETFD, FD_CLOEXEC | fcntl(r_io->FD, F_GETFD)); if ( r_io->Op == IORead ) { r_io->watch_id = g_io_add_watch(r_io->GioCh, G_IO_IN | G_IO_ERR | G_IO_HUP, IO_callback, GINT_TO_POINTER (r_io->Key)); g_io_channel_unref(r_io->GioCh); } else if ( r_io->Op == IOWrite || r_io->Op == IOWrites ) { r_io->watch_id = g_io_add_watch(r_io->GioCh, G_IO_OUT | G_IO_ERR, IO_callback, GINT_TO_POINTER (r_io->Key)); g_io_channel_unref(r_io->GioCh); }}/* * Receive IO request (IORead | IOWrite | IOWrites), * and either start or keep it flowing. */static void IO_send(IOData_t *io){ if (!io->Key) IO_submit(io);}/* * CCC function for the IO module * ( Data1 = IOData_t* ; Data2 = NULL ) */void a_IO_ccc(int Op, int Branch, int Dir, ChainLink *Info, void *Data1, void *Data2){ IOData_t *io = Data1; a_Chain_debug_msg("a_IO_ccc", Op, Branch, Dir); if (Branch == 1) { if (Dir == BCK) { /* Write data */ switch (Op) { case OpStart: io->Info = Info; Info->LocalKey = io; break; case OpSend: /* this part submits the io */ IO_send(io); break; case OpAbort: io = Info->LocalKey; IO_abort(io); IO_free(io); g_free(Info); break; } } else { /* FWD */ /* Write-data status */ switch (Op) { case OpEnd: a_Chain_fcb(OpEnd, Info, io, NULL); IO_free(io); break; case OpAbort: a_Chain_fcb(OpAbort, Info, NULL, NULL); IO_free(io); break; } } } else if (Branch == 2) { if (Dir == BCK) { /* This part catches the reader's messages */ switch (Op) { case OpStart: Info->LocalKey = io; io->Info = Info; IO_submit(io); break; case OpAbort: io = Info->LocalKey; IO_abort(io); IO_free(io); g_free(Info); break; } } else { /* FWD */ /* Send read-data */ switch (Op) { case OpStart: io->Info = Info; Info->LocalKey = io; a_Chain_link_new(Info, a_IO_ccc, FWD, a_Cache_ccc, 2, 2); a_Chain_fcb(OpStart, Info, io, io->ExtData); IO_submit(io); break; case OpSend: a_Chain_fcb(OpSend, Info, io, NULL); break; case OpEnd: a_Chain_fcb(OpEnd, Info, io, NULL); IO_free(io); break; case OpAbort: MSG(" Not implemented\n"); break; } } } else if (Branch == 3) { if (Dir == BCK) { /* Write data using a thread */ switch (Op) { case OpStart: { gint *fd = g_new(gint, 1); *fd = *(int*)Data1; /* SockFD */ Info->LocalKey = fd; break; } case OpEnd: a_IO_write_chunk(*(int*)Info->LocalKey, NULL, 0); g_free(Info->LocalKey); break; case OpSend: { /* this part submits the data to the thread */ DataBuf *dbuf = Data1; a_IO_write_chunk(*(int*)Info->LocalKey, dbuf->Buf, dbuf->Size); break; } case OpAbort: g_free(Info->LocalKey); g_free(Info); break; } } else { /* FWD */ /* Write-data status */ switch (Op) { case OpEnd: a_Chain_fcb(OpEnd, Info, io, NULL); IO_free(io); break; case OpAbort: a_Chain_fcb(OpAbort, Info, NULL, NULL); IO_free(io); break; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -