⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 haqmsg.c

📁 extremeDB s sample code,useful for you
💻 C
📖 第 1 页 / 共 3 页
字号:
        nw_close(chan);        return (MCO_RET)(MCO_E_NW_ERROR);      }      memcpy((PVOID)buffer, chan->pBuffer, (int)buflen);      chan->currlen -= ((uint2)buflen);      chan->pBuffer += ((uint2)buflen);      *recvlen=buflen;      return  MCO_S_OK;    }//    Printf("Send Message %d bytes\n",(int)buflen);     if((ret = MsgSend(chan->fd ,&msg, sizeof(msg_header_t),                       msgbuf, sizeof(msg_buf_t)))  < 0 ) {     ret = errno;//     perror("\nwrite:");ErrRet:     nw_close(chan);     return (MCO_RET)(MCO_E_NW_RECVERR);    }    chan->timeout = (long)TIME_INFINITE;    if((short)(chan->currlen=(msgbuf->hd.length-buflen)) < 0) {      goto ErrRet;    }    if(buflen != 0) memcpy(buffer, msgbuf->buffer, (size_t)buflen );    chan->pBuffer = msgbuf->buffer + buflen;    NumBytes += (uint4)buflen;//    Printf("sent %d bytes\n",(int)buflen);    return MCO_S_OK;}/***************************************************************************/static int message_callback( message_context_t *ctp,                             int               type,			     unsigned          flags,                              void *            handle )/* * Description: * Receives & dispatches messages & pulses from communication channels. *  * Returns 0 = success. */{    int       rc;    msg_buf_h msg;    nw_channel_h  chan;    /* cast a pointer to the message data */    msg = (msg_buf_t *)(ctp->msg);    if(msg->hd.magic  != MAGIC) {      rc = MCO_E_NW_RECVERR;ErrRet:      MsgReply( ctp->rcvid, rc, &msg, 0 );      return 0;    }    switch(type) {      default:        rc = MCO_E_NW_NOTSUPP; goto ErrRet;      case FT_ACCEPT:      {                baseCh->snd_id     	      = ctp->rcvid;        baseCh->index = msg->hd.id;        baseCh->status  |= NWST_SEND_PENDED;        if(baseCh->status & NWST_RECV_PENDED) {Accept:                  MsgReply( baseCh->rcv_id, MCO_S_OK, msg, sizeof(msg_header_t));          MsgReply( baseCh->snd_id, MCO_S_OK, msg, 0);          baseCh->status &= ~(NWST_RECV_PENDED|NWST_SEND_PENDED);        }        break;      }      case FT_CONNECT:      {        baseCh->rcv_id   = ctp->rcvid;        baseCh->status  |= NWST_RECV_PENDED;        if(baseCh->status & NWST_SEND_PENDED) {           msg->hd.id = baseCh->index;	   goto Accept;        }        break;      }      case FT_SEND:      {        if( (chan = FindChannelById(msg->hd.id)) == NULL) {          rc = MCO_E_NW_SENDERR; goto ErrRet;        }        chan->snd_id   = ctp->rcvid;        chan->status  |= NWST_SEND_PENDED;               if(chan->status & NWST_RECV_PENDED) {Reply:           MsgReply( chan->rcv_id, MCO_S_OK, msg, sizeof(msg_header_t)+msg->hd.length);          MsgReply( chan->snd_id, MCO_S_OK, msg, 0);          chan->status &= ~(NWST_RECV_PENDED|NWST_SEND_PENDED);        }        else {       	  chan->send_buf = (PVOID)chan->pMsg;       	  memcpy(chan->pMsg, msg, sizeof(msg_header_t)+msg->hd.length);        }        break;      }      case FT_RECV:      {        if( (chan = FindChannelById(msg->hd.id)) == NULL) {          rc = MCO_E_NW_RECVERR; goto ErrRet;        }        chan->rcv_id   = ctp->rcvid;        chan->status  |= NWST_RECV_PENDED;        if(chan->status & NWST_SEND_PENDED) {          msg = (msg_buf_h)chan->send_buf;	        goto Reply;        }        break;      }    }    return 0;}/***************************************************************************/int timer_tick(message_context_t *ctp, int code, unsigned flags, void *handle)/* * Description: * Receives timer event which is sent with pulse. Counts timeouts of IO channels  * and closes the channel if timeout expires. *  * Returns 0 = success. */{//    union sigval   	value = ctp->msg->pulse.value;    nw_channel_h  	chan;//    msg_header_t	msg;    int i;        for(i=0; i < MAX_CHANNELS; i++) {      chan = &channels[i];            if(chan->status & NWST_RECV_PENDED) {          if((chan->timeout-=TIME_SLICE) <=0 )  {          	nw_close(chan);          }      }      if(chan->status & NWST_SEND_PENDED) {          if((chan->timeout-=TIME_SLICE) <=0 )  {          	nw_close(chan);          }      }    }    return 0;}/***************************************************************************/int Dispatcher()/* * Description: * Dispathcher thread initializes the resource manager for the MASTER, * then waits for any messages & pulses. When the message or pulse arrives * calls the appropriate dispatch handler. *  * Never returns. */{  resmgr_connect_funcs_t  ConnectFuncs;  resmgr_io_funcs_t       IoFuncs;  iofunc_attr_t           IoFuncAttr;  resmgr_attr_t           resmgr_attr;  message_attr_t          message_attr;  dispatch_t              *dpp;  dispatch_context_t      *ctp, *ctp_ret;  struct sigevent       event;  struct _itimer        itime;  int                   timer_id;  int                     i, resmgr_id, message_id;//  long                    ret;    /* Create the Dispatch Interface */    dpp = dispatch_create();    if( dpp == NULL )    {        Printf( "dispatch_create() failed: %s\n",                  strerror( errno ) );        return EXIT_FAILURE;    }    memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );    resmgr_attr.nparts_max = 1;    resmgr_attr.msg_max_size = sizeof(msg_buf_t);    /* Setup the default I/O functions to handle open/read/write/... */    iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,                      _RESMGR_IO_NFUNCS, &IoFuncs );    /* Setup the attribute for the entry in the filesystem */    iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );    resmgr_id = resmgr_attach( dpp, &resmgr_attr, (const char*)master.devname, _FTYPE_ANY,                               0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );    if( resmgr_id == -1 )    {        Printf( "resmgr_attach() failed: %s\n", strerror( errno ) );        return MCO_E_NW_FATAL;    }    /* Setup our message callback */    memset( &message_attr, 0, sizeof( message_attr ) );    message_attr.nparts_max = 1;    message_attr.msg_max_size = sizeof(msg_buf_t);    /* Attach a callback (handler) for all message types */    message_id = message_attach( dpp, &message_attr, 0,                                 0xffff, message_callback, NULL );    if( message_id == -1 )    {        Printf( "message_attach() failed: %s\n", strerror( errno ) );        return MCO_E_NW_FATAL;    }/* Initialize an event structure, and attach a pulse to it */    if((event.sigev_code = pulse_attach(dpp, MSG_FLAG_ALLOC_PULSE, 0, &timer_tick,                                       NULL)) == -1) {        Printf("Unable to attach timer pulse.\n");         return EXIT_FAILURE;   }        /* Setup a context for the dispatch layer to use */    ctp = dispatch_context_alloc( dpp );    if( ctp == NULL )    {        Printf( "dispatch_context_alloc() failed: %s\n",                  strerror( errno ) );        return MCO_E_NW_FATAL;    }    nw_init(baseCh);    memcpy(baseCh->name, master.devname,NW_MAX_NAMELENGTH);    for(i=0; i < MAX_CHANNELS; i++) {      channels[i].status = 0;    }    /* Connect to our channel */    if((baseCh->fd = message_connect(dpp, MSG_FLAG_SIDE_CHANNEL)) == -1) {        Printf("Unable to attach to channel.\n");        return MCO_E_NW_FATAL;    }    event.sigev_coid = baseCh->fd;    event.sigev_notify = SIGEV_PULSE;    event.sigev_priority = -1;    /* We could create several timers and use different sigev values for each */    event.sigev_value.sival_int = 0;    if((timer_id = TimerCreate(CLOCK_REALTIME, &event)) == -1) {;        Printf("Unable to attach channel and connection.\n");        return EXIT_FAILURE;    }    /* And now setup our timer to fire every TIME_SLICE milliseconds */    itime.nsec = TIME_SLICE*1000000;    itime.interval_nsec = TIME_SLICE*1000000;    TimerSettime(timer_id, 0, &itime, NULL);       /* The "Data Pump" - get and process messages */    while( 1 )    {        ctp_ret = dispatch_block( ctp );        if( ctp_ret )        {            dispatch_handler( ctp );        }        else        {            Printf("dispatch_block() failed: %s\n",                      strerror( errno ) );            return MCO_E_NW_FATAL;        }    }}/*************************************************************************** * Interface layer. Used by master - replica interconnections.             *  * The interface is simplified. All the internal protocol details are      * * hidden behind this layer from the application and MCOdb HA              *  ***************************************************************************//******************************************************************************/static int mco_nw_send(  nw_channel_h  ch,                         const void *  buffer,                         unsigned      buflen,                         timer_unit    timeout)/* *   nw_channel_h  ch    - pointer to a channel descriptor. *   const void*  buffer - buffer of data to send. *   unsigned  buflen    - number of bytes to send *   timer_unit timeout  - wait-for-send-completion timeout. * * Description: * This send function is used implicitly by master-replica interconnection channel. * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Returns the actual length of sent data or -1 if error. */{  MCO_RET  ret;  long    length = (long)buflen;  int2     currlen;    if ( (long)(currlen = MAX_DATALENGTH) > length) currlen = (int2)length;    if( (ret = nw_send(ch , (char*)buffer, currlen, timeout)) != MCO_S_OK) {      return -1;    }    return buflen;}/******************************************************************************/static int mco_nw_recv(  nw_channel_h ch,                         void*        buffer,                         unsigned     buflen,			 timer_unit   timeout)/* *    nw_channel_h ch      - pointer to a channel descriptor. *   void* buffer         - buffer for data to receive. *   unsigned buflen       - number of bytes to receive. *    timer_unit timeout   - wait-for-receive-completion timeout. * * Description: * This receive function is used implicitly by master-replica interconnection channel * mco_nw_recv MUST read exactly max_bytes !!! * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Returns the actual length of sent data or -1 if error. */{  int2       nrecv;  long      to_read = (long)buflen;  int2       currlen;    if ( (uint4)(currlen = MAX_DATALENGTH) > (uint4)to_read) currlen = (int2)to_read;    if ( nw_recv(ch, (char*)buffer, currlen, &nrecv, timeout) != MCO_S_OK ) {        return -1;    }  return buflen;}/***************************************************************************/static void  ErrorHandler(  ha_error_h  HAerror)/* *   ha_error_h  HAerror - pointer to ha_error_t structure described in mcoha.h.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -