📄 ospcomm.c
字号:
/**########################################################################
*########################################################################
*########################################################################
*
*   COPYRIGHT (c) 1998, 1999 by TransNexus, LLC
*
*   This software contains proprietary and confidential information
*   of TransNexus, LLC. Except as may be set forth in the license
*   agreement under which this software is supplied, use, disclosure,
*   or reproduction is prohibited without the prior, express, written
*   consent of TransNexus, LLC.
*
*******#########################################################################
*#########################################################################
*#########################################################################
*/

/*
 * ospcomm.cpp - Communication object functions.
 */
#include "osp.h"
#include "ospcomm.h"
#include "ospsocket.h"
#include "osphttp.h"
#include "osplist.h"
#include "ospmsgque.h"
#include "ospsecurity.h"

/*
 * Monitors the message queue for new communication requests.
 *
 * When a new request is placed on the message queue, the conditional
 * variable (CondVar) within the message queue structure is signalled and
 * the number of transactions (NumberOfTransactions) will be incremented.
 * This function will awaken and assign the request to an available HTTP
 * connection. Upon the handoff of the request, this function will return
 * to the wait condition until the next message queue request arrives.
 *
 * The loop ends once the shutdown branch has run (shutdown bit set in the
 * comm Flags, or the condition wait reported an error). The thread then
 * waits for HttpConnCount to reach zero and calls OSPPCommDelete().
 *
 * returns void (runs until the communication manager thread is terminated)
 */
static
OSPTTHREADRETURN
osppCommMonitor(
    void *ospvArg)  /* In - Comm pointer casted to a void */
{
    OSPTCOMM     **comm         = OSPC_OSNULL;
    OSPTMSGQUEUE *msgqueue      = OSPC_OSNULL;
    int          errorcode      = OSPC_ERR_NO_ERROR,
                 tmperror       = OSPC_ERR_NO_ERROR;
    /* named to avoid shadowing the sockets API function shutdown() */
    OSPTBOOL     shutting_down  = OSPC_FALSE,
                 do_forever     = OSPC_TRUE;

    comm     = (OSPTCOMM **)ospvArg;
    msgqueue = (*comm)->MsgQueue;

    while (do_forever)
    {
        /*
         * acquire message queue mutex
         */
        OSPM_MUTEX_LOCK(msgqueue->Mutex, errorcode);
        if (errorcode == OSPC_ERR_NO_ERROR)
        {
            /*
             * wait for conditional variable on the transaction count;
             * the shutdown bit is part of the wait predicate as well
             */
            while (msgqueue->NumberOfTransactions == 0 &&
                ((*comm)->Flags & OSPC_COMM_SHUTDOWN_BIT) !=
                    OSPC_COMM_SHUTDOWN_BIT)
            {
                OSPM_CONDVAR_WAIT(msgqueue->CondVar, msgqueue->Mutex,
                    errorcode);
            }

            if (msgqueue->NumberOfTransactions > 0 &&
                errorcode == OSPC_ERR_NO_ERROR)
            {
                /*
                 * send the new msg request to the HTTP connection
                 * module
                 */
                errorcode = OSPPHttpRequestHandoff((*comm), msgqueue);
            }
            else
            {
                /*
                 * Reached when the shutdown bit is set with no queued
                 * transactions, or when the condition wait failed.
                 *
                 * The provider has been deleted. See if a Timelimit
                 * has been specified to wait for pending transactions.
                 * If Timelimit  < 0; wait indefinitely
                 *               > 0; wait for specified seconds
                 *              == 0; do not wait. terminate all
                 *                    connections immediately
                 */
                OSPPCommShutdownConnections((*comm),
                    (*comm)->ShutdownTimeLimit);
                shutting_down = OSPC_TRUE;
            }

            /*
             * release the mutex lock
             */
            OSPM_MUTEX_UNLOCK(msgqueue->Mutex, tmperror);

            if (shutting_down)
            {
                break;
            }
        }
        else
        {
            /*
             * The lock could not be taken. Back off before retrying so a
             * persistent failure does not turn into a hot spin loop.
             */
            OSPM_SLEEP(1);
        }
    }

    /*
     * destroy the comm object. Make sure all http connections have been
     * freed.
     */
    while ((*comm)->HttpConnCount)
    {
        OSPM_SLEEP(2);
    }

    OSPPCommDelete(comm);

#ifdef _WIN32
    return;
#else
    return (OSPTTHREADRETURN)NULL;
#endif
}

/*
 * Starts the communication worker thread.
 *
 * This function will create the new communication thread responsible
 * for monitoring the message queue for new requests. The thread is handed
 * ospvComm itself (the address of the caller's Comm pointer), so that
 * storage must remain valid for as long as the thread runs.
 *
 * return OSPC_ERR_NO_ERROR if successful,
 *        OSPC_ERR_COMM_INVALID_ARG if ospvComm, *ospvComm or the MsgQueue
 *        is null,
 *        OSPC_ERR_COMM_THREAD_INIT_FAILED if any thread setup step fails.
 */
static
int
osppCommStartWorker(
    OSPTCOMM **ospvComm)    /* In - Pointer to the Comm Mgr */
{
    int         errorcode = OSPC_ERR_NO_ERROR;
    OSPTTHRATTR thread_attr;

    /*
     * verify Comm and MsgQueue pointers (including the outer pointer,
     * which is dereferenced by every check that follows)
     */
    if (ospvComm == (OSPTCOMM **)OSPC_OSNULL ||
        !(*ospvComm) || !(*ospvComm)->MsgQueue)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm/MsgQueue is null");
    }
    else
    {
        OSPM_THRATTR_INIT(thread_attr, errorcode);
        if (errorcode == OSPC_ERR_NO_ERROR)
        {
            OSPM_SETDETACHED_STATE(thread_attr, errorcode);
            if (errorcode == OSPC_ERR_NO_ERROR)
            {
                /*
                 * create the new thread which will monitor the
                 * the provider's message queue for new requests
                 */
                OSPM_CREATE_THREAD((*ospvComm)->ThreadId, &thread_attr,
                    osppCommMonitor, ospvComm, errorcode);
            }
            /* the attribute object was initialised, so always release it */
            OSPM_THRATTR_DESTROY(thread_attr);
        }

        if (errorcode != OSPC_ERR_NO_ERROR)
        {
            /* collapse every setup failure into one thread-init error */
            errorcode = OSPC_ERR_COMM_THREAD_INIT_FAILED;
            OSPM_DBGERRORLOG(errorcode, "thread initialization failed");
        }
    }
    return errorcode;
}

/*
 * Create a new communication manager object.
 *
 * This function creates the communication object which processes requests
 * to and from the TransNexus Service Points. *ospvComm must be null on
 * entry. If the message queue cannot be created or the worker thread
 * cannot be started, the partially built object is handed to
 * OSPPCommDelete() before returning.
 *
 * return OSPC_ERR_NO_ERROR if successful, else error code
 */
int
OSPPCommNew(
    OSPTCOMM **ospvComm)    /* In - Pointer to a Comm address */
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM **)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (*ospvComm != (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is not NULL");
    }
    else
    {
        /*
         * create a new communication manager
         */
        OSPM_MALLOC(*ospvComm, OSPTCOMM, sizeof(OSPTCOMM));
        if (*ospvComm == (OSPTCOMM *)OSPC_OSNULL)
        {
            errorcode = OSPC_ERR_COMM_NO_MEMORY;
            OSPM_DBGERRORLOG(errorcode, "ospvComm malloc failed");
        }
        else
        {
            OSPM_MEMSET(*ospvComm, 0, sizeof(OSPTCOMM));

            /*
             * initialize the communication manager mutex
             *
             * NOTE(review): when this fails the allocation is returned to
             * the caller together with the error code and is not released
             * here - confirm whether callers clean it up.
             */
            OSPM_MUTEX_INIT((*ospvComm)->Mutex, 0, errorcode);
            if (errorcode == OSPC_ERR_NO_ERROR)
            {
                /*
                 * create the message queue to the Communication Manager
                 */
                errorcode = OSPPMsgQueueNew(&(*ospvComm)->MsgQueue);
                if (errorcode == OSPC_ERR_NO_ERROR)
                {
                    /*
                     * start up the communication manager worker thread.
                     * This thread will monitor the queue for new requests
                     * and hand them off to an available HTTP connection
                     * pool for fulfillment.
                     */
                    errorcode = osppCommStartWorker(ospvComm);
                }

                if (errorcode != OSPC_ERR_NO_ERROR)
                {
                    /*
                     * Either the queue or the worker thread could not be
                     * set up. No worker is running in either case, so
                     * dispose of the object here instead of leaking it.
                     */
                    OSPPCommDelete(ospvComm);
                }
            }
        }
    }
    return errorcode;
}

/*
 * Stores the security object pointer in the communication manager.
 * A null ospvComm is ignored.
 */
void
OSPPCommSetSecurity(
    OSPTCOMM *ospvComm,
    OSPTSEC  *ospvSecurity)
{
    if (ospvComm != (OSPTCOMM *)OSPC_OSNULL)
    {
        ospvComm->Security = ospvSecurity;
    }
    return;
}

/*
 * Returns the security object pointer held by the communication manager,
 * or null when ospvComm is null.
 */
OSPTSEC *
OSPPCommGetSecurity(
    OSPTCOMM *ospvComm)
{
    OSPTSEC *security = (OSPTSEC *)OSPC_OSNULL;

    if (ospvComm != (OSPTCOMM *)OSPC_OSNULL)
    {
        security = ospvComm->Security;
    }
    return security;
}

/*
 * Copies the HttpPersistence setting into *ospvPersistence.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetPersistence(
    OSPTCOMM *ospvComm,
    unsigned *ospvPersistence)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvPersistence == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvPersistence is NULL");
    }
    else
    {
        *ospvPersistence = ospvComm->HttpPersistence;
    }
    return errorcode;
}

/*
 * Copies the HttpRetryDelay setting into *ospvRetryDelay.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetRetryDelay(
    OSPTCOMM *ospvComm,
    unsigned *ospvRetryDelay)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvRetryDelay == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvRetryDelay is NULL");
    }
    else
    {
        *ospvRetryDelay = ospvComm->HttpRetryDelay;
    }
    return errorcode;
}

/*
 * Copies the HttpRetryLimit setting into *ospvRetryLimit.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetRetryLimit(
    OSPTCOMM *ospvComm,
    unsigned *ospvRetryLimit)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvRetryLimit == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvRetryLimit is NULL");
    }
    else
    {
        *ospvRetryLimit = ospvComm->HttpRetryLimit;
    }
    return errorcode;
}

/*
 * Copies the HttpTimeout setting into *ospvTimeout.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetTimeout(
    OSPTCOMM *ospvComm,
    unsigned *ospvTimeout)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvTimeout == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvTimeout is NULL");
    }
    else
    {
        *ospvTimeout = ospvComm->HttpTimeout;
    }
    return errorcode;
}

/*
 * Stores ospvPersistence as the HttpPersistence setting.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommSetPersistence(
    OSPTCOMM *ospvComm,
    unsigned ospvPersistence)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpPersistence = ospvPersistence;
    }
    return errorcode;
}

/*
 * Stores ospvRetryDelay as the HttpRetryDelay setting.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommSetRetryDelay(
    OSPTCOMM *ospvComm,
    unsigned ospvRetryDelay)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpRetryDelay = ospvRetryDelay;
    }
    return errorcode;
}

/*
 * Stores ospvRetryLimit as the HttpRetryLimit setting.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommSetRetryLimit(
    OSPTCOMM *ospvComm,
    unsigned ospvRetryLimit)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpRetryLimit = ospvRetryLimit;
    }
    return errorcode;
}

/*
 * Stores ospvTimeout as the HttpTimeout setting.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommSetTimeout(
    OSPTCOMM *ospvComm,
    unsigned ospvTimeout)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpTimeout = ospvTimeout;
    }
    return errorcode;
}

/*
 * Copies the HttpMaxConnections setting into *ospvMaxConnections.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetMaxConnections(
    OSPTCOMM *ospvComm,
    unsigned *ospvMaxConnections)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvMaxConnections == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvMaxConnections is NULL");
    }
    else
    {
        *ospvMaxConnections = ospvComm->HttpMaxConnections;
    }
    return errorcode;
}

/*
 * Stores ospvMaxConnections as the HttpMaxConnections setting.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommSetMaxConnections(
    OSPTCOMM *ospvComm,
    unsigned ospvMaxConnections)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpMaxConnections = ospvMaxConnections;
    }
    return errorcode;
}

/*
 * Adds one to HttpConnCount.
 *
 * NOTE(review): the update is not protected by any lock in this function;
 * confirm that callers serialise access to the counter.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommIncrementHttpConnCount(
    OSPTCOMM *ospvComm)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else
    {
        ospvComm->HttpConnCount++;
    }
    return errorcode;
}

/*
 * Subtracts one from HttpConnCount, leaving it untouched when it is
 * already zero.
 *
 * The communication monitor thread waits for this counter to become zero
 * before deleting the Comm object; letting it drop below zero (or wrap,
 * the field is presumably unsigned - confirm against ospcomm.h) would
 * keep that wait from ever finishing.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        ospvComm is null.
 */
int
OSPPCommDecrementHttpConnCount(
    OSPTCOMM *ospvComm)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvComm->HttpConnCount > 0)
    {
        ospvComm->HttpConnCount--;
    }
    return errorcode;
}

/*
 * Copies HttpConnCount into *ospvHttpConnCount.
 *
 * return OSPC_ERR_NO_ERROR if successful, OSPC_ERR_COMM_INVALID_ARG if
 *        either pointer is null.
 */
int
OSPPCommGetHttpConnCount(
    OSPTCOMM *ospvComm,
    unsigned *ospvHttpConnCount)
{
    int errorcode = OSPC_ERR_NO_ERROR;

    if (ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvComm is NULL");
    }
    else if (ospvHttpConnCount == (unsigned *)OSPC_OSNULL)
    {
        errorcode = OSPC_ERR_COMM_INVALID_ARG;
        OSPM_DBGERRORLOG(errorcode, "ospvHttpConnCount is NULL");
    }
    else
    {
        *ospvHttpConnCount = ospvComm->HttpConnCount;
    }
    return errorcode;
}

/*
 * Requests shutdown of the communication manager and waits for it.
 *
 * Sets the shutdown bit and the shutdown time limit, signals the message
 * queue condition variable so the monitor thread re-evaluates its wait
 * condition, then polls once per OSPM_SLEEP(1) until *ospvComm becomes
 * null (presumably cleared by OSPPCommDelete() on the monitor thread -
 * confirm). A null ospvComm or *ospvComm is ignored.
 *
 * The flag is part of the predicate the monitor thread tests while
 * holding MsgQueue->Mutex, so it is changed and signalled under that same
 * mutex. Without it the signal can arrive between the monitor's test and
 * its wait and be lost, leaving both threads waiting forever.
 */
void
OSPPCommSetShutdown(
    OSPTCOMM **ospvComm,
    int      ospvTimeLimit)
{
    OSPTMSGQUEUE *msgqueue  = OSPC_OSNULL;
    int          errorcode  = OSPC_ERR_NO_ERROR;
    int          lockerror  = OSPC_ERR_NO_ERROR;
    OSPTBOOL     locked     = OSPC_FALSE;

    if (ospvComm == (OSPTCOMM **)OSPC_OSNULL ||
        *ospvComm == (OSPTCOMM *)OSPC_OSNULL)
    {
        return;
    }

    msgqueue = (*ospvComm)->MsgQueue;
    if (msgqueue != (OSPTMSGQUEUE *)OSPC_OSNULL)
    {
        OSPM_MUTEX_LOCK(msgqueue->Mutex, lockerror);
        if (lockerror == OSPC_ERR_NO_ERROR)
        {
            locked = OSPC_TRUE;
        }
    }

    /* a failed lock still proceeds: shutdown remains best-effort */
    (*ospvComm)->Flags |= OSPC_COMM_SHUTDOWN_BIT;
    (*ospvComm)->ShutdownTimeLimit = ospvTimeLimit;

    OSPM_CONDVAR_SIGNAL(OSPPMsgQueueGetCondVar((*ospvComm)->MsgQueue),
        errorcode);

    if (locked)
    {
        OSPM_MUTEX_UNLOCK(msgqueue->Mutex, lockerror);
    }

    /* the Comm object must not be touched past this point */
    while (*ospvComm != (OSPTCOMM *)OSPC_OSNULL)
    {
        OSPM_SLEEP(1);
    }

    return;
}

/*
 * Signals the condition variable of every HTTP connection found in the
 * communication manager's HttpConnList. Does nothing when ospvComm is
 * null or the list has no first element.
 *
 * NOTE(review): the end of this definition lies beyond the visible
 * excerpt; the statements below are reproduced unchanged.
 */
void
OSPPCommSignalAllConnections(
    OSPTCOMM *ospvComm)
{
    OSPTHTTP *httpconn  = OSPC_OSNULL;
    int      errorcode  = OSPC_ERR_NO_ERROR;

    if (ospvComm != (OSPTCOMM *)OSPC_OSNULL)
    {
        httpconn = (OSPTHTTP *)OSPPListFirst(
            (OSPTLIST *)&(ospvComm->HttpConnList));
    }

    if (httpconn != (OSPTHTTP *)OSPC_OSNULL)
    {
        do
        {
            OSPM_CONDVAR_SIGNAL(httpconn->CondVar, errorcode);
            if (!OSPPListEmpty((OSPTLIST *)&(ospvComm->HttpConnList)))
            {
                httpconn = (OSPTHTTP *)OSPPListNext(
                    (OSPTLIST *)&(ospvComm->HttpConnList),
                    (void *)httpconn);
            }
            else
            {
                httpconn = OSPC_OSNULL;
            }
        } while (httpconn != (OSPTHTTP *)OSPC_OSNULL);
    }
    return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -