bcp.c
来自「在Linux/Unix下面访问WINDOWS SQLSERVER 的ODBC驱动」· C语言 代码 · 共 2,086 行 · 第 1/5 页
C
2,086 行
TDS_SMALLINT offsets[256]; int i, row_pos; int ncols = 0; assert(dbproc); assert(rowbuffer); assert(pncols); tdsdump_log(TDS_DBG_FUNC, "_bcp_add_variable_columns(%p, %s, %p, start=%d, %p)\n", dbproc, (behaviour == BCP_REC_NOFETCH_DATA)? "BCP_REC_NOFETCH_DATA" : "BCP_REC_FETCH_DATA", rowbuffer, start, pncols); tdsdump_log(TDS_DBG_FUNC, "%4s %8s %18s %18s %8s\n", "col", "type", "is_nullable_type", "column_nullable", "is null" ); for (i = 0; i < dbproc->bcpinfo->bindinfo->num_cols; i++) { TDSCOLUMN *bcpcol = dbproc->bcpinfo->bindinfo->columns[i]; tdsdump_log(TDS_DBG_FUNC, "%4d %8d %18s %18s %8s\n", i, bcpcol->column_type, is_nullable_type(bcpcol->column_type)? "yes" : "no", bcpcol->column_nullable? "yes" : "no", bcpcol->bcp_column_data->is_null? "yes" : "no" ); } /* the first two bytes of the rowbuffer are reserved to hold the entire record length */ row_pos = start + 2; offsets[0] = row_pos; tdsdump_log(TDS_DBG_FUNC, "%4s %8s %8s %8s\n", "col", "ncols", "row_pos", "cpbytes"); for (i = 0; i < dbproc->bcpinfo->bindinfo->num_cols; i++) { int cpbytes = 0; TDSCOLUMN *bcpcol = dbproc->bcpinfo->bindinfo->columns[i]; /* * Is this column of "variable" type, i.e. NULLable * or naturally variable length e.g. VARCHAR */ if (is_nullable_type(bcpcol->column_type) || bcpcol->column_nullable) { tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d %8d\n", i, ncols, row_pos, cpbytes); if (behaviour == BCP_REC_FETCH_DATA) { if ((_bcp_get_col_data(dbproc, bcpcol)) != SUCCEED) { return FAIL; } tdsdump_log(TDS_DBG_FUNC, "column %d is %d bytes and is %sNULL\n", i+1, bcpcol->bcp_column_data->datalen, bcpcol->bcp_column_data->is_null? "":"not "); } /* If it's a NOT NULL column, and we have no data, throw an error. */ if (!(bcpcol->column_nullable) && bcpcol->bcp_column_data->is_null) { dbperror(dbproc, SYBEBCNN, 0); return FAIL; } /* move the column buffer into the rowbuffer */ if (!bcpcol->bcp_column_data->is_null) { if (is_blob_type(bcpcol->column_type)) { cpbytes = 16; bcpcol->column_textpos = row_pos; /* save for data write */ } else if (is_numeric_type(bcpcol->column_type)) { TDS_NUMERIC *num = (TDS_NUMERIC *) bcpcol->bcp_column_data->data; cpbytes = tds_numeric_bytes_per_prec[num->precision]; memcpy(&rowbuffer[row_pos], num->array, cpbytes); } else { cpbytes = bcpcol->bcp_column_data->datalen > bcpcol->column_size ? bcpcol->column_size : bcpcol->bcp_column_data->datalen; memcpy(&rowbuffer[row_pos], bcpcol->bcp_column_data->data, cpbytes); } } row_pos += cpbytes; offsets[++ncols] = row_pos; tdsdump_dump_buf(TDS_DBG_NETWORK, "BCP row buffer so far", rowbuffer, row_pos); } } tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d\n", i, ncols, row_pos); /* * The rowbuffer ends with an offset table and, optionally, an adjustment table. * The offset table has 1-byte elements that describe the locations of the start of each column in * the rowbuffer. If the largest offset is greater than 255, another table -- the adjustment table -- * is inserted just before the offset table. It holds the high bytes. * * Both tables are laid out in reverse: * #elements, offset N+1, offset N, offset N-1, ... offset 0 * E.g. for 2 columns you have 4 data points: * 1. How many elements (4) * 2. Start of column 3 (non-existent, "one off the end") * 3. Start of column 2 * 4. Start of column 1 * The length of each column is computed by subtracting its start from the its successor's start. * * The algorithm below computes both tables.營f the adjustment table isn't needed, the * effect is to overwrite it with the offset table. */ while (ncols && offsets[ncols] == offsets[ncols-1]) ncols--; /* trailing NULL columns are not sent and are not included in the offset table */ if (ncols) { BYTE *padj = rowbuffer + row_pos; BYTE *poff = offsets[ncols] > 0xFF? padj + ncols + 1 : padj; *padj++ = 1 + ncols; *poff++ = 1 + ncols; for (i=0; i <= ncols; i++) { padj[i] = offsets[ncols-i] >> 8; poff[i] = offsets[ncols-i] & 0xFF; } row_pos = poff + ncols + 1 - rowbuffer; } tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d\n", i, ncols, row_pos); tdsdump_dump_buf(TDS_DBG_NETWORK, "BCP row buffer", rowbuffer, row_pos); *pncols = ncols; return ncols == 0? start : row_pos;}/** * \ingroup dblib_bcp * \brief Write data in host variables to the table. * * \param dbproc contains all information needed by db-lib to manage communications with the server. * * \remarks Call bcp_bind() first to describe the variables to be used. * Use bcp_batch() to commit sets of rows. * After sending the last row call bcp_done(). * \return SUCCEED or FAIL. * \sa bcp_batch(), bcp_bind(), bcp_colfmt(), bcp_collen(), bcp_colptr(), bcp_columns(), * bcp_control(), bcp_done(), bcp_exec(), bcp_init(), bcp_moretext(), bcp_options() */RETCODEbcp_sendrow(DBPROCESS * dbproc){ TDSSOCKET *tds; tdsdump_log(TDS_DBG_FUNC, "bcp_sendrow(%p)\n", dbproc); CHECK_DBPROC(); DBPERROR_RETURN(IS_TDSDEAD(dbproc->tds_socket), SYBEDDNE); CHECK_PARAMETER(dbproc->bcpinfo, SYBEBCPI, FAIL); tds = dbproc->tds_socket; if (dbproc->bcpinfo->direction != DB_IN) { dbperror(dbproc, SYBEBCPN, 0); return FAIL; } if (dbproc->hostfileinfo != NULL) { dbperror(dbproc, SYBEBCPB, 0); return FAIL; } /* * The first time sendrow is called after bcp_init, * there is a certain amount of initialisation to be done. */ if (dbproc->bcpinfo->xfer_init == 0) { /* The start_copy function retrieves details of the table's columns */ if (_bcp_start_copy_in(dbproc) == FAIL) { dbperror(dbproc, SYBEBULKINSERT, 0); return (FAIL); } /* set packet type to send bulk data */ tds->out_flag = TDS_BULK; tds_set_state(tds, TDS_QUERYING); if (IS_TDS7_PLUS(tds)) { _bcp_send_colmetadata(dbproc); } dbproc->bcpinfo->xfer_init = 1; } return _bcp_send_bcp_record(dbproc, BCP_REC_FETCH_DATA);}/** * \ingroup dblib_bcp_internal * \brief * * \param dbproc contains all information needed by db-lib to manage communications with the server. * \param rows_copied * * \return SUCCEED or FAIL. * \sa BCP_SETL(), bcp_batch(), bcp_bind(), bcp_colfmt(), bcp_colfmt_ps(), bcp_collen(), bcp_colptr(), bcp_columns(), bcp_control(), bcp_done(), bcp_exec(), bcp_getl(), bcp_init(), bcp_moretext(), bcp_options(), bcp_readfmt(), bcp_sendrow() */static RETCODE_bcp_exec_in(DBPROCESS * dbproc, DBINT * rows_copied){ FILE *hostfile, *errfile = NULL; TDSSOCKET *tds = dbproc->tds_socket; BCP_HOSTCOLINFO *hostcol; RETCODE ret; int i, row_of_hostfile, rows_written_so_far; int row_error, row_error_count; offset_type row_start, row_end; offset_type error_row_size; const size_t chunk_size = 0x20000u; tdsdump_log(TDS_DBG_FUNC, "_bcp_exec_in(%p, %p)\n", dbproc, rows_copied); assert(dbproc); assert(rows_copied); *rows_copied = 0; if (!(hostfile = fopen(dbproc->hostfileinfo->hostfile, "r"))) { dbperror(dbproc, SYBEBCUO, 0); return FAIL; } if (_bcp_start_copy_in(dbproc) == FAIL) return FAIL; tds->out_flag = TDS_BULK; tds_set_state(tds, TDS_QUERYING); if (IS_TDS7_PLUS(tds)) { _bcp_send_colmetadata(dbproc); } row_of_hostfile = 0; rows_written_so_far = 0; row_start = ftello(hostfile); row_error_count = 0; row_error = 0; while ((ret=_bcp_read_hostfile(dbproc, hostfile, &row_error)) == MORE_ROWS) { row_of_hostfile++; if (row_error) { int count; if (errfile == NULL && dbproc->hostfileinfo->errorfile) { if (!(errfile = fopen(dbproc->hostfileinfo->errorfile, "w"))) { dbperror(dbproc, SYBEBUOE, 0); return FAIL; } } if (errfile != NULL) { char *row_in_error = NULL; for (i = 0; i < dbproc->hostfileinfo->host_colcount; i++) { hostcol = dbproc->hostfileinfo->host_columns[i]; if (hostcol->column_error == HOST_COL_CONV_ERROR) { count = fprintf(errfile, "#@ data conversion error on host data file Row %d Column %d\n", row_of_hostfile, i + 1); if( count < 0 ) { dbperror(dbproc, SYBEBWEF, errno); } } else if (hostcol->column_error == HOST_COL_NULL_ERROR) { count = fprintf(errfile, "#@ Attempt to bulk-copy a NULL value into Server column" " which does not accept NULL values. Row %d, Column %d\n", row_of_hostfile, i + 1); if( count < 0 ) { dbperror(dbproc, SYBEBWEF, errno); } } } row_end = ftello(hostfile); /* error data can be very long so split in chunks */ error_row_size = row_end - row_start; fseeko(hostfile, row_start, SEEK_SET); while (error_row_size > 0) { size_t chunk = error_row_size > chunk_size ? chunk_size : error_row_size; if (!row_in_error) { if ((row_in_error = malloc(chunk)) == NULL) { dbperror(dbproc, SYBEMEM, errno); } } if (fread(row_in_error, chunk, 1, hostfile) != 1) { printf("BILL fread failed after fseek\n"); } count = fwrite(row_in_error, chunk, 1, errfile); if( count < chunk ) { dbperror(dbproc, SYBEBWEF, errno); } error_row_size -= chunk; } free(row_in_error); fseeko(hostfile, row_end, SEEK_SET); count = fprintf(errfile, "\n"); if( count < 0 ) { dbperror(dbproc, SYBEBWEF, errno); } } row_error_count++; if (row_error_count > dbproc->hostfileinfo->maxerrs) break; } else { if (dbproc->hostfileinfo->firstrow <= row_of_hostfile && row_of_hostfile <= MAX(dbproc->hostfileinfo->lastrow, 0x7FFFFFFF)) { if (_bcp_send_bcp_record(dbproc, BCP_REC_NOFETCH_DATA) == SUCCEED) { rows_written_so_far++; if (dbproc->hostfileinfo->batch > 0 && rows_written_so_far == dbproc->hostfileinfo->batch) { rows_written_so_far = 0; tds_flush_packet(tds); tds_set_state(tds, TDS_PENDING); if (tds_process_simple_query(tds) != TDS_SUCCEED) { if (errfile) fclose(errfile); return FAIL; } *rows_copied += tds->rows_affected; dbperror(dbproc, SYBEBBCI, 0); /* batch copied to server */ _bcp_start_new_batch(dbproc); } } } } row_start = ftello(hostfile); row_error = 0; } if( row_error_count == 0 && row_of_hostfile < dbproc->hostfileinfo->firstrow ) { /* "The BCP hostfile '%1!' contains only %2! rows. */ dbperror(dbproc, SYBEBCSA, 0, dbproc->hostfileinfo->hostfile, row_of_hostfile); } if (errfile && 0 != fclose(errfile) ) { dbperror(dbproc, SYBEBUCE, 0); } if (fclose(hostfile) != 0) { dbperror(dbproc, SYBEBCUC, 0); ret = FAIL; } tds_flush_packet(tds); tds_set_state(tds, TDS_PENDING); if (tds_process_simple_query(tds) != TDS_SUCCEED) { return FAIL; } *rows_copied += tds->rows_affected; return ret == NO_MORE_ROWS? SUCCEED : FAIL; /* (ret is returned from _bcp_read_hostfile) */}/** * \ingroup dblib_bcp * \brief Write a datafile to a table. * * * \param dbproc contains all information needed by db-lib to manage communications with the server. * \param rows_copied bcp_exec will write the count of rows successfully written to this address. * If \a rows_copied is NULL, it will be ignored by db-lib. * * \return SUCCEED or FAIL. * \sa bcp_batch(), bcp_bind(), bcp_colfmt(), bcp_collen(), bcp_colptr(), bcp_columns(), * bcp_control(), bcp_done(), bcp_init(), bcp_sendrow() */RETCODEbcp_exec(DBPROCESS * dbproc, DBINT *rows_copied){ DBINT dummy_copied; RETCODE ret = 0; tdsdump_log(TDS_DBG_FUNC, "bcp_exec(%p, %p)\n", dbproc, rows_copied); CHECK_DBPROC(); DBPERROR_RETURN(IS_TDSDEAD(dbproc->tds_socket), SYBEDDNE); CHECK_PARAMETER(dbproc->bcpinfo, SYBEBCPI, FAIL); CHECK_PARAMETER(dbproc->hostfileinfo, SYBEBCVH, FAIL); if (rows_copied == NULL) /* NULL means we should ignore it */ rows_copied = &dummy_copied; if (dbproc->bcpinfo->direction == DB_OUT || dbproc->bcpinfo->direction == DB_QUERYOUT) { ret = _bcp_exec_out(dbproc, rows_copied); } else if (dbproc->bcpinfo->direction == DB_IN) { ret = _bcp_exec_in(dbproc, rows_copied); } _bcp_free_storage(dbproc); retur
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?