⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ompi_fifo.h

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 H
📖 第 1 页 / 共 2 页
字号:
    /* offset between sender and receiver shared mapping */    ptrdiff_t offset;    /* pointer to head (write) ompi_cb_fifo_t structure.  This is       always stored as an sender size address. */    ompi_cb_fifo_wrapper_t *head;    /* pointer to tail (read) ompi_cb_fifo_t structure.  This is       always stored as an receiver size address. */    ompi_cb_fifo_wrapper_t *tail;};typedef struct ompi_fifo_t ompi_fifo_t;/** * Initialize a fifo  * * @param size_of_cb_fifo Length of fifo array (IN) * * @param fifo_memory_locality_index Locality index to apply to *                                   the fifo array.  Not currently *                                   in use (IN) * * @param tail_memory_locality_index Locality index to apply to the *                                   head control structure.  Not *                                   currently in use (IN) * * @param tail_memory_locality_index Locality index to apply to the *                                   tail control structure.  Not *                                   currently in use (IN) * * @param fifo Pointer to data structure defining this fifo (IN) * * @param memory_allocator Pointer to the memory allocator to use *                         to allocate memory for this fifo. (IN) * * @returncode Error code * */static inline int ompi_fifo_init(int size_of_cb_fifo,        int lazy_free_freq, int fifo_memory_locality_index,         int head_memory_locality_index, int tail_memory_locality_index,         ompi_fifo_t *fifo, ptrdiff_t offset,        mca_mpool_base_module_t *memory_allocator){    int error_code;    fifo->offset = offset;    fifo->size = size_of_cb_fifo;    fifo->fifo_memory_locality_index = fifo_memory_locality_index;    fifo->head_memory_locality_index = head_memory_locality_index;    fifo->tail_memory_locality_index = tail_memory_locality_index;    /* allocate head ompi_cb_fifo_t structure and place for head and tail locks     * on different cache lines */    fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc(            memory_allocator, sizeof(ompi_cb_fifo_wrapper_t), CACHE_LINE_SIZE,            0, NULL);    if(NULL == fifo->head) {        return OMPI_ERR_OUT_OF_RESOURCE;    }    /* initialize the circular buffer fifo head structure */    error_code=ompi_cb_fifo_init(size_of_cb_fifo,            lazy_free_freq, fifo_memory_locality_index,             head_memory_locality_index, tail_memory_locality_index,             &(fifo->head->cb_fifo), offset, memory_allocator);    if ( OMPI_SUCCESS != error_code ) {        return error_code;    }    /* finish head initialization */    opal_atomic_init(&(fifo->fifo_lock), OPAL_ATOMIC_UNLOCKED);    fifo->head->next_fifo_wrapper = fifo->head;    fifo->head->cb_overflow=false;  /* no attempt to overflow the queue */    /* set the tail */    fifo->tail = (ompi_cb_fifo_wrapper_t*)((char*)fifo->head - offset);    /* return */    return error_code;}/** * Try to write pointer to the head of the queue * * @param data Pointer value to write in specified slot (IN) * * @param fifo Pointer to data structure defining this fifo (IN) * * @returncode Slot index to which data is written * */static inline int ompi_fifo_write_to_head(void *data,        ompi_fifo_t *fifo, mca_mpool_base_module_t *fifo_allocator){    int error_code=OMPI_SUCCESS;    ompi_cb_fifo_wrapper_t *next_ff;    /* attempt to write data to head ompi_fifo_cb_fifo_t */    error_code = ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);    /* If the queue is full, create a new circular buffer and put the       data in it. */    if(OMPI_CB_ERROR == error_code) {        /* NOTE: This is the lock described in the top-level comment           in this file.  There are corresponding uses of this lock in           both of the read routines.  We need to protect this whole           section -- setting cb_overflow to true through setting the           next_fifo_wrapper to the next circular buffer.  It is           likely possible to do this in a finer grain; indeed, it is           likely that we can get rid of this lock altogther, but it           will take some refactoring to make the data updates           safe.  */        opal_atomic_lock(&fifo->fifo_lock);        /* mark queue as overflown */        fifo->head->cb_overflow = true;        /* We retry to write to the old head before creating new one just in         * case consumer read all entries after first attempt failed, but         * before we set cb_overflow to true */        error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);        if(error_code != OMPI_CB_ERROR) {            fifo->head->cb_overflow = false;            opal_atomic_unlock(&(fifo->fifo_lock));            return OMPI_SUCCESS;        }        /* see if next queue is available - while the next queue         * has not been emptied, it will be marked as overflowen*/        next_ff = fifo->head->next_fifo_wrapper;        /* if next queue not available, allocate new queue */        if (next_ff->cb_overflow) {            /* allocate head ompi_cb_fifo_t structure */            next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc(                    fifo_allocator, sizeof(ompi_cb_fifo_wrapper_t),                    CACHE_LINE_SIZE, 0, NULL);            if (NULL == next_ff) {                opal_atomic_unlock(&fifo->fifo_lock);                return OMPI_ERR_OUT_OF_RESOURCE;            }            /* initialize the circular buffer fifo head structure */            error_code = ompi_cb_fifo_init(fifo->size,                    fifo->head->cb_fifo.lazy_free_frequency,                    fifo->fifo_memory_locality_index,                    fifo->head_memory_locality_index,                    fifo->tail_memory_locality_index,                    &(next_ff->cb_fifo), fifo->offset, fifo_allocator);            if (OMPI_SUCCESS != error_code) {                opal_atomic_unlock(&fifo->fifo_lock);                return error_code;            }            /* finish new element initialization */            /* only one element in the link list */            next_ff->next_fifo_wrapper = fifo->head->next_fifo_wrapper;            next_ff->cb_overflow = false; /* no attempt to overflow the queue */            fifo->head->next_fifo_wrapper = next_ff;        }        /* reset head pointer */        fifo->head = next_ff;        opal_atomic_unlock(&fifo->fifo_lock);        /* write data to new head structure */        error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);        if( OMPI_CB_ERROR == error_code ) {            return OMPI_ERROR;        }    }    return OMPI_SUCCESS; }/** * Try to read pointer from the tail of the queue * * @param fifo Pointer to data structure defining this fifo (IN) * * @returncode Pointer - OMPI_CB_FREE indicates no data to read * */static inlinevoid *ompi_fifo_read_from_tail(ompi_fifo_t *fifo){    /* local parameters */    void *return_value;    bool queue_empty;    /* get next element */    return_value = ompi_cb_fifo_read_from_tail(&fifo->tail->cb_fifo,            fifo->tail->cb_overflow, &queue_empty);    /* check to see if need to move on to next cb_fifo in the link list */    if(queue_empty) {        /* queue_emptied - move on to next element in fifo */        /* See the big comment at the top of this file about this           lock. */        opal_atomic_lock(&(fifo->fifo_lock));        if(fifo->tail->cb_overflow == true) {            fifo->tail->cb_overflow = false;            fifo->tail = (ompi_cb_fifo_wrapper_t*)                ((char*)fifo->tail->next_fifo_wrapper - fifo->offset);        }        opal_atomic_unlock(&(fifo->fifo_lock));    }    return return_value;}#endif				/* !_OMPI_FIFO */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -