📄 persis~1.h
字号:
// Larbin// Sebastien Ailleret// 06-01-00 -> 31-05-00/* this fifo is stored on disk */#ifndef PERSFIFO_H#define PERSFIFO_H#include <sys/types.h>#include <dirent.h>#include <unistd.h>#include <sys/stat.h>#include "types.h"#include "xutils/connexion.h"#include "xutils/GenericFifo.h"template <class T>class PersistentFifo : public GenericFifo<T> { protected: pthread_mutex_t lock; pthread_cond_t nonEmpty; // number of the file used for reading int fin, fout; // name of files char *fileName; // Give a file name for this int char *giveName (uint nb); // Give a file name for this int int getNumber (char *file); // Change the file used for reading void updateRead (); // Change the file used for writing void updateWrite (); // global variables global *glob; // buffer used for readLine char outbuf[BUF_SIZE]; // number of char used in this buffer uint outbufPos; // buffer used for readLine char buf[BUF_SIZE]; // number of char used in this buffer uint bufPos, bufSize; // sockets for reading and writing int rfds, wfds; // read a line on rfds String *readLine (); // write an url in the out file (buffered write) void writeUrl (char *s); // Flush the out Buffer in the outFile void flushOut (); public: /* Specific constructor */ PersistentFifo (char *fileName, bool reload, global *glob); /* Destructor */ ~PersistentFifo (); /* get the first object */ T *get (); /* get the first object (non totally blocking) * return NULL if there is none */ T *tryGet (); /* add an object in the fifo */ void put (T *obj); /* add an object in the fifo */ void putForce (T *obj); /* how many items are there inside ? */ int getLength ();};template <class T>PersistentFifo<T>::PersistentFifo (char *fileName, bool reload, global *glob) { this->fileName = fileName; this->glob = glob; outbufPos = 0; bufPos = 0; bufSize = 0; pthread_mutex_init (&lock, NULL); pthread_cond_init (&nonEmpty, NULL); if (reload) { DIR *dir = opendir("."); struct dirent *name; fin = -1; fout = -1; name = readdir(dir); while (name != NULL) { if (startWith(fileName, name->d_name)) { int tmp = getNumber(name->d_name); if (fin == -1) { fin = tmp; fout = tmp; } else { if (tmp > fin) { fin = tmp; } if (tmp < fout) { fout = tmp; } } } name = readdir(dir); } closedir(dir); in = (fin - fout) * urlByFile; out = 0; char *fn = giveName(fin); wfds = creat (fn, S_IRUSR | S_IWUSR); delete [] fn; fn = giveName(fout); rfds = open (fn, O_RDONLY); delete [] fn; } else { // Delete old fifos DIR *dir = opendir("."); struct dirent *name; name = readdir(dir); while (name != NULL) { if (startWith(fileName, name->d_name)) { unlink(name->d_name); } name = readdir(dir); } closedir(dir); fin = 0; fout = 0; in = 0; out = 0; char *tmp = giveName(0); wfds = creat (tmp, S_IRUSR | S_IWUSR); rfds = open (tmp, O_RDONLY); delete [] tmp; }}template <class T>PersistentFifo<T>::~PersistentFifo () { pthread_mutex_destroy (&lock); pthread_cond_destroy (&nonEmpty); close(rfds); close(wfds);}template <class T>T *PersistentFifo<T>::get () { pthread_mutex_lock(&lock); while (in == out) { pthread_cond_wait(&nonEmpty, &lock); } String *line = readLine(); T *tmp = new T(line); out++; updateRead(); pthread_mutex_unlock(&lock); return tmp;}template <class T>T *PersistentFifo<T>::tryGet () { T *tmp = NULL; pthread_mutex_lock(&lock); if (in != out) { // The stack is not empty String *line = readLine(); tmp = new T(line); out++; updateRead(); } pthread_mutex_unlock(&lock); return tmp;}/** Put something in the fifo * The objet is then deleted */template <class T>void PersistentFifo<T>::put (T *obj) { pthread_mutex_lock(&lock); char *s = obj->serialize(wfds); writeUrl (s); delete [] s; if (in == out) { pthread_cond_broadcast(&nonEmpty); } in++; updateWrite(); pthread_mutex_unlock(&lock); delete obj;}template <class T>void PersistentFifo<T>::putForce (T *obj) { put(obj);}template <class T>int PersistentFifo<T>::getLength () { return in - out;}template <class T>char *PersistentFifo<T>::giveName (uint nb) { uint len = strlen(fileName); char *tmp = new char[len+7]; strcpy(tmp, fileName); tmp[len+6] = 0; for (uint i=len+5; i>=len; i--) { tmp[i] = (nb % 10) + '0'; nb /= 10; } return tmp;}template <class T>int PersistentFifo<T>::getNumber (char *file) { uint len = strlen(file); int res = 0; for (uint i=len-6; i<=len-1; i++) { res = (res * 10) + file[i] - '0'; } return res;}template <class T>void PersistentFifo<T>::updateRead () { if ((out % urlByFile) == 0) { close(rfds); char *tmp = giveName(fout); unlink(tmp); delete [] tmp; tmp = giveName(++fout); rfds = open(tmp, O_RDONLY); delete [] tmp; in -= out; out = 0; }}template <class T>void PersistentFifo<T>::updateWrite () { if ((in % urlByFile) == 0) { flushOut(); close(wfds); char *tmp = giveName(++fin); wfds = creat(tmp, S_IRUSR | S_IWUSR); delete [] tmp; glob->seen->save(); }}/* read a line from the file * uses a buffer */template <class T>String *PersistentFifo<T>::readLine () { // bufPos : Position of the beginning ofthe thing to reqd in the buffer // Size of things to read in the buffer String *res = new String; for (;;) { if (bufPos >= bufSize) { // there is nothing in the buffer bool noRead = true; bufPos = 0; while(noRead) { // Still nothing in the buffer bufSize = read(rfds, buf, BUF_SIZE); switch (bufSize) { case 0 : // We need to flush the output in order to read it flushOut(); break; case -1 : // We might have a trouble here if (errno != EINTR && errno != EIO) { cerr << "Big Problem while reading (persistentFifo.h)\n"; } else { cerr << "Problem while reading (persistentFifo.h)\n"; } break; default: noRead = false; break; } } } // Now there is something in the buffer uint tmp = bufPos; // Try to find an end of file while (bufPos<bufSize && buf[bufPos]!='\n') { bufPos++; } // Add what needed in the res res->addBuffer(buf+tmp, bufPos-tmp); if (bufPos < bufSize) { // The line is finished, return it (and delete the \n in the buffer) bufPos++; return res; } // The line is finished, go on }}// write an url in the out file (buffered write)template <class T>void PersistentFifo<T>::writeUrl (char *s) { size_t len = strlen(s); if (outbufPos + len < BUF_SIZE) { strncpy(outbuf + outbufPos, s, len); outbufPos += len; } else { // The buffer is full flushOut (); if (len < BUF_SIZE) { strncpy(outbuf + outbufPos, s, len); outbufPos = len; } else { cerr << "Warning : Very Big Url (more than " << BUF_SIZE << ")\n"; ecrireBuff (wfds, s, len); } }}// Flush the out Buffer in the outFiletemplate <class T>void PersistentFifo<T>::flushOut () { ecrireBuff (wfds, outbuf, outbufPos); outbufPos = 0;}#endif // PERSFIFO_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -