📄 cdemodp.c
字号:
state = END_OF_INPUT;
break;
}
}
case FIELD_SET:
{
/* map input data fields to DB columns, set column array entries */
fsetrv = field_set(ctlp, tblp, recp, lastoff);
rowCnt = lastoff + 1;
if (rowCnt == ctlp->nrow_ctl || fsetrv != FIELD_SET_COMPLETE)
{
/* array is full, or have a large partial column, or the
* secondary buffer is in use by an OUTOFLINE field.
*/
state = DO_CONVERT;
/* FALLTHROUGH */
}
else
{
lastoff++; /* use next column array slot */
state = GET_RECORD; /* get next record */
break;
}
}
case DO_CONVERT:
{
/* Either one of the following is true:
* - the column array is full
* - there is a large partial column
* - the secondary buffer used by field_set() is in use
* - previous conversion returned CONVERT_CONTINUE and
* now the conversion is being resumed.
*
* In any case, convert and load the data.
*/
ub4 cvtBadRoff; /* bad row offset from conversion */
ub2 cvtBadCoff; /* bad column offset from conversion */
while (startoff <= lastoff)
{
ub4 cvtCntPerCall = 0; /* rows converted in one call to do_convert */
/* note that each call to do_convert() will convert all contiguous rows
* in the colarray until it hit a row in error while converting.
*/
cvtrv = do_convert(ctlp, startoff, rowCnt, &cvtCntPerCall,
&cvtBadCoff);
cvtCnt += cvtCntPerCall; /* sum of rows converted so far in colarray */
if (cvtrv == CONVERT_SUCCESS)
{
/* One or more rows converted successfully, break
* out of the conversion loop and load the rows.
*/
assert(cvtCntPerCall > 0);
state = DO_LOAD;
break;
}
else if (cvtrv == CONVERT_ERROR)
{
/* Conversion error. Reject the bad record and
* continue on with the next record (if any).
* cvtBadRoff is the 0-based index of the bad row in
* the column array. cvtBadCoff is the 0-based index
* of the bad column (of the bad row) in the column
* array.
*/
assert(cvtCntPerCall >= 0);
cvtBadRoff = startoff + cvtCntPerCall;
err_recnum = ctlp->otor_ctl[cvtBadRoff]; /* map to input_recnum */
fprintf(output_fp, "Conversion Error on record %d, column %d\n",
(int)err_recnum, (int)cvtBadCoff + 1);
/* print err msg txt */
errprint((dvoid *)(ctlp->errhp_ctl), OCI_HTYPE_ERROR,
(sb4 *)0);
/* Check to see if the conversion error occurred on a
* continuation of a partially loaded row.
* If so, either (a) flush the partial row from the server, or
* (b) mark the column as being 0 length and complete.
* In the latter case (b), any data already loaded into the column
* from a previous LoadStream call remains, and we can continue
* field setting, conversion and loading with the next column.
* Here, we implement (a), and flush the row from the server.
*/
if (err_recnum == load_recnum)
{
/* Conversion error occurred on record which has been
* partially loaded (by a previous stream).
* XXX May be better to have an attribute of the direct path
* XXX context which indicates that the last row loaded was
* XXX partial.
*
* Flush the output pipe. Note that on conversion error,
* no part of the row data for the row in error makes it
* into the stream buffer.
* Here we flush the partial row from the server. The
* stream state is reset if no rows are successfully
* converted.
*/
/* flush partial row from server */
(void) OCIDirPathFlushRow(ctlp->dpctx_ctl, ctlp->errhp_ctl);
}
if (cvtBadRoff == lastoff)
{
/* Conversion error occurred on the last populated slot
* of the column array.
* Flush the input stream of any data for this row,
* and re-use this slot for another input record.
*/
field_flush(ctlp, lastoff);
state = GET_RECORD;
startoff = cvtBadRoff; /* only convert the last row */
rowCnt = 0; /* already tried converting all rows in col array */
assert(startoff <= lastoff);
break;
}
else
{
/* Skip over bad row and continue conversion with next row.
* We don't attempt to fill in this slot with another record.
*/
startoff = cvtBadRoff + 1;
assert(startoff <= lastoff);
continue;
}
}
else if (cvtrv == CONVERT_NEED_DATA) /* partial col encountered */
{
/* Partial (large) column encountered, load the piece
* and loop back up to field_set to get the rest of
* the partial column.
* startoff is set to the offset into the column array where
* we need to resume conversion from, which should be the
* last entry that we set (lastoff).
*/
state = DO_LOAD;
/* Set our row position in column array to resume
* conversion at when DO_LOAD transitions to DO_CONVERT.
*/
assert(cvtCntPerCall >= 0);
startoff = startoff + cvtCntPerCall;
assert(startoff == lastoff);
break;
}
else if (cvtrv == CONVERT_CONTINUE)
{
/* The stream buffer is full and there is more data in
* the column array which needs to be converted.
* Load the stream (DO_LOAD) and transition back to
* DO_CONVERT to convert the remainder of the column array,
* without calling the field setting function in between.
* The sequence {DO_CONVERT, DO_LOAD} may occur many times
* for a long row or column.
* Note that startoff becomes the offset into the column array
* where we need to resume conversion from.
*/
cvtcontcnt++;
state = DO_LOAD;
/* Set our row position in column array (startoff) to
* resume conversion at when we transition from the
* DO_LOAD state back to DO_CONVERT.
*/
assert(cvtCntPerCall >= 0);
startoff = startoff + cvtCntPerCall;
assert(startoff <= lastoff);
break;
}
} /* end while */
break;
}
case DO_LOAD:
{
ub4 loadCnt; /* count of rows loaded by do_load */
ldrv = do_load(ctlp, &loadCnt);
nxtLoadOff = nxtLoadOff + loadCnt;
switch (ldrv)
{
case LOAD_SUCCESS:
{
/* The stream has been loaded successfully. What we do next
* depends on the result of the previous conversion step.
*/
load_recnum = ctlp->otor_ctl[nxtLoadOff - 1];
if (cvtrv == CONVERT_SUCCESS || cvtrv == CONVERT_ERROR)
{
/* The column array was successfully converted (or the
* last row was in error).
* Fill up another array with more input records.
*/
state = RESET;
}
else if (cvtrv == CONVERT_CONTINUE)
{
/* There is more data in column array to convert and load. */
state = DO_CONVERT;
/* Note that when do_convert returns CONVERT_CONTINUE that
* startoff was set to the row offset into the column array
* of where to resume conversion. The loadCnt returned by
* OCIDirPathLoadStream is the number of rows successfully
* loaded.
* Do a sanity check on the attributes here.
*/
if (startoff != nxtLoadOff) /* sanity */
fprintf(output_fp, "LOAD_SUCCESS/CONVERT_CONTINUE: %ld:%ld\n",
(long)nxtLoadOff, startoff);
/* Reset the direct stream state so conversion starts at
* the beginning of the stream.
*/
(void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl);
}
else
{
/* Note that if the previous conversion step returned
* CONVERT_NEED_DATA then the load step would have returned
* LOAD_NEED_DATA too (not LOAD_SUCCESS).
*/
FATAL("DO_LOAD:LOAD_SUCCESS: unexpected cvtrv", cvtrv);
}
break;
}
case LOAD_ERROR:
{
sb4 oraerr;
ub4 badRowOff;
badRowOff = nxtLoadOff;
nxtLoadOff += 1; /* account for bad row */
err_recnum = ctlp->otor_ctl[badRowOff]; /* map to input_recnum */
fprintf(output_fp, "Error on record %ld\n", (long)err_recnum);
/* print err msg txt */
errprint((dvoid *)(ctlp->errhp_ctl), OCI_HTYPE_ERROR, &oraerr);
/* On a load error, all rows up to the row in error are loaded.
* account for that here by setting load_recnum only when some
* rows have been loaded.
*/
if (loadCnt != 0)
load_recnum = err_recnum - 1;
if (oraerr == OER(600))
FATAL("DO_LOAD:LOAD_ERROR: server internal error", oraerr);
if (err_recnum == input_recnum)
{
/* Error occurred on last input row, which may or may not
* be in a partial state. Flush any remaining input for
* the bad row.
*/
field_flush(ctlp, badRowOff);
}
if (err_recnum == load_recnum)
{
/* Server has part of this row already, flush it */
(void) OCIDirPathFlushRow(ctlp->dpctx_ctl, ctlp->errhp_ctl);
}
if (badRowOff == lastoff)
{
/* Error occurred on the last entry in the column array,
* go process more input records and set up another array.
*/
state = RESET;
}
else
{
/* Otherwise, continue loading this stream. Note that the
* stream positions itself to the next row on error.
*/
state = DO_LOAD;
}
break;
}
case LOAD_NEED_DATA:
{
load_recnum = ctlp->otor_ctl[nxtLoadOff];
if (cvtrv == CONVERT_NEED_DATA)
state = FIELD_SET; /* need more input data */
else if (cvtrv == CONVERT_CONTINUE)
state = DO_CONVERT; /* have input data, continue with conversion */
else
FATAL("DO_LOAD:LOAD_NEED_DATA: unexpected cvtrv", cvtrv);
/* Reset the direct stream state so conversion starts at
* the beginning of the stream.
*/
(void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl);
break;
}
case LOAD_NO_DATA:
{
/* Attempt to either load an empty stream, or a stream
* which has been completely processed.
*/
if (cvtrv == CONVERT_CONTINUE)
{
/* Reset stream state so we convert into an empty stream buffer. */
(void) OCIDirPathStreamReset(ctlp->dpstr_ctl, ctlp->errhp_ctl);
state = DO_CONVERT; /* convert remainder of column array */
}
else
state = RESET; /* get some more input records */
break;
}
default:
FATAL("DO_LOAD: unexpected return value", ldrv);
break;
}
break;
}
case END_OF_INPUT:
{
if (cvtCnt)
state = DO_LOAD; /* deal with data already converted, but not loaded */
else if (rowCnt)
state = DO_CONVERT; /* deal with a partially populated column array */
else
done = TRUE;
break;
}
default:
FATAL("SIMPLE_LOAD: unexpected state", state);
break;
} /* end switch (state) */
}
/*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -