📄 erl_message.c
字号:
case REFC_BINARY_SUBTAG: oh_list_pp = (Uint **) &off_heap->mso; oh_el_p = (Uint *) (hp-1); oh_el_next_pp = (Uint **) &((ProcBin *) oh_el_p)->next; cpy_sz = thing_arityval(val); goto cpy_words; case FUN_SUBTAG:#ifndef HYBRID oh_list_pp = (Uint **) &off_heap->funs; oh_el_p = (Uint *) (hp-1); oh_el_next_pp = (Uint **) &((ErlFunThing *) oh_el_p)->next;#endif cpy_sz = thing_arityval(val); goto cpy_words; case EXTERNAL_PID_SUBTAG: case EXTERNAL_PORT_SUBTAG: case EXTERNAL_REF_SUBTAG: oh_list_pp = (Uint **) &off_heap->externals; oh_el_p = (Uint *) (hp-1); oh_el_next_pp = (Uint **) &((ExternalThing *) oh_el_p)->next; cpy_sz = thing_arityval(val); goto cpy_words; default: cpy_sz = header_arity(val); cpy_words: sz -= cpy_sz; while (cpy_sz >= 8) { cpy_sz -= 8; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; *hp++ = *fhp++; } switch (cpy_sz) { case 7: *hp++ = *fhp++; case 6: *hp++ = *fhp++; case 5: *hp++ = *fhp++; case 4: *hp++ = *fhp++; case 3: *hp++ = *fhp++; case 2: *hp++ = *fhp++; case 1: *hp++ = *fhp++; default: break; } if (oh_list_pp) {#ifdef HARD_DEBUG Uint *dbg_old_oh_list_p = *oh_list_pp;#endif /* Add to offheap list */ *oh_el_next_pp = *oh_list_pp; *oh_list_pp = oh_el_p; ASSERT(*hpp <= oh_el_p); ASSERT(hp > oh_el_p);#ifdef HARD_DEBUG switch (val & _HEADER_SUBTAG_MASK) { case REFC_BINARY_SUBTAG: ASSERT(off_heap->mso == (ProcBin *) *oh_list_pp); ASSERT(off_heap->mso->next == (ProcBin *) dbg_old_oh_list_p); break;#ifndef HYBRID case FUN_SUBTAG: ASSERT(off_heap->funs == (ErlFunThing *) *oh_list_pp); ASSERT(off_heap->funs->next == (ErlFunThing *) dbg_old_oh_list_p); break;#endif case EXTERNAL_PID_SUBTAG: case EXTERNAL_PORT_SUBTAG: case EXTERNAL_REF_SUBTAG: ASSERT(off_heap->externals == (ExternalThing *) *oh_list_pp); ASSERT(off_heap->externals->next == (ExternalThing *) dbg_old_oh_list_p); break; default: ASSERT(0); }#endif oh_list_pp = NULL; } break; } break; } } ASSERT(bp->size == hp - *hpp); *hpp = hp; if 
(is_not_immed(token)) { ASSERT(bp->mem <= ptr_val(token)); ASSERT(bp->mem + bp->size > ptr_val(token)); ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);#ifdef HARD_DEBUG ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg))); ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));#endif } if (is_not_immed(term)) { ASSERT(bp->mem <= ptr_val(term)); ASSERT(bp->mem + bp->size > ptr_val(term)); ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);#ifdef HARD_DEBUG ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg))); ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));#endif }#ifdef HARD_DEBUG { int i, j; { ProcBin *mso = off_heap->mso; i = j = 0; while (mso != dbg_mso_start) { mso = mso->next; i++; } mso = bp->off_heap.mso; while (mso) { mso = mso->next; j++; } ASSERT(i == j); } { ErlFunThing *fun = off_heap->funs; i = j = 0; while (fun != dbg_fun_start) { fun = fun->next; i++; } fun = bp->off_heap.funs; while (fun) { fun = fun->next; j++; } ASSERT(i == j); } { ExternalThing *external = off_heap->externals; i = j = 0; while (external != dbg_external_start) { external = external->next; i++; } external = bp->off_heap.externals; while (external) { external = external->next; j++; } ASSERT(i == j); } }#endif bp->off_heap.mso = NULL;#ifndef HYBRID bp->off_heap.funs = NULL;#endif bp->off_heap.externals = NULL; free_message_buffer(bp);#ifdef HARD_DEBUG ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term)); ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token)); free_message_buffer(dbg_bp);#endif}voiderts_move_msg_mbuf_to_proc_mbufs(Process *p, ErlMessage *msg){ link_mbuf_to_proc(p, msg->bp); msg->bp = NULL;}#endif/* * Send a local message when sender & receiver processes are known. 
*/voiderts_send_message(Process* sender, Process* receiver, Uint32 *receiver_locks, Eterm message, unsigned flags){ ErlHeapFragment* bp = NULL; Eterm token = NIL; BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) { Uint msize; Eterm* hp; BM_SWAP_TIMER(send,size); msize = size_object(message); BM_SWAP_TIMER(size,send); seq_trace_update_send(sender); seq_trace_output(SEQ_TRACE_TOKEN(sender), message, SEQ_TRACE_SEND, receiver->id, sender); bp = new_message_buffer(msize + 6 /* TUPLE5 */); hp = bp->mem; BM_SWAP_TIMER(send,copy); token = copy_struct(SEQ_TRACE_TOKEN(sender), 6 /* TUPLE5 */, &hp, &bp->off_heap); message = copy_struct(message, msize, &hp, &bp->off_heap); BM_MESSAGE_COPIED(msize); BM_SWAP_TIMER(copy,send); erts_queue_message(receiver, *receiver_locks, bp, message, token); BM_SWAP_TIMER(send,system);#ifdef HYBRID } else { ErlMessage* mp = message_alloc(); BM_SWAP_TIMER(send,copy);#ifdef INCREMENTAL /* TODO: During GC activate processes if the message relies in * the fromspace and the sender is active. During major * collections add the message to the gray stack if it relies * in the old generation and the sender is active and the * receiver is inactive. 
if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) && (ptr_val(message) >= inc_fromspc && ptr_val(message) < inc_fromend) && INC_IS_ACTIVE(sender)) INC_ACTIVATE(receiver); else if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) && (ptr_val(message) >= global_old_heap && ptr_val(message) < global_old_hend) && INC_IS_ACTIVE(sender) && !INC_IS_ACTIVE(receiver)) Mark message in blackmap and add it to the gray stack */ if (!IS_CONST(message)) INC_ACTIVATE(receiver);#endif LAZY_COPY(sender,message); BM_SWAP_TIMER(copy,send); ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = NIL; mp->next = NULL; LINK_MESSAGE(receiver, mp); ACTIVATE(receiver); if (receiver->status == P_WAITING) { add_to_schedule_q(receiver); } else if (receiver->status == P_SUSPENDED) { receiver->rstatus = P_RUNABLE; } if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } BM_SWAP_TIMER(send,system); return;#else } else if (sender == receiver) { /* Drop message if receiver has a pending exit ... */ if (!ERTS_PROC_PENDING_EXIT(receiver)) { ErlMessage* mp = message_alloc();#if defined(ERTS_SMP) || defined(HEAP_FRAG_ELIM_TEST) mp->bp = NULL;#endif ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = NIL; mp->next = NULL; /* * We move 'in queue' to 'private queue' and place * message at the end of 'private queue' in order * to ensure that the 'in queue' doesn't contain * references into the heap. By ensuring this, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } } BM_SWAP_TIMER(send,system); return; } else {#ifdef ERTS_SMP Uint msz; ErlOffHeap *ohp; Eterm *hp; /* Drop message if receiver has a pending exit ... 
*/ if (!ERTS_PROC_PENDING_EXIT(receiver)) { BM_SWAP_TIMER(send,size); msz = size_object(message); BM_SWAP_TIMER(size,send); hp = erts_alloc_message_heap(msz,&bp,&ohp,receiver,receiver_locks); BM_SWAP_TIMER(send,copy); message = copy_struct(message, msz, &hp, ohp); BM_MESSAGE_COPIED(msz); BM_SWAP_TIMER(copy,send); erts_queue_message(receiver, *receiver_locks, bp, message, token); } BM_SWAP_TIMER(send,system);#else ErlMessage* mp = message_alloc(); Uint msize; Eterm *hp; BM_SWAP_TIMER(send,size); msize = size_object(message); BM_SWAP_TIMER(size,send); if (receiver->stop - receiver->htop <= msize) { BM_SWAP_TIMER(send,system); erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity); BM_SWAP_TIMER(system,send); } hp = receiver->htop; receiver->htop = hp + msize; BM_SWAP_TIMER(send,copy); message = copy_struct(message, msize, &hp, &receiver->off_heap); BM_MESSAGE_COPIED(msize); BM_SWAP_TIMER(copy,send); ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = NIL; mp->next = NULL;#if defined(HEAP_FRAG_ELIM_TEST) mp->bp = NULL;#endif LINK_MESSAGE(receiver, mp); if (receiver->status == P_WAITING) { add_to_schedule_q(receiver); } else if (receiver->status == P_SUSPENDED) { receiver->rstatus = P_RUNABLE; } if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } BM_SWAP_TIMER(send,system);#endif /* #ifndef ERTS_SMP */ return;#endif /* HYBRID */ }}/* * This function delivers an EXIT message to a process * which is trapping EXITs. 
*/
/*
 * erts_deliver_exit_message() builds the 3-tuple
 * {am_EXIT, <copy of from>, <copy of reason>} and queues it on `to`
 * with erts_queue_message().
 *
 * Parameters:
 *   from       term placed in the second tuple element; copied unless it
 *              is a constant (IS_CONST) on the no-token path
 *   to         process that receives the message
 *   to_locksp  passed by pointer to erts_alloc_message_heap() and
 *              dereferenced for erts_queue_message()
 *              NOTE(review): presumably the locks held on `to` -- confirm
 *              against callers.
 *   reason     term placed in the third tuple element (always copied)
 *   token      sequential-trace token, or NIL for none
 *
 * With a token, everything (reason, from, the tuple and a copy of the
 * token) is placed in one new message buffer. Without a token, the space
 * is obtained from erts_alloc_message_heap().
 */
void
erts_deliver_exit_message(Eterm from, Process *to, Uint32 *to_locksp,
                          Eterm reason, Eterm token)
{
    Eterm mess;
    Eterm save;
    Eterm from_copy;
    Uint sz_reason;
    Uint sz_token;
    Uint sz_from;
    Eterm* hp;
    Eterm temptoken;
    ErlHeapFragment* bp = NULL;

    if (token != NIL) {
        ASSERT(is_tuple(token));
        sz_reason = size_object(reason);
        sz_token = size_object(token);
        sz_from = size_object(from);

        /* The extra 4 words are for the 3-tuple built below. */
        bp = new_message_buffer(sz_reason + sz_from + sz_token + 4);
        hp = bp->mem;
        mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap);
        from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap);
        save = TUPLE3(hp, am_EXIT, from_copy, mess);
        hp += 4;

        /* the trace token must in this case be updated by the caller */
        seq_trace_output(token, save, SEQ_TRACE_SEND, to->id, NULL);
        temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
        erts_queue_message(to, *to_locksp, bp, save, temptoken);
    } else {
        ErlOffHeap *ohp;

        sz_reason = size_object(reason);
        /* A constant `from` is used as-is and needs no heap space. */
        sz_from = IS_CONST(from) ? 0 : size_object(from);

        hp = erts_alloc_message_heap(sz_reason+sz_from+4,
                                     &bp,
                                     &ohp,
                                     to,
                                     to_locksp);

        mess = copy_struct(reason, sz_reason, &hp, ohp);
        from_copy = (IS_CONST(from)
                     ? from
                     : copy_struct(from, sz_from, &hp, ohp));
        /* hp is not advanced here: nothing is written after the tuple. */
        save = TUPLE3(hp, am_EXIT, from_copy, mess);
        erts_queue_message(to, *to_locksp, bp, save, NIL);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -