📄 ompi_fifo.h
字号:
/* offset between sender and receiver shared mapping */ ptrdiff_t offset; /* pointer to head (write) ompi_cb_fifo_t structure. This is always stored as an sender size address. */ ompi_cb_fifo_wrapper_t *head; /* pointer to tail (read) ompi_cb_fifo_t structure. This is always stored as an receiver size address. */ ompi_cb_fifo_wrapper_t *tail;};typedef struct ompi_fifo_t ompi_fifo_t;/** * Initialize a fifo * * @param size_of_cb_fifo Length of fifo array (IN) * * @param fifo_memory_locality_index Locality index to apply to * the fifo array. Not currently * in use (IN) * * @param tail_memory_locality_index Locality index to apply to the * head control structure. Not * currently in use (IN) * * @param tail_memory_locality_index Locality index to apply to the * tail control structure. Not * currently in use (IN) * * @param fifo Pointer to data structure defining this fifo (IN) * * @param memory_allocator Pointer to the memory allocator to use * to allocate memory for this fifo. (IN) * * @returncode Error code * */static inline int ompi_fifo_init(int size_of_cb_fifo, int lazy_free_freq, int fifo_memory_locality_index, int head_memory_locality_index, int tail_memory_locality_index, ompi_fifo_t *fifo, ptrdiff_t offset, mca_mpool_base_module_t *memory_allocator){ int error_code; fifo->offset = offset; fifo->size = size_of_cb_fifo; fifo->fifo_memory_locality_index = fifo_memory_locality_index; fifo->head_memory_locality_index = head_memory_locality_index; fifo->tail_memory_locality_index = tail_memory_locality_index; /* allocate head ompi_cb_fifo_t structure and place for head and tail locks * on different cache lines */ fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc( memory_allocator, sizeof(ompi_cb_fifo_wrapper_t), CACHE_LINE_SIZE, 0, NULL); if(NULL == fifo->head) { return OMPI_ERR_OUT_OF_RESOURCE; } /* initialize the circular buffer fifo head structure */ error_code=ompi_cb_fifo_init(size_of_cb_fifo, lazy_free_freq, fifo_memory_locality_index, head_memory_locality_index, tail_memory_locality_index, &(fifo->head->cb_fifo), offset, memory_allocator); if ( OMPI_SUCCESS != error_code ) { return error_code; } /* finish head initialization */ opal_atomic_init(&(fifo->fifo_lock), OPAL_ATOMIC_UNLOCKED); fifo->head->next_fifo_wrapper = fifo->head; fifo->head->cb_overflow=false; /* no attempt to overflow the queue */ /* set the tail */ fifo->tail = (ompi_cb_fifo_wrapper_t*)((char*)fifo->head - offset); /* return */ return error_code;}/** * Try to write pointer to the head of the queue * * @param data Pointer value to write in specified slot (IN) * * @param fifo Pointer to data structure defining this fifo (IN) * * @returncode Slot index to which data is written * */static inline int ompi_fifo_write_to_head(void *data, ompi_fifo_t *fifo, mca_mpool_base_module_t *fifo_allocator){ int error_code=OMPI_SUCCESS; ompi_cb_fifo_wrapper_t *next_ff; /* attempt to write data to head ompi_fifo_cb_fifo_t */ error_code = ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo); /* If the queue is full, create a new circular buffer and put the data in it. */ if(OMPI_CB_ERROR == error_code) { /* NOTE: This is the lock described in the top-level comment in this file. There are corresponding uses of this lock in both of the read routines. We need to protect this whole section -- setting cb_overflow to true through setting the next_fifo_wrapper to the next circular buffer. It is likely possible to do this in a finer grain; indeed, it is likely that we can get rid of this lock altogther, but it will take some refactoring to make the data updates safe. */ opal_atomic_lock(&fifo->fifo_lock); /* mark queue as overflown */ fifo->head->cb_overflow = true; /* We retry to write to the old head before creating new one just in * case consumer read all entries after first attempt failed, but * before we set cb_overflow to true */ error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo); if(error_code != OMPI_CB_ERROR) { fifo->head->cb_overflow = false; opal_atomic_unlock(&(fifo->fifo_lock)); return OMPI_SUCCESS; } /* see if next queue is available - while the next queue * has not been emptied, it will be marked as overflowen*/ next_ff = fifo->head->next_fifo_wrapper; /* if next queue not available, allocate new queue */ if (next_ff->cb_overflow) { /* allocate head ompi_cb_fifo_t structure */ next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc( fifo_allocator, sizeof(ompi_cb_fifo_wrapper_t), CACHE_LINE_SIZE, 0, NULL); if (NULL == next_ff) { opal_atomic_unlock(&fifo->fifo_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* initialize the circular buffer fifo head structure */ error_code = ompi_cb_fifo_init(fifo->size, fifo->head->cb_fifo.lazy_free_frequency, fifo->fifo_memory_locality_index, fifo->head_memory_locality_index, fifo->tail_memory_locality_index, &(next_ff->cb_fifo), fifo->offset, fifo_allocator); if (OMPI_SUCCESS != error_code) { opal_atomic_unlock(&fifo->fifo_lock); return error_code; } /* finish new element initialization */ /* only one element in the link list */ next_ff->next_fifo_wrapper = fifo->head->next_fifo_wrapper; next_ff->cb_overflow = false; /* no attempt to overflow the queue */ fifo->head->next_fifo_wrapper = next_ff; } /* reset head pointer */ fifo->head = next_ff; opal_atomic_unlock(&fifo->fifo_lock); /* write data to new head structure */ error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo); if( OMPI_CB_ERROR == error_code ) { return OMPI_ERROR; } } return OMPI_SUCCESS; }/** * Try to read pointer from the tail of the queue * * @param fifo Pointer to data structure defining this fifo (IN) * * @returncode Pointer - OMPI_CB_FREE indicates no data to read * */static inlinevoid *ompi_fifo_read_from_tail(ompi_fifo_t *fifo){ /* local parameters */ void *return_value; bool queue_empty; /* get next element */ return_value = ompi_cb_fifo_read_from_tail(&fifo->tail->cb_fifo, fifo->tail->cb_overflow, &queue_empty); /* check to see if need to move on to next cb_fifo in the link list */ if(queue_empty) { /* queue_emptied - move on to next element in fifo */ /* See the big comment at the top of this file about this lock. */ opal_atomic_lock(&(fifo->fifo_lock)); if(fifo->tail->cb_overflow == true) { fifo->tail->cb_overflow = false; fifo->tail = (ompi_cb_fifo_wrapper_t*) ((char*)fifo->tail->next_fifo_wrapper - fifo->offset); } opal_atomic_unlock(&(fifo->fifo_lock)); } return return_value;}#endif /* !_OMPI_FIFO */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -