📄 staflocalipcconnprovider.cpp
字号:
if (thisOption == "IPCNAME") { lipcData.ipcName = (STAFString(lipcData.ipcName, STAFString::kShallow) + cpInfo->optionValues[i]).adoptImpl(); } else { if (errorBuffer) { *errorBuffer = STAFString(cpInfo->optionNames[i]).adoptImpl(); } return kSTAFInvalidValue; } } // Add each option to a map. lipcData.options = STAFObject::createMap(); lipcData.options->put("IPCName", STAFString(lipcData.ipcName)); // Setup property values lipcData.portProperty = STAFString(); lipcData.isSecureProperty = STAFString("0"); // Assign logical and physical identifiers lipcData.logicalNetworkID = STAFString("local"); lipcData.physicalNetworkID = STAFString("local"); *provider = new STAFLocalConnectionProviderImpl(lipcData); STAFRC_t rc = STAFSocketInit(errorBuffer); if (rc != kSTAFOk) return rc; *provider = new STAFLocalConnectionProviderImpl(lipcData); return kSTAFOk; } CATCH_STANDARD("STAFConnectionProviderConstruct"); return kSTAFUnknownError;}#define HANDLE_START_ERROR(string, function) \{\ STAFString theError = STAFString(string) + STAFString(", " function \ " RC=") + \ STAFString(STAFSocketGetLastError()); \\ if (errorBuffer) *errorBuffer = theError.adoptImpl();\ return kSTAFBaseOSError;\}STAFRC_t STAFConnectionProviderStart(STAFConnectionProvider_t baseProvider, void *startInfo, unsigned int startInfoLevel, STAFString_t *errorBuffer){ if (baseProvider == 0) return kSTAFInvalidObject; if (startInfoLevel != 1) return kSTAFInvalidAPILevel; STAFConnectionProviderStartInfoLevel1 *cpInfo = reinterpret_cast<STAFConnectionProviderStartInfoLevel1 *>(startInfo); try { if (cpInfo->newConnectionFunc == 0) return kSTAFInvalidValue; STAFLocalConnectionProviderImpl *provider = static_cast<STAFLocalConnectionProviderImpl *>(baseProvider); provider->connFunc = cpInfo->newConnectionFunc; provider->data = cpInfo->data; provider->serverSocket = socket(PF_UNIX, SOCK_STREAM, 0); if (!STAFUtilIsValidSocket(provider->serverSocket)) HANDLE_START_ERROR("No socket available", "socket()"); // Set the socket to be non-inheritable to ensure that the file // descriptor is closed when we fork() and exec(). unsigned int osRC = 0; STAFSocket_t newSocket; if (STAFUtilGetNonInheritableSocket(provider->serverSocket, &newSocket, &osRC)) { HANDLE_START_ERROR("Error getting non-inheritable server socket", "STAFUtilGetNonInheritableSocket()"); } provider->serverSocket = newSocket; struct sockaddr_un serverAddress = { 0 }; serverAddress.sun_family = AF_UNIX; strcpy(serverAddress.sun_path, "/tmp/STAFIPC_"); strcat(serverAddress.sun_path, STAFString(provider->ipcName).toCurrentCodePage()->buffer()); // XXX: sun_len is not available on Linux, what to do? // serverAddress.sun_len = strlen(serverAddress.sun_path) + 1; // We attempt to remove the socket path in case it was left around // from a previous invocation unlink(serverAddress.sun_path); int bindRC = bind(provider->serverSocket, reinterpret_cast<struct sockaddr *>(&serverAddress), sizeof(serverAddress)); if (bindRC != 0) HANDLE_START_ERROR("Error binding server socket", "bind()"); int listenRC = listen(provider->serverSocket, SOMAXCONN); if (listenRC != 0) HANDLE_START_ERROR("Error listening on server socket", "listen()"); // Ok, the provider is now ready provider->syncSem->reset(); provider->state = kSTAFConnectionProviderActive; provider->threadManager->dispatch(STAFTCPRunThread, provider); // XXX: Might want to time out on this one. provider->syncSem->wait(); return kSTAFOk; } CATCH_STANDARD("STAFConnectionProviderStart"); return kSTAFUnknownError;}STAFRC_t STAFConnectionProviderStop(STAFConnectionProvider_t baseProvider, void *stopInfo, unsigned int stopInfoLevel, STAFString_t *errorBuffer){ if (baseProvider == 0) return kSTAFInvalidObject; if (stopInfoLevel != 0) return kSTAFInvalidAPILevel; try { STAFLocalConnectionProviderImpl *provider = static_cast<STAFLocalConnectionProviderImpl *>(baseProvider); provider->state = kSTAFConnectionProviderStopped; provider->syncSem->reset(); // Wake up the run thread and release resources acquired when // starting provider STAFSocketClose(provider->serverSocket); if (provider->syncSem->wait(10000) != 0) { STAFTrace::trace( kSTAFTraceError, STAFString("STAFLocalIPCConnectionProviderStop - Timed out " "waiting for run thread to wake up")); } return kSTAFOk; } CATCH_STANDARD("STAFConnectionProviderStop"); return kSTAFUnknownError;}STAFRC_t STAFConnectionProviderDestruct(STAFConnectionProvider_t *baseProvider, void *destructInfo, unsigned int destructInfoLevel, STAFString_t *errorBuffer){ if (baseProvider == 0) return kSTAFInvalidObject; if (*baseProvider == 0) return kSTAFInvalidObject; if (destructInfoLevel != 0) return kSTAFInvalidAPILevel; try { STAFLocalConnectionProviderImpl *provider = static_cast<STAFLocalConnectionProviderImpl *>(*baseProvider); STAFMutexSemLock lock(sActiveProvidersSem); sActiveProviders.erase(provider); if (provider->state != kSTAFConnectionProviderStopped) { provider->state = kSTAFConnectionProviderStopped; // XXX: Should we check the RC? STAFSocketClose(provider->serverSocket); } delete provider; provider = 0; return kSTAFOk; } CATCH_STANDARD("STAFConnectionProviderDestruct"); return kSTAFUnknownError;}STAFRC_t STAFConnectionProviderConnect(STAFConnectionProvider_t baseProvider, STAFConnection_t *connection, void *connectInfo, unsigned int connectInfoLevel, STAFString_t *errorBuffer){ if (baseProvider == 0) return kSTAFInvalidObject; if (connectInfoLevel != 1) return kSTAFInvalidAPILevel; if (connection == 0) return kSTAFInvalidParm; STAFConnectionProviderConnectInfoLevel1 *cpInfo = reinterpret_cast<STAFConnectionProviderConnectInfoLevel1 *>(connectInfo); try { STAFLocalConnectionProviderImpl *provider = static_cast<STAFLocalConnectionProviderImpl *>(baseProvider); STAFLocalConnectionImpl connImpl; STAFString host = cpInfo->endpoint; struct sockaddr_un serverAddress = { 0 }; serverAddress.sun_family = AF_UNIX; strcpy(serverAddress.sun_path, "/tmp/STAFIPC_"); strcat(serverAddress.sun_path, STAFString(provider->ipcName).toCurrentCodePage()->buffer()); // XXX: Linux doesn't support sun_len. What to do? // serverAddress.sun_len = strlen(serverAddress.sun_path) + 1; connImpl.clientSocket = socket(PF_UNIX, SOCK_STREAM, 0); if (!STAFUtilIsValidSocket(connImpl.clientSocket)) { STAFString error = STAFString("Error creating socket: socket() " "RC=") + STAFSocketGetLastError(); if (errorBuffer) *errorBuffer = error.adoptImpl(); return kSTAFCommunicationError; } // Set the socket to be non-inheritable unsigned int osRC = 0; STAFSocket_t newSocket; if (STAFUtilGetNonInheritableSocket(connImpl.clientSocket, &newSocket, &osRC)) { STAFString error = STAFString( "Error getting non-inheritable socket, " "STAFUtilGetNonInheritableSocket(), OS RC: ") + STAFString(osRC); if (errorBuffer) { error += STAFString(*errorBuffer, STAFString::kShallow); *errorBuffer = error.adoptImpl(); } STAFSocketClose(connImpl.clientSocket); return kSTAFCommunicationError; } connImpl.clientSocket = newSocket; int modeRC = STAFSocketSetBlockingMode(connImpl.clientSocket, kSTAFSocketNonBlocking, errorBuffer); if (modeRC != kSTAFOk) { STAFString error = STAFString("Error setting socket to non-blocking " "mode:"); if (errorBuffer) { error += STAFString(*errorBuffer, STAFString::kShallow); *errorBuffer = error.adoptImpl(); } STAFSocketClose(connImpl.clientSocket); return kSTAFCommunicationError; } int connectRC = connect( connImpl.clientSocket, reinterpret_cast<struct sockaddr *>(&serverAddress), sizeof(serverAddress)); if ((connectRC < 0) && (STAFSocketGetLastError() == ECONNREFUSED)) { // On Solaris Opteron the connect request intermittently (usually // under heavy load) will return ECONNREFUSED. In this case we // will retry the connect request up to 5 times (sleeping 100ms // between every request). int maxRetry = 5; for (int i = 0; i < maxRetry; i++) { if ((connectRC < 0) && (STAFSocketGetLastError() == ECONNREFUSED)) { STAFSocketClose(connImpl.clientSocket); connImpl.clientSocket = socket(PF_UNIX, SOCK_STREAM, 0); modeRC = STAFSocketSetBlockingMode(connImpl.clientSocket, kSTAFSocketNonBlocking, errorBuffer); STAFThreadManager::sleepCurrentThread(100); connectRC = connect( connImpl.clientSocket, reinterpret_cast<struct sockaddr *>(&serverAddress), sizeof(serverAddress)); } else { break; } } } if ((connectRC < 0) && (STAFSocketGetLastError() != SOCEINPROGRESS) && (STAFSocketGetLastError() != SOCEWOULDBLOCK) && (STAFSocketGetLastError() != 0)) { STAFString error = STAFString("Error connecting to endpoint: " "connect() RC=") + STAFSocketGetLastError(); if (errorBuffer) *errorBuffer = error.adoptImpl(); STAFSocketClose(connImpl.clientSocket); return kSTAFCommunicationError; } // XXX: this timeout needs to be passed somehow unsigned int timeout = 5000; if (connectRC < 0) { fd_set writeSocks; FD_ZERO(&writeSocks); FD_SET(connImpl.clientSocket, &writeSocks); timeval theTimeout = { timeout / 1000, (timeout % 1000) * 1000 }; int selectRC = select(connImpl.clientSocket + 1, 0, &writeSocks, 0, (timeout == 0) ? 0 : &theTimeout); if (selectRC < 0) { STAFString error = STAFString("Error connecting to endpoint: " "select() RC=") + STAFSocketGetLastError(); if (errorBuffer) *errorBuffer = error.adoptImpl(); STAFSocketClose(connImpl.clientSocket); return kSTAFCommunicationError; } else if (selectRC == 0) { STAFString error = STAFString("Timed out connecting to " "endpoint: select() timeout");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -