📄 jobs.c++
字号:
/* * Look for a job in the in-memory cache. */Job*HylaFAXServer::findJobInMemmory(const char* jobid){ if (curJob->jobid == jobid) // fast check return (curJob); Job** jpp = (Job**) jobs.find(jobid); if (jpp) return (*jpp); return (jobid == defJob.jobid ? &defJob : (Job*) NULL);}/* * Look for a job on disk and, if found, read it * into memory and return it. */Job*HylaFAXServer::findJobOnDisk(const char* jid, fxStr& emsg){ fxStr filename(fxStr::format("/" FAX_SENDDIR "/" FAX_QFILEPREF "%s", jid)); struct stat sb; if (!FileCache::update(filename, sb)) { /* * Not in sendq, look in the doneq for the job. */ filename = fxStr::format("/" FAX_DONEDIR "/" FAX_QFILEPREF "%s", jid); if (!FileCache::update(filename, sb)) { emsg = fxStr::format("job does not exist (%s)", strerror(errno)); return (NULL); } } if (!S_ISREG(sb.st_mode)) { emsg = "job description file is not a regular file"; return (NULL); } int fd = Sys::open(filename, O_RDWR); if (fd >= 0) { // XXX should we lock here??? Job* req = new Job(&filename[1], fd); bool reject; if (req->readQFile(reject) && !reject) { Sys::close(req->fd), req->fd = -1; if (checkAccess(*req, T_JOB, A_READ) ) return (req); emsg = "Permission denied"; delete req; return (NULL); } emsg = "invalid or corrupted job description file"; delete req; // NB: closes fd } else emsg = fxStr::format("cannot open job description file %s (%s)", (const char*) filename, strerror(errno)); return (NULL);}/* * Update a job's state from the on-disk copy. */boolHylaFAXServer::updateJobFromDisk(Job& job){ bool status = false; if (lockJob(job, LOCK_SH|LOCK_NB)) { bool reject; status = (job.reReadQFile(reject) && !reject); unlockJob(job); } return (status);}/* * Lock a job description file, creating it if * necessary. Errors are not expected--if one * occurs a descriptive message is returned for * transmission to the client. */boolHylaFAXServer::lockJob(Job& job, int how, fxStr& emsg){ if (job.fd < 0) { job.fd = Sys::open("/" | job.qfile, O_RDWR|O_CREAT, 0600); if (job.fd < 0) { emsg = "Cannot open/create job description file /" | job.qfile; return (false); } } if (flock(job.fd, how) < 0) { emsg = fxStr::format("Job file lock failed: %s", strerror(errno)); Sys::close(job.fd), job.fd = -1; return (false); } else return (true);}/* * Like above, but no error message is returned. */boolHylaFAXServer::lockJob(Job& job, int how){ if (job.fd < 0) job.fd = Sys::open("/" | job.qfile, O_RDWR|O_CREAT, 0600); return (job.fd >= 0 && flock(job.fd, how) >= 0);}/* * Unlock a previously-locked job. */voidHylaFAXServer::unlockJob(Job& job){ if (job.fd >= 0) Sys::close(job.fd), job.fd = -1; // implicit unlock}/* * Find a job either in memory (in the cache) or * on disk. If a job is found in memory and the * on-disk state is more current, update the state * in the cache. */Job*HylaFAXServer::findJob(const char* jobid, fxStr& emsg){ Job* job = findJobInMemmory(jobid); if (job) { /* * Verify job is still around (another process has * not deleted it) and if the on-disk state has * been updated, re-read the job description file. */ if (!checkJobState(job)) { // We will re-check on disk in case a job was moved between queues job = NULL; emsg = "job deleted by another party"; } } if (!job) { /* * We can only afford a certain amount of space, * unfortunately, there is no "bright" way to remove jobs * Ideally we'ld have an "aging" method, so the LRU job * would be the one deleted... */ if (jobs.size() > 10) { JobDictIter iter(jobs); job = iter.value(); jobs.remove(job->jobid); delete job; } job = findJobOnDisk(jobid, emsg); if (job) jobs[job->jobid] = job; } return (job);}#ifdef OLDPROTO_SUPPORT/* * Read the state of all jobs into memory; it is * assumed that no jobs are presently in the cache. * * This interface is used by the old protocol to * support status query commands. */voidHylaFAXServer::readJobs(void){ // NB: we don't look in the doneq DIR* dir = opendir("/" FAX_SENDDIR); if (dir) { for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) { if (dp->d_name[0] != 'q') continue; fxStr emsg; Job* job = findJobOnDisk(dp->d_name+1, emsg); if (job) jobs[job->jobid] = job; } closedir(dir); } else logError("Cannot read job queue directory: %s", strerror(errno));}#endif /* OLDPROTO_SUPPORT *//* * Purge all in-memory job state. */voidHylaFAXServer::purgeJobs(void){ for (JobDictIter iter(jobs); iter.notDone(); iter++) { Job* job = iter.value(); jobs.remove(job->jobid); delete job; }}/* * Send a reply identifying the current job. */voidHylaFAXServer::replyCurrentJob(const char* leader){ if (curJob->jobid == "default") reply(200, "%s (default).", leader); else reply(200, "%s jobid: %s groupid: %s.", leader, (const char*) curJob->jobid, (const char*) curJob->groupid);}/* * Set the current job. */voidHylaFAXServer::setCurrentJob(const char* jobid){ fxStr emsg; Job* job = findJob(jobid, emsg); if (job) { curJob = job; replyCurrentJob("Current job:"); } else reply(500, "Cannot set job %s; %s.", jobid, (const char*) emsg);}/* * Reset a job's state to what is currently on disk. */voidHylaFAXServer::resetJob(const char* jobid){ fxStr emsg; Job* job = findJob(jobid, emsg); if (job) { if (job->jobid == "default") { initDefaultJob(); reply(200, "Default job reset to initial state."); } else if (job->state != FaxRequest::state_suspended) { reply(504, "Job %s not reset; must be suspended.", jobid); } else { updateJobFromDisk(*job); struct stat sb; if (FileCache::lookup("/" | job->qfile, sb)) job->lastmod = sb.st_mtime; reply(200, "Job %s reset to last state saved to disk.", jobid); } } else reply(500, "Cannot reset job %s; %s.", jobid, (const char*) emsg);}/* * Common work done for many job-related commands. */Job*HylaFAXServer::preJobCmd(const char* op, const char* jobid, fxStr& emsg){ Job* job = findJob(jobid, emsg); if (job) { if (job->jobid == "default") { reply(504, "Cannot %s default job.", op); job = NULL; } else if (!IS(PRIVILEGED) && job->owner != the_user) { reply(504, "Cannot %s job: %s.", op, strerror(EPERM)); job = NULL; } } else reply(500, "Cannot %s job %s; %s.", op, jobid, (const char*) emsg); return (job);}/* * Delete all job state (both on disk and in memory). */voidHylaFAXServer::deleteJob(const char* jobid){ fxStr emsg; Job* job = preJobCmd("delete", jobid, emsg); if (job) { const char* startdir = cwd->pathname; if (Sys::chdir("/") < 0) { reply(504, "Cannot change to base spool directory."); return; } if (job->state != FaxRequest::state_done && job->state != FaxRequest::state_failed && job->state != FaxRequest::state_suspended) { reply(504, "Job %s not deleted; use JSUSP first.", jobid); return; } if (!lockJob(*job, LOCK_EX|LOCK_NB, emsg)) { reply(504, "Cannot delete job: %s.", (const char*) emsg); return; } /* * Jobs in the doneq (state_done) have had their * documents converted to references (w/o links) * to the base document name; thus there is no * work to do to cleanup document state (a separate * scavenger program must deal with this since it * requires global knowledge of what jobs reference * what documents). * * Jobs that have yet to complete however hold links * to documents that must be removed. We do this here * and also notify the scheduler about our work so that * it can properly expunge imaged versions of the docs. */ if (job->state == FaxRequest::state_suspended) { for (u_int i = 0, n = job->items.length(); i < n; i++) { const FaxItem& fitem = job->items[i]; switch (fitem.op) { case FaxRequest::send_fax: if (sendQueuerACK(emsg, "U%s", (const char*) fitem.item) || !job->isUnreferenced(i)) break; /* ... fall thru */ case FaxRequest::send_tiff_saved: case FaxRequest::send_tiff: case FaxRequest::send_pdf_saved: case FaxRequest::send_pdf: case FaxRequest::send_postscript: case FaxRequest::send_postscript_saved: case FaxRequest::send_pcl: case FaxRequest::send_pcl_saved: case FaxRequest::send_data: Sys::unlink(fitem.item); break; } } } else { // expunge any cover page documents for (u_int i = 0, n = job->items.length(); i < n; i++) { const FaxItem& fitem = job->items[i]; switch (fitem.op) { case FaxRequest::send_tiff_saved: case FaxRequest::send_tiff: case FaxRequest::send_pdf_saved: case FaxRequest::send_pdf: case FaxRequest::send_postscript: case FaxRequest::send_postscript_saved: case FaxRequest::send_pcl: case FaxRequest::send_pcl_saved: if (fitem.item.findR(fitem.item.length(), ".cover")) Sys::unlink(fitem.item); break; } } } if (Sys::unlink(job->qfile) < 0) reply(504, "Deletion of queue file %s failed.", (const char*) job->qfile); if (Sys::chdir(startdir) < 0) reply(504, "Cannot change to %s spool directory.", startdir); jobs.remove(job->jobid); if (job == curJob) // make default job current curJob = &defJob; delete job; // NB: implicit unlock replyCurrentJob(fxStr::format("Job %s deleted; current job:", jobid)); }}/* * Common work for doing job state manipulations. */voidHylaFAXServer::operateOnJob(const char* jobid, const char* what, const char* op){ fxStr emsg; Job* job = preJobCmd(what, jobid, emsg); if (job) { if (job->state == FaxRequest::state_done || job->state == FaxRequest::state_failed) { reply(504, "Job %s not %sed; already done.", jobid, what); return; } if (sendQueuerACK(emsg, "%s%s", op, jobid)) reply(200, "Job %s %sed.", jobid, what); else reply(460, "Failed to %s job %s: %s.", what, jobid, (const char*) emsg); }}/* * Terminate a job, potentially aborting any call in progress. */voidHylaFAXServer::killJob(const char* jobid){ operateOnJob(jobid, "kill", "K");}/* * Suspend a job from being scheduled. */voidHylaFAXServer::suspendJob(const char* jobid){ operateOnJob(jobid, "suspend", "X");}/* * Interrupt a job from being scheduled. */voidHylaFAXServer::interruptJob(const char* jobid){ operateOnJob(jobid, "interrupt", "Y");}voidHylaFAXServer::replyBadJob(const Job& job, Token t){ reply(504, "Cannot submit job %s; null or missing %s parameter.", (const char*) job.jobid, parmToken(t));}/* * Submit a job for scheduling. */voidHylaFAXServer::submitJob(const char* jobid){ fxStr emsg; Job* job = preJobCmd("submit", jobid, emsg); if (job) { if (job->state == FaxRequest::state_done || job->state == FaxRequest::state_failed) { reply(504, "Job %s not submitted; already done.", jobid); return; } if (job->state != FaxRequest::state_suspended) { reply(504, "Job %s not submitted; use JSUSP first.", jobid); return; } if (job->number == "") replyBadJob(*job, T_DIALSTRING); else if (job->mailaddr == "") replyBadJob(*job, T_NOTIFYADDR); else if (job->sender == "") replyBadJob(*job, T_FROM_USER); else if (job->modem == "") replyBadJob(*job, T_MODEM); else if (job->client == "") replyBadJob(*job, T_CLIENT); else { if (job->external == "") job->external = job->number; if (updateJobOnDisk(*job, emsg)) { /* * NB: we don't mark the lastmod time for the * job since the scheduler should re-write the * queue file to reflect what it did with it * (e.g. what state it placed the job in). */ if (sendQueuerACK(emsg, "S%s", jobid)) { reply(200, "Job %s submitted.", jobid); Job** jpp = (Job**) blankJobs.find(job->jobid); if (jpp) blankJobs.remove(job->jobid); // it's no longer blank } else reply(460, "Failed to submit job %s: %s.", jobid, (const char*) emsg); } else reply(450, "%s.", (const char*) emsg); // XXX 550? } }}/* * Wait for a job to complete or for the operation * to be aborted. A data channel is opened and * job status information is returned on it. The * client can terminate this operation with an * ABOR command on the control channel; just like * a normal file transfer operation. */voidHylaFAXServer::waitForJob(const char* jobid){ fxStr emsg; Job* job = findJob(jobid, emsg); if (job) { if (job->jobid == "default") { reply(504, "Cannot wait for default job."); return; } if (job->state == FaxRequest::state_done || job->state == FaxRequest::state_failed) { reply(216, "Job %s done (already).", jobid);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -