📄 transaction.c
字号:
if (transaction_write(tdb, size, NULL, addition) != 0) { return -1; } return 0;}/* brlock during a transaction - ignore them*/int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len){ return 0;}static const struct tdb_methods transaction_methods = { transaction_read, transaction_write, transaction_next_hash_chain, transaction_oob, transaction_expand_file, transaction_brlock};/* start a tdb transaction. No token is returned, as only a single transaction is allowed to be pending per tdb_context*/int tdb_transaction_start(struct tdb_context *tdb){ /* some sanity checks */ if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) { TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n")); tdb->ecode = TDB_ERR_EINVAL; return -1; } /* cope with nested tdb_transaction_start() calls */ if (tdb->transaction != NULL) { tdb->transaction->nesting++; TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", tdb->transaction->nesting)); return 0; } if (tdb->num_locks != 0 || tdb->global_lock.count) { /* the caller must not have any locks when starting a transaction as otherwise we'll be screwed by lack of nested locks in posix */ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n")); tdb->ecode = TDB_ERR_LOCK; return -1; } if (tdb->travlocks.next != NULL) { /* you cannot use transactions inside a traverse (although you can use traverse inside a transaction) as otherwise you can end up with deadlock */ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n")); tdb->ecode = TDB_ERR_LOCK; return -1; } tdb->transaction = (struct tdb_transaction *) calloc(sizeof(struct tdb_transaction), 1); if (tdb->transaction == NULL) { tdb->ecode = TDB_ERR_OOM; return -1; } /* get the transaction write lock. This is a blocking lock. As discussed with Volker, there are a number of ways we could make this async, which we will probably do in the future */ if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) { TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n")); tdb->ecode = TDB_ERR_LOCK; SAFE_FREE(tdb->transaction); return -1; } /* get a read lock from the freelist to the end of file. This is upgraded to a write lock during the commit */ if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) { TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n")); tdb->ecode = TDB_ERR_LOCK; goto fail; } /* setup a copy of the hash table heads so the hash scan in traverse can be fast */ tdb->transaction->hash_heads = (u32 *) calloc(tdb->header.hash_size+1, sizeof(u32)); if (tdb->transaction->hash_heads == NULL) { tdb->ecode = TDB_ERR_OOM; goto fail; } if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads, TDB_HASHTABLE_SIZE(tdb), 0) != 0) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n")); tdb->ecode = TDB_ERR_IO; goto fail; } /* make sure we know about any file expansions already done by anyone else */ tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1); tdb->transaction->old_map_size = tdb->map_size; /* finally hook the io methods, replacing them with transaction specific methods */ tdb->transaction->io_methods = tdb->methods; tdb->methods = &transaction_methods; /* by calling this transaction write here, we ensure that we don't grow the transaction linked list due to hash table updates */ if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, TDB_HASHTABLE_SIZE(tdb)) != 0) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n")); tdb->ecode = TDB_ERR_IO; goto fail; } return 0; fail: tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0); tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); SAFE_FREE(tdb->transaction->hash_heads); SAFE_FREE(tdb->transaction); return -1;}/* cancel the current transaction*/int tdb_transaction_cancel(struct tdb_context *tdb){ if (tdb->transaction == NULL) { TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n")); return -1; } if (tdb->transaction->nesting != 0) { tdb->transaction->transaction_error = 1; tdb->transaction->nesting--; return 0; } tdb->map_size = tdb->transaction->old_map_size; /* free all the transaction elements */ while (tdb->transaction->elements) { struct tdb_transaction_el *el = tdb->transaction->elements; tdb->transaction->elements = el->next; free(el->data); free(el); } /* remove any global lock created during the transaction */ if (tdb->global_lock.count != 0) { tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size); tdb->global_lock.count = 0; } /* remove any locks created during the transaction */ if (tdb->num_locks != 0) { int i; for (i=0;i<tdb->num_lockrecs;i++) { tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list, F_UNLCK,F_SETLKW, 0, 1); } tdb->num_locks = 0; tdb->num_lockrecs = 0; SAFE_FREE(tdb->lockrecs); } /* restore the normal io methods */ tdb->methods = tdb->transaction->io_methods; tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0); tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); SAFE_FREE(tdb->transaction->hash_heads); SAFE_FREE(tdb->transaction); return 0;}/* sync to disk*/static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length){ if (fsync(tdb->fd) != 0) { tdb->ecode = TDB_ERR_IO; TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n")); return -1; }#ifdef MS_SYNC if (tdb->map_ptr) { tdb_off_t moffset = offset & ~(tdb->page_size-1); if (msync(moffset + (char *)tdb->map_ptr, length + (offset - moffset), MS_SYNC) != 0) { tdb->ecode = TDB_ERR_IO; TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n", strerror(errno))); return -1; } }#endif return 0;}/* work out how much space the linearised recovery data will consume*/static tdb_len_t tdb_recovery_size(struct tdb_context *tdb){ struct tdb_transaction_el *el; tdb_len_t recovery_size = 0; recovery_size = sizeof(u32); for (el=tdb->transaction->elements;el;el=el->next) { if (el->offset >= tdb->transaction->old_map_size) { continue; } recovery_size += 2*sizeof(tdb_off_t) + el->length; } return recovery_size;}/* allocate the recovery area, or use an existing recovery area if it is large enough*/static int tdb_recovery_allocate(struct tdb_context *tdb, tdb_len_t *recovery_size, tdb_off_t *recovery_offset, tdb_len_t *recovery_max_size){ struct list_struct rec; const struct tdb_methods *methods = tdb->transaction->io_methods; tdb_off_t recovery_head; if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n")); return -1; } rec.rec_len = 0; if (recovery_head != 0 && methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n")); return -1; } *recovery_size = tdb_recovery_size(tdb); if (recovery_head != 0 && *recovery_size <= rec.rec_len) { /* it fits in the existing area */ *recovery_max_size = rec.rec_len; *recovery_offset = recovery_head; return 0; } /* we need to free up the old recovery area, then allocate a new one at the end of the file. Note that we cannot use tdb_allocate() to allocate the new one as that might return us an area that is being currently used (as of the start of the transaction) */ if (recovery_head != 0) { if (tdb_free(tdb, recovery_head, &rec) == -1) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n")); return -1; } } /* the tdb_free() call might have increased the recovery size */ *recovery_size = tdb_recovery_size(tdb); /* round up to a multiple of page size */ *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec); *recovery_offset = tdb->map_size; recovery_head = *recovery_offset; if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, (tdb->map_size - tdb->transaction->old_map_size) + sizeof(rec) + *recovery_max_size) == -1) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n")); return -1; } /* remap the file (if using mmap) */ methods->tdb_oob(tdb, tdb->map_size + 1, 1); /* we have to reset the old map size so that we don't try to expand the file again in the transaction commit, which would destroy the recovery area */ tdb->transaction->old_map_size = tdb->map_size; /* write the recovery header offset and sync - we can sync without a race here as the magic ptr in the recovery record has not been set */ CONVERT(recovery_head); if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n")); return -1; } return 0;}/* setup the recovery data that will be used on a crash during commit*/static int transaction_setup_recovery(struct tdb_context *tdb, tdb_off_t *magic_offset){ struct tdb_transaction_el *el; tdb_len_t recovery_size; unsigned char *data, *p; const struct tdb_methods *methods = tdb->transaction->io_methods; struct list_struct *rec; tdb_off_t recovery_offset, recovery_max_size; tdb_off_t old_map_size = tdb->transaction->old_map_size; u32 magic, tailer; /* check that the recovery area has enough space */ if (tdb_recovery_allocate(tdb, &recovery_size, &recovery_offset, &recovery_max_size) == -1) { return -1; } data = (unsigned char *)malloc(recovery_size + sizeof(*rec)); if (data == NULL) { tdb->ecode = TDB_ERR_OOM; return -1; } rec = (struct list_struct *)data; memset(rec, 0, sizeof(*rec)); rec->magic = 0; rec->data_len = recovery_size; rec->rec_len = recovery_max_size; rec->key_len = old_map_size;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -