📄 snd.c
字号:
if (SR_proc_info[*node].local){#ifdef KSR KSR_snd_local(type, buf, lenbuf, node);#else snd_local(type, buf, lenbuf, node);#endif } else {#endif#if defined(ALLIANT) && defined(SWTCH) int toport = SR_clus_info[SR_proc_info[*node].clusid].swtchport; int meport = SR_clus_info[SR_proc_info[me].clusid].swtchport; if ((toport >= 0) && (meport >= 0) && (toport != meport)) { if (DEBUG_) { printf("%d: snd_switch: toport=%d, fromport=%d, to=%d, type=%d\n", me, toport, meport, *node, *type); (void) fflush(stdout); } snd_switch(me, *node, toport, buf, *lenbuf, *type); } else {#endif snd_remote(type, buf, lenbuf, node);#if defined(ALLIANT) && defined(SWTCH) }#endif#ifdef SHMEM }#endif /* Collect statistics */ SR_proc_info[*node].n_snd += 1; SR_proc_info[*node].nb_snd += *lenbuf;#ifdef TIMINGS SR_proc_info[*node].t_snd += TCGTIME_() - start;#endif#ifdef EVENTLOG evlog(EVKEY_END, EVENT_SND, EVKEY_LAST_ARG);#endif} static long MatchMessage(header, me, type) MessageHeader *header; long me, type;/* Wrapper round check on if header is to me and of required type so that compiler does not optimize out fetching header info from shared memory.*/{ return (long) ((header->nodeto == me) && (header->type == type));}static long NextReadyNode(type) long type;/* Select a node from which input is pending ... also match the desired type. next_node is maintained as the last node that NextReadyNode chose plus one modulo NNODES_(). This aids in ensuring fairness. First use select to get info about the sockets and then loop through processes looking either at the bit in the fd_set for the socket (remote process) or the message header in the shared memory buffer (local process). This may be an expensive operation but fairness seems important.*/{ static long next_node = 0; long nproc = NNODES_(); long me = NODEID_();#ifdef SWTCH long meport = SR_clus_info[SR_proc_info[me].clusid].swtchport; long nextport, iport;#endif int i, nspin = 0; /* With both local and remote processes end up with a busy wait as no way to wait for both a semaphore and a socket. Moderate this slightly by having short timeout in select */ while (1) { for(i=0; i<nproc; i++, next_node = (next_node + 1) % nproc) {#ifdef SWTCH nextport = SR_clus_info[SR_proc_info[next_node].clusid].swtchport;#endif if (next_node == me) { ; /* can't receive from self */ } else if (SR_proc_info[next_node].local) { /* Look for local message */#ifdef KSR if (KSR_MatchMessage(next_node, me, type))#else if (MatchMessage(SR_proc_info[next_node].header, me, type))#endif break; }#ifdef SWTCH else if ((meport >= 0) && (nextport >= 0) && (meport != nextport)) { /* Look for message over HIPPI switch */ if (next_indexp1 = sw_probe(&next_node, me, &type, &next_lenmes)) break; }#endif else if (SR_proc_info[next_node].sock >= 0) { /* Look for message over socket */ int sock = SR_proc_info[next_node].sock; /* Have we already peeked at this socket? */ if (SR_proc_info[next_node].peeked) { if (SR_proc_info[next_node].head_peek.type == type) break; } else if (PollSocket(sock)) { /* Data is available ... let's peek at it */#ifdef GOTXDR (void) ReadXdrLong(sock, (long *) &SR_proc_info[next_node].head_peek, (long) (sizeof(MessageHeader)/sizeof(long)));#else if (ReadFromSocket(sock, (char *) &SR_proc_info[next_node].head_peek, (long) sizeof(MessageHeader)) != sizeof(MessageHeader) ) Error("NextReadyNode: reading header from socket", next_node);#endif SR_proc_info[next_node].peeked = TRUE; if (DEBUG_) PrintMessageHeader("peeked_at ", &SR_proc_info[next_node].head_peek); if (SR_proc_info[next_node].head_peek.type == type) break; } } } if (i < nproc) /* If found a node skip out of the while loop */ break; nspin++; /* Compromise between low latency and low cpu use */ if (nspin < 10) continue; else if (nspin < 100) USleep((long) 1000); else if (nspin < 600) USleep((long) 10000); else USleep((long) 1000000); } i = next_node; next_node = (next_node + 1) % nproc; return (long) i;}long PROBE_(type, node) long *type, *node; /* Return 1/0 (TRUE/FALSE) if a message of the given type is available from the given node. If the node is specified as -1, then all nodes will be examined. Some attempt is made at ensuring fairness. First use select to get info about the sockets and then loop through processes looking either at the bit in the fd_set for the socket (remote process) or the message header in the shared memory buffer (local process). This may be an expensive operation but fairness seems important. */{ static long next_node = 0; long nproc = NNODES_(); long me = NODEID_();#ifdef SWTCH long meport = SR_clus_info[SR_proc_info[me].clusid].swtchport; long nextport, iport;#endif int i, proclo, prochi; if (*node == me) Error("PROBE_ : cannot recv message from self, msgtype=", *type); if (*node == -1) { /* match anyone */ proclo = 0; prochi = nproc-1; } else proclo = prochi = *node; for(i=proclo; i<=prochi; i++) { #ifdef SWTCH nextport = SR_clus_info[SR_proc_info[i].clusid].swtchport;#endif if (i == me) { ; /* can't receive from self */ } else if (SR_proc_info[i].local) { /* Look for local message */ #ifdef KSR if (KSR_MatchMessage(i, me, type))#else if (MatchMessage(SR_proc_info[i].header, me, *type))#endif break; }#ifdef SWTCH else if ((meport >= 0) && (nextport >= 0) && (meport != nextport)) { /* Look for message over HIPPI switch */ if (next_indexp1 = sw_probe(&i, me, &type, &next_lenmes)) break; }#endif else if (SR_proc_info[i].sock >= 0) { /* Look for message over socket */ int sock = SR_proc_info[i].sock; /* Have we already peeked at this socket? */ if (SR_proc_info[i].peeked) { if (SR_proc_info[i].head_peek.type == *type) break; } else if (PollSocket(sock)) { /* Data is available ... let's peek at it */#ifdef GOTXDR (void) ReadXdrLong(sock, (long *) &SR_proc_info[i].head_peek, (long) (sizeof(MessageHeader)/sizeof(long)));#else if (ReadFromSocket(sock, (char *) &SR_proc_info[i].head_peek, (long) sizeof(MessageHeader)) != sizeof(MessageHeader) ) Error("NextReadyNode: reading header from socket", i);#endif SR_proc_info[i].peeked = TRUE; if (DEBUG_) PrintMessageHeader("peeked_at ", &SR_proc_info[i].head_peek); if (SR_proc_info[i].head_peek.type == *type) break; } } } if (i <= prochi) return 1; else return 0;}static void rcv_remote(type, buf, lenbuf, lenmes, nodeselect, nodefrom) long *type; char *buf; long *lenbuf; long *lenmes; long *nodeselect; long *nodefrom;/* synchronous receive of data long *type = user defined type of received message (input) char *buf = data buffer (output) long *lenbuf = length of buffer in bytes (input) long *lenmes = length of received message in bytes (output) (exceeding receive buffer is hard error) long *nodeselect = node to receive from (input) -1 implies that any pending message may be received long *nodefrom = node message is received from (output)*/{ long me = NODEID_(); long node = *nodeselect; int sock = SR_proc_info[node].sock; long len; MessageHeader header;#ifdef SOCK_FULL_SYNC char sync = 0;#endif if ( sock < 0 ) Error("rcv_remote: receiving from process without socket", (long) node); /* read the message header and check contents */ if (SR_proc_info[node].peeked) { /* Have peeked at this socket ... get message header from buffer */ if (DEBUG_) printf("%2ld: rcv_remote message has been peeked at\n", me); (void) memcpy((char *) &header, (char *) &SR_proc_info[node].head_peek, sizeof(MessageHeader)); SR_proc_info[node].peeked = FALSE; } else {#ifdef GOTXDR (void) ReadXdrLong(sock, (long *) &header, (long) (sizeof(header)/sizeof(long)));#else if ( (len = ReadFromSocket(sock, (char *) &header, (long) sizeof(header))) != sizeof(header) ) Error("rcv_remote: reading header from socket", len);#endif } if (DEBUG_) PrintMessageHeader("rcv_remote",&header); if (header.nodeto != me) { PrintMessageHeader("rcv_remote",&header); Error("rcv_remote: got message meant for someone else", (long) header.nodeto); } *nodefrom = header.nodefrom; if (*nodefrom != node) Error("rcv_remote: got message from someone on incorrect socket", (long) *nodefrom); if (header.type != *type) { PrintMessageHeader("rcv_remote",&header); Error("rcv_remote: type mismatch ... strong typing enforced", (long) *type); }#ifdef GOTXDR if ( *type & MSGDBL ) *lenmes = header.length * sizeof(double); else if ( *type & MSGINT ) *lenmes = header.length * sizeof(long); else if ( *type & MSGCHR ) *lenmes = header.length * sizeof(char); else *lenmes = header.length; #else *lenmes = header.length; #endif if ( (*lenmes < 0) || (*lenmes > BIG_MESSAGE) || (*lenmes > *lenbuf) ) { PrintMessageHeader("rcv_remote",&header); (void) fprintf(stderr, "rcv_remote err: lenbuf=%ld\n",*lenbuf); Error("rcv_remote: message length out of range",(long) *lenmes); } if (*lenmes > 0) {#ifdef GOTXDR if ( *type & MSGDBL ) (void) ReadXdrDouble(sock, (double *) buf, header.length); else if ( *type & MSGINT ) (void) ReadXdrLong(sock, (long *) buf, header.length); else if ( *type & MSGCHR ) (void) ReadXdrChar(sock, (char *) buf, header.length); else if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes) Error("rcv_remote: reading message from socket", (long) (len+100000*(sock+ 1000* *nodefrom)));#else if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes) Error("rcv_remote: reading message from socket", (long) (len+100000*(sock+ 1000* *nodefrom)));#endif } /* this write (and read in snd_remote) makes the link synchronous */#ifdef SOCK_FULL_SYNC if ( WriteToSocket(sock, &sync, (long) 1) != 1) Error("rcv_remote: writing sync to socket", (long) node);#endif}/*ARGSUSED*/void RCV_(type, buf, lenbuf, lenmes, nodeselect, nodefrom, sync) long *type; char *buf; long *lenbuf; long *lenmes; long *nodeselect; long *nodefrom; long *sync;/* long *type = user defined type of received message (input) char *buf = data buffer (output) long *lenbuf = length of buffer in bytes (input) long *lenmes = length of received message in bytes (output) (exceeding receive buffer is hard error) long *nodeselect = node to receive from (input) -1 implies that any pending message may be received long *nodefrom = node message is received from (output) long *sync = 0 for asynchronous, 1 for synchronous (NOT USED)*/{ long me = NODEID_(); long nproc = NNODES_(); long node;#ifdef TIMINGS double start;#endif#ifdef EVENTLOG evlog(EVKEY_BEGIN, EVENT_RCV, EVKEY_MSG_FROM, *nodeselect, EVKEY_MSG_TO, me, EVKEY_MSG_TYPE, *type, EVKEY_MSG_SYNC, *sync, EVKEY_LAST_ARG);#endif /* Assign the desired node or the next ready node */#ifdef TIMINGS start = TCGTIME_();#endif#ifdef SWTCH next_indexp1 = 0; /* IF nodeselect = -1 will be set to message index */#endif if (*nodeselect == -1) node = NextReadyNode(*type); else node = *nodeselect; /* Check for some errors ... need more checking here ... note that the overall master process has id nproc */ if (node == me) Error("RCV_: cannot receive message from self", (long) me); if ( (node < 0) || (node > nproc) ) Error("RCV_: out of range node requested", (long) node); /* Receive the message ... use shared memory, switch or socket */#ifdef SHMEM if (SR_proc_info[node].local){#ifdef KSR KSR_rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);#else rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);#endif } else {#endif#if defined(ALLIANT) && defined(SWTCH) int frport = SR_clus_info[SR_proc_info[node].clusid].swtchport; int meport = SR_clus_info[SR_proc_info[me].clusid].swtchport; if ((frport >= 0) && (meport >= 0) && (frport != meport)) { rcv_switch(type, buf, lenbuf, lenmes, &node, nodefrom); } else {#endif rcv_remote(type, buf, lenbuf, lenmes, &node, nodefrom);#if defined(ALLIANT) && defined(SWTCH) }#endif#ifdef SHMEM }#endif /* Collect statistics */ SR_proc_info[node].n_rcv += 1; SR_proc_info[node].nb_rcv += *lenmes;#ifdef TIMINGS SR_proc_info[node].t_rcv += TCGTIME_() - start;#endif#ifdef EVENTLOG evlog(EVKEY_END, EVENT_RCV, EVKEY_MSG_FROM, node, EVKEY_MSG_LEN, *lenmes, EVKEY_LAST_ARG);#endif} void RemoteConnect(a, b, c) long a, b, c;/* Make a socket connection between processes a and b via the process c to which both are already connected.*/{ long me = NODEID_(); long nproc = NNODES_(); long type = TYPE_CONNECT; /* Overriden below */ char cport[8]; long tmp, lenmes, nodefrom, clusid, lenbuf, sync=1; int sock, port; long lport;#ifdef SWTCH int aport = SR_clus_info[SR_proc_info[a].clusid].swtchport; int bport = SR_clus_info[SR_proc_info[b].clusid].swtchport;#endif if ((a == b) || (a == c) || (b == c) ) return; /* Gracefully ignore redundant connections */ if ( (me != a) && (me != b) && (me != c) ) return; /* I'm not involved in this connection */ #ifdef SWTCH /* If connected over HiPPI don't need a socket also */ if ((aport >= 0) && (bport >= 0) && (aport != bport)) return;#endif if (a < b) { tmp = a; a = b; b = tmp; } type = (a + nproc*b) | MSGINT; /* Create a unique type */ if (DEBUG_) { (void) printf("RC a=%ld, b=%ld, c=%ld, me=%ld\n",a,b,c,me); (void) fflush(stdout); } if (a == me) { CreateSocketAndBind(&sock, &port); /* Create port */ if (DEBUG_) { (void) printf("RC node=%ld, sock=%d, port=%d\n",me, sock, port); (void) fflush(stdout); } lport = port; lenbuf = sizeof lport; SND_(&type, (char *) &lport, &lenbuf, &c, &sync); /* Port to intermediate */ SR_proc_info[b].sock = ListenAndAccept(sock); /* Accept connection and save socket info */ } else if (b == me) { clusid = SR_proc_info[a].clusid; lenbuf = sizeof lport; RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &c, &nodefrom, &sync); port = lport; (void) sprintf(cport,"%d",port); lenbuf = strlen(cport) + 1; if (lenbuf > sizeof cport) Error("RemoteConnect: cport too small", (long) lenbuf); SR_proc_info[a].sock = CreateSocketAndConnect(SR_clus_info[clusid].hostname, cport); } else if (c == me) { lenbuf = sizeof lport; RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &a, &nodefrom, &sync); SND_(&type, (char *) &lport, &lenbuf, &b, &sync); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -