📄 pipespsx.c
字号:
/******************************************************************* * * * This file is a part of the eXtremeDB HA framework * * Copyright (c) 2001-2006 McObject LLC * * All Rights Reserved * * * *******************************************************************//* * ++ * * PROJECT: eXtremeDB(tm) (c) McObject LLC * * SUBSYSTEM: HA support * * MODULE: pipespsx.c * * ABSTRACT: Low level OS dependent IPC. * Channel-over-pipes, platform dependent functions * * * VERSION: 1.0 * * HISTORY: * 1.0- 1 SS 19-Feb-2004 Created it was * * -- */#ifndef _WIN32#include "interface.h"#ifdef CFG_PIPE_CHANNEL#include <fcntl.h>extern int2 allocate_index();extern void deallocate_index(int2 index);extern void reallocate_index(int2 index);extern uint4 NumBytes;int conn_flag = 0;timer_unit init_time = 0;int commit_initialized = 0;/*************************************************************************** * Transport layer. Used by an application for providing of communication * * channels, guaranteed data transfer. Can be used by an application for * * the implementation of it's own communications channels. * * Some details of a communication channel are accessible to application * ***************************************************************************//****************************************************************************** * function init_channel() - initializes the channel descriptor */static void init_channel( nw_channel_h chan ){ chan->index = -1; chan->status = 0; chan->hndl.pi = INVALID_HANDLE_VALUE; chan->hndl.po = INVALID_HANDLE_VALUE; chan->cancel_socket = INVALID_SOCKET_VALUE;}/*************************************************************************** * * Close the communication channel * * Parameters: * * IN nw_channel_h chan - pointer to the channel descriptor. * * Description: * * Closes the communication channel * * Return: * * Returns MSO_S_OK if successful or error code. 
*/MCO_RET nw_close(nw_channel_h chan){ char name[NW_FULL_NAMELENGTH]; if(chan->hndl.pi != INVALID_HANDLE_VALUE) { close(chan->hndl.pi); } if(chan->index >= 0) { sprintf(name, "%s_%d_I", chan->name, chan->index); goto L10; } if(chan->index == -2) { sprintf(name, "%s_I", chan->name);L10: remove(name); } if(chan->hndl.po != INVALID_HANDLE_VALUE) { close(chan->hndl.po); } if(chan->index >= 0) { sprintf(name, "%s_%d_O", chan->name, chan->index); goto L20; } if(chan->index == -2) { sprintf(name, "%s_O", chan->name);L20: remove(name); } deallocate_index(chan->index); init_channel(chan); return MCO_S_OK;}/*************************************************************************** * * Initialize communication channel support * * Parameters: * * IN nw_channel_h chan - pointer to the base channel descriptor. * * Description: * * Initializes the network protocol & base channel descriptor. * The base channel is the main channel descriptor used by server for accepting * client's connections requests. The descriptor of base channel MAY contain * extention including common properties of the transport layer. * * Return: * * Returns MSO_S_OK if successful or error code. */MCO_RET nw_init(nw_channel_h chan){ init_channel(chan); chan->index = -2; chan->status = NWST_INITIALIZED; // base channel status = initialized atexit((void*)mco_nw_close); return MCO_S_OK;}/************************************************************************** * * Set "listen" state * * Parameters: * * IN nw_channel_h chan - pointer to a channel descriptor. * IN const char * nw_dev - Listener channel name * * Description: * * Sets the IO channel to the listener state. * * Return: * * Returns MSO_S_OK if successful or error code (see above). 
*/MCO_RET nw_listen(nw_channel_h chan, const char * nw_dev){ int len, rcs; char lsnr_name[NW_FULL_NAMELENGTH];// uint4 mode = 1; chan->index = -2; if( (len = strlen(nw_dev)) > NW_MAX_NAMELENGTH-1 ) len = NW_MAX_NAMELENGTH-1; memcpy( chan->name, nw_dev, len); chan->name[len] = 0; nw_close(chan); umask(0); sprintf(lsnr_name, "%s_I", chan->name); if (mkfifo(lsnr_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP)) {ErrRet: nw_close(chan); return MCO_E_NW_ERROR; } if ( (chan->hndl.pi = open(lsnr_name, O_RDWR, 0)) < 0) { goto ErrRet; } NBIO(chan->hndl.pi, &rcs); if (rcs < 0 ) { goto ErrRet; } sprintf(lsnr_name, "%s_O", chan->name); if (mkfifo(lsnr_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP)) goto ErrRet; if ( (chan->hndl.po = open(lsnr_name, O_RDWR, 0)) < 0) goto ErrRet; chan->status |= NWST_LISTEN; return MCO_S_OK;}/*************************************************************************** * * Accept the connection * * Parameters: * * IN nw_channel_h chan - pointer to the listener channel. * IN OUT nw_channel_h ioch - pointer to the IO channel waiting for connection. * IN unsigned long timeout) - wait-for-connect timeout. * * Description: * * Waits for the connection of a remote host to the IO channel. * * Return: * * Returns MSO_S_OK if successful or error code (see above). 
*/
MCO_RET nw_accept(nw_channel_h chan, nw_channel_h ioch, timer_unit timeout)
{
    short         recvlen;
    char          ch_name[NW_FULL_NAMELENGTH];
    nw_pipename_t pipename;
    MCO_RET       rc;
    int           rcs;

    /* get current channel index */
    init_channel( ioch );
    if ( (ioch->index = allocate_index()) == -1) {
        return (MCO_RET)(MCO_E_NW_FATAL);
    }

    /*
     * The name must be in place before nw_close(): with index >= 0 it builds
     * "<name>_<index>_I/_O" from ioch->name and unlinks those files. Copying
     * afterwards made it format whatever the descriptor happened to contain.
     */
    memcpy(ioch->name, chan->name, NW_MAX_NAMELENGTH);
    nw_close(ioch);

    /* nw_close() released the index and reset the descriptor; take one again. */
    if ( (ioch->index = allocate_index()) == -1) {
        return (MCO_RET)(MCO_E_NW_FATAL);
    }

    /* The whole structure goes over the pipe, so do not send stack garbage. */
    memset(&pipename, 0, sizeof(pipename));
    pipename.index = ioch->index;

#ifdef NW_DEBUG_OUTPUT
    snprintf(ch_name, sizeof ch_name, "%s_%d", chan->name, pipename.index);
    Printf("Pipe = %s\n",ch_name);
#endif

    /* Offer the index through the listener's outbound pipe ... */
    if ( write(chan->hndl.po, &pipename, sizeof(pipename)) != sizeof(pipename) ) {
        goto fail;
    }

    /* ... and wait for the peer's reply on the listener's inbound pipe. */
    if ( (rc = nw_recv(chan, (PCHAR)&pipename, sizeof(pipename), &recvlen, timeout)) != MCO_S_OK ) {
        nw_close(chan);
        nw_close(ioch);
        return rc;
    }

    /* create named IO channel: our input is the "_O" file, our output the "_I" file */
    snprintf(ch_name, sizeof ch_name, "%s_%d_O", chan->name, pipename.index);
    if ( (ioch->hndl.pi = open(ch_name, O_RDWR, 0)) < 0) {
        goto fail;
    }
    NBIO(ioch->hndl.pi, &rcs);
    if (rcs < 0) {
        goto fail;
    }

    snprintf(ch_name, sizeof ch_name, "%s_%d_I", chan->name, pipename.index);
    if ( (ioch->hndl.po = open(ch_name, O_RDWR, 0)) < 0) {
        goto fail;
    }

    ioch->status = (NWST_CONNECTED|NWST_INITIALIZED);
    return MCO_S_OK;

fail:
    /* NOTE(review): a failed accept also closes the listener channel;
     * kept as in the original, confirm this is intended. */
    nw_close(chan);
    nw_close(ioch);
    return (MCO_RET)(MCO_E_NW_ACCEPT);
}

/***************************************************************************
 *
 *  Connect to another host
 *
 *  Parameters:
 *
 *  IN nw_channel_h  chan           - pointer to a channel descriptor
 *  IN const char *  connect_string - interconnect dependent connection-string
 *  IN unsigned long timeout        - wait-for-connect timeout
 *
 *  Description:
 *
 *  Connects IO channel to the remote host by its name.
 *
 *  Return:
 *
 *  Returns MCO_S_OK if successful or error code (see above).
*/
MCO_RET nw_connect(nw_channel_h chan, const char * connect_string, timer_unit timeout)
{
    size_t        len;
    long          written;
    int           saved_errno;
    short         recvlen;
    char          lsnr_name[NW_FULL_NAMELENGTH];
    nw_pipename_t pipename;
    nw_channel_t  lch;          /* temporary handle on the listener's FIFOs */
    int           rcs;

    init_channel(&lch);
    init_channel(chan);

    /*
     * Bounded, always-terminated copy of the name (same rule nw_listen uses).
     * A fixed-size memcpy of NW_MAX_NAMELENGTH bytes reads past the end of a
     * shorter connect_string and leaves a longer one unterminated.
     */
    len = strlen(connect_string);
    if (len > (size_t)(NW_MAX_NAMELENGTH - 1)) {
        len = (size_t)(NW_MAX_NAMELENGTH - 1);
    }
    memcpy(lch.name, connect_string, len);
    lch.name[len] = 0;
    memcpy(chan->name, connect_string, len);
    chan->name[len] = 0;

    /* The listener's "_I" file is our output, its "_O" file our input. */
    snprintf(lsnr_name, sizeof lsnr_name, "%s_I", lch.name);
    if ( (lch.hndl.po = open(lsnr_name, O_RDWR, 0)) < 0) {
        goto fail_listener;
    }
    snprintf(lsnr_name, sizeof lsnr_name, "%s_O", lch.name);
    if ( (lch.hndl.pi = open(lsnr_name, O_RDWR, 0)) < 0) {
        goto fail_listener;
    }

    /* Wait for the channel index offered by the accepting side. */
    if ( nw_recv(&lch, (PCHAR)&pipename, sizeof(pipename), &recvlen, timeout) != MCO_S_OK ) {
        goto fail_listener;
    }

    /*
     * get current channel index
     * nw_close() with this index unlinks "<name>_<index>_I/_O" before they are
     * created below; it resets the descriptor, so the index is set again.
     */
    chan->index = pipename.index;
    nw_close(chan);
    chan->index = pipename.index;

    umask(0);

    snprintf(lsnr_name, sizeof lsnr_name, "%s_%d_I", chan->name, pipename.index);
    if (mkfifo(lsnr_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP)) {
        goto fail_channel;
    }
    if ( (chan->hndl.pi = open(lsnr_name, O_RDWR, 0)) < 0) {
        goto fail_channel;
    }
    NBIO(chan->hndl.pi, &rcs);
    if (rcs < 0) {
        goto fail_channel;
    }

    snprintf(lsnr_name, sizeof lsnr_name, "%s_%d_O", chan->name, pipename.index);
    if (mkfifo(lsnr_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP)) {
        goto fail_channel;
    }
    if ( (chan->hndl.po = open(lsnr_name, O_RDWR, 0)) < 0) {
        goto fail_channel;
    }

    /*
     * Echo the index back so the accepting side can open the new FIFOs.
     * The result is kept in a signed variable: stored in an unsigned one,
     * the -1 returned on failure can never satisfy "<= 0".
     */
    written = (long)write(lch.hndl.po, &pipename, sizeof(pipename));
    if (written != (long)sizeof(pipename)) {
        goto fail_channel;
    }

    /* lch.index stays -1 so that closing it does not unlink the listener's FIFOs. */
    lch.index   = -1;
    chan->index = pipename.index;
    nw_close(&lch);
    /* NOTE(review): unlike nw_accept(), chan->status is not updated here;
     * confirm whether the caller sets it. */
    return MCO_S_OK;

fail_channel:
    /* nw_close() calls close()/remove(); keep errno for the perror() below. */
    saved_errno = errno;
    nw_close(chan);
    errno = saved_errno;

fail_listener:
    perror("errno = ");
    nw_close(&lch);
    return (MCO_RET)(MCO_E_NW_CONNECT);
}

/***************************************************************************
 *
 *  Send a message
 *
 *  Parameters:
 *
 *  IN nw_channel_h  chan    - pointer to a channel descriptor.
 *  IN const char*   buffer  - buffer to send.
 *  IN int2          buflen  - buffer length, is cut to 32767 bytes.
 *  IN unsigned long timeout - send timeout in milliseconds. It is media dependent
 *                             & application dependent. Application MUST set this
 *                             parameter considering the media rate & its own needs.
*
 *  Description:
 *
 *  Sends a message to the communication channel
 *
 *  Return:
 *
 *  Returns MCO_S_OK if successful or error code (see above).
 */
MCO_RET nw_send (nw_channel_h chan, char* buffer, int2 buflen, timer_unit timeout)
{
    int2 remaining = buflen;
    long written;
    int  failed = 0;

    (void)timeout;   /* not used by the pipe transport */

    if (buflen < 0) {
        /* A negative length would be converted to a huge byte count. */
        errno  = EINVAL;
        failed = 1;
    }

    /* Keep writing until the whole message is out; write() may be short. */
    while (!failed && remaining > 0) {
        written = (long)write(chan->hndl.po, buffer, (size_t)remaining);
        if (written < 0) {
            if (errno == EINTR) {
                continue;       /* interrupted before any data moved: retry */
            }
            failed = 1;
        } else if (written == 0) {
            failed = 1;         /* no progress: bail out rather than spin */
        } else {
            buffer    += written;
            remaining  = (int2)(remaining - written);
        }
    }

    if (failed) {
        /* Report first: nw_close() calls close()/remove(), which may overwrite errno. */
        perror("write:");
        nw_close(chan);
        return (MCO_RET)(MCO_E_NW_SENDERR);
    }

    NumBytes += (uint4)buflen;
    return MCO_S_OK;
}

/***************************************************************************
 *
 *  Receive a message
 *
 *  Parameters:
 *
 *  IN  nw_channel_h  chan    - pointer to a channel descriptor.
 *  OUT char*         buffer  - buffer to receive.
 *  IN  int2          buflen  - buffer length limit, is cut to 32767 bytes.
 *  OUT int2*         recvlen - actual received length, is cut to 32767 bytes.
 *  IN  unsigned long timeout - receive timeout in milliseconds. It is media dependent
 *                              & application dependent. Application MUST set this parameter
 *                              considering the media rate & its own needs
 *
 *  Description:
 *
 *  Receives a message from the communication channel
 *
 *  Return:
 *
 *  Returns MCO_S_OK if successful or error code (see above).
*/MCO_RET nw_recv (nw_channel_h chan, char* buffer, int2 buflen, int2* recvlen, timer_unit timeout) { uint4 len = (uint4) buflen; long nbytes; int rcs; TIMEVAL tv; fd_set readfds; timer_unit time = (long)timeout; timer_unit t1, t2 = mco_system_get_current_time(); while (len) { t1 = t2; if( (time-=((t2=mco_system_get_current_time())-t1)) <= 0) { return MCO_E_NW_TIMEOUT; } tv.tv_sec = time/1000; tv.tv_usec= (time%1000)*1000; FD_ZERO(&readfds); FD_SET(chan->hndl.pi, &readfds); rcs = select (1+(int)chan->hndl.pi, &readfds, 0, 0, &tv); if( rcs < 0 ) { // error if(errno == EINTR) continue; nw_close(chan); return MCO_E_NW_RECVERR; } if( !rcs ) { // timeout nw_close(chan); return MCO_E_NW_TIMEOUT; } if ( !(FD_ISSET(chan->hndl.pi, &readfds) ) ) { nw_close(chan); return MCO_E_NW_RECVERR; } if ( (nbytes = read(chan->hndl.pi, buffer, buflen)) < 0 ) { nw_close(chan); return MCO_E_NW_RECVERR; } len -= nbytes; buffer += nbytes; } *recvlen = (short) buflen; NumBytes += (uint4)buflen; return MCO_S_OK;}/* * Not implemented. The stubs for the compliance with other platforms *//*************************************************************************** * * Accept the cancel point connection * * IN nw_channel_h chan - pointer to the listener channel. * IN OUT nw_channel_h ioch - pointer to the IO channel waiting for connection. * IN unsigned long timeout) - wait-for-connect timeout. * * Description: * Waits for the connection of a remote host to the IO channel. * * Returns MSO_S_OK if successful or error code (see above). */MCO_RET nw_accept_cancel_point( nw_channel_h chan, char * port, timer_unit timeout ){ return MCO_S_OK;}/*************************************************************************** * * Connect to cancel point * * IN nw_channel_h chan - pointer to a channel descriptor * IN const char connect_string - interconnect dependent connection-string * IN unsigned long timeout); * * Description: * Connects IO channel to the remote host by it's name. 
* * Returns MSO_S_OK if successful or error code (see above). */MCO_RET nw_connect_cancel_point( nw_channel_h chan, const char * connect_string, timer_unit timeout ){ return MCO_S_OK;}/************************************************************************ * * Cancels the communication channel * * Parameters: * * IN nw_channel_h ch - pointer to a channel descriptor. * */void nw_cancel(nw_channel_h ch){}#endif// CFG_PIPE_CHANNEL#endif /* _WIN32 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -