📄 messagequeueservice.cpp
字号:
//%2006//////////////////////////////////////////////////////////////////////////// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation, The Open Group.// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; Symantec Corporation; The Open Group.//// Permission is hereby granted, free of charge, to any person obtaining a copy// of this software and associated documentation files (the "Software"), to// deal in the Software without restriction, including without limitation the// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or// sell copies of the Software, and to permit persons to whom the Software is// furnished to do so, subject to the following conditions:// // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.////==============================================================================//// Author: Mike Day (mdday@us.ibm.com)//// Modified By: Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)////%/////////////////////////////////////////////////////////////////////////////#include <sys/types.h>#if !defined(PEGASUS_PLATFORM_WIN32_IX86_MSVC)# include <unistd.h>#endif#include <iostream>#include <stdio.h>#include <string.h>#include <Pegasus/Common/Config.h>#include <Pegasus/Common/PegasusAssert.h>#include <Pegasus/Common/InternalException.h>#include <Pegasus/Common/MessageQueue.h>#include <Pegasus/Common/MessageQueueService.h>#include <Pegasus/Common/AsyncQueue.h>#include <Pegasus/Common/Thread.h>#include <Pegasus/Common/Array.h>#include <Pegasus/Common/AsyncOpNode.h>#include <Pegasus/Common/CimomMessage.h>#include <Pegasus/Common/Cimom.h>PEGASUS_USING_STD;PEGASUS_USING_PEGASUS;static char * verbose;class TestRequestMessage : public AsyncRequest{public: typedef AsyncRequest Base; TestRequestMessage( AsyncOpNode *op, Uint32 destination, Uint32 response, const char *message) : Base( 0x04100000, 0, op, destination, response, true), greeting(message) { } virtual ~TestRequestMessage() { } String greeting;};class TestResponseMessage : public AsyncReply{public: typedef AsyncReply Base; TestResponseMessage( AsyncOpNode *op, Uint32 result, Uint32 destination, const char *message) : Base( 0x04200000, 0, op, result, destination, true), greeting(message) { } virtual ~TestResponseMessage() { } String greeting;};class MessageQueueServer : public MessageQueueService{public: typedef MessageQueueService Base; MessageQueueServer(const char *name) : Base( name, MessageQueue::getNextQueueId(), 0, MessageMask::type_cimom | MessageMask::type_service | MessageMask::ha_request | MessageMask::ha_reply | MessageMask::ha_async), dienow(0) { } virtual ~MessageQueueServer() { } virtual void _handle_incoming_operation(AsyncOpNode *operation); virtual Boolean messageOK(const Message *msg); virtual void handleEnqueue() { // This method is pure abstract in the superclass PEGASUS_TEST_ASSERT(0); } virtual void handleEnqueue(Message* msg) { // This method is pure abstract in the superclass PEGASUS_TEST_ASSERT(0); } void handleTestRequestMessage(AsyncRequest *msg); virtual void handleCimServiceStop(CimServiceStop *req); virtual void _handle_async_request(AsyncRequest *req); void handleLegacyOpStart(AsyncLegacyOperationStart *req); AtomicInt dienow;};class MessageQueueClient : public MessageQueueService{ public: typedef MessageQueueService Base; MessageQueueClient(const char *name) : Base(name, MessageQueue::getNextQueueId(), 0, MessageMask::type_cimom | MessageMask::type_service | MessageMask::ha_request | MessageMask::ha_reply | MessageMask::ha_async), client_xid(1) { _client_capabilities = Base::_capabilities; _client_mask = Base::_mask; } virtual ~MessageQueueClient() { } virtual Boolean messageOK(const Message *msg); virtual void handleEnqueue() { // This method is pure abstract in the superclass PEGASUS_TEST_ASSERT(0); } virtual void handleEnqueue(Message* msg) { // This method is pure abstract in the superclass PEGASUS_TEST_ASSERT(0); } void sendTestRequestMessage(const char *greeting, Uint32 qid); Uint32 get_qid(); Uint32 _client_capabilities; Uint32 _client_mask; virtual void _handle_async_request(AsyncRequest *req); AtomicInt client_xid;};AtomicInt msg_count;AtomicInt client_count;Array<Uint32> services;Uint32 MessageQueueClient::get_qid(){ return _queueId;}void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *operation){ if (operation != 0) { Message* rq = operation->getRequest(); PEGASUS_TEST_ASSERT(rq != 0); if (rq->getMask() & MessageMask::ha_async) { _handle_async_request(static_cast<AsyncRequest *>(rq)); } else { if (rq->getType() == 0x11100011) { if (verbose) { cout << " caught a hacked legacy message " << endl; } } delete rq; } } return;}void MessageQueueServer::_handle_async_request(AsyncRequest *req){ if (req->getType() == 0x04100000) { req->op->processing(); handleTestRequestMessage(req); } else if (req->getType() == async_messages::CIMSERVICE_STOP) { req->op->processing(); handleCimServiceStop(static_cast<CimServiceStop *>(req)); } else if (req->getType() == async_messages::ASYNC_LEGACY_OP_START) { req->op->processing(); handleLegacyOpStart(static_cast<AsyncLegacyOperationStart *>(req)); } else { Base::_handle_async_request(req); }}Boolean MessageQueueServer::messageOK(const Message *msg){ if (msg->getType() == 0x04100000 || msg->getType() == async_messages::CIMSERVICE_STOP || msg->getType() == async_messages::CIMSERVICE_PAUSE || msg->getType() == async_messages::ASYNC_LEGACY_OP_START || msg->getType() == async_messages::CIMSERVICE_RESUME || msg->getType() == 0x11100011) { return true; } return false;}void MessageQueueServer::handleLegacyOpStart(AsyncLegacyOperationStart *req){ Message *legacy = req->get_action(); if (verbose) { cout << " ### handling legacy messages " << endl; } AsyncReply *resp = new AsyncReply( async_messages::REPLY, 0, req->op, async_results::OK, req->resp, req->block); _completeAsyncResponse(req, resp, ASYNC_OPSTATE_COMPLETE, 0); if (verbose) { if (legacy != 0) cout << " legacy msg type: " << legacy->getType() << endl; } delete legacy;}void MessageQueueServer::handleTestRequestMessage(AsyncRequest *msg){ if (msg->getType() == 0x04100000) { TestResponseMessage *resp = new TestResponseMessage( msg->op, async_results::OK, msg->dest, "i am a test response"); _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0); }}void MessageQueueServer::handleCimServiceStop(CimServiceStop *req){ AsyncReply *resp = new AsyncReply( async_messages::REPLY, 0, req->op, async_results::CIM_SERVICE_STOPPED, req->resp, req->block); _completeAsyncResponse(req, resp, ASYNC_OPSTATE_COMPLETE, 0); if (verbose) { cout << "recieved STOP from test client" << endl; } dienow++;}void MessageQueueClient::_handle_async_request(AsyncRequest *req){ Base::_handle_async_request(req);}Boolean MessageQueueClient::messageOK(const Message *msg){ if(msg->getMask() & MessageMask::ha_async) { if (msg->getType() == 0x04200000 || msg->getType() == async_messages::CIMSERVICE_STOP || msg->getType() == async_messages::CIMSERVICE_PAUSE || msg->getType() == async_messages::CIMSERVICE_RESUME) return true; } return false;}void MessageQueueClient::sendTestRequestMessage( const char *greeting, Uint32 qid){ TestRequestMessage *req = new TestRequestMessage( 0, qid, _queueId, greeting); AsyncMessage *response = SendWait(req); if (response != 0) { msg_count++; delete response; if (verbose) { cout << " test message " << msg_count.get() << endl; } } delete req;}ThreadReturnType PEGASUS_THREAD_CDECL client_func(void *parm);ThreadReturnType PEGASUS_THREAD_CDECL server_func(void *parm);int main(int argc, char **argv){ verbose = getenv("PEGASUS_TEST_VERBOSE"); try { Thread client(client_func, (void *)&msg_count, false); Thread another(client_func, (void *)&msg_count, false); Thread a_third(client_func, (void *)&msg_count, false); Thread server(server_func, (void *)&msg_count, false); server.run(); client.run(); another.run(); a_third.run(); while (msg_count.get() < 1500) { Threads::sleep(10); } a_third.join(); another.join(); client.join(); server.join(); } catch (Exception& e) { cout << "Exception: " << e.getMessage() << endl; exit(1); } catch (...) { cout << "Caught unknown exception" << endl; exit(1); } if (verbose) { cout << "exiting main " << endl; } cout << argv[0] << " +++++ passed all tests" << endl; return 0;}ThreadReturnType PEGASUS_THREAD_CDECL client_func(void *parm){ Thread* my_handle = reinterpret_cast<Thread *>(parm); AtomicInt& count = *(reinterpret_cast<AtomicInt *>(my_handle->get_parm())); char name_buf[128]; sprintf(name_buf, "test client %s", Threads::id().buffer); MessageQueueClient *q_client = new MessageQueueClient(name_buf); client_count++; while (client_count.get() < 3) Threads::yield(); while (services.size() == 0) { q_client->find_services(String("test server"), 0, 0, &services); Threads::yield(); } if (verbose) { cout << "found server at " << services[0] << endl; } while (msg_count.get() < 1500) { q_client->sendTestRequestMessage("i am the test client" , services[0]); } // now that we have sent and received all of our responses, tell // the server thread to stop AsyncMessage *reply; if (verbose) { cout << " sending LEGACY to test server" << endl; } Message *legacy = new Message(0x11100011); AsyncLegacyOperationStart *req = new AsyncLegacyOperationStart( 0, services[0], legacy, q_client->getQueueId()); reply = q_client->SendWait(req); delete req; delete reply; if (verbose) { cout << "trying SendForget " << endl; } legacy = new Message(0x11100011); req = new AsyncLegacyOperationStart( 0, services[0], legacy, q_client->getQueueId()); q_client->SendForget(req); legacy = new Message(0x11100011); legacy->dest = services[0]; q_client->SendForget(legacy); MessageQueueService * server = static_cast<MessageQueueService *>(MessageQueue::lookup(services[0]));#if 0 legacy = new Message(0x11100011); // ATTN: handleEnqueue() is not implemented server->enqueue(legacy);#endif if (verbose) { cout << "sending STOP to test server" << endl; } CimServiceStop *stop = new CimServiceStop( 0, services[0], q_client->get_qid(), true); reply = q_client->SendWait(stop); delete stop; delete reply; if (verbose) { cout << "deregistering client qid " << q_client->getQueueId() << endl; } q_client->deregister_service(); if (verbose) { cout << "closing service queue" << endl; } q_client->_shutdown_incoming_queue(); if (verbose) { cout << " deleting client " << endl ; } delete q_client; if (verbose) { cout << " exiting " << endl; } my_handle->exit_self((ThreadReturnType) 1); return(0);}ThreadReturnType PEGASUS_THREAD_CDECL server_func(void *parm){ Thread *my_handle = reinterpret_cast<Thread *>(parm); MessageQueueServer *q_server = new MessageQueueServer("test server") ; while (q_server->dienow.get() < 3) { Threads::yield(); } if (verbose) { cout << "deregistering server qid " << q_server->getQueueId() << endl; } q_server->deregister_service(); if (verbose) { cout << "closing server queue" << endl; } q_server->_shutdown_incoming_queue(); if (verbose) { cout << " deleting server " << endl; } delete q_server; if (verbose) { cout << "exiting server " << endl; } my_handle->exit_self((ThreadReturnType) 1); return(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -