⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpsession.cpp

📁 MyICQ的源码.MyICQ是一套公开源代码的即时通讯软件
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   copyright            : (C) 2002 by Zhang Yong                         *
 *   email                : z-yong163@163.com                              *
 ***************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream.h>
#include "udppacket.h"
#include "sessionhash.h"
#include "ndes.h"

/*
 * Command codes carried in the UDP packet header (read with getCmd()).
 * The codes starting at 1 are the ones dispatched by
 * UdpSession::onReceive(UdpInPacket &). The UDP_SRV_* codes start at 0x0100;
 * in this file UDP_SRV_USER_ONLINE and UDP_SRV_USER_OFFLINE are used for
 * packets the server creates itself.
 */
enum {
	UDP_ACK = 1,
	UDP_NEW_UIN,
	UDP_GET_CONTACTLIST,
	UDP_LOGIN,
	UDP_LOGOUT,
	UDP_KEEPALIVE,
	UDP_CHANGE_STATUS,
	UDP_UPDATE_CONTACT,
	UDP_MODIFY_USER,
	UDP_UPDATE_USER,
	UDP_SEND_MSG,
	UDP_GROUP_SEND_MSG,
	UDP_SEARCH_RANDOM,
	UDP_SEARCH_CUSTOM,
	UDP_ADD_FRIEND,
	UDP_DEL_FRIEND,
	UDP_BROADCAST_MSG,

	UDP_SRV_USER_ONLINE = 0x0100,
	UDP_SRV_USER_OFFLINE,
	UDP_SRV_MULTI_ONLINE,
	UDP_SRV_STATUS_CHANGED,
	UDP_SRV_MESSAGE,
	UDP_SRV_SEARCH,
};

/*
 * NOTE(review): presumably the result codes returned to a client for
 * UDP_LOGIN; they are not referenced in the visible part of this file,
 * so confirm against the login handler.
 */
enum {
	LOGIN_SUCCESS,
	LOGIN_INVALID_UIN,
	LOGIN_WRONG_PASSWD,
};

/*
 * Values stored in UdpSession::status. A freshly constructed session starts
 * as STATUS_OFFLINE, and addFriend() does not send an online notification
 * for a session whose status is STATUS_INVIS.
 */
enum {
	STATUS_ONLINE = 0,
	STATUS_OFFLINE,
	STATUS_AWAY,
	STATUS_INVIS
};

/*
 * NOTE(review): presumably the message type values used for the `type`
 * argument of sendMessage() and BROADCAST_MSG::type; the individual
 * constants are not referenced in the visible part of this file -- confirm.
 */
enum {
	MSG_TEXT,
	MSG_AUTO_REPLY,
	MSG_AUTH_ACCEPTED,
	MSG_AUTH_REQ,
	MSG_AUTH_REJECTED,
	MSG_ADDED,
	MSG_BROADCAST,
};

/*
 * NOTE(review): presumably the result codes for a UDP_ADD_FRIEND request;
 * not referenced in the visible part of this file -- confirm against
 * onAddFriend().
 */
enum {
	ADD_FRIEND_ACCEPTED,
	ADD_FRIEND_AUTH_REQ,
	ADD_FRIEND_REJECTED,
};

/* Highest packet version accepted; onReceive() drops packets with a greater one. */
#define MYICQ_UDP_VER		1
/* UDP port the server socket is bound to in initialize(). */
#define MYICQ_PORT_DEFAULT	8000
/* NOTE(review): not used in the visible part of this file; presumably a search result page size. */
#define MAX_SEARCH_PER_PAGE	25
/* Number of retransmissions checkSendQueue() performs before it deletes a packet. */
#define MAX_SEND_ATTEMPTS	2
/* Size in bytes of the shared SQL statement buffer UdpSession::sqlStmt. */
#define MAX_SQL_STATEMENT	4096
/* Size in bytes of the BROADCAST_MSG::text buffer. */
#define MAX_MSG_LEN			512
/* NOTE(review): not used in the visible part of this file; presumably the first UIN handed out. */
#define START_UIN			1000

/*
 * A broadcast message queued on UdpSession::broadMsgList and delivered
 * incrementally by UdpSession::broadcastMessages().
 */
struct BROADCAST_MSG {
	// Link used to chain this message into broadMsgList.
	IcqListItem listItem;

	// Stored together with the recipient UIN in broadmsg_tbl when the recipient is not online.
	uint32 id;
	// type, when, src and text are passed through to sendMessage() unchanged.
	uint8 type;
	uint32 when;
	uint32 src;
	char text[MAX_MSG_LEN];
	// UIN of the next recipient; advanced by broadcastMessages().
	uint32 to;
	// Last recipient UIN; the message is deleted once `to` passes it.
	uint32 maxUIN;
	// NOTE(review): not read in the visible part of this file; presumably an expiry time -- confirm.
	time_t expire;
};

// UDP socket shared by all sessions; -1 until initialize() creates it.
int		UdpSession::sock = -1;
// Packets awaiting an ACK, across all sessions; scanned by checkSendQueue().
IcqList	UdpSession::globalSendQueue;
// Sessions linked through keepAliveItem; scanned by checkKeepAlive().
IcqList	UdpSession::keepAliveList;
// Pending BROADCAST_MSG entries; consumed by broadcastMessages().
IcqList UdpSession::broadMsgList;
// The single MySQL connection opened in initialize().
MYSQL	UdpSession::mysql;
// Scratch buffer in which every SQL statement in this file is formatted.
char	UdpSession::sqlStmt[MAX_SQL_STATEMENT];
// Number of live UdpSession objects (maintained by the constructor/destructor).
uint32	UdpSession::sessionCount = 0;


/*
 * One-time start-up of the shared server state: seeds rand(), calls
 * desinit(0), creates the UDP socket and binds it to MYICQ_PORT_DEFAULT on
 * all interfaces, then opens the MySQL connection.
 *
 * Returns true when everything is ready. On failure a message is written to
 * cerr, everything acquired so far is released again (the socket is closed
 * and `sock` is reset to -1) and false is returned.
 */
bool UdpSession::initialize()
{
	srand(time(NULL));
	desinit(0);

	sock = socket(AF_INET, SOCK_DGRAM, 0);
	if (sock < 0) {
		cerr << "create socket failed." << endl;
		return false;
	}

	sockaddr_in addr;
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(MYICQ_PORT_DEFAULT);
	addr.sin_addr.s_addr = INADDR_ANY;
	if (bind(sock, (sockaddr *) &addr, sizeof(addr)) < 0) {
		cerr << "socket bind failed." << endl;
		close(sock);
		sock = -1;
		return false;
	}

	if (!mysql_init(&mysql)) {
		cerr << "mysql initialization failed." << endl;
		// Do not leak the socket that was already opened and bound above.
		close(sock);
		sock = -1;
		return false;
	}

	const char *host = LOCAL_HOST;
	const char *user = "myicq";
	const char *pass = "myicq";
	const char *db = "myicq";
	const char *unix_socket = NULL;

	if (!mysql_real_connect(&mysql, host, user, pass, db, 0, unix_socket, 0)) {
		cerr << "can not connect to mysql server." << endl;
		mysql_close(&mysql);
		// Same as above: release the socket before reporting failure.
		close(sock);
		sock = -1;
		return false;
	}

	return true;
}

void UdpSession::cleanUp()
{
	IcqListItem *head = &keepAliveList.head;
	IcqListItem *pos;
	UdpSession *p;

	while ((pos = head->next) != head) {
		p = LIST_ENTRY(pos, UdpSession, keepAliveItem);
		delete p;
	}

	mysql_close(&mysql);
	close(sock);
}

bool UdpSession::onReceive()
{
	UdpInPacket in;
	int n = in.recv(sock);
	if (n < 0)
		return false;

	if (n < (int) sizeof(UDP_HEADER)) {
		cout << "packet size is less than " << sizeof(UDP_HEADER) << endl;
		return false;
	}
	uint16 ver = in.getVersion();
	if (ver > MYICQ_UDP_VER) {
		cout << "packet v" << ver << " not supported." << endl;
		return false;
	}

	uint16 cmd = in.getCmd();
	uint32 uin = in.getUIN();
	uint32 ip = in.getIP();
	uint16 port = in.getPort();
	UdpSession *session = NULL;

	if (cmd == UDP_NEW_UIN)
		session = SessionHash::getDead(ip, port);
	else if (cmd != UDP_LOGIN) {
		session = SessionHash::getAlive(uin);
		if (!session && cmd == UDP_ACK)
			session = SessionHash::getDead(ip, port);
		if (!session)
			return false;
	}

	if (!session) {
		session = new UdpSession;
		if (!session) {
			cout << "create session failed." << endl;
			return false;
		}
	}

	session->onReceive(in);
	return true;
}

time_t UdpSession::checkSendQueue()
{
	IcqListItem *pos;
	UdpOutPacket *p;
	UdpSession *session;
	IcqListItem *head = &globalSendQueue.head;
	time_t now = time(NULL);

	while ((pos = head->next) != head) {
		p = LIST_ENTRY(head->next, UdpOutPacket, globalSendItem);
		if (p->expire > now)
			return (p->expire - now);

		session = p->session;
		cout << "packet " << p->getSeq() << " time out." << endl;


		p->attempts++;
		if (p->attempts <= MAX_SEND_ATTEMPTS) {
			cout << "retrasmit packet." << endl;
			pos->remove();
			p->expire = now + SEND_TIMEOUT;
			session->sendDirect(p);
			globalSendQueue.add(pos);
		}
		else {
			cout << "maximum attempts reached. delete it!" << endl;
			p->sendItem.remove();
			p->globalSendItem.remove();
			delete p;
		}
	}
	return SEND_TIMEOUT;
}

time_t UdpSession::checkKeepAlive()
{
	IcqListItem *pos;
	UdpSession *p;
	IcqListItem *head = &keepAliveList.head;
	time_t now = time(NULL);

	while ((pos = head->next) != head) {
		p = LIST_ENTRY(pos, UdpSession, keepAliveItem);
		if (p->expire > now)
			return (p->expire - now);

		cout << p->uin << " expires." << endl;
		if (p->status != STATUS_OFFLINE)
			p->dead();
		delete p;
	}
	return KEEPALIVE_TIMEOUT;
}

/*
 * Delivers the broadcast message at the head of broadMsgList to at most ten
 * further recipients per call. A recipient with a live session gets the
 * message through sendMessage(); for any other UIN a (uin, message id) row
 * is inserted into broadmsg_tbl. Once the recipient msg->maxUIN has been
 * handled the message is unlinked and deleted.
 */
void UdpSession::broadcastMessages()
{
	if (broadMsgList.isEmpty())
		return;

	BROADCAST_MSG *msg = LIST_ENTRY(broadMsgList.getHead(), BROADCAST_MSG, listItem);
	for (int i = 0; i < 10; ++i) {
		UdpSession *s = SessionHash::getAlive(msg->to);
		if (s)
			sendMessage(msg->type, msg->to, msg->src, s, msg->when, msg->text);
		else {
			// %lu requires unsigned long arguments; the underlying type of
			// uint32 is not visible here, so cast explicitly to keep the
			// varargs call well-defined on every platform.
			int n = sprintf(sqlStmt, "INSERT INTO broadmsg_tbl VALUES(%lu, %lu)",
				(unsigned long) msg->to, (unsigned long) msg->id);
			mysql_real_query(&mysql, sqlStmt, n);
		}

		// Test before incrementing so that a maxUIN equal to the largest
		// representable UIN cannot make `to` wrap around to 0.
		if (msg->to >= msg->maxUIN) {
			msg->listItem.remove();
			delete msg;
			break;
		}
		msg->to++;
	}
}

/*
 * Records that `src` has added `dst` as a friend by inserting (src, dst)
 * into friend_tbl. If the adding user has a session (srcSession) and the
 * added user has one too (dstSession) that is not STATUS_INVIS, the adding
 * user is sent a UDP_SRV_USER_ONLINE packet describing the new friend.
 *
 * dstSession and srcSession may each be NULL.
 */
void UdpSession::addFriend(uint32 dst, uint32 src, UdpSession *dstSession, UdpSession *srcSession)
{
	// %lu requires unsigned long arguments; the underlying type of uint32 is
	// not visible here, so cast explicitly to keep the call well-defined.
	int n = sprintf(sqlStmt, "INSERT INTO friend_tbl values(%lu, %lu)",
		(unsigned long) src, (unsigned long) dst);
	mysql_real_query(&mysql, sqlStmt, n);

	if (!srcSession)
		return;

	if (dstSession && dstSession->status != STATUS_INVIS) {
		UdpOutPacket *out = srcSession->createPacket(UDP_SRV_USER_ONLINE);
		out->write32(dst);
		out->write32(dstSession->status);
		out->write32(dstSession->ip);
		out->write16(dstSession->port);
		out->write32(dstSession->realIP);
		srcSession->sendPacket(out);
	}
}

/*
 * Sends a UDP_SRV_USER_ONLINE packet (uin, status, ip, port, realIP of this
 * session) to every user with a live session whose friend_tbl row lists
 * this session's uin as uin2.
 */
void UdpSession::onlineNotify()
{
	MYSQL_RES *res;
	MYSQL_ROW row;

	// %lu requires an unsigned long argument; cast because the underlying
	// type of uint32 is not visible here.
	int n = sprintf(sqlStmt, "SELECT uin1 FROM friend_tbl WHERE uin2=%lu", (unsigned long) uin);
	if (mysql_real_query(&mysql, sqlStmt, n) == 0 && (res = mysql_store_result(&mysql))) {
		while ((row = mysql_fetch_row(res))) {
			// A NULL column value is returned as a NULL pointer; skip it.
			if (!row[0])
				continue;
			// strtoul covers the full unsigned range, unlike atol.
			uint32 friendUIN = (uint32) strtoul(row[0], NULL, 10);
			UdpSession *session = SessionHash::getAlive(friendUIN);
			if (session) {
				UdpOutPacket *out = session->createPacket(UDP_SRV_USER_ONLINE);
				out->write32(uin);
				out->write32(status);
				out->write32(ip);
				out->write16(port);
				out->write32(realIP);
				session->sendPacket(out);
			}
		}
		mysql_free_result(res);
	}
}

/*
 * Sends a UDP_SRV_USER_OFFLINE packet carrying this session's uin to every
 * user with a live session whose friend_tbl row lists this uin as uin2.
 */
void UdpSession::offlineNotify()
{
	MYSQL_RES *res;
	MYSQL_ROW row;

	// %lu requires an unsigned long argument; cast because the underlying
	// type of uint32 is not visible here.
	int n = sprintf(sqlStmt, "SELECT uin1 FROM friend_tbl WHERE uin2=%lu", (unsigned long) uin);
	if (mysql_real_query(&mysql, sqlStmt, n) == 0 && (res = mysql_store_result(&mysql))) {
		while ((row = mysql_fetch_row(res))) {
			// A NULL column value is returned as a NULL pointer; skip it.
			if (!row[0])
				continue;
			// strtoul covers the full unsigned range, unlike atol.
			uint32 friendUIN = (uint32) strtoul(row[0], NULL, 10);
			UdpSession *session = SessionHash::getAlive(friendUIN);
			if (session) {
				UdpOutPacket *out = session->createPacket(UDP_SRV_USER_OFFLINE);
				out->write32(uin);
				session->sendPacket(out);
			}
		}
		mysql_free_result(res);
	}
}

/*
 * Creates a blank, offline session. Protocol version, session id, UIN and
 * address fields are zeroed, the send sequence starts at a random 15-bit
 * value, and the keep-alive deadline is set KEEPALIVE_TIMEOUT seconds from
 * now. Also counts the new session in sessionCount.
 */
UdpSession::UdpSession()
{
	// Identity and protocol fields.
	udpVer = tcpVer = 0;
	sid = 0;
	uin = 0;
	auth = 0;

	// Network address fields.
	ip = 0;
	realIP = 0;
	port = 0;

	status = STATUS_OFFLINE;

	// Sequencing state and keep-alive deadline.
	sendSeq = rand() & 0x7fff;
	recvSeq = 0;
	window = 0;
	expire = time(NULL) + KEEPALIVE_TIMEOUT;

	++sessionCount;
}

/*
 * Destroys the session: every packet still waiting on this session's send
 * queue is removed from both the per-session and the global queue and
 * deleted, the session unlinks its own list items, and sessionCount is
 * decremented.
 */
UdpSession::~UdpSession()
{
	IcqListItem *sentinel = &sendQueue.head;

	// Always take the first element; removing it makes the next one first.
	for (IcqListItem *node = sentinel->next; node != sentinel; node = sentinel->next) {
		UdpOutPacket *pending = LIST_ENTRY(node, UdpOutPacket, sendItem);
		node->remove();
		pending->globalSendItem.remove();
		delete pending;
	}

	listItem.remove();
	keepAliveItem.remove();

	--sessionCount;
}

/*
 * Allocates a new outgoing packet owned by this session and fills in its
 * header for command `cmd` and acknowledgement number `ackSeq`.
 * The caller receives ownership of the returned packet.
 */
UdpOutPacket *UdpSession::createPacket(uint16 cmd, uint16 ackSeq)
{
	UdpOutPacket *packet = new UdpOutPacket(this);

	// Delegate the header layout to the by-reference overload.
	createPacket(*packet, cmd, ackSeq);

	return packet;
}

/*
 * Writes the packet header into `out`. The order of the writes below is the
 * on-the-wire field order and must not be changed. Each call consumes one
 * send sequence number (sendSeq is pre-incremented).
 */
void UdpSession::createPacket(UdpOutPacket &out, uint16 cmd, uint16 ackSeq)
{
	out.write16(udpVer);
	// NOTE(review): always 0 here; presumably a reserved header field -- confirm against UDP_HEADER.
	out.write32(0);
	out.write32(sid);
	out.write16(++sendSeq);
	out.write16(ackSeq);
	out.write16(cmd);
	out.write32(uin);
}

/*
 * Sends an acknowledgement for the received packet `seq` of command `cmd`.
 * The packet is built on the stack and sent with sendDirect() only: it is
 * never put on the send queues, so it is not retransmitted. Unlike
 * createPacket(), it does not consume a send sequence number.
 */
void UdpSession::sendAckPacket(uint16 cmd, uint16 seq)
{
	UdpOutPacket out(this);
	out.write16(udpVer);
	// NOTE(review): always 0 here; presumably a reserved header field -- confirm against UDP_HEADER.
	out.write32(0);
	out.write32(sid);
	// The sequence field is 0 for ACKs; the acknowledged number goes in the ack field.
	out.write16(0);
	out.write16(seq);
	out.write16(cmd);
	out.write32(uin);
	sendDirect(&out);
}

/*
 * Sends `p` for the first time and registers it for retransmission: the
 * attempt counter is reset, the ACK deadline is set SEND_TIMEOUT seconds
 * from now, and the packet is appended to both this session's send queue
 * and the global send queue.
 */
void UdpSession::sendPacket(UdpOutPacket *p)
{
	UdpOutPacket &pkt = *p;

	pkt.attempts = 0;
	pkt.expire = time(NULL) + SEND_TIMEOUT;
	sendDirect(&pkt);

	// Keep the packet around until it is ACKed or gives up retrying.
	sendQueue.add(&pkt.sendItem);
	globalSendQueue.add(&pkt.globalSendItem);
}

/*
 * Records the arrival of sequence number `seq` in the 32-entry receive
 * window and reports whether the packet is new.
 *
 * recvSeq is the next sequence number expected in order; bit k of `window`
 * marks that recvSeq + k has already been received out of order.
 *
 * Returns true if `seq` lies inside the window and has not been seen
 * before, false if it is a duplicate or lies outside the window.
 */
bool UdpSession::setWindow(uint16 seq)
{
	// Distance from the next expected number, taken modulo 2^16 so that the
	// window keeps working when sequence numbers wrap from 65535 to 0.
	// Anything behind recvSeq shows up as a large distance and is rejected,
	// exactly like numbers too far ahead.
	const uint16 offset = (uint16) (seq - recvSeq);
	if (offset >= 32)
		return false;

	if (offset == 0) {
		// In-order packet: slide the window past it and past every
		// following packet that had already arrived out of order.
		do {
			recvSeq++;
			window >>= 1;
		} while (window & 0x1);
	} else {
		// Build the mask in an unsigned type: shifting a signed 1 by 31
		// overflows int.
		const uint32 mask = ((uint32) 1 << offset);
		if (window & mask)
			return false;
		else
			window |= mask;
	}
	return true;
}

/*
 * Processes one packet addressed to this session: validates that it belongs
 * here, filters duplicates through the receive window, and dispatches on the
 * command code.
 *
 * Returns false when the packet does not belong to this session, is a
 * duplicate / outside the receive window, or carries an unknown command;
 * true otherwise.
 *
 * Note: the UDP_ACK path can end in `delete this` (see onAck()), so nothing
 * may touch members after onAck() returns.
 * NOTE(review): UDP_LOGOUT also returns immediately after its handler;
 * presumably for the same reason -- confirm against onLogout().
 */
bool UdpSession::onReceive(UdpInPacket &in)
{
	uint16 cmd = in.getCmd();
	uint16 seq = in.getSeq();

	if (cmd != UDP_LOGIN && cmd != UDP_NEW_UIN) {
		// Every other command must carry this session's id and UIN.
		if (sid != in.getSID() || uin != in.getUIN()) {
			cout << "packet does not belong to this session." << endl;
			return false;
		}
		in.decrypt(passwd);

	} else if (sid == 0) {
		// First LOGIN/NEW_UIN packet on a fresh session: adopt the
		// sender's version, address, starting sequence number and session id.
		udpVer = in.getVersion();
		ip = in.getIP();
		port = in.getPort();
		recvSeq = seq;
		sid = in.getSID();
	}

	// ACKs are not window-checked. For anything else that setWindow()
	// rejects, send an ACK again and do not process the packet a second time.
	if (cmd != UDP_ACK && !setWindow(seq)) {
		cout << "packet " << seq << " is duplicated" << endl;
		sendAckPacket(cmd, seq);
		return false;
	}

	// Some commands are acknowledged here before their handler runs; the
	// others send no ACK in this function.
	switch (cmd) {
	case UDP_ACK:
		onAck(seq);
		return true;

	case UDP_KEEPALIVE:
		onKeepAlive(in);
		break;

	case UDP_NEW_UIN:
		onNewUIN(in);
		break;

	case UDP_GET_CONTACTLIST:
		onGetContactList(in);
		break;

	case UDP_LOGIN:
		onLogin(in);
		break;

	case UDP_LOGOUT:
		onLogout(in);
		return true;

	case UDP_CHANGE_STATUS:
		sendAckPacket(cmd, seq);
		onChangeStatus(in);
		break;

	case UDP_UPDATE_CONTACT:
		onUpdateContact(in);
		break;

	case UDP_UPDATE_USER:
		onUpdateUser(in);
		break;

	case UDP_MODIFY_USER:
		sendAckPacket(cmd, seq);
		onModifyUser(in);
		break;

	case UDP_SEND_MSG:
		sendAckPacket(cmd, seq);
		onSendMessage(in);
		break;

	case UDP_GROUP_SEND_MSG:
		// Acknowledged but otherwise ignored: the handler call is commented out.
		sendAckPacket(cmd, seq);
		//onGroupSendMessage(in);
		break;

	case UDP_SEARCH_RANDOM:
		onSearchRandom(in);
		break;

	case UDP_SEARCH_CUSTOM:
		onSearchCustom(in);
		break;

	case UDP_ADD_FRIEND:
		onAddFriend(in);
		break;

	case UDP_DEL_FRIEND:
		sendAckPacket(cmd, seq);
		onDelFriend(in);
		break;

	case UDP_BROADCAST_MSG:
		sendAckPacket(cmd, seq);
		onBroadcastMsg(in);
		break;

	default:
		cerr << "unknown cmd " << cmd << endl;
		return false;
	}
	return true;
}

/*
 * Handles an ACK for sequence number `seq`: the matching packet is removed
 * from this session's send queue and from the global send queue and deleted.
 * If the session is STATUS_OFFLINE and that was its last pending packet, the
 * session deletes itself -- callers must not touch the object after an
 * onAck() call. An ACK that matches no pending packet is only logged.
 */
void UdpSession::onAck(uint16 seq)
{
	IcqListItem *sentinel = &sendQueue.head;
	IcqListItem *node;

	LIST_FOR_EACH(node, sentinel) {
		UdpOutPacket *pending = LIST_ENTRY(node, UdpOutPacket, sendItem);
		if (pending->getSeq() != seq)
			continue;

		cout << "packet " << seq << " is ACKed" << endl;
		node->remove();
		pending->globalSendItem.remove();
		delete pending;

		// An offline session only lives until its last packet is confirmed.
		const bool finished = (status == STATUS_OFFLINE) && sendQueue.isEmpty();
		if (finished) {
			cout << "session is dead!" << endl;
			delete this;
		}
		return;
	}

	cout << "ACK packet(seq = " << seq << ") is ignored" << endl;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -