ndb_async1.cpp

来自「MySQL数据库开发源码 值得一看哦」· C++ 代码 · 共 648 行 · 第 1/2 页

CPP
648
字号
}/** * Transaction 4 - T4 *  * Create session * * Input: *   SubscriberNumber *   ServerId *   ServerBit *   SessionDetails, *   DoRollback * Output: *   ChangedBy *   ChangedTime *   Location *   BranchExecuted */voidstart_T4(Ndb * pNDB, ThreadData * td){  DEBUG3("T4(%.*s, %.2d): - Starting\n", SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);    int check;  NdbRecAttr * check2;    NdbConnection * pCON = startTransaction(pNDB, 					  td->transactionData.server_id, 					  td->transactionData.number);  if (pCON == NULL)	      error_handler("T4-1: startTranscation", 		  pNDB->getNdbErrorString(), 		  pNDB->getNdbError());    NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);  CHECK_NULL(MyOp, "T4-1: getNdbOperation", 	     pCON);    MyOp->interpretedUpdateTuple();  MyOp->equal(IND_SUBSCRIBER_NUMBER, 	      td->transactionData.number);  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 		 (char *)&td->transactionData.location);  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 		 td->transactionData.changed_by);  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 		 td->transactionData.changed_time);  MyOp->getValue(IND_SUBSCRIBER_GROUP,		 (char *)&td->transactionData.group_id);  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,		 (char *)&td->transactionData.sessions);   MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 		 (uint32)td->transactionData.server_bit);  pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td); }voidT4_Callback_1(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T4-1: NoCommit", pCON);   ThreadData * td = (ThreadData *)threadData;      DEBUG3("T4(%.*s, %.2d): - Callback 1\n", 	 SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);  CHECK_NULL(MyOp, "T4-2: getNdbOperation", 	     pCON);    MyOp->readTuple();  MyOp->equal(IND_GROUP_ID,	      (char*)&td->transactionData.group_id);  MyOp->getValue(IND_GROUP_ALLOW_INSERT, 		 (char *)&td->transactionData.permission);  pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td); }voidT4_Callback_2(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T4-2: NoCommit", pCON);   ThreadData * td = (ThreadData *)threadData;    Uint32 permission = td->transactionData.permission;  Uint32 sessions   = td->transactionData.sessions;  Uint32 server_bit = td->transactionData.server_bit;    if(((permission & server_bit) == server_bit) &&     ((sessions   & server_bit) == 0)){        memcpy(td->transactionData.suffix,	   &td->transactionData.number	   [SUBSCRIBER_NUMBER_LENGTH-SUBSCRIBER_NUMBER_SUFFIX_LENGTH], 	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH);        DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)\n", 	   SUBSCRIBER_NUMBER_LENGTH, 	   td->transactionData.number, 	   td->transactionData.server_id,	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 	   td->transactionData.suffix);        /* Operation 3 */        NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);    CHECK_NULL(MyOp, "T4-3: getNdbOperation", 	       pCON);        MyOp->insertTuple();    MyOp->equal(IND_SESSION_SUBSCRIBER,		(char*)td->transactionData.number);    MyOp->equal(IND_SESSION_SERVER,		(char*)&td->transactionData.server_id);    MyOp->setValue(SESSION_DATA, 		   (char *)td->transactionData.session_details);    /* Operation 4 */        /* Operation 5 */    MyOp = pCON->getNdbOperation(SERVER_TABLE);    CHECK_NULL(MyOp, "T4-5: getNdbOperation", 	       pCON);        MyOp->interpretedUpdateTuple();    MyOp->equal(IND_SERVER_ID,		(char*)&td->transactionData.server_id);    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,		(char*)td->transactionData.suffix);    MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);    td->transactionData.branchExecuted = 1;  } else {    td->transactionData.branchExecuted = 0;    DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s\n",	   SUBSCRIBER_NUMBER_LENGTH, 	   td->transactionData.number, 	   td->transactionData.server_id,	   ((permission & server_bit) ? 	    "permission - " : "no permission - "),	   ((sessions   & server_bit) ? 	    "in session - " : "no in session - "));  }    if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){    pCON->executeAsynchPrepare(Commit, T4_Callback_3, td);   } else {    pCON->executeAsynchPrepare(Rollback, T4_Callback_3, td);  }}voidT4_Callback_3(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T4-3: Commit", pCON);   ThreadData * td = (ThreadData *)threadData;      DEBUG3("T4(%.*s, %.2d): - Completing\n", 	 SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);  td->pNDB->closeTransaction(pCON);  complete_T4(td);}/** * Transaction 5 - T5 *  * Delete session * * Input: *   SubscriberNumber *   ServerId *   ServerBit *   DoRollback * Output: *   ChangedBy *   ChangedTime *   Location *   BranchExecuted */voidstart_T5(Ndb * pNDB, ThreadData * td){  DEBUG3("T5(%.*s, %.2d): - Starting\n", SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);  int check;  NdbRecAttr * check2;    NdbConnection * pCON = pNDB->startTransaction();  if (pCON == NULL)	      error_handler("T5-1: startTranscation", 		  pNDB->getNdbErrorString(), 		  pNDB->getNdbError());    NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);  CHECK_NULL(MyOp, "T5-1: getNdbOperation", 	     pCON);    MyOp->interpretedUpdateTuple();  MyOp->equal(IND_SUBSCRIBER_NUMBER, 	      td->transactionData.number);  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 		 (char *)&td->transactionData.location);  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 		 td->transactionData.changed_by);  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 		 td->transactionData.changed_time);  MyOp->getValue(IND_SUBSCRIBER_GROUP,		 (char *)&td->transactionData.group_id);  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,		 (char *)&td->transactionData.sessions);  MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 		 (uint32)td->transactionData.server_bit);  pCON->executeAsynchPrepare( NoCommit, T5_Callback_1, td ); }voidT5_Callback_1(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T5-1: NoCommit", pCON);   ThreadData * td = (ThreadData *)threadData;    DEBUG3("T5(%.*s, %.2d): - Callback 1\n", 	 SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);  CHECK_NULL(MyOp, "T5-2: getNdbOperation", 	     pCON);    MyOp->readTuple();  MyOp->equal(IND_GROUP_ID,	      (char*)&td->transactionData.group_id);  MyOp->getValue(IND_GROUP_ALLOW_DELETE, 		 (char *)&td->transactionData.permission);  pCON->executeAsynchPrepare( NoCommit, T5_Callback_2, td ); }voidT5_Callback_2(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T5-2: NoCommit", pCON);   ThreadData * td = (ThreadData *)threadData;    Uint32 permission = td->transactionData.permission;  Uint32 sessions   = td->transactionData.sessions;  Uint32 server_bit = td->transactionData.server_bit;  if(((permission & server_bit) == server_bit) &&     ((sessions   & server_bit) == server_bit)){        memcpy(td->transactionData.suffix,	   &td->transactionData.number	   [SUBSCRIBER_NUMBER_LENGTH-SUBSCRIBER_NUMBER_SUFFIX_LENGTH], 	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH);      DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)\n", 	   SUBSCRIBER_NUMBER_LENGTH, 	   td->transactionData.number, 	   td->transactionData.server_id,	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 	   td->transactionData.suffix);        /* Operation 3 */    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);    CHECK_NULL(MyOp, "T5-3: getNdbOperation", 	       pCON);        MyOp->deleteTuple();    MyOp->equal(IND_SESSION_SUBSCRIBER,		(char*)td->transactionData.number);    MyOp->equal(IND_SESSION_SERVER,		(char*)&td->transactionData.server_id);    /* Operation 4 */        /* Operation 5 */    MyOp = pCON->getNdbOperation(SERVER_TABLE);    CHECK_NULL(MyOp, "T5-5: getNdbOperation", 	       pCON);        MyOp->interpretedUpdateTuple();    MyOp->equal(IND_SERVER_ID,		(char*)&td->transactionData.server_id);    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,		(char*)td->transactionData.suffix);    MyOp->incValue(IND_SERVER_DELETES, (uint32)1);    td->transactionData.branchExecuted = 1;  } else {    td->transactionData.branchExecuted = 0;    DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s\n", 	   SUBSCRIBER_NUMBER_LENGTH, 	   td->transactionData.number, 	   td->transactionData.server_id,	   ((permission & server_bit) ? 	    "permission - " : "no permission - "),	   ((sessions   & server_bit) ? 	    "in session - " : "no in session - "));  }    if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){    pCON->executeAsynchPrepare(Commit, T5_Callback_3, td);   } else {    pCON->executeAsynchPrepare(Rollback, T5_Callback_3, td);  }}voidT5_Callback_3(int result, NdbConnection * pCON, void * threadData){  CHECK_MINUS_ONE(result, "T5-3: Commit", pCON);   ThreadData * td = (ThreadData *)threadData;    DEBUG3("T5(%.*s, %.2d): - Completing\n", 	 SUBSCRIBER_NUMBER_LENGTH, 	 td->transactionData.number, 	 td->transactionData.server_id);    td->pNDB->closeTransaction(pCON);  complete_T5(td);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?