📄 mms_queue.c
字号:
/*
* Mbuni - Open Source MMS Gateway
*
* Queue management functions
*
* Copyright (C) 2003 - 2005, Digital Solutions Ltd. - http://www.dsmagic.com
*
* Paul Bagyenda <bagyenda@dsmagic.com>
*
* This program is free software, distributed under the terms of
* the GNU General Public License, with a few exceptions granted (see LICENSE)
*/
#include <sys/file.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <ctype.h>
#include <dirent.h>
#include "mms_queue.h"
#include "gwlib/log.h"
#include "gwlib/accesslog.h"
#define MQF 'q'
#define MDF 'd'
#define MTF 't'
#define DIR_SEP '/'
#define DIR_SEP_S "/"
int mms_init_queuedir(Octstr *qdir)
{
int i, ret;
char fbuf[512], *xqdir;
octstr_strip_blanks(qdir);
/* Remove trailing slashes. */
for (i = octstr_len(qdir) - 1; i >= 0; i--)
if (octstr_get_char(qdir, i) != DIR_SEP)
break;
else
octstr_delete(qdir, i,1);
xqdir = octstr_get_cstr(qdir);
if ((ret = mkdir(xqdir,
S_IRWXU|S_IRWXG)) < 0 &&
errno != EEXIST)
return errno;
for (i = 0; _TT[i]; i++) { /* initialise the top level only... */
sprintf(fbuf, "%.128s/%c", xqdir, _TT[i]);
if (mkdir(fbuf,
S_IRWXU|S_IRWXG) < 0 &&
errno != EEXIST)
return errno;
}
return 0;
}
static int free_envelope(MmsEnvelope *e, int removefromqueue);
/* Queue file structure:
* - File consists of a series of lines, each line begins with a single letter, followed by
* a parameter. Letters mean:
* T - message type (full text string -- MMS message type.
* I - message ID
* F - From address
* R - Recipient (the ones pending) for this message
* C - Time queue entry was created
* L - Time of last delivery attempt
* D - Time of (next) delivery attempt
* X - Time of expiry of message
* N - Number of delivery attempts so far
* P - Proxy who sent it to us
* p - Proxy through which this message shd be delivered (e.g. delivery report)
* S - Message size
* s - Message subject.
* f - time of last content fetch
* t - user defined token.
* b - billed amount.
* r - whether delivery receipts are required or not.
* M - Application specific data (string)
* V - VASPID -- from VASP
* v - vasid -- from VASP
* U - url1 -- e.g. for delivery report
* u - url2 -- e.g. for read report
* H - generic headers associated with message (e.g. for passing to MMC)
*/
static int _putline(int fd, char *code, char buf[])
{
Octstr *s = octstr_format("%s%s\n", code, buf);
int res;
res = octstr_write_to_socket(fd, s);
octstr_destroy(s);
return res;
}
Octstr *xmake_qf(char realqf[], char subdir[])
{
Octstr *res = octstr_format("%s%s", subdir, realqf); /* Make the queue identifier -- convert '/' to '-' */
octstr_replace(res, octstr_imm(DIR_SEP_S), octstr_imm("-"));
return res;
}
/* break down a qf into dir and sub-dir. subdir contains final '/'! */
static void get_subdir(char *qf, char subdir[64], char realqf[QFNAMEMAX])
{
char *p = strrchr(qf, '-');
if (p == NULL) {
strncpy(realqf, qf, QFNAMEMAX);
subdir[0] = '\0';
} else {
int i, n;
strncpy(realqf, p + 1, QFNAMEMAX);
n = (p+1) - qf;
strncpy(subdir, qf, n);
subdir[n] = 0;
for (i = 0; i<n; i++)
if (subdir[i] == '-')
subdir[i] = DIR_SEP;
}
}
/*
* Attempt to read an envelope from queue file:
* - opens and locks the file.
* - if the lock succeeds, check that file hasn't changed since opening. If it has
* return NULL (i.e. file is being processed elsewhere -- race condition), otherwise read it.
* - If should block is 1, then does a potentially blocking attempt to lock the file.
*/
MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock)
{
Octstr *fname;
int fd;
Octstr *qdata, *s;
ParseContext *p;
MmsEnvelope *e;
int okfile = 0;
char subdir[64];
char realqf[QFNAMEMAX];
char xqf[QFNAMEMAX+64];
get_subdir(qf, subdir, realqf); /* break it down... */
fname = octstr_format( "%.128s/%s%s", mms_queuedir, subdir, realqf);
strncpy(xqf, octstr_get_cstr(fname), sizeof xqf);
if ((fd = open(octstr_get_cstr(fname), O_RDONLY)) < 0) {
octstr_destroy(fname);
return NULL;
} else if (mm_lockfile(fd, octstr_get_cstr(fname), shouldblock) != 0) {
close(fd);
octstr_destroy(fname);
return NULL;
}
e = gw_malloc(sizeof *e);
memset(e, 0, sizeof *e); /* Clear it all .*/
e->to = gwlist_create();
e->qf.fd = fd;
strncpy(e->qf.name, realqf, sizeof e->qf.name);
strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir);
strncpy(e->xqfname, qf, sizeof e->xqfname);
qdata = octstr_read_file(octstr_get_cstr(fname));
octstr_destroy(fname);
if (qdata == NULL)
qdata = octstr_imm("");
p = parse_context_create(qdata);
for (s = parse_get_line(p); s;
s = parse_get_line(p)) {
char *line = octstr_get_cstr(s);
int ch = line[0];
char *res = line + 1;
char *ptmp;
switch (ch) {
Octstr *t;
MmsEnvelopeTo *to;
case 'T':
t = octstr_create(res);
e->msgtype = mms_string_to_message_type(t);
octstr_destroy(t);
if (e->msgtype < 0) {
e->msgtype = 0;
error(0, "mms_queueread: Unknown MMS message type (%s) in file %s, skipped!\n",
res, xqf);
}
break;
case 'I':
e->msgId = octstr_create(res);
break;
case 'F':
e->from = octstr_create(res);
if (mms_validate_address(e->from) != 0) {
warning(0, "mms_queueread: Mal-formed address [%s] in file %s! "
"Attempting fixup.", res, xqf);
_mms_fixup_address(e->from, NULL);
}
break;
case 'R':
t = octstr_create(res);
if (mms_validate_address(t) != 0) {
warning(0, "mms_queueread: Mal-formed address [%s] in file %s! "
"Attempting fixup.", res, xqf);
_mms_fixup_address(t, NULL);
}
to = gw_malloc(sizeof *to);
to->rcpt = t;
to->process = 1;
gwlist_append(e->to, to);
break;
case 'C':
e->created = atol(res);
break;
case 'L':
e->lasttry = atol(res);
break;
case 'D':
e->sendt = atol(res);
break;
case 'X':
e->expiryt = atol(res);
break;
case 'N':
e->attempts = atol(res);
break;
case 'P':
e->fromproxy = octstr_create(res);
break;
case 'M':
e->mdata = octstr_create(res);
break;
case 'p':
e->viaproxy = octstr_create(res);
break;
case 'S':
e->msize = atol(res);
break;
case 's':
e->subject = octstr_create(res);
break;
case 't':
e->token = octstr_create(res);
break;
case 'f':
e->lastaccess = atol(res);
break;
case 'b':
e->bill.billed = 1;
e->bill.amt = atof(res);
break;
case 'r':
e->dlr = 1;
break;
case 'V':
e->vaspid = octstr_create(res);
break;
case 'v':
e->vasid = octstr_create(res);
break;
case 'U':
e->url1 = octstr_create(res);
break;
case 'u':
e->url2 = octstr_create(res);
break;
case 'H':
if (e->hdrs == NULL)
e->hdrs = http_create_empty_headers();
if ((ptmp = index(res, ':')) == NULL)
error(0, "Incorrectly formatted line %s in queue file %s!",
line, xqf);
else {
char *value = ptmp + 1;
char hname[512];
int xlen = (ptmp - res < sizeof hname) ? ptmp - res : -1 + sizeof hname;
strncpy(hname, res, xlen);
hname[xlen] = 0; /* terminate it. */
http_header_add(e->hdrs, hname, value);
}
break;
case '.':
okfile = 1;
break;
default:
error(0, "Unknown QF header %c in file %s!", ch, xqf);
break;
}
octstr_destroy(s);
if (okfile)
break; /* We are done. */
}
parse_context_destroy(p);
octstr_destroy(qdata);
/* We should properly validate the queue file here. */
if (!okfile) {
free_envelope(e,0);
e = NULL;
error(0, "Corrupt queue control file: %s", xqf);
}
return e;
}
/* Updates envelope to queue file:
* - opens temp file
* - writes output to temp file, if not new else writes directly.
* - renames temp file to queue file (if not new)
* This function doesn't check that this envelope is useless (i.e. no recipients)
* - If function returns -1, caller should check errno for error.
*/
static int writeenvelope(MmsEnvelope *e, int newenv)
{
Octstr *tfname = NULL;
char *s;
char buf[512];
int fd;
int i, n;
int res = 0;
if (newenv)
fd = e->qf.fd;
else {
tfname = octstr_format(
"%s/%s%c%s.%d", e->qf.dir, e->qf.subdir,
MTF, e->qf.name + 1, random());
fd = open(octstr_get_cstr(tfname),
O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0 ) {
error(0, "mms_queueadd: Failed to open temp file %s: error = %s\n",
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
} else if (mm_lockfile(fd, octstr_get_cstr(tfname), 0) != 0) { /* Lock it. */
error(0, "mms_queueadd: Failed lock temp file %s: error = %s\n",
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
}
}
/* Write out. */
s = (char *)mms_message_type_to_cstr(e->msgtype);
if (!s) {
error(0, "mms_queuewrite: Unknown MMS message type %d! Skipped\n", e->msgtype);
s = "";
}
_putline(fd, "T", s);
if (e->msgId)
_putline(fd, "I", octstr_get_cstr(e->msgId));
if (e->from)
_putline(fd, "F", octstr_get_cstr(e->from));
if (e->to)
n = gwlist_len(e->to);
else
n = 0;
for (i = 0; i < n; i++) {
MmsEnvelopeTo *to = gwlist_get(e->to, i);
if (to->process)
_putline(fd, "R", octstr_get_cstr(to->rcpt));
}
/* Output headers if any. */
n = (e->hdrs) ? gwlist_len(e->hdrs) : 0;
for (i = 0; i < n; i++) {
Octstr *h = NULL, *v = NULL;
http_header_get(e->hdrs, i, &h, &v);
if (h && v) {
Octstr *x = octstr_format("%s:%s", octstr_get_cstr(h),
octstr_get_cstr(v));
_putline(fd, "H", octstr_get_cstr(x));
octstr_destroy(x);
}
if (h) octstr_destroy(h);
if (v) octstr_destroy(v);
}
sprintf(buf, "%ld", e->created);
_putline(fd, "C", buf);
if (e->lasttry) {
sprintf(buf, "%ld", e->lasttry);
_putline(fd, "L", buf);
}
if (e->sendt) {
sprintf(buf, "%ld", e->sendt);
_putline(fd, "D", buf);
}
if (e->expiryt) {
sprintf(buf, "%ld", e->expiryt);
_putline(fd, "X", buf);
}
if (e->attempts) {
sprintf(buf, "%ld", e->attempts);
_putline(fd, "N", buf);
}
if (e->lastaccess) {
sprintf(buf, "%ld", e->lastaccess);
_putline(fd, "f", buf);
}
sprintf(buf, "%ld", e->msize);
_putline(fd, "S", buf);
if (e->fromproxy)
_putline(fd, "P", octstr_get_cstr(e->fromproxy));
if (e->mdata)
_putline(fd, "M", octstr_get_cstr(e->mdata));
if (e->subject)
_putline(fd, "s", octstr_get_cstr(e->subject));
if (e->viaproxy)
_putline(fd, "p", octstr_get_cstr(e->viaproxy));
if (e->token)
_putline(fd, "t", octstr_get_cstr(e->token));
if (e->vaspid)
_putline(fd, "V", octstr_get_cstr(e->vaspid));
if (e->vasid)
_putline(fd, "v", octstr_get_cstr(e->vasid));
if (e->url1)
_putline(fd, "U", octstr_get_cstr(e->url1));
if (e->url2)
_putline(fd, "u", octstr_get_cstr(e->url2));
if (e->dlr)
_putline(fd, "r", "Yes");
if (e->bill.billed) {
sprintf(buf, "%.3f", e->bill.amt);
_putline(fd,"b", buf);
}
_putline(fd, "", ".");
fsync(fd); /* Sync data. */
if (!newenv) { /* An update */
Octstr *qfname;
qfname = octstr_format("%s/%s%s", e->qf.dir,
e->qf.subdir,
e->qf.name);
if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) {
error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n",
octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno));
close(fd); /* Close new one, keep old one. */
res = -1;
} else { /* On success, new descriptor replaces old one and we close old one. */
close(e->qf.fd);
e->qf.fd = fd;
}
octstr_destroy(qfname);
}
done:
if (tfname) octstr_destroy(tfname);
return res;
}
#define MAXTRIES 10
/* Makes a qf file in the queue directory.
* Makes several attempts then fails (returns -1) if it can't, fd otherwise
* puts queue file name in qf (without directory name).
* It is up to the caller to lock the file descriptor if needed.
*/
static int mkqf(char qf[QFNAMEMAX], char subdir[64], char *mms_queuedir)
{
Octstr *xqf = NULL;
char *ctmp;
int i = 0, fd = -1;
static int ect;
if (!mms_queuedir)
gw_panic(0, "Queue directory passed as null!");
/* First we decide the directory into which it goes... */
if ((i = random() % 3) == 0) /* toplevel. */
subdir[0] = 0;
else if (i == 1) /* one in */
sprintf(subdir, "%c/", _TT[random() % _TTSIZE]);
else { /* two in. */
char csubdir[QFNAMEMAX];
sprintf(subdir, "%c/%c%c/",
_TT[random() % _TTSIZE],
_TT[random() % _TTSIZE],
_TT[random() % _TTSIZE]);
sprintf(csubdir, "%s/%s", mms_queuedir, subdir);
if (mkdir(csubdir,
S_IRWXU|S_IRWXG) < 0 &&
errno != EEXIST) {
error(0, "make queue file: Failed to create dir %s - %s!",
csubdir, strerror(errno));
return -1;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -