📄 qam.c
字号:
/* * __qam_consume -- try to reset the head of the queue. * */static int__qam_consume(dbc, meta, first) DBC *dbc; QMETA *meta; db_recno_t first;{ DB *dbp; DB_LOCK lock, save_lock; DB_MPOOLFILE *mpf; QUEUE_CURSOR *cp; db_indx_t save_indx; db_pgno_t save_page; db_recno_t current, save_recno; u_int32_t rec_extent; int exact, put_mode, ret, t_ret, wrapped; dbp = dbc->dbp; mpf = dbp->mpf; cp = (QUEUE_CURSOR *)dbc->internal; put_mode = DB_MPOOL_DIRTY; ret = t_ret = 0; save_page = cp->pgno; save_indx = cp->indx; save_recno = cp->recno; save_lock = cp->lock; /* * If we skipped some deleted records, we need to * reposition on the first one. Get a lock * in case someone is trying to put it back. */ if (first != cp->recno) { ret = __db_lget(dbc, 0, first, DB_LOCK_READ, DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); if (ret == DB_LOCK_NOTGRANTED) { ret = 0; goto done; } if (ret != 0) goto done; if ((ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) goto done; cp->page = NULL; put_mode = 0; if ((ret = __qam_position(dbc, &first, QAM_READ, &exact)) != 0 || exact != 0) { (void)__LPUT(dbc, lock); goto done; } if ((ret =__LPUT(dbc, lock)) != 0) goto done; if ((ret = __LPUT(dbc, cp->lock)) != 0) goto done; } current = meta->cur_recno; wrapped = 0; if (first > current) wrapped = 1; rec_extent = meta->page_ext * meta->rec_page; /* Loop until we find a record or hit current */ for (;;) { /* * Check to see if we are moving off the extent * and remove the extent. * If we are moving off a page we need to * get rid of the buffer. * Wait for the lagging readers to move off the * page. */ if (cp->page != NULL && rec_extent != 0 && ((exact = (first % rec_extent == 0)) || first % meta->rec_page == 0 || first == UINT32_T_MAX)) { if (exact == 1 && (ret = __db_lget(dbc, 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) break;#ifdef QDEBUG __db_logmsg(dbp->dbenv, dbc->txn, "Queue R", 0, "%x %d %d %d", dbc->locker, cp->pgno, first, meta->first_recno);#endif put_mode |= DB_MPOOL_DISCARD; if ((ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) break; cp->page = NULL; if (exact == 1) { ret = __qam_fremove(dbp, cp->pgno); t_ret = __LPUT(dbc, cp->lock); } if (ret != 0) break; if (t_ret != 0) { ret = t_ret; break; } } else if (cp->page != NULL && (ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0) break; cp->page = NULL; first++; if (first == RECNO_OOB) { wrapped = 0; first++; } /* * LOOP EXIT when we come move to the current * pointer. */ if (!wrapped && first >= current) break; ret = __db_lget(dbc, 0, first, DB_LOCK_READ, DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); if (ret == DB_LOCK_NOTGRANTED) { ret = 0; break; } if (ret != 0) break; if ((ret = __qam_position(dbc, &first, QAM_READ, &exact)) != 0) { (void)__LPUT(dbc, lock); break; } put_mode = 0; if ((ret =__LPUT(dbc, lock)) != 0 || (ret = __LPUT(dbc, cp->lock)) != 0 || exact) { if ((t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0 && ret == 0) ret = t_ret; cp->page = NULL; break; } } cp->pgno = save_page; cp->indx = save_indx; cp->recno = save_recno; cp->lock = save_lock; /* * We have advanced as far as we can. * Advance first_recno to this point. */ if (ret == 0 && meta->first_recno != first) {#ifdef QDEBUG __db_logmsg(dbp->dbenv, dbc->txn, "Queue M", 0, "%x %d %d %d", dbc->locker, cp->recno, first, meta->first_recno);#endif if (DBC_LOGGING(dbc)) if ((ret = __qam_incfirst_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, cp->recno, PGNO_BASE_MD)) != 0) goto done; meta->first_recno = first; (void)mpf->set(mpf, meta, DB_MPOOL_DIRTY); }done: return (ret);}static int__qam_bulk(dbc, data, flags) DBC *dbc; DBT *data; u_int32_t flags;{ DB *dbp; DB_LOCK metalock; DB_MPOOLFILE *mpf; PAGE *pg; QMETA *meta; QAMDATA *qp; QUEUE_CURSOR *cp; db_indx_t indx; db_pgno_t metapno; qam_position_mode mode; int32_t *endp, *offp; u_int8_t *dbuf, *dp, *np; int exact, recs, re_len, ret, t_ret, valid; int is_key, need_pg, pagesize, size, space; dbp = dbc->dbp; mpf = dbp->mpf; cp = (QUEUE_CURSOR *)dbc->internal; mode = QAM_READ; if (F_ISSET(dbc, DBC_RMW)) mode = QAM_WRITE; pagesize = dbp->pgsize; re_len = ((QUEUE *)dbp->q_internal)->re_len; recs = ((QUEUE *)dbp->q_internal)->rec_page; metapno = ((QUEUE *)dbp->q_internal)->q_meta; is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; size = 0; if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0) return (ret); if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) { /* We did not fetch it, we can release the lock. */ (void)__LPUT(dbc, metalock); return (ret); } dbuf = data->data; np = dp = dbuf; /* Keep track of space that is left. There is an termination entry */ space = data->ulen; space -= sizeof(*offp); /* Build the offset/size table form the end up. */ endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen); endp--; offp = endp;next_pg: if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) goto done; pg = cp->page; indx = cp->indx; need_pg = 1; do { /* * If this page is a nonexistent page at the end of an * extent, pg may be NULL. A NULL page has no valid records, * so just keep looping as though qp exists and isn't QAM_VALID; * calling QAM_GET_RECORD is unsafe. */ valid = 0; /* Wrap around, skipping zero. */ if (cp->recno == RECNO_OOB) cp->recno++; if (pg != NULL) { qp = QAM_GET_RECORD(dbp, pg, indx); if (F_ISSET(qp, QAM_VALID)) { valid = 1; space -= (is_key ? 3 : 2) * sizeof(*offp); if (space < 0) goto get_space; if (need_pg) { dp = np; size = pagesize - QPAGE_SZ(dbp); if (space < size) {get_space: if (offp == endp) { data->size = ALIGN(size + pagesize, sizeof(u_int32_t)); ret = ENOMEM; break; } if (indx != 0) indx--; cp->recno--; break; } memcpy(dp, (char *)pg + QPAGE_SZ(dbp), size); need_pg = 0; space -= size; np += size; } if (is_key) *offp-- = cp->recno; *offp-- = (int32_t)((u_int8_t*)qp - (u_int8_t*)pg - QPAGE_SZ(dbp) + dp - dbuf + SSZA(QAMDATA, data)); *offp-- = re_len; } } if (!valid && is_key == 0) { *offp-- = 0; *offp-- = 0; } cp->recno++; } while (++indx < recs && indx != RECNO_OOB && cp->recno != meta->cur_recno && !QAM_AFTER_CURRENT(meta, cp->recno)); if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0) ret = t_ret; if (cp->page != NULL) { if ((t_ret = __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0) ret = t_ret; cp->page = NULL; } if (ret == 0 && (indx >= recs || indx == RECNO_OOB) && cp->recno != meta->cur_recno && !QAM_AFTER_CURRENT(meta, cp->recno)) goto next_pg; if (is_key == 1) *offp = RECNO_OOB; else *offp = -1;done: /* release the meta page */ t_ret = mpf->put(mpf, meta, 0); if (!ret) ret = t_ret; t_ret = __LPUT(dbc, metalock); return (ret);}/* * __qam_c_close -- * Close down the cursor from a single use. */static int__qam_c_close(dbc, root_pgno, rmroot) DBC *dbc; db_pgno_t root_pgno; int *rmroot;{ QUEUE_CURSOR *cp; COMPQUIET(root_pgno, 0); COMPQUIET(rmroot, NULL); cp = (QUEUE_CURSOR *)dbc->internal; /* Discard any locks not acquired inside of a transaction. */ (void)__TLPUT(dbc, cp->lock); LOCK_INIT(cp->lock); cp->page = NULL; cp->pgno = PGNO_INVALID; cp->indx = 0; cp->lock_mode = DB_LOCK_NG; cp->recno = RECNO_OOB; cp->flags = 0; return (0);}/* * __qam_c_dup -- * Duplicate a queue cursor, such that the new one holds appropriate * locks for the position of the original. * * PUBLIC: int __qam_c_dup __P((DBC *, DBC *)); */int__qam_c_dup(orig_dbc, new_dbc) DBC *orig_dbc, *new_dbc;{ QUEUE_CURSOR *orig, *new; orig = (QUEUE_CURSOR *)orig_dbc->internal; new = (QUEUE_CURSOR *)new_dbc->internal; new->recno = orig->recno; /* reget the long term lock if we are not in a xact */ if (orig_dbc->txn != NULL || !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock)) return (0); return (__db_lget(new_dbc, 0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));}/* * __qam_c_init * * PUBLIC: int __qam_c_init __P((DBC *)); */int__qam_c_init(dbc) DBC *dbc;{ QUEUE_CURSOR *cp; DB *dbp; int ret; dbp = dbc->dbp; /* Allocate the internal structure. */ cp = (QUEUE_CURSOR *)dbc->internal; if (cp == NULL) { if ((ret = __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0) return (ret); dbc->internal = (DBC_INTERNAL *)cp; } /* Initialize methods. */ dbc->c_close = __db_c_close; dbc->c_count = __db_c_count; dbc->c_del = __db_c_del; dbc->c_dup = __db_c_dup; dbc->c_get = dbc->c_real_get = __db_c_get; dbc->c_pget = __db_c_pget; dbc->c_put = __db_c_put; dbc->c_am_bulk = __qam_bulk; dbc->c_am_close = __qam_c_close; dbc->c_am_del = __qam_c_del; dbc->c_am_destroy = __qam_c_destroy; dbc->c_am_get = __qam_c_get; dbc->c_am_put = __qam_c_put; dbc->c_am_writelock = NULL; return (0);}/* * __qam_c_destroy -- * Close a single cursor -- internal version. */static int__qam_c_destroy(dbc) DBC *dbc;{ /* Discard the structures. */ __os_free(dbc->dbp->dbenv, dbc->internal); return (0);}/* * __qam_getno -- * Check the user's record number. */static int__qam_getno(dbp, key, rep) DB *dbp; const DBT *key; db_recno_t *rep;{ if ((*rep = *(db_recno_t *)key->data) == 0) { __db_err(dbp->dbenv, "illegal record number of 0"); return (EINVAL); } return (0);}/* * __qam_truncate -- * Truncate a queue database * * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *)); */int__qam_truncate(dbp, txn, countp) DB *dbp; DB_TXN *txn; u_int32_t *countp;{ DBC *dbc; DB_LOCK metalock; DB_MPOOLFILE *mpf; QMETA *meta; db_pgno_t metapno; int count, ret, t_ret; mpf = dbp->mpf; /* Acquire a cursor. */ if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0) return (ret); /* Walk the queue, counting rows. */ count = 0; while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0) count++; if (ret == DB_NOTFOUND) ret = 0; /* Discard the cursor. */ if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) ret = t_ret; if (ret != 0) return (ret); /* update the meta page */ /* get the meta page */ metapno = ((QUEUE *)dbp->q_internal)->q_meta; if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0) return (ret); if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) { /* We did not fetch it, we can release the lock. */ (void)__LPUT(dbc, metalock); return (ret); } if (DBC_LOGGING(dbc)) { ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno, 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD); } if (ret == 0) meta->first_recno = meta->cur_recno = 1; if ((t_ret = mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0) ret = t_ret; if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) ret = t_ret; *countp = count; return (ret);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -