// transportudp.cpp
// ========================================================
// UDP/IP Multicast Transport implementation
//
// Design and Implementation by Floris van den Berg
// ========================================================
#pragma warning (disable : 4275)
#pragma warning (disable : 4786)
#include <winsock2.h>
#include <windows.h>
#include <process.h>
#include "Agent.h"
#include "OpenNet.h"
#include "EventDispatcher.h"
#include "TransportUDP.h"
#include "Utilities.h"
// --------------------------------------------------------
// Size in bytes of the receive buffer that the CTransportUDP constructor
// allocates and hands to WSARecvFrom() through m_recv_buffer.
const int BUFFER_SIZE = 4096;
// --------------------------------------------------------
// IP multicast socket option identifiers and related constants. The LOOP, TTL
// and ADD/DROP_MEMBERSHIP identifiers are passed to setsockopt() at level
// IPPROTO_IP by CTransportUDP::SetOption.
// NOTE(review): these are defined locally, presumably because the header that
// normally provides them (<ws2tcpip.h>) is not included here — confirm the
// values against the Winsock version in use before changing them.
#define IP_MULTICAST_IF 9 // set/get IP multicast interface
#define IP_MULTICAST_TTL 10 // set/get IP multicast Time-To-Live
#define IP_MULTICAST_LOOP 11 // set/get IP multicast loopback
#define IP_ADD_MEMBERSHIP 12 // add (set) IP group membership
#define IP_DROP_MEMBERSHIP 13 // drop (set) IP group membership
#define IP_DEFAULT_MULTICAST_TTL 1
#define IP_DEFAULT_MULTICAST_LOOP 1
#define IP_MAX_MEMBERSHIPS 20
// --------------------------------------------------------
struct ip_mreq {
struct in_addr imr_multiaddr; // IP multicast address of group
struct in_addr imr_interface; // local IP address of interface
};
// --------------------------------------------------------
class CTransportUDP : public ITransport {
friend void CALLBACK RecvCompletion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);
friend void CALLBACK SendCompletion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);
public :
CTransportUDP();
virtual ~CTransportUDP();
public :
virtual bool DLL_CALLCONV DLL_CALLCONV Open(PROPERTYGROUP_HANDLE group);
virtual void DLL_CALLCONV PassHandleIndirect(long handle);
virtual void DLL_CALLCONV Close();
virtual void DLL_CALLCONV Connect(const char *host, int port);
virtual void DLL_CALLCONV Disconnect();
virtual bool DLL_CALLCONV SupportsSending();
virtual void DLL_CALLCONV Send(Action *action);
virtual void* DLL_CALLCONV GetReferenceData(int id);
virtual bool DLL_CALLCONV GetOption(int option, void *value, int *size);
virtual bool DLL_CALLCONV SetOption(int option, void *value);
virtual bool DLL_CALLCONV CanBeBalanced();
virtual void DLL_CALLCONV PollProgress();
virtual void DLL_CALLCONV IncActionCount();
private :
boost::mutex m_cs;
Action *m_action;
SOCKET m_socket;
OVERLAPPED m_read_overlapped;
OVERLAPPED m_send_overlapped;
HANDLE m_wait_event;
WSABUF m_recv_buffer;
WSABUF m_send_buffer;
bool m_send_pending;
sockaddr_in m_sockaddr_in;
sockaddr_in m_sockaddr_out;
sockaddr_in m_receive_address;
bool m_loop_failed;
unsigned int m_max_dgram_size;
_STL::string m_reference_data[1024];
int m_current_reference_id;
LONG m_action_count;
};
// --------------------------------------------------------
// Completion routine handed to WSARecvFrom() by CTransportUDP::PollProgress.
// Stores the textual address of the datagram's sender in the transport's
// reference table and dispatches a SYSTEM_DATA_IN event carrying the received
// bytes.
//
// dwError       completion status of the receive (not inspected here)
// cbTransferred number of bytes placed in the receive buffer
// lpOverlapped  the transport's m_read_overlapped; hEvent holds the transport
// dwFlags       receive flags (unused)
static void CALLBACK
RecvCompletion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags) {
    CTransportUDP *object = static_cast<CTransportUDP *>(lpOverlapped->hEvent);

    // sync this function so that it doesn't get called when PollProgress is called
    boost::mutex::scoped_lock scoped_lock(object->m_cs);

    // Reference ids are 1-based and map to slot (id - 1). Derive the limit
    // from the array itself so the wrap-around can never index past its end.
    const int slot_count = static_cast<int>(sizeof(object->m_reference_data) / sizeof(object->m_reference_data[0]));
    if ((object->m_current_reference_id < 1) || (object->m_current_reference_id > slot_count))
        object->m_current_reference_id = 1;

    // store the ip of the current received packet; inet_ntoa() may return
    // NULL, which must not be assigned to a string
    const char *sender = inet_ntoa(object->m_receive_address.sin_addr);
    object->m_reference_data[object->m_current_reference_id - 1] = (sender != NULL) ? sender : "";

    // create an event that data arrived
    // NOTE(review): the event is dispatched even when dwError signals a failed
    // receive — confirm whether listeners expect that.
    EpDispatchSystemEvent(object, SYSTEM_DATA_IN, object->m_recv_buffer.buf, cbTransferred);
}
// Completion routine handed to WSASendTo() by CTransportUDP::PollProgress.
// Clears the transport's in-flight send state and reports the outcome of the
// send through EpPacketSent().
//
// dwError       completion status of the send; 0 means success
// cbTransferred number of bytes sent (unused)
// lpOverlapped  the transport's m_send_overlapped; hEvent holds the transport
// dwFlags       unused
static void CALLBACK
SendCompletion(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags) {
    CTransportUDP *object = static_cast<CTransportUDP *>(lpOverlapped->hEvent);

    // m_action must be NULL before EpPacketSent() is called so that a new
    // action can be accepted from within that notification
    // NOTE(review): unlike RecvCompletion this does not take m_cs — confirm
    // that this routine only ever runs on the thread calling PollProgress.
    object->m_action = NULL;
    object->m_send_pending = false;

    // report the real completion status instead of unconditional success
    EpPacketSent(object, dwError == 0);
}
// --------------------------------------------------------
// Builds a closed transport: no socket yet, a BUFFER_SIZE-byte receive buffer,
// both OVERLAPPED blocks pointing back at this object, and a freshly created
// Winsock event for WSAEventSelect.
CTransportUDP::CTransportUDP() :
    m_cs(),
    m_action(NULL),
    m_socket(NULL),
    m_read_overlapped(),
    m_send_overlapped(),
    m_wait_event(),
    m_recv_buffer(),
    m_send_buffer(),
    m_send_pending(false),
    m_sockaddr_in(),
    m_sockaddr_out(),
    m_receive_address(),
    m_loop_failed(false),
    m_max_dgram_size(0),
    m_reference_data(),
    m_current_reference_id(1),
    m_action_count(0) {
    // receive side: zeroed descriptor, heap buffer, back-pointer for RecvCompletion
    memset(&m_recv_buffer, 0, sizeof(m_recv_buffer));
    memset(&m_read_overlapped, 0, sizeof(m_read_overlapped));
    m_read_overlapped.hEvent = this;

    // send side: zeroed descriptor, back-pointer for SendCompletion
    memset(&m_send_buffer, 0, sizeof(m_send_buffer));
    memset(&m_send_overlapped, 0, sizeof(m_send_overlapped));
    m_send_overlapped.hEvent = this;

    m_recv_buffer.len = BUFFER_SIZE;
    m_recv_buffer.buf = new char[BUFFER_SIZE];

    // event later registered with the socket in Open()
    m_wait_event = WSACreateEvent();
}
// Releases everything the transport owns: the socket (if Close() was never
// called), the Winsock event and the receive buffer.
CTransportUDP::~CTransportUDP() {
    // a transport destroyed while still open would otherwise leak its socket
    if (m_socket) {
        closesocket(m_socket);
        m_socket = NULL;
    }

    // WSACreateEvent() may have failed in the constructor
    if (m_wait_event != WSA_INVALID_EVENT)
        WSACloseEvent(m_wait_event);

    delete [] m_recv_buffer.buf;
    m_recv_buffer.buf = NULL;
}
// --------------------------------------------------------
// Open/Close/Connect/Disconnect a Plug
// --------------------------------------------------------
// Creates the overlapped UDP socket and registers it with the wait event.
//
// group  optional property group; when non-NULL its "port" value is read and
//        the socket is bound to that port on all local interfaces
//
// Returns true when the socket was created, bound (if requested) and
// registered for FD_READ / FD_WRITE notifications. Returns false when the wait
// event is invalid or any of those steps fails; in that case no socket is
// kept open.
bool DLL_CALLCONV
CTransportUDP::Open(PROPERTYGROUP_HANDLE group) {
    if (m_wait_event == WSA_INVALID_EVENT)
        return false;

    SOCKET sock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sock == INVALID_SOCKET)
        return false;

    bool ok = true;

    // if we received a port, bind the udp socket to that port
    if (group) {
        int port = 0;
        EpPropGetValueAsInt(group, "port", &port);

        m_sockaddr_in.sin_family = AF_INET;
        m_sockaddr_in.sin_port = htons(static_cast<u_short>(port));
        m_sockaddr_in.sin_addr.s_addr = INADDR_ANY;

        ok = (bind(sock, (sockaddr *)&m_sockaddr_in, sizeof(m_sockaddr_in)) != SOCKET_ERROR);
    }

    // set up a wait event so that we can check for state changes and received data
    if (ok)
        ok = (WSAEventSelect(sock, m_wait_event, FD_READ | FD_WRITE) != SOCKET_ERROR);

    if (!ok) {
        // do not hand out a half-initialised socket
        closesocket(sock);
        return false;
    }

    m_socket = sock;
    return true;
}
// The handle is ignored by this transport; its socket is created in Open().
void DLL_CALLCONV
CTransportUDP::PassHandleIndirect(long /*handle*/) {
}
// Forgets any queued or in-flight send and closes the socket if one is open.
void DLL_CALLCONV
CTransportUDP::Close() {
    // reset the send bookkeeping
    m_action_count = 0;
    m_send_pending = false;
    m_action = NULL;

    // nothing more to do when no socket is open
    if (!m_socket)
        return;

    closesocket(m_socket);
    m_socket = NULL;
}
// Does nothing: both arguments are ignored. The destination of outgoing
// datagrams is configured through SetOption(UDP_TARGET, ...) instead.
void DLL_CALLCONV
CTransportUDP::Connect(const char * /*host*/, int /*port*/) {
}
// Under the transport lock, drops the current action and the count of
// announced actions. The socket itself stays open.
void DLL_CALLCONV
CTransportUDP::Disconnect() {
    boost::mutex::scoped_lock guard(m_cs);

    m_action_count = 0;
    m_action = NULL;
}
// This transport can send: always answers true.
bool DLL_CALLCONV
CTransportUDP::SupportsSending() {
    const bool can_send = true;
    return can_send;
}
// Registers `ac` as the action to transmit; the actual WSASendTo() happens in
// the next PollProgress() call. Asserts that no other action is in progress.
void DLL_CALLCONV
CTransportUDP::Send(Action *ac) {
    boost::mutex::scoped_lock guard(m_cs);

    // only one action may be outstanding at a time
    assert(m_action == NULL);

    m_action = ac;
}
// Returns the sender address string recorded for a reference id.
//
// id  1-based reference id; valid values are 1 .. number of reference slots
//
// Returns a pointer to the NUL-terminated address text stored in slot
// (id - 1), or NULL when id is out of range. The pointer refers to storage
// owned by the transport; it is only taken under the lock, so it may be
// invalidated when RecvCompletion later rewrites that slot.
void * DLL_CALLCONV
CTransportUDP::GetReferenceData(int id) {
    // take the limit from the array so a valid id can never index past its end
    const int slot_count = static_cast<int>(sizeof(m_reference_data) / sizeof(m_reference_data[0]));

    if ((id < 1) || (id > slot_count))
        return NULL;

    boost::mutex::scoped_lock scoped_lock(m_cs);

    return (void *)m_reference_data[id - 1].c_str();
}
// No option can be queried on this transport: the arguments are ignored and
// the call always reports failure.
bool DLL_CALLCONV
CTransportUDP::GetOption(int /*option*/, void * /*value*/, int * /*size*/) {
    return false;
}
// Configures the send target or one of the multicast socket options.
//
// option  one of UDP_TARGET, UDP_MULTICAST_LOOP, UDP_MULTICAST_TTL,
//         UDP_MULTICAST_ADDMEMBERSHIP, UDP_MULTICAST_DROPMEMBERSHIP
// value   UDP_TARGET: pointer to a UDPTarget (address + port)
//         UDP_MULTICAST_LOOP: pointer to an unsigned char, 1 enables loopback
//         UDP_MULTICAST_TTL: pointer to an unsigned char holding the TTL
//         membership options: not used; the group is the current target
//         address (set it with UDP_TARGET first)
//
// Returns true when the option was applied. Returns false for an unknown
// option, for a NULL value where one is required, or when setsockopt() fails
// (for example because the socket is not open).
bool DLL_CALLCONV
CTransportUDP::SetOption(int option, void *value) {
    boost::mutex::scoped_lock scoped_lock(m_cs);

    switch (option) {
        case UDP_TARGET :
        {
            UDPTarget *target = (UDPTarget *)value;

            if (target == NULL)
                return false;

            memset(&m_sockaddr_out, 0, sizeof(m_sockaddr_out));
            m_sockaddr_out.sin_family = AF_INET;
            m_sockaddr_out.sin_addr.s_addr = TCPIPResolveHost(target->address);
            m_sockaddr_out.sin_port = htons(target->port);
            return true;
        }

        case UDP_MULTICAST_LOOP :
        {
            if (value == NULL)
                return false;

            u_char loop = *((u_char *)value) == 1;

            return setsockopt(m_socket, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loop, sizeof(loop)) != SOCKET_ERROR;
        }

        case UDP_MULTICAST_TTL :
        {
            if (value == NULL)
                return false;

            // an unsigned char already lies in 0..255, so no clamping is needed
            u_char ttl = *((u_char *)value);

            return setsockopt(m_socket, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&ttl, sizeof(ttl)) != SOCKET_ERROR;
        }

        case UDP_MULTICAST_ADDMEMBERSHIP :
        case UDP_MULTICAST_DROPMEMBERSHIP :
        {
            // join/leave the group named by the current target address on the
            // default interface
            ip_mreq request;
            request.imr_multiaddr.s_addr = m_sockaddr_out.sin_addr.s_addr;
            request.imr_interface.s_addr = INADDR_ANY;

            const int membership_option = (option == UDP_MULTICAST_ADDMEMBERSHIP) ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP;

            return setsockopt(m_socket, IPPROTO_IP, membership_option, (char *)&request, sizeof(request)) != SOCKET_ERROR;
        }
    }

    return false;
}
// Always answers true.
bool DLL_CALLCONV
CTransportUDP::CanBeBalanced() {
    const bool balanceable = true;
    return balanceable;
}
// One polling step of the transport, executed under m_cs:
//  1. when idle and actions were announced, fetch the next action;
//  2. when an action is waiting and no send is in flight, start an overlapped
//     WSASendTo(); an immediate failure resets the send state, dispatches a
//     SYSTEM_TCPIP_* event and reports EpPacketSent(this, false);
//  3. query the socket's network events; on FD_READ either dispatch an error
//     event or start an overlapped WSARecvFrom() whose data is delivered by
//     RecvCompletion.
void DLL_CALLCONV
CTransportUDP::PollProgress() {
    boost::mutex::scoped_lock scoped_lock(m_cs);

    // check for a new action to send, if we aren't sending anything
    if ((m_action == NULL) && (!m_send_pending) && (m_action_count > 0)) {
        --m_action_count;
        m_action = EpGetNextAction(this);
    }

    // create an event to be passed to grid
    EpEvent pm_event;
    pm_event.reference_id = 0;
    pm_event.protocol = CLSID_SYSTEM_PROTOCOL;
    pm_event.msg = 0;
    pm_event.size = 0;
    pm_event.data = 0;

    // if there is data to write, write it
    if ((m_action) && (!m_send_pending)) {
        DWORD bytes_sent = 0;

        memset(&m_send_buffer, 0, sizeof(WSABUF));
        m_send_buffer.len = m_action->send_size;
        m_send_buffer.buf = (char *)m_action->send_data;

        if (WSASendTo(m_socket, &m_send_buffer, 1, &bytes_sent, 0, (sockaddr *)&m_sockaddr_out, sizeof(sockaddr_in), &m_send_overlapped, SendCompletion) == SOCKET_ERROR) {
            DWORD error_code = WSAGetLastError();

            if (error_code == WSA_IO_PENDING) {
                // the send was queued; SendCompletion finishes it
                m_send_pending = true;
            } else {
                m_action = NULL;
                m_send_pending = false;
                m_action_count = 0;

                // translate the Winsock error into a system message; unknown
                // errors travel as payload of SYSTEM_TCPIP_UNIMPLEMENTED
                bool attach_code = false;

                switch (error_code) {
                    case WSAENETDOWN :
                        pm_event.msg = SYSTEM_TCPIP_SUBSYSTEM_FAILED;
                        break;

                    case WSAENETRESET :
                        pm_event.msg = SYSTEM_TCPIP_NET_RESET;
                        break;

                    case WSAENOBUFS :
                        pm_event.msg = SYSTEM_TCPIP_NO_BUFFERSPACE;
                        break;

                    case WSAENOTCONN :
                        pm_event.msg = SYSTEM_TCPIP_NOT_CONNECTED;
                        break;

                    case WSAECONNRESET :
                        pm_event.msg = SYSTEM_TCPIP_CONNECTION_RESET;
                        break;

                    case WSAECONNABORTED :
                        pm_event.msg = SYSTEM_TCPIP_CONNECTION_ABORTED;
                        break;

                    case WSA_OPERATION_ABORTED :
                        pm_event.msg = SYSTEM_TCPIP_OPERATION_ABORTED;
                        break;

                    default :
                        pm_event.msg = SYSTEM_TCPIP_UNIMPLEMENTED;
                        attach_code = true;
                        break;
                };

                if (attach_code) {
                    pm_event.data = (unsigned char *)&error_code;
                    pm_event.size = sizeof(error_code);
                }

                EpDispatchEvent(this, &pm_event);

                if (attach_code) {
                    // error_code goes out of scope soon; do not keep pointing at it
                    pm_event.data = NULL;
                    pm_event.size = 0;
                }

                EpPacketSent(this, false);
            }
        }
    }

    // see if there are any incoming events
    WSANETWORKEVENTS ne;
    memset(&ne, 0, sizeof(ne));

    if (WSAEnumNetworkEvents(m_socket, m_wait_event, &ne) != 0)
        return;

    if ((ne.lNetworkEvents & FD_READ) != FD_READ)
        return;

    if (ne.iErrorCode[FD_READ_BIT] != 0) {
        m_action = NULL;
        m_send_pending = false;
        m_action_count = 0;

        if (ne.iErrorCode[FD_READ_BIT] == WSAENETDOWN) {
            pm_event.msg = SYSTEM_TCPIP_SUBSYSTEM_FAILED;
            EpDispatchEvent(this, &pm_event);
        } else {
            pm_event.msg = SYSTEM_TCPIP_UNIMPLEMENTED;
            pm_event.data = (unsigned char *)&ne.iErrorCode[FD_READ_BIT];
            pm_event.size = sizeof(ne.iErrorCode[FD_READ_BIT]);
            EpDispatchEvent(this, &pm_event);
            pm_event.data = NULL;
            pm_event.size = 0;
        }

        return;
    }

    // For an overlapped receive the provider writes the sender address and
    // its length only when the operation completes, which can be after this
    // function has returned. The length therefore must not live on the stack.
    // A function-level static is used so this fix needs no new class member;
    // it is shared by all instances, which is harmless because the only value
    // ever stored in it is sizeof(sockaddr_in).
    static int s_receive_address_size = sizeof(sockaddr_in);

    DWORD flags = 0;
    u_long read = 0;

    memset(&m_receive_address, 0, sizeof(sockaddr_in));
    s_receive_address_size = sizeof(sockaddr_in);

    // NOTE(review): the return value is not checked, so a receive that fails
    // immediately goes unnoticed — confirm whether that is acceptable.
    WSARecvFrom(m_socket, &m_recv_buffer, 1, &read, &flags, (sockaddr *)&m_receive_address, &s_receive_address_size, &m_read_overlapped, RecvCompletion);
}
// Announces one more queued action; PollProgress() fetches it when idle.
void DLL_CALLCONV
CTransportUDP::IncActionCount() {
    boost::mutex::scoped_lock guard(m_cs);

    m_action_count += 1;
}
// --------------------------------------------------------
// Instantiation function
// --------------------------------------------------------
// Creates a new UDP transport.
//
// iif  receives the new CTransportUDP object on success; set to NULL on failure
//
// Returns S_OK on success, E_POINTER when iif is NULL, and E_FAIL when the
// object could not be created.
HRESULT DLL_CALLCONV
UDPCreate(void **iif) {
    if (iif == NULL)
        return E_POINTER;

    *iif = NULL;

    // construction allocates memory and a mutex; keep any C++ exception from
    // escaping through this HRESULT-returning entry point
    try {
        CTransportUDP *object = new CTransportUDP;

        if (object) {
            *iif = object;
            return S_OK;
        }
    } catch (...) {
        // reported as E_FAIL below
    }

    return E_FAIL;
}
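// NOTE: the lines that followed here were residue from the web code viewer
// this file was copied from, not part of the source (keyboard shortcut help:
// copy code Ctrl + C, search code Ctrl + F, full screen F11, switch theme
// Ctrl + Shift + D, show shortcuts ?, larger font Ctrl + =, smaller font Ctrl + -).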