📄 qresult.c
字号:
{ TupleField *tuple = self->backend_tuples; /* not a correction */ /* Determine the optimum cache size. */ if (ci->drivers.fetch_max % req_size == 0) fetch_size = ci->drivers.fetch_max; else if ((Int4)req_size < ci->drivers.fetch_max) /*fetch_size = (ci->drivers.fetch_max / req_size + 1) * req_size;*/ fetch_size = (ci->drivers.fetch_max / req_size) * req_size; else fetch_size = req_size; self->cache_size = fetch_size; /* clear obsolete tuples */inolog("clear obsolete %d tuples\n", num_backend_rows); ClearCachedRows(tuple, num_fields, num_backend_rows); QR_stop_movement(self); self->move_offset = 0; QR_set_next_in_cache(self, offset + 1); } else { /* * The rowset boundary doesn't match that of * the inner resultset. Enlarge the resultset * and fetch the rest of the rowset. */ /* The next fetch size is */ fetch_size = (Int4) (end_tuple - num_backend_rows); if (fetch_size <= 0) { mylog("corrupted fetch_size end_tuple=%d <= cached_rows=%d\n", end_tuple, num_backend_rows); return -1; } /* and enlarge the cache size */ self->cache_size += fetch_size; offset = self->fetch_number; QR_inc_next_in_cache(self); boundary_adjusted = TRUE; } if (enlargeKeyCache(self, self->cache_size - num_backend_rows, "Out of memory while reading tuples") < 0) return FALSE; if (PROTOCOL_74(ci) && !QR_is_permanent(self) /* Execute seems an invalid operation after COMMIT */ ) { ExecuteRequest = TRUE; if (!SendExecuteRequest(stmt, QR_get_cursor(self), fetch_size)) return FALSE; if (!SendSyncRequest(conn)) return FALSE; } else { QResultClass *res; sprintf(fetch, "fetch %d in \"%s\"", fetch_size, QR_get_cursor(self)); mylog("%s: sending actual fetch (%d) query '%s'\n", func, fetch_size, fetch); /* don't read ahead for the next tuple (self) ! 
*/ qi.row_size = self->cache_size; qi.result_in = self; qi.cursor = NULL; res = CC_send_query(conn, fetch, &qi, 0, stmt); if (!QR_command_maybe_successful(res)) { QR_set_rstatus(self, PORES_FATAL_ERROR); QR_set_message(self, "Error fetching next group."); return FALSE; } } internally_invoked = TRUE; cur_fetch = 0; QR_set_fetching_tuples(self); } else { mylog("%s: inTuples = true, falling through: fcount = %d, fetch_number = %d\n", func, self->num_cached_rows, self->fetch_number); /* * This is a pre-fetch (fetching rows right after query but * before any real SQLFetch() calls. This is done so the * field attributes are available. */ QR_set_next_in_cache(self, 0); } if (!boundary_adjusted) { QR_set_rowstart_in_cache(self, offset); QR_set_num_cached_rows(self, 0); } sock = CC_get_socket(conn); self->tupleField = NULL; ci = &(conn->connInfo); num_rows_in = self->num_cached_rows; curr_eof = reached_eof_now = (QR_once_reached_eof(self) && self->cursTuple >= (Int4)self->num_total_read);inolog("reached_eof_now=%d\n", reached_eof_now); for (rcvend = FALSE; !rcvend;) { id = SOCK_get_id(sock); if (0 != SOCK_get_errcode(sock)) break; response_length = SOCK_get_response_length(sock);inolog("id='%c' response_length=%d\n", id, response_length); switch (id) { case 'P': mylog("Portal name within tuples ?? just ignore\n"); SOCK_get_string(sock, msgbuffer, ERROR_MSG_LENGTH); break; case 'T': mylog("Tuples within tuples ?? 
OK try to handle them\n"); QR_set_no_fetching_tuples(self); if (self->num_total_read > 0) { mylog("fetched %d rows\n", self->num_total_read); /* set to first row */ self->tupleField = self->backend_tuples + (offset * num_fields); } else { mylog(" [ fetched 0 rows ]\n"); } /* add new Result class */ self->next = QR_Constructor(); if (!self->next) { CC_set_error(conn, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func); CC_on_abort(conn, CONN_DEAD); return FALSE; } QR_set_cache_size(self->next, self->cache_size); self = self->next; if (!QR_fetch_tuples(self, conn, NULL)) { CC_set_error(conn, CONNECTION_COULD_NOT_RECEIVE, QR_get_message(self), func); return FALSE; } rcvend = TRUE; break; case 'B': /* Tuples in binary format */ case 'D': /* Tuples in ASCII format */ if (!QR_get_tupledata(self, id == 'B')) { ret = FALSE; rcvend = TRUE; } cur_fetch++; break; /* continue reading */ case 'C': /* End of tuple list */ SOCK_get_string(sock, cmdbuffer, ERROR_MSG_LENGTH); QR_set_command(self, cmdbuffer); mylog("end of tuple list -- setting inUse to false: this = %p %s\n", self, cmdbuffer); qlog(" [ fetched %d rows ]\n", self->num_total_read); mylog("_%s: 'C' fetch_total = %d & this_fetch = %d\n", func, self->num_total_read, self->num_cached_rows); if (QR_is_fetching_tuples(self)) { QR_set_no_fetching_tuples(self); if (internally_invoked) { if (ExecuteRequest) /* Execute completed without accepting Portal Suspend */ reached_eof_now = TRUE; else if (cur_fetch < fetch_size) reached_eof_now = TRUE; } else if (self->num_cached_rows < self->cache_size) reached_eof_now = TRUE; else if (!QR_get_cursor(self)) reached_eof_now = TRUE; } if (reached_eof_now) { /* last row from cache */ /* We are done because we didn't even get CACHE_SIZE tuples */ mylog("%s: backend_rows < CACHE_SIZE: brows = %d, cache_size = %d\n", func, num_backend_rows, self->cache_size); } if (!internally_invoked || PG_VERSION_LE(conn, 6.3)) rcvend = TRUE; break; case 'E': /* Error */ 
msg_truncated = handle_error_message(conn, msgbuffer, sizeof(msgbuffer), self->sqlstate, "next_tuple", self); mylog("ERROR from backend in next_tuple: '%s'\n", msgbuffer); qlog("ERROR from backend in next_tuple: '%s'\n", msgbuffer); rcvend = TRUE; ret = FALSE; break; case 'N': /* Notice */ msg_truncated = handle_notice_message(conn, cmdbuffer, sizeof(cmdbuffer), self->sqlstate, "next_tuple", self); qlog("NOTICE from backend in next_tuple: '%s'\n", msgbuffer); continue; case 'Z': /* Ready for query */ EatReadyForQuery(conn); if (QR_is_fetching_tuples(self)) { reached_eof_now = TRUE; QR_set_no_fetching_tuples(self); } rcvend = TRUE; break; case 's': /* portal suspend */ mylog("portal suspend"); QR_set_no_fetching_tuples(self); break; default: /* skip the unexpected response if possible */ if (response_length >= 0) break; /* this should only happen if the backend * dumped core ??? */ mylog("%s: Unexpected result from backend: id = '%c' (%d)\n", func, id, id); qlog("%s: Unexpected result from backend: id = '%c' (%d)\n", func, id, id); QR_set_message(self, "Unexpected result from backend. 
It probably crashed"); QR_set_rstatus(self, PORES_FATAL_ERROR); CC_on_abort(conn, CONN_DEAD); ret = FALSE; rcvend = TRUE; } if (0 != SOCK_get_errcode(sock)) break; } if (0 != SOCK_get_errcode(sock)) { if (QR_command_maybe_successful(self)) { QR_set_message(self, "Communication error while getting a tuple"); QR_set_rstatus(self, PORES_FATAL_ERROR); } CC_on_abort(conn, CONN_DEAD); ret = FALSE; } if (!ret) return ret; if (!QR_is_fetching_tuples(self)) { SQLLEN start_idx = 0; num_backend_rows = self->num_cached_rows; if (reached_eof_now) { mylog("%s: reached eof now\n", func); QR_set_reached_eof(self); if (!curr_eof) self->cursTuple++; if (self->ad_count > 0 && cur_fetch < fetch_size) { /* We have to append the tuples(keys) info from the added tuples(keys) here */ SQLLEN add_size; TupleField *tuple, *added_tuple; if (curr_eof) { start_idx = CacheIdx2GIdx(offset, stmt, self) - self->num_total_read; add_size = self->ad_count - start_idx; if (0 == num_backend_rows) { offset = 0; QR_set_rowstart_in_cache(self, offset); QR_set_next_in_cache(self, offset); } } else { start_idx = 0; add_size = self->ad_count; } if (add_size > fetch_size - cur_fetch) add_size = fetch_size - cur_fetch;inolog("will add %d added_tuples from %d and select the %dth added tuple\n", add_size, start_idx, offset - num_backend_rows + start_idx); if (add_size > fetch_size - cur_fetch) add_size = fetch_size - cur_fetch; else if (add_size < 0) add_size = 0; if (enlargeKeyCache(self, add_size, "Out of memory while adding tuples") < 0) return FALSE; /* append the KeySet info first */ memcpy(self->keyset + num_backend_rows, (void *)(self->added_keyset + start_idx), sizeof(KeySet) * add_size); /* and append the tuples info */ tuple = self->backend_tuples + num_fields * num_backend_rows; memset(tuple, 0, sizeof(TupleField) * num_fields * add_size); added_tuple = self->added_tuples + num_fields * start_idx; ReplaceCachedRows(tuple, added_tuple, num_fields, add_size); self->num_cached_rows += add_size; 
self->num_cached_keys += add_size; num_backend_rows = self->num_cached_rows; } } if (offset < num_backend_rows) { /* set to first row */ self->tupleField = self->backend_tuples + (offset * num_fields); } else { /* We are surely done here (we read 0 tuples) */ mylog("_%s: 'C': DONE (fcount == %d)\n", func, num_backend_rows); ret = -1; /* end of tuples */ } } /* If the cursor operation was invoked inside this function, we have to set the status bits here. */ if (internally_invoked && self->keyset && (self->dl_count > 0 || self->up_count > 0)) { SQLLEN i, lf; SQLLEN lidx, hidx; SQLULEN *deleted = self->deleted, *updated = self->updated; num_backend_rows = QR_get_num_cached_tuples(self); /* For simplicty, use CURS_NEEDS_REREAD bit to mark the row */ for (i = num_rows_in; i < num_backend_rows; i++) self->keyset[i].status |= CURS_NEEDS_REREAD; hidx = RowIdx2GIdx(num_backend_rows, stmt); lidx = hidx - num_backend_rows; /* deleted info */ for (i = 0; i < self->dl_count && hidx > (Int4)deleted[i]; i++) { if (lidx <= (Int4)deleted[i]) { lf = num_backend_rows - hidx + deleted[i]; self->keyset[lf].status = self->deleted_keyset[i].status; /* mark the row off */ self->keyset[lf].status &= (~CURS_NEEDS_REREAD); } } for (i = self->up_count - 1; i >= 0; i--) { if (hidx > (Int4)updated[i] && lidx <= (Int4)updated[i]) { lf = num_backend_rows - hidx + updated[i]; /* in case the row is marked off */ if (0 == (self->keyset[lf].status & CURS_NEEDS_REREAD)) continue; self->keyset[lf] = self->updated_keyset[i]; ReplaceCachedRows(self->backend_tuples + lf * num_fields, self->updated_tuples + i * num_fields, num_fields, 1); self->keyset[lf].status &= (~CURS_NEEDS_REREAD); } } /* reset CURS_NEEDS_REREAD bit */ for (i = 0; i < num_backend_rows; i++) { self->keyset[i].status &= (~CURS_NEEDS_REREAD);/*inolog("keyset[%d].status=%x\n", i, self->keyset[i].status);*/ } }inolog("%s returning %d offset=%d\n", func, ret, offset); return ret;}

/*
 * Read one data row from the backend socket into the next free slot of
 * the tuple cache, i.e. backend_tuples[num_cached_rows * num_fields], and,
 * when the result has a keyset, into keyset[num_cached_keys].
 *
 * Wire layout handled here:
 *   - PROTOCOL_74: a 2-byte field count, then per field a 4-byte length
 *     (negative means NULL) followed by that many bytes.
 *   - older protocols: a null bitmap of ceil(fields / BYTELEN) bytes,
 *     scanned most-significant bit first (a clear bit means NULL), then
 *     for every non-null field a 4-byte length followed by the data.
 *     For non-binary rows the length is reduced by VARHDRSZ.
 *
 * Columns with index < QR_NumPublicResultCols(self) are stored in freshly
 * malloc'ed, NUL-terminated buffers referenced from the tuple cache.
 * Columns at or beyond that index are not stored as tuple data: the first
 * one is parsed as "(block,offset)" and the following ones as a decimal
 * oid, both into the keyset entry.
 *
 * self->cursTuple is incremented once per row read.  This function does
 * not itself advance num_cached_rows / num_cached_keys.
 *
 * self	 - result set being filled.
 * binary - non-zero when the row arrives in binary format.
 *
 * Returns TRUE when the row was read.  Returns FALSE, after setting
 * PORES_FATAL_ERROR and a message on self, when a field length is
 * invalid, key data does not fit the local key buffer, or memory for a
 * field value cannot be allocated.
 * NOTE(review): on a FALSE return the remainder of the row is left unread
 * on the socket and field buffers already stored for this row are not
 * released here -- confirm the caller tears the result/connection down.
 */
static char
QR_read_a_tuple_from_db(QResultClass *self, char binary)
{
	Int2		field_lf;
	TupleField *this_tuplefield;
	KeySet	   *this_keyset = NULL;
	unsigned char bmp = 0;		/* unsigned: it is shifted left below */
	char		bitmap[MAX_FIELDS];	/* Max. len of the bitmap */
	Int2		bitmaplen;		/* len of the bitmap in bytes */
	Int2		bitmap_pos;
	Int2		bitcnt;
	Int4		len;
	char	   *buffer;
	int			ci_num_fields = QR_NumResultCols(self);	/* speed up access */
	int			num_fields = self->num_fields;	/* speed up access */
	SocketClass *sock = CC_get_socket(QR_get_conn(self));
	ColumnInfoClass *flds;
	int			effective_cols;
	char		tidoidbuf[32];
	ConnInfo   *ci = &(QR_get_conn(self)->connInfo);
	int			is_proto74;

	/* set the current row to read the fields into */
	effective_cols = QR_NumPublicResultCols(self);
	this_tuplefield = self->backend_tuples + (self->num_cached_rows * num_fields);
	if (QR_haskeyset(self))
	{
		this_keyset = self->keyset + self->num_cached_keys;
		this_keyset->status = 0;
	}

	bitmaplen = (Int2) ci_num_fields / BYTELEN;
	if ((ci_num_fields % BYTELEN) > 0)
		bitmaplen++;

	is_proto74 = PROTOCOL_74(ci) ? 1 : 0;

	/*
	 * At first the server sends either the field count (protocol 7.4) or a
	 * bitmap that indicates which database fields are null (older protocols).
	 */
	if (is_proto74)
	{
		int			numf = SOCK_get_int(sock, sizeof(Int2));

		if (effective_cols > 0)
		{
			inolog("%dth record in cache numf=%d\n", self->num_cached_rows, numf);
		}
		else
		{
			inolog("%dth record in key numf=%d\n", self->num_cached_keys, numf);
		}
	}
	else
	{
		SOCK_get_n_char(sock, bitmap, bitmaplen);
		/* Only touch the bitmap once it has actually been received. */
		if (bitmaplen > 0)
			bmp = (unsigned char) bitmap[0];
	}

	bitmap_pos = 0;
	bitcnt = 0;
	flds = self->fields;

	for (field_lf = 0; field_lf < ci_num_fields; field_lf++)
	{
		/* Check if the current field is NULL (bitmap protocols only) */
		if (!is_proto74 && !(bmp & 0200))
		{
			/* YES, it is NULL ! */
			this_tuplefield[field_lf].len = 0;
			this_tuplefield[field_lf].value = 0;
		}
		else
		{
			/*
			 * NO, the field is not null. so get at first the length of
			 * the field (four bytes)
			 */
			len = SOCK_get_int(sock, VARHDRSZ);
			inolog("QR_read_a_tuple_from_db len=%d\n", len);
			if (is_proto74)
			{
				if (len < 0)
				{
					/* YES, it is NULL ! */
					this_tuplefield[field_lf].len = 0;
					this_tuplefield[field_lf].value = 0;
					continue;
				}
			}
			else if (!binary)
				len -= VARHDRSZ;

			/*
			 * The length comes straight off the wire; a negative value here
			 * would index before the buffer when it is NUL-terminated below.
			 */
			if (len < 0)
			{
				QR_set_rstatus(self, PORES_FATAL_ERROR);
				QR_set_message(self, "Invalid field length received from backend.");
				return FALSE;
			}

			if (field_lf >= effective_cols)
			{
				/* Key columns go through a small fixed buffer: bound them. */
				if (len >= (Int4) sizeof(tidoidbuf))
				{
					QR_set_rstatus(self, PORES_FATAL_ERROR);
					QR_set_message(self, "Key data received from backend is too long.");
					return FALSE;
				}
				buffer = tidoidbuf;
			}
			else
			{
				buffer = (char *) malloc((size_t) len + 1);
				if (!buffer)
				{
					QR_set_rstatus(self, PORES_FATAL_ERROR);
					QR_set_message(self, "Out of memory while reading tuples");
					return FALSE;
				}
			}
			SOCK_get_n_char(sock, buffer, len);
			buffer[len] = '\0';

			mylog("qresult: len=%d, buffer='%s'\n", len, buffer);

			if (field_lf >= effective_cols)
			{
				/*
				 * Without a keyset entry the key data has still been
				 * consumed from the socket above; it is simply dropped.
				 */
				if (this_keyset)
				{
					if (field_lf == effective_cols)
						sscanf(buffer, "(%u,%hu)",
							   &this_keyset->blocknum, &this_keyset->offset);
					else
						this_keyset->oid = strtoul(buffer, NULL, 10);
				}
			}
			else
			{
				this_tuplefield[field_lf].len = len;
				this_tuplefield[field_lf].value = buffer;

				/*
				 * Track the longest value seen for this column among the
				 * rows read so far.  It only reflects rows that passed
				 * through the cache, so it is not an exact maximum for
				 * variable-length columns.
				 */
				if (flds && flds->coli_array && CI_get_display_size(flds, field_lf) < len)
					CI_get_display_size(flds, field_lf) = len;
			}
		}

		/*
		 * Now adjust for the next bit to be scanned in the next loop.
		 * The bitmap only exists for the pre-7.4 protocols.
		 */
		if (!is_proto74)
		{
			bitcnt++;
			if (BYTELEN == bitcnt)
			{
				bitmap_pos++;
				/* Never read past the bytes actually received. */
				bmp = (bitmap_pos < bitmaplen) ? (unsigned char) bitmap[bitmap_pos] : 0;
				bitcnt = 0;
			}
			else
				bmp <<= 1;
		}
	}
	self->cursTuple++;
	return TRUE;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -