⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sctp_util.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include "sctp_common.h"

void print_SCTP_event(struct MPIDU_Sctp_event * eventp);
int inline MPIDU_Sctp_post_writev(MPIDI_VC_t* vc, MPID_Request* sreq, int offset,
                                  MPIDU_Sock_progress_update_func_t fn, int stream_no);

/*
 * adjust_req - account for nb transferred bytes in a request's iov.
 *
 * Delegates to adjust_iov() using a local copy of the iov pointer, so the
 * entries and rreq->dev.iov_count are updated in place while the start of
 * rreq->dev.iov itself is not moved by this function.
 *
 * Returns the result of adjust_iov(): nonzero when the remaining entry
 * count has dropped to zero, 0 otherwise.
 */
#undef FUNCNAME
#define FUNCNAME adjust_req
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int adjust_req(MPID_Request* rreq, MPIU_Size_t nb)
{
    MPID_IOV * iov = rreq->dev.iov;

    return adjust_iov(&iov, &rreq->dev.iov_count, nb);
}

/*
 * adjust_iov - consume nb bytes from the front of an iov array.
 *
 * iovp    in/out: pointer to the first pending entry; advanced past every
 *                 entry that was consumed completely.
 * countp  in/out: number of pending entries; reduced by the number of
 *                 entries that were consumed completely.
 * nb      number of bytes to consume.
 *
 * Completely consumed entries get their length set to 0.  A partially
 * consumed entry has its buffer pointer advanced and its length reduced.
 *
 * Returns nonzero when no entries remain (*countp == 0), 0 otherwise.
 */
#undef FUNCNAME
#define FUNCNAME adjust_iov
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb)
{
    MPID_IOV * const iov = *iovp;
    const int count = *countp;
    int offset = 0;

    while (offset < count)
    {
        if (iov[offset].MPID_IOV_LEN <= nb)
        {
            /* this entry is used up entirely; move on to the next one */
            nb -= iov[offset].MPID_IOV_LEN;
            iov[offset].MPID_IOV_LEN = 0;
            offset++;
        }
        else
        {
            /* the remaining bytes end inside this entry */
            iov[offset].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *)
                                                           iov[offset].MPID_IOV_BUF + nb);
            iov[offset].MPID_IOV_LEN -= nb;
            break;
        }
    }

    *iovp += offset;
    *countp -= offset;

    return (*countp == 0);
}

/*
 * Allocate one event pool with its counters and link cleared.
 * Returns NULL when MPIU_Malloc fails.
 */
static struct MPIDU_Sctp_eventq_elem * eventq_elem_create(void)
{
    struct MPIDU_Sctp_eventq_elem * elem;

    elem = MPIU_Malloc(sizeof(struct MPIDU_Sctp_eventq_elem));
    if (elem != NULL) {
        elem->size = 0;
        elem->head = 0;
        elem->tail = 0;
        elem->next = NULL;
    }

    return elem;
}

/*
 * MPIDU_Sctp_event_enqueue - append an event to the global event queue.
 *
 * The queue is a linked list of fixed-size pools (eventq_head .. eventq_tail),
 * each used as a ring of MPIDU_SCTP_EVENTQ_POOL_SIZE events.  The first pool
 * is allocated on first use; another pool is appended whenever the tail pool
 * is full.
 *
 * sri may be NULL, in which case the stored receive info is all zeros.
 *
 * Returns MPI_SUCCESS, or an error code built with "**nomem" when a pool
 * cannot be allocated (nothing is enqueued in that case).
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_event_enqueue
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDU_Sctp_event_enqueue(MPIDU_Sctp_op_t op, MPIU_Size_t num_bytes,
                             sctp_rcvinfo* sri, int fd, void * user_ptr, void* user_ptr2, int value, int error)
{
    struct MPIDU_Sctp_eventq_elem * eventq_elem;
    struct MPIDU_Sctp_event* new_event;
    int index;
    /* zero-filled stand-in so a NULL sri never copies indeterminate bytes */
    sctp_rcvinfo no_rcvinfo = {0};
    int mpi_errno = MPI_SUCCESS;

    /*  eventq_head hasn't been initialized yet */
    if (eventq_head == NULL) {
        eventq_head = eventq_elem_create();
        if (eventq_head == NULL) {
            MPIDI_DBG_PRINTF((50, FCNAME, "Malloc Failed!\n"));
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME,
                                             __LINE__, MPI_ERR_OTHER,
                                             "**nomem", NULL);
            goto fn_fail;
        }
        eventq_tail = eventq_head;
    }

    /*  tail is full, need to allocate a new one */
    if (eventq_tail->size == MPIDU_SCTP_EVENTQ_POOL_SIZE) {
        eventq_elem = eventq_elem_create();
        if (eventq_elem == NULL) {
            MPIDI_DBG_PRINTF((50, FCNAME, "Malloc Failed!\n"));
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME,
                                             __LINE__, MPI_ERR_OTHER,
                                             "**nomem", NULL);
            goto fn_fail;
        }
        eventq_tail->next = eventq_elem;
        eventq_tail = eventq_elem;
    }

    /* put the event in */
    index = eventq_tail->tail;
    new_event = &eventq_tail->event[index];
    new_event->op_type = op;
    new_event->num_bytes = num_bytes;
    new_event->fd = fd;
    new_event->sri = (sri == NULL) ? no_rcvinfo : *sri;
    new_event->user_ptr = user_ptr;
    new_event->user_ptr2 = user_ptr2;
    new_event->user_value = value;
    new_event->error = error;

    eventq_tail->size++;
    eventq_tail->tail = (index + 1) % MPIDU_SCTP_EVENTQ_POOL_SIZE;

 fn_exit:
 fn_fail:
    return mpi_errno;
}
/* end MPIDU_Sctp_event_enqueue() */

/*
 * MPIDU_Sctp_event_dequeue - remove the oldest event from the global queue.
 *
 * eventp  out: receives a copy of the dequeued event.  When the queue is
 *              empty only eventp->op_type is written (set to 100).
 *
 * Returns MPI_SUCCESS when an event was dequeued, and MPI_SUCCESS - 1 when
 * the queue is empty.
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_event_dequeue
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDU_Sctp_event_dequeue(struct MPIDU_Sctp_event * eventp)
{
    struct MPIDU_Sctp_eventq_elem* eventq_elem;
    int mpi_errno = MPI_SUCCESS;

    /* check if queue is empty */
    if (eventq_head == NULL || eventq_head->size <= 0) {
        mpi_errno--; /* as long as it isn't MPI_SUCCESS (not interpreted otherwise at call) */
        eventp->op_type = 100; /* NOTE(review): presumably not a valid op type - confirm */
        return mpi_errno;
    }

    *eventp = eventq_head->event[eventq_head->head];
    eventq_head->size--;
    eventq_head->head = (eventq_head->head + 1) % MPIDU_SCTP_EVENTQ_POOL_SIZE;

    /*  free eventq_head if it's empty, however, always maintain at least one */
    if (eventq_head->size == 0 && eventq_tail != eventq_head) {
        eventq_elem = eventq_head;
        eventq_head = eventq_head->next;
        MPIU_Free(eventq_elem);
    }

    return mpi_errno;
}
/* end MPIDU_Sctp_event_dequeue() */

/*
 * MPIDU_Sctp_free_eventq_mem - release every pool of the global event queue
 * and reset eventq_head / eventq_tail to NULL.  Events still queued are
 * discarded.
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_free_eventq_mem
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void inline MPIDU_Sctp_free_eventq_mem(void)
{
    struct MPIDU_Sctp_eventq_elem* eventq_elem;

    while (eventq_head) {
        eventq_elem = eventq_head;
        eventq_head = eventq_head->next;
        MPIU_Free(eventq_elem);
    }

    eventq_head = NULL;
    eventq_tail = NULL;
}

/*
 * create_request - build a send request for the unsent remainder of an iov.
 *
 * iov         entries describing the message.
 * iov_count   number of entries in iov; all of them are copied.
 * iov_offset  index of the entry that was only partially sent.
 * nb          bytes of entry iov_offset that have already been sent.
 *
 * When iov_offset is 0 the packet header referenced by iov[0] is copied into
 * the request (dev.pending_pkt) and the request's first entry is pointed at
 * that copy.  The new request has its reference count set to 2.
 *
 * Returns the new request, or NULL if MPID_Request_create() fails.
 */
#undef FUNCNAME
#define FUNCNAME create_request
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static MPID_Request* create_request(MPID_IOV * iov, int iov_count, int iov_offset, MPIU_Size_t nb)
{
    MPID_Request * sreq;
    int i;
    MPIDI_STATE_DECL(MPID_STATE_CREATE_REQUEST);

    MPIDI_FUNC_ENTER(MPID_STATE_CREATE_REQUEST);

    sreq = MPID_Request_create();
    /* --BEGIN ERROR HANDLING-- */
    if (sreq == NULL)
    {
        /* keep FUNC_ENTER/FUNC_EXIT paired on the failure path as well */
        MPIDI_FUNC_EXIT(MPID_STATE_CREATE_REQUEST);
        return NULL;
    }
    /* --END ERROR HANDLING-- */
    MPIU_Object_set_ref(sreq, 2);
    sreq->kind = MPID_REQUEST_SEND;

    for (i = 0; i < iov_count; i++)
    {
        sreq->dev.iov[i] = iov[i];
    }

    if (iov_offset == 0)
    {
        /* the header is still pending: keep a private copy inside the request */
        MPIU_Assert(iov[0].MPID_IOV_LEN == sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.pending_pkt = *(MPIDI_CH3_PktGeneric_t *) iov[0].MPID_IOV_BUF;
        sreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) &sreq->dev.pending_pkt;
    }

    /* skip the part of the partially sent entry that is already on the wire */
    sreq->dev.iov[iov_offset].MPID_IOV_BUF =
        (MPID_IOV_BUF_CAST)((char *) sreq->dev.iov[iov_offset].MPID_IOV_BUF + nb);
    sreq->dev.iov[iov_offset].MPID_IOV_LEN -= nb;
    sreq->dev.iov_count = iov_count;
    sreq->dev.OnDataAvail = 0;

    MPIDI_FUNC_EXIT(MPID_STATE_CREATE_REQUEST);
    return sreq;
}

/*
 * MPIDU_Sctp_stream_init - start connecting a stream and queue a request on it.
 *
 * PRE-CONDITION: stream state = MPIDI_CH3I_VC_STATE_UNCONNECTED
 *
 * Unless the stream is already in the CONNECTING state, a two-entry
 * connection request is built from VC_IOV(vc, stream), the stream is marked
 * CONNECTING and the request is posted with MPIDU_Sctp_post_writev().  An
 * INACTIVE vc is then marked ACTIVE, and req (if non-NULL) is appended to the
 * stream's send queue.
 *
 * A failure to allocate the connection request cannot be reported to the
 * caller because the function returns void (see FIXME below).
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_stream_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void inline MPIDU_Sctp_stream_init(MPIDI_VC_t* vc, MPID_Request* req, int stream)
{
    MPID_Request* conn_req = NULL;
    int mpi_errno = MPI_SUCCESS;

    /* stream hasn't been used yet, need to enqueue a connection packet */
    if (SEND_CONNECTED(vc, stream) != MPIDI_CH3I_VC_STATE_CONNECTING) {
        conn_req = create_request(VC_IOV(vc, stream), 2, 0, 0);
        if (conn_req)
        {
            /*  don't put it in sendQ, but directly in Global_SendQ */
            SEND_CONNECTED(vc, stream) = MPIDI_CH3I_VC_STATE_CONNECTING;
            /* should be the connection request */
            MPIDU_Sctp_post_writev(vc, conn_req, 0, NULL, stream);
        } else {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
            /* goto fn_exit; FIXME should return int so error can be passed up */
        }
    }

    /* need to update state for upcalls to close protocol to fully work, and barriers */
    if (vc->state == MPIDI_VC_STATE_INACTIVE)
        vc->state = MPIDI_VC_STATE_ACTIVE;

    if (req) {
        MPIDI_CH3I_SendQ_enqueue_x(vc, req, stream);
    }
}

/*
returns number of bytes written. chunks long messages. writes until *  done or it comes across an EAGAIN  */#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sctp_writev(MPIDI_VC_t* vc, struct iovec* ldata,int iovcnt, int stream, int ppid, 		      MPIU_Size_t* nb) {    return MPIDU_Sctp_writev_fd(vc->ch.fd, &(vc->ch.to_address), ldata, iovcnt, stream, ppid, nb);}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_writev_fd#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sctp_writev_fd(int fd, struct sockaddr_in * to, struct iovec* ldata,                         int iovcnt, int stream, int ppid, MPIU_Size_t* nb){  int byte_sent, r, nwritten =0, i, sz = 0;  int mpi_errno = MPI_SUCCESS;  static struct iovec cdata[MPID_IOV_LIMIT]; /* not thread-safe */  struct iovec *data = cdata;  MPIU_Assert(iovcnt > 0);  MEMCPY(cdata, ldata, sizeof(struct iovec) *iovcnt);  /* calculate total size */  for(i = 0; i < iovcnt; i++) sz += ldata[i].iov_len;    do {    r = 0;    errno = 0;    if(sz <= CHUNK) /* vs. MPIDI_CH3_EAGER_MAX_MSG_SIZE? */ {      /* a short/eager message */      byte_sent = sz;      r = sctp_writev(fd, data, iovcnt,		      (struct sockaddr *) to, sizeof(*to), ppid, 0, stream, 0, 0, sz);    } else {      byte_sent = MPIR_MIN(CHUNK,data->iov_len);            MPIU_Assert(byte_sent != 0);      r = sctp_sendmsg(fd, data->iov_base, byte_sent, (struct sockaddr *) to,		       sizeof(*to), ppid, 0, stream, 0, 0);    }        /* update iov's (adjust_iov is static and does not handle errors and "errors" (EAGAIN)) */        if (r <= 0) {  /* error */      if(errno == EAGAIN || errno == ENOMEM || r == 0)	break;      else	goto fn_fail;    } else {  /* r > 0 */      MPIU_Assert(r == byte_sent);      nwritten += r;      adjust_iov(&data, &iovcnt, r);    }      } while(iovcnt > 0);    *nb = nwritten;   fn_exit:  return (nwritten >= 0)? 
MPI_SUCCESS : -1; fn_fail:  *nb = nwritten;  nwritten = -1;  perror("MPIDU_Sctp_writev");  goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sctp_write(MPIDI_VC_t* vc, void* buf, MPIU_Size_t len, 		     int stream_no, int ppid, MPIU_Size_t* num_written){  int err = MPI_SUCCESS;  ssize_t nb;    nb = my_sctp_send(vc->ch.fd, buf, len,                      (struct sockaddr *) &(vc->ch.to_address), stream_no, ppid);      *num_written = 0;  if(nb >= 0)    *num_written = nb;  else    perror("MPIDU_Sctp_write");  return (nb >= 0)? MPI_SUCCESS: -1;}#undef FUNCNAME#define FUNCNAME print_SCTP_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void print_SCTP_event(struct MPIDU_Sctp_event * eventp){}/*  readv from advance buffer  */#undef FUNCNAME#define FUNCNAME readv_from_advbuf#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int readv_from_advbuf(MPID_Request* req, char* from, int bytes_read) {  MPID_IOV* iovp = req-> dev.iov;  int iov_cnt = req-> dev.iov_count;  int total_size, i, j, read_size, rc;  total_size = i = j = read_size = rc = 0;  for(i=0, j=0; j< iov_cnt; i++) {    if(iovp[i].MPID_IOV_LEN > 0) {      total_size += iovp[i].MPID_IOV_LEN;      j++;    }  }  rc = bytes_read = MPIR_MIN(bytes_read, total_size);  for(;bytes_read > 0;iovp++) {    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -