📄 hapipes.c
字号:
/******************************************************************* * * * This file is a part of the eXtremeDB HA framework * * Copyright (c) 2001-2006 McObject LLC * * All Rights Reserved * * * *******************************************************************//* * ++ * * PROJECT: eXtremeDB(tm) (c) McObject LLC * * SUBSYSTEM: HA support * * MODULE: hapipes.c * * ABSTRACT: HA framework interface layer. Named pipes * * * VERSION: 1.0 * * HISTORY: * 1.0- 1 AD 12-May-2003 Created it was * 2 SS 08-Jul-2003 Modified, configuration options added * 3 SS 01-Aug-2003 Reconstructed * 4 SS 26-Aug-2003 Replica database name is generated automatically * via the replica autoindex generated by master * * -- */#define INCLUDE_SOCKETS#include "interface.h"#ifdef CFG_PIPE_CHANNEL#ifdef __cplusplus extern "C" { #endifextern int2 allocate_index();extern void deallocate_index(int2 index);extern void reallocate_index(int2 index);extern uint4 NumBytes;extern int conn_flag;extern timer_unit init_time;extern int commit_initialized;#ifdef CFG_SHARED_COMMITextern THREAD_ID synchcommit;#endif // CFG_SHARED_COMMIT#ifdef __cplusplus }#endif/*************************************************************************** * Interface layer. Used by master - replica interconnections. * * The interface is simplified. All the internal protocol details are * * hidden behind this layer from the application and MCOdb HA * ***************************************************************************//****************************************************************************** * * Send to stream * * Parameters: * * IN nw_channel_h ch - pointer to a channel descriptor. * IN const void* buffer - buffer of data to send. * IN unsigned buflen - number of bytes to send * IN timer_unit timeout - wait-for-send-completion timeout. * * Description: * * This send function is used implicitly by master-replica interconnection channel. * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Return: * * Returns the actual length of sent data or -1 if error. */static int mco_nw_send(nw_channel_h ch, const void * buffer, unsigned buflen, timer_unit timeout ){ MCO_RET ret; long length = (long)buflen; int2 currlen; if ( (long)(currlen = MAX_DATALENGTH) > length) currlen = (int2)length; if( (ret = nw_send(ch , (char*)buffer, currlen, timeout/2)) != MCO_S_OK) { return -1; } return buflen;}/****************************************************************************** * * Receive from stream * * Parameters: * * IN nw_channel_h ch - pointer to a channel descriptor. * OUT void* buffer - buffer for data to receive. * IN unsigned buflen - number of bytes to receive. * IN timer_unit timeout - wait-for-receive-completion timeout. * * Description: * * This receive function is used implicitly by master-replica interconnection channel * mco_nw_recv MUST read exactly max_bytes !!! * The pointers to this function MUST be set to HA internal descriptor mco_channel_t * in each channel descriptor. * * Return: * * Returns the actual length of sent data or -1 if error. */static int mco_nw_recv(nw_channel_h ch, void* buffer, unsigned buflen, timer_unit timeout){ int2 nrecv; long to_read = (long)buflen; int2 currlen; if ( (uint4)(currlen = MAX_DATALENGTH) > (uint4)to_read) currlen = (int2)to_read; if ( nw_recv(ch, (char*)buffer, currlen, &nrecv, timeout/2) != MCO_S_OK ) { return -1; } return buflen;}/*************************************************************************** * * HA error handler * * Parameters: * * IN ha_error_h HAerror - pointer to ha_error_t structure described in mcoha.h. * The structure contains error code that caused the handler call & pointer to * the descriptor of communication channel of the deleted replica. If the pointer not * equal to NULL then replica was deleted from master's list of replicas. * * Description: * * HA error handler callback - closes replica channel & deallocates memory * used by it's descriptor * */static void ErrorHandler( ha_error_h HAerror){#ifdef NW_DEBUG_OUTPUT Printf("HA error handler called, errcode = %d\n", HAerror->errcode);#endif if(HAerror->IOchannel != 0) { // if (replica was deleted) {} nw_close((nw_channel_h)HAerror->IOchannel); // close communication channel free(HAerror->IOchannel); // free the memory allocated by the descriptor }}/*************************************************************************** * * Attaches replica to the master. * * Parameters: * * IN mco_db_h db - database descriptor pointer. * IN const char* - devname. * IN const mco_connection_param_t* params - connection & transaction timeouts. * IN const char* replica_name - replica name (optional). * IN unsigned long timeout - wait-for-connection timeout * IN OUT void * arg - user defined argument. In this context * it is used as main/slave master flag in multimaster mode * (for shared memory only) * * Description: * * This function includes connection algotrithm dependent on the type of a communication * channel. It attaches connected replica to HA subsystem. * * Return: * * Returns MSO_S_OK if successful or error code (see above). */MCO_RET mco_nw_attach_replica(mco_db_h db, const char* devname, const mco_connection_param_t * params, const char* replica_name, timer_unit timeout, void * arg){ MCO_RET ret; nw_channel_t _ch; // copy of IO channel in the stack nw_channel_h ch; // pointer to the new IO channel ha_h ha = (ha_h)arg;/* * If the transport protocol isn't initialized * then initialize it */ if ( !(ha->baseChan.status&NWST_INITIALIZED) ) { ha->baseChan.db = db; // database handle. Is used by mco_nw_close()#ifdef NW_DEBUG_OUTPUT Printf( "\n Initializing master communication channel's context, please wait...\n" );#endif if( (ret = nw_init(&ha->baseChan)) != MCO_S_OK) {#ifdef NW_DEBUG_OUTPUT Printf("Error initializing master channel\n");#endif return ret; } }/* * If master's listener channel isn't initialized * then open listener channel */ if(!(ha->baseChan.status&NWST_LISTEN)) { if ( (ret=nw_listen(&ha->baseChan, devname)) != MCO_S_OK ) {#ifdef NW_DEBUG_OUTPUT Printf("Error listening: %d\n", ret);#endif return ret; }#ifdef NW_DEBUG_OUTPUT Printf("done\n");#endif } ha->baseChan.db = db; // database handle. Is used by mco_nw_close() ha->baseChan.status |= NWST_IS_MASTER; // ID_MASTER flag used by function mco_nw_close() // to check whether the transport layer still // belongs to master /* * Waiting for connection request */#ifdef NW_DEBUG_OUTPUT Printf( "\n Listener: Waiting for the connection request...\n" );#endif if ( (ret = nw_accept(&ha->baseChan, &_ch, timeout)) != MCO_S_OK ) {#ifdef NW_DEBUG_OUTPUT if(ret != MCO_E_NW_TIMEOUT) Printf("Error accepting connection %d\n", ret); else Printf("\nConnection timeout\n");#endif return ret; }#ifdef NW_DEBUG_OUTPUT Printf("connection accepted\n");#endif/* * Creating and initializing channel descriptor */ ch = (nw_channel_h)malloc(sizeof(nw_channel_t)); // create the new channel descriptor memcpy(ch,&_ch,sizeof(nw_channel_t)); // copy the descriptor from stack to the new instance ch->mco_channel.fsend = (mco_xstream_write)&mco_nw_send; // set pointer to virtual HA send-receive methods ch->mco_channel.frecv = (mco_xstream_read)&mco_nw_recv; // to it's internal descriptor// sprintf(name, "mco_replica_%d", ch->index);/* * Connection request is just arrived, attaching to replica */#ifdef NW_DEBUG_OUTPUT Printf( " Attaching to replica, please wait...\n" );#endif init_time = mco_system_get_current_time(); conn_flag = 1; ret = mco_HA_attach_replica(db, &ch->mco_channel, params, replica_name, ErrorHandler); init_time = mco_system_get_current_time() - init_time; conn_flag = 0;#ifdef NW_DEBUG_OUTPUT if(ret) Printf("Error attaching to replica %d\n", ret);#endif return ret;}/*************************************************************************** * * Attaches master to the replica. * * Parameters: * * IN OUT mco_db_h* db - pointer to database handle * IN const char*conn_string - interconnect dependent connection-string. * IN const mco_connection_param_t* params - connection & transaction timeouts. * OUT MCO_E_HA_REPLICA_STOP_REASON* stop_reason - the reason why the replica is stopped. * IN const char* db_name - database name. * IN mco_dictionary_h dict - pointer to database dictionary * IN void* mem_ptr - pointer to the memory allocated for database copy * IN uint4 total_size - size of allocated memory * IN unsigned long timeout - wait-for-connection timeout * IN OUT void * arg - user defined argument. In this context it implements * special mode flag: * flag != 0 - replica loads master's database and then becomes master * * Description: * * This function includes connection algotrithm dependent on the type of a communication * channel. It attaches the replica to the connected master. * * Return: * * Returns MSO_S_OK if successful or error code (see above). */MCO_RET mco_nw_attach_master(mco_db_h* db, const char* conn_string, const mco_connection_param_t* params, MCO_E_HA_REPLICA_STOP_REASON* stop_reason, const char* db_name, mco_dictionary_h dict, void* mem_ptr, uint4 total_size, timer_unit timeout, void * arg){ MCO_RET ret; char name[NW_MAX_NAMELENGTH]; ha_h ha = (ha_h)arg; *stop_reason = MCO_HA_REPLICA_HANDSHAKE_FAILED; commit_initialized++;/* * If the base channel still belongs to master then delete all replicas & close all * master's channels */ if ( ha->baseChan.status & NWST_IS_MASTER ) { mco_nw_close( 0 ); }/* * If transport layer isn't initialized * then initialize it */ if ( !(ha->baseChan.status & NWST_INITIALIZED) ) {#ifdef NW_DEBUG_OUTPUT Printf( "\n Initializing master communication channel's context, please wait...\n" );#endif if( (ret=nw_init(&ha->baseChan))!=MCO_S_OK) {#ifdef NW_DEBUG_OUTPUT Printf("Error initializing: %d\n",ret);#endif return ret; }#ifdef NW_DEBUG_OUTPUT Printf("done\n");#endif }/* * Connecting to master */#ifdef NW_DEBUG_OUTPUT Printf("Connecting...\n");#endif if ( (ret=nw_connect(&ha->baseChan, conn_string, timeout)) != MCO_S_OK ) {#ifdef NW_DEBUG_OUTPUT Printf("Error connecting: %d\n", ret);#endif return ret; }#ifdef NW_DEBUG_OUTPUT Printf("done\n");#endif/* * Initializing channel descriptor */ ha->baseChan.mco_channel.fsend = (mco_xstream_write)&mco_nw_send; ha->baseChan.mco_channel.frecv = (mco_xstream_read)&mco_nw_recv;/* * Connection request is accepted, attaching to master */#ifdef NW_DEBUG_OUTPUT Printf( " Attaching to master...\n" );#endif/* This sample code compiles the unique replica database name from parameter "db_name" and unique replica index obtained from master. please remove the line below if you want to replicate existing database and use "db_name" instead of "name" *///*** sprintf(name, "%s%d", db_name,(int)ha->baseChan.index); strcpy((char*)db_name,name); reallocate_index(ha->baseChan.index); mco_db_kill(name);//*** Printf("DB name = %s\n",name); ha->baseChan.index = -1; ret = mco_HA_attach_master(db, // pointer to database handle (mco_channel_h)&ha->baseChan, // pointer to the communication channel params, // connection & transaction timeouts stop_reason, // stop reason name, // database name dict, // pointer to database dictionary mem_ptr, // pointer to the memory allocated for database copy total_size, // size of allocated memory (uint2)((ha->replicaMode &MCO_HAFLAG_FORCE_MASTER) | MCO_HAFLAG_REPLICA_NOTIFICATION) ); nw_close(&ha->baseChan); commit_initialized = 0; return ret;}/*************************************************************************** * * Parameters: * * IN OUT void * arg - user defined argument. Not used in this context * * Description: * * If called by the replica the function closes the communication channel. * if called by the master then disconnects all replicas * connected to the master and closes all master/replica communication channels * * Return: * * Returns MSO_S_OK if successful or error code (see above). */MCO_RET mco_nw_close (void *arg){ nw_channel_h ch; short nr; MCO_RET rc; ha_h ha = (ha_h)arg; if ( ha->baseChan.status & NWST_IS_MASTER ) { if(ha->isMainMaster > 0) { // if (main master) for(nr=mco_HA_get_number_of_replicas((mco_db_h)ha->baseChan.db); nr; nr-- ) { Printf("Master: detaching replica %d ...", nr-1); ch = (nw_channel_h)mco_HA_get_io_channel((mco_db_h)ha->baseChan.db, (int2)(nr-1)); if ((rc = mco_HA_detach_replica((mco_db_h)ha->baseChan.db, (mco_channel_h)ch)) != MCO_S_OK) { Printf( "failed (%d)\n", rc ); } /* no need to close channel, it's closed by ErrorHandler */ } } } nw_close(&ha->baseChan); return MCO_S_OK;}#endif// CFG_PIPE_CHANNEL
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -