/* File: mms_queue.c */
/* (source-viewer control residue: "Font size:") */
do {
Octstr *tmp;
xqf = octstr_format("%cf%ld.%d.x%d.%ld",
MQF,
(long)time(NULL) % 10000,
(++ect % 10000), getpid()%1000, random() % 100);
tmp = octstr_format("%.64s/%s%S", mms_queuedir, subdir, xqf);
ctmp = octstr_get_cstr(tmp);
fd = open(ctmp, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd >= 0 &&
mm_lockfile(fd,ctmp,1) != 0) {
unlink(ctmp);
close(fd);
fd = -1;
}
octstr_destroy(tmp);
if (fd >= 0)
break;
octstr_destroy(xqf);
xqf = NULL;
} while (i++ < MAXTRIES);
if (fd >= 0)
strncpy(qf, octstr_get_cstr(xqf), QFNAMEMAX);
if (xqf) octstr_destroy(xqf);
return fd;
}
/*
 * Write the binary message 'ms' to the data file named 'df' below
 * 'mms_queuedir'.
 *
 * The path is built as "<mms_queuedir>/<subdir><df>": 'subdir' is glued
 * directly in front of 'df', so it has to be empty or end in a slash.
 *
 * Returns 0 on success and -1 when the file cannot be opened, written or
 * closed. A file that could not be written is removed again.
 */
static int writemmsdata(Octstr *ms, char *df, char subdir[], char *mms_queuedir)
{
    Octstr *dfname;
    int fd, n, res = 0;
    int saved_errno;

    dfname = octstr_format("%s/%s%s", mms_queuedir, subdir, df);

    /* O_TRUNC: if a stale file of the same name is still around, make sure
     * none of its old bytes survive behind the new message. */
    fd = open(octstr_get_cstr(dfname),
              O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
    if (fd < 0) {
        saved_errno = errno;
        error(0, "mms_queuadd: Failed to open data file %s: error = %s\n",
              octstr_get_cstr(dfname), strerror(saved_errno));
        res = -1;
        goto done;
    }

    /* Any non-zero result is treated as a failed write. */
    n = octstr_write_to_socket(fd, ms);
    saved_errno = errno; /* close() below may overwrite errno. */

    /* A failing close() on a file we just wrote means the data may not have
     * reached the disk, so report it like a write error. */
    if (close(fd) < 0 && n == 0) {
        saved_errno = errno;
        n = -1;
    }

    if (n != 0) {
        error(0, "mms_queuadd: Failed to write data file %s: error = %s\n",
              octstr_get_cstr(dfname), strerror(saved_errno));
        unlink(octstr_get_cstr(dfname));
        res = -1;
    }

done:
    octstr_destroy(dfname);
    return res;
}
/*
 * Return a cleaned-up copy of 'addr'.
 *
 * Surrounding blanks are always stripped. Addresses without an '@' are
 * otherwise left alone. For addresses containing an '@':
 *   - "Name <user@host>" style: everything up to and including the first
 *     '<' is dropped, as is everything from the following '>' onwards;
 *   - otherwise the address is cut at the first whitespace character after
 *     the '@'.
 *
 * Returns a new Octstr that the caller must destroy, or NULL when 'addr' is
 * NULL or nothing is left after cleaning.
 */
static Octstr *copy_and_clean_address(Octstr *addr)
{
    Octstr *s;
    int k, i;

    if (addr == NULL) /* Nothing to copy. */
        return NULL;

    s = octstr_duplicate(addr);
    octstr_strip_blanks(s);

    /* Only clean up email addresses for now. */
    if ((k = octstr_search_char(s, '@', 0)) < 0)
        goto done;

    /* Look for a '<'; if present keep only what sits between it and the
     * next '>'. */
    i = octstr_search_char(s, '<', 0);
    if (i >= 0) {
        int j;

        octstr_delete(s, 0, i + 1); /* Strip the leading part, '<' included. */
        j = octstr_search_char(s, '>', 0);
        if (j >= 0)
            octstr_delete(s, j, octstr_len(s));
    } else {
        /* Remove everything after the domain. */
        int n = octstr_len(s);
        char *p = octstr_get_cstr(s);

        for (i = k + 1; i < n; i++) {
            /* A domain cannot contain whitespace, so this marks the end of
             * the address. The cast keeps isspace() defined for bytes with
             * the high bit set on platforms where char is signed. */
            if (isspace((unsigned char)p[i])) {
                octstr_delete(s, i, n);
                break;
            }
        }
    }

done:
    if (octstr_len(s) == 0) {
        octstr_destroy(s);
        s = NULL;
    }
    return s;
}
/*
 * Add message 'm' to the queue rooted at 'directory'.
 *
 * A new queue (envelope) file is created with mkqf(), the envelope is
 * written into it, and the binary form of the message is written to the
 * matching data file (same name, first character replaced by MDF) before
 * the queue file is closed.
 *
 * If the message carries no Message-ID header, one is generated with
 * mms_maketransid(); for MMS_MSGTYPE_SEND_REQ messages it is also stored
 * back into 'm'. An 'expirydate' of 0 is replaced by now + DEFAULT_EXPIRE.
 * 'from' and each entry of 'to' are copied through
 * copy_and_clean_address(); recipients that clean to nothing are dropped.
 *
 * The temporary envelope only borrows the caller's Octstr arguments; they
 * are not destroyed here. 'hdrs' is duplicated internally.
 *
 * Returns the value produced by xmake_qf() for the new entry, or NULL on
 * failure.
 */
Octstr *mms_queue_add(Octstr *from, List *to,
                      Octstr *subject,
                      Octstr *fromproxy, Octstr *viaproxy,
                      time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token,
                      Octstr *vaspid, Octstr *vasid,
                      Octstr *url1, Octstr *url2,
                      List *hdrs,
                      int dlr,
                      char *directory, Octstr *mmscname)
{
    char qf[QFNAMEMAX], subdir[64];
    char qtype;
    int fd, i, n;
    MmsEnvelope *e;
    Octstr *msgid, *r = NULL;
    Octstr *ms, *res = NULL, *xfrom = NULL;
    int mtype;

    fd = mkqf(qf, subdir, directory);
    if (fd < 0) {
        error(0, "mms_queue_add[%s]: Failed err=%s\n", directory, strerror(errno));
        return NULL;
    }

    res = xmake_qf(qf, subdir);
    mtype = mms_messagetype(m);

    /* Get MsgID, Fixup if not there and needed. */
    if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL) {
        msgid = mms_maketransid(octstr_get_cstr(res), mmscname);
        if (mtype == MMS_MSGTYPE_SEND_REQ)
            mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid));
    }

    ms = mms_tobinary(m); /* Convert message to string. */
    xfrom = copy_and_clean_address(from);

    e = gw_malloc(sizeof *e); /* Make envelope, clear it. */
    memset(e, 0, sizeof *e);

    /* The struct was zeroed above; copying at most size-1 bytes guarantees
     * the fields stay NUL-terminated even if the source is too long. */
    strncpy(e->qf.name, qf, sizeof e->qf.name - 1);
    strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir - 1);
    strncpy(e->qf.dir, directory, sizeof e->qf.dir - 1);

    e->qf.fd = fd;
    e->msgtype = mtype;
    e->from = xfrom;
    e->created = time(NULL);
    e->sendt = senddate;
    e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE;
    e->lasttry = 0;
    e->attempts = 0;
    e->lastaccess = 0;
    e->fromproxy = fromproxy;
    e->viaproxy = viaproxy;
    e->subject = subject;
    e->to = gwlist_create();
    e->msize = octstr_len(ms);
    e->msgId = msgid;
    e->token = token;
    e->vaspid = vaspid;
    e->vasid = vasid;
    e->url1 = url1;
    e->url2 = url2;
    e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;
    e->dlr = dlr;
    e->bill.billed = 0;

    for (i = 0, n = to ? gwlist_len(to) : 0; i < n; i++)
        if ((r = gwlist_get(to, i)) != NULL &&
            (r = copy_and_clean_address(r)) != NULL) {
            MmsEnvelopeTo *t = gw_malloc(sizeof *t);

            t->rcpt = r;
            t->process = 1;
            gwlist_append(e->to, t);
        }

    /* Write queue data. */
    if (writeenvelope(e, 1) < 0) {
        /* NOTE(review): the queue descriptor and file are left untouched on
         * this path, as before; whether writeenvelope() releases them on
         * failure is not visible here - confirm. */
        octstr_destroy(res);
        res = NULL;
        goto done;
    }

    /* Write actual data before relinquishing lock on queue file. */
    qtype = qf[0];
    qf[0] = MDF;
    if (writemmsdata(ms, qf, subdir, directory) < 0) {
        Octstr *qpath;

        /* No data file exists, so the envelope just written is useless:
         * remove it and release the descriptor instead of leaking both. */
        qf[0] = qtype;
        qpath = octstr_format("%s/%s%s", directory, subdir, qf);
        unlink(octstr_get_cstr(qpath));
        octstr_destroy(qpath);
        close(fd);

        octstr_destroy(res);
        res = NULL;
        goto done;
    }

    close(fd); /* Close queue file, thereby letting go of locks. */

done:
    /* Free the envelope stuff since we do not need it any more, then free 'e' */
    for (i = 0, n = gwlist_len(e->to); i < n; i++) {
        MmsEnvelopeTo *t = gwlist_get(e->to, i);

        octstr_destroy(t->rcpt);
        gw_free(t);
    }
    gwlist_destroy(e->to, NULL);

    /* The header list is our own duplicate, not the caller's. */
    if (e->hdrs)
        http_destroy_headers(e->hdrs);

    gw_free(e); /* Free struct only, caller responsible for arguments. */
    octstr_destroy(ms);
    octstr_destroy(msgid);
    if (xfrom)
        octstr_destroy(xfrom);
    return res;
}
/*
 * Release everything owned by envelope 'e', close its queue file
 * descriptor and free the envelope itself.
 *
 * When 'removefromqueue' is non-zero the queue file and its data file (same
 * name with the first character replaced by MDF) are unlinked first, while
 * the descriptor is still open.
 *
 * Returns 0, or -1 when 'e' is NULL.
 */
static int free_envelope(MmsEnvelope *e, int removefromqueue)
{
    int i, n;

    if (e == NULL) /* Nothing to release. */
        return -1;

    if (e->msgId)
        octstr_destroy(e->msgId);

    for (i = 0, n = gwlist_len(e->to); i < n; i++) {
        MmsEnvelopeTo *x = gwlist_get(e->to, i);

        octstr_destroy(x->rcpt);
        gw_free(x);
    }
    gwlist_destroy(e->to, NULL);

    if (e->from)
        octstr_destroy(e->from);
    if (e->fromproxy)
        octstr_destroy(e->fromproxy);
    if (e->mdata)
        octstr_destroy(e->mdata);
    if (e->viaproxy)
        octstr_destroy(e->viaproxy);
    if (e->token)
        octstr_destroy(e->token);
    if (e->subject)
        octstr_destroy(e->subject);
    if (e->vaspid)
        octstr_destroy(e->vaspid);
    if (e->vasid)
        octstr_destroy(e->vasid);
    if (e->url1)
        octstr_destroy(e->url1);
    if (e->url2)
        octstr_destroy(e->url2);
    if (e->hdrs)
        http_destroy_headers(e->hdrs);

    if (removefromqueue) {
        char fname[2*QFNAMEMAX];

        /* Queue (envelope) file first ... */
        snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
        unlink(fname);

        /* ... then the data file, which differs only in its first character. */
        e->qf.name[0] = MDF;
        snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
        unlink(fname);
    }

    close(e->qf.fd); /* close and unlock now that we have deleted it. */
    gw_free(e);
    return 0;
}
/*
 * Public entry point for releasing an envelope: hands 'e' to
 * free_envelope() without asking for the queue files to be removed, and
 * passes its result back to the caller.
 */
int mms_queue_free_env(MmsEnvelope *e)
{
    const int remove_from_queue = 0;

    return free_envelope(e, remove_from_queue);
}
/*
 * Bring the on-disk state of envelope 'e' up to date.
 *
 * A non-positive expiry time is first replaced by now + DEFAULT_EXPIRE.
 * If at least one recipient still has its 'process' flag set, the envelope
 * is rewritten via writeenvelope() and that call's result is returned.
 * Otherwise the entry is removed from the queue, the envelope is freed and
 * 1 is returned - 'e' must not be used afterwards in that case.
 *
 * Returns -1 when 'e' is NULL.
 */
int mms_queue_update(MmsEnvelope *e)
{
    int idx, count;
    int pending = 0;

    if (e == NULL)
        return -1;

    count = e->to ? gwlist_len(e->to) : 0;

    /* FIX: Don't allow expiry to be <= 0 */
    if (e->expiryt <= 0)
        e->expiryt = time(NULL) + DEFAULT_EXPIRE;

    /* Stop at the first recipient that still needs processing. */
    for (idx = 0; idx < count && !pending; idx++) {
        MmsEnvelopeTo *rcpt = gwlist_get(e->to, idx);

        if (rcpt != NULL && rcpt->process)
            pending = 1;
    }

    if (pending)
        return writeenvelope(e, 0);

    /* Nobody left to deliver to: drop the entry from the queue. */
    free_envelope(e, 1);
    return 1;
}
/*
 * Replace the data file of queue entry 'e' with the binary form of 'm'.
 *
 * The new content is first written to a temporary dot-file in the same
 * directory and then moved over the real data file with rename(); if the
 * rename fails the temporary file is removed.
 *
 * Returns 0 on success, -1 when 'e' is NULL or writing/renaming fails.
 */
int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
{
    Octstr *tfname;
    Octstr *ms;
    int ret = 0;

    if (!e) return -1;

    /* The arguments are cast to exactly what "%ld" and "%d" consume:
     * time_t is not necessarily long, and random() returns long. */
    tfname = octstr_format(".%c%s.%ld.%d", MDF, e->qf.name + 1,
                           (long)time(NULL), (int)random());
    ms = mms_tobinary(m);

    if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.subdir, e->qf.dir) < 0)
        ret = -1;
    else {
        Octstr *fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
        Octstr *tmpf = octstr_format("%s/%s%S", e->qf.dir, e->qf.subdir, tfname);

        if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) {
            error(0, "mms_replacedata: Failed to write data file %s: error = %s\n",
                  octstr_get_cstr(tmpf), strerror(errno));
            ret = -1;
            unlink(octstr_get_cstr(tmpf)); /* remove it. */
        }
        octstr_destroy(fname);
        octstr_destroy(tmpf);
    }

    octstr_destroy(ms);
    octstr_destroy(tfname);
    return ret;
}
/*
 * Load and decode the message belonging to queue entry 'e'.
 *
 * The data file name is the queue file name with its first character
 * replaced by MDF. Returns the decoded message, or NULL when 'e' is NULL,
 * the file cannot be read, or its content cannot be decoded (the latter two
 * are logged).
 */
MmsMsg *mms_queue_getdata(MmsEnvelope *e)
{
    Octstr *fname;
    Octstr *ms;
    MmsMsg *m = NULL;

    if (!e) return NULL;

    fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
    ms = octstr_read_file(octstr_get_cstr(fname));
    if (!ms) {
        error(0, "mms_queue_getdata: Failed to load data file for queue entry %s in %s",
              e->qf.name, e->qf.dir);
        goto done;
    }

    m = mms_frombinary(ms, octstr_imm(""));
    if (!m)
        error(0, "mms_queue_getdata: Failed to decode data file for queue entry %s in %s",
              e->qf.name, e->qf.dir);

done:
    /* Single exit so the raw buffer is released on the decode-failure path
     * as well. */
    if (ms)
        octstr_destroy(ms);
    octstr_destroy(fname);
    return m;
}
/* State of one delivery worker: mms_queue_run() creates one per thread,
 * run_dir() feeds it and tdeliver() drains it. */
struct Qthread_t {
List *l; /* Envelopes waiting for this worker; tdeliver() destroys the list and sets this to NULL when it exits. */
int (*deliver)(MmsEnvelope *e); /* Callback tdeliver() invokes for every envelope it takes off the list. */
};
/*
 * Worker thread body: take envelopes off qt->l and pass each one to the
 * delivery callback until gwlist_consume() returns NULL.
 *
 * Any callback result other than 1 makes this function free the envelope
 * (without removing it from the queue); a result of 1 is taken to mean the
 * envelope has already been disposed of. On exit the list is destroyed and
 * qt->l is cleared, which is how the worker signals that it is gone.
 */
static void tdeliver(struct Qthread_t *qt)
{
    MmsEnvelope *env;

    for (;;) {
        env = gwlist_consume(qt->l);
        if (env == NULL)
            break; /* Consume failed, time to go away. */

        /* If it is on the queue, it has to be delivered. */
        if (qt->deliver(env) != 1)
            free_envelope(env, 0); /* It wasn't deleted, so let go of it here. */
    }

    if (qt->l != NULL)
        gwlist_destroy(qt->l, NULL);
    qt->l = NULL; /* Signal that we are gone. */
}
/*
 * Scan one queue directory ("<topdir>/<dir>") and hand every due queue
 * entry to a sender thread.
 *
 * - 'dir' is relative to 'topdir' and must be empty or end in a slash.
 * - Regular files whose name starts with MQF followed by 'f' are read as
 *   envelopes; those whose send time has arrived are queued round-robin,
 *   starting at thread *i, to the first thread that still has a list.
 *   Envelopes that are not yet due are freed again.
 * - Sub-directories (other than "." and "..") are pushed onto 'stack' as
 *   new relative paths for the caller to process later.
 *
 * Returns 0 normally, -1 if the directory cannot be opened, and -2 if no
 * sender thread is left (the caller should quit).
 */
static int run_dir(char *topdir, char *dir, struct Qthread_t *tlist, int num_threads, int *i, List *stack)
{
    Octstr *fullpath = octstr_format("%s/%s", topdir, dir);
    char *path = octstr_get_cstr(fullpath);
    DIR *handle = opendir(path);
    struct dirent *entry;
    int status = 0;

    if (handle == NULL) {
        error(0, "mms_queue_run: Failed to read queue directory %s, error=%s",
              path, strerror(errno));
        status = -1;
    } else {
        while ((entry = readdir(handle)) != NULL) {
            struct stat info;
            Octstr *entry_path = octstr_format("%s%s", path, entry->d_name);
            int stat_rc = stat(octstr_get_cstr(entry_path), &info);
            time_t now = time(NULL);
            Octstr *qname;
            MmsEnvelope *env;
            int first, placed;

            octstr_destroy(entry_path);

            if (stat_rc != 0)
                continue; /* Cannot tell what it is, skip it. */

            if (S_ISDIR(info.st_mode)) {
                if (strcmp(entry->d_name, ".") != 0 &&
                    strcmp(entry->d_name, "..") != 0)
                    gwlist_append(stack, octstr_format("%s%s/", dir, entry->d_name)); /* push it... */
                continue;
            }

            /* Only regular files named like queue (envelope) files matter. */
            if (!S_ISREG(info.st_mode) ||
                entry->d_name[0] != MQF ||
                entry->d_name[1] != 'f')
                continue;

            qname = xmake_qf(entry->d_name, dir);
            env = mms_queue_readenvelope(octstr_get_cstr(qname), topdir, 0);
            octstr_destroy(qname);
            if (env == NULL)
                continue;

            if (env->sendt > now) {
                free_envelope(env, 0); /* Not due yet, let go of it. */
                continue;
            }

            /* Walk the thread table once, starting at the next thread to
             * use, until some thread accepts the envelope. */
            placed = 0;
            first = *i;
            do {
                if (tlist[*i].l) {
                    debug("queuerun", 0, "Queued to thread %d for %s%s, sendt=%d, tnow=%d",
                          *i, path, entry->d_name, (int)env->sendt, (int)now);
                    gwlist_produce(tlist[*i].l, env);
                    placed = 1;
                }
                *i = (*i + 1) % num_threads;
            } while (!placed && *i != first);

            if (!placed) { /* A problem. There are no sender threads! */
                free_envelope(env, 0);
                error(0, "mms_queue_run: No active sender queues for directory %s. Quiting.",
                      path);
                status = -2;
                break;
            }
        }
        closedir(handle);
    }

    if (fullpath)
        octstr_destroy(fullpath);
    return status;
}
/*
 * Run the queue under 'dir' until told to stop.
 *
 * 'num_threads' (must be > 0) sender threads are started, each running
 * tdeliver() with 'deliver' as its callback. The queue directory tree is
 * then scanned repeatedly with run_dir(), starting from the root on every
 * pass and sleeping 'sleepsecs' seconds between passes. The loop ends when
 * *rstop becomes non-zero or run_dir() reports that no sender thread is
 * left (-2). Before returning, the sender threads are told to finish and
 * are joined.
 */
void mms_queue_run(char *dir,
                   int (*deliver)(MmsEnvelope *),
                   double sleepsecs, int num_threads, int *rstop)
{
    struct Qthread_t *tlist;
    int i, qstop = 0;
    List *stack = gwlist_create();

    gw_assert(num_threads>0);

    tlist = gw_malloc(num_threads*sizeof tlist[0]);
    for (i = 0; i<num_threads; i++) { /* Create threads for sending. */
        tlist[i].l = gwlist_create();
        gwlist_add_producer(tlist[i].l);
        tlist[i].deliver = deliver;
        gwthread_create((gwthread_func_t *)tdeliver, &tlist[i]);
    }

    i = 0; /* For stepping through above array. */
    do {
        Octstr *xdir = NULL;

        /* A pass that was aborted by a run_dir() error can leave
         * directories on the stack. Drop them: the pass below restarts from
         * the root and finds them again, so keeping them would scan the
         * same directories twice. */
        while ((xdir = gwlist_extract_first(stack)) != NULL)
            octstr_destroy(xdir);

        gwlist_append(stack, octstr_create("")); /* Put initial dir on there. */

        while (!*rstop &&
               (xdir = gwlist_extract_first(stack)) != NULL) {
            int ret = run_dir(dir, octstr_get_cstr(xdir), tlist, num_threads, &i, stack);

            octstr_destroy(xdir);
            xdir = NULL;
            if (ret < 0) {
                if (ret <= -2)
                    qstop = 1;
                goto qloop;
            }
        }

        if (*rstop)
            break;
    qloop:
        gwthread_sleep(sleepsecs);
    } while (!qstop);

    /* We are out of the queue, time to go away. */
    for (i = 0; i<num_threads; i++)
        if (tlist[i].l)
            gwlist_remove_producer(tlist[i].l);

    gwthread_join_every((gwthread_func_t *)tdeliver); /* Wait for them all to terminate. */

    for (i = 0; i<num_threads; i++)
        if (tlist[i].l)
            gwlist_destroy(tlist[i].l,NULL); /* Final destroy if needed. */
    gw_free(tlist);
    gwlist_destroy(stack, (gwlist_item_destructor_t *)octstr_destroy);

    return;
}
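/*
 * Source-viewer keyboard shortcuts (page residue, not part of mms_queue.c):
 *   Copy code          Ctrl + C
 *   Search code        Ctrl + F
 *   Full-screen mode   F11
 *   Switch theme       Ctrl + Shift + D
 *   Show shortcuts     ?
 *   Increase font size Ctrl + =
 *   Decrease font size Ctrl + -
 */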