⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msq.c

📁 操作系统SunOS 4.1.3版本的源码
💻 C
📖 第 1 页 / 共 2 页
字号:
#ifndef lintstatic  char sccsid[] = "@(#)ATS msq.c 1.1 92/07/30 Copyright Sun Microsystems Inc.";#endif#define	NO_LWP	1/* -JCH- the only difference between amp.c(ATS) and msq.c(Sundiag) *//** * ATS Message Protocol (AMP): * * FUNCTION: * *   AMP manages all message traffic among ATS processes.  All in-coming *   messages are passed to the destination immediately.  All out-going *   messages are sent out immediately and they are also placed on the *   waiting-for-ack (wack) array.  An acknowledgement is required for *   all out-going messages (except AMP_ACK_MESSAGE).  If an acknowledgement *   is not received by the sender after a specified timeout, the message *   is retransmitted until a maximum transmission is reached.  After *   a maximum transmission (user defined), amp will execuate an error *   handler if there is any (user supplied). * *   AMP is capable of manages multiple thread processes.  Each thread *   process has its own amp resources. *    *   If you are not using the lwp (lightweight process library), you should *   define NO_LWP in the amp.c or at compile time. * *   You need to include "amp.h" file in your program. * * EXPORTED ROUTINES: * *   amp_initialize()       -  allocates amp resources, must be called first; *                             Return: 1 - success; *                                     0 - failed; * *   amp_exit()             -  exits from amp, must be called; *                             Return: 1 - success; *                                     0 - failed; * * Sends rpc message. *   amp_rpc_send(client, in, in_size, inproc, msg_cb) *     CLIENT *client;      -  destination of this message; *     char   *in;          -  data to be send to the receiver; *     int    in_size;      -  data size of in; *     xdrproc_t inproc;    -  xdr routine to be used to encode the data; *     S_AMP_MSG_CB *msg_cb -  it contains user defined number of  *                             retransmission, timeout for ack,  *                             error handler, etc. * *                             Return: 1 - success; *                                     0 - failed; *     Example: * *       f() *       { *          S_AMP_MSG_CB  msg_cb; *          int i, *index; *          void msg_timeout_handler(); * *          i = 5; *          *index=1; *          msg_cb.max_wack = 60;      { wait 60 seconds for the ack } *          msg_cb.max_retransmit = 10; { only retransmit 10 times } *          msg_cb.action = msg_timeout_handler;{call action after max trans} *          msg_cb.data = (char *)(index); {user supplied data. } *          msg_cb.data_size = sizeof(int); {size of user supplied data } *          msg_cb.message_num = 100;   {message number to be transmitted} *          amp_rpc_send(client, &i, sizeof(int), xdr_int, &msg_cb); *       } * *       void msg_timeout_handler(ptr, data) *            S_AMP_MSG_ARRAY *ptr; *            char *data; *       { *            int index; *  *            index = *(int *)data; *            printf("Message: %d, Seq: %d timeout for index: %d\n", *	         ptr->msg_cb->message_num, ptr->rec->amp_msg->sequence_num, *	         index); *       } * * *   amp_send_ack(client, ack_msg_num, procnum, sequence_num) *     CLIENT *client;      -  destination of this message; *     u_long ack_msg_num;  -  acknowledgement message number; *     u_long procnum;      -  acknowledge message <procnum> and *     int    sequence_num; -  <sequence_num>. * *                             Return: 1 - success; *                                     0 - failed; * *   amp_transp_send_ack(client, ack_msg_num, rqstp, xport) *     CLIENT *client;      -  destination of this message; *     u_long ack_msg_num;  -  acknowledgement message number; *     struct svc_req *rqstp; - pointer to service request.  It contains *                              the message number to be acknowledged. *     XDR *xport;     -  transport pointer.  It contains the sequence *                             number of the message to be acknowledged.   *                             The sequence number MUST be the first field *                             in the data structure. * * Check message array. *   amp_elapsed_time()     -  determines whether or not any message need *                             to be retransmitted. *                              *                             Return: None; * * Check acknowledgement messages.  If the message is in the message *   array, it is deleted from the array. * *   amp_check_ack_msg(ack_rec) *     S_AMP_ACK *ack_rec;  -  contains the message number and the sequence *                             number of the message. * *   amp_p_ack_msg(xport) *     XDR *xport;     -  the transp structure contains message in the *                             ack_msg format.  Use xdr_amp_ack to decode *                             the structure. * * Check whether the amp message array is empty or not. *   amp_empty_array()      -  Return: 1 - Empty;  0 - Not Empty; * * Prints out all messages in the message array. *   amp_print_array()      -  print out messages in the current thread; *   amp_print_all_array()  -  print out all messages in all thread amp. * * AMP ack xdr routine.  Used to encode and decode amp ack message. *  The first field in the data structure is the sequence number and the  *  second is the message number. * *  xdr_amp_ack(xdrsp, rec) * *  * Set the message sequence number: *   amp_set_seq_num(seq)   *     int seq;             -  starts the message sequence number from <sq> * * Receive in-coming messages.  It is used for lwp thread.  For non-lwp, *   use select(...) to received any in coming messages.  If the message *   is an ack message, the ack message is processed and it is not passed *   to the application program.  The message array is checked to determine *   whether or not any message needs to be retransmitted.  The message is  *   passed to the application program, except for the ack message. *  *   Receive messages from all threads. * *   amp_msg_recv_all(sender, arg, argsize, res, ressize, timeout) *     thread_t *sender; *     caddr_t *arg; *     int *argsize; *     caddr_t *res; *     int *ressize; *     struct timeval *timeout; * *   Receive messages from the given thread. *     amp_msg_recv(sender, arg, argsize, res, ressize, timeout) * *                     Return: 0 - success;  *                            -1 - timeout; * * Flush the message array. *   amp_flush_msg_array()  -  flushs out the wack array; *                             Return: 1 - success; *                                     0 - failed;*/ #include <stdio.h>#include <sys/types.h>          /* for stat system call */#include <sys/stat.h>           /* for stat system call */#include <sys/wait.h>#include <signal.h>#include <ctype.h>#include <errno.h>#include <rpc/rpc.h>#ifndef NO_LWP#include <lwp/lwp.h>#include <lwp/stackdep.h>#include <lwp/lwperror.h>#endif#ifdef SunOS4#include <rpc/svc.h>#endif#include <sys/time.h>#include <sys/socket.h>#include <netdb.h>#include <string.h>    #include "amp.h"#ifdef NO_LWP #define SAME_KEY   100#define SAME_INDEX 0#endif#ifndef NO_LWP#define AMP_MAX_WACK_ARRAY_SIZE 100#else#define AMP_MAX_WACK_ARRAY_SIZE 1#endif#define AMP_MAX_ARRAY_SIZE 100#define AMP_MAX_SEQ_NUM    9000#define AMP_UDP_TIMEOUT        0typedef struct amp_wack_a {  int key;  enum msg_array_stat stat;  int sequence_num;  int retransmit;  int current_que_index;  struct amp_msg_array msg_a[AMP_MAX_ARRAY_SIZE];} S_AMP_WACK_A;int amp_debug = 0;struct amp_wack_a amp_wack_array[AMP_MAX_WACK_ARRAY_SIZE];int xdr_amp_ack();int xdr_amp_msg();extern char *sys_errlist[];/* * Empty the wack message array.*/amp_flush_msg_array(){  int i, id;  if (amp_get_old_index(&id)) { /* thread id exist in array */    for (i=0; i<AMP_MAX_ARRAY_SIZE; i++)      amp_delete(id, i);  }}/* * Allocates amp resources for the current process thread*/amp_initialize(){  int i, id, key;  if (amp_get_new_index(&id, &key)) {    amp_wack_array[id].key = key;    amp_wack_array[id].stat = IN_USE;    amp_wack_array[id].retransmit = 0;    amp_wack_array[id].current_que_index = 0;    for (i=0; i<AMP_MAX_ARRAY_SIZE; i++)      amp_wack_array[id].msg_a[i].stat = EMPTY;    return(1);  }}/* * Deallocates amp resources of the current process thread*/amp_exit(){  int i, id;  if (amp_get_old_index(&id)) {    amp_wack_array[id].stat = EMPTY;        for (i=0; i<AMP_MAX_ARRAY_SIZE; i++)      amp_delete(id, i);  }}/* * Get the wack array index for the current process thread*/staticamp_get_old_index(id)int *id;{  int i, key;#ifndef NO_LWP  thread_t c;  lwp_self(&c);  key = c.thread_key;  for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++)    if (amp_wack_array[i].key == key) {      *id = i;      return(1);    }  return(0);#else  *id = SAME_INDEX;  return(1);#endif}/* * Assign a new wack array index for the current process thread*/staticamp_get_new_index(id, key)int *id, *key;{  int i, temp = -1;#ifndef NO_LWP  thread_t c;  lwp_self(&c);  *key = c.thread_key;  for (i=0; i<AMP_MAX_WACK_ARRAY_SIZE; i++)    if (amp_wack_array[i].key == *key) {      *id = i;      return(1);    }    else if (amp_wack_array[i].stat == EMPTY)      temp = i;        if (temp == -1) { /* array is full */    *id = -1;    return(0);  }  *id = temp;#else          /* doesn't use lightweight process */  *key = SAME_KEY;  *id = SAME_INDEX;#endif  return(1);}/* * Sends out an acknowledgement for message <procnum> and * sequence <sequence_num>.*/amp_send_ack(client, ack_msg_num, procnum, sequence_num)CLIENT *client;u_long ack_msg_num;u_long procnum;int sequence_num;{    struct amp_ack ack_struct;    enum clnt_stat status;    ack_struct.message_num = procnum;    ack_struct.sequence_num = sequence_num;    status = udpcall(client, ack_msg_num, xdr_amp_ack, &ack_struct,		   xdr_void, NULL, AMP_UDP_TIMEOUT);    if (status == RPC_SUCCESS || status == RPC_TIMEDOUT)      return(1);    else       return(0);}/* * Sends out an acknowledgement for message <procnum> and * sequence <sequence_num>.*/amp_transp_send_ack(client, ack_msg_num, rqstp, xport)     CLIENT         *client;     u_long         ack_msg_num;     struct svc_req *rqstp;     XDR            *xport;{    struct amp_ack ack_struct;    enum clnt_stat status;    int  seq;    xdr_int(xport, &seq);    ack_struct.message_num = rqstp->rq_proc;        ack_struct.sequence_num = seq;    status = udpcall(client, ack_msg_num, xdr_amp_ack, &ack_struct,		   xdr_void, NULL, AMP_UDP_TIMEOUT);    if (status == RPC_SUCCESS || status == RPC_TIMEDOUT)      return(1);    else       return(0);}/* * Checks whether or not any message in the current thread * needs to retransmitted.*/voidamp_elapsed_time(){  struct timeval tp;  struct timezone tzp;  int    elapsed_time, i, id;  S_AMP_MSG_CB *msg_cb;  S_AMP_ACK ack_rec;  if (! amp_get_old_index(&id)) {    fprintf(stderr,"<***amp_elapsed_time> Unknown thread: %d\n", id);    return;  }  gettimeofday(&tp, &tzp);  for (i=0; i<AMP_MAX_ARRAY_SIZE; i++) {    if ((amp_wack_array[id].msg_a[i].stat == IN_USE) && 	(amp_wack_array[id].msg_a[i].rec != NULL) &&	(amp_wack_array[id].msg_a[i].msg_cb != NULL)) {      if ((elapsed_time = tp.tv_sec - 	   amp_wack_array[id].msg_a[i].rec->timestamp)	  >= amp_wack_array[id].msg_a[i].msg_cb->max_wack) {/*send msg */	msg_cb = amp_wack_array[id].msg_a[i].msg_cb;	if (amp_wack_array[id].msg_a[i].rec->transmit_num >= 	    msg_cb->max_retransmit) {	  /*	   * AMP attempted to transmit the message MAX_RETRANSMIT times	   * without any success.  The user supplied error hanler is	   * called.  	   * The message is then taking off the queue.	   */	  if (msg_cb->action != NULL) {	    S_AMP_MSG_ARRAY *ptr;	    	    ptr = &amp_wack_array[id].msg_a[i];	    (*msg_cb->action)(ptr, msg_cb->data);	  }	  if (amp_debug)	    fprintf(stderr, 	    "***Can't send Msg: %ld after %d tries. Remove from msg queue.\n",		    amp_wack_array[id].msg_a[i].msg_cb->message_num,		    amp_wack_array[id].msg_a[i].msg_cb->max_retransmit);	  /*	   * Take the message off the queue after maximum attempts.	   */	  amp_delete(id, i);	  continue;	}	amp_wack_array[id].retransmit = 1;	amp_wack_array[id].current_que_index = i;	amp_rpc_send(amp_wack_array[id].msg_a[i].rec->client, 		     amp_wack_array[id].msg_a[i].rec->amp_msg->in,		     amp_wack_array[id].msg_a[i].rec->amp_msg->in_size, 		     amp_wack_array[id].msg_a[i].rec->amp_msg->inproc,		     amp_wack_array[id].msg_a[i].msg_cb);	if (amp_debug)	  fprintf(stderr,	    "amp_elapsed_time Msg: %ld\tSeq: %d\tTimestamp: %s\tCTime: %s\n",		amp_wack_array[id].msg_a[i].msg_cb->message_num,		amp_wack_array[id].msg_a[i].rec->amp_msg->sequence_num,		ctime(&amp_wack_array[id].msg_a[i].rec->timestamp),		ctime(&tp.tv_sec));	amp_wack_array[id].retransmit = 0;	if (amp_debug)	  fprintf(stderr, 		  "Msg: %ld\tetime: %d\tMax.Trans: %d\tTrans#: %d\n",		  amp_wack_array[id].msg_a[i].msg_cb->message_num,		  elapsed_time,		  amp_wack_array[id].msg_a[i].msg_cb->max_retransmit, 		  amp_wack_array[id].msg_a[i].rec->transmit_num);      }    }  }}/* * Sets the starting sequence number to be used * for the current process thread.*/voidamp_set_seq_num(sequence)int sequence;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -