📄 agent.cpp
字号:
// ========================================================
// Agent implementation
//
// Design and Implementation by Floris van den Berg
// ========================================================
#pragma warning (disable : 4275)
#pragma warning (disable : 4786)
#include <assert.h>
#include "Agent.h"
#include "OpenNet.h"
#include "OpenNetExtensions.h"
#include "EventDispatcher.h"
#include "ThreadBalancer.h"
#include "TimeoutManager.h"
#include "Utilities.h"
#include "TransportSerial.h"
#include "TransportTCPIP.h"
#include "TransportTCPIPServer.h"
#include "TransportUDP.h"
// --------------------------------------------------------
class CAgent : public IAgent {
friend struct DispatchFunctor;
friend struct SystemFunctor;
public :
CAgent();
~CAgent();
virtual void DLL_CALLCONV enableDebugEvents(ITransport *transport, bool enable);
virtual TRANSPORT_HANDLE DLL_CALLCONV createTransport(const char *properties, CallbackProc callback, bool enable_debug_events, void *data);
virtual void DLL_CALLCONV destroyTransport(ITransport *transport);
virtual bool DLL_CALLCONV getOption(ITransport *transport, int option, void *value, int *size);
virtual bool DLL_CALLCONV setOption(ITransport *transport, int option, void *value);
virtual bool DLL_CALLCONV addProtocol(ITransport *plug, GUID *protocol_id);
virtual void DLL_CALLCONV resetProtocol(ITransport *transport, GUID protocol);
virtual void DLL_CALLCONV connect(ITransport *transport, const char *host, int port, int timeout);
virtual void DLL_CALLCONV disconnect(ITransport *transport);
virtual bool DLL_CALLCONV sendAction(ITransport *transport, EpAction *action);
virtual void DLL_CALLCONV completeAction(ITransport *transport, unsigned char *data, int size);
virtual bool DLL_CALLCONV sendRawData(ITransport *transport, unsigned char *data, int size, GUID reply_protocol, int reply_msg, int timeout);
virtual void DLL_CALLCONV inheritedRecv(ITransport *transport, unsigned char *data, int size);
virtual void* DLL_CALLCONV getEventRefData(ITransport *transport, int reference);
virtual bool DLL_CALLCONV dispatchPacketSent(ITransport *transport, bool succeeded);
virtual TransportEntry* DLL_CALLCONV findPlugEntry(ITransport *transport);
virtual PlugMap* DLL_CALLCONV getTransportMap();
virtual int DLL_CALLCONV getProtocolName(ITransport *transport, GUID protocol, char *name, int size);
virtual int DLL_CALLCONV getProtocolMsgName(ITransport *transport, GUID protocol, int msg, char *name, int size);
virtual Action *DLL_CALLCONV getNextAction(ITransport *transport);
private :
ITransport *allocateTransport(PROPERTYGROUP_HANDLE group, const char *type = NULL);
void erasePlugEntry(ITransport *transport);
bool createProtocolAndAddToList(TransportEntry *entry, ITransport *transport, GUID guid);
SessionProtocol *protocolFromGuid(ITransport *transport, GUID protocol);
void setNextAction(ITransport *transport, Action *ac);
void removeAction(ITransport *transport);
void removeAllActions(ITransport *transport);
private :
LONG m_ref_count;
PlugMap m_transports;
};
// --------------------------------------------------------
CAgent::CAgent() :
m_ref_count(0),
m_transports() {
}
CAgent::~CAgent() {
// destroy all remaining transports
while (!m_transports.empty())
destroyTransport(m_transports.begin()->first);
// remove this transport from the thread balancer
TimeOutManager::getInstance()->removeItems();
EventDispatcher::getInstance()->clearPendingEventEntries();
}
// --------------------------------------------------------
void DLL_CALLCONV
CAgent::enableDebugEvents(ITransport *transport, bool enable) {
TransportEntry *entry = findPlugEntry(transport);
if (entry)
entry->m_enable_debug_events = enable;
}
// --------------------------------------------------------
ITransport *
CAgent::allocateTransport(PROPERTYGROUP_HANDLE group, const char *type) {
ITransport *transport = NULL;
std::string real_type("", 1024);
if (type) {
strcpy(&real_type[0], type);
} else {
EpPropGetType(group, &real_type[0]);
}
if (stricmp(real_type.c_str(), "indirect") == 0) {
std::string indirect_type("", 1024);
EpPropGetValue(group, "type", &indirect_type[0]);
return allocateTransport(group, indirect_type.c_str());
} else if (stricmp(real_type.c_str(), "serial") == 0) {
SerialCreate((void **)&transport);;
} else if (stricmp(real_type.c_str(), "tcp") == 0) {
TCPIPCreate((void **)&transport);;
} else if (stricmp(real_type.c_str(), "tcpserver") == 0) {
TCPIPServerCreate((void **)&transport);;
} else if (stricmp(real_type.c_str(), "udp") == 0) {
UDPCreate((void **)&transport);;
}
return transport;
}
// --------------------------------------------------------
TRANSPORT_HANDLE
CAgent::createTransport(const char *properties, CallbackProc callback, bool enable_debug_events, void *data) {
TransportEntry *entry = NULL;
PROPERTYGROUP_HANDLE group = NULL;
bool indirect = false;
try {
group = EpCreatePropertyGroup();
if (EpLoadProperties(group, properties)) {
ITransport *transport = allocateTransport(group);
if (transport) {
entry = new TransportEntry;
entry->transport = transport;
entry->callback = callback;
entry->data = data;
entry->m_enable_debug_events = enable_debug_events;
if ((EpPropGetValueAsBool(group, "indirect", &indirect)) && (indirect)) {
DWORD handle = 0;
if (!EpPropGetValueAsInt(&group, "handle", (int *)&handle))
throw 0;
entry->transport->PassHandleIndirect(handle);
} else if (!entry->transport->Open(group)) {
throw 0;
}
// add the default system protocol
createProtocolAndAddToList(entry, transport, CLSID_SYSTEM_PROTOCOL);
// add transport to internal queue
m_transports[entry->transport] = entry;
entry->thread_entry = ThreadBalancer::getInstance()->addTransport(entry->transport, entry->transport->CanBeBalanced());
EpDestroyPropertyGroup(group);
return entry->transport;
}
}
} catch(int) {
delete entry; entry = NULL;
EpDestroyPropertyGroup(group); group = NULL;
}
return NULL;
}
void
CAgent::destroyTransport(ITransport *transport) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
// remove this transport from the thread balancer
ThreadBalancer::getInstance()->removeTransport(entry->thread_entry);
// remove all pending timeout messages for this transport
TimeOutManager::getInstance()->removeItems(transport);
// remove all pending to-be-dispatched events
EventDispatcher::getInstance()->clearPendingEventEntries(transport);
// erase the transport from the list
erasePlugEntry(transport);
// close the transport
transport->Close();
// be gone with all actions
removeAllActions(transport);
// destroy the transport
delete transport;
// destroy the transport entry
delete entry;
}
}
// --------------------------------------------------------
bool DLL_CALLCONV
CAgent::addProtocol(ITransport *transport, GUID *protocol_id) {
Protocol *protocol = EpProtocolHandle(*protocol_id);
if (protocol) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
for (ProtocolList::iterator jt = entry->protocols.begin(); jt != entry->protocols.end(); ++jt) {
if (IsEqualGUID((*jt)->guid, *protocol_id)) {
++(*jt)->ref_count;
return true;
}
}
return createProtocolAndAddToList(entry, transport, *protocol_id);
}
}
return false;
}
void
CAgent::resetProtocol(ITransport *transport, GUID protocol) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
for (ProtocolList::iterator it = entry->protocols.begin(); it != entry->protocols.end(); ++it) {
if (IsEqualGUID((*it)->guid, protocol)) {
(*it)->protocol->reset((*it)->self);
break;
}
}
}
}
// --------------------------------------------------------
bool
CAgent::getOption(ITransport *transport, int option, void *value, int *size) {
return transport->GetOption(option, value, size);
}
bool
CAgent::setOption(ITransport *transport, int option, void *value) {
return transport->SetOption(option, value);
}
// --------------------------------------------------------
bool
CAgent::sendAction(ITransport *transport, EpAction *action) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
if (entry->transport->SupportsSending()) {
// create a new action and stuff it in the queue
// the action is not complete yet, but it will be made
// complete in CompleteEvent
Action *ac = new Action;
ac->action.protocol = action->protocol;
ac->action.msg = action->msg;
ac->action.timeout = action->timeout;
if ((action->data != NULL) && (action->size > 0)) {
ac->action.size = action->size;
ac->action.data = new unsigned char[action->size];
memcpy(ac->action.data, action->data, action->size);
} else {
ac->action.size = 0;
ac->action.data = NULL;
}
// add the action to the end of the send queue
setNextAction(transport, ac);
// complete the data in the queue, by sending it to the correct protocol
if (!IsEqualGUID(ac->action.protocol, CLSID_SYSTEM_PROTOCOL)) {
for (_STL::list<SessionProtocol *>::iterator i = entry->protocols.begin(); i != entry->protocols.end(); ++i) {
if (IsEqualGUID(ac->action.protocol, (*i)->guid)) {
(*i)->protocol->send((*i)->self, &ac->action);
if ((ac->send_data != NULL) && (ac->send_size > 0)) {
transport->IncActionCount();
return true;
}
}
}
} else if ((ac->action.data != NULL) && (ac->action.size > 0)) {
ac->send_data = ac->action.data;
ac->send_size = ac->action.size;
transport->IncActionCount();
return true;
}
// completing the action failed. remove it from the queue again
removeAction(transport);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -