⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_tsr.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
字号:
#include "p4.h"
#include "p4_sys.h"

/*
 * p4_tsr.c - message send/receive front end and local message-queue
 * management for p4.
 *
 * Contents:
 *   search_p4_queue / p4_recv / recv_message     receive path
 *   p4_any_messages_available / p4_messages_available   probing
 *   queue_p4_message / send_message / get_tmsg   send path and queueing
 *   p4_msg_alloc / p4_msg_free                   user-visible buffers
 *   initialize_msg_queue, alloc_quel, free_quel, free_avail_quels
 *                                                queue-element bookkeeping
 */

/*
 * search_p4_queue tries to locate a message of the desired type in the
 * local queue of messages already received.  If it finds one, it dequeues it
 * if deq is true, and returns its address; otherwise it returns NULL.
 *
 *   req_type  wanted message type, or -1 to match any type
 *   req_from  wanted sender, or -1 to match any sender
 *   deq       if true, the matching queue element is unlinked and handed
 *             back to the free list via free_quel()
 *
 * Before searching, every queued message that still has its broadcast bit
 * set is forwarded with subtree_broadcast_p4() and the bit is cleared.  If
 * that forwarding fails, NULL is returned without searching.
 */
struct p4_msg *search_p4_queue( int req_type, int req_from, P4BOOL deq)
{
    struct p4_queued_msg *qpp, *qp;
    struct p4_msg *tqp;
    P4BOOL found;

    tqp = NULL;
    qpp = NULL;
    found = P4_FALSE;

    /* Pass 1: forward any pending broadcasts sitting in the queue. */
    qp = p4_local->queued_messages->first_msg;
    while (qp)
    {
        if (qp->qmsg->ack_req & P4_BROADCAST_MASK)
        {
            /* RMB p4_dprintfl(00, "subbrdcst type = %d, sender = %d\n",req_type,req_from); */
            if (subtree_broadcast_p4(qp->qmsg->type, qp->qmsg->from,
                                     (char *) &qp->qmsg->msg,
                                     qp->qmsg->len, qp->qmsg->data_type))
            {
                p4_dprintf("search_p4_queue: failed\n");
                return(NULL);
            }
            qp->qmsg->ack_req &= ~P4_BROADCAST_MASK;    /* Unset broadcast bit */
        }
        qp = qp->next;
    }

    /*
     * Pass 2: look for the first match.  qpp trails one element behind qp
     * so that the match can be unlinked from the singly linked list.
     */
    qp = p4_local->queued_messages->first_msg;
    while (qp && !found)
    {
        if (((qp->qmsg->type == req_type) || (req_type == -1)) &&
            ((qp->qmsg->from == req_from) || (req_from == -1)))
        {
            found = P4_TRUE;
            if (deq)
            {
                if (p4_local->queued_messages->first_msg ==
                    p4_local->queued_messages->last_msg)
                {
                    /* only one element: the queue becomes empty */
                    p4_local->queued_messages->first_msg = NULL;
                    p4_local->queued_messages->last_msg = NULL;
                }
                else
                {
                    if (qp == p4_local->queued_messages->first_msg)
                    {
                        p4_local->queued_messages->first_msg = qp->next;
                    }
                    else if (qp == p4_local->queued_messages->last_msg)
                    {
                        p4_local->queued_messages->last_msg = qpp;
                        qpp->next = NULL;
                    }
                    else
                    {
                        qpp->next = qp->next;
                    }
                }
            }
        }
        else
        {
            qpp = qp;
            qp = qp->next;
        }
    }

    if (found)
    {
        p4_dprintfl(30,"extracted queued msg of type %d from %d\n",
                    qp->qmsg->type,qp->qmsg->from);
        tqp = qp->qmsg;
        if (deq)
        {
            /* the message itself stays alive; only the list node is recycled */
            free_quel(qp);
        }
    }
    return (tqp);
}

/*
 * This is the top-level receive routine, called by the user.
 *   req_type is either a desired type or -1.  In the -1 case it will be
 *        modified  by p4_recv to indicate the type actually received.
 *   req_from is either a desired source or -1.  In the -1 case it will be
 *        modified by p4_recv to the source of the message actually received.
 *   msg will be set by p4_recv to point to a buffer containing the message.
 *        If *msg is non-NULL on entry, it is treated as a user buffer and
 *        the data is copied into it instead (truncated to the buffer's
 *        orig_len), after which the received p4 message is freed.
 *   len_rcvd will be set by p4_recv to contain the length of the message.
 *
 *   returns 0 if successful; non-zero if error
 */
int p4_recv( int *req_type, int *req_from, char **msg, int *len_rcvd)
{
    struct p4_msg *tmsg, *tempmsg;
    P4BOOL good;

    p4_dprintfl(20, "receiving for type = %d, sender = %d\n",
                *req_type, *req_from);

    ALOG_LOG(p4_local->my_id,END_USER,0,"");
    ALOG_LOG(p4_local->my_id,BEGIN_RECV,*req_from,"");

    /* Loop until a message matching the request has been obtained. */
    for (good = P4_FALSE; !good;)
    {
        ALOG_LOG(p4_local->my_id,END_RECV,0,"");
        ALOG_LOG(p4_local->my_id,BEGIN_WAIT,0,"");

        /* Prefer something already queued locally; otherwise go receive. */
        if (!(tmsg = search_p4_queue(*req_type, *req_from, 1)))
        {
            tmsg = recv_message(req_type, req_from);
            /*****
            if (tmsg)
                p4_dprintfl(00, "received type = %d, sender = %d\n",
                            tmsg->type,tmsg->from);
            *****/
        }

        ALOG_LOG(p4_local->my_id,END_WAIT,0,"");
        ALOG_LOG(p4_local->my_id,BEGIN_RECV,0,"");

        if (tmsg == NULL)
        {
            p4_dprintfl(70,"p4_recv: got NULL back from recv_message\n");
            continue;
        }

        if (((tmsg->type == *req_type) || (*req_type == -1)) &&
            ((tmsg->from == *req_from) || (*req_from == -1)))
        {
            good = P4_TRUE;
        }

        if (tmsg->ack_req & P4_BROADCAST_MASK)
        {
            /* RMB p4_dprintfl(00, "subbrdcst type = %d, sender = %d\n",*req_type,*req_from); */
            if (subtree_broadcast_p4(tmsg->type, tmsg->from,
                                     (char *) &tmsg->msg,
                                     tmsg->len, tmsg->data_type))
            {
                p4_dprintf("p4_recv: subtree_brdcst failed\n");
                return(-1);
            }
            tmsg->ack_req &= ~P4_BROADCAST_MASK;        /* Unset broadcast bit */
        }

        /* Not what the caller asked for: keep it for a later receive. */
        if (!good)
            queue_p4_message(tmsg, p4_local->queued_messages);
    }

    *req_type = tmsg->type;
    *req_from = tmsg->from;

    p4_dprintfl(10, "received type=%d, from=%d\n",*req_type,*req_from);

    if (*msg == NULL)
    {
        /* hand the p4-owned buffer straight to the caller */
        *msg = (char *) &(tmsg->msg);
        *len_rcvd = tmsg->len;
    }
    else
    {
        /* step back from the user's data pointer to the enclosing header */
        tempmsg = (struct p4_msg *)
            ((*msg) - (sizeof(struct p4_msg) - sizeof(char *)));
        /* copy into the user's buffer area, truncating if necessary */
        if (tmsg->len < tempmsg->orig_len)
            *len_rcvd = tmsg->len;
        else
            *len_rcvd = tempmsg->orig_len;
        bcopy((char *) &(tmsg->msg),*msg,*len_rcvd);
        tmsg->msg_id = (-1);
        free_p4_msg(tmsg);
    }

    ALOG_LOG(p4_local->my_id,END_RECV,*req_from,"");
    ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");

    return (0);
}

/*
 * recv_message obtains the next incoming message from the transport.
 *
 * req_type and req_from are only used for the debug trace here.  When
 * socket messaging is compiled in, the result of socket_recv(P4_TRUE) is
 * returned.  Without it there is no transport in this cut-down version, so
 * NULL is returned explicitly; p4_recv already treats NULL as "nothing
 * received, try again".  (Previously the function fell off the end in that
 * configuration, leaving the returned value undefined.)
 */
struct p4_msg *recv_message( int *req_type, int *req_from )
{
    p4_dprintfl( 99, "Starting recv_message for type = %d and sender = %d\n",
                 *req_type, *req_from );

/* cut down to just this piece for MPD */
#if  defined(CAN_DO_SOCKET_MSGS)
    return (socket_recv( P4_TRUE ));
#else
    return (NULL);
#endif
}

/* State cached by p4_any_messages_available() on its first call. */
static int first_call = 1;
static struct p4_msg_queue *local_mq;
static struct p4_msg_queue *local_qp;

/*
 * p4_any_messages_available reports whether any message at all is pending:
 * in the local queue, in this process's shmem message queue, or (when
 * socket messaging is compiled in) on a socket.
 */
P4BOOL p4_any_messages_available()
/* sometimes want a simple call with little overhead  - SRM
   did not make much difference - the main overhead is in the select within
   sock_msg_on_fd() */
{
    int  qidx;

    if (first_call)
    {
        /* look the queue pointers up once and reuse them afterwards */
        qidx = p4_local->my_id - p4_global->low_cluster_id;
        local_mq = &(p4_global->shmem_msg_queues[qidx]);
        local_qp = p4_local->queued_messages;
        first_call = 0;
    }
#if defined(CAN_DO_SOCKET_MSGS)
    return((local_qp->first_msg) || (local_mq->first_msg) ||
           socket_msgs_available());
#else
    return((local_qp->first_msg) || (local_mq->first_msg));
#endif
}

/*
 * p4_messages_available checks, without removing anything from the local
 * queue, whether a message matching *req_type / *req_from is available.
 *
 *   req_type, req_from  in: wanted type / sender, or -1 for "any";
 *                       out: type / sender of the matching message when one
 *                       was found in the local queue or pulled in from a
 *                       transport below.
 *
 * Messages pulled from a transport while probing are appended to the local
 * queue whether or not they match.  If nothing matches, the connection
 * table is scanned and p4_error() is called for any connection marked
 * CONN_REMOTE_DYING.
 *
 * Returns true if a matching message is available, false otherwise.
 */
P4BOOL p4_messages_available(req_type, req_from)
int *req_type, *req_from;
{
    int found;
    struct p4_msg *tmsg;

    ALOG_LOG(p4_local->my_id,END_USER,0,"");
    ALOG_LOG(p4_local->my_id,BEGIN_WAIT,1,"");

    found = P4_FALSE;
    if ((tmsg = search_p4_queue(*req_type, *req_from, 0)))
    {
        found = P4_TRUE;
        *req_type = tmsg->type;
        *req_from = tmsg->from;
    }

#   if defined(CAN_DO_SHMEM_MSGS)
    while (!found && shmem_msgs_available())
    {
        tmsg = shmem_recv();
        if (((tmsg->type == *req_type) || (*req_type == -1)) &&
            ((tmsg->from == *req_from) || (*req_from == -1)))
        {
            found = P4_TRUE;
            *req_type = tmsg->type;
            *req_from = tmsg->from;
        }
        queue_p4_message(tmsg, p4_local->queued_messages);
    }
#   endif

#   if defined(CAN_DO_SOCKET_MSGS)
    while (!found && socket_msgs_available())
    {
        tmsg = socket_recv( P4_FALSE );
        if (tmsg) {
            if (((tmsg->type == *req_type) || (*req_type == -1)) &&
                ((tmsg->from == *req_from) || (*req_from == -1)))
            {
                found     = P4_TRUE;
                *req_type = tmsg->type;
                *req_from = tmsg->from;
            }
            queue_p4_message(tmsg, p4_local->queued_messages);
        }
    }
#   endif

#   if defined(CAN_DO_CUBE_MSGS)
    while (!found && MD_cube_msgs_available())
    {
        tmsg = MD_cube_recv();
        if (((tmsg->type == *req_type) || (*req_type == -1)) &&
            ((tmsg->from == *req_from) || (*req_from == -1)))
        {
            found = P4_TRUE;
            (*req_type) = tmsg->type;
            *req_from = tmsg->from;
        }
        queue_p4_message(tmsg, p4_local->queued_messages);
    }
#   endif

#if defined(CAN_DO_SWITCH_MSGS)
    if (!found && (p4_global->proctable[p4_local->my_id].switch_port != -1))
    {
        int len;
        if (sw_probe(req_from, p4_local->my_id, req_type, &len))
            found = P4_TRUE;
    }
#endif

#if defined(CAN_DO_TCMP_MSGS)
    if (!found && MD_tcmp_msgs_available(req_from,req_type))
        found = P4_TRUE;
#endif

    if (!found) {
        int i;
        /* See if a connection has died ... */
        for (i = 0; i < p4_global->num_in_proctable; i++)
        {
            if (p4_local->conntab[i].type == CONN_REMOTE_DYING) {
                /* We need to detect that some are down... */
                p4_error( "Found a dead connection while looking for messages",
                          i );
            }
        }
    }

    ALOG_LOG(p4_local->my_id,END_WAIT,1,"");
    ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");

    return (found);
}                               /* p4_messages_available */

/*
 * queue_p4_message appends msg to the tail of the queue headed by hdr,
 * wrapping it in a queue element obtained from alloc_quel().
 */
P4VOID queue_p4_message(msg, hdr)
struct p4_msg *msg;
struct p4_msg_queue *hdr;
{
    struct p4_queued_msg *q;

    q = alloc_quel();
    q->qmsg = msg;
    q->next = NULL;

    if (hdr->first_msg == NULL)
    {
        hdr->first_msg = q;
    }
    else
    {
        hdr->last_msg->next = q;
    }
    hdr->last_msg = q;
}

/*
 * send_message delivers a message to process `to`, choosing the transport
 * from the connection type recorded in p4_local->conntab[to].
 *
 *   type, from, to   message type, sender id, destination id
 *   msg, len         payload and its length
 *   data_type        P4NOX, or a type used to decide between socket_send
 *                    and xdr_send when data representations differ
 *   ack_req          acknowledgement/broadcast flags stored in the message
 *   p4_buff_ind      true if msg lies inside a p4 message buffer (see
 *                    get_tmsg)
 *
 * Returns 0 on success and -1 on failure: the connection could not be
 * established, the peer is dying, the connection type is invalid, or no
 * message buffer could be allocated for a local/shmem send.
 */
int send_message(type, from, to, msg, len, data_type, ack_req, p4_buff_ind)
char *msg;
int type, from, to, len, data_type;
P4BOOL ack_req, p4_buff_ind;
{
    struct p4_msg *tmsg;
    int conntype;

    conntype = p4_local->conntab[to].type;
    p4_dprintfl( 90, "send_message: to = %d, conntype=%d conntype=%s\n",
                to, conntype, print_conn_type(conntype));

    ALOG_LOG(p4_local->my_id,END_USER,0,"");
    ALOG_LOG(p4_local->my_id,BEGIN_SEND,to,"");

    switch (conntype)
    {
      case CONN_ME:
        tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
        if (tmsg == NULL)
        {
            /* get_tmsg has already reported the allocation failure */
            ALOG_LOG(p4_local->my_id,END_SEND,to,"");
            ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
            return (-1);
        }
        p4_dprintfl(20, "sending msg of type %d to myself\n",type);
        queue_p4_message(tmsg, p4_local->queued_messages);
        p4_dprintfl(10, "sent msg of type %d to myself\n",type);
        break;

#ifdef CAN_DO_SHMEM_MSGS
      case CONN_SHMEM:
        tmsg = get_tmsg(type, from, to, msg, len, data_type,
                        ack_req, p4_buff_ind);
        if (tmsg == NULL)
        {
            /* get_tmsg has already reported the allocation failure */
            ALOG_LOG(p4_local->my_id,END_SEND,to,"");
            ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
            return (-1);
        }
        shmem_send(tmsg);
        break;
#endif

#ifdef CAN_DO_SOCKET_MSGS
      case CONN_REMOTE_OPENING:
      case CONN_REMOTE_NON_EST:
        if (establish_connection(to))
        {
            p4_dprintfl(90, "send_message: conn just estabd to %d\n", to);
        }
        else
        {
            p4_dprintf("send_message: unable to estab conn to %d\n", to);
            ALOG_LOG(p4_local->my_id,END_SEND,to,"");
            ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
            return (-1);
        }
        /* no break; - just fall into connected code */
      case CONN_REMOTE_EST:
        if (data_type == P4NOX || p4_local->conntab[to].same_data_rep)
        {
            socket_send(type, from, to, msg, len, data_type, ack_req);
        }
        else
        {
#           ifdef CAN_DO_XDR
            xdr_send(type, from, to, msg, len, data_type, ack_req);
#           else
            p4_error("cannot do xdr sends\n",0);
#           endif
        }
        break;
#endif

      case CONN_REMOTE_DYING:
        p4_dprintfl(90, "send_message: proc %d is dying\n", to);
        ALOG_LOG(p4_local->my_id,END_SEND,to,"");
        ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
        return (-1);

      default:
        p4_dprintf("send_message: to=%d; invalid conn type=%d\n",to,conntype);
        ALOG_LOG(p4_local->my_id,END_SEND,to,"");
        ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
        return (-1);
    }

    ALOG_LOG(p4_local->my_id,END_SEND,to,"");
    ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");

    return (0);
}                               /* send_message */

/*
 * get_tmsg returns a struct p4_msg whose header is filled in from the
 * arguments and whose payload is msg.
 *
 * If p4_buff_ind is true, msg is taken to be the data area of an existing
 * p4 message buffer and the enclosing header is located by stepping back
 * from msg; no copy is made.  Otherwise a new message of len bytes is
 * allocated with alloc_p4_msg() and msg is copied into it.
 *
 * Returns the message, or NULL if a new buffer could not be allocated.
 */
struct p4_msg *get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind)
char *msg;
int type, from, to, len, data_type, ack_req, p4_buff_ind;
{
    struct p4_msg *tmsg;

    if (p4_buff_ind)
    {
        tmsg = (struct p4_msg *) (msg - (sizeof(struct p4_msg) - sizeof(char *)));
    }
    else
    {
        tmsg = alloc_p4_msg(len);
        if (tmsg == NULL)
        {
            p4_dprintf("OOPS! get_tmsg: could not alloc buff **\n");
            return (NULL);
        }
        bcopy(msg, (char *) &(tmsg->msg), len);
    }
    tmsg->type      = type;
    tmsg->from      = from;
    tmsg->to        = to;
    tmsg->len       = len;
    tmsg->ack_req   = ack_req;
    tmsg->data_type = data_type;
    return (tmsg);
}

/*
 * p4_msg_alloc allocates a p4 message able to hold msglen bytes and returns
 * a pointer to its data area (the address just past the header, i.e. the
 * same offset that get_tmsg and p4_msg_free step back over).
 *
 * Returns NULL if alloc_p4_msg() fails.
 */
char *p4_msg_alloc(msglen)
int msglen;
{
    char *t;

    t = (char *) alloc_p4_msg(msglen);
    if (t == NULL)
        return (NULL);
    ((struct p4_msg *) t)->msg_id = -1; /* msg not in use by IPSC isend */
    t = t + sizeof(struct p4_msg) - sizeof(char *);
    return(t);
}

/*
 * p4_msg_free releases a buffer whose data pointer m was obtained from
 * p4_msg_alloc (or handed out by p4_recv): the enclosing p4 message header
 * is recovered from m and passed to free_p4_msg().
 */
P4VOID p4_msg_free(m)
char *m;
{
    char *t;

    t = m - (sizeof(struct p4_msg) - sizeof(char *));
    ((struct p4_msg *) t)->msg_id = -1; /* msg not in use by IPSC isend */
    free_p4_msg((struct p4_msg *) t);
}

/*
 * initialize_msg_queue sets up an empty message queue: no first/last
 * element, its monitor initialised with p4_moninit, and its ack_lock
 * initialised and then immediately acquired, so the lock is held on return.
 * NOTE(review): presumably ack_lock is released elsewhere to signal an
 * acknowledgement - confirm against its users.
 */
P4VOID initialize_msg_queue(mq)
struct p4_msg_queue *mq;
{
    mq->first_msg = NULL;
    mq->last_msg = NULL;
    (P4VOID) p4_moninit(&(mq->m), 1);
    p4_lock_init(&(mq->ack_lock));
    p4_lock(&(mq->ack_lock));
}

/*
 * alloc_quel returns a queue element, reusing one from the global free list
 * (p4_global->avail_quel) when available and otherwise allocating a new one
 * with p4_shmalloc.  The free list is protected by avail_quel_lock.
 * p4_error() is called if a new element cannot be allocated.
 *
 * NOTE(review): the debug traces print the pointer with "%x"; whether that
 * is safe depends on how p4_dprintfl consumes its arguments - confirm.
 */
struct p4_queued_msg *alloc_quel()
{
    struct p4_queued_msg *q;

    p4_lock(&p4_global->avail_quel_lock);
    if (p4_global->avail_quel == NULL)
    {
        q = (struct p4_queued_msg *) p4_shmalloc(sizeof(struct p4_queued_msg));
        if (!q)
            p4_error("alloc_quel:  could not allocate queue element",
                     sizeof(struct p4_queued_msg));
        p4_dprintfl(50,"malloc'ed new quel at 0x%x\n",q);
    }
    else
    {
        q = p4_global->avail_quel;
        p4_global->avail_quel = q->next;
        p4_dprintfl(50,"reused quel at 0x%x\n",q);
    }
    p4_unlock(&p4_global->avail_quel_lock);
    p4_dprintfl(99,"Unlocked alloc_quel\n");
    return (q);
}

/*
 * free_quel pushes queue element q onto the head of the global free list so
 * that alloc_quel can reuse it.  The element's memory is not released.
 */
P4VOID free_quel(q)
struct p4_queued_msg *q;
{
    p4_lock(&p4_global->avail_quel_lock);
    q->next = p4_global->avail_quel;
    p4_global->avail_quel = q;
    p4_unlock(&p4_global->avail_quel_lock);
    p4_dprintfl(50,"freed quel at 0x%x to avail\n",q);
}

/*
 * free_avail_quels releases, with p4_shfree, every queue element currently
 * on the global free list and leaves the list empty.
 */
P4VOID free_avail_quels()
{
    struct p4_queued_msg *p,*q;

    p4_lock(&p4_global->avail_quel_lock);
    p = p4_global->avail_quel;
    while (p)
    {
        q = p->next;            /* read the link before p is released */
        p4_dprintfl(50,"really freed quel at 0x%x\n",p);
        p4_shfree(p);
        p = q;
    }
    /* every element is gone: do not leave the head pointing at freed memory */
    p4_global->avail_quel = NULL;
    p4_unlock(&p4_global->avail_quel_lock);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -