⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mms_queue.c

📁 一个非常实用的手机mms开发包,能够满足大部分不同品牌和型号手机的开发
💻 C
📖 第 1 页 / 共 2 页
字号:
     do {
	  Octstr *tmp;
	  xqf = octstr_format("%cf%ld.%d.x%d.%ld", 
			       MQF, 
			      (long)time(NULL) % 10000, 
			      (++ect % 10000), getpid()%1000, random() % 100);
	  tmp = octstr_format("%.64s/%s%S", mms_queuedir, subdir, xqf);
	  
	  ctmp = octstr_get_cstr(tmp);
	  fd = open(ctmp, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
	  if (fd >= 0 && 
	      mm_lockfile(fd,ctmp,1) != 0) {
	       unlink(ctmp);
	       close(fd);
	       fd = -1;
	  }
	  octstr_destroy(tmp);
	  if (fd >= 0) 
	       break;
	  

	  octstr_destroy(xqf);
	  xqf = NULL;	  
     } while (i++ < MAXTRIES);
	       
     if (fd >= 0) 	  
	  strncpy(qf, octstr_get_cstr(xqf), QFNAMEMAX);    
     
     if (xqf) octstr_destroy(xqf);

     return fd;
}

static int writemmsdata(Octstr *ms, char *df, char subdir[], char *mms_queuedir)
{
     Octstr *dfname;
     int fd, n, res = 0;

     
     dfname = octstr_format("%s/%s%s", mms_queuedir, subdir, df);

     fd = open(octstr_get_cstr(dfname), 
	       O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);	  
     if (fd < 0) {
	  error(0, "mms_queuadd: Failed to open data file %s: error = %s\n",
		octstr_get_cstr(dfname), strerror(errno));
	  res = -1;	
	  goto done;
     }

     n = octstr_write_to_socket(fd, ms);
     close(fd);
	  
     if (n != 0) {
	  error(0, "mms_queuadd: Failed to write data file %s: error = %s\n",
		octstr_get_cstr(dfname), strerror(errno));
	  unlink(octstr_get_cstr(dfname));
	  res = -1;
     }

 done:
     octstr_destroy(dfname);
     return res;
}


static Octstr *copy_and_clean_address(Octstr *addr)
{
     Octstr *s = octstr_duplicate(addr);
     int k, i;

     octstr_strip_blanks(s);
     /* Only clean up email addresses for now. */     
     if ((k = octstr_search_char(s, '@',0)) < 0)
	  goto done;

     /* Find '<' if any, then find last one and remove everything else. */
     
     i = octstr_search_char(s, '<',0);
     if (i >= 0) {
          int j;
	  
	  octstr_delete(s, 0, i+1); /* strip first part off. */
	  
	  j = octstr_search_char(s, '>',0);
          if (j >= 0) 
	       octstr_delete(s, j, octstr_len(s));	  
     } else {
	  /* remove everything after the domain. */
	  int n = octstr_len(s);
	  char *p = octstr_get_cstr(s);

	  
	  for (i = k+1; i < n; i++)
	       if (isspace(p[i])) { /* there can't be space in domain, so this marks end of address. */
		    octstr_delete(s, i, n);
		    break;
	       }
     }
     
done:
     if (octstr_len(s) == 0) {
	  octstr_destroy(s);
	  s = NULL;
     }
     return s;
}

Octstr *mms_queue_add(Octstr *from, List *to, 
		      Octstr *subject,
		      Octstr *fromproxy, Octstr *viaproxy, 
		      time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token, 
		      Octstr *vaspid, Octstr *vasid,
		      Octstr *url1, Octstr *url2,
		      List *hdrs,
		      int dlr,
		      char *directory, Octstr *mmscname)
{
     char qf[QFNAMEMAX], subdir[64];
     int fd, i, n;
     MmsEnvelope *e;
     Octstr *msgid, *r = NULL;
     Octstr *ms, *res = NULL, *xfrom = NULL;
     int mtype;
     fd = mkqf(qf, subdir, directory);

     if (fd < 0) { 
	  error(0, "mms_queue_add[%s]: Failed err=%s\n", directory, strerror(errno));
	  return NULL;
     }

     res = xmake_qf(qf, subdir);      
     mtype = mms_messagetype(m);

     /* Get MsgID,  Fixup if not there and needed. */
     if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID")))  == NULL) {     
	  msgid = mms_maketransid(octstr_get_cstr(res), mmscname);
	  if (mtype == MMS_MSGTYPE_SEND_REQ)
	       mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid));
     }

     
     ms = mms_tobinary(m); /* Convert message to string. */

     xfrom = copy_and_clean_address(from);
     e = gw_malloc(sizeof *e);      /* Make envelope, clear it. */
     memset(e, 0, sizeof *e);
     
     strncpy(e->qf.name, qf, sizeof e->qf.name);
     strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
     strncpy(e->qf.dir, directory, sizeof e->qf.dir);

     e->qf.fd = fd;
     
     e->msgtype = mtype;
     e->from = xfrom;
     e->created = time(NULL);
     e->sendt = senddate;
     e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE;
     e->lasttry = 0;
     e->attempts = 0;
     e->lastaccess = 0;
     e->fromproxy = fromproxy;
     e->viaproxy = viaproxy;
     e->subject = subject;
     e->to = gwlist_create();
     e->msize = octstr_len(ms);
     e->msgId = msgid;
     e->token = token;
     e->vaspid = vaspid;
     e->vasid = vasid;
     e->url1 = url1;
     e->url2 = url2;
     e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;

     e->dlr = dlr;
     e->bill.billed = 0;


     for (i = 0, n = to ? gwlist_len(to) : 0; i<n; i++) 
	  if ((r = gwlist_get(to, i)) != NULL && 
	      (r = copy_and_clean_address(r)) != NULL) {
	       MmsEnvelopeTo *t = gw_malloc(sizeof *t);
	       
	       t->rcpt = r;
	       t->process = 1;
	       
	       gwlist_append(e->to, t);
	  }
     
     /* Write queue data. */
     if (writeenvelope(e, 1) < 0) {
	  octstr_destroy(res);
	  res = NULL;
	  goto done;
     }


     /* Write actual data before relinquishing lock on queue file. */

     qf[0]= MDF;

     if (writemmsdata(ms, qf, subdir, directory) < 0) {
	  octstr_destroy(res);
	  res = NULL;
	  goto done;
     }

     close(fd); /* Close queue file, thereby letting go of locks.  */

 done:
     /* Free the envelope stuff since we do not need it any more, then free 'e' */
     for (i = 0, n = gwlist_len(e->to); i<n; i++) {
	  MmsEnvelopeTo *to = gwlist_get(e->to, i);
	  octstr_destroy(to->rcpt); 
	  gw_free(to);
     }
     gwlist_destroy(e->to, NULL);

     gw_free(e); /* Free struct only, caller responsible for arguments. */

     octstr_destroy(ms);
     octstr_destroy(msgid);

     if (xfrom) 
	  octstr_destroy(xfrom);

     return res;
}

static int free_envelope(MmsEnvelope *e, int removefromqueue)
{
     int i, n;

     if (e->msgId)
       octstr_destroy(e->msgId);
     
     for (i = 0, n = gwlist_len(e->to); i < n; i++) {
	  MmsEnvelopeTo *x = gwlist_get(e->to, i);
	  
	  octstr_destroy(x->rcpt);
	  gw_free(x);	  
     }
     gwlist_destroy(e->to, NULL);
     
     if (e->from)
	  octstr_destroy(e->from);

     if (e->fromproxy)
	  octstr_destroy(e->fromproxy);
     if (e->mdata)
	  octstr_destroy(e->mdata);
     if (e->viaproxy)
	  octstr_destroy(e->viaproxy);
     if (e->token)
	  octstr_destroy(e->token);

     if (e->subject)
	  octstr_destroy(e->subject);

     if (e->vaspid)
	  octstr_destroy(e->vaspid);

     if (e->vasid)
	  octstr_destroy(e->vasid);
     
     if (e->url1)
	  octstr_destroy(e->url1);

     if (e->url2)
	  octstr_destroy(e->url2);
     
     if (e->hdrs)
	  http_destroy_headers(e->hdrs);

     if (removefromqueue) {
	  char fname[2*QFNAMEMAX];
	  
	  snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir,  e->qf.name); 
	  unlink(fname);
	  e->qf.name[0] = MDF;	  
	  snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name); 
	  unlink(fname);	  
     }
     close(e->qf.fd); /* close and unlock now that we have deleted it. */

     gw_free(e);

     return 0;     
}

int mms_queue_free_env(MmsEnvelope *e)
{

     return free_envelope(e, 0);
}
int mms_queue_update(MmsEnvelope *e)
{
     int i, n = (e && e->to) ? gwlist_len(e->to) : 0;
     int hasrcpt = 0;
     MmsEnvelopeTo *x;     

     if (!e) return -1;
     /* FIX: Don't allow expiry to be <= 0 */
     if (e->expiryt <= 0)
	  e->expiryt = time(NULL) + DEFAULT_EXPIRE;
     for (i = 0; i < n; i++)	  
	  if ((x = gwlist_get(e->to, i)) != NULL && 
	      x->process) {
	       hasrcpt = 1;
	       break;
	  }
     
     if (!hasrcpt) {
	  free_envelope(e,1);
	  return 1;
     } else 
	  return writeenvelope(e, 0);         
}

int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
{
     Octstr *tfname;
     Octstr *ms;
     int ret = 0;
     
     if (!e) return -1;

     tfname = octstr_format(".%c%s.%ld.%d", MDF, e->qf.name + 1, time(NULL), random());
     
     ms = mms_tobinary(m);
     
     if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.subdir, e->qf.dir) < 0) 
	  ret = -1;
     else {
	  Octstr *fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
	  Octstr *tmpf = octstr_format("%s/%s%S", e->qf.dir, e->qf.subdir, tfname);
	  if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) {
	       error(0, "mms_replacedata: Failed to write data file %s: error = %s\n",
		     octstr_get_cstr(tmpf), strerror(errno));	       
	       ret = -1;
	       unlink(octstr_get_cstr(tmpf)); /* remove it. */
	  } 	     
	  octstr_destroy(fname);
	  octstr_destroy(tmpf);
     }
     octstr_destroy(ms);

     octstr_destroy(tfname);
     return ret;     
}

MmsMsg *mms_queue_getdata(MmsEnvelope *e)
{
     Octstr *fname;
     Octstr *ms;
     MmsMsg *m;
     
     if (!e) return NULL;

     fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);

     ms = octstr_read_file(octstr_get_cstr(fname));
     if (!ms) {
	  error(0, "mms_queue_getdata: Failed to load data file for queue entry %s in %s",
		e->qf.name, e->qf.dir);
	  octstr_destroy(fname);
	  return NULL;
     }
     m = mms_frombinary(ms, octstr_imm(""));
     if (!m) {
	  error(0, "mms_queue_getdata: Failed to decode data file for queue entry %s in %s",
		e->qf.name, e->qf.dir);
	  octstr_destroy(fname);
	  return NULL;
     }
     octstr_destroy(ms);
     octstr_destroy(fname);

     return m;     
}


struct Qthread_t {
     List *l;
     int (*deliver)(MmsEnvelope *e);
};

static void tdeliver(struct Qthread_t *qt)
{
     MmsEnvelope *e;

     while ((e = gwlist_consume(qt->l)) != NULL) {
	  int res;
	  res = qt->deliver(e); /* If it is on the queue, it has to be delivered. */

	  if (res != 1) /* Then delete as it wasn't deleted. */
	       free_envelope(e, 0);	       	  	  
     }     
     /* Consume failed, time to go away. */
     if (qt->l)
	  gwlist_destroy(qt->l, NULL);
     qt->l = NULL; /* Signal that we are gone. */
}


/* runs over a single directory, running queue items. return -1 if failed to run some item.
 * each directory found is pushed onto stack for future processing. 
 * dir must have trailing slash
 * return value of -2 means quit. 
 */
static int run_dir(char *topdir, char *dir, struct Qthread_t *tlist, int num_threads, int *i, List *stack)
{
     DIR *dirp;
     struct dirent *dp;

     Octstr *tdir = octstr_format("%s/%s", topdir, dir);
     char *xdir = octstr_get_cstr(tdir);
     int ret = 0;

     dirp = opendir(xdir);
	  
     if (!dirp) {
	  error(0, "mms_queue_run: Failed to read queue directory %s, error=%s", 
		xdir, strerror(errno));
	  ret = -1;
	  goto done;
     }
	  
     while ((dp = readdir(dirp)) != NULL) {
	  struct stat st;
	  Octstr *xfname = octstr_format("%s%s", xdir, dp->d_name);
	  int sres = stat(octstr_get_cstr(xfname), &st);
	  time_t tnow = time(NULL);	  

	  octstr_destroy(xfname);

	  if (sres == 0 && S_ISREG(st.st_mode) && 
	      dp->d_name[0] == MQF && 
	      dp->d_name[1] == 'f') {
	       Octstr *xqf = xmake_qf(dp->d_name, dir);
	       MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(xqf),topdir, 0);
	       
	       octstr_destroy(xqf);
	       
	       if (!e) 
		    continue;
	       
	       if (e->sendt <= tnow) {
		    int queued = 0;
		    int j = *i; /* This is the next thread to use. Checking for cycles. */
		    do {
			 if (tlist[*i].l) {
			      debug("queuerun", 0, "Queued to thread %d for %s%s, sendt=%d, tnow=%d", 
				    *i, xdir, dp->d_name, (int)e->sendt, (int)tnow);
			      gwlist_produce(tlist[*i].l, e);
			      queued = 1;
			 }
			 *i = (*i+1)%num_threads;
		    }  while (!queued && *i != j);
		    
		    if (!queued) { /* A problem. There are no sender threads! */
			 free_envelope(e, 0);	       			
			 error(0, "mms_queue_run: No active sender queues for directory %s. Quiting.",
			       xdir);
			 ret = -2; 
			 break;			      
		    }
	       } else
		    free_envelope(e,0); /* Let go of it. */
	       
	  } else if (sres == 0 && S_ISDIR(st.st_mode) &&
		     strcmp(dp->d_name, ".") != 0 &&
		     strcmp(dp->d_name, "..") != 0) {
	       Octstr *newdir = octstr_format("%s%s/", dir, dp->d_name); 
	       gwlist_append(stack, newdir); /* push it... */
	  }			  
     }
     if (dirp) closedir(dirp);	       
 done:
     if (tdir)
	  octstr_destroy(tdir);
     return ret;
}
     
void mms_queue_run(char *dir, 
		   int (*deliver)(MmsEnvelope *), 
		   double sleepsecs, int num_threads, int *rstop)
{
     struct Qthread_t *tlist;
     int i, qstop = 0;
     List *stack = gwlist_create();
     
     gw_assert(num_threads>0);
     
     tlist = gw_malloc(num_threads*sizeof tlist[0]);
     
     for (i = 0; i<num_threads; i++) { /* Create threads for sending. */
	  tlist[i].l = gwlist_create();
	  gwlist_add_producer(tlist[i].l);
	  tlist[i].deliver = deliver;	
	  gwthread_create((gwthread_func_t *)tdeliver, &tlist[i]);
     }
     


     i = 0;  /* For stepping through above array. */
     do { 
	  Octstr *xdir = NULL;	  
	  gwlist_append(stack, octstr_create("")); /* Put initial dir on there. */

	  while (!*rstop && 
		 (xdir = gwlist_extract_first(stack)) != NULL) {
	       int ret = run_dir(dir, octstr_get_cstr(xdir), tlist, num_threads, &i, stack);
	       octstr_destroy(xdir);
	       xdir = NULL;
	       if (ret < 0) {		    
		    if (ret <= -2)
			 qstop = 1;
		    goto qloop;
	       }
	  }
	  if (xdir)
	       octstr_destroy(xdir);
	  if (*rstop) 
	       break;
     qloop:
	  gwthread_sleep(sleepsecs);
     } while (!qstop);

     /* We are out of the queue, time to go away. */
     for (i = 0; i<num_threads; i++)
	  if (tlist[i].l)
	       gwlist_remove_producer(tlist[i].l);
     gwthread_join_every((gwthread_func_t *)tdeliver); /* Wait for them all to terminate. */
  
     for (i = 0; i<num_threads; i++)
	  if (tlist[i].l)
	       gwlist_destroy(tlist[i].l,NULL); /* Final destroy if needed. */
     gw_free(tlist);
     
     gwlist_destroy(stack, (gwlist_item_destructor_t *)octstr_destroy);
     return;
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -