📄 routerconnection.c
字号:
// Copyright (c) 2006 David Muse// See the file COPYING for more information#include <routerconnection.h>#include <rudiments/rawbuffer.h>#include <stdio.h>#include <stdlib.h>// debug getpid()#include <unistd.h>#include <datatypes.h>routerconnection::routerconnection() : sqlrconnection_svr() { cons=NULL; beginquery=NULL; anymustbegin=false; concount=0; cfgfile=NULL; justloggedin=false; nullbindvalue=nullBindValue(); nonnullbindvalue=nonNullBindValue(); beginregex.compile("^\\s*(begin|start\\s*transaction)"); beginregex.study(); beginendregex.compile("^\\s*begin\\s*.*\\s*end;\\s*$"); beginendregex.study();}routerconnection::~routerconnection() { for (uint16_t index=0; index<concount; index++) { delete cons[index]; } delete[] cons; delete[] beginquery;}bool routerconnection::supportsAuthOnDatabase() { return false;}uint16_t routerconnection::getNumberOfConnectStringVars() { return 0;}void routerconnection::handleConnectString() { cfgfile=cfgfl; linkedlist< routecontainer * > *routelist=cfgfl->getRouteList(); concount=routelist->getLength(); cons=new sqlrconnection *[concount]; beginquery=new const char *[concount]; anymustbegin=false; for (uint16_t index=0; index<concount; index++) { cons[index]=NULL; beginquery[index]=NULL; routecontainer *rn=routelist-> getNodeByIndex(index)->getData(); // empty host/port/socket/user/password means that queries // going to this connection will be filtered out if (rn->getIsFilter()) { continue; } cons[index]=new sqlrconnection(rn->getHost(),rn->getPort(), rn->getSocket(),rn->getUser(), rn->getPassword(),0,1); const char *id=cons[index]->identify(); if (!charstring::compare(id,"sybase") || !charstring::compare(id,"freetds")) { beginquery[index]="begin tran"; } else if (!charstring::compare(id,"sqlite")) { beginquery[index]="begin transaction"; } else if (!charstring::compare(id,"postgresql") || !charstring::compare(id,"router")) { beginquery[index]="begin"; } if (beginquery[index]) { anymustbegin=true; } }}bool routerconnection::logIn(bool printerrors) { justloggedin=true; return true;}sqlrcursor_svr *routerconnection::initCursor() { return (sqlrcursor_svr *)new routercursor((sqlrconnection_svr *)this);}void routerconnection::deleteCursor(sqlrcursor_svr *curs) { delete (routercursor *)curs;}void routerconnection::logOut() {}bool routerconnection::autoCommitOn() { // turn autocommit on for all connections, if any fail, return failure bool result=true; for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } bool res=cons[index]->autoCommitOn(); if (!res) { autoCommitOnFailed(index); } // The connection class calls autoCommitOn or autoCommitOff // immediately after logging in, which will cause the // cons to connect to the relay's and tie them up unless we // call endSession. We'd rather not tie them up until a // client connects, so if we just logged in, we'll call // endSession. if (justloggedin) { // if any of the connections must begin transactions, // then those connections will start off in auto-commit // mode no matter what, so put all connections in // autocommit mode // (this is a convenient place to do this...) if (anymustbegin) { cons[index]->autoCommitOn(); } cons[index]->endSession(); } if (result) { result=res; } } if (justloggedin) { justloggedin=false; } return result;}bool routerconnection::autoCommitOff() { // turn autocommit on for all connections, if any fail, return failure bool result=true; for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } bool res=cons[index]->autoCommitOff(); if (!res) { autoCommitOffFailed(index); } // The connection class calls autoCommitOn or autoCommitOff // immediately after logging in, which will cause the // cons to connect to the relay's and tie them up unless we // call endSession. We'd rather not tie them up until a // client connects, so if we just logged in, we'll call // endSession. if (justloggedin) { // if any of the connections must begin transactions, // then those connections will start off in auto-commit // mode no matter what, so put all connections in // autocommit mode, even if autocommit-off is called // here // (this is a convenient place to do this...) if (anymustbegin) { cons[index]->autoCommitOn(); } cons[index]->endSession(); } if (result) { result=res; } } if (justloggedin) { justloggedin=false; } return result;}bool routerconnection::commit() { // commit all connections, if any fail, return failure // FIXME: use 2 stage commit... bool result=true; for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } bool res=cons[index]->commit(); if (!res) { commitFailed(index); } if (result) { result=res; } } return result;}bool routerconnection::rollback() { // commit all connections, if any fail, return failure // FIXME: use 2 stage commit... bool result=true; for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } bool res=cons[index]->rollback(); if (!res) { rollbackFailed(index); } if (result) { result=res; } } return result;}void routerconnection::endSession() { for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } cons[index]->endSession(); }}const char *routerconnection::identify() { return "router";}const char *routerconnection::dbVersion() { // FIXME: return SQL Relay version? return "";}bool routerconnection::ping() { // ping all connections, if any fail, return failure bool result=true; for (uint16_t index=0; index<concount; index++) { if (!cons[index]) { continue; } bool res=cons[index]->ping(); if (result) { result=res; } } return result;}routercursor::routercursor(sqlrconnection_svr *conn) : sqlrcursor_svr(conn) { routerconn=(routerconnection *)conn; nextrow=0; con=NULL; cur=NULL; isbindcur=false; curindex=0; curs=new sqlrcursor *[routerconn->concount]; for (uint16_t index=0; index<routerconn->concount; index++) { curs[index]=NULL; if (!routerconn->cons[index]) { continue; } curs[index]=new sqlrcursor(routerconn->cons[index]); curs[index]->setResultSetBufferSize(FETCH_AT_ONCE); } beginquery=false; obcount=0; cbcount=0; createoratemp.compile("(create|CREATE)[ \\t\\n\\r]+(global|GLOBAL)[ \\t\\n\\r]+(temporary|TEMPORARY)[ \\t\\n\\r]+(table|TABLE)[ \\t\\n\\r]+"); preserverows.compile("(on|ON)[ \\t\\n\\r]+(commit|COMMIT)[ \\t\\n\\r]+(preserve|PRESERVE)[ \\t\\n\\r]+(rows|ROWS)");}routercursor::~routercursor() { for (uint16_t index=0; index<routerconn->concount; index++) { delete curs[index]; } delete[] curs;}bool routercursor::prepareQuery(const char *query, uint32_t length) { // check for a begin query, but not "begin ... end;" query beginquery=(routerconn->beginregex.match(query) && !routerconn->beginendregex.match(query)); if (beginquery) { return true; } // reset cur and pointers con=NULL; if (isbindcur) { delete cur; } cur=NULL; isbindcur=false; curindex=0; // initialize the output bind count obcount=0; // initialize the cursor bind count cbcount=0; // look through the regular expressions and figure out which // connection this query needs to be run through uint16_t conindex=0; routenode *rcn=routerconn->cfgfile-> getRouteList()->getNodeByIndex(0); bool found=false; while (rcn && !found) { linkedlistnode< regularexpression * > *ren= rcn->getData()->getRegexList()-> getNodeByIndex(0); while (ren && !found) { if (ren->getData()->match(query)) { con=routerconn->cons[conindex]; cur=curs[conindex]; curindex=conindex; found=true; } ren=ren->getNext(); } if (!found) { rcn=rcn->getNext(); conindex++; } } // cur could be NULL here either because no connection could be found // to run the query, or because the query matched the pattern of // a connection which was intentionally set up to filter out the query if (!cur) { return false; } // prepare the query using the cursor from whichever // connection turned out to be the right one cur->prepareQuery(query); return true;}bool routercursor::supportsNativeBinds() { return true;}bool routercursor::inputBindString(const char *variable, uint16_t variablesize, const char *value, uint16_t valuesize, int16_t *isnull) { cur->inputBind(variable+1,value); return true;}bool routercursor::inputBindInteger(const char *variable, uint16_t variablesize, int64_t *value) { cur->inputBind(variable+1,*value); return true;}bool routercursor::inputBindDouble(const char *variable, uint16_t variablesize, double *value, uint32_t precision, uint32_t scale) { cur->inputBind(variable+1,*value,precision,scale); return true;}bool routercursor::inputBindBlob(const char *variable, uint16_t variablesize,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -