📄 haqmsg.c
字号:
nw_close(chan); return (MCO_RET)(MCO_E_NW_ERROR); } memcpy((PVOID)buffer, chan->pBuffer, (int)buflen); chan->currlen -= ((uint2)buflen); chan->pBuffer += ((uint2)buflen); *recvlen=buflen; return MCO_S_OK; }// Printf("Send Message %d bytes\n",(int)buflen); if((ret = MsgSend(chan->fd ,&msg, sizeof(msg_header_t), msgbuf, sizeof(msg_buf_t))) < 0 ) { ret = errno;// perror("\nwrite:");ErrRet: nw_close(chan); return (MCO_RET)(MCO_E_NW_RECVERR); } chan->timeout = (long)TIME_INFINITE; if((short)(chan->currlen=(msgbuf->hd.length-buflen)) < 0) { goto ErrRet; } if(buflen != 0) memcpy(buffer, msgbuf->buffer, (size_t)buflen ); chan->pBuffer = msgbuf->buffer + buflen; NumBytes += (uint4)buflen;// Printf("sent %d bytes\n",(int)buflen); return MCO_S_OK;}/***************************************************************************/static int message_callback( message_context_t *ctp, int type, unsigned flags, void * handle )/* * Description: * Receives & dispatches messages & pulses from communication channels. * * Returns 0 = success. */{ int rc; msg_buf_h msg; nw_channel_h chan; /* cast a pointer to the message data */ msg = (msg_buf_t *)(ctp->msg); if(msg->hd.magic != MAGIC) { rc = MCO_E_NW_RECVERR;ErrRet: MsgReply( ctp->rcvid, rc, &msg, 0 ); return 0; } switch(type) { default: rc = MCO_E_NW_NOTSUPP; goto ErrRet; case FT_ACCEPT: { baseCh->snd_id = ctp->rcvid; baseCh->index = msg->hd.id; baseCh->status |= NWST_SEND_PENDED; if(baseCh->status & NWST_RECV_PENDED) {Accept: MsgReply( baseCh->rcv_id, MCO_S_OK, msg, sizeof(msg_header_t)); MsgReply( baseCh->snd_id, MCO_S_OK, msg, 0); baseCh->status &= ~(NWST_RECV_PENDED|NWST_SEND_PENDED); } break; } case FT_CONNECT: { baseCh->rcv_id = ctp->rcvid; baseCh->status |= NWST_RECV_PENDED; if(baseCh->status & NWST_SEND_PENDED) { msg->hd.id = baseCh->index; goto Accept; } break; } case FT_SEND: { if( (chan = FindChannelById(msg->hd.id)) == NULL) { rc = MCO_E_NW_SENDERR; goto ErrRet; } chan->snd_id = ctp->rcvid; chan->status |= NWST_SEND_PENDED; if(chan->status & NWST_RECV_PENDED) {Reply: MsgReply( chan->rcv_id, MCO_S_OK, msg, sizeof(msg_header_t)+msg->hd.length); MsgReply( chan->snd_id, MCO_S_OK, msg, 0); chan->status &= ~(NWST_RECV_PENDED|NWST_SEND_PENDED); } else { chan->send_buf = (PVOID)chan->pMsg; memcpy(chan->pMsg, msg, sizeof(msg_header_t)+msg->hd.length); } break; } case FT_RECV: { if( (chan = FindChannelById(msg->hd.id)) == NULL) { rc = MCO_E_NW_RECVERR; goto ErrRet; } chan->rcv_id = ctp->rcvid; chan->status |= NWST_RECV_PENDED; if(chan->status & NWST_SEND_PENDED) { msg = (msg_buf_h)chan->send_buf; goto Reply; } break; } } return 0;}/***************************************************************************/int timer_tick(message_context_t *ctp, int code, unsigned flags, void *handle)/* * Description: * Receives timer event which is sent with pulse. Counts timeouts of IO channels * and closes the channel if timeout expires. * * Returns 0 = success. */{// union sigval value = ctp->msg->pulse.value; nw_channel_h chan;// msg_header_t msg; int i; for(i=0; i < MAX_CHANNELS; i++) { chan = &channels[i]; if(chan->status & NWST_RECV_PENDED) { if((chan->timeout-=TIME_SLICE) <=0 ) { nw_close(chan); } } if(chan->status & NWST_SEND_PENDED) { if((chan->timeout-=TIME_SLICE) <=0 ) { nw_close(chan); } } } return 0;}/***************************************************************************/int Dispatcher()/* * Description: * Dispathcher thread initializes the resource manager for the MASTER, * then waits for any messages & pulses. When the message or pulse arrives * calls the appropriate dispatch handler. * * Never returns. */{ resmgr_connect_funcs_t ConnectFuncs; resmgr_io_funcs_t IoFuncs; iofunc_attr_t IoFuncAttr; resmgr_attr_t resmgr_attr; message_attr_t message_attr; dispatch_t *dpp; dispatch_context_t *ctp, *ctp_ret; struct sigevent event; struct _itimer itime; int timer_id; int i, resmgr_id, message_id;// long ret; /* Create the Dispatch Interface */ dpp = dispatch_create(); if( dpp == NULL ) { Printf( "dispatch_create() failed: %s\n", strerror( errno ) ); return EXIT_FAILURE; } memset( &resmgr_attr, 0, sizeof( resmgr_attr ) ); resmgr_attr.nparts_max = 1; resmgr_attr.msg_max_size = sizeof(msg_buf_t); /* Setup the default I/O functions to handle open/read/write/... */ iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs, _RESMGR_IO_NFUNCS, &IoFuncs ); /* Setup the attribute for the entry in the filesystem */ iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 ); resmgr_id = resmgr_attach( dpp, &resmgr_attr, (const char*)master.devname, _FTYPE_ANY, 0, &ConnectFuncs, &IoFuncs, &IoFuncAttr ); if( resmgr_id == -1 ) { Printf( "resmgr_attach() failed: %s\n", strerror( errno ) ); return MCO_E_NW_FATAL; } /* Setup our message callback */ memset( &message_attr, 0, sizeof( message_attr ) ); message_attr.nparts_max = 1; message_attr.msg_max_size = sizeof(msg_buf_t); /* Attach a callback (handler) for all message types */ message_id = message_attach( dpp, &message_attr, 0, 0xffff, message_callback, NULL ); if( message_id == -1 ) { Printf( "message_attach() failed: %s\n", strerror( errno ) ); return MCO_E_NW_FATAL; }/* Initialize an event structure, and attach a pulse to it */ if((event.sigev_code = pulse_attach(dpp, MSG_FLAG_ALLOC_PULSE, 0, &timer_tick, NULL)) == -1) { Printf("Unable to attach timer pulse.\n"); return EXIT_FAILURE; } /* Setup a context for the dispatch layer to use */ ctp = dispatch_context_alloc( dpp ); if( ctp == NULL ) { Printf( "dispatch_context_alloc() failed: %s\n", strerror( errno ) ); return MCO_E_NW_FATAL; } nw_init(baseCh); memcpy(baseCh->name, master.devname,NW_MAX_NAMELENGTH); for(i=0; i < MAX_CHANNELS; i++) { channels[i].status = 0; } /* Connect to our channel */ if((baseCh->fd = message_connect(dpp, MSG_FLAG_SIDE_CHANNEL)) == -1) { Printf("Unable to attach to channel.\n"); return MCO_E_NW_FATAL; } event.sigev_coid = baseCh->fd; event.sigev_notify = SIGEV_PULSE; event.sigev_priority = -1; /* We could create several timers and use different sigev values for each */ event.sigev_value.sival_int = 0; if((timer_id = TimerCreate(CLOCK_REALTIME, &event)) == -1) {; Printf("Unable to attach channel and connection.\n"); return EXIT_FAILURE; } /* And now setup our timer to fire every TIME_SLICE milliseconds */ itime.nsec = TIME_SLICE*1000000; itime.interval_nsec = TIME_SLICE*1000000; TimerSettime(timer_id, 0, &itime, NULL); /* The "Data Pump" - get and process messages */ while( 1 ) { ctp_ret = dispatch_block( ctp ); if( ctp_ret ) { dispatch_handler( ctp ); } else { Printf("dispatch_block() failed: %s\n", strerror( errno ) ); return MCO_E_NW_FATAL; } }}/*************************************************************************** * Interface layer. Used by master - replica interconnections. * * The interface is simplified. All the internal protocol details are * * hidden behind this layer from the application and MCOdb HA * ***************************************************************************//******************************************************************************/static int mco_nw_send( nw_channel_h ch, const void * buffer, unsigned buflen, timer_unit timeout)/* * nw_channel_h ch - pointer to a channel descriptor. * const void* buffer - buffer of data to send. * unsigned buflen - number of bytes to send * timer_unit timeout - wait-for-send-completion timeout. * * Description: * This send function is used implicitly by master-replica interconnection channel. * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Returns the actual length of sent data or -1 if error. */{ MCO_RET ret; long length = (long)buflen; int2 currlen; if ( (long)(currlen = MAX_DATALENGTH) > length) currlen = (int2)length; if( (ret = nw_send(ch , (char*)buffer, currlen, timeout)) != MCO_S_OK) { return -1; } return buflen;}/******************************************************************************/static int mco_nw_recv( nw_channel_h ch, void* buffer, unsigned buflen, timer_unit timeout)/* * nw_channel_h ch - pointer to a channel descriptor. * void* buffer - buffer for data to receive. * unsigned buflen - number of bytes to receive. * timer_unit timeout - wait-for-receive-completion timeout. * * Description: * This receive function is used implicitly by master-replica interconnection channel * mco_nw_recv MUST read exactly max_bytes !!! * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Returns the actual length of sent data or -1 if error. */{ int2 nrecv; long to_read = (long)buflen; int2 currlen; if ( (uint4)(currlen = MAX_DATALENGTH) > (uint4)to_read) currlen = (int2)to_read; if ( nw_recv(ch, (char*)buffer, currlen, &nrecv, timeout) != MCO_S_OK ) { return -1; } return buflen;}/***************************************************************************/static void ErrorHandler( ha_error_h HAerror)/* * ha_error_h HAerror - pointer to ha_error_t structure described in mcoha.h.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -