📄 msq.c
字号:
{ int id; if (! amp_get_old_index(&id)) { fprintf(stderr,"<amp_set_seq_num> Unknown thread\n"); return; } amp_wack_array[id].sequence_num = sequence;}/* * process ack message. NO_LWP */amp_pack_msg(transp) SVCXPRT *transp;{ S_AMP_ACK ack_rec; if (svc_getargs(transp, xdr_amp_ack, &ack_rec)) { return(amp_check_ack_msg(&ack_rec)); } else return(0);}/* * process ack message. LWP */amp_p_ack_msg(xport) XDR *xport;{ S_AMP_ACK ack_rec; if (xdr_amp_ack(xport, &ack_rec)) { return(amp_check_ack_msg(&ack_rec)); } else return(0);}/* * Process the acknowledgement message. Delete the message from * the wack array.*/amp_check_ack_msg(ack_rec)S_AMP_ACK *ack_rec;{ int i, id, seq; if (ack_rec == NULL) { fprintf(stderr,"<***amp_check_ack_msg> NULL pointer\n"); return(0); } if (! amp_get_old_index(&id)) { fprintf(stderr,"<***amp_check_ack_msg> Unknown thread\n"); return(0); } for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) { if (amp_wack_array[id].msg_a[i].rec != NULL && amp_wack_array[id].msg_a[i].stat == IN_USE && amp_wack_array[id].msg_a[i].msg_cb != NULL) { seq = amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num; if ((ack_rec->message_num == amp_wack_array[id].msg_a[i].msg_cb->message_num) && (ack_rec->sequence_num == seq)) { amp_delete(id, i); return(1); } } } fprintf(stderr,"amp_check_ack_msg: Seq %d, Msg %d not found in wack array\n", ack_rec->sequence_num, ack_rec->message_num); return(0);}/* * Deletes message info from the wack array.*/staticamp_delete(id, i)int id;int i;{ amp_wack_array[id].msg_a[i].stat = EMPTY; if (amp_wack_array[id].msg_a[i].msg_cb != NULL) { if (amp_wack_array[id].msg_a[i].msg_cb->data != NULL) free (amp_wack_array[id].msg_a[i].msg_cb->data); free(amp_wack_array[id].msg_a[i].msg_cb); } if (amp_wack_array[id].msg_a[i].rec != NULL) { if (amp_wack_array[id].msg_a[i].rec->amp_msg != NULL) { if (amp_wack_array[id].msg_a[i].rec->amp_msg->in != NULL) free (amp_wack_array[id].msg_a[i].rec->amp_msg->in); free (amp_wack_array[id].msg_a[i].rec->amp_msg); } free(amp_wack_array[id].msg_a[i].rec); } amp_wack_array[id].msg_a[i].msg_cb = NULL; amp_wack_array[id].msg_a[i].rec = NULL;}/* * assigns a new sequence number */staticamp_get_seq_num(id)int id;{ if (++(amp_wack_array[id].sequence_num) >= AMP_MAX_SEQ_NUM) amp_wack_array[id].sequence_num = 0; return(amp_wack_array[id].sequence_num);}/* * return current sequence number */amp_get_current_seq_num(){ int id; if (amp_get_old_index(&id)) return(amp_wack_array[id].sequence_num); else return(-1);}/* * adds the message info to the wack array*/staticamp_addq(id, rec, msg_cb)int id;S_AMP_MSG_REC *rec;S_AMP_MSG_CB *msg_cb;{ int i; for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) { if (amp_wack_array[id].msg_a[i].stat == EMPTY) { amp_wack_array[id].msg_a[i].stat = IN_USE; amp_wack_array[id].msg_a[i].rec = rec; msg_cb->amp_array_index = i; amp_wack_array[id].msg_a[i].msg_cb = msg_cb; return(1); } } fprintf(stderr,"***Array is full, can't add message: %d\n", msg_cb->message_num); return(0);}/* * determines whether or not the wack array is empty.*/amp_empty_array(){ int i, id; if (! amp_get_old_index(&id)) { fprintf(stderr,"<***amp_empty_array> Unknown thread\n"); return(0); } for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) if (amp_wack_array[id].msg_a[i].stat == IN_USE) return(0); return(1);}/* * prints out all amp thread wack arrays*/voidamp_print_all_array(){ int i; for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++) if (amp_wack_array[i].key != -1) { fprintf(stderr,"======AMP_WACK_ARRAY[%d]========\n",i); amp_print_a(i); }}/* * prints out the current thread wack array*/voidamp_print_array(){ int id; if (amp_get_old_index(&id)) amp_print_a(id); else fprintf(stderr,"<***amp_print_array> Unknown thread\n");}/* * prints out the wack array of thread id*/staticamp_print_a(id)int id;{ int i, seq; for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) { if (amp_wack_array[id].msg_a[i].stat == IN_USE) { if (amp_wack_array[id].msg_a[i].rec != NULL) { fprintf(stderr,"====Array: %d==========\n", i); fprintf(stderr,"client address: %x\n", amp_wack_array[id].msg_a[i].rec->client); fprintf(stderr,"in: %x\tin_proc: %x\n", amp_wack_array[id].msg_a[i].rec->amp_msg->in, amp_wack_array[id].msg_a[i].rec->amp_msg->inproc); fprintf(stderr,"timestamp: %ld\n", amp_wack_array[id].msg_a[i].rec->timestamp); seq = amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num; } if (amp_wack_array[id].msg_a[i].msg_cb != NULL) { fprintf(stderr,"message: %d\tseq: %d\n", amp_wack_array[id].msg_a[i].msg_cb->message_num,seq); } } }}/* * sends out a message*/amp_rpc_send(clnt, in, in_size, inproc, msg_cb)CLIENT *clnt;char *in;int in_size;xdrproc_t inproc;S_AMP_MSG_CB *msg_cb;{ S_AMP_MSG *msg; S_AMP_MSG_REC *rec; S_AMP_MSG_CB *cb; int clnt_sock = -1, stat; struct timeval tp; struct timezone tzp; int status = 1, id; int seq, cindex; if (clnt == NULL) { fprintf(stderr,"***amp_rpc_send: client is NULL\n"); return(0); } if (msg_cb == NULL) { fprintf(stderr,"***amp_rpc_send: msg_cb is NULL\n"); return(0); } if (! amp_get_old_index(&id)) { fprintf(stderr,"<amp_rpc_send> Unknown thread\n"); return(0); } /* * Gets the sequence number for this message */ if (amp_wack_array[id].retransmit) { cindex = amp_wack_array[id].current_que_index; msg = amp_wack_array[id].msg_a[cindex].rec->amp_msg; } else if ((msg = (S_AMP_MSG *)malloc(sizeof(S_AMP_MSG))) != NULL) { msg->sequence_num = amp_get_seq_num(id); if (in_size > 0) { msg->in = (char *)malloc(in_size); bcopy(in, msg->in, in_size); } else msg->in = NULL; msg->inproc = inproc; msg->in_size = in_size; } if (msg == NULL) return(0); if (amp_debug) fprintf(stderr,"amp[%d] sending message: %d\tSequence: %d\n", id, msg_cb->message_num, msg->sequence_num); gettimeofday(&tp, &tzp); /* * Puts the out going message into the wack array */ if (! amp_wack_array[id].retransmit) { rec = (S_AMP_MSG_REC *)malloc(sizeof(S_AMP_MSG_REC)); rec->client = clnt; rec->transmit_num = 0; rec->timestamp = tp.tv_sec; rec->amp_msg = msg; if (amp_debug) fprintf(stderr,"amp_rpc_send amp[%d] Msg: %d\tSeq: %d CTime:%s\n", id, msg_cb->message_num, msg->sequence_num, ctime(&tp.tv_sec)); cb = (S_AMP_MSG_CB *)malloc(sizeof(S_AMP_MSG_CB)); cb->message_num = msg_cb->message_num; cb->max_wack = msg_cb->max_wack; cb->max_retransmit = msg_cb->max_retransmit; cb->action = msg_cb->action; if (msg_cb->action != NULL && msg_cb->data != NULL && msg_cb->data_size > 0) { cb->data = (char *)malloc(msg_cb->data_size); cb->data_size = msg_cb->data_size; bcopy(msg_cb->data, cb->data, msg_cb->data_size); } else cb->data = NULL; amp_addq(id, rec, cb); } else if (amp_wack_array[id].retransmit) { cindex = amp_wack_array[id].current_que_index; amp_wack_array[id].msg_a[cindex].rec->timestamp = tp.tv_sec; amp_wack_array[id].msg_a[cindex].rec->transmit_num++; } stat = udpcall(clnt, msg_cb->message_num, xdr_amp_msg, msg, xdr_void, NULL, AMP_UDP_TIMEOUT); if (stat != RPC_TIMEDOUT && stat != RPC_SUCCESS) { fprintf(stderr,"***udpcall msg %d failed with state %d\n", msg_cb->message_num, stat); return(0); } return(1);}/* * xdr routine for the acknowledgement message*/xdr_amp_ack(xdrsp, rec) XDR *xdrsp; struct amp_ack *rec;{ if (!xdr_int(xdrsp, &rec->sequence_num)) return(0); if (!xdr_u_long(xdrsp, &rec->message_num)) return(0); return(1);}/* * xdr routine for passing amp data structure*/xdr_amp_msg(xdrsp, rec) XDR *xdrsp; struct amp_msg *rec;{ if (!xdr_int(xdrsp, &rec->sequence_num)) return(0); return((*rec->inproc)(xdrsp, rec->in));}/* * Receives an in-coming message*/#ifndef NO_LWPamp_msg_recv(sender, arg, argsize, res, ressize, timeout)thread_t *sender;caddr_t *arg;int *argsize;caddr_t *res;int *ressize;struct timeval *timeout;{ int status, len; struct amp_msg_t_msg *ptr; int ack=1; thread_t ssave; ssave = *sender; while (ack) { *sender = ssave; status = msg_recv(sender, arg, argsize, res, ressize, timeout); if (status == 0) { ptr = (struct amp_msg_t_msg *)*arg; if (ack = amp_p_msg(ptr)) { if (amp_debug>1) fprintf(stderr,"<amp_msg_recv> Send reply for ACK: %d\n", *(u_long *)ptr); msg_reply(*sender); } } else if ((status == -1) && (lwp_geterr() == LE_TIMEOUT)) { ack = 0; if (amp_debug>1) fprintf(stderr,"<amp_msg_recv> TIMEOUT. Check elapsed time\n"); amp_elapsed_time(); } } return(status);}/* * Receives an in-coming message from any sender*/amp_msg_recv_all(sender, arg, argsize, res, ressize, timeout)thread_t *sender;caddr_t *arg;int *argsize;caddr_t *res;int *ressize;struct timeval *timeout;{ *sender = THREADNULL; return(amp_msg_recv(sender, arg, argsize, res, ressize, timeout));}#endif/* * process the in-coming message.*/static intamp_p_msg(ptr) struct amp_msg_t_msg *ptr;{ u_long msg; msg = *(u_long *)ptr; if (is_ack_msg(msg)) { S_AMP_ACK ack_rec; SVCXPRT *transp; S_AMP_MSG_T_MSG *rec; XDR *xport; rec = (S_AMP_MSG_T_MSG *)ptr; xport = (XDR *)rec->xport; if (xdr_amp_ack(xport, &ack_rec)) amp_check_ack_msg(&ack_rec); return(1); /* ack msg */ } else { amp_elapsed_time(); return(0); /* not an ack */ }}staticis_ack_msg(msg)u_long msg;{ if (msg >= 30000) return(0); if (msg % 100 == 0) return(1); else return(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -