
📄 mpidi_ch3_impl.h

📁 Fortran parallel computing package
💻 H
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#if !defined(MPICH_MPIDI_CH3_IMPL_H_INCLUDED)
#define MPICH_MPIDI_CH3_IMPL_H_INCLUDED

#include "mpidi_ch3i_sctp_conf.h"
#include "mpidi_ch3_conf.h"
#include "mpidimpl.h"
#include "all_hash.h"

typedef int (* MPIDU_Sock_progress_update_func_t)(MPIU_Size_t num_bytes, void * user_ptr);

typedef struct MPIDU_Sock_ifaddr_t {
    int len, type;
    unsigned char ifaddr[16];
} MPIDU_Sock_ifaddr_t;

#define MPIDU_SOCK_ERR_NOMEM -1
#define MPIDU_SOCK_ERR_TIMEOUT -2

/* hash table entry */
typedef struct hash_entry {
    sctp_assoc_t assoc_id;
    MPIDI_VC_t * vc;
} MPIDI_CH3I_Hash_entry;

/* global hash table */
extern HASH* MPIDI_CH3I_assocID_table;

/* one_to_many socket */
extern int MPIDI_CH3I_onetomany_fd;

/* number of items in the sendq's */
extern int sendq_total;

extern int MPIDI_CH3I_listener_port;

/* used for dynamic processes */
extern MPIDI_VC_t * MPIDI_CH3I_dynamic_tmp_vc;
extern int MPIDI_CH3I_dynamic_tmp_fd;

/* determine the stream # of a req */
/* we want to avoid stream zero so we can use it as a control stream.
 * also, we want to use the context so that collectives don't cause
 * head-of-line blocking for p2p... */
/* #define Req_Stream_from_match(match) (match.tag)%MPICH_SCTP_NUM_STREAMS */
/* the following keeps stream s.t. 1 <= stream <= MPICH_SCTP_NUM_REQS_ACTIVE */
#define Req_Stream_from_match(match) \
    ((abs((match.tag) + (match.context_id)) % MPICH_SCTP_NUM_REQS_ACTIVE) + 1)
#define REQ_Stream(req) Req_Stream_from_match(req->dev.match)

int Req_Stream_from_pkt_and_req(MPIDI_CH3_Pkt_t * pkt, MPID_Request * sreq);

/* initialize stream */
#define STREAM_INIT(x)                                          \
{                                                               \
    x->vc = NULL;                                               \
    x->have_sent_pg_id = MPIDI_CH3I_VC_STATE_UNCONNECTED;       \
    x->send_active = NULL;                                      \
    x->recv_active = NULL;                                      \
    x->sendQ_head = NULL;                                       \
    x->sendQ_tail = NULL;                                       \
}

/* use of STREAM macros to allow change of logic in future */
#define SEND_CONNECTED(vc, x) vc->ch.stream_table[x].have_sent_pg_id
#define SEND_ACTIVE(vc, x) vc->ch.stream_table[x].send_active
#define RECV_ACTIVE(vc, x) vc->ch.stream_table[x].recv_active
#define VC_SENDQ(vc, x) vc->ch.stream_table[x].sendQ_head
#define VC_SENDQ_TAIL(vc, x) vc->ch.stream_table[x].sendQ_tail
#define VC_IOV(vc, x) vc->ch.stream_table[x].iov

/* MT - not thread safe! */
/* enqueue to a specific stream */
#define MPIDI_CH3I_SendQ_enqueue_x(vc, req, x)                                          \
{                                                                                       \
    MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_enqueue vc=%p req=0x%08x", vc, req->handle));  \
    sendq_total++;                                                                      \
    req->dev.next = NULL;                                                               \
    if (VC_SENDQ_TAIL(vc, x) != NULL)                                                   \
    {                                                                                   \
        VC_SENDQ_TAIL(vc, x)->dev.next = req;                                           \
    }                                                                                   \
    else                                                                                \
    {                                                                                   \
        VC_SENDQ(vc, x) = req;                                                          \
    }                                                                                   \
    VC_SENDQ_TAIL(vc, x) = req;                                                         \
}

/* MT - not thread safe! */
/* enqueue at the head of a particular stream's queue */
#define MPIDI_CH3I_SendQ_enqueue_head_x(vc, req, x)                                          \
{                                                                                            \
    MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_enqueue_head vc=%p req=0x%08x", vc, req->handle));  \
    sendq_total++;                                                                           \
    req->dev.next = VC_SENDQ(vc, x);                                                         \
    if (VC_SENDQ_TAIL(vc, x) == NULL)                                                        \
    {                                                                                        \
        VC_SENDQ_TAIL(vc, x) = req;                                                          \
    }                                                                                        \
    VC_SENDQ(vc, x) = req;                                                                   \
}

/* MT - not thread safe! */
#define MPIDI_CH3I_SendQ_dequeue_x(vc, x)                                                          \
{                                                                                                  \
    MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_dequeue vc=%p req=0x%08x", vc, VC_SENDQ(vc,x)->handle));  \
    MPIU_Assert(sendq_total > 0);                                                                  \
    sendq_total--;                                                                                 \
    VC_SENDQ(vc, x) = VC_SENDQ(vc, x)->dev.next;                                                   \
    if (VC_SENDQ(vc, x) == NULL)                                                                   \
    {                                                                                              \
        VC_SENDQ_TAIL(vc, x) = NULL;                                                               \
    }                                                                                              \
}

/* sendQ head for a specific stream */
#define MPIDI_CH3I_SendQ_head_x(vc, x) (VC_SENDQ(vc, x))
#define MPIDI_CH3I_SendQ_empty_x(vc, x) (VC_SENDQ(vc, x) == NULL)

/* posted IOV macros */
#define POST_IOV(x) (x->write.iov.ptr)
#define POST_IOV_CNT(x) (x->write.iov.iov_count)
#define POST_IOV_OFFSET(x) (x->write.iov.offset)
#define POST_IOV_FLAG(x) (x->write_iov_flag)
#define POST_BUF(x) (x->write.buf.ptr)
#define POST_BUF_MIN(x) (x->write.buf.min)
#define POST_BUF_MAX(x) (x->write.buf.max)
#define POST_UPDATE_FN(x) (x)

/* determine if SCTP_WAIT should keep spinning */
#define SPIN(x) (((x) == 0) ? FALSE : TRUE)

/* global event queue */
#define MPIDU_SCTP_EVENTQ_POOL_SIZE 20

typedef enum MPIDU_Sctp_op
{
    MPIDU_SCTP_OP_READ,
    MPIDU_SCTP_OP_WRITE,
    MPIDU_SCTP_OP_CLOSE,
    MPIDU_SCTP_OP_WAKEUP,
    MPIDU_SCTP_OP_ACCEPT
} MPIDU_Sctp_op_t;

typedef struct MPIDU_Sctp_event
{
    MPIDU_Sctp_op_t op_type;
    MPIU_Size_t num_bytes;
    int fd;              /* in case we ever use more than one socket */
    sctp_rcvinfo sri;    /* this points to sctp_sndrcvinfo always for now on read */
    /* int stream_no; */ /* this is in the sri for read, but no sri is provided for write */
    void * user_ptr;     /* this points to sctp_advbuf always for reads, and to vc for writes */
    void * user_ptr2;
    int user_value;      /* can differ from flags in sri */
    int error;
} MPIDU_Sctp_event_t;

struct MPIDU_Sctp_eventq_elem {
    MPIDU_Sctp_event_t event[MPIDU_SCTP_EVENTQ_POOL_SIZE];
    int size;
    int head;
    int tail;
    struct MPIDU_Sctp_eventq_elem* next;
};

/* need global event queues, because we no longer have sock_set */
extern struct MPIDU_Sctp_eventq_elem* eventq_head;
extern struct MPIDU_Sctp_eventq_elem* eventq_tail;

int MPIDU_Sctp_event_enqueue(MPIDU_Sctp_op_t op, MPIU_Size_t num_bytes,
                             sctp_rcvinfo* sri, int fd,
                             void * user_ptr, void * user_ptr2, int msg_flags, int error);
int MPIDU_Sctp_event_dequeue(struct MPIDU_Sctp_event * eventp);
void MPIDU_Sctp_free_eventq_mem(void);
void print_SCTP_event(struct MPIDU_Sctp_event * eventp);

/* send queue */
void MPIDU_Sctp_stream_init(MPIDI_VC_t* vc, MPID_Request* req, int stream);

int MPIDU_Sctp_writev_fd(int fd, struct sockaddr_in * to, struct iovec* ldata,
                         int iovcnt, int stream, int ppid, MPIU_Size_t* nb);
int MPIDU_Sctp_writev(MPIDI_VC_t* vc, struct iovec* data, int iovcnt,
                      int stream, int ppid, MPIU_Size_t* nb);
int MPIDU_Sctp_write(MPIDI_VC_t* vc, void* buf, MPIU_Size_t len,
                     int stream_no, int ppid, MPIU_Size_t* num_written);
int readv_from_advbuf(MPID_Request* req, char* from, int bytes_read);

inline int MPIDU_Sctp_post_writev(MPIDI_VC_t* vc, MPID_Request* sreq, int offset,
                                  MPIDU_Sock_progress_update_func_t fn, int stream_no);
inline int MPIDU_Sctp_post_write(MPIDI_VC_t* vc, MPID_Request* sreq, MPIU_Size_t minlen,
                                 MPIU_Size_t maxlen, MPIDU_Sock_progress_update_func_t fn,
                                 int stream_no);

/* a duplicate static existed, so this was made non-static */
int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);

/* used in ch3_init.c and ch3_progress.c; sock equivalents are in util/sock */
int MPIDU_Sctp_wait(int fd, int timeout, MPIDU_Sctp_event_t * event);
int MPIDI_CH3I_Progress_handle_sctp_event(MPIDU_Sctp_event_t * event);
int MPIDI_CH3U_Get_business_card_sctp(char **value, int *length);
int MPIDU_Sctp_get_conninfo_from_bc( const char *bc,
                                     char *host_description, int maxlen,
                                     int *port, MPIDU_Sock_ifaddr_t *ifaddr,
                                     int *hasIfaddr );

/* BUFFER management routines/structs */
#define READ_AMOUNT 262144

typedef struct my_buffer {
    int free_space;           /* free space */
    char buffer[READ_AMOUNT];
    char* buf_ptr;
    struct my_buffer* next;   /* if we ever chain bufferNode */
    char dynamic;
} BufferNode_t;

inline void BufferList_init(BufferNode_t* node);
inline int buf_init(BufferNode_t* node);
inline int buf_clean(BufferNode_t* node);
inline char* request_buffer(int size, BufferNode_t** node);
inline void update_size(BufferNode_t* node, int size);

#define remain_size(x) ((x)->free_space)
/* END BUFFER management */

/* FIXME: Any of these used in the ch3->channel interface should be
   defined in a header file in ch3/include that defines the
   channel interface */
int MPIDI_CH3I_Progress_init(int pg_size);
int MPIDI_CH3I_Progress_finalize(void);
int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t *);

/* global sendQ stuff */
typedef struct gb_sendQ_head {
    struct MPID_Request* head;
    struct MPID_Request* tail;
    int count;
} GLB_SendQ_Head;

extern GLB_SendQ_Head Global_SendQ;

/* global sendQ enqueue */
/* malloc is not needed, because the ch portion of the REQ has all the info
 * we need: vc, stream */
#define Global_SendQ_enqueue(vc, req, x) {              \
        req->ch.vc = vc;                                \
        req->ch.stream = x;                             \
        SEND_ACTIVE(vc, x) = req;                       \
        req->dev.next = NULL;                           \
        if (Global_SendQ.head == NULL) {                \
            Global_SendQ.head = req;                    \
            Global_SendQ.tail = req;                    \
        } else {                                        \
            Global_SendQ.tail->dev.next = req;          \
            Global_SendQ.tail = req;                    \
        }                                               \
        Global_SendQ.count++;                           \
}

/* init global sendQ */
#define Global_SendQ_init() {                           \
        Global_SendQ.count = 0;                         \
        Global_SendQ.head = NULL;                       \
        Global_SendQ.tail = Global_SendQ.head;          \
}

#define Global_SendQ_head() (Global_SendQ.head)
#define Global_SendQ_count() (Global_SendQ.count)

/* dequeue + pop */
#define Global_SendQ_dequeue(x) {                               \
        if (Global_SendQ.count) {                               \
            x = Global_SendQ.head;                              \
            Global_SendQ.head = Global_SendQ.head->dev.next;    \
            Global_SendQ.count--;                               \
        }                                                       \
        if (!Global_SendQ.count) {                              \
            Global_SendQ.tail = Global_SendQ.head;              \
        }                                                       \
}

/* PERFORMANCE stuff */
#define PERF_MEASURE

typedef struct perf_struct {
    int wait_count;
    int event_q_size;
} Performance_t;
/* END PERFORMANCE stuff */

/* Connection setup OPT */
#define CONNECT_THRESHOLD 2

/* Memory copy optimization */
/* Nemesis memcpy opt. can go here */
#define MEMCPY(to, from, nb) memcpy(to, from, nb)

#endif /* !defined(MPICH_MPIDI_CH3_IMPL_H_INCLUDED) */
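
The Req_Stream_from_match() macro above spreads requests over SCTP streams 1 through MPICH_SCTP_NUM_REQS_ACTIVE, keeping stream 0 free as a control stream and mixing the context id into the hash so collective and point-to-point traffic are less likely to share a stream. Below is a minimal standalone sketch of that arithmetic only; the value of MPICH_SCTP_NUM_REQS_ACTIVE is illustrative, and stream_from_match() is a hypothetical stand-in for the macro, not part of the MPICH API.

/* sketch: how the (tag, context_id) pair maps onto a non-zero stream number */
#include <stdio.h>
#include <stdlib.h>

#define MPICH_SCTP_NUM_REQS_ACTIVE 8   /* illustrative value; the real one comes from the build */

static int stream_from_match(int tag, int context_id)
{
    /* same formula as the macro: result is always in [1, MPICH_SCTP_NUM_REQS_ACTIVE] */
    return (abs(tag + context_id) % MPICH_SCTP_NUM_REQS_ACTIVE) + 1;
}

int main(void)
{
    printf("tag=42, context=1 -> stream %d\n", stream_from_match(42, 1)); /* (43 % 8) + 1 = 4 */
    printf("tag=0,  context=0 -> stream %d\n", stream_from_match(0, 0));  /* 1, never stream 0 */
    return 0;
}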
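
The Global_SendQ_* macros implement a plain singly-linked FIFO threaded through req->dev.next, with head, tail, and count kept consistent on every enqueue and dequeue. The following self-contained sketch mirrors those invariants using a hypothetical node_t in place of MPID_Request; all names here are illustrative and not part of MPICH.

/* sketch: the same head/tail/count FIFO discipline as the Global_SendQ macros */
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

typedef struct node { int id; struct node *next; } node_t;          /* stand-in for MPID_Request */
typedef struct { node_t *head; node_t *tail; int count; } sendq_t;  /* stand-in for GLB_SendQ_Head */

static void sendq_init(sendq_t *q)                 /* mirrors Global_SendQ_init() */
{ q->count = 0; q->head = NULL; q->tail = q->head; }

static void sendq_enqueue(sendq_t *q, node_t *n)   /* mirrors Global_SendQ_enqueue() */
{
    n->next = NULL;
    if (q->head == NULL) { q->head = n; q->tail = n; }
    else                 { q->tail->next = n; q->tail = n; }
    q->count++;
}

static node_t *sendq_dequeue(sendq_t *q)           /* mirrors Global_SendQ_dequeue() */
{
    node_t *n = NULL;
    if (q->count) { n = q->head; q->head = q->head->next; q->count--; }
    if (!q->count) q->tail = q->head;              /* keep tail consistent when the queue empties */
    return n;
}

int main(void)
{
    sendq_t q; node_t a = {1, NULL}, b = {2, NULL};
    sendq_init(&q);
    sendq_enqueue(&q, &a);
    sendq_enqueue(&q, &b);
    printf("dequeued %d\n", sendq_dequeue(&q)->id); /* 1: FIFO order */
    printf("dequeued %d\n", sendq_dequeue(&q)->id); /* 2 */
    assert(sendq_dequeue(&q) == NULL && q.tail == NULL);
    return 0;
}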
