📄 udpsession.cpp
字号:
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* copyright : (C) 2002 by Zhang Yong *
* email : z-yong163@163.com *
***************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream.h>
#include "udppacket.h"
#include "sessionhash.h"
#include "ndes.h"
enum {
UDP_ACK = 1,
UDP_NEW_UIN,
UDP_GET_CONTACTLIST,
UDP_LOGIN,
UDP_LOGOUT,
UDP_KEEPALIVE,
UDP_CHANGE_STATUS,
UDP_UPDATE_CONTACT,
UDP_MODIFY_USER,
UDP_UPDATE_USER,
UDP_SEND_MSG,
UDP_GROUP_SEND_MSG,
UDP_SEARCH_RANDOM,
UDP_SEARCH_CUSTOM,
UDP_ADD_FRIEND,
UDP_DEL_FRIEND,
UDP_BROADCAST_MSG,
UDP_SRV_USER_ONLINE = 0x0100,
UDP_SRV_USER_OFFLINE,
UDP_SRV_MULTI_ONLINE,
UDP_SRV_STATUS_CHANGED,
UDP_SRV_MESSAGE,
UDP_SRV_SEARCH,
};
enum {
LOGIN_SUCCESS,
LOGIN_INVALID_UIN,
LOGIN_WRONG_PASSWD,
};
enum {
STATUS_ONLINE = 0,
STATUS_OFFLINE,
STATUS_AWAY,
STATUS_INVIS
};
enum {
MSG_TEXT,
MSG_AUTO_REPLY,
MSG_AUTH_ACCEPTED,
MSG_AUTH_REQ,
MSG_AUTH_REJECTED,
MSG_ADDED,
MSG_BROADCAST,
};
enum {
ADD_FRIEND_ACCEPTED,
ADD_FRIEND_AUTH_REQ,
ADD_FRIEND_REJECTED,
};
#define MYICQ_UDP_VER 1
#define MYICQ_PORT_DEFAULT 8000
#define MAX_SEARCH_PER_PAGE 25
#define MAX_SEND_ATTEMPTS 2
#define MAX_SQL_STATEMENT 4096
#define MAX_MSG_LEN 512
#define START_UIN 1000
struct BROADCAST_MSG {
IcqListItem listItem;
uint32 id;
uint8 type;
uint32 when;
uint32 src;
char text[MAX_MSG_LEN];
uint32 to;
uint32 maxUIN;
time_t expire;
};
int UdpSession::sock = -1;
IcqList UdpSession::globalSendQueue;
IcqList UdpSession::keepAliveList;
IcqList UdpSession::broadMsgList;
MYSQL UdpSession::mysql;
char UdpSession::sqlStmt[MAX_SQL_STATEMENT];
uint32 UdpSession::sessionCount = 0;
bool UdpSession::initialize()
{
srand(time(NULL));
desinit(0);
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
cerr << "create socket failed." << endl;
return false;
}
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(MYICQ_PORT_DEFAULT);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (sockaddr *) &addr, sizeof(addr)) < 0) {
cerr << "socket bind failed." << endl;
close(sock);
return false;
}
if (!mysql_init(&mysql)) {
cerr << "mysql initialization failed." << endl;
return false;
}
const char *host = LOCAL_HOST;
const char *user = "myicq";
const char *pass = "myicq";
const char *db = "myicq";
const char *unix_socket = NULL;
if (!mysql_real_connect(&mysql, host, user, pass, db, 0, unix_socket, 0)) {
cerr << "can not connect to mysql server." << endl;
mysql_close(&mysql);
return false;
}
return true;
}
void UdpSession::cleanUp()
{
IcqListItem *head = &keepAliveList.head;
IcqListItem *pos;
UdpSession *p;
while ((pos = head->next) != head) {
p = LIST_ENTRY(pos, UdpSession, keepAliveItem);
delete p;
}
mysql_close(&mysql);
close(sock);
}
bool UdpSession::onReceive()
{
UdpInPacket in;
int n = in.recv(sock);
if (n < 0)
return false;
if (n < (int) sizeof(UDP_HEADER)) {
cout << "packet size is less than " << sizeof(UDP_HEADER) << endl;
return false;
}
uint16 ver = in.getVersion();
if (ver > MYICQ_UDP_VER) {
cout << "packet v" << ver << " not supported." << endl;
return false;
}
uint16 cmd = in.getCmd();
uint32 uin = in.getUIN();
uint32 ip = in.getIP();
uint16 port = in.getPort();
UdpSession *session = NULL;
if (cmd == UDP_NEW_UIN)
session = SessionHash::getDead(ip, port);
else if (cmd != UDP_LOGIN) {
session = SessionHash::getAlive(uin);
if (!session && cmd == UDP_ACK)
session = SessionHash::getDead(ip, port);
if (!session)
return false;
}
if (!session) {
session = new UdpSession;
if (!session) {
cout << "create session failed." << endl;
return false;
}
}
session->onReceive(in);
return true;
}
time_t UdpSession::checkSendQueue()
{
IcqListItem *pos;
UdpOutPacket *p;
UdpSession *session;
IcqListItem *head = &globalSendQueue.head;
time_t now = time(NULL);
while ((pos = head->next) != head) {
p = LIST_ENTRY(head->next, UdpOutPacket, globalSendItem);
if (p->expire > now)
return (p->expire - now);
session = p->session;
cout << "packet " << p->getSeq() << " time out." << endl;
p->attempts++;
if (p->attempts <= MAX_SEND_ATTEMPTS) {
cout << "retrasmit packet." << endl;
pos->remove();
p->expire = now + SEND_TIMEOUT;
session->sendDirect(p);
globalSendQueue.add(pos);
}
else {
cout << "maximum attempts reached. delete it!" << endl;
p->sendItem.remove();
p->globalSendItem.remove();
delete p;
}
}
return SEND_TIMEOUT;
}
time_t UdpSession::checkKeepAlive()
{
IcqListItem *pos;
UdpSession *p;
IcqListItem *head = &keepAliveList.head;
time_t now = time(NULL);
while ((pos = head->next) != head) {
p = LIST_ENTRY(pos, UdpSession, keepAliveItem);
if (p->expire > now)
return (p->expire - now);
cout << p->uin << " expires." << endl;
if (p->status != STATUS_OFFLINE)
p->dead();
delete p;
}
return KEEPALIVE_TIMEOUT;
}
void UdpSession::broadcastMessages()
{
if (broadMsgList.isEmpty())
return;
BROADCAST_MSG *msg = LIST_ENTRY(broadMsgList.getHead(), BROADCAST_MSG, listItem);
for (int i = 0; i < 10; ++i) {
UdpSession *s = SessionHash::getAlive(msg->to);
if (s)
sendMessage(msg->type, msg->to, msg->src, s, msg->when, msg->text);
else {
int n = sprintf(sqlStmt, "INSERT INTO broadmsg_tbl VALUES(%lu, %lu)", msg->to, msg->id);
mysql_real_query(&mysql, sqlStmt, n);
}
msg->to++;
if (msg->to > msg->maxUIN) {
msg->listItem.remove();
delete msg;
break;
}
}
}
void UdpSession::addFriend(uint32 dst, uint32 src, UdpSession *dstSession, UdpSession *srcSession)
{
int n = sprintf(sqlStmt, "INSERT INTO friend_tbl values(%lu, %lu)", src, dst);
mysql_real_query(&mysql, sqlStmt, n);
if (!srcSession)
return;
if (dstSession && dstSession->status != STATUS_INVIS) {
UdpOutPacket *out = srcSession->createPacket(UDP_SRV_USER_ONLINE);
out->write32(dst);
out->write32(dstSession->status);
out->write32(dstSession->ip);
out->write16(dstSession->port);
out->write32(dstSession->realIP);
srcSession->sendPacket(out);
}
}
void UdpSession::onlineNotify()
{
MYSQL_RES *res;
MYSQL_ROW row;
int n = sprintf(sqlStmt, "SELECT uin1 FROM friend_tbl WHERE uin2=%lu", uin);
if (mysql_real_query(&mysql, sqlStmt, n) == 0 && (res = mysql_store_result(&mysql))) {
while ((row = mysql_fetch_row(res))) {
uint32 friendUIN = atol(row[0]);
UdpSession *session = SessionHash::getAlive(friendUIN);
if (session) {
UdpOutPacket *out = session->createPacket(UDP_SRV_USER_ONLINE);
out->write32(uin);
out->write32(status);
out->write32(ip);
out->write16(port);
out->write32(realIP);
session->sendPacket(out);
}
}
mysql_free_result(res);
}
}
void UdpSession::offlineNotify()
{
MYSQL_RES *res;
MYSQL_ROW row;
int n = sprintf(sqlStmt, "SELECT uin1 FROM friend_tbl WHERE uin2=%lu", uin);
if (mysql_real_query(&mysql, sqlStmt, n) == 0 && (res = mysql_store_result(&mysql))) {
while ((row = mysql_fetch_row(res))) {
uint32 friendUIN = atol(row[0]);
UdpSession *session = SessionHash::getAlive(friendUIN);
if (session) {
UdpOutPacket *out = session->createPacket(UDP_SRV_USER_OFFLINE);
out->write32(uin);
session->sendPacket(out);
}
}
mysql_free_result(res);
}
}
UdpSession::UdpSession()
{
udpVer = 0;
tcpVer = 0;
sid = 0;
uin = 0;
auth = 0;
ip = realIP = 0;
port = 0;
status = STATUS_OFFLINE;
sendSeq = rand() & 0x7fff;
recvSeq = 0;
window = 0;
expire = time(NULL) + KEEPALIVE_TIMEOUT;
sessionCount++;
}
UdpSession::~UdpSession()
{
IcqListItem *head = &sendQueue.head;
IcqListItem *pos;
while ((pos = head->next) != head) {
UdpOutPacket *p = LIST_ENTRY(pos, UdpOutPacket, sendItem);
pos->remove();
p->globalSendItem.remove();
delete p;
}
listItem.remove();
keepAliveItem.remove();
sessionCount--;
}
UdpOutPacket *UdpSession::createPacket(uint16 cmd, uint16 ackSeq)
{
UdpOutPacket *out = new UdpOutPacket(this);
createPacket(*out, cmd, ackSeq);
return out;
}
void UdpSession::createPacket(UdpOutPacket &out, uint16 cmd, uint16 ackSeq)
{
out.write16(udpVer);
out.write32(0);
out.write32(sid);
out.write16(++sendSeq);
out.write16(ackSeq);
out.write16(cmd);
out.write32(uin);
}
void UdpSession::sendAckPacket(uint16 cmd, uint16 seq)
{
UdpOutPacket out(this);
out.write16(udpVer);
out.write32(0);
out.write32(sid);
out.write16(0);
out.write16(seq);
out.write16(cmd);
out.write32(uin);
sendDirect(&out);
}
void UdpSession::sendPacket(UdpOutPacket *p)
{
p->attempts = 0;
p->expire = time(NULL) + SEND_TIMEOUT;
sendDirect(p);
sendQueue.add(&p->sendItem);
globalSendQueue.add(&p->globalSendItem);
}
bool UdpSession::setWindow(uint16 seq)
{
if (seq >= recvSeq + 32 || seq < recvSeq)
return false;
if (seq == recvSeq) {
do {
recvSeq++;
window >>= 1;
} while (window & 0x1);
} else {
uint32 mask = (1 << (seq - recvSeq));
if (window & mask)
return false;
else
window |= mask;
}
return true;
}
bool UdpSession::onReceive(UdpInPacket &in)
{
uint16 cmd = in.getCmd();
uint16 seq = in.getSeq();
if (cmd != UDP_LOGIN && cmd != UDP_NEW_UIN) {
if (sid != in.getSID() || uin != in.getUIN()) {
cout << "packet does not belong to this session." << endl;
return false;
}
in.decrypt(passwd);
} else if (sid == 0) {
udpVer = in.getVersion();
ip = in.getIP();
port = in.getPort();
recvSeq = seq;
sid = in.getSID();
}
if (cmd != UDP_ACK && !setWindow(seq)) {
cout << "packet " << seq << " is duplicated" << endl;
sendAckPacket(cmd, seq);
return false;
}
switch (cmd) {
case UDP_ACK:
onAck(seq);
return true;
case UDP_KEEPALIVE:
onKeepAlive(in);
break;
case UDP_NEW_UIN:
onNewUIN(in);
break;
case UDP_GET_CONTACTLIST:
onGetContactList(in);
break;
case UDP_LOGIN:
onLogin(in);
break;
case UDP_LOGOUT:
onLogout(in);
return true;
case UDP_CHANGE_STATUS:
sendAckPacket(cmd, seq);
onChangeStatus(in);
break;
case UDP_UPDATE_CONTACT:
onUpdateContact(in);
break;
case UDP_UPDATE_USER:
onUpdateUser(in);
break;
case UDP_MODIFY_USER:
sendAckPacket(cmd, seq);
onModifyUser(in);
break;
case UDP_SEND_MSG:
sendAckPacket(cmd, seq);
onSendMessage(in);
break;
case UDP_GROUP_SEND_MSG:
sendAckPacket(cmd, seq);
//onGroupSendMessage(in);
break;
case UDP_SEARCH_RANDOM:
onSearchRandom(in);
break;
case UDP_SEARCH_CUSTOM:
onSearchCustom(in);
break;
case UDP_ADD_FRIEND:
onAddFriend(in);
break;
case UDP_DEL_FRIEND:
sendAckPacket(cmd, seq);
onDelFriend(in);
break;
case UDP_BROADCAST_MSG:
sendAckPacket(cmd, seq);
onBroadcastMsg(in);
break;
default:
cerr << "unknown cmd " << cmd << endl;
return false;
}
return true;
}
void UdpSession::onAck(uint16 seq)
{
IcqListItem *head = &sendQueue.head;
IcqListItem *pos;
UdpOutPacket *p;
LIST_FOR_EACH(pos, head) {
p = LIST_ENTRY(pos, UdpOutPacket, sendItem);
if (p->getSeq() == seq) {
cout << "packet " << seq << " is ACKed" << endl;
pos->remove();
p->globalSendItem.remove();
delete p;
if (status == STATUS_OFFLINE && sendQueue.isEmpty()) {
cout << "session is dead!" << endl;
delete this;
}
return;
}
}
cout << "ACK packet(seq = " << seq << ") is ignored" << endl;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -