📄 commonsms.cpp
字号:
#include "stdafx.h"
#include "CommonSMS.h"
#include "RxRecordset.h"
#include <stdio.h>
#include <time.h>
#pragma comment(lib,"ws2_32.lib")
long CCommonSMS::m_Instance=0;
CCommonSMS::CCommonSMS()
{
m_nThreadPools=0; //线程数量
m_nWndSize=16; //发送窗口数量
m_nTimeOut=60; //超时时间
m_nTrys=3; //重发次数
m_nWndBits=0;
m_nNextEventIndex=0; //循环监听下个事件的起点索引
m_threadpoolsinstance=0; //线程实例数量,当为0时释放
m_phSemaphoreWnd=NULL; //线程发送事件
m_phWaitSubmitEvent=NULL;
m_phThreadPool=NULL; //线程句柄数组
m_pThreadPool =NULL;
m_pWndData=NULL; //发送数据结构
m_OnDeliverSMS=NULL;
m_OnMTReportSMS=NULL;
m_OnSubmitedSMS=NULL;
m_hQuitEvent=CreateEvent(NULL,false,false,NULL);//等待退出事件
::InitializeCriticalSection(&m_csSeqId);
::InitializeCriticalSection(&m_csSocket);
WSADATA ws;// 初始化网络
WSAStartup(MAKEWORD(2,0),&ws);
}
CCommonSMS::~CCommonSMS()
{
::DeleteCriticalSection(&m_csSeqId);
::DeleteCriticalSection(&m_csSocket);
::CloseHandle(m_hQuitEvent);
::WSACleanup();
}
int CCommonSMS::Close()
{
for(int i=0;i<m_nThreadPools; i++)
{
m_pThreadPool[i].m_bQuit=true;
}
m_bQuit=true;
int ret= ::WaitForSingleObject(m_hQuitEvent,60*1000);//等待60秒
for(i=0;i<m_nThreadPools; i++)
{
::TerminateThread(m_phThreadPool[i],0);
::CloseHandle(m_phThreadPool[i]);
::CloseHandle(m_phSemaphoreWnd[i]);
::CloseHandle(m_phWaitSubmitEvent[i]);
}
if( m_hRecvThread)
{
::TerminateThread(m_hRecvThread,0);
::CloseHandle(m_hRecvThread);
}
if( m_hActiveThread)
{
::TerminateThread(m_hActiveThread,0);
::CloseHandle(m_hActiveThread);
}
if(m_phSemaphoreWnd)
{
m_phSemaphoreWnd=0;
delete [] m_phSemaphoreWnd;
}
if(m_phWaitSubmitEvent)
{
m_phWaitSubmitEvent=0;
delete [] m_phWaitSubmitEvent;
}
if(m_phThreadPool)
{
m_phThreadPool=0;
delete [] m_phThreadPool;
}
if(m_pWndData)
{
delete [] m_pWndData;
m_pWndData=0;
}
return 0;
}
int CCommonSMS::Connect()
{
if (m_Socket!=INVALID_SOCKET) closesocket(m_Socket);
m_Socket=socket(AF_INET,SOCK_STREAM,0);
sockaddr_in sockaddr;
memset(&sockaddr,0,sizeof(sockaddr_in));
sockaddr.sin_family=AF_INET;
sockaddr.sin_addr.S_un.S_addr=inet_addr(m_szIp);
if(sockaddr.sin_addr.S_un.S_addr==-1)
{
hostent *host=::gethostbyname(m_szIp);//解析地址
if(host)
{
sockaddr.sin_addr=*((in_addr*)host->h_addr_list);
}
}
sockaddr.sin_port=htons(m_nPort);
int ret=connect(m_Socket,(LPSOCKADDR)&sockaddr,sizeof(sockaddr_in));//连接
if(ret==0)
{ //设定读socket时间,当超时时返回10060 WSAETIMEDOUT
int rcvtimeout =30 ;
::setsockopt(m_Socket,SOL_SOCKET,SO_RCVTIMEO,(char*)&rcvtimeout,sizeof(int));
return 0;
}
closesocket(m_Socket);
m_Socket=INVALID_SOCKET;
return WSAGetLastError();
}
void CCommonSMS::OnSocketError()
{
}
void CCommonSMS::OnDeliverSMS(DELIVER_SMS *pDeliver)
{
if( m_OnDeliverSMS)
{
m_OnDeliverSMS(pDeliver);
}
}
void CCommonSMS::OnMTReportSMS(MTREPORT_SMS *pReport)
{
if( m_OnMTReportSMS)
{
m_OnMTReportSMS(pReport);
}
}
void CCommonSMS::OnSubmitedSMS(SUBMIT_SMS * pSubmited )
{
if( m_OnSubmitedSMS)
{
m_OnSubmitedSMS(pSubmited);
}
}
int CCommonSMS::Login()
{
return 0;
}
int CCommonSMS::Logout()
{
return 0;
}
int CCommonSMS:: SubmitSMS(SUBMIT_SMS *p ,int len,int seqid)
{
return 0;
}
int CCommonSMS::Send(char *buff ,int len) //发送函数
{
int ret;
EnterCriticalSection( &m_csSocket);
ret = send( m_Socket, buff, len, 0);
LeaveCriticalSection( &m_csSocket);
return ret;
}
int CCommonSMS::Initialize(int threadpools, int wndsize,int timeout,int retrys)
{
m_bQuit=false;
m_nThreadPools = threadpools;
m_nWndSize = wndsize;
m_nTimeOut = timeout;
m_nTrys = retrys;
m_pWndData= new WND_DATA[m_nThreadPools *m_nWndSize];
::memset(m_pWndData, 0 ,sizeof(WND_DATA)* m_nThreadPools *m_nWndSize);
m_phSemaphoreWnd = new HANDLE[m_nThreadPools *2];
m_phWaitSubmitEvent = new HANDLE[m_nThreadPools];
m_phThreadPool = new HANDLE[m_nThreadPools];
m_pThreadPool = (CThreadPool * ) new CThreadPool [m_nThreadPools];
for(int i=0;i<m_nThreadPools; i++)
{
m_pThreadPool[i].m_pParent =this;
m_pThreadPool[i].m_nThreadIndex =i;
m_pThreadPool[i].m_bQuit=false;
m_pThreadPool[i].m_nWndIdles = m_nWndSize -1 ;
m_pThreadPool[i].m_nWaitSendCounts=0;
m_phWaitSubmitEvent[i]=CreateEvent(NULL,true,false,NULL);
m_phSemaphoreWnd[i]=CreateSemaphore(NULL,m_nWndSize,m_nWndSize,NULL);
m_phSemaphoreWnd[i+m_nThreadPools]= m_phSemaphoreWnd[i];
m_phThreadPool[i]=::CreateThread(0,0,CThreadPool::ThreadPoolProc,&m_pThreadPool[i],0,0);
}
int quotient=m_nThreadPools; //得到窗口占位
m_nWndBits=0;
while(quotient)
{
quotient=quotient/2;
m_nWndBits ++;//空闲线程的个数--11
}
m_nMaxSeqId= 0xFFFFFFFF >> m_nWndBits; //得到最大消息流水号
m_hRecvThread=::CreateThread(0,0,ThreadRecvProc,this,0,0);
m_hActiveThread=::CreateThread(0,0,ThreadActiveProc,this,0,0);
return 0;
}
int CCommonSMS::AddThreadPoolInstance()//管理线程池,增加一个实例加一
{
return ::InterlockedIncrement(&m_threadpoolsinstance);
}
int CCommonSMS::ReleaseThreadPoolInstance()//管理线程池,减少一个实例减一0释放
{
long ret= ::InterlockedDecrement(&m_threadpoolsinstance);
if(ret==0)
{
::SetEvent(m_hQuitEvent);//通知线程池已经释放
}
return ret;
}
int CCommonSMS::GetSeqid(int index)
{
unsigned int seqid=index + 1;
seqid=seqid << (32-m_nWndBits);
::EnterCriticalSection(&m_csSeqId);
if(m_nSeqId >= m_nMaxSeqId)
{
m_nSeqId=0;
}
seqid=seqid | ++m_nSeqId;
::LeaveCriticalSection(&m_csSeqId);
return seqid;
}
char* CCommonSMS::GetTimeStamp( char *buf)
{
time_t tt;
struct tm *now;
time( &tt);
now = localtime( &tt);
sprintf( buf, "%02d%02d%02d%02d%02d",
now->tm_mon + 1,
now->tm_mday,
now->tm_hour,
now->tm_min,
now->tm_sec);
return buf;
}
unsigned long _stdcall CCommonSMS::ThreadRecvProc(void *lpvoid)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -