📄 qam.c
字号:
/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1999-2002
 *	Sleepycat Software.  All rights reserved.
 */
#include "db_config.h"

#ifndef lint
static const char revid[] = "$Id: qam.c,v 11.134 2002/08/13 20:46:08 ubell Exp $";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>
#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/btree.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/qam.h"

static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
static int __qam_c_close __P((DBC *, db_pgno_t, int *));
static int __qam_c_del __P((DBC *));
static int __qam_c_destroy __P((DBC *));
static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));

/*
 * __qam_position --
 *	Position a queued access method cursor at a record.  This returns
 *	the page locked.  *exactp will be set if the record is valid.
 *
 *	On success the page lock is left in cp->lock and the pinned page in
 *	cp->page; cp->pgno and cp->indx identify the record's slot.  If the
 *	page cannot be fetched the page lock is released; for modes other
 *	than QAM_WRITE a DB_PAGE_NOTFOUND/ENOENT failure is reported as
 *	success with *exactp == 0 and cp->page == NULL.
 *
 * PUBLIC: int __qam_position
 * PUBLIC:       __P((DBC *, db_recno_t *, qam_position_mode, int *));
 */
int
__qam_position(dbc, recnop, mode, exactp)
	DBC *dbc;		/* open cursor */
	db_recno_t *recnop;	/* pointer to recno to find */
	qam_position_mode mode;/* locking: read or write */
	int *exactp;		/* indicate if it was found */
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	QAMDATA *qp;
	db_pgno_t pg;
	int ret;

	dbp = dbc->dbp;
	cp = (QUEUE_CURSOR *)dbc->internal;

	/* Fetch the page for this recno. */
	pg = QAM_RECNO_PAGE(dbp, *recnop);

	if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
	    DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
		return (ret);
	cp->page = NULL;
	*exactp = 0;
	if ((ret = __qam_fget(dbp, &pg,
	    mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, cp->lock);
		/* A missing page is not an error unless we are writing. */
		if (mode != QAM_WRITE &&
		    (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
			return (0);
		return (ret);
	}
	cp->pgno = pg;
	cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);

	/* A page number of 0 marks a page that has not been initialized. */
	if (PGNO(cp->page) == 0) {
		if (F_ISSET(dbp, DB_AM_RDONLY)) {
			*exactp = 0;
			return (0);
		}
		PGNO(cp->page) = pg;
		TYPE(cp->page) = P_QAMDATA;
	}

	qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
	*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;

	return (ret);
}

/*
 * __qam_pitem --
 *	Put an item on a queue page.  Copy the data to the page and set the
 *	VALID and SET bits.  If logging and the record was previously set,
 *	log that data, otherwise just log the new data.
 *
 *	Returns EINVAL (after reporting through __db_err) if the supplied
 *	length does not fit the fixed record length, otherwise 0 or the
 *	error from __os_malloc/__qam_add_log.
 *
 *   pagep must be write locked
 *
 * PUBLIC: int __qam_pitem
 * PUBLIC:     __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *));
 */
int
__qam_pitem(dbc, pagep, indx, recno, data)
	DBC *dbc;
	QPAGE *pagep;
	u_int32_t indx;
	db_recno_t recno;
	DBT *data;
{
	DB *dbp;
	DBT olddata, pdata, *datap;
	QAMDATA *qp;
	QUEUE *t;
	u_int32_t alloced;
	u_int8_t *dest, *p;
	int ret;

	/*
	 * alloced does double duty: before any allocation it may carry the
	 * offending length into the len_err message; afterwards it is the
	 * flag telling the exit path to free datap->data.
	 */
	alloced = ret = 0;

	dbp = dbc->dbp;
	t = (QUEUE *)dbp->q_internal;

	if (data->size > t->re_len)
		goto len_err;

	qp = QAM_GET_RECORD(dbp, pagep, indx);

	p = qp->data;
	datap = data;
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		if (data->doff + data->dlen > t->re_len) {
			alloced = data->dlen;
			goto len_err;
		}
		if (data->size != data->dlen) {
len_err:		__db_err(dbp->dbenv,
			    "Length improper for fixed length record %lu",
			    (u_long)(alloced ? alloced : data->size));
			return (EINVAL);
		}
		/* A partial put covering the whole record is a plain put. */
		if (data->size == t->re_len)
			goto no_partial;

		/*
		 * If we are logging, then we have to build the record
		 * first, otherwise, we can simply drop the change
		 * directly on the page.  After this clause, make
		 * sure that datap and p are set up correctly so that
		 * copying datap into p does the right thing.
		 *
		 * Note, I am changing this so that if the existing
		 * record is not valid, we create a complete record
		 * to log so that both this and the recovery code is simpler.
		 */
		if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
			datap = &pdata;
			memset(datap, 0, sizeof(*datap));

			if ((ret = __os_malloc(dbp->dbenv,
			    t->re_len, &datap->data)) != 0)
				return (ret);
			alloced = 1;
			datap->size = t->re_len;

			/*
			 * Construct the record if it's valid, otherwise set it
			 * all to the pad character.
			 */
			dest = datap->data;
			if (F_ISSET(qp, QAM_VALID))
				memcpy(dest, p, t->re_len);
			else
				memset(dest, t->re_pad, t->re_len);

			dest += data->doff;
			memcpy(dest, data->data, data->size);
		} else {
			datap = data;
			p += data->doff;
		}
	}

no_partial:
	if (DBC_LOGGING(dbc)) {
		olddata.size = 0;
		if (F_ISSET(qp, QAM_SET)) {
			olddata.data = qp->data;
			olddata.size = t->re_len;
		}
		if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
		    0, &LSN(pagep), pagep->pgno,
		    indx, recno, datap, qp->flags,
		    olddata.size == 0 ? NULL : &olddata)) != 0)
			goto err;
	}

	F_SET(qp, QAM_VALID | QAM_SET);
	memcpy(p, datap->data, datap->size);
	/* A short, non-partial record is padded out to the full length. */
	if (!F_ISSET(data, DB_DBT_PARTIAL))
		memset(p + datap->size, t->re_pad, t->re_len - datap->size);

err:	if (alloced)
		__os_free(dbp->dbenv, datap->data);

	return (ret);
}

/*
 * __qam_c_put
 *	Cursor put for queued access method.
 *	BEFORE and AFTER cannot be specified.
 *
 *	Writes the record under a record lock (kept in cp->lock), then
 *	adjusts the first/current record numbers on the meta page when the
 *	new record lies outside the current queue bounds.
 */
static int
__qam_c_put(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DB_LOCK lock;
	DB_MPOOLFILE *mpf;
	QMETA *meta;
	QUEUE_CURSOR *cp;
	db_pgno_t pg;
	db_recno_t new_cur, new_first;
	u_int32_t opcode;
	int exact, ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	if (pgnop != NULL)
		*pgnop = PGNO_INVALID;

	cp = (QUEUE_CURSOR *)dbc->internal;

	switch (flags) {
	case DB_KEYFIRST:
	case DB_KEYLAST:
		if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
			return (ret);
		/* FALLTHROUGH */
	case DB_CURRENT:
		break;
	default:
		/* The interface shouldn't let anything else through. */
		DB_ASSERT(0);
		return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
	}

	/* Write lock the record. */
	if ((ret = __db_lget(dbc,
	    0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
		return (ret);

	if ((ret = __qam_position(dbc,
	    &cp->recno, QAM_WRITE, &exact)) != 0) {
		/* We could not get the page, we can release the record lock. */
		(void)__LPUT(dbc, lock);
		return (ret);
	}

	/* Put the item on the page. */
	ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);

	/* Doing record locking, release the page lock */
	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __qam_fput(
	    dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	cp->page = NULL;
	cp->lock = lock;
	cp->lock_mode = DB_LOCK_WRITE;
	if (ret != 0)
		return (ret);

	/* We may need to reset the head or tail of the queue. */
	pg = ((QUEUE *)dbp->q_internal)->q_meta;

	/*
	 * Get the meta page first, we don't want to write lock it while
	 * trying to pin it.
	 */
	if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
		return (ret);
	if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
		(void)mpf->put(mpf, meta, 0);
		return (ret);
	}

	opcode = 0;
	new_cur = new_first = 0;

	/*
	 * If the put address is outside the queue, adjust the head and
	 * tail of the queue.  If the order is inverted we move
	 * the one which is closer.  The first case is when the
	 * queue is empty, move first and current to where the new
	 * insert is.
	 */
	if (meta->first_recno == meta->cur_recno) {
		new_first = cp->recno;
		new_cur = cp->recno + 1;
		if (new_cur == RECNO_OOB)
			new_cur++;
		opcode |= QAM_SETFIRST;
		opcode |= QAM_SETCUR;
	} else {
		if (QAM_BEFORE_FIRST(meta, cp->recno) &&
		    (meta->first_recno <= meta->cur_recno ||
		    meta->first_recno - cp->recno <
		    cp->recno - meta->cur_recno)) {
			new_first = cp->recno;
			opcode |= QAM_SETFIRST;
		}

		if (meta->cur_recno == cp->recno ||
		    (QAM_AFTER_CURRENT(meta, cp->recno) &&
		    (meta->first_recno <= meta->cur_recno ||
		    cp->recno - meta->cur_recno <=
		    meta->first_recno - cp->recno))) {
			new_cur = cp->recno + 1;
			if (new_cur == RECNO_OOB)
				new_cur++;
			opcode |= QAM_SETCUR;
		}
	}

	if (opcode != 0 && DBC_LOGGING(dbc)) {
		ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
		    0, opcode, meta->first_recno, new_first,
		    meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
		/* If the move could not be logged, do not apply it. */
		if (ret != 0)
			opcode = 0;
	}

	if (opcode & QAM_SETCUR)
		meta->cur_recno = new_cur;
	if (opcode & QAM_SETFIRST)
		meta->first_recno = new_first;

	if ((t_ret = mpf->put(
	    mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
		ret = t_ret;

	/* Don't hold the meta page long term. */
	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __qam_append --
 *	Perform a put(DB_APPEND) in queue.
 *
 *	Takes the next record number from the meta page, write-locks that
 *	record (the lock is kept in cp->lock), stores the data on the
 *	record's page and copies the record number back through key.
 *	Returns EFBIG when advancing cur_recno would reach first_recno.
 *	The first error encountered is the one returned; later cleanup
 *	steps never overwrite it.
 *
 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
 */
int
__qam_append(dbc, key, data)
	DBC *dbc;
	DBT *key, *data;
{
	DB *dbp;
	DB_LOCK lock;
	DB_MPOOLFILE *mpf;
	QMETA *meta;
	QPAGE *page;
	QUEUE *qp;
	QUEUE_CURSOR *cp;
	db_pgno_t pg;
	db_recno_t recno;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (QUEUE_CURSOR *)dbc->internal;

	pg = ((QUEUE *)dbp->q_internal)->q_meta;
	/*
	 * Get the meta page first, we don't want to write lock it while
	 * trying to pin it.
	 */
	if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
		return (ret);
	/* Write lock the meta page. */
	if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
		(void)mpf->put(mpf, meta, 0);
		return (ret);
	}

	/* Get the next record number. */
	recno = meta->cur_recno;
	meta->cur_recno++;
	if (meta->cur_recno == RECNO_OOB)
		meta->cur_recno++;
	if (meta->cur_recno == meta->first_recno) {
		/* No room: undo the increment and fail. */
		meta->cur_recno--;
		if (meta->cur_recno == RECNO_OOB)
			meta->cur_recno--;
		(void)__LPUT(dbc, lock);
		ret = EFBIG;
		goto err;
	}

	if (QAM_BEFORE_FIRST(meta, recno))
		meta->first_recno = recno;

	/* Lock the record and release meta page lock. */
	if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
	    recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
		(void)__LPUT(dbc, lock);
		goto err;
	}

	/*
	 * The application may modify the data based on the selected record
	 * number.
	 */
	if (dbc->dbp->db_append_recno != NULL &&
	    (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
		(void)__LPUT(dbc, lock);
		goto err;
	}

	cp->lock = lock;
	cp->lock_mode = DB_LOCK_WRITE;

	pg = QAM_RECNO_PAGE(dbp, recno);

	/* Fetch and write lock the data page. */
	if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
		goto err;
	if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, lock);
		goto err;
	}

	/* See if this is a new page. */
	if (page->pgno == 0) {
		page->pgno = pg;
		page->type = P_QAMDATA;
	}

	/* Put the item on the page and log it. */
	ret = __qam_pitem(dbc, page,
	    QAM_RECNO_INDEX(dbp, pg, recno), recno, data);

	/* Doing record locking, release the page lock */
	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;

	if ((t_ret =
	    __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;

	/* Return the record number to the user. */
	if (ret == 0)
		ret = __db_retcopy(dbp->dbenv, key,
		    &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);

	/* Position the cursor on this record. */
	cp->recno = recno;

	/*
	 * See if we are leaving the extent.
	 *
	 * The results of the lock request and the extent close go through
	 * t_ret so that a failure recorded above (from the page put, the
	 * lock/page release or the key copy) is not replaced by a later
	 * success and silently lost.
	 */
	qp = (QUEUE *) dbp->q_internal;
	if (qp->page_ext != 0 &&
	    (recno % (qp->page_ext * qp->rec_page) == 0 ||
	    recno == UINT32_T_MAX)) {
		if ((t_ret = __db_lget(dbc,
		    0, ((QUEUE *)dbp->q_internal)->q_meta,
		    DB_LOCK_WRITE, 0, &lock)) != 0) {
			if (ret == 0)
				ret = t_ret;
			goto err;
		}
		if (!QAM_AFTER_CURRENT(meta, recno) &&
		    (t_ret = __qam_fclose(dbp, pg)) != 0 && ret == 0)
			ret = t_ret;
		(void)__LPUT(dbc, lock);
	}

err:
	/* Release the meta page. */
	if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * __qam_c_del --
 *	Qam cursor->am_del function
 */
static int
__qam_c_del(dbc)
	DBC *dbc;
{
	DB *dbp;
	DBT data;
	DB_LOCK lock;
	DB_MPOOLFILE *mpf;
	PAGE *pagep;
	QAMDATA *qp;
	QMETA *meta;
	QUEUE_CURSOR *cp;
	db_pgno_t pg;
	db_recno_t first;
	int exact, ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (QUEUE_CURSOR *)dbc->internal;

	pg = ((QUEUE *)dbp->q_internal)->q_meta;
	/*
	 * Get the meta page first, we don't want to write lock it while
	 * trying to pin it.
	 */
	if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
		return (ret);
	/*
	 * Write lock the meta page.
	 * NOTE(review): the request below is DB_LOCK_READ, not a write
	 * lock -- confirm which of the comment or the code is intended.
	 */
	if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) {
		(void)mpf->put(mpf, meta, 0);
		return (ret);
	}

	if (QAM_NOT_VALID(meta, cp->recno))
		ret = DB_NOTFOUND;

	first = meta->first_recno;

	/* Don't hold the meta page long term. */
	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;

	if (ret != 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -