📄 ctpmanager.cpp
字号:
if (ctpSocket) {
ret = closesocket(ctpSocket);
WSACleanup();
ctpSocket = NULL;
LOG.debug("Socket connection closed");
}
return ret;
}
int CTPManager::sendAuthMsg(){
// Fill CTPMessage members
LOG.debug("Creating AUTH msg...");
CTPMessage authMsg;
authMsg.setGenericCommand(CM_AUTH);
authMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
// Fill parameters (read values from config)
CTPParam devId;
devId.setParamCode(P_DEVID);
devId.setValue(config.getDeviceId().c_str(), config.getDeviceId().size());
authMsg.addParam(&devId);
CTPParam username;
username.setParamCode(P_USERNAME);
username.setValue(config.getUsername().c_str(), config.getUsername().size());
authMsg.addParam(&username);
CTPParam cred;
cred.setParamCode(P_CRED);
// Create credentials from config props
string credentials = createMD5Credentials();
cred.setValue(credentials.c_str(), credentials.size());
authMsg.addParam(&cred);
string& fromValue = config.getUrlFrom();
if (fromValue.size() > 0) {
// FROM is used only after a JUMP status
CTPParam from;
from.setParamCode(P_FROM);
from.setValue(fromValue.c_str(), fromValue.size());
authMsg.addParam(&from);
}
// Send message
return sendMsg(&authMsg);
}
int CTPManager::sendReadyMsg() {
// Fill CTPMessage members
CTPMessage readyMsg;
readyMsg.setGenericCommand(CM_READY);
readyMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
// Send message
return sendMsg(&readyMsg);
}
int CTPManager::sendByeMsg(){
// Fill CTPMessage members
CTPMessage byeMsg;
byeMsg.setGenericCommand(CM_BYE);
byeMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
// Send message
return sendMsg(&byeMsg);
}
int CTPManager::sendMsg(CTPMessage* message) {
if (!message) {
return 1;
}
int ret = 0;
char* msg = message->toByte();
int msgLength = message->getPackageLength();
if (!ctpSocket) {
LOG.error("sendMsg error: socket not initialized.");
return 2;
}
////
char* tmp = new char[msgLength*3 + 3];
tmp[0] = '[';
int pos = 1;
for (int i=0; i<msgLength; i++) {
sprintf(&tmp[pos], "%02x ", msg[i]);
pos += 3;
}
tmp[pos-1] = ']';
tmp[pos] = 0;
LOG.debug("Sending %d bytes: %s", msgLength, tmp);
delete [] tmp;
////
ret = send(ctpSocket, msg, msgLength, 0);
if (ret == SOCKET_ERROR) {
DWORD errorCode = WSAGetLastError();
LOG.error("CTPManager::sendMsg - send() error %d: %s", errorCode, createErrorMsg(errorCode));
return errorCode;
}
else {
LOG.debug("sendMsg - %d bytes sent", ret);
}
return 0;
}
CTPMessage* CTPManager::receiveStatusMsg(){
char buffer[MAX_MESSAGE_SIZE], msg[MAX_MESSAGE_SIZE];
DWORD errorCode = 0;
int totalBytes = 0;
int expectedLength = 0;
if (receivedMsg) {
delete receivedMsg;
receivedMsg = NULL;
}
//
// Receive socket message: could be split into more pkg
//
while (1) {
LOG.info("Waiting for Server message...");
int pkgLen = recv(ctpSocket, buffer, sizeof(buffer), 0);
if (pkgLen == SOCKET_ERROR) {
errorCode = WSAGetLastError();
if (errorCode == WSAETIMEDOUT) {
// recv timed out -> retry
LOG.debug("Timeout error on recv() -> retry...");
totalBytes = 0;
continue;
}
else {
// Socket error -> exit
LOG.error("SOCKET recv() error %d: %s", errorCode, createErrorMsg(errorCode));
goto finally;
}
}
else if (pkgLen == 0) {
LOG.debug("Socket connection closed by Server, exiting");
goto finally;
}
else {
if (totalBytes == 0) { // first time
expectedLength = extractMsgLength(buffer, pkgLen);
if (!expectedLength) { goto finally; }
expectedLength += 2; // the first 2 bytes are the msg length
}
LOG.debug("Package received: %d bytes read (total = %d, expected = %d)", pkgLen, totalBytes+pkgLen, expectedLength);
// Check if msg too big
if (totalBytes+pkgLen >= MAX_MESSAGE_SIZE) {
LOG.error("Message larger than %d bytes!", MAX_MESSAGE_SIZE);
goto finally;
}
// Append bytes to the 'msg' array
memcpy(&msg[totalBytes], buffer, pkgLen);
totalBytes += pkgLen;
// Check if msg is complete
if (totalBytes < expectedLength) {
LOG.debug("Message incomplete -> back to receive");
continue;
}
else {
LOG.debug("Message complete");
break;
}
}
}
// Parse the message, receivedMsg is internally owned
receivedMsg = new CTPMessage(msg, totalBytes);
LOG.debug("status = 0x%02x", receivedMsg->getGenericCommand());
finally:
return receivedMsg;
}
int CTPManager::receive() {
// Safe checks...
if (!ctpSocket) {
LOG.error("CTPManager::receive() error: no socket connection available");
return -3;
}
if (stopThread(receiveThread, 1)) {
LOG.debug("receiveThread killed");
}
if (stopThread(heartbeatThread, 1)) {
LOG.debug("heartbeatThread killed");
}
//
// Start thread to send 'ready' messages
//
heartbeatThread = CreateThread(NULL, 0, heartbeatWorker, (LPVOID*)ctpSocket, 0, NULL);
if (!heartbeatThread) {
LOG.error("Error creating heartbeat thread: code 0x%08x", GetLastError());
return -2;
}
//
// Start thread to receive messages from Server
//
receiveThread = CreateThread(NULL, 0, receiveWorker, (LPVOID*)ctpSocket, 0, NULL);
if (!receiveThread) {
LOG.error("Error creating receive thread: code 0x%08x", GetLastError());
return -1;
}
//
// Wait for receiveThread: it ends only in case of errors.
// Use ctpConnTimeout as timeout on this thread.
//
int ret = 0;
DWORD timeout = getConfig()->getCtpConnTimeout();
timeout *= 1000;
if (timeout == 0) {
timeout = INFINITE;
}
LOG.debug("Waiting for the receive thread to finish (timeout = %d sec)...", getConfig()->getCtpConnTimeout());
DWORD waitResult = WaitForSingleObject(receiveThread, timeout);
switch (waitResult) {
// Thread exited: socket or Server error occurred -> out
case WAIT_ABANDONED:
LOG.debug("receiveThread abandoned");
case WAIT_OBJECT_0: {
DWORD exitcode = 0;
GetExitCodeThread(receiveThread, &exitcode);
LOG.debug("receiveThread ended with code %d", exitcode);
ret = 0;
break;
}
// Timeout: kill thread -> out.
case WAIT_TIMEOUT: {
LOG.debug("Timeout - receiveThread will now be terminated");
TerminateThread(receiveThread, 0);
ret = 1;
break;
}
// Some error occurred (case WAIT_FAILED)
default: {
LOG.debug("Wait error on receiveThread");
TerminateThread(receiveThread, 1);
ret = 2;
break;
}
}
CloseHandle(receiveThread);
receiveThread = NULL;
// Also terminate the heartbeatThread
if (stopThread(heartbeatThread)) {
LOG.debug("heartbeatThread killed");
}
return ret;
}
bool CTPManager::stopThread(HANDLE thread, DWORD exitcode) {
if (thread) {
TerminateThread(thread, exitcode);
CloseHandle(thread);
ctpThread = NULL;
return true;
}
return false;
}
string CTPManager::createMD5Credentials() {
string ret;
char* credential = NULL;
const char* username = config.getAccessConfig().getUsername();
const char* password = config.getAccessConfig().getPassword();
string clientNonce = config.getCtpNonce();
credential = MD5CredentialData(username, password, clientNonce.c_str());
if (credential) {
ret = credential;
delete [] credential;
}
return ret;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -