📄 ora2my.cpp
字号:
#include "ora2my.h"
ora2my::ora2my(void):blobcount_(0),buffer_(NULL),my_port_(0)
{
}
ora2my::~ora2my(void)
{
ora_db_.logoff();
mysql_stmt_close(stmt_);
for(size_t i = 0; i < blobcount_ + 1; i++)
{
if(buffer_[i].buf_)
{
delete [] buffer_[i].buf_;
buffer_[i].buf_ = NULL;
}
}
delete [] buffer_;
}
int ora2my::init_ora(ACE_TString login)
{
if(login.length() == 0) return -1;
ora_login_ = login;
otl_connect::otl_initialize();
return 0;
}
int ora2my::init_my(ACE_TString host, ACE_TString user, ACE_TString pwd, u_int port, ACE_TString tablename)
{
if(host.length() == 0 || user.length() == 0 || pwd.length() == 0 || port < 0) return -1;
my_host_ = host;
my_user_ = user;
my_pwd_ = pwd;
my_port_ = port;
my_tabname_ = tablename;
return 0;
}
int ora2my::connect_ora(void)
{
ACE_TString login;
login = ora_login_;
try{
ora_db_.rlogon(login.c_str());
otl_cursor::direct_exec
(
ora_db_,
"alter session set nls_date_format='yyyymmddhh24miss'",
otl_exception::disabled
);
}
catch(otl_exception& p){
ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t Connect %s: \n\t%s\n\t%s\n\t%s\n"),login.c_str(),p.msg,p.stm_text,p.var_info));
return -1;
}
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t Connect %s ok...\n") , login.c_str()));
return 0;
}
int ora2my::connect_my(void)
{
mysql_ = mysql_init(NULL);
mysql_options(mysql_, MYSQL_SET_CHARSET_NAME, "gb2312");
mysql_ = mysql_real_connect(mysql_, my_host_.c_str(), my_user_.c_str(), my_pwd_.c_str(), "mysql", my_port_, NULL, CLIENT_MULTI_STATEMENTS);
if(!mysql_)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t Connect to Mysql error...\n")));
return -1;
}
stmt_ = mysql_stmt_init(mysql_);
if(!stmt_)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_init(), out of memory\n")));
return -1;
}
return 0;
}
int ora2my::dumpfromoracle(ACE_TString sql)
{
int fieldcount = 0;
size_t row = 0;
size_t lobcount = 0;
int lobindex = 0;
otl_stream otl_sin;
otl_lob_stream lob;
otl_column_desc* desc;
MYSQL_BIND *bind = NULL;
u_long *length = NULL;
my_bool *is_null = NULL;
ACE_TString my_sql;
otl_sin.set_all_column_types(otl_all_num2str | otl_all_date2str);
otl_sin.open(1, sql.c_str(), ora_db_);
desc=otl_sin.describe_select(fieldcount);
blobcount_ = lobcount = countlob(desc, fieldcount);
if(lobcount == -1) return -1;
bind = new MYSQL_BIND[lobcount];
int x = sizeof(MYSQL_BIND);
ACE_OS::memset(bind, 0, sizeof(MYSQL_BIND) * lobcount);
length = new u_long[lobcount];
x = sizeof(u_long);
ACE_OS::memset(length, 0, sizeof(u_long) * lobcount);
is_null = new my_bool[lobcount];
x = sizeof(my_bool);
ACE_OS::memset(is_null, 0, sizeof(my_bool) * lobcount);
for(size_t i = 0; i < lobcount; i++)
{
bind[i].buffer_type = MYSQL_TYPE_BLOB;
bind[i].buffer = buffer_[i + 1].buf_;
bind[i].buffer_length = buffer_[i + 1].len_;
bind[i].is_null = &(is_null[i]);
bind[i].length = &(length[i]);
}
while(!otl_sin.eof())
{
row++;
my_sql = "values(";
lobindex = 0;
for(int i = 0; i < fieldcount; i++)
{
switch(desc[i].otl_var_dbtype)
{
case otl_var_clob:
case otl_var_blob:
{
int flush = Z_NO_FLUSH;
u_long lob_len = 0;
int zip_len = 0; //compressBound(BUF_SIZE);
int total_zip_len = 0;
off_t total = 0;
Z_Stream zs;
my_sql += "?, ";
otl_sin >> lob;
lob_len = lob.len();
// length[lobindex] = lob.len(); //mastbe the length after compress
if(lob_len == 0)
{
is_null[lobindex] = 1;
lobindex++;
break;
}
if(lob_len > buffer_[lobindex + 1].len_) //reallocate
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t reallocate buffer_[lobindex + 1].buf_\n")));
delete [] buffer_[lobindex + 1].buf_;
buffer_[lobindex + 1].buf_ = NULL;
int tmp_size = compressBound(lob_len + 1);
buffer_[lobindex + 1].buf_ = new char[tmp_size];
if(!buffer_[lobindex + 1].buf_)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t new for large memory fail\n")));
return -1;
}
buffer_[lobindex + 1].len_ = (u_long)tmp_size;
bind[lobindex].buffer = buffer_[lobindex + 1].buf_;
bind[lobindex].buffer_length = tmp_size;
}
if(lob_len > buffer_[0].len_)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t reallocate buffer_[0].buf_\n")));
delete [] buffer_[0].buf_;
buffer_[0].buf_ = NULL;
buffer_[0].buf_ = new char[lob_len + 1];
if(!buffer_[0].buf_)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t new for large memory fail\n")));
return -1;
}
buffer_[0].len_ = (u_long)(lob_len + 1);
}
is_null[lobindex] = 0;
zip_len = buffer_[lobindex + 1].len_;
otl_long_string f2((void*)buffer_[0].buf_, (const int)buffer_[0].len_);
lob >> f2;
if(!lob.eof())
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t !lob.eof()\n")));
return -1;
}
//ACE_OS::memset(buffer_[lobindex + 1].buf_, 0, zip_len);
int ret = compress2((Bytef *)(buffer_[lobindex + 1].buf_), (uLongf*)&zip_len,(Bytef *)(buffer_[0].buf_), lob_len, 3);
if(ret != 0)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%D %t compress error (%d)\n") ,ret ));
return ret;
}
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t lob_len=%d zip_len=%d\n"), lob_len, zip_len));
/*
total_zip_len = 0;
while(!lob.eof())
{
int read_len = 0;
otl_long_string f2((void*)buffer_[0].buf_, (const int)buffer_[0].len_);
lob >> f2;
read_len = f2.len();
total += read_len;
if(total >= (off_t)lob_len) flush = Z_FINISH;
zip_len = compressBound(lob_len);
int ret = zs.deFlate((Bytef *)buffer_[lobindex + 1].buf_, &zip_len,(Bytef *)buffer_[0].buf_, read_len , flush);
if(ret != Z_OK)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t Compress blob error...\n")));
return -1;
}
total_zip_len += zip_len;
}// while
length[lobindex] = total_zip_len;
*/
length[lobindex] = zip_len;
lob.close();
lobindex++;
break;
}
default:
{
size_t tmp_len = 0;
otl_sin >> buffer_[0].buf_;
tmp_len = ACE_OS::strlen(buffer_[0].buf_);
mysql_real_escape_string(mysql_, (buffer_[0].buf_ + tmp_len + 1), buffer_[0].buf_, (off_t)tmp_len); //处理转义
my_sql += "'";
my_sql += (buffer_[0].buf_ + tmp_len + 1);
my_sql += "', ";
break;
}
} //switch
} //for
my_sql[my_sql.length() - 2] = ')';
my_sql[my_sql.length() - 1] = '\0';
if(insert2mysql(my_sql, lobcount, my_tabname_, bind) == -1) return -1;
}
ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t exp and imp %d rows...\n"), row));
if(bind) {delete [] bind; bind = NULL;}
if(length) {delete [] length; length = NULL;}
if(is_null){delete [] is_null; is_null = NULL;}
return 0;
}
int ora2my::countlob(otl_column_desc* desc, int fieldcount)
{
//off_t pos = 0;
size_t lobcount = 0;
for(int i = 0; i < fieldcount; i++)
{
switch(desc[i].otl_var_dbtype)
{
case otl_var_clob:
case otl_var_blob:
lobcount++;
break;
}
}
buffer_ = new space[lobcount + 1];
if(!buffer_) return -1;
// buffer_[0] for noneblob field
buffer_[0].buf_ = new char[BUF_SIZE];
buffer_[0].len_ = BUF_SIZE;
if(!buffer_[0].buf_) return -1;
// for each blob field
for(size_t i = 1; i <=lobcount; i++)
{
buffer_[i].buf_ = new char[BLOB_SIZE];
buffer_[i].len_ = BLOB_SIZE;
if(!buffer_[i].buf_) return -1;
}
return (int)lobcount;
}
bool ora2my::VerifyStmt(ACE_TString sql, size_t lobcount)
{
int param_count = 0;
if (mysql_stmt_prepare(stmt_, sql.c_str(), (u_long)sql.length()))
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_prepare(), INSERT failed\n")));
return false;
}
param_count= mysql_stmt_param_count(stmt_);
if(param_count != lobcount)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t invalid parameter count returned by MySQL\n")));
return false;
}
return true;
}
bool ora2my::BindParam(MYSQL_BIND *bind)
{
if (mysql_stmt_bind_param(stmt_, bind))
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_bind_param() failed\n")));
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t %s\n"), mysql_stmt_error(stmt_)));
return false;
}
return true;
}
bool ora2my::Execute(void)
{
my_ulonglong affected_rows;
if (mysql_stmt_execute(stmt_))
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_execute(), 1 failed\n")));
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t %s\n"), mysql_stmt_error(stmt_)));
return false;
}
affected_rows= mysql_stmt_affected_rows(stmt_);
if(affected_rows != 1)
{
ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t invalid affected rows by MySQL\n")));
return false;
}
return true;
}
int ora2my::insert2mysql(ACE_TString sql, size_t lobcount, ACE_TString tablename, MYSQL_BIND *bind)
{
ACE_TString tmp = "insert into ";
tmp += tablename;
tmp += " ";
tmp += sql;
if(!VerifyStmt(tmp, lobcount)) return -1;
if(!BindParam(bind)) return -1;
if(!Execute()) return -1;
return 0;
}
/*
switch(desc[i].otl_var_dbtype)
{
case otl_var_char:
break;
case otl_var_double:
break;
case otl_var_float:
break;
case otl_var_int:
break;
case otl_var_unsigned_int:
break;
case otl_var_short:
break;
case otl_var_long_int:
break;
case otl_var_timestamp:
break;
case otl_var_varchar_long:
break;
case otl_var_raw_long:
break;
case otl_var_clob:
break;
case otl_var_blob:
break;
case otl_var_refcur:
break;
case otl_var_long_string:
break;
case otl_var_db2time:
break;
case otl_var_db2date:
break;
case otl_var_tz_timestamp:
break;
case otl_var_ltz_timestamp:
break;
case otl_var_bigint:
break;
}
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -