⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msq.c

📁 操作系统SunOS 4.1.3版本的源码
💻 C
📖 第 1 页 / 共 2 页
字号:
{  int id;  if (! amp_get_old_index(&id)) {    fprintf(stderr,"<amp_set_seq_num> Unknown thread\n");    return;  }  amp_wack_array[id].sequence_num = sequence;}/* * process ack message. NO_LWP */amp_pack_msg(transp)     SVCXPRT *transp;{  S_AMP_ACK ack_rec;  if (svc_getargs(transp, xdr_amp_ack, &ack_rec)) {    return(amp_check_ack_msg(&ack_rec));  }  else    return(0);}/* * process ack message. LWP */amp_p_ack_msg(xport)     XDR *xport;{  S_AMP_ACK ack_rec;  if (xdr_amp_ack(xport, &ack_rec)) {    return(amp_check_ack_msg(&ack_rec));  }  else    return(0);}/* * Process the acknowledgement message.  Delete the message from * the wack array.*/amp_check_ack_msg(ack_rec)S_AMP_ACK *ack_rec;{  int i, id, seq;  if (ack_rec == NULL) {    fprintf(stderr,"<***amp_check_ack_msg> NULL pointer\n");    return(0);  }  if (! amp_get_old_index(&id)) {    fprintf(stderr,"<***amp_check_ack_msg> Unknown thread\n");    return(0);  }  for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) {    if (amp_wack_array[id].msg_a[i].rec != NULL && 	amp_wack_array[id].msg_a[i].stat == IN_USE &&	amp_wack_array[id].msg_a[i].msg_cb != NULL) {      seq = amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num;      if ((ack_rec->message_num == 	   amp_wack_array[id].msg_a[i].msg_cb->message_num) && 	  (ack_rec->sequence_num == seq)) {	amp_delete(id, i);	return(1);      }    }  }  fprintf(stderr,"amp_check_ack_msg: Seq %d, Msg %d not found in wack array\n",	  ack_rec->sequence_num, ack_rec->message_num);  return(0);}/* * Deletes message info from the wack array.*/staticamp_delete(id, i)int id;int i;{  amp_wack_array[id].msg_a[i].stat = EMPTY;  if (amp_wack_array[id].msg_a[i].msg_cb != NULL) {    if (amp_wack_array[id].msg_a[i].msg_cb->data != NULL)      free (amp_wack_array[id].msg_a[i].msg_cb->data);    free(amp_wack_array[id].msg_a[i].msg_cb);  }  if (amp_wack_array[id].msg_a[i].rec != NULL) {    if (amp_wack_array[id].msg_a[i].rec->amp_msg != NULL) {      if (amp_wack_array[id].msg_a[i].rec->amp_msg->in != NULL)        free (amp_wack_array[id].msg_a[i].rec->amp_msg->in);      free (amp_wack_array[id].msg_a[i].rec->amp_msg);    }    free(amp_wack_array[id].msg_a[i].rec);  }  amp_wack_array[id].msg_a[i].msg_cb = NULL;  amp_wack_array[id].msg_a[i].rec = NULL;}/* * assigns a new sequence number */staticamp_get_seq_num(id)int id;{  if (++(amp_wack_array[id].sequence_num) >= AMP_MAX_SEQ_NUM)    amp_wack_array[id].sequence_num = 0;  return(amp_wack_array[id].sequence_num);}/* * return current sequence number */amp_get_current_seq_num(){  int id;  if (amp_get_old_index(&id))    return(amp_wack_array[id].sequence_num);  else    return(-1);}/* * adds the message info to the wack array*/staticamp_addq(id, rec, msg_cb)int id;S_AMP_MSG_REC *rec;S_AMP_MSG_CB *msg_cb;{  int i;  for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) {    if (amp_wack_array[id].msg_a[i].stat == EMPTY) {      amp_wack_array[id].msg_a[i].stat = IN_USE;      amp_wack_array[id].msg_a[i].rec = rec;      msg_cb->amp_array_index = i;      amp_wack_array[id].msg_a[i].msg_cb = msg_cb;      return(1);    }  }  fprintf(stderr,"***Array is full, can't add message: %d\n",	  msg_cb->message_num);  return(0);}/* * determines whether or not the wack array is empty.*/amp_empty_array(){  int i, id;  if (! amp_get_old_index(&id)) {    fprintf(stderr,"<***amp_empty_array> Unknown thread\n");    return(0);  }  for (i=0; i<AMP_MAX_ARRAY_SIZE; i++)    if (amp_wack_array[id].msg_a[i].stat == IN_USE)      return(0);  return(1);}/* * prints out all amp thread wack arrays*/voidamp_print_all_array(){  int i;  for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++)    if (amp_wack_array[i].key != -1) {      fprintf(stderr,"======AMP_WACK_ARRAY[%d]========\n",i);      amp_print_a(i);    }}/* * prints out the current thread wack array*/voidamp_print_array(){  int id;  if (amp_get_old_index(&id))    amp_print_a(id);  else    fprintf(stderr,"<***amp_print_array> Unknown thread\n");}/* * prints out the wack array of thread id*/staticamp_print_a(id)int id;{    int i, seq;    for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) {      if (amp_wack_array[id].msg_a[i].stat == IN_USE) {	if (amp_wack_array[id].msg_a[i].rec != NULL) {	    fprintf(stderr,"====Array: %d==========\n", i);	    fprintf(stderr,"client address: %x\n", 		    amp_wack_array[id].msg_a[i].rec->client);	    fprintf(stderr,"in: %x\tin_proc: %x\n", 		    amp_wack_array[id].msg_a[i].rec->amp_msg->in, 		    amp_wack_array[id].msg_a[i].rec->amp_msg->inproc);	    fprintf(stderr,"timestamp: %ld\n",		    amp_wack_array[id].msg_a[i].rec->timestamp);	    seq = amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num;	  }	if (amp_wack_array[id].msg_a[i].msg_cb != NULL) {	  fprintf(stderr,"message: %d\tseq: %d\n",		  amp_wack_array[id].msg_a[i].msg_cb->message_num,seq);	}      }    }}/* * sends out a message*/amp_rpc_send(clnt, in, in_size, inproc, msg_cb)CLIENT *clnt;char *in;int in_size;xdrproc_t inproc;S_AMP_MSG_CB *msg_cb;{    S_AMP_MSG     *msg;    S_AMP_MSG_REC *rec;    S_AMP_MSG_CB *cb;    int clnt_sock = -1, stat;    struct timeval tp;    struct timezone tzp;    int    status = 1, id;    int    seq, cindex;    if (clnt == NULL) {      fprintf(stderr,"***amp_rpc_send: client is NULL\n");      return(0);    }    if (msg_cb == NULL) {      fprintf(stderr,"***amp_rpc_send: msg_cb is NULL\n");      return(0);    }    if (! amp_get_old_index(&id)) {      fprintf(stderr,"<amp_rpc_send> Unknown thread\n");      return(0);    }    /*     * Gets the sequence number for this message     */    if (amp_wack_array[id].retransmit) {      cindex = amp_wack_array[id].current_que_index;      msg = amp_wack_array[id].msg_a[cindex].rec->amp_msg;    }    else if ((msg = (S_AMP_MSG *)malloc(sizeof(S_AMP_MSG))) != NULL) {      msg->sequence_num = amp_get_seq_num(id);      if (in_size > 0) {	msg->in = (char *)malloc(in_size);	bcopy(in, msg->in, in_size);      }      else msg->in = NULL;      msg->inproc = inproc;      msg->in_size = in_size;    }    if (msg == NULL)      return(0);    if (amp_debug)	fprintf(stderr,"amp[%d] sending message: %d\tSequence: %d\n", 		id, msg_cb->message_num, msg->sequence_num);    gettimeofday(&tp, &tzp);    /*     * Puts the out going message into the wack array     */    if (! amp_wack_array[id].retransmit) {      rec = (S_AMP_MSG_REC *)malloc(sizeof(S_AMP_MSG_REC));      rec->client = clnt;      rec->transmit_num = 0;      rec->timestamp = tp.tv_sec;      rec->amp_msg = msg;      if (amp_debug)	fprintf(stderr,"amp_rpc_send amp[%d] Msg: %d\tSeq: %d CTime:%s\n", 	    id, msg_cb->message_num, msg->sequence_num, ctime(&tp.tv_sec));      cb = (S_AMP_MSG_CB *)malloc(sizeof(S_AMP_MSG_CB));      cb->message_num = msg_cb->message_num;      cb->max_wack = msg_cb->max_wack;      cb->max_retransmit = msg_cb->max_retransmit;      cb->action = msg_cb->action;      if (msg_cb->action != NULL && msg_cb->data != NULL 	  && msg_cb->data_size > 0) {	cb->data = (char *)malloc(msg_cb->data_size);	cb->data_size = msg_cb->data_size;	bcopy(msg_cb->data, cb->data, msg_cb->data_size);      }      else cb->data = NULL;      amp_addq(id, rec, cb);    }    else if (amp_wack_array[id].retransmit) {      cindex = amp_wack_array[id].current_que_index;      amp_wack_array[id].msg_a[cindex].rec->timestamp = tp.tv_sec;      amp_wack_array[id].msg_a[cindex].rec->transmit_num++;    }    stat = udpcall(clnt, msg_cb->message_num, xdr_amp_msg, 		   msg, xdr_void, NULL, AMP_UDP_TIMEOUT);    if (stat != RPC_TIMEDOUT && stat != RPC_SUCCESS) {      fprintf(stderr,"***udpcall msg %d failed with state %d\n",	      msg_cb->message_num, stat);      return(0);    }     return(1);}/* * xdr routine for the acknowledgement message*/xdr_amp_ack(xdrsp, rec)     XDR *xdrsp;     struct amp_ack *rec;{  if (!xdr_int(xdrsp, &rec->sequence_num))    return(0);  if (!xdr_u_long(xdrsp, &rec->message_num))    return(0);  return(1);}/* * xdr routine for passing amp data structure*/xdr_amp_msg(xdrsp, rec)     XDR *xdrsp;     struct amp_msg *rec;{  if (!xdr_int(xdrsp, &rec->sequence_num))    return(0);  return((*rec->inproc)(xdrsp, rec->in));}/* * Receives an in-coming message*/#ifndef NO_LWPamp_msg_recv(sender, arg, argsize, res, ressize, timeout)thread_t *sender;caddr_t *arg;int *argsize;caddr_t *res;int *ressize;struct timeval *timeout;{    int status, len;    struct amp_msg_t_msg *ptr;    int ack=1;    thread_t ssave;    ssave = *sender;    while (ack) {      *sender = ssave;      status = msg_recv(sender, arg, argsize, res, ressize, timeout);            if (status == 0) {	ptr = (struct amp_msg_t_msg *)*arg;	if (ack = amp_p_msg(ptr)) {	  if (amp_debug>1)	     fprintf(stderr,"<amp_msg_recv> Send reply for ACK: %d\n",			*(u_long *)ptr);		  msg_reply(*sender);	}      }      else if ((status == -1) && (lwp_geterr() == LE_TIMEOUT)) {	ack = 0;	if (amp_debug>1)	  fprintf(stderr,"<amp_msg_recv> TIMEOUT.  Check elapsed time\n");	amp_elapsed_time();      }    }    return(status);}/* * Receives an in-coming message from any sender*/amp_msg_recv_all(sender, arg, argsize, res, ressize, timeout)thread_t *sender;caddr_t *arg;int *argsize;caddr_t *res;int *ressize;struct timeval *timeout;{  *sender = THREADNULL;    return(amp_msg_recv(sender, arg, argsize, res, ressize, timeout));}#endif/* * process the in-coming message.*/static intamp_p_msg(ptr)     struct amp_msg_t_msg *ptr;{  u_long msg;  msg = *(u_long *)ptr;      if (is_ack_msg(msg)) {    S_AMP_ACK ack_rec;    SVCXPRT   *transp;    S_AMP_MSG_T_MSG *rec;    XDR       *xport;    rec = (S_AMP_MSG_T_MSG *)ptr;    xport = (XDR *)rec->xport;    if (xdr_amp_ack(xport, &ack_rec))      amp_check_ack_msg(&ack_rec);    return(1);  /* ack msg */  }  else {    amp_elapsed_time();    return(0);  /* not an ack */  }}staticis_ack_msg(msg)u_long msg;{  if (msg >= 30000)    return(0);  if (msg % 100 == 0)    return(1);  else    return(0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -