📄 agent.cpp
字号:
}
return false;
}
void
CAgent::completeAction(ITransport *transport, unsigned char *data, int size) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
if (!entry->m_queue.empty()) {
if ((data != NULL) && (size > 0)) {
Action *ac = entry->m_queue.back();
ac->send_size = size;
ac->send_data = new unsigned char[size];
memcpy(ac->send_data, data, size);
}
}
}
}
// --------------------------------------------------------
bool
CAgent::sendRawData(ITransport *transport, unsigned char *data, int size, GUID reply_protocol, int reply_msg, int timeout) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
// create a new action and stuff it in the queue
// the action is not complete yet, but it will be made
// complete in sendData
Action *ac = new Action();
ac->action.protocol = reply_protocol;
ac->action.msg = reply_msg; // all the timeout manager needs
ac->action.timeout = timeout; // all the timeout manager needs
ac->action.size = size;
ac->action.data = new unsigned char[size];
ac->send_data = ac->action.data;
ac->send_size = ac->action.size;
ac->send_pos = 0;
memcpy(ac->action.data, data, size); // data to be sent
// add the translated event data to the queue, and
// send the data if the queue is empty (otherwise the data
// is sent when the queue becomes empty)
setNextAction(transport, ac);
// increment the transport's queue size
entry->transport->IncActionCount();
return true;
}
return false;
}
// --------------------------------------------------------
void
CAgent::inheritedRecv(ITransport *transport, unsigned char *data, int size) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
if (++entry->protocols_it != entry->protocols.rend()) {
SessionProtocol *slot = *entry->protocols_it;
slot->protocol->receive(slot->self, data, size);
}
}
}
// --------------------------------------------------------
void *
CAgent::getEventRefData(ITransport *transport, int reference) {
TransportEntry *entry = findPlugEntry(transport);
return (entry) ? entry->transport->GetReferenceData(reference) : NULL;
}
// --------------------------------------------------------
void
CAgent::connect(ITransport *transport, const char *host, int port, int timeout) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
// make an event for the timeout handler
EpAction pm_action;
pm_action.msg = SYSTEM_CONNECT_TIMEOUT;
pm_action.protocol = CLSID_SYSTEM_PROTOCOL;
pm_action.size = 0;
pm_action.timeout = timeout;
pm_action.data = NULL;
// If a timeout is given, set up a timeout event
// The timeout can't be set after the connect, because then the
// actual connect could happen before the timeout is set up and
// the timeout will be raised even though a connection has been made.
if (timeout > 0)
TimeOutManager::getInstance()->addItem(transport, &pm_action);
// connect to the source... the actual connection is handled async
entry->transport->Connect(host, port);
}
}
void
CAgent::disconnect(ITransport *transport) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
removeAllActions(transport);
entry->transport->Disconnect();
}
}
// --------------------------------------------------------
bool
CAgent::dispatchPacketSent(ITransport *transport, bool succeeded) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
boost::mutex::scoped_lock scoped_lock(entry->m_mutex);
if (!entry->m_queue.empty()) {
// add the event to the timeout handler. this will
// generate a timeout event after a specified delay.
if (succeeded) {
Action *ac = entry->m_queue.front();
EpDispatchSystemEvent(transport, SYSTEM_DATA_OUT, ac->send_data, ac->send_size);
EpDispatchSystemEvent(transport, SYSTEM_SENT_SUCCEEDED, NULL, 0);
if (ac->action.timeout > 0) {
EpAction pm_action;
pm_action.protocol = ac->action.protocol;
pm_action.msg = ac->action.msg;
pm_action.timeout = ac->action.timeout;
pm_action.size = 0;
pm_action.data = NULL;
TimeOutManager::getInstance()->addItem(transport, &pm_action);
}
removeAction(transport);
} else {
EpDispatchSystemEvent(transport, SYSTEM_SENT_FAILED, NULL, 0);
while (!entry->m_queue.empty()) {
Action *ac = entry->m_queue.front();
if (ac->action.timeout > 0) {
EpAction pm_action;
pm_action.protocol = ac->action.protocol;
pm_action.msg = ac->action.msg;
pm_action.timeout = ac->action.timeout;
pm_action.size = 0;
pm_action.data = NULL;
TimeOutManager::getInstance()->addItem(transport, &pm_action);
}
removeAction(transport);
}
}
return true;
}
}
return false;
}
// --------------------------------------------------------
TransportEntry *
CAgent::findPlugEntry(ITransport *transport) {
PlugMap::iterator i = m_transports.find(transport);
return (i != m_transports.end()) ? i->second : NULL;
}
PlugMap* DLL_CALLCONV
CAgent::getTransportMap() {
return &m_transports;
}
void
CAgent::erasePlugEntry(ITransport *transport) {
PlugMap::iterator i = m_transports.find(transport);
if (i != m_transports.end())
m_transports.erase(i);
}
bool
CAgent::createProtocolAndAddToList(TransportEntry *entry, ITransport *transport, GUID guid) {
Protocol *p = EpProtocolHandle(guid);
if (p) {
SessionProtocol *sp = new SessionProtocol;
sp->guid = guid;
sp->ref_count = 1;
sp->protocol = p;
sp->self = p->create(transport);
entry->protocols.push_back(sp);
return true;
}
return false;
}
SessionProtocol *
CAgent::protocolFromGuid(ITransport *transport, GUID protocol) {
TransportEntry *entry = findPlugEntry(transport);
if (entry)
for (_STL::list<SessionProtocol *>::iterator i = entry->protocols.begin(); i != entry->protocols.end(); ++i)
if (IsEqualGUID((*i)->guid, protocol))
return (*i);
return NULL;
}
int DLL_CALLCONV
CAgent::getProtocolName(ITransport *transport, GUID protocol, char *name, int size) {
SessionProtocol *iif = protocolFromGuid(transport, protocol);
if (iif)
return iif->protocol->getname(iif->self, name, size);
return 0;
}
int DLL_CALLCONV
CAgent::getProtocolMsgName(ITransport *transport, GUID protocol, int msg, char *name, int size) {
SessionProtocol *iif = protocolFromGuid(transport, protocol);
if (iif)
return iif->protocol->getmessagename(iif->self, msg, name, size);
return 0;
}
void
CAgent::setNextAction(ITransport *transport, Action *ac) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
boost::mutex::scoped_lock scoped_lock(entry->m_mutex);
entry->m_queue.push_back(ac);
}
}
Action *DLL_CALLCONV
CAgent::getNextAction(ITransport *transport) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
boost::mutex::scoped_lock scoped_lock(entry->m_mutex);
if (!entry->m_queue.empty()) {
return entry->m_queue.front();
}
}
return NULL;
}
void
CAgent::removeAction(ITransport *transport) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
boost::mutex::scoped_lock scoped_lock(entry->m_mutex);
if (!entry->m_queue.empty()) {
Action *ac = entry->m_queue.front();
entry->m_queue.pop_front();
delete ac;
}
}
}
void
CAgent::removeAllActions(ITransport *transport) {
TransportEntry *entry = findPlugEntry(transport);
if (entry) {
boost::mutex::scoped_lock scoped_lock(entry->m_mutex);
while (!entry->m_queue.empty()) {
Action *ac = entry->m_queue.front();
entry->m_queue.pop_front();
delete ac;
}
}
}
// --------------------------------------------------------
// Instantation function
// --------------------------------------------------------
HRESULT DLL_CALLCONV
AgentCreate(void **iif) {
CAgent *object = new CAgent;
if (object) {
*iif = object;
return S_OK;
}
return E_FAIL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -