📄 rf_engine.c
字号:
#else /* SIMULATE */ for (i = 0; i < node->numAntecedents; i++) { a = *(node->antecedents + i); RF_ASSERT(a->status == rf_good); RF_ASSERT(a->numSuccDone <= a->numSuccedents); RF_ASSERT(a->numSuccDone <= a->numSuccFired); if (a->numSuccDone == a->numSuccFired) { if (a->undoFunc == rf_NullNodeFunc) { /* don't fire NIL nodes, just process them */ a->next = finishlist; finishlist = a; } else { if (context != RF_INTR_CONTEXT) { /* we only have to enqueue if we're at intr context */ a->next = firelist; /* put node on a list to be fired after we unlock */ firelist = a; } else { /* enqueue the node for the dag exec thread to fire */ RF_ASSERT(NodeReady(a)); if (q) { q->next = a; q = a; } else { qh = q = a; qh->next = NULL; } } } } } if (q) { /* xfer our local list of nodes to the node queue */ q->next = raidPtr->node_queue; raidPtr->node_queue = qh; DO_SIGNAL(raidPtr); } DO_UNLOCK(raidPtr); for (; finishlist; finishlist = next) { /* NIL nodes: no need to fire them */ next = finishlist->next; finishlist->status = rf_good; /* * Okay, here we're calling rf_FinishNode() on nodes that * have the null function as their work proc. Such a node * could be the first node in a DAG. If so, it will * cause the DAG to complete, which will in turn free * memory used by the DAG, which includes the node in * question. Thus, we must avoid referencing the node * at all after calling rf_FinishNode() on it. 
*/ rf_FinishNode(finishlist, context); /* recursive call */ } /* fire all nodes in firelist */ FireNodeList(firelist);#endif /* SIMULATE */ break; default : printf("Engine found illegal DAG status in PropagateResults()\n"); RF_PANIC(); break; }}/* * Process a fired node which has completed */static void ProcessNode( RF_DagNode_t *node, int context){ RF_Raid_t *raidPtr; int tid; raidPtr = node->dagHdr->raidPtr; switch (node->status) { case rf_good : /* normal case, don't need to do anything */ break; case rf_bad : if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { node->dagHdr->status = rf_rollForward; /* crossed commit barrier */ if (rf_engineDebug || 1) { rf_get_threadid(tid); printf("[%d] node (%s) returned fail, rolling forward\n", tid, node->name); } } else { node->dagHdr->status = rf_rollBackward; /* never reached commit barrier */ if (rf_engineDebug || 1) { rf_get_threadid(tid); printf("[%d] node (%s) returned fail, rolling backward\n", tid, node->name); } } break; case rf_undone : /* normal rollBackward case, don't need to do anything */ break; case rf_panic : /* an undo node failed!!! */ printf("UNDO of a node failed!!!/n"); break; default : printf("node finished execution with an illegal status!!!\n"); RF_PANIC(); break; }#ifdef SIMULATE /* simulator fires nodes here. * user/kernel rely upon PropagateResults to do this. * XXX seems like this code should be merged so that the same thing happens for * both sim, user, and kernel. 
-wvcii */ switch (node->dagHdr->status) { case rf_enable : case rf_rollForward : if (node->numSuccedents == 0) { /* process terminal node */ if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { rf_get_threadid(tid); printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); RF_PANIC(); } if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); /* Mark dag as done */ (node->dagHdr)->done=RF_TRUE; raidPtr->dags_in_flight--; } else { PropagateResults(node, context); FireNodeArray(node->numSuccedents, node->succedents); } break; case rf_rollBackward : if (node->numAntecedents == 0) { /* reached head of dag, we're done */ if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { rf_get_threadid(tid); printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); RF_PANIC(); } if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); /* Mark dag as done */ (node->dagHdr)->done=RF_TRUE; raidPtr->dags_in_flight--; } else { PropagateResults(node, context); FireNodeArray(node->numAntecedents, node->antecedents); } break; default : RF_PANIC(); break; }#else /* SIMULATE */ /* enqueue node's succedents (antecedents if rollBackward) for execution */ PropagateResults(node, context);#endif /* SIMULATE */}/* user context or dag-exec-thread context: * This is the first step in post-processing a newly-completed node. * This routine is called by each node execution function to mark the node * as complete and fire off any successors that have been enabled. */int rf_FinishNode( RF_DagNode_t *node, int context){ /* as far as I can tell, retcode is not used -wvcii */ int retcode = RF_FALSE; node->dagHdr->numNodesCompleted++; ProcessNode(node, context);#ifdef SIMULATE if ((node->dagHdr)->done == RF_TRUE) retcode = RF_TRUE;#endif /* SIMULATE */ return(retcode); }/* user context: * submit dag for execution, return non-zero if we have to wait for completion. 
* if and only if we return non-zero, we'll cause cbFunc to get invoked with * cbArg when the DAG has completed. * * for now we always return 1. If the DAG does not cause any I/O, then the callback * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess * to handle this. * * All we do here is fire the direct successors of the header node. The * DAG execution thread does the rest of the dag processing. */int rf_DispatchDAG( RF_DagHeader_t *dag, void (*cbFunc)(), void *cbArg){ RF_Raid_t *raidPtr; int tid; raidPtr = dag->raidPtr; if (dag->tracerec) { RF_ETIMER_START(dag->tracerec->timer); } if (rf_engineDebug || rf_validateDAGDebug) { if (rf_ValidateDAG(dag)) RF_PANIC(); } if (rf_engineDebug) { rf_get_threadid(tid); printf("[%d] Entering DispatchDAG\n",tid); } raidPtr->dags_in_flight++; /* debug only: blow off proper locking */ dag->cbFunc = cbFunc; dag->cbArg = cbArg; dag->numNodesCompleted = 0; dag->status = rf_enable; FireNodeArray(dag->numSuccedents, dag->succedents); return(1);}/* dedicated kernel thread: * the thread that handles all DAG node firing. * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the * node queue to NULL before doing any firing of nodes. This way we only have to release the * lock once. Of course, it's probably rare that there's more than one node in the queue at * any one time, but it sometimes happens. * * In the kernel, this thread runs at spl0 and is not swappable. I copied these * characteristics from the aio_completion_thread. 
*/
#ifndef SIMULATE
/*
 * Body of the engine thread (compiled only when SIMULATE is not defined).
 *
 * arg is cast to the RF_Raid_t whose node_queue this thread services.
 * Until raidPtr->shutdown_engine becomes true, the thread repeatedly:
 *   1. takes the whole node_queue while locked, empties it, and unlocks;
 *   2. splits the taken nodes into terminal nodes (term_nq) and
 *      nodes that still have to be fired (fire_nq);
 *   3. invokes the DAG callback for every terminal node;
 *   4. fires the remaining nodes with FireNodeList();
 *   5. re-locks and, if the queue is empty, waits via DO_WAIT().
 */
static void DAGExecutionThread(RF_ThreadArg_t arg)
{
  RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
  RF_Raid_t *raidPtr;
  RF_Thread_t thread;
  /* NOTE(review): ks is not referenced directly in this function; presumably
   * the DO_LOCK/DO_UNLOCK macros use it by name -- confirm before removing.
   * thread and s are only used in the KERNEL sections below. */
  int ks, tid, s;

  raidPtr = (RF_Raid_t *)arg;

  rf_assign_threadid();
  if (rf_engineDebug) {
    rf_get_threadid(tid);
    printf("[%d] Engine thread is running\n", tid);
  }

#ifdef KERNEL
  /* make the thread non-swappable, give it BASEPRI_SYSTEM priority and
   * switch to spl0; the previous level is kept in s for splx() at exit */
  thread = current_thread();
  thread_swappable(thread, RF_FALSE);
  thread->priority = thread->sched_pri = BASEPRI_SYSTEM;
  s = spl0();
#endif /* KERNEL */

  RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);

  DO_LOCK(raidPtr);
  while (!raidPtr->shutdown_engine) {

    /* entered and re-tested with the lock held (see DO_LOCK at loop bottom) */
    while (raidPtr->node_queue != NULL) {
      /* take private ownership of everything queued so far */
      local_nq = raidPtr->node_queue;
      fire_nq = NULL;
      term_nq = NULL;
      raidPtr->node_queue = NULL;
      DO_UNLOCK(raidPtr);

      /* first, strip out the terminal nodes */
      /* each node is pushed on the head of its list, so both lists end up
       * in the reverse of the order in which they appeared in local_nq */
      while (local_nq) {
        nd = local_nq;
        local_nq = local_nq->next;
        switch(nd->dagHdr->status) {
        case rf_enable :
        case rf_rollForward :
          /* executing forward: a node with no succedents ends the dag */
          if (nd->numSuccedents == 0) {
            /* end of the dag, add to callback list */
            nd->next = term_nq;
            term_nq = nd;
          }
          else {
            /* not the end, add to the fire queue */
            nd->next = fire_nq;
            fire_nq = nd;
          }
          break;
        case rf_rollBackward :
          /* executing backward: a node with no antecedents ends the dag */
          if (nd->numAntecedents == 0) {
            /* end of the dag, add to the callback list */
            nd->next = term_nq;
            term_nq = nd;
          }
          else {
            /* not the end, add to the fire queue */
            nd->next = fire_nq;
            fire_nq = nd;
          }
          break;
        default :
          /* any other dag status is treated as fatal; the node is put on neither list */
          RF_PANIC();
          break;
        }
      }

      /* execute callback of dags which have reached the terminal node */
      while (term_nq) {
        nd = term_nq;
        /* advance and unlink before the callback; nd is not touched after it */
        term_nq = term_nq->next;
        nd->next = NULL;
        (nd->dagHdr->cbFunc)(nd->dagHdr->cbArg);
        raidPtr->dags_in_flight--; /* debug only */
      }

      /* fire remaining nodes */
      FireNodeList(fire_nq);

      /* re-acquire before re-testing node_queue */
      DO_LOCK(raidPtr);
    }
    /* queue is empty: wait until there is work or shutdown is requested.
     * NOTE(review): DO_WAIT presumably releases the lock while blocked and
     * re-acquires it on wakeup -- confirm against its definition. */
    while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
      DO_WAIT(raidPtr);
  }
  DO_UNLOCK(raidPtr);

  RF_THREADGROUP_DONE(&raidPtr->engine_tg);
#ifdef KERNEL
  /* restore the saved spl level, then terminate this thread */
  splx(s);
  thread_terminate(thread);
  thread_halt_self();
#endif /* KERNEL */
}
#endif /* !SIMULATE */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -