📄 receiverthread.cpp
字号:
/*************************************************************************** * Copyright (C) 2007 by Anistratov Oleg * * ower@users.sourceforge.net * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License version 2 * * as published by the Free Software Foundation; * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * ***************************************************************************/#include <assert.h>#include <QCoreApplication>#include "largedatagram.h"#include "receiverthread.h"#include "abstractchatcore.h"ReceiverThread::ReceiverThread(QObject *parent) : QThread (parent), m_opened (false), m_finished (false), m_datagrams (NULL), m_datagramsNum (0), m_datagramsMaxNum(0), m_bufferSize (65535), m_socket (NULL), m_socketNotifier (NULL), m_port (61108){ qDebug("[ReceiverThread]: constructor begin"); moveToThread(this); m_buffer = (char*)malloc(m_bufferSize); assert(NULL != m_buffer); qDebug("[ReceiverThread]: constructor end");}//\*****************************************************************************ReceiverThread::~ReceiverThread(){ qDebug("[~ReceiverThread]"); delete m_socketNotifier; free(m_buffer);}//\*****************************************************************************void ReceiverThread::run(){ m_socket = new QUdpSocket(this); exec();}//\*****************************************************************************void ReceiverThread::changePort(quint16 port_){ Q_ASSERT(currentThread() == this); qDebug("[ReceiverThread::setPort]: port = %d", port_); if(m_opened) m_socket->close(); qDebug("[ReceiverThread::setPort]: closed"); m_opened = false; m_port = port_; m_opened = m_socket->bind(m_port); qDebug("[ReceiverThread::setPort]: opened = %d", m_opened); if(m_opened == 0) { emit openSocketError(port_); return; } if(m_socketNotifier) disconnect(m_socketNotifier, SIGNAL(activated(int)), this, SLOT(receiving())); delete m_socketNotifier; m_socketNotifier = NULL; m_socketNotifier = new QSocketNotifier(m_socket->socketDescriptor(), QSocketNotifier::Read, this); connect(m_socketNotifier, SIGNAL(activated(int)), this, SLOT(receiving()));}//\*****************************************************************************LargeDatagram* ReceiverThread::findDatagram(quint64 IP, quint32 ID) const{ for(quint32 i = 0; i < m_datagramsNum; i++) if(m_datagrams[i]->cmp(IP, ID)) return m_datagrams[i]; return NULL;}//\*****************************************************************************void ReceiverThread::removeDatagram(LargeDatagram* dtgrm){ if(!dtgrm) return; for(quint32 i = 0; i < m_datagramsNum; i++) if(m_datagrams[i] == dtgrm) { --m_datagramsNum; m_datagrams[i] = m_datagrams[m_datagramsNum]; }}//\*****************************************************************************LargeDatagram* ReceiverThread::addDatagram(quint64 IP, quint32 ID){ Q_ASSERT(currentThread() == this); LargeDatagram* dtgrm = new LargeDatagram(IP, ID, this); LargeDatagram** tmp; dtgrm->moveToThread(this); qDebug("[ReceiverThread::addDatagram]: adding(%lu, %lu)", (unsigned long)IP, (unsigned long)ID); if(dtgrm) { m_datagramsNum++; if(m_datagramsMaxNum < m_datagramsNum) { m_datagramsMaxNum++; tmp = (LargeDatagram**)realloc(m_datagrams, m_datagramsMaxNum * sizeof(LargeDatagram*)); if(!tmp) { delete dtgrm; return NULL; } m_datagrams = tmp; } m_datagrams[m_datagramsNum - 1] = dtgrm; connect(dtgrm, SIGNAL( wantDie(LargeDatagram*)), this, SLOT(deleteDatagram(LargeDatagram*))); connect(dtgrm, SIGNAL(completed(LargeDatagram*)), this, SLOT(deleteDatagram(LargeDatagram*))); connect(dtgrm, SIGNAL(wantFragments(char*, quint32, quint32, quint64)), this, SIGNAL(wantFragments(char*, quint32, quint32, quint64))); connect(dtgrm, SIGNAL(percentsRemain(quint8, quint16, quint64)), this, SIGNAL(percentsRemain(quint8, quint16, quint64))); connect(dtgrm, SIGNAL(readyReceive (quint16, quint64)), this, SIGNAL(readyReceive (quint16, quint64))); } return dtgrm;}//\*****************************************************************************quint32 ReceiverThread::getValidID(quint64 IP) const{ for(quint32 i = 1; i > 0; i++) if(!findDatagram(IP, i)) return i; return 0;}//\*****************************************************************************void ReceiverThread::receiving(){ quint32 dtgrmNum; quint32 dtgrmID; quint64 sender_uid; // IP for serverless mode or UserID for server mode int dataSize; LargeDatagram* dtgrm; quint8 type; char* buffer = m_buffer; m_opened = true; if(m_opened) { if((dataSize = readData(m_buffer, m_bufferSize)) <= 0) qWarning("[ReceiverThread::receiving]: Cannot read data from socket(return code = %d)", dataSize); else { if(dataSize >= (int)AbstractChatCore::protocolLen() && AbstractChatCore::checkProtocolVersion(m_buffer)) { if(AbstractChatCore::compressed(m_buffer)) { uint size = dataSize; buffer = AbstractChatCore::uncompress((const char*)m_buffer, size); dataSize = size; if(!buffer) return; AbstractChatCore::setProgramVersion(buffer); } else buffer = m_buffer; type = AbstractChatCore::packetType(buffer); sender_uid = AbstractChatCore::srcIp (buffer); dtgrmID = AbstractChatCore::packetId (buffer); dtgrmNum = AbstractChatCore::packetNum (buffer);// qDebug("[ReceiverThread::run]: sender_uid = %s", QHostAddress(sender_uid).toString().toAscii().data()); if(type == AbstractChatCore::SINGLE_MESSAGE) qDebug("[ReceiverThread::run]: SINGLE_MESSAGE"); if(type == AbstractChatCore::FRAGMENTS_REQUEST) { qDebug("[ReceiverThread::run]: FRAGMENTS_REQUEST"); emit fragmentsRequest(buffer, dataSize); } else if(type == AbstractChatCore::FINISHED) { qDebug("[ReceiverThread::run]: FINISHED!");// emit dtgrmFinished(dtgrmID); } else if(dtgrmID != 0) { switch(type) { case AbstractChatCore::CONFIRM : qDebug("[ReceiverThread::run]: CONFIRM| dtgrmID = %d, dtgrmNum = %d", dtgrmID, dtgrmNum); emit percentsConfirmed(dtgrmNum, dtgrmID, sender_uid); break; case AbstractChatCore::ACCEPT: qDebug("[ReceiverThread::run]: ACCEPT"); emit receivingAccepted(dtgrmID); break; case AbstractChatCore::REJECT: qDebug("[ReceiverThread::run]: REJECT"); qDebug("[ReceiverThread::run]: reason = %d", dtgrmNum); if(dtgrmNum == 0) emit receivingRejected(dtgrmID); else if(dtgrmNum == 1) emit receivingCancelled(dtgrmID); else if(dtgrmNum == 2) { deleteDatagram(findDatagram(sender_uid, dtgrmID)); emit sendingCancelled(dtgrmID, sender_uid); } break; default: if(!(dtgrm = findDatagram(sender_uid, dtgrmID))) { qDebug("[ReceiverThread::run]: ADDING DATAGRAM"); dtgrm = addDatagram(sender_uid, dtgrmID); if(NULL == dtgrm) return; } if(dtgrmID != 0) { if(!dtgrmNum) { dtgrm->initDatagram(buffer, dataSize); if(type == AbstractChatCore::FILE) emit wantReceiveFile(dtgrm->filename(), dtgrmID, sender_uid); } else if(type == AbstractChatCore::FILE) { dtgrm->addFileFragment(buffer, dataSize); if(dtgrm->complete()) qDebug("[ReceiverThread::run]: File COMPLETED!"); } else { dtgrm->addFragment(buffer, dataSize); if(dtgrm->complete()) { qDebug("[ReceiverThread::run]: COMPLETED!"); emit largeDataReceived(dtgrm); } } }// default: // if(dtgrmID != 0) } // switch(type) } // else if(dtgrmID != 0) else { qDebug("[ReceiverThread::run]: DATA_RECEIVED!"); emit dataReceived(datadup(buffer, dataSize), dataSize); } }// if(dataSize >= (int)AbstractChatCore::protocolLen() && !strncmp(m_buffer, AbstractChatCore::programId(), strlen(AbstractChatCore::programId()))) } }}//\*****************************************************************************void ReceiverThread::deleteDatagram(LargeDatagram* dtgrm){ Q_ASSERT(currentThread() == this); if(!dtgrm) return; qDebug("[ReceiverThread::deleteDatagram]"); if(!dtgrm->complete()) emit receivingTimeout(dtgrm->id(), dtgrm->ip()); removeDatagram(dtgrm); delete dtgrm;}//\*****************************************************************************void ReceiverThread::slot_acceptDatagram(const QString & filename, quint16 ID, quint64 IP){ qDebug("[ReceiverThread::slot_acceptDatagram]"); LargeDatagram* dtgrm = findDatagram(IP, ID); if(dtgrm) dtgrm->slot_initFile(filename); else qWarning("[ReceiverThread::slot_acceptDatagram]: [error] dtgrm is NULL");}//\*****************************************************************************void ReceiverThread::slot_rejectDatagram(quint16 ID, quint64 IP){ deleteDatagram(findDatagram(IP, ID));}int ReceiverThread::readData(char* buffer, uint bufferSize){ int dataSize = 0; if(m_socket->hasPendingDatagrams()) {#if defined (Q_OS_WIN) m_socketNotifier->setEnabled(false); dataSize = m_socket->readDatagram(buffer, bufferSize); m_socketNotifier->setEnabled(true);#else dataSize = m_socket->readDatagram(buffer, bufferSize);#endif if(dataSize < 0) qWarning("[ReceiverThread::readData]: datagram read error"); } return dataSize;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -