mp_sync.c
    /*
     * Walk the array, writing buffers.  When we write a buffer, we NULL
     * out its hash bucket pointer so we don't process a slot more than
     * once.
     */
    for (remaining = ar_cnt, i = pass = 0; remaining > 0; ++i) {
        if (i >= ar_cnt) {
            i = 0;
            ++pass;
            __os_sleep(dbenv, 1, 0);
        }
        if ((hp = bharray[i].track_hp) == NULL)
            continue;

        /* Lock the hash bucket and find the buffer. */
        mutexp = &hp->hash_mutex;
        MUTEX_LOCK(dbenv, mutexp);
        for (bhp = SH_TAILQ_FIRST(&hp->hash_bucket, __bh);
            bhp != NULL; bhp = SH_TAILQ_NEXT(bhp, hq, __bh))
            if (bhp->pgno == bharray[i].track_pgno &&
                bhp->mf_offset == bharray[i].track_off)
                break;

        /*
         * If we can't find the buffer we're done, somebody else had
         * to have written it.
         *
         * If the buffer isn't pinned or dirty, we're done, there's
         * no work needed.
         */
        if (bhp == NULL || (bhp->ref == 0 && !F_ISSET(bhp, BH_DIRTY))) {
            MUTEX_UNLOCK(dbenv, mutexp);
            --remaining;
            bharray[i].track_hp = NULL;
            continue;
        }

        /*
         * If the buffer is locked by another thread, ignore it, we'll
         * come back to it.
         *
         * If the buffer is pinned and it's only the first or second
         * time we have looked at it, ignore it, we'll come back to
         * it.
         *
         * In either case, skip the buffer if we're not required to
         * write it.
         */
        if (F_ISSET(bhp, BH_LOCKED) || (bhp->ref != 0 && pass < 2)) {
            MUTEX_UNLOCK(dbenv, mutexp);
            if (op != DB_SYNC_CACHE && op != DB_SYNC_FILE) {
                --remaining;
                bharray[i].track_hp = NULL;
            }
            continue;
        }

        /*
         * The buffer is either pinned or dirty.
         *
         * Set the sync wait-for count, used to count down outstanding
         * references to this buffer as they are returned to the cache.
         */
        bhp->ref_sync = bhp->ref;

        /* Pin the buffer into memory and lock it. */
        ++bhp->ref;
        F_SET(bhp, BH_LOCKED);
        MUTEX_LOCK(dbenv, &bhp->mutex);

        /*
         * Unlock the hash bucket and wait for the wait-for count to
         * go to 0.  No new thread can acquire the buffer because we
         * have it locked.
         *
         * If a thread attempts to re-pin a page, the wait-for count
         * will never go to 0 (the thread spins on our buffer lock,
         * while we spin on the thread's ref count).  Give up if we
         * don't get the buffer in 3 seconds, we can try again later.
         *
         * If, when the wait-for count goes to 0, the buffer is found
         * to be dirty, write it.
         */
        MUTEX_UNLOCK(dbenv, mutexp);
        for (wait_cnt = 1;
            bhp->ref_sync != 0 && wait_cnt < 4; ++wait_cnt)
            __os_sleep(dbenv, 1, 0);
        MUTEX_LOCK(dbenv, mutexp);
        hb_lock = 1;

        /*
         * If the ref_sync count has gone to 0, we're going to be done
         * with this buffer no matter what happens.
         */
        if (bhp->ref_sync == 0) {
            --remaining;
            bharray[i].track_hp = NULL;
        }

        /*
         * If the ref_sync count has gone to 0 and the buffer is still
         * dirty, we write it.  We only try to write the buffer once.
         * Any process checkpointing or trickle-flushing the pool
         * must be able to write any underlying file -- if the write
         * fails, error out.  It would be very strange if file sync
         * failed to write, but we don't care if it happens.
         */
        if (bhp->ref_sync == 0 && F_ISSET(bhp, BH_DIRTY)) {
            hb_lock = 0;
            MUTEX_UNLOCK(dbenv, mutexp);

            mfp = R_ADDR(dbmp->reginfo, bhp->mf_offset);
            if ((ret = __memp_bhwrite(dbmp, hp, mfp, bhp, 1)) == 0)
                ++wrote;
            else if (op == DB_SYNC_CACHE || op == DB_SYNC_TRICKLE)
                __db_err(dbenv, "%s: unable to flush page: %lu",
                    __memp_fns(dbmp, mfp), (u_long)bhp->pgno);
            else
                ret = 0;
        }

        /*
         * If the ref_sync count never went to 0, the buffer was
         * written by another thread, or the write failed, we still
         * have the buffer locked.
         *
         * We may or may not currently hold the hash bucket mutex.  If
         * the __memp_bhwrite -> __memp_pgwrite call was successful,
         * then __memp_pgwrite will have swapped the buffer lock for
         * the hash lock.  All other call paths will leave us without
         * the hash bucket lock.
         *
         * The order of mutexes above was to acquire the buffer lock
         * while holding the hash bucket lock.  Don't deadlock here:
         * release the buffer lock and then acquire the hash bucket
         * lock.
         */
        if (F_ISSET(bhp, BH_LOCKED)) {
            F_CLR(bhp, BH_LOCKED);
            MUTEX_UNLOCK(dbenv, &bhp->mutex);

            if (!hb_lock)
                MUTEX_LOCK(dbenv, mutexp);
        }

        /*
         * Reset the ref_sync count regardless of our success, we're
         * done with this buffer for now.
         */
        bhp->ref_sync = 0;

        /* Discard our reference and unlock the bucket. */
        --bhp->ref;
        MUTEX_UNLOCK(dbenv, mutexp);

        if (ret != 0)
            break;
    }

done:   /* If we've opened files to flush pages, close them. */
    if ((t_ret = __memp_close_flush_files(dbenv, dbmp)) != 0 && ret == 0)
        ret = t_ret;

    /*
     * If doing a checkpoint or flushing a file for the application, we
     * have to force the pages to disk.  We don't do this as we go along
     * because we want to give the OS as much time as possible to lazily
     * flush, and because we have to flush files that might not even have
     * had dirty buffers in the cache, so we have to walk the files list.
     */
    if (ret == 0 && (op == DB_SYNC_CACHE || op == DB_SYNC_FILE)) {
        if (dbmfp == NULL)
            ret = __memp_sync_files(dbenv, dbmp);
        else
            ret = __os_fsync(dbenv, dbmfp->fhp);
    }

err:    __os_free(dbenv, bharray);
    if (wrotep != NULL)
        *wrotep = wrote;

    return (ret);
}

/*
 * __memp_sync_files --
 *	Sync all the files in the environment, open or not.
 */
static int
__memp_sync_files(dbenv, dbmp)
    DB_ENV *dbenv;
    DB_MPOOL *dbmp;
{
    DB_MPOOLFILE *dbmfp;
    MPOOL *mp;
    MPOOLFILE *mfp;
    int ret, t_ret;

    ret = 0;
    mp = dbmp->reginfo[0].primary;

    R_LOCK(dbenv, dbmp->reginfo);
    for (mfp = SH_TAILQ_FIRST(&mp->mpfq, __mpoolfile);
        mfp != NULL; mfp = SH_TAILQ_NEXT(mfp, q, __mpoolfile)) {
        if (mfp->stat.st_page_out == 0 ||
            F_ISSET(mfp, MP_DEADFILE | MP_TEMP))
            continue;

        /* Look for an already open handle. */
        ret = 0;
        MUTEX_THREAD_LOCK(dbenv, dbmp->mutexp);
        for (dbmfp = TAILQ_FIRST(&dbmp->dbmfq);
            dbmfp != NULL; dbmfp = TAILQ_NEXT(dbmfp, q))
            if (dbmfp->mfp == mfp) {
                ret = __os_fsync(dbenv, dbmfp->fhp);
                break;
            }
        MUTEX_THREAD_UNLOCK(dbenv, dbmp->mutexp);
        if (ret != 0)
            goto err;

        /* If we don't find one, open one. */
        if (dbmfp == NULL) {
            if ((ret = dbenv->memp_fcreate(dbenv, &dbmfp, 0)) != 0)
                goto err;
            ret = __memp_fopen_int(dbmfp, mfp,
                R_ADDR(dbmp->reginfo, mfp->path_off),
                0, 0, mfp->stat.st_pagesize);
            if (ret == 0)
                ret = __os_fsync(dbenv, dbmfp->fhp);
            if ((t_ret =
                __memp_fclose_int(dbmfp, 0)) != 0 && ret == 0)
                ret = t_ret;
            if (ret != 0)
                goto err;
        }
    }

    if (0) {
err:        __db_err(dbenv, "%s: cannot sync: %s",
                R_ADDR(dbmp->reginfo, mfp->path_off), db_strerror(ret));
    }
    R_UNLOCK(dbenv, dbmp->reginfo);

    return (ret);
}

/*
 * __memp_close_flush_files --
 *	Close files opened only to flush buffers.
 */
static int
__memp_close_flush_files(dbenv, dbmp)
    DB_ENV *dbenv;
    DB_MPOOL *dbmp;
{
    DB_MPOOLFILE *dbmfp;
    int ret;

    /*
     * The routine exists because we must close files opened by sync to
     * flush buffers.  There are two cases: first, extent files have to
     * be closed so they may be removed when empty.  Second, regular
     * files have to be closed so we don't run out of descriptors (for
     * example, an application partitioning its data into databases
     * based on timestamps, so there's a continually increasing set of
     * files).
     *
     * We mark files opened in the __memp_bhwrite() function with the
     * MP_FLUSH flag.  Here we walk through our file descriptor list,
     * and, if a file was opened by __memp_bhwrite(), we close it.
     */
retry:  MUTEX_THREAD_LOCK(dbenv, dbmp->mutexp);
    for (dbmfp = TAILQ_FIRST(&dbmp->dbmfq);
        dbmfp != NULL; dbmfp = TAILQ_NEXT(dbmfp, q))
        if (F_ISSET(dbmfp, MP_FLUSH)) {
            F_CLR(dbmfp, MP_FLUSH);
            MUTEX_THREAD_UNLOCK(dbenv, dbmp->mutexp);
            if ((ret = __memp_fclose_int(dbmfp, 0)) != 0)
                return (ret);
            goto retry;
        }
    MUTEX_THREAD_UNLOCK(dbenv, dbmp->mutexp);

    return (0);
}

static int
__bhcmp(p1, p2)
    const void *p1, *p2;
{
    BH_TRACK *bhp1, *bhp2;

    bhp1 = (BH_TRACK *)p1;
    bhp2 = (BH_TRACK *)p2;

    /* Sort by file (shared memory pool offset). */
    if (bhp1->track_off < bhp2->track_off)
        return (-1);
    if (bhp1->track_off > bhp2->track_off)
        return (1);

    /*
     * !!!
     * Defend against badly written quicksort code calling the comparison
     * function with two identical pointers (e.g., WATCOM C++ (Power++)).
     */
    if (bhp1->track_pgno < bhp2->track_pgno)
        return (-1);
    if (bhp1->track_pgno > bhp2->track_pgno)
        return (1);
    return (0);
}
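The subtle part of the walk above is the ref_sync dance: the syncer snapshots the buffer's reference count into ref_sync, then waits a bounded number of 1-second intervals for other threads to drain their references, giving up rather than spinning forever because a re-pinning thread may be blocked on the very buffer lock the syncer holds. Below is a minimal standalone sketch of that bounded wait-for-count pattern; the toy_buf struct, drain_refs() name, and plain POSIX sleep() are illustrative stand-ins, not Berkeley DB's API.

/*
 * Sketch of the bounded wait-for-count pattern (illustrative only;
 * toy_buf and drain_refs are not Berkeley DB names).
 */
#include <stdio.h>
#include <unistd.h>

struct toy_buf {
    volatile int ref_sync;      /* outstanding references to drain */
};

/*
 * Wait up to three 1-second intervals for ref_sync to reach 0, then
 * give up: waiting indefinitely could deadlock against a thread that
 * is itself blocked on a lock we hold.
 */
static int
drain_refs(struct toy_buf *bp)
{
    int wait_cnt;

    for (wait_cnt = 1; bp->ref_sync != 0 && wait_cnt < 4; ++wait_cnt)
        sleep(1);
    return (bp->ref_sync == 0);     /* 1: drained, 0: timed out */
}

int
main(void)
{
    struct toy_buf b = { 0 };       /* already drained: returns 1 */

    printf("drained: %d\n", drain_refs(&b));
    return (0);
}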
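The __bhcmp() comparator at the end implies the BH_TRACK array is ordered with qsort() before the walk: buffers are grouped by file (shared region offset) and then by page number within each file. A small self-contained sketch of that ordering follows; the BH_TRACK definition here is a simplified stand-in (the real structure also carries a track_hp hash-bucket pointer).

/*
 * Sketch: ordering the flush array with a __bhcmp-style comparison.
 * BH_TRACK here is a simplified stand-in, not the library's layout.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    unsigned long track_off;    /* file: shared region offset */
    unsigned long track_pgno;   /* page number within the file */
} BH_TRACK;

static int
bhcmp(const void *p1, const void *p2)
{
    const BH_TRACK *b1 = p1, *b2 = p2;

    /* Sort by file first, then by page number within the file. */
    if (b1->track_off != b2->track_off)
        return (b1->track_off < b2->track_off ? -1 : 1);
    if (b1->track_pgno != b2->track_pgno)
        return (b1->track_pgno < b2->track_pgno ? -1 : 1);
    return (0);
}

int
main(void)
{
    BH_TRACK ar[] = { {2, 7}, {1, 9}, {2, 3}, {1, 1} };
    size_t i, ar_cnt = sizeof(ar) / sizeof(ar[0]);

    qsort(ar, ar_cnt, sizeof(ar[0]), bhcmp);
    for (i = 0; i < ar_cnt; ++i)    /* prints (1,1) (1,9) (2,3) (2,7) */
        printf("off=%lu pgno=%lu\n", ar[i].track_off, ar[i].track_pgno);
    return (0);
}

Sorting this way presumably keeps writes to each file as sequential as possible, giving the OS the best chance to coalesce neighboring page writes during the lazy flush the checkpoint code relies on.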