📄 blk.c
字号:
return blk_rowxfer_mult(blkdesc, &row_count);}CS_RETCODEblk_rowxfer_mult(CS_BLKDESC * blkdesc, CS_INT * row_count){ int rows_to_xfer = 0; int rows_xferred = 0; CS_RETCODE ret; tdsdump_log(TDS_DBG_FUNC, "blk_rowxfer_mult()\n"); if (!row_count || *row_count == 0 ) rows_to_xfer = blkdesc->bind_count; else rows_to_xfer = *row_count; if (blkdesc->direction == CS_BLK_IN) { ret = _blk_rowxfer_in(blkdesc, rows_to_xfer, &rows_xferred); } else { ret = _blk_rowxfer_out(blkdesc, rows_to_xfer, &rows_xferred); } if (row_count) *row_count = rows_xferred; return ret;}CS_RETCODEblk_sendrow(CS_BLKDESC * blkdesc, CS_BLK_ROW * row){ tdsdump_log(TDS_DBG_FUNC, "UNIMPLEMENTED blk_sendrow()\n"); return CS_FAIL;}CS_RETCODEblk_sendtext(CS_BLKDESC * blkdesc, CS_BLK_ROW * row, CS_BYTE * buffer, CS_INT buflen){ tdsdump_log(TDS_DBG_FUNC, "UNIMPLEMENTED blk_sendtext()\n"); return CS_FAIL;}CS_RETCODEblk_srvinit(SRV_PROC * srvproc, CS_BLKDESC * blkdescp){ tdsdump_log(TDS_DBG_FUNC, "UNIMPLEMENTED blk_srvinit()\n"); return CS_FAIL;}CS_RETCODEblk_textxfer(CS_BLKDESC * blkdesc, CS_BYTE * buffer, CS_INT buflen, CS_INT * outlen){ tdsdump_log(TDS_DBG_FUNC, "UNIMPLEMENTED blk_textxfer()\n"); return CS_FAIL;}static CS_RETCODE_blk_rowxfer_out(CS_BLKDESC * blkdesc, CS_INT rows_to_xfer, CS_INT * rows_xferred){ TDSSOCKET *tds; TDS_INT result_type; TDS_INT ret; TDS_INT temp_count; TDS_INT row_of_query; TDS_INT rows_written; tdsdump_log(TDS_DBG_FUNC, "blk_rowxfer_out()\n"); if (!blkdesc || !blkdesc->con) return CS_FAIL; tds = blkdesc->con->tds_socket; /* * the first time blk_xfer called after blk_init() * do the query and get to the row data... */ if (blkdesc->xfer_init == 0) { if (tds_submit_queryf(tds, "select * from %s", blkdesc->tablename) == TDS_FAIL) { _ctclient_msg(blkdesc->con, "blk_rowxfer", 2, 5, 1, 140, ""); return CS_FAIL; } while ((ret = tds_process_tokens(tds, &result_type, NULL, TDS_TOKEN_RESULTS)) == TDS_SUCCEED) { if (result_type == TDS_ROW_RESULT) break; } if (ret != TDS_SUCCEED || result_type != TDS_ROW_RESULT) { _ctclient_msg(blkdesc->con, "blk_rowxfer", 2, 5, 1, 140, ""); return CS_FAIL; } blkdesc->xfer_init = 1; } row_of_query = 0; rows_written = 0; if (rows_xferred) *rows_xferred = 0; for (temp_count = 0; temp_count < rows_to_xfer; temp_count++) { ret = tds_process_tokens(tds, &result_type, NULL, TDS_STOPAT_ROWFMT|TDS_STOPAT_DONE|TDS_RETURN_ROW|TDS_RETURN_COMPUTE); tdsdump_log(TDS_DBG_FUNC, "blk_rowxfer_out() process_row_tokens returned %d\n", ret); switch (ret) { case TDS_SUCCEED: if (result_type == TDS_ROW_RESULT || result_type == TDS_COMPUTE_RESULT) { if (result_type == TDS_ROW_RESULT) { if (_ct_bind_data( blkdesc->con->ctx, tds->current_results, blkdesc->bindinfo, temp_count)) return CS_ROW_FAIL; if (rows_xferred) *rows_xferred = *rows_xferred + 1; } break; } case TDS_NO_MORE_RESULTS: return CS_END_DATA; break; default: _ctclient_msg(blkdesc->con, "blk_rowxfer", 2, 5, 1, 140, ""); return CS_FAIL; break; } } return CS_SUCCEED;}static CS_RETCODE_blk_rowxfer_in(CS_BLKDESC * blkdesc, CS_INT rows_to_xfer, CS_INT * rows_xferred){ TDSSOCKET *tds; TDS_INT each_row; if (!blkdesc) return CS_FAIL; tds = blkdesc->con->tds_socket; /* * the first time blk_xfer called after blk_init() * do the query and get to the row data... */ if (blkdesc->xfer_init == 0) { /* * first call the start_copy function, which will * retrieve details of the database table columns */ if (_rowxfer_in_init(blkdesc) == CS_FAIL) return (CS_FAIL); /* set packet type to send bulk data */ tds->out_flag = TDS_BULK; if (IS_TDS7_PLUS(tds)) { _blk_send_colmetadata(blkdesc); } blkdesc->xfer_init = 1; } for (each_row = 0; each_row < rows_to_xfer; each_row++ ) { if (_blk_build_bcp_record(blkdesc, each_row) == CS_SUCCEED) { } } return CS_SUCCEED;}static CS_RETCODE_rowxfer_in_init(CS_BLKDESC * blkdesc){ TDSSOCKET *tds = blkdesc->con->tds_socket; TDSCOLUMN *bcpcol; int i; int firstcol; int fixed_col_len_tot = 0; int variable_col_len_tot = 0; int column_bcp_data_size = 0; int bcp_record_size = 0; char *query; char clause_buffer[4096] = { 0 }; TDS_PBCB colclause; colclause.pb = clause_buffer; colclause.cb = sizeof(clause_buffer); if (IS_TDS7_PLUS(tds)) { int erc; firstcol = 1; for (i = 0; i < blkdesc->bindinfo->num_cols; i++) { bcpcol = blkdesc->bindinfo->columns[i]; if (blkdesc->identity_insert_on) { if (!bcpcol->column_timestamp) { _blk_build_bulk_insert_stmt(&colclause, bcpcol, firstcol); firstcol = 0; } } else { if (!bcpcol->column_identity && !bcpcol->column_timestamp) { _blk_build_bulk_insert_stmt(&colclause, bcpcol, firstcol); firstcol = 0; } } } erc = asprintf(&query, "insert bulk %s (%s)", blkdesc->tablename, colclause.pb); if (colclause.pb != clause_buffer) TDS_ZERO_FREE(colclause.pb); /* just for good measure; not used beyond this point */ if (erc < 0) { return CS_FAIL; } } else { /* NOTE: if we use "with nodescribe" for following inserts server do not send describe */ if (asprintf(&query, "insert bulk %s", blkdesc->tablename) < 0) { return CS_FAIL; } } tds_submit_query(tds, query); /* save the statement for later... */ blkdesc->insert_stmt = query; /* * In TDS 5 we get the column information as a result set from the "insert bulk" command. * We're going to ignore it. */ if (tds_process_simple_query(tds) != TDS_SUCCEED) { _ctclient_msg(blkdesc->con, "blk_rowxfer", 2, 5, 1, 140, ""); return CS_FAIL; } /* FIXME find a better way, some other thread could change state here */ tds_set_state(tds, TDS_QUERYING); /* * Work out the number of "variable" columns. These are either nullable or of * varying length type e.g. varchar. */ blkdesc->var_cols = 0; if (IS_TDS50(tds)) { for (i = 0; i < blkdesc->bindinfo->num_cols; i++) { bcpcol = blkdesc->bindinfo->columns[i]; /* * work out storage required for this datatype * blobs always require 16, numerics vary, the * rest can be taken from the server */ if (is_blob_type(bcpcol->on_server.column_type)) column_bcp_data_size = 16; else if (is_numeric_type(bcpcol->on_server.column_type)) column_bcp_data_size = tds_numeric_bytes_per_prec[bcpcol->column_prec]; else column_bcp_data_size = bcpcol->column_size; /* * now add that size into either fixed or variable * column totals... */ if (is_nullable_type(bcpcol->on_server.column_type) || bcpcol->column_nullable) { blkdesc->var_cols++; variable_col_len_tot += column_bcp_data_size; } else { fixed_col_len_tot += column_bcp_data_size; } } /* this formula taken from sybase manual... */ bcp_record_size = 4 + fixed_col_len_tot + variable_col_len_tot + ( (int)(variable_col_len_tot / 256 ) + 1 ) + (blkdesc->var_cols + 1) + 2; tdsdump_log(TDS_DBG_FUNC, "current_record_size = %d\n", blkdesc->bindinfo->row_size); tdsdump_log(TDS_DBG_FUNC, "bcp_record_size = %d\n", bcp_record_size); if (bcp_record_size > blkdesc->bindinfo->row_size) { blkdesc->bindinfo->current_row = realloc(blkdesc->bindinfo->current_row, bcp_record_size); if (blkdesc->bindinfo->current_row == NULL) { tdsdump_log(TDS_DBG_FUNC, "could not realloc current_row\n"); return CS_FAIL; } blkdesc->bindinfo->row_size = bcp_record_size; } } return CS_SUCCEED;}static CS_RETCODE_blk_build_bulk_insert_stmt(TDS_PBCB * clause, TDSCOLUMN * bcpcol, int first){ char buffer[32]; char *column_type = buffer; switch (bcpcol->on_server.column_type) { case SYBINT1: column_type = "tinyint"; break; case SYBBIT: column_type = "bit"; break; case SYBINT2: column_type = "smallint"; break; case SYBINT4: column_type = "int"; break; case SYBINT8: column_type = "bigint"; break; case SYBDATETIME: column_type = "datetime"; break; case SYBDATETIME4: column_type = "smalldatetime"; break; case SYBREAL: column_type = "real"; break; case SYBMONEY: column_type = "money"; break; case SYBMONEY4: column_type = "smallmoney"; break; case SYBFLT8: column_type = "float"; break; case SYBINTN: switch (bcpcol->column_size) { case 1: column_type = "tinyint"; break; case 2: column_type = "smallint"; break; case 4: column_type = "int"; break; case 8: column_type = "bigint"; break; } break; case SYBBITN: column_type = "bit"; break; case SYBFLTN: switch (bcpcol->column_size) { case 4: column_type = "real"; break; case 8: column_type = "float"; break; } break; case SYBMONEYN: switch (bcpcol->column_size) { case 4: column_type = "smallmoney"; break; case 8: column_type = "money"; break; } break; case SYBDATETIMN: switch (bcpcol->column_size) { case 4: column_type = "smalldatetime"; break; case 8: column_type = "datetime"; break; } break; case SYBDECIMAL: sprintf(column_type, "decimal(%d,%d)", bcpcol->column_prec, bcpcol->column_scale); break; case SYBNUMERIC: sprintf(column_type, "numeric(%d,%d)", bcpcol->column_prec, bcpcol->column_scale); break; case XSYBVARBINARY: sprintf(column_type, "varbinary(%d)", bcpcol->column_size); break; case XSYBVARCHAR: sprintf(column_type, "varchar(%d)", bcpcol->column_size); break; case XSYBBINARY: sprintf(column_type, "binary(%d)", bcpcol->column_size); break; case XSYBCHAR: sprintf(column_type, "char(%d)", bcpcol->column_size); break; case SYBTEXT: sprintf(column_type, "text"); break; case SYBIMAGE: sprintf(column_type, "image"); break; case XSYBNVARCHAR: sprintf(column_type, "nvarchar(%d)", bcpcol->column_size); break; case XSYBNCHAR: sprintf(column_type, "nchar(%d)", bcpcol->column_size); break; case SYBNTEXT: sprintf(column_type, "ntext"); break; case SYBUNIQUE: sprintf(column_type, "uniqueidentifier"); break; default: tdsdump_log(TDS_DBG_FUNC, "error: cannot build bulk insert statement. unrecognized server datatype %d\n", bcpcol->on_server.column_type); return CS_FAIL; } if (clause->cb < strlen(clause->pb) + strlen(bcpcol->column_name) + strlen(column_type) + ((first) ? 2u : 4u)) { char *temp = malloc(2 * clause->cb); if (!temp) return CS_FAIL; strcpy(temp, clause->pb); clause->pb = temp; clause->cb *= 2; } if (!first) strcat(clause->pb, ", "); strcat(clause->pb, bcpcol->column_name); strcat(clause->pb, " "); strcat(clause->pb, column_type); return CS_SUCCEED;}static CS_RETCODE_blk_send_colmetadata(CS_BLKDESC * blkdesc){ TDSSOCKET *tds = blkdesc->con->tds_socket; unsigned char colmetadata_token = 0x81; TDSCOLUMN *bcpcol; int i; TDS_SMALLINT num_cols; /* * Deep joy! For TDS 8 we have to send a colmetadata message followed by row data */ tds_put_byte(tds, colmetadata_token); /* 0x81 */ num_cols = 0; for (i = 0; i < blkdesc->bindinfo->num_cols; i++) { bcpcol = blkdesc->bindinfo->columns[i]; if ((!blkdesc->identity_insert_on && bcpcol->column_identity) || bcpcol->column_timestamp) { continue; } num_cols++; } tds_put_smallint(tds, num_cols); for (i = 0; i < blkdesc->bindinfo->num_cols; i++) { bcpcol = blkdesc->bindinfo->columns[i]; /* * dont send the (meta)data for timestamp columns, or * identity columns (unless indentity_insert is enabled */ if ((!blkdesc->identity_insert_on && bcpcol->column_identity) || bcpcol->column_timestamp) { continue; } if (IS_TDS90(tds)) tds_put_int(tds, bcpcol->column_usertype); else tds_put_smallint(tds, bcpcol->column_usertype); tds_put_smallint(tds, bcpcol->column_flags); tds_put_byte(tds, bcpcol->on_server.column_type); switch (bcpcol->column_varint_size) { case 4: tds_put_int(tds, bcpcol->column_size); break; case 2: tds_put_smallint(tds, bcpcol->column_size); break; case 1: tds_put_byte(tds, bcpcol->column_size); break; case 0: break; default: break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -