📄 snd.c
字号:
/* $Header: /home/harrison/c/tcgmsg/ipcv4.0/RCS/snd.c,v 1.1 91/12/06 17:27:25 harrison Exp Locker: harrison $ */#include <stdio.h>#ifdef SEQUENT#include <strings.h>#else#include <string.h>#endif#ifdef AIX#include <sys/select.h>#endif#include <sys/types.h>#include <sys/time.h>#if defined(SUN)extern char *sprintf();#endifextern void Error();#include "sndrcv.h"#include "sndrcvP.h"#include "sockets.h"#ifdef GOTXDR#include "xdrstuff.h"#endif#ifdef SHMEM#if !defined(SEQUENT) && !defined(CONVEX)#include <memory.h>#endif#include "sema.h"#include "shmem.h"#if defined(ALLIANT)#define SRmover(a,b,n) memcpy(a,b,n)#elseextern void SRmover();#endif#endif SHMEM#ifdef EVENTLOG#include "evlog.h"#endif#ifdef SWTCH#include "sw.h"static int next_indexp1;static int next_lenmes;#endifvoid PrintProcInfo()/* Print out the SR_proc_info structure array for this process*/{ long i; (void) fprintf(stderr,"Process info for node %ld: \n",NODEID_()); for (i=0; i<NNODES_(); i++) (void) fprintf(stderr,"[%ld] = {\n\ clusid = %-8ld slaveid = %-8ld local = %-8ld\n\ sock = %-8d shmem = %-8x shmem_size = %-8ld\n\ shmem_id = %-8ld buffer = %-8x buflen = %-8ld\n\ header = %-8x semid = %-8ld sem_read = %-8ld\n\sem_written = %-8ld n_rcv = %-8ld nb_rcv = %-8ld\n\ t_rcv = %-8ld n_snd = %-8ld nb_snd = %-8ld\n\ t_snd = %-8ld, peeked = %-8ld}\n", i, SR_proc_info[i].clusid, SR_proc_info[i].slaveid, SR_proc_info[i].local, SR_proc_info[i].sock, SR_proc_info[i].shmem, SR_proc_info[i].shmem_size, SR_proc_info[i].shmem_id, SR_proc_info[i].buffer, SR_proc_info[i].buflen, SR_proc_info[i].header, SR_proc_info[i].semid, SR_proc_info[i].sem_read, SR_proc_info[i].sem_written, SR_proc_info[i].n_rcv, SR_proc_info[i].nb_rcv, SR_proc_info[i].t_rcv, SR_proc_info[i].n_snd, SR_proc_info[i].nb_snd, SR_proc_info[i].t_snd, SR_proc_info[i].peeked); (void) fflush(stderr);}static void PrintMessageHeader(info, header) char *info; MessageHeader *header;/* Print out the contents of a message header along with info message*/{ (void) printf("%2ld:%s: type=%ld, from=%ld, to=%ld, len=%ld, tag=%ld\n", NODEID_(),info, header->type, header->nodefrom, header->nodeto, header->length, header->tag); (void) fflush(stdout);}#if defined(ALLIANT) && defined(SWTCH)void snd_switch(me, node, port, buf, lenbuf, type) int me, node, port, lenbuf, type; char *buf;/* Send to switch taking care of long messages. Note have to dick around with type as current daemon does not necessarily send messages in order and we use this to order receipt of messages.*/{ int len; len = sw_send(me, node, port, buf, lenbuf, type); while (lenbuf -= len) { type += 1000000; buf += len; len = sw_send(me, node, port, buf, lenbuf, type); }} void rcv_switch(type, buf, lenbuf, lenmes, nodeselect, nodefrom) long *type; char *buf; long *lenbuf; long *lenmes; long *nodeselect; long *nodefrom;/* synchronous receive of data long *type = user defined type of received message (input) char *buf = data buffer (output) long *lenbuf = length of buffer in bytes (input) long *lenmes = length of received message in bytes (output) (exceeding receive buffer is hard error) long *nodeselect = node to receive from (input) long *nodefrom = node message is received from (output) */{ int indexp1 = next_indexp1; int len = next_lenmes; int me = NODEID_(); int match_type = *type; int len_got = 0; int len_packet; *nodefrom = *nodeselect; if (DEBUG_) { printf("%2d: rcv_sw_probe: from=%d, type=%d\n",me, *nodefrom, *type); sw_dump_bufs(); fflush(stdout); } if (!indexp1) while (!(indexp1 = sw_probe(nodefrom, me, &match_type, &len))) ; *lenmes = len; if (*lenmes > *lenbuf) Error("rcv_switch: message too long for buffer", *lenmes); len_got = len_packet = sw_recv(indexp1, buf); while (len_got < *lenmes) { match_type += 1000000; buf += len_packet; while (!(indexp1 = sw_probe(nodefrom, me, &match_type, &len))) ; len_packet = sw_recv(indexp1, buf); len_got += len_packet; } if (DEBUG_) { printf("%d: rcv_sw_done: from=%d, type=%d, len=%d\n", me, *nodefrom, *type, *lenmes); fflush(stdout); } next_indexp1 = 0;}#endif#ifdef SHMEMstatic int DummyRoutine(){int i, sum=0; for(i=0; i<10; i++) sum += i;}static long flag(p) long *p;{ return *p;}static void Await(p, value) long *p; long value;/* Wait until the value pointed to by p equals value. Since *ptr is volatile but cannot usually declare this include another level of procedure call to protect against compiler optimization.*/{ int nspin = 0; if (DEBUG_) { printf("%2ld: Await p=%x, value=%d\n", NODEID_(), p, value); fflush(stdout); printf("%2ld: Await *p=%d\n", NODEID_(), *p); fflush(stdout); } for (; flag(p) != value; nspin++) {#ifdef NOSPIN if (nspin < 100) DummyRoutine(); else USleep((long) 10000);#else if (nspin < 100000) DummyRoutine(); else USleep((long) 100000);#endif }}static void rcv_local(type, buf, lenbuf, lenmes, nodeselect, nodefrom) long *type; char *buf; long *lenbuf; long *lenmes; long *nodeselect; long *nodefrom;{ long me = NODEID_(); long node = *nodeselect; MessageHeader *head = SR_proc_info[node].header; long buflen = SR_proc_info[node].buflen; char *buffer = SR_proc_info[node].buffer; long nodeto, len;#ifdef NOSPIN long semid = SR_proc_info[node].semid; long sem_read = SR_proc_info[node].sem_read; long sem_written = SR_proc_info[node].sem_written; long semid_to = SR_proc_info[me].semid; long sem_pend = SR_proc_info[me].sem_pend;#else long *buffer_full = SR_proc_info[node].buffer_full;#endif /* Error checking */ if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) ) Error("rcv_local: invalid shared memory", (long) node);#ifdef NOSPIN if ( (semid < 0) || (sem_read < 0) || (sem_written < 0) || (semid_to < 0) || (sem_pend < 0) ) Error("rcv_local: invalid semaphore set", (long) node);#endif#ifdef NOSPIN SemWait(semid_to, sem_pend);#endif Await(&head->nodeto, me); /* Still have this possible spin */#ifdef NOSPIN SemWait(semid, sem_written);#else Await(buffer_full, (long) TRUE);#endif /* Now have a message for me ... check the header info and copy the first block of the message. */ if (DEBUG_) PrintMessageHeader("rcv_local ",head); nodeto = head->nodeto; /* Always me ... history here */ head->nodeto = -1; *nodefrom = head->nodefrom; if (head->type != *type) { PrintMessageHeader("rcv_local ",head); Error("rcv_local: type mismatch ... strong typing enforced", (long) *type); } *lenmes = len = head->length; if ( *lenmes > *lenbuf ) Error("rcv_local: message too long for buffer", (long) *lenmes); if (nodeto != me) Error("rcv_local: message meant for someone else?", (long) nodeto); if (len) (void) SRmover(buf, buffer, (len > buflen) ? buflen : len);#ifdef NOSPIN SemPost(semid, sem_read);#else *buffer_full = FALSE;#endif len -= buflen; buf += buflen; /* Copy the remainder of the message */ while (len > 0) {#ifdef NOSPIN SemWait(semid, sem_written);#else Await(buffer_full, (long) TRUE);#endif (void) SRmover(buf, buffer, (len > buflen) ? buflen : len);#ifdef NOSPIN SemPost(semid, sem_read);#else *buffer_full = FALSE;#endif len -= buflen; buf += buflen; }}static void snd_local(type, buf, lenbuf, node) long *type; char *buf; long *lenbuf; long *node;{ long me = NODEID_(); MessageHeader *head = SR_proc_info[me].header; long buflen = SR_proc_info[me].buflen; long len = *lenbuf; char *buffer = SR_proc_info[me].buffer; long tag = SR_proc_info[*node].n_snd;#ifdef NOSPIN long semid = SR_proc_info[me].semid; long sem_read = SR_proc_info[me].sem_read; long sem_written = SR_proc_info[me].sem_written; long semid_to = SR_proc_info[*node].semid; long sem_pend = SR_proc_info[*node].sem_pend;#else long *buffer_full = SR_proc_info[me].buffer_full;#endif /* Error checking */ if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) ) Error("snd_local: invalid shared memory", (long) *node);#ifdef NOSPIN if ( (semid < 0) || (semid_to < 0) || (sem_read < 0) || (sem_written < 0) ) Error("snd_local: invalid semaphore set", (long) *node);#endif /* Check that final segment of last message has been consumed */#ifdef NOSPIN SemWait(semid, sem_read);#else Await(buffer_full, (long) FALSE);#endif /* Fill in message header */ head->nodefrom = (char) me; head->nodeto = (char) *node; head->type = *type; head->length = *lenbuf; head->tag = tag; if (DEBUG_) { PrintMessageHeader("snd_local ",head); (void) fflush(stdout); } /* Copy the first piece of the message so that send along with header to minimize use of semaphores. Also need to send header even for messages of zero length */ if (len) (void) SRmover(buffer, buf, (len > buflen) ? buflen : len);#ifdef NOSPIN SemPost(semid, sem_written); SemPost(semid_to, sem_pend);#else *buffer_full = TRUE;#endif len -= buflen; buf += buflen; while (len > 0) {#ifdef NOSPIN SemWait(semid, sem_read);#else Await(buffer_full, (long) FALSE);#endif (void) SRmover(buffer, buf, (len > buflen) ? buflen : len);#ifdef NOSPIN SemPost(semid, sem_written);#else *buffer_full = TRUE;#endif len -= buflen; buf += buflen; }} #endifstatic void snd_remote(type, buf, lenbuf, node) long *type; char *buf; long *lenbuf; long *node;/* synchronous send to remote process long *type = user defined integer message type (input) char *buf = data buffer (input) long *lenbuf = length of buffer in bytes (input) long *node = node to send to (input) for zero length messages only the header is sent*/{ MessageHeader header; long me=NODEID_(); int sock=SR_proc_info[*node].sock; long len;#ifdef SOCK_FULL_SYNC char sync=0;#endif if ( sock < 0 ) Error("snd_remote: sending to process without socket", (long) *node); header.nodefrom = me; header.nodeto = *node; header.type = *type; header.length = *lenbuf; header.tag = SR_proc_info[*node].n_snd; /* header.length is the no. of items if XDR is used or just the number of bytes */#ifdef GOTXDR if ( *type & MSGDBL ) header.length = *lenbuf / sizeof(double); else if ( *type & MSGINT ) header.length = *lenbuf / sizeof(long); else if ( *type & MSGCHR ) header.length = *lenbuf / sizeof(char); else header.length = *lenbuf;#else header.length = *lenbuf;#endif if (DEBUG_) PrintMessageHeader("snd_remote",&header);#ifdef GOTXDR (void) WriteXdrLong(sock, (long *) &header, (long) (sizeof(header)/sizeof(long)));#else if ( (len = WriteToSocket(sock, (char *) &header, (long) sizeof(header))) != sizeof(header) ) Error("snd_remote: writing header to socket", len);#endif if (*lenbuf) {#ifdef GOTXDR if ( *type & MSGDBL ) (void) WriteXdrDouble(sock, (double *) buf, header.length); else if ( *type & MSGINT ) (void) WriteXdrLong(sock, (long *) buf, header.length); else if ( *type & MSGCHR ) (void) WriteXdrChar(sock, (char *) buf, header.length); else if ( (len = WriteToSocket(sock, buf, header.length)) != header.length) Error("snd_remote: writing message to socket", (long) (len+100000*(sock + 1000* *node)));#else if ( (len = WriteToSocket(sock, buf, header.length)) != header.length) Error("snd_remote: writing message to socket", (long) (len+100000*(sock + 1000* *node)));#endif }#ifdef SOCK_FULL_SYNC /* this read (and write in rcv_remote) of an acknowledgment forces synchronous */ if ( ReadFromSocket(sock, &sync, (long) 1) != 1) Error("snd_remote: reading acknowledgement", (long) (len+100000*(sock + 1000* *node)));#endif}/*ARGSUSED*/void SND_(type, buf, lenbuf, node, sync) long *type; char *buf; long *lenbuf; long *node; long *sync;/* mostly syncrhonous send long *type = user defined integer message type (input) char *buf = data buffer (input) long *lenbuf = length of buffer in bytes (input) long *node = node to send to (input) long *sync = flag for sync/async ... IGNORED for zero length messages only the header is sent*/{ long me=NODEID_(); long nproc=NNODES_();#ifdef TIMINGS double start;#endif /* Error checking */ if (*node == me) Error("SND_: cannot send message to self", (long) me); if ( (*node < 0) || (*node > nproc) ) Error("SND_: out of range node requested", (long) *node); if ( (*lenbuf < 0) || (*lenbuf > BIG_MESSAGE) ) Error("SND_: message length out of range", (long) *lenbuf);#ifdef EVENTLOG evlog(EVKEY_BEGIN, EVENT_SND, EVKEY_MSG_LEN, *lenbuf, EVKEY_MSG_FROM, me, EVKEY_MSG_TO, *node, EVKEY_MSG_TYPE, *type, EVKEY_MSG_SYNC, *sync, EVKEY_LAST_ARG);#endif /* Send via shared memory or sockets */#ifdef TIMINGS start = TCGTIME_();#endif#ifdef SHMEM
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -