📄 test_async.c
字号:
/*
** 2005 December 14
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains an example implementation of an asynchronous IO
** backend for SQLite.
**
** WHAT IS ASYNCHRONOUS I/O?
**
** With asynchronous I/O, write requests are handled by a separate thread
** running in the background. This means that the thread that initiates
** a database write does not have to wait for (sometimes slow) disk I/O
** to occur. The write seems to happen very quickly, though in reality
** it is happening at its usual slow pace in the background.
**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property. With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk. With the asynchronous I/O, this is not the case. If
** your program crashes or if you take a power loss after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID: Atomic, Consistent, and Isolated. Many
** applications get along fine without the Durability.
**
** HOW IT WORKS
**
** Asynchronous I/O works by overloading the OS-layer disk I/O routines
** with modified versions that store the data to be written in a queue of
** pending write operations. Look at the asyncEnable() subroutine to see
** how overloading works.
Six os-layer routines are overloaded:**** sqlite3OsOpenReadWrite;** sqlite3OsOpenReadOnly;** sqlite3OsOpenExclusive;** sqlite3OsDelete;** sqlite3OsFileExists;** sqlite3OsSyncDirectory;**** The original implementations of these routines are saved and are** used by the writer thread to do the real I/O. The substitute** implementations typically put the I/O operation on a queue** to be handled later by the writer thread, though read operations** must be handled right away, obviously.**** Asynchronous I/O is disabled by setting the os-layer interface routines** back to their original values.**** LIMITATIONS**** This demonstration code is deliberately kept simple in order to keep** the main ideas clear and easy to understand. Real applications that** want to do asynchronous I/O might want to add additional capabilities.** For example, in this demonstration if writes are happening at a steady** stream that exceeds the I/O capability of the background writer thread,** the queue of pending write operations will grow without bound until we** run out of memory. Users of this technique may want to keep track of** the quantity of pending writes and stop accepting new write requests** when the buffer gets to be too big.*/#include "sqliteInt.h"#include "os.h"#include <tcl.h>/* If the THREADSAFE macro is not set, assume that it is turned off. */#ifndef THREADSAFE# define THREADSAFE 0#endif/*** This test uses pthreads and hence only works on unix and with** a threadsafe build of SQLite. It also requires that the redefinable** I/O feature of SQLite be turned on. This feature is turned off by** default. If a required element is missing, almost all of the code** in this file is commented out.*/#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)/*** This demo uses pthreads. 
If you do not have a pthreads implementation** for your operating system, you will need to recode the threading ** logic.*/#include <pthread.h>#include <sched.h>/* Useful macros used in several places */#define MIN(x,y) ((x)<(y)?(x):(y))#define MAX(x,y) ((x)>(y)?(x):(y))/* Forward references */typedef struct AsyncWrite AsyncWrite;typedef struct AsyncFile AsyncFile;/* Enable for debugging */static int sqlite3async_trace = 0;# define ASYNC_TRACE(X) if( sqlite3async_trace ) asyncTrace Xstatic void asyncTrace(const char *zFormat, ...){ char *z; va_list ap; va_start(ap, zFormat); z = sqlite3_vmprintf(zFormat, ap); va_end(ap); fprintf(stderr, "[%d] %s", (int)pthread_self(), z); sqlite3_free(z);}/*** THREAD SAFETY NOTES**** Basic rules:**** * Both read and write access to the global write-op queue must be ** protected by the async.queueMutex.**** * The file handles from the underlying system are assumed not to ** be thread safe.**** * See the last two paragraphs under "The Writer Thread" for** an assumption to do with file-handle synchronization by the Os.**** File system operations (invoked by SQLite thread):**** xOpenXXX (three versions)** xDelete** xFileExists** xSyncDirectory**** File handle operations (invoked by SQLite thread):**** asyncWrite, asyncClose, asyncTruncate, asyncSync, ** asyncSetFullSync, asyncOpenDirectory.** ** The operations above add an entry to the global write-op list. They** prepare the entry, acquire the async.queueMutex momentarily while** list pointers are manipulated to insert the new entry, then release** the mutex and signal the writer thread to wake up in case it happens** to be asleep.**** ** asyncRead, asyncFileSize.**** Read operations. Both of these read from both the underlying file** first then adjust their result based on pending writes in the ** write-op queue. 
So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue mid-operation.
**
**
**         asyncLock, asyncUnlock, asyncLockState, asyncCheckReservedLock
**
**     These primitives implement in-process locking using a hash table
**     on the file name. Files are locked correctly for connections coming
**     from the same process. But other processes cannot see these locks
**     and will therefore not honor them.
**
**
**         asyncFileHandle.
**
**     The sqlite3OsFileHandle() function is currently only used when
**     debugging the pager module. Unless sqlite3OsClose() is called on the
**     file (shouldn't be possible for other reasons), the underlying
**     implementations are safe to call without grabbing any mutex. So we just
**     go ahead and call it no matter what any other threads are doing.
**
**
**         asyncSeek.
**
**     Calling this method just manipulates the AsyncFile.iOffset variable.
**     Since this variable is never accessed by the writer thread, this
**     function does not require the mutex. Actual calls to OsSeek() take
**     place just before OsWrite() or OsRead(), which are always protected by
**     the mutex.
**
** The writer thread:
**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
**
**     Inside the writer thread is a loop that works like this:
**
**         WHILE (write-op list is not empty)
**             Do IO operation at head of write-op list
**             Remove entry from head of write-op list
**         END WHILE
**
**     The async.queueMutex is always held during the <write-op list is
**     not empty> test, and when the entry is removed from the head
**     of the write-op list. Sometimes it is held for the interim
**     period (while the IO is performed), and sometimes it is
**     relinquished.
It is relinquished if (a) the IO op is an
**     ASYNC_CLOSE or (b) when the file handle was opened, two of
**     the underlying system's handles were opened on the same
**     file-system entry.
**
**     If condition (b) above is true, then one file-handle
**     (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
**     file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
**     threads to perform write() operations. This means that read
**     operations are not blocked by asynchronous writes (although
**     asynchronous writes may still be blocked by reads).
**
**     This assumes that the OS keeps two handles open on the same file
**     properly in sync. That is, any read operation that starts after a
**     write operation on the same file system entry has completed returns
**     data consistent with the write. We also assume that if one thread
**     reads a file while another is writing it, all bytes other than the
**     ones actually being written contain valid data.
**
**     If the above assumptions are not true, set the preprocessor symbol
**     SQLITE_ASYNC_TWO_FILEHANDLES to 0.
*/

/* Default to the two-file-handle mode unless the build overrides it. */
#ifndef SQLITE_ASYNC_TWO_FILEHANDLES
/* #define SQLITE_ASYNC_TWO_FILEHANDLES 0 */
#define SQLITE_ASYNC_TWO_FILEHANDLES 1
#endif

/*
** State information is held in the static variable "async" defined
** as follows:
**
** Only the three mutexes and two condition variables are given explicit
** initializers below; every remaining member is zero-initialized by the
** usual C rules for partially initialized static aggregates.
*/
static struct TestAsyncStaticData {
  pthread_mutex_t queueMutex;  /* Mutex for access to write operation queue */
  pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
  pthread_mutex_t lockMutex;   /* For access to aLock hash table */
  pthread_cond_t queueSignal;  /* For waking up sleeping writer thread */
  pthread_cond_t emptySignal;  /* Notify when the write queue is empty */
  AsyncWrite *pQueueFirst;     /* Next write operation to be processed */
  AsyncWrite *pQueueLast;      /* Last write operation on the list */
  Hash aLock;                  /* Files locked */
  volatile int ioDelay;             /* Extra delay between write operations */
  volatile int writerHaltWhenIdle;  /* Writer thread halts when queue empty */
  volatile int writerHaltNow;       /* Writer thread halts after next op */
  int ioError;                 /* True if an IO error has occurred */
  int nFile;                   /* Number of open files (from sqlite pov) */
} async = {
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_COND_INITIALIZER,
  PTHREAD_COND_INITIALIZER,
};

/* Possible values of AsyncWrite.op */
#define ASYNC_NOOP          0
#define ASYNC_WRITE         1
#define ASYNC_SYNC          2
#define ASYNC_TRUNCATE      3
#define ASYNC_CLOSE         4
#define ASYNC_OPENDIRECTORY 5
#define ASYNC_SETFULLSYNC   6
#define ASYNC_DELETE        7
#define ASYNC_OPENEXCLUSIVE 8
#define ASYNC_SYNCDIRECTORY 9

/* Names of opcodes.  Used for debugging only.
** Make sure these stay in sync with the macros above!
** The table is indexed directly by the ASYNC_xxx value.
*/
static const char *azOpcodeName[] = {
  "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE",
  "OPENDIR", "SETFULLSYNC", "DELETE", "OPENEX", "SYNCDIR",
};

/*
** Entries on the write-op queue are instances of the AsyncWrite
** structure, defined here.
**
** The interpretation of the iOffset and nByte variables varies depending
** on the value of AsyncWrite.op:
**
** ASYNC_WRITE:
**     iOffset -> Offset in file to write to.
**     nByte   -> Number of bytes of data to write (pointed to by zBuf).
**
** ASYNC_SYNC:
**     iOffset -> Unused.
**     nByte   -> Value of "fullsync" flag to pass to sqlite3OsSync().
**
** ASYNC_TRUNCATE:
**     iOffset -> Size to truncate file to.
**     nByte   -> Unused.
**
** ASYNC_CLOSE:
**     iOffset -> Unused.
**     nByte   -> Unused.
**
** ASYNC_OPENDIRECTORY:
**     iOffset -> Unused.
**     nByte   -> Number of bytes of zBuf points to (directory name).
**
** ASYNC_SETFULLSYNC:
**     iOffset -> Unused.
**     nByte   -> New value for the full-sync flag.
**
**
** ASYNC_DELETE:
**     iOffset -> Unused.
**     nByte   -> Number of bytes of zBuf points to (file name).
**
** ASYNC_OPENEXCLUSIVE:
**     iOffset -> Value of "delflag".
**     nByte   -> Number of bytes of zBuf points to (file name).
**
** NOTE(review): ASYNC_NOOP and ASYNC_SYNCDIRECTORY are not described in
** this table; confirm their use of iOffset/nByte against the writer thread.
**
**
** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
** This space is sqliteMalloc()d along with the AsyncWrite structure in a** single blob, so is deleted when sqliteFree() is called on the parent ** structure.*/struct AsyncWrite { AsyncFile *pFile; /* File to write data to or sync */ int op; /* One of ASYNC_xxx etc. */ i64 iOffset; /* See above */ int nByte; /* See above */ char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */ AsyncWrite *pNext; /* Next write operation (to any file) */};/* ** The AsyncFile structure is a subclass of OsFile used for asynchronous IO.*/struct AsyncFile { IoMethod *pMethod; /* Must be first */ i64 iOffset; /* Current seek() offset in file */ char *zName; /* Underlying OS filename - used for debugging */ int nName; /* Number of characters in zName */ OsFile *pBaseRead; /* Read handle to the underlying Os file */ OsFile *pBaseWrite; /* Write handle to the underlying Os file */};/*** Add an entry to the end of the global write-op list. pWrite should point ** to an AsyncWrite structure allocated using sqlite3OsMalloc(). The writer** thread will call sqlite3OsFree() to free the structure after the specified** operation has been completed.**** Once an AsyncWrite structure has been added to the list, it becomes the** property of the writer thread and must not be read or modified by the** caller. */static void addAsyncWrite(AsyncWrite *pWrite){ /* We must hold the queue mutex in order to modify the queue pointers */ pthread_mutex_lock(&async.queueMutex); /* Add the record to the end of the write-op queue */ assert( !pWrite->pNext ); if( async.pQueueLast ){ assert( async.pQueueFirst ); async.pQueueLast->pNext = pWrite; }else{ async.pQueueFirst = pWrite; } async.pQueueLast = pWrite; ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], pWrite->pFile ? 
pWrite->pFile->zName : "-", pWrite->iOffset)); if( pWrite->op==ASYNC_CLOSE ){ async.nFile--; if( async.nFile==0 ){ async.ioError = SQLITE_OK; } } /* Drop the queue mutex */ pthread_mutex_unlock(&async.queueMutex); /* The writer thread might have been idle because there was nothing ** on the write-op queue for it to do. So wake it up. */ pthread_cond_signal(&async.queueSignal);}/*** Increment async.nFile in a thread-safe manner.*/static void incrOpenFileCount(){ /* We must hold the queue mutex in order to modify async.nFile */ pthread_mutex_lock(&async.queueMutex); if( async.nFile==0 ){ async.ioError = SQLITE_OK; } async.nFile++; pthread_mutex_unlock(&async.queueMutex);}/*** This is a utility function to allocate and populate a new AsyncWrite** structure and insert it (via addAsyncWrite() ) into the global list.*/static int addNewAsyncWrite( AsyncFile *pFile, int op, i64 iOffset, int nByte, const char *zByte){ AsyncWrite *p; if( op!=ASYNC_CLOSE && async.ioError ){ return async.ioError; } p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0)); if( !p ){ return SQLITE_NOMEM; } p->op = op; p->iOffset = iOffset; p->nByte = nByte; p->pFile = pFile; p->pNext = 0; if( zByte ){ p->zBuf = (char *)&p[1]; memcpy(p->zBuf, zByte, nByte);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -