📄 ksr.c
字号:
/* Loop until we find the message */ while (1) { /* Go to the end of the received list */ if (curr != NULL) { while (curr->next != NULL) { /* Establish backlink pointer */ curr->next->prev = curr; curr = curr->next; } } /* Search back for the appropriate message */ while (curr != last) { /* Check this message */ if (curr->type == (long) *type && curr->from == local_node) { /* Remove message from list */ curr->prev->next = curr->next; /* Return this message */ header = curr; goto ReceivedHeader; } curr = curr->prev; } /* Reached what was the last message received */ if (curr != NULL && curr->type == (long) *type && curr->from == local_node) { /* Lock the received msg header list */ LOCK(&(*SR_received_msg_headers)[local_me].list); /* Check if new messages have arrived */ if (last == (*SR_received_msg_headers)[local_me].list) { /* No new messages have arrived, update list pointer */ (*SR_received_msg_headers)[local_me].list = curr->next; } else { /* Find the oldest new message */ curr = (*SR_received_msg_headers)[local_me].list; while (curr->next != last) curr = curr->next; /* Remove the original last message */ curr->next = last->next; /* Make this the current message */ curr = last; } /* Unlock the received msg header list */ UNLOCK(&(*SR_received_msg_headers)[local_me].list); /* Return the header */ header = curr; goto ReceivedHeader; } else { /* Wait for more messages to arrive */ do { /* Delay */ for (i=0; i<100; i++); /* Lock the recieved msg header list */ LOCK(&(*SR_received_msg_headers)[local_me].list); /* Get a copy of the list pointer */ new_last = (*SR_received_msg_headers)[local_me].list; /* Unlock the recieved msg header list */ UNLOCK(&(*SR_received_msg_headers)[local_me].list); } while (last == new_last); /* Start over */ last = curr = new_last; } } ReceivedHeader: /* Check length of message */ if (header->length <= MSG_HDR_DATA) { /* Copy the data over */ Copy(header->data, buf, header->length); } else { /* Wait for the first slot to arrive */ slot = header->slot; /* Loop until the slot becomes valid */ LOCK(slot); UNLOCK(slot); /* Receive each slot */ for (i = 0, length = header->length; length > 0; i++, length -= KSR_SLOT_SIZE) { /* Copy the next slot */ Copy(slot->data, &buf[i * KSR_SLOT_SIZE], MINIMUM(KSR_SLOT_SIZE, length)); /* Save pointer to old slot */ old = slot; /* Get next slot if more slots to copy */ if (length > KSR_SLOT_SIZE) { /* Wait for next slot */ slot = slot->next; /* Loop until slot becomes valid */ LOCK(slot); UNLOCK(slot); } else slot = NULL; /* Lock the free msg slot list */ LOCK(&(*SR_free_msg_slots)[local_me].list); /* Put the old slot on the free msg slot list */ old->next = (*SR_free_msg_slots)[local_me].list; (*SR_free_msg_slots)[local_me].list = old; /* Unlock the free msg slot list */ UNLOCK(&(*SR_free_msg_slots)[local_me].list); } } /* Lock the free msg header list */ LOCK(&(*SR_free_msg_headers)[local_me].list); /* Free the header */ header->next = (*SR_free_msg_headers)[local_me].list; (*SR_free_msg_headers)[local_me].list = header; /* Unlock the free msg header list */ UNLOCK(&(*SR_free_msg_headers)[local_me].list); /* Update results */ *lenmes = *lenbuf; *nodefrom = *nodeselect; } /* * KSR_snd_local -- Send a message via shared memory */ void KSR_snd_local(type, buf, lenbuf, node) long *type; char *buf; long *lenbuf; long *node; { message_hdr_t *header; message_slot_t *slot; long i, length, found, cell; long me = NODEID_(); long local_me = SR_proc_info[me].slaveid; long threads = SR_clus_info[SR_proc_info[me].clusid].nslave; long local_node = SR_proc_info[*node].slaveid; if (DEBUG_) { printf("%2ld snd_local type=%d buf=%016x lenbuf=%d node=%d\n", me, *type, buf, *lenbuf, *node); (void) fflush(stdout); } /* * Search for a free message header starting with my own free list and * continuing with other cell's lists */ for (cell = local_me, found = FALSE; !found; cell = (cell + 1) % threads) { /* Lock this cell's free msg header list */ LOCK(&(*SR_free_msg_headers)[cell].list); /* Check if there are any free headers here */ if ((*SR_free_msg_headers)[cell].list != NULL) { /* Take the first header off of the list */ found = TRUE; header = (*SR_free_msg_headers)[cell].list; (*SR_free_msg_headers)[cell].list = header->next; } /* Unlock this cell's free msg header list */ UNLOCK(&(*SR_free_msg_headers)[cell].list); } /* Must have found something */ if (!found) Error("snd_local: Out of message headers", (int) 0); /* Fill in the header */ header->type = (long) *type; header->from = (long) me; header->length = (long) *lenbuf; header->next = NULL; header->prev = NULL; /* Check size of message to send */ if (header->length <= MSG_HDR_DATA) { /* No slots needed */ header->slot = NULL; /* Small message: Send data in header */ Copy(buf, header->data, header->length); /* Get received msg list of recipient in atomic state */ LOCK(&(*SR_received_msg_headers)[local_node].list); /* Add this header to the list */ header->next = (*SR_received_msg_headers)[local_node].list; (*SR_received_msg_headers)[local_node].list = header; /* Release atomic state of received msg list of recipient */ UNLOCK(&(*SR_received_msg_headers)[local_node].list); } else { /* * Search for a free message slot starting with my own free list * and continuing with other cell's lists */ for (cell = local_me, found = FALSE; !found; cell = (cell + 1) % threads) { /* Lock this cell's free msg slot list */ LOCK(&(*SR_free_msg_slots)[cell].list); /* Check if there are any free slots here */ if ((*SR_free_msg_slots)[cell].list != NULL) { /* Take the first slot off of the list */ found = TRUE; slot = (*SR_free_msg_slots)[cell].list; (*SR_free_msg_slots)[cell].list = slot->next; } /* Unlock this cell's free msg slot list */ UNLOCK(&(*SR_free_msg_slots)[cell].list); } /* Must have found something */ if (!found) Error("snd_local: Out of message slots", (int) 0); /* This is the first slot of the message */ header->slot = slot; /* Lock this slot while data is copied in */ LOCK(slot); /* Lock the recipients received msg header list */ LOCK(&(*SR_received_msg_headers)[local_node].list); /* Add this header to the list */ header->next = (*SR_received_msg_headers)[local_node].list; (*SR_received_msg_headers)[local_node].list = header; /* Release the recipients recevied message header list */ UNLOCK(&(*SR_received_msg_headers)[local_node].list); /* Send the data with slots */ for (i = 0, length = header->length; length > 0; i++, length -= KSR_SLOT_SIZE) { /* Fill in the slot header */ if (length > KSR_SLOT_SIZE) { /* * Search for another free message slot starting with my own * free list and continuing with other cell's lists */ for (cell = local_me, found = FALSE; !found; cell = (cell + 1) % threads) { /* Lock the free msg slot list */ LOCK(&(*SR_free_msg_slots)[cell].list); /* Check if there are any free slots here */ if ((*SR_free_msg_slots)[cell].list != NULL) { /* Take the first slot off of the list */ found = TRUE; slot->next = (*SR_free_msg_slots)[cell].list; (*SR_free_msg_slots)[cell].list = slot->next->next; } /* Unlock the free msg slot list */ UNLOCK(&(*SR_free_msg_slots)[cell].list); } /* Must have found something */ if (!found) Error("snd_local: Out of message slots", (int) 0); /* Lock the new slot until the data is copied into it */ LOCK(slot->next); } else slot->next = NULL; /* Copy the data to the slot */ Copy(&buf[i * KSR_SLOT_SIZE], slot->data, MINIMUM(KSR_SLOT_SIZE, length)); /* Unlock the sent slot */ UNLOCK(slot); /* Update slot pointer */ slot = slot->next; } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -