📄 msq.c
字号:
#ifndef lintstatic char sccsid[] = "@(#)ATS msq.c 1.1 92/07/30 Copyright Sun Microsystems Inc.";#endif#define NO_LWP 1/* -JCH- the only difference between amp.c(ATS) and msq.c(Sundiag) *//** * ATS Message Protocol (AMP): * * FUNCTION: * * AMP manages all message traffic among ATS processes. All in-coming * messages are passed to the destination immediately. All out-going * messages are sent out immediately and they are also placed on the * waiting-for-ack (wack) array. An acknowledgement is required for * all out-going messages (except AMP_ACK_MESSAGE). If an acknowledgement * is not received by the sender after a specified timeout, the message * is retransmitted until a maximum transmission is reached. After * a maximum transmission (user defined), amp will execuate an error * handler if there is any (user supplied). * * AMP is capable of manages multiple thread processes. Each thread * process has its own amp resources. * * If you are not using the lwp (lightweight process library), you should * define NO_LWP in the amp.c or at compile time. * * You need to include "amp.h" file in your program. * * EXPORTED ROUTINES: * * amp_initialize() - allocates amp resources, must be called first; * Return: 1 - success; * 0 - failed; * * amp_exit() - exits from amp, must be called; * Return: 1 - success; * 0 - failed; * * Sends rpc message. * amp_rpc_send(client, in, in_size, inproc, msg_cb) * CLIENT *client; - destination of this message; * char *in; - data to be send to the receiver; * int in_size; - data size of in; * xdrproc_t inproc; - xdr routine to be used to encode the data; * S_AMP_MSG_CB *msg_cb - it contains user defined number of * retransmission, timeout for ack, * error handler, etc. * * Return: 1 - success; * 0 - failed; * Example: * * f() * { * S_AMP_MSG_CB msg_cb; * int i, *index; * void msg_timeout_handler(); * * i = 5; * *index=1; * msg_cb.max_wack = 60; { wait 60 seconds for the ack } * msg_cb.max_retransmit = 10; { only retransmit 10 times } * msg_cb.action = msg_timeout_handler;{call action after max trans} * msg_cb.data = (char *)(index); {user supplied data. } * msg_cb.data_size = sizeof(int); {size of user supplied data } * msg_cb.message_num = 100; {message number to be transmitted} * amp_rpc_send(client, &i, sizeof(int), xdr_int, &msg_cb); * } * * void msg_timeout_handler(ptr, data) * S_AMP_MSG_ARRAY *ptr; * char *data; * { * int index; * * index = *(int *)data; * printf("Message: %d, Seq: %d timeout for index: %d\n", * ptr->msg_cb->message_num, ptr->rec->amp_msg->sequence_num, * index); * } * * * amp_send_ack(client, ack_msg_num, procnum, sequence_num) * CLIENT *client; - destination of this message; * u_long ack_msg_num; - acknowledgement message number; * u_long procnum; - acknowledge message <procnum> and * int sequence_num; - <sequence_num>. * * Return: 1 - success; * 0 - failed; * * amp_transp_send_ack(client, ack_msg_num, rqstp, xport) * CLIENT *client; - destination of this message; * u_long ack_msg_num; - acknowledgement message number; * struct svc_req *rqstp; - pointer to service request. It contains * the message number to be acknowledged. * XDR *xport; - transport pointer. It contains the sequence * number of the message to be acknowledged. * The sequence number MUST be the first field * in the data structure. * * Check message array. * amp_elapsed_time() - determines whether or not any message need * to be retransmitted. * * Return: None; * * Check acknowledgement messages. If the message is in the message * array, it is deleted from the array. * * amp_check_ack_msg(ack_rec) * S_AMP_ACK *ack_rec; - contains the message number and the sequence * number of the message. * * amp_p_ack_msg(xport) * XDR *xport; - the transp structure contains message in the * ack_msg format. Use xdr_amp_ack to decode * the structure. * * Check whether the amp message array is empty or not. * amp_empty_array() - Return: 1 - Empty; 0 - Not Empty; * * Prints out all messages in the message array. * amp_print_array() - print out messages in the current thread; * amp_print_all_array() - print out all messages in all thread amp. * * AMP ack xdr routine. Used to encode and decode amp ack message. * The first field in the data structure is the sequence number and the * second is the message number. * * xdr_amp_ack(xdrsp, rec) * * * Set the message sequence number: * amp_set_seq_num(seq) * int seq; - starts the message sequence number from <sq> * * Receive in-coming messages. It is used for lwp thread. For non-lwp, * use select(...) to received any in coming messages. If the message * is an ack message, the ack message is processed and it is not passed * to the application program. The message array is checked to determine * whether or not any message needs to be retransmitted. The message is * passed to the application program, except for the ack message. * * Receive messages from all threads. * * amp_msg_recv_all(sender, arg, argsize, res, ressize, timeout) * thread_t *sender; * caddr_t *arg; * int *argsize; * caddr_t *res; * int *ressize; * struct timeval *timeout; * * Receive messages from the given thread. * amp_msg_recv(sender, arg, argsize, res, ressize, timeout) * * Return: 0 - success; * -1 - timeout; * * Flush the message array. * amp_flush_msg_array() - flushs out the wack array; * Return: 1 - success; * 0 - failed;*/ #include <stdio.h>#include <sys/types.h> /* for stat system call */#include <sys/stat.h> /* for stat system call */#include <sys/wait.h>#include <signal.h>#include <ctype.h>#include <errno.h>#include <rpc/rpc.h>#ifndef NO_LWP#include <lwp/lwp.h>#include <lwp/stackdep.h>#include <lwp/lwperror.h>#endif#ifdef SunOS4#include <rpc/svc.h>#endif#include <sys/time.h>#include <sys/socket.h>#include <netdb.h>#include <string.h> #include "amp.h"#ifdef NO_LWP #define SAME_KEY 100#define SAME_INDEX 0#endif#ifndef NO_LWP#define AMP_MAX_WACK_ARRAY_SIZE 100#else#define AMP_MAX_WACK_ARRAY_SIZE 1#endif#define AMP_MAX_ARRAY_SIZE 100#define AMP_MAX_SEQ_NUM 9000#define AMP_UDP_TIMEOUT 0typedef struct amp_wack_a { int key; enum msg_array_stat stat; int sequence_num; int retransmit; int current_que_index; struct amp_msg_array msg_a[AMP_MAX_ARRAY_SIZE];} S_AMP_WACK_A;int amp_debug = 0;struct amp_wack_a amp_wack_array[AMP_MAX_WACK_ARRAY_SIZE];int xdr_amp_ack();int xdr_amp_msg();extern char *sys_errlist[];/* * Empty the wack message array.*/amp_flush_msg_array(){ int i, id; if (amp_get_old_index(&id)) { /* thread id exist in array */ for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) amp_delete(id, i); }}/* * Allocates amp resources for the current process thread*/amp_initialize(){ int i, id, key; if (amp_get_new_index(&id, &key)) { amp_wack_array[id].key = key; amp_wack_array[id].stat = IN_USE; amp_wack_array[id].retransmit = 0; amp_wack_array[id].current_que_index = 0; for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) amp_wack_array[id].msg_a[i].stat = EMPTY; return(1); }}/* * Deallocates amp resources of the current process thread*/amp_exit(){ int i, id; if (amp_get_old_index(&id)) { amp_wack_array[id].stat = EMPTY; for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) amp_delete(id, i); }}/* * Get the wack array index for the current process thread*/staticamp_get_old_index(id)int *id;{ int i, key;#ifndef NO_LWP thread_t c; lwp_self(&c); key = c.thread_key; for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++) if (amp_wack_array[i].key == key) { *id = i; return(1); } return(0);#else *id = SAME_INDEX; return(1);#endif}/* * Assign a new wack array index for the current process thread*/staticamp_get_new_index(id, key)int *id, *key;{ int i, temp = -1;#ifndef NO_LWP thread_t c; lwp_self(&c); *key = c.thread_key; for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++) if (amp_wack_array[i].key == *key) { *id = i; return(1); } else if (amp_wack_array[i].stat == EMPTY) temp = i; if (temp == -1) { /* array is full */ *id = -1; return(0); } *id = temp;#else /* doesn't use lightweight process */ *key = SAME_KEY; *id = SAME_INDEX;#endif return(1);}/* * Sends out an acknowledgement for message <procnum> and * sequence <sequence_num>.*/amp_send_ack(client, ack_msg_num, procnum, sequence_num)CLIENT *client;u_long ack_msg_num;u_long procnum;int sequence_num;{ struct amp_ack ack_struct; enum clnt_stat status; ack_struct.message_num = procnum; ack_struct.sequence_num = sequence_num; status = udpcall(client, ack_msg_num, xdr_amp_ack, &ack_struct, xdr_void, NULL, AMP_UDP_TIMEOUT); if (status == RPC_SUCCESS || status == RPC_TIMEDOUT) return(1); else return(0);}/* * Sends out an acknowledgement for message <procnum> and * sequence <sequence_num>.*/amp_transp_send_ack(client, ack_msg_num, rqstp, xport) CLIENT *client; u_long ack_msg_num; struct svc_req *rqstp; XDR *xport;{ struct amp_ack ack_struct; enum clnt_stat status; int seq; xdr_int(xport, &seq); ack_struct.message_num = rqstp->rq_proc; ack_struct.sequence_num = seq; status = udpcall(client, ack_msg_num, xdr_amp_ack, &ack_struct, xdr_void, NULL, AMP_UDP_TIMEOUT); if (status == RPC_SUCCESS || status == RPC_TIMEDOUT) return(1); else return(0);}/* * Checks whether or not any message in the current thread * needs to retransmitted.*/voidamp_elapsed_time(){ struct timeval tp; struct timezone tzp; int elapsed_time, i, id; S_AMP_MSG_CB *msg_cb; S_AMP_ACK ack_rec; if (! amp_get_old_index(&id)) { fprintf(stderr,"<***amp_elapsed_time> Unknown thread: %d\n", id); return; } gettimeofday(&tp, &tzp); for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) { if ((amp_wack_array[id].msg_a[i].stat == IN_USE) && (amp_wack_array[id].msg_a[i].rec != NULL) && (amp_wack_array[id].msg_a[i].msg_cb != NULL)) { if ((elapsed_time = tp.tv_sec - amp_wack_array[id].msg_a[i].rec->timestamp) >= amp_wack_array[id].msg_a[i].msg_cb->max_wack) {/*send msg */ msg_cb = amp_wack_array[id].msg_a[i].msg_cb; if (amp_wack_array[id].msg_a[i].rec->transmit_num >= msg_cb->max_retransmit) { /* * AMP attempted to transmit the message MAX_RETRANSMIT times * without any success. The user supplied error hanler is * called. * The message is then taking off the queue. */ if (msg_cb->action != NULL) { S_AMP_MSG_ARRAY *ptr; ptr = &_wack_array[id].msg_a[i]; (*msg_cb->action)(ptr, msg_cb->data); } if (amp_debug) fprintf(stderr, "***Can't send Msg: %ld after %d tries. Remove from msg queue.\n", amp_wack_array[id].msg_a[i].msg_cb->message_num, amp_wack_array[id].msg_a[i].msg_cb->max_retransmit); /* * Take the message off the queue after maximum attempts. */ amp_delete(id, i); continue; } amp_wack_array[id].retransmit = 1; amp_wack_array[id].current_que_index = i; amp_rpc_send(amp_wack_array[id].msg_a[i].rec->client, amp_wack_array[id].msg_a[i].rec->amp_msg->in, amp_wack_array[id].msg_a[i].rec->amp_msg->in_size, amp_wack_array[id].msg_a[i].rec->amp_msg->inproc, amp_wack_array[id].msg_a[i].msg_cb); if (amp_debug) fprintf(stderr, "amp_elapsed_time Msg: %ld\tSeq: %d\tTimestamp: %s\tCTime: %s\n", amp_wack_array[id].msg_a[i].msg_cb->message_num, amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num, ctime(&_wack_array[id].msg_a[i].rec->timestamp), ctime(&tp.tv_sec)); amp_wack_array[id].retransmit = 0; if (amp_debug) fprintf(stderr, "Msg: %ld\tetime: %d\tMax.Trans: %d\tTrans#: %d\n", amp_wack_array[id].msg_a[i].msg_cb->message_num, elapsed_time, amp_wack_array[id].msg_a[i].msg_cb->max_retransmit, amp_wack_array[id].msg_a[i].rec->transmit_num); } } }}/* * Sets the starting sequence number to be used * for the current process thread.*/voidamp_set_seq_num(sequence)int sequence;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -