worker.cpp

来自「funambol window mobile客户端源代码」· C++ 代码 · 共 1,085 行 · 第 1/3 页

CPP
1,085
字号
/*
 * Funambol is a mobile platform developed by Funambol, Inc. 
 * Copyright (C) 2003 - 2007 Funambol, Inc.
 * 
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License version 3 as published by
 * the Free Software Foundation with the addition of the following permission
 * added to Section 15 as permitted in Section 7(a): FOR ANY PART OF THE COVERED
 * WORK IN WHICH THE COPYRIGHT IS OWNED BY FUNAMBOL, FUNAMBOL DISCLAIMS THE
 * WARRANTY OF NON INFRINGEMENT  OF THIRD PARTY RIGHTS.
 * 
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 
 * details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program; if not, see http://www.gnu.org/licenses or write to
 * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 * MA 02110-1301 USA.
 * 
 * You can contact Funambol, Inc. headquarters at 643 Bair Island Road, Suite
 * 305, Redwood City, CA 94063, USA, or at email address info@funambol.com.
 * 
 * The interactive user interfaces in modified source and object code versions
 * of this program must display Appropriate Legal Notices, as required under
 * Section 5 of the GNU Affero General Public License version 3.
 * 
 * In accordance with Section 7(b) of the GNU Affero General Public License
 * version 3, these Appropriate Legal Notices must retain the display of the
 * "Powered by Funambol" logo. If the display of the logo is not reasonably
 * feasible for technical reasons, the Appropriate Legal Notices must display
 * the words "Powered by Funambol".
 */

#include <stdlib.h>
#include "worker.h"
#include "s4jproxy.h"
#include "base/util/StringBuffer.h"
#include "base/util/utils.h"
#include "notify/util.h"
#include "pim/maincpp.h"
#include "http/GPRSConnection.h"
#include "notify/addresschange.h"
#include "processutils.h"
#include "CTPManager.h"
#include "CTPParam.h"
#include "pim/ClientSettings.h"
#include <Pmpolicy.h>
#include <Pm.h>

// Worker thread
extern "C" DWORD WINAPI NotifWorker(LPVOID lpv);

static bool syncinprogress = false;

// Handle of the worker thread
static HANDLE handle = 0;
// Connection socket
static SOCKET sock = 0;
static S4JProxy* s4jp = 0;
// Message buffer
static char *message = 0;
static int msglen = 0;
// Thread start time
time_t startTime;

#define CHECK_INTERNET_CONNECTION_TIME          240  // check GPRS every CHECK_TIME seconds (4min)
static HANDLE IConnectionThread = 0;


/**
* Concatenate a char buf with specified len to the message buffer
*/
static void addBuffer(const char *buf, int len);

/**
* Close the communication socket client / server. Remove gracefully
* all the buffers and classes involved in the listening process.
*/
static void closeWorker();

/**
* Return the message
*/
const char *getMsg() { return message; }

/**
* Return the message lenght
*/
int getLen() { return msglen; }


/**
* Get the current time as Unix time_t
*/
static time_t getTime()
{
    SYSTEMTIME st;
    FILETIME ft;

    GetSystemTime(&st);
    SystemTimeToFileTime(&st, &ft);
    // Convert FILETIME to time_t
    __int64 llTmp;
    memcpy (&llTmp, &ft, sizeof (__int64));
    llTmp = (llTmp - 116444736000000000) / 10000000;
    return (time_t) llTmp;
}


/**
* Create the thread on the socket client / server waiting for the server
* messages
*/
DWORD startWorker(SOCKET s)
{
    // Check worker
    if (handle) {
        LOG.debug("Another worker handle: %x\n", handle);
        DWORD exitcode = 0;
        if(GetExitCodeThread(handle, &exitcode)) {
            if (exitcode == STILL_ACTIVE) {
                LOG.debug("Worker thread still active");
                if (syncinprogress) {
                    LOG.debug("The sync is in progress");
                    LOG.debug("S4ND: connection busy");
                    return ERROR_BUSY;
                }
                else if( (startTime - getTime()) <  ASK_TIMEOUT ) {
                    LOG.debug("Request too close, wait");
                    LOG.debug("S4ND: connection busy");
                    return ERROR_BUSY;
                }
            }
        }
        LOG.debug("The sync is NOT in progress, closing thread");
        if(sock){
            closesocket(sock);
            sock= 0;
        }
        Sleep(100);
        if(handle){
            LOG.debug("Closing handle");
            CloseHandle(handle);
            handle=0;
        }
    }
    // Clean-up dirty messages
    if(message){
        delete [] message;
        message=0;
    }
    msglen=0;

    // Start worker
    sock = s;
    s4jp = S4JProxy::getInstance();

    startTime = getTime();   // record starting time
    handle = CreateThread(NULL, 0, NotifWorker, (LPVOID*) sock, 0, NULL);
    if (handle == 0) {
        LOG.debug("S4ND:ERROR:CreateThread fails. GetLastError() returns <0x%08x>", GetLastError());
        // Release resources
        closeWorker();
    }

    LOG.debug("S4ND: new worker thread handle=<0x%08x>", handle);
    return ERROR_SUCCESS;
}

/**
* Close the communication socket client / server
*/
DWORD stopWorker()
{
    if(sock){
        closesocket(sock);
        sock= 0;
    }
    Sleep(100);
    if ( !handle )
        return ERROR_SUCCESS;
    else
        return ERROR_BUSY;
}

static void closeWorker()
{
    if(sock){
        closesocket(sock);
        sock= 0;
    }
    if(handle){
        CloseHandle(handle);
        handle=0;
    }
    S4JProxy::dispose();
    if(message){
        delete [] message;
        message=0;
    }
    msglen=0;

    LOG.debug("Worker end.");
}

static void addBuffer(const char *buf, int len)
{
    if(!buf || !len)
        return;

    if(!message){
        message = new char[len+1];
        memcpy(message, buf, len);
        msglen = len;
    }
    else{
        char *tmp = new char[msglen+len+1];
        memcpy(tmp, message, msglen);
        memcpy(tmp+msglen, buf, len);
        msglen += len;
        delete [] message;
        message = tmp;
    }
}

/**
* Process the incoming server message and return a response.
* Return NULL if the message is not complete.
*/
const char *processPacket(const char *pkt, int pktlen)
{
    S4JProxy::RetCode code = S4JProxy::Error;
    static time_t last = getTime();

    time_t now = getTime();

    if ( (now - last) > 5 ){
        // reset message buffer
        delete [] message;
        message = 0;
    }

    // Message arrived
    LOG.debug("S4ND: %d bytes received", pktlen);

    addBuffer(pkt, pktlen);

    LOG.debug("S4ND: total message len %d", msglen);
    hexDump(message, msglen);

    code=s4jp->parsePkg0(message, msglen);
    LOG.debug("Return from parseMessage: %d", code);

    if (code != S4JProxy::Continue)
        return s4jp->getResponse();
    else
        return NULL;
}

/**
* Connection thread client/server. It waits for the server request,
* parses the message and if possible starts the synchronization of the
* server specified data source.
*/
extern "C" DWORD WINAPI NotifWorker(LPVOID lpv)
{
    SOCKET sock = (SOCKET)lpv;
    char buf[1501];
    int buflen=1500, pktlen;
    const char *response=NULL;

    while (!response) {
        pktlen = recv(sock, buf, buflen, 0);
        LOG.debug("Pktlen: %d", pktlen);
        switch(pktlen) {
            case SOCKET_ERROR: {
                DWORD err = WSAGetLastError();
                if(err == WSAEMSGSIZE ){
                    // TODO: this case should be handled
                    LOG.debug("Message larger than %d bytes!", buflen);
                    response = "500 - Message too long\n";
                }
                else {
                    LOG.error("Recv error: %d", err);
                    response = "500 - Network error\n"; // Try to respond anyway?
                }
                break;
            }
            case 0:
                // End of connection
                LOG.debug("Connection closed by peer.");
                response = "";  // Conn closed, no response
                break;
            default:
                response = processPacket(buf, pktlen);
        }
    }

    // If message is not empty, send it
    if(*response){
        LOG.debug("S4ND: Sending message <%s> to client", response);
        send(sock, response, strlen(response),0);
        // Start Sync
        if ( !s4jp->hasErrors() ) {
            LOG.info("Received STP Notification");
            closesocket(sock); // XXX
            syncinprogress=true;
            s4jp->sync();
            LOG.debug("Sync End.");
            syncinprogress=false;
        }
    }
    // Delete thread
    closeWorker();
    return 0;
}




/**
 * This is the main STP thread. Starts the notifyAddressChange sync (now it's an external process!)
 * and returns the response code: 0 if address accepted (success), 
 * otherwise the error code (!= 0).
 */
DWORD WINAPI stpWorker(LPVOID lpv) 
{
    LOG.debug("Starting stpWorker thread");

    PROCESS_INFORMATION procinfo;
    int responseCode = 0;
    DWORD exitcode = 0;
    wchar_t *path = NULL;
    wstring cmd;
    
    ClientSettings* cs = getRegConfig();
    path = toWideChar(cs->getAppPath().c_str());
    
    if (path) {
        cmd += path;
        delete [] path;
    }
    cmd += TEXT("\\startsync.exe");
   
    if (!CreateProcess(cmd.c_str(), TEXT("addresschange"),
                       NULL, NULL, FALSE, 0,
                       NULL, NULL, NULL, &procinfo ) ) {
        LOG.error("Address change not created: error");
        return -1;
    }
    
    DWORD waitResult = WaitForSingleObject(procinfo.hProcess, INFINITE);
       
    switch (waitResult) {
        // Thread exited, get exit code.
            case WAIT_ABANDONED:
                LOG.info("stpWorker abandoned");
                responseCode = -1;
                break;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?