⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ora2my.cpp

📁 把oracle库中的数据插入到mysql表中 oracle2mysql
💻 CPP
字号:
#include "ora2my.h"

ora2my::ora2my(void):blobcount_(0),buffer_(NULL),my_port_(0)
{
}

ora2my::~ora2my(void)
{
	ora_db_.logoff();
	mysql_stmt_close(stmt_);

	for(size_t i = 0; i < blobcount_ + 1; i++)
	{
		if(buffer_[i].buf_) 
		{
			delete [] buffer_[i].buf_;
			buffer_[i].buf_ = NULL;
		}
	}

	delete [] buffer_;
}

int ora2my::init_ora(ACE_TString login)
{
	if(login.length() == 0) return -1;
	ora_login_ = login;

	otl_connect::otl_initialize();

	return 0;
}

int ora2my::init_my(ACE_TString host, ACE_TString user, ACE_TString pwd, u_int port, ACE_TString tablename)
{
	if(host.length() == 0 || user.length() == 0 || pwd.length() == 0 || port < 0) return -1;
	my_host_    = host;
	my_user_    = user;
	my_pwd_     = pwd;
	my_port_    = port;
	my_tabname_ = tablename;

	return 0;
}

int ora2my::connect_ora(void)
{
	ACE_TString login;
	login = ora_login_;
	try{
		ora_db_.rlogon(login.c_str());
		otl_cursor::direct_exec
		(
		ora_db_,
		"alter session set nls_date_format='yyyymmddhh24miss'",
		otl_exception::disabled
		);
	}
	catch(otl_exception& p){
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t Connect %s: \n\t%s\n\t%s\n\t%s\n"),login.c_str(),p.msg,p.stm_text,p.var_info));
		return -1;
	}
	ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t Connect %s ok...\n") , login.c_str())); 
	return 0;
}

int ora2my::connect_my(void)
{
	mysql_ = mysql_init(NULL);
	mysql_options(mysql_, MYSQL_SET_CHARSET_NAME, "gb2312");
	mysql_ = mysql_real_connect(mysql_, my_host_.c_str(), my_user_.c_str(), my_pwd_.c_str(), "mysql", my_port_, NULL, CLIENT_MULTI_STATEMENTS);
	if(!mysql_)
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t Connect to Mysql error...\n"))); 
		return -1;
	}

	stmt_ = mysql_stmt_init(mysql_);
	if(!stmt_)
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_init(), out of memory\n"))); 
		return -1;
	}
	
	return 0;
}

int ora2my::dumpfromoracle(ACE_TString sql)
{
	int fieldcount = 0;
	size_t row = 0;
	size_t lobcount = 0;
	int lobindex = 0;

	otl_stream otl_sin;
	otl_lob_stream lob;
	otl_column_desc* desc;

	MYSQL_BIND *bind = NULL;
	u_long *length = NULL;
	my_bool *is_null = NULL;
	ACE_TString my_sql;



	otl_sin.set_all_column_types(otl_all_num2str | otl_all_date2str);
	otl_sin.open(1, sql.c_str(), ora_db_);
	desc=otl_sin.describe_select(fieldcount);

	blobcount_ = lobcount = countlob(desc, fieldcount);
	if(lobcount == -1) return -1;

	bind = new MYSQL_BIND[lobcount];
	int x = sizeof(MYSQL_BIND);
	ACE_OS::memset(bind, 0, sizeof(MYSQL_BIND) * lobcount);

	length = new u_long[lobcount];
	x = sizeof(u_long);
	ACE_OS::memset(length, 0, sizeof(u_long) * lobcount);

	is_null = new my_bool[lobcount];
	x = sizeof(my_bool);
	ACE_OS::memset(is_null, 0, sizeof(my_bool) * lobcount);

	for(size_t i = 0; i < lobcount; i++)
	{
		bind[i].buffer_type = MYSQL_TYPE_BLOB;
		bind[i].buffer = buffer_[i + 1].buf_;
		bind[i].buffer_length = buffer_[i + 1].len_;
		bind[i].is_null = &(is_null[i]);
		bind[i].length = &(length[i]);
	}

	while(!otl_sin.eof())
	{
		row++;
		my_sql = "values(";
		lobindex = 0;
		for(int i = 0; i < fieldcount; i++)
		{
			switch(desc[i].otl_var_dbtype)
			{
			case otl_var_clob:
			case otl_var_blob:
				{
					int flush = Z_NO_FLUSH;
					u_long lob_len = 0;
					int zip_len = 0; //compressBound(BUF_SIZE);
					int total_zip_len = 0;
					off_t total = 0;
					Z_Stream zs;

					my_sql += "?, ";

					otl_sin >> lob;
					lob_len = lob.len();
					// length[lobindex] = lob.len(); //mastbe the length after compress

					if(lob_len == 0)
					{
						is_null[lobindex] = 1;
						lobindex++;
						break;
					}

					if(lob_len > buffer_[lobindex + 1].len_) //reallocate
					{
						ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t reallocate buffer_[lobindex + 1].buf_\n"))); 
						delete [] buffer_[lobindex + 1].buf_;
						buffer_[lobindex + 1].buf_ = NULL;

						int tmp_size = compressBound(lob_len + 1);
						buffer_[lobindex + 1].buf_ = new char[tmp_size];
						if(!buffer_[lobindex + 1].buf_)
						{
							ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t new for large memory fail\n"))); 
							return -1;
						}
						buffer_[lobindex + 1].len_ = (u_long)tmp_size;

						bind[lobindex].buffer = buffer_[lobindex + 1].buf_;
						bind[lobindex].buffer_length = tmp_size;
					}

					if(lob_len > buffer_[0].len_)
					{
						ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t reallocate buffer_[0].buf_\n"))); 
						delete [] buffer_[0].buf_;
						buffer_[0].buf_ = NULL;
						buffer_[0].buf_ = new char[lob_len + 1];
						if(!buffer_[0].buf_)
						{
							ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t new for large memory fail\n"))); 
							return -1;
						}
						buffer_[0].len_ = (u_long)(lob_len + 1);
					}

					is_null[lobindex] = 0;

					zip_len = buffer_[lobindex + 1].len_;
					otl_long_string f2((void*)buffer_[0].buf_, (const int)buffer_[0].len_);
					lob >> f2;
					if(!lob.eof())
					{
						ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t !lob.eof()\n")));
						return -1;
					}
					
					//ACE_OS::memset(buffer_[lobindex + 1].buf_, 0, zip_len);
					int ret = compress2((Bytef *)(buffer_[lobindex + 1].buf_), (uLongf*)&zip_len,(Bytef *)(buffer_[0].buf_), lob_len, 3);
					if(ret != 0)
					{
						ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%D %t compress error (%d)\n") ,ret ));
						return ret;
					}

					ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t lob_len=%d zip_len=%d\n"), lob_len, zip_len));

					/*
					total_zip_len = 0;
					while(!lob.eof())
					{
						int read_len = 0;
						otl_long_string f2((void*)buffer_[0].buf_, (const int)buffer_[0].len_);

						lob >> f2;
						read_len = f2.len();
						total += read_len;
						if(total >= (off_t)lob_len) flush = Z_FINISH;

						zip_len = compressBound(lob_len);
						int ret = zs.deFlate((Bytef *)buffer_[lobindex + 1].buf_, &zip_len,(Bytef *)buffer_[0].buf_, read_len , flush);
						if(ret != Z_OK)
						{
							ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t Compress blob error...\n")));
							return -1;
						}
						total_zip_len += zip_len;
					}// while

					length[lobindex] = total_zip_len;
					*/

					length[lobindex] = zip_len;
					
					lob.close();
					lobindex++;
					break;
				}
			default:
				{
					size_t tmp_len = 0;
					otl_sin >> buffer_[0].buf_;
					tmp_len = ACE_OS::strlen(buffer_[0].buf_);
					mysql_real_escape_string(mysql_, (buffer_[0].buf_ + tmp_len + 1), buffer_[0].buf_, (off_t)tmp_len); //处理转义
					my_sql += "'";
					my_sql += (buffer_[0].buf_ + tmp_len + 1);
					my_sql += "', ";
					break;
				}
			} //switch
		} //for

		my_sql[my_sql.length() - 2] = ')';
		my_sql[my_sql.length() - 1] = '\0';
		if(insert2mysql(my_sql, lobcount, my_tabname_, bind) == -1) return -1;
	}
	ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("%P|%t exp and imp %d rows...\n"), row));

	if(bind)   {delete [] bind; bind = NULL;} 
	if(length) {delete [] length; length = NULL;}
	if(is_null){delete [] is_null; is_null = NULL;}

	return 0;
}

int ora2my::countlob(otl_column_desc* desc, int fieldcount)
{
	//off_t pos = 0;
	size_t lobcount = 0;
	
	for(int i = 0; i < fieldcount; i++)
	{
		switch(desc[i].otl_var_dbtype)
		{
		case otl_var_clob:
		case otl_var_blob:
			lobcount++;
			break;
		}
	}

	buffer_ = new space[lobcount + 1]; 
	if(!buffer_) return -1;

	// buffer_[0] for noneblob field
	buffer_[0].buf_ = new char[BUF_SIZE];
	buffer_[0].len_ = BUF_SIZE;
	if(!buffer_[0].buf_) return -1;

	// for each blob field
	for(size_t i = 1; i <=lobcount; i++)
	{
		buffer_[i].buf_ = new char[BLOB_SIZE];
		buffer_[i].len_ = BLOB_SIZE;
		if(!buffer_[i].buf_) return -1;
	}

	return (int)lobcount;
}

bool ora2my::VerifyStmt(ACE_TString sql, size_t lobcount)
{
	int param_count = 0;

	if (mysql_stmt_prepare(stmt_, sql.c_str(), (u_long)sql.length()))
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_prepare(), INSERT failed\n")));
		return false;
	}
	param_count= mysql_stmt_param_count(stmt_);
	if(param_count != lobcount)
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t invalid parameter count returned by MySQL\n")));
		return false;
	}

	return true;
}

bool ora2my::BindParam(MYSQL_BIND *bind)
{
	if (mysql_stmt_bind_param(stmt_, bind))
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_bind_param() failed\n")));
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t %s\n"), mysql_stmt_error(stmt_)));
		return false;
	}

	return true;
}

bool ora2my::Execute(void)
{
	my_ulonglong  affected_rows;

	if (mysql_stmt_execute(stmt_))
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t mysql_stmt_execute(), 1 failed\n")));
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t %s\n"), mysql_stmt_error(stmt_)));
		return false;
	}

	affected_rows= mysql_stmt_affected_rows(stmt_);
	if(affected_rows != 1)
	{
		ACE_DEBUG ((LM_DEBUG,ACE_TEXT("%P|%t invalid affected rows by MySQL\n")));
		return false;
	}
	return true;
}

int ora2my::insert2mysql(ACE_TString sql, size_t lobcount, ACE_TString tablename, MYSQL_BIND *bind)
{
	ACE_TString tmp = "insert into ";
	tmp += tablename;
	tmp += " ";
	tmp += sql;

	if(!VerifyStmt(tmp, lobcount)) return -1;
	if(!BindParam(bind)) return -1;
	if(!Execute()) return -1;
	return 0;
}


/*
		switch(desc[i].otl_var_dbtype)
		{
		case otl_var_char:
			break;
		case otl_var_double:
			break;
		case otl_var_float:
			break;
		case otl_var_int:
			break;
		case otl_var_unsigned_int:
			break;
		case otl_var_short:
			break;
		case otl_var_long_int:
			break;
		case otl_var_timestamp:
			break;
		case otl_var_varchar_long:
			break;
		case otl_var_raw_long:
			break;
		case otl_var_clob:
			break;
		case otl_var_blob:
			break;
		case otl_var_refcur:
			break;
		case otl_var_long_string:
			break;
		case otl_var_db2time:
			break;
		case otl_var_db2date:
			break;
		case otl_var_tz_timestamp:
			break;
		case otl_var_ltz_timestamp:
			break;
		case otl_var_bigint:
			break;
		}
		*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -