📄 worker.cpp
字号:
int ret = 0;
HANDLE stpThread = (HANDLE)lpv;
//
// Wait for STPThread to finish.
// - if handle is NULL, no STP has been done -> go with CTP.
// - if STPThread returned error -> go with CTP.
// - if STPThread returned ok -> exit now (STP is running).
//
if (stpThread) {
LOG.debug("Waiting for STP notification sync to finish (timeout = %d sec)...", MAX_ADDRESSCHANGE_TIME);
DWORD waitResult = WaitForSingleObject(stpThread, MAX_ADDRESSCHANGE_TIME * 1000);
switch (waitResult) {
// Thread exited, get exit code.
case WAIT_ABANDONED:
LOG.debug("STPThread abandoned");
case WAIT_OBJECT_0: {
DWORD exitcode = 0;
GetExitCodeThread(stpThread, &exitcode);
LOG.debug("STPThread ended with code %d", exitcode);
if (exitcode == 0) {
// STP is running correctly: close CTP if active
CloseHandle(stpThread);
CTPManager* ctpManager = CTPManager::getInstance();
ctpManager->stopCTP();
return 1;
}
else {
// STP failed, go with CTP
LOG.debug("STP failed, go with CTP");
break;
}
}
// Thread is still running after timeout -> kill it and continue.
case WAIT_TIMEOUT: {
LOG.debug("Timeout - STPThread is running, closing CTP if active");
CTPManager* ctpManager = CTPManager::getInstance();
ctpManager->stopCTP();
return 1;
}
// Some error occurred (case WAIT_FAILED)
default: {
LOG.debug("Wait error on STPThread");
break;
}
}
}
CloseHandle(stpThread);
// Start the CTP connection process.
// Get the unique instance of CTPManager.
CTPManager* ctpManager = CTPManager::getInstance();
//
// Infinite cycle: always restore the connection if it's lost
// or in case of errors. Exit only if leavingState flag is up.
//
bool restore = false;
bool jump = false;
while (ctpManager->getConfig()->isLeavingState() == false) {
if (restore) {
// Restoring from a broken connection: close socket and wait some seconds.
LOG.debug("Restoring CTP connection...");
ctpManager->closeConnection();
int sleepTime = ctpManager->getConfig()->getCtpRetry();
sleepTime = min(sleepTime, ctpManager->getConfig()->getMaxCtpRetry());
LOG.debug("Sleep for %d seconds...", sleepTime);
Sleep(sleepTime * 1000);
// Save the new value to config
sleepTime *= CTP_RETRY_INCREASE_FACTOR; // Double the retry time
ctpManager->getConfig()->setCtpRetry(sleepTime);
restore = false;
}
if (jump) {
// Restoring from a JUMP status: close socket and reconnect immediately.
LOG.debug("Restoring CTP connection from a JUMP...");
ctpManager->closeConnection();
jump = false;
}
//
// Open socket connection
// ----------------------
LOG.debug("Open CTP connection...");
if (ctpManager->openConnection()) {
ctpManager->closeConnection();
ret = 1;
goto finally;
}
//
// Authentication
// --------------
LOG.debug("Sending [AUTH] message...");
if (ctpManager->sendAuthMsg()) {
ctpManager->closeConnection();
ret = 2;
goto finally;
}
// Receiving AUTH status message
CTPMessage* authStatusMsg = ctpManager->receiveStatusMsg();
char authStatus = authStatusMsg->getGenericCommand();
list<CTPParam>::iterator param;
char* buf = NULL;
switch (authStatus) {
case ST_UNAUTHORIZED:
//
// Retry with new nonce received
//
LOG.info("Client not authenticated: retry with new nonce");
buf = extractNonceParam(authStatusMsg);
if (buf) {
// Save new nonce to config, and save config to registry!
ctpManager->getConfig()->setCtpNonce(buf);
ctpManager->getConfig()->saveCTPConfig();
delete [] buf;
}
else {
// NONCE not found -> restore connection
LOG.error("Error receiving UNAUTHORIZED Status message: NONCE param is missing");
restore = true;
continue;
}
// Send 2nd auth msg
LOG.info("Sending CTP authentication message...");
if (ctpManager->sendAuthMsg()) {
ret = 2;
goto finally;
}
// Check 2nd status received, only OK allowed
authStatusMsg = ctpManager->receiveStatusMsg();
authStatus = authStatusMsg->getGenericCommand();
if (authStatus == ST_OK) {
// *** Authentication OK! ***
// Save nonce if any (go to case ST_OK)
}
else {
LOG.info("CTP error: Client not authenticated. Please check your credentials.");
ret = 3;
goto finally;
}
// no 'break': need to enter into case ST_OK...
case ST_OK:
// *** Authentication OK! ***
LOG.info("Client authenticated succesfully!");
// Save nonce if any
buf = extractNonceParam(authStatusMsg);
if (buf) {
// Save new nonce to config, and save config to registry!
ctpManager->getConfig()->setCtpNonce(buf);
ctpManager->getConfig()->saveCTPConfig();
delete [] buf;
}
else {
LOG.info("No new nonce received.");
}
break;
case ST_JUMP:
//
// Jump to desired server 'to' and save the 'from' value
//
LOG.info("Server requested a JUMP");
if (authStatusMsg->params.size() < 1) {
// Expected FROM and TO params -> restore connection
LOG.error("Error receiving JUMP Status message: some parameter is missing");
restore = true;
continue;
}
// Read FROM and TO parameters and update CTPConfig
param = authStatusMsg->params.begin();
while (param != authStatusMsg->params.end()) {
int valueLen = (*param).getValueLength();
void* value = (*param).getValue();
if ((*param).getParamCode() == P_FROM) {
char* from = stringdup((char*)value, valueLen);
ctpManager->getConfig()->setUrlFrom(from);
delete [] from;
}
else if ((*param).getParamCode() == P_TO) {
char* url = stringdup((char*)value, valueLen);
string to = ctpManager->getConfig()->getHostName(url);
int port = ctpManager->getConfig()->getHostPort(url);
ctpManager->getConfig()->setUrlTo(to);
ctpManager->getConfig()->setCtpPort(port);
delete [] url;
}
else {
// Unexpected Status param -> restore connection
LOG.error("Error receiving JUMP Status message: unexpected param '0x%02x'", (*param).getParamCode());
restore = true;
continue;
}
param++;
}
// Now restore the socket connection to the new Server address...
LOG.debug("JUMP status received: FROM %s TO %s:%d", ctpManager->getConfig()->getUrlFrom().c_str(),
ctpManager->getConfig()->getUrlTo().c_str(),
ctpManager->getConfig()->getCtpPort() );
jump = true;
continue;
case ST_FORBIDDEN:
// Forbidden authentication -> exit thread
LOG.info("Authentication forbidden by the Server, please check your credentials.");
ret = 4;
goto finally;
case ST_ERROR:
// Error -> restore connection
LOG.info("Received ERROR status from Server: restore ctp connection");
restore = true;
continue;
default:
// Unexpected status -> restore connection
LOG.error("Unexpected status received '0x%02x' -> restore ctp connection", authStatus);
restore = true;
continue;
}
//
// Creates the thread that will be stuck waiting for server msg
// and waits until it ends (errors or ctpConnTimeout).
//
ctpManager->receive();
// If here, connection was broken or ctpConnTimeout -> restore connection
restore = true;
}
finally:
LOG.debug("Exiting ctpWorker thread");
return ret;
}
/**
 * Thread routine that keeps reading status messages from the CTP server.
 *
 * Runs while the configuration's leaving-state flag is false. On each pass
 * one status message is read through CTPManager::receiveStatusMsg() and
 * handled according to its generic command:
 *   - ST_OK:   logged, then the loop goes back to receiving.
 *   - ST_SYNC: the message's SyncNotification is handed to
 *              startSyncFromSAN(), then the loop goes back to receiving.
 *   - any other value (ST_ERROR included): the loop stops.
 *
 * @param lpv  not used by this routine.
 * @return 0 if the loop ended because the leaving-state flag was set,
 *         -1 if receiveStatusMsg() returned a null message,
 *         -2 if ST_ERROR or an unrecognized status was received
 *         (the int value is converted to DWORD on return).
 */
DWORD WINAPI receiveWorker(LPVOID lpv) {
    LOG.debug("Starting receiveWorker thread");

    CTPManager* manager = CTPManager::getInstance();
    int exitCode = 0;
    bool keepReceiving = true;

    // The socket stays in 'receive' state until an error shows up
    // or the leaving-state flag is raised.
    while (keepReceiving && !manager->getConfig()->isLeavingState()) {

        CTPMessage* incoming = manager->receiveStatusMsg();
        if (incoming == NULL) {
            // Nothing could be received -> leave the thread.
            exitCode = -1;
            break;
        }

        const char command = incoming->getGenericCommand();
        switch (command) {

            case ST_SYNC: {
                // Server asks for a sync: start it, then resume receiving.
                LOG.info("[SYNC] notification received! Starting the sync");
                SyncNotification* notification = incoming->getSyncNotification();
                startSyncFromSAN(notification);
                LOG.debug("Back to receive state");
                break;
            }

            case ST_OK:
                // Server acknowledged our 'Ready' command.
                LOG.info("[OK] received -> back to receive state");
                break;

            case ST_ERROR:
                LOG.debug("[ERROR] message received");
                // Intentionally no 'break': an error is handled like any bad status.

            default:
                // Bad status -> leave the thread (the original note says the
                // socket will then be restored from scratch).
                LOG.debug("Bad status received (code 0x%02x), exiting thread", command);
                exitCode = -2;
                keepReceiving = false;
                break;
        }
    }

    LOG.debug("Exiting receiveWorker thread");
    return exitCode;
}
/**
 * Heartbeat thread routine: periodically sends a 'ready' message to the Server.
 *
 * The pause between two messages (ctpReady, in seconds) is read from the
 * configuration once, before the loop starts. The loop checks the
 * configuration's leaving-state flag before every send and stops once the
 * flag is set. The original note states that the caller kills this thread
 * to stop the CTP.
 *
 * @param lpv  not used by this routine.
 * @return always 0.
 */
DWORD WINAPI heartbeatWorker(LPVOID lpv) {
    LOG.debug("Starting heartbeatWorker thread");

    CTPManager* manager = CTPManager::getInstance();

    // Heartbeat period in seconds (ctpReady), fetched a single time.
    const int readySeconds = manager->getConfig()->getCtpReady();

    for (;;) {
        if (manager->getConfig()->isLeavingState()) {
            break;
        }
        // Tell the Server we are ready, then wait for the next beat.
        manager->sendReadyMsg();
        Sleep(readySeconds * 1000);
    }

    LOG.debug("Exiting heartbeatWorker thread");
    return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -