msgqueue.cpp
来自「JdonFramework need above jdk 1.4.0 This」· C++ 代码 · 共 639 行 · 第 1/2 页
CPP
639 行
++m_msgCount; unlock(); return newMsg;}CSNMPMessage *CSNMPMessageQueue::GetEntry(const unsigned long uniqueId){ CSNMPMessageQueueElt *msgEltPtr = m_head.GetNext(); CSNMPMessage *returnVal = NULL; while (msgEltPtr){ if ((returnVal = msgEltPtr->TestId(uniqueId))) return returnVal; msgEltPtr = msgEltPtr->GetNext(); } return 0;}int CSNMPMessageQueue::DeleteEntry(const unsigned long uniqueId){ CSNMPMessageQueueElt *msgEltPtr = m_head.GetNext(); while (msgEltPtr){ if (msgEltPtr->TestId(uniqueId)) { delete msgEltPtr; m_msgCount--; return SNMP_CLASS_SUCCESS; } msgEltPtr = msgEltPtr->GetNext(); } return SNMP_CLASS_INVALID_REQID;}void CSNMPMessageQueue::DeleteSocketEntry(const SnmpSocket socket)REENTRANT({ CSNMPMessageQueueElt *msgEltPtr = m_head.GetNext(); CSNMPMessageQueueElt *tmp_msgEltPtr; CSNMPMessage *msg = NULL; while (msgEltPtr){ msg = msgEltPtr->GetMessage(); if (socket == msg->GetSocket()) { // Make a callback with an error (void) msg->Callback(SNMP_CLASS_SESSION_DESTROYED); tmp_msgEltPtr = msgEltPtr; msgEltPtr = tmp_msgEltPtr->GetNext(); // delete the entry delete tmp_msgEltPtr; } else msgEltPtr = msgEltPtr->GetNext(); }})CSNMPMessage * CSNMPMessageQueue::GetNextTimeoutEntry(){ CSNMPMessageQueueElt *msgEltPtr = m_head.GetNext(); msec bestTime; msec sendTime(bestTime); CSNMPMessage *msg; CSNMPMessage *bestmsg = NULL; if (msgEltPtr) { bestmsg = msgEltPtr->GetMessage(); bestmsg->GetSendTime(bestTime); } // This would be much simpler if the queue was an ordered list! while (msgEltPtr){ msg = msgEltPtr->GetMessage(); msg->GetSendTime(sendTime); if (bestTime > sendTime) { bestTime = sendTime; bestmsg = msg; } msgEltPtr = msgEltPtr->GetNext(); } return bestmsg;}int CSNMPMessageQueue::GetNextTimeout(msec &sendTime){ CSNMPMessage *msg = GetNextTimeoutEntry(); if (!msg) return 1; // nothing in the queue... msg->GetSendTime(sendTime); return 0;}void CSNMPMessageQueue::GetFdSets(int &maxfds, fd_set &readfds, fd_set &, fd_set &) REENTRANT ({ CSNMPMessageQueueElt *msgEltPtr = m_head.GetNext(); SnmpSocket sock; while (msgEltPtr){ sock = msgEltPtr->GetMessage()->GetSocket(); FD_SET(sock, &readfds); if (maxfds < sock+1) maxfds = SAFE_INT_CAST(sock+1); msgEltPtr = msgEltPtr->GetNext(); }})void CSNMPMessageQueue::PushId(const unsigned long id) REENTRANT ({ unsigned long *newStack = 0; // check whether stack is too small or much too big if ((!m_idStack) || (m_stackSize < m_stackTop+1) || (m_stackSize - m_stackTop > 50)) { newStack = new unsigned long [m_stackTop+10]; m_stackSize = m_stackTop+10; } if (newStack) { newStack[m_stackTop] = id; if (m_idStack) { for (int i=0; i< m_stackTop; i++) newStack[i] = m_idStack[i]; delete [] m_idStack; } m_idStack = newStack; m_stackTop++; } else m_idStack[m_stackTop++] = id;})unsigned long CSNMPMessageQueue::PeekId() REENTRANT ({ return m_idStack[m_stackTop - 1];})int CSNMPMessageQueue::HandleEvents(const int maxfds, const fd_set &readfds, const fd_set &, const fd_set &){ CSNMPMessage *msg; UdpAddress fromaddress; Pdu tmppdu; unsigned long temp_req_id; int status; int recv_status; fd_set snmp_readfds, snmp_writefds, snmp_errfds; int tmp_maxfds = maxfds; // Only read from our own fds FD_ZERO(&snmp_readfds); FD_ZERO(&snmp_writefds); FD_ZERO(&snmp_errfds); GetFdSets(tmp_maxfds, snmp_readfds, snmp_writefds, snmp_errfds); for (int fd = 0; fd < maxfds; fd++) { if ((FD_ISSET(fd, &snmp_readfds)) && (FD_ISSET(fd, &readfds))) { OctetStr engine_id; tmppdu.set_request_id(0); // get the response and put it into a Pdu recv_status = receive_snmp_response(fd, *m_snmpSession, tmppdu, fromaddress, engine_id); lock(); // find the corresponding msg in the message queue temp_req_id = tmppdu.get_request_id(); msg = GetEntry(temp_req_id); if (!msg) { unlock(); // the sent message is gone! probably was canceled, ignore it continue; } if (tmppdu.get_request_id()) { // we correctly received the pdu // save it back into the message status = msg->SetPdu(recv_status, tmppdu, fromaddress); if (status != 0) { // received pdu does not match // @todo if version is SNMPv3 we must return a report // unknown pdu handler! unlock(); continue; }#ifdef _SNMPv3 if (engine_id.len() > 0) { SnmpTarget *target = msg->GetTarget(); if ((target->get_type() == SnmpTarget::type_utarget) && (target->get_version() == version3)) { UdpAddress addr = target->get_address(); LOG_BEGIN(DEBUG_LOG | 14); LOG("MsgQueue: Adding engine id to table (addr) (id)"); LOG(addr.get_printable()); LOG(engine_id.get_printable()); LOG_END; v3MP::I->add_to_engine_id_table(engine_id, (char*)addr.IpAddress::get_printable(), addr.get_port()); } }#endif // Do the callback unlock(); status = msg->Callback(SNMP_CLASS_ASYNC_RESPONSE); lock(); if (!status) { // this is an asynch response and the callback is done. // no need to keep this message around; // Dequeue the message DeleteEntry(temp_req_id); } } unlock(); } // if socket has data } // for all sockets return SNMP_CLASS_SUCCESS;}int CSNMPMessageQueue::DoRetries(const msec &now){ CSNMPMessage *msg; msec sendTime(0, 0); int status = SNMP_CLASS_SUCCESS; lock(); while ((msg = GetNextTimeoutEntry())) { msg->GetSendTime(sendTime); if (sendTime <= now) { // send out the message again unlock(); status = msg->ResendMessage(); lock(); if (status != 0) { if (status == SNMP_CLASS_TIMEOUT) { unsigned long req_id = msg->GetId(); // Dequeue the message DeleteEntry(req_id);#ifdef _SNMPv3 // delete entry in cache if (v3MP::I) v3MP::I->delete_from_cache(req_id); LOG_BEGIN(INFO_LOG | 6); LOG("MsgQueue: Message timed out, removed id from v3MP cache (rid)"); LOG(req_id); LOG_END;#endif } else { // Some other send error, should we dequeue the message? // do we really want to return without processing the rest? unlock(); return status; } } } else { break; // the next timeout is still in the future...so we are done } } unlock(); return status;}int CSNMPMessageQueue::Done(){ unsigned long id; if ((id = PeekId())) { // we were looking for a req_id. Did we find it? lock(); CSNMPMessage *msg = GetEntry(id); unlock(); if (msg) return msg->GetReceived(); else return 1; // the message is not in the queue...must have timed out } return 0;}int CSNMPMessageQueue::Done(unsigned long id) REENTRANT ({ // FF: This is much more efficient than the above CSNMPMessage *msg = GetEntry(id); if (!msg) return 1; // the message is not in the queue...must have timed out if (msg->GetReceived()) return 1; return 0;})#ifdef SNMP_PP_NAMESPACE}; // end of namespace Snmp_pp#endif
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?